512 lines
12 KiB
Go
512 lines
12 KiB
Go
//go:build !windows
|
|
|
|
// TODO: Windows - should be enabled for Windows when super asterisk is fixed on Windows
|
|
// https://github.com/influxdata/telegraf/issues/6248
|
|
|
|
package exec
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/metric"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
"github.com/influxdata/telegraf/plugins/parsers/csv"
|
|
"github.com/influxdata/telegraf/plugins/parsers/json"
|
|
"github.com/influxdata/telegraf/plugins/parsers/value"
|
|
"github.com/influxdata/telegraf/testutil"
|
|
)
|
|
|
|
const validJSON = `
|
|
{
|
|
"status": "green",
|
|
"num_processes": 82,
|
|
"cpu": {
|
|
"status": "red",
|
|
"nil_status": null,
|
|
"used": 8234,
|
|
"free": 32
|
|
},
|
|
"percent": 0.81,
|
|
"users": [0, 1, 2, 3]
|
|
}`
|
|
|
|
const malformedJSON = `
|
|
{
|
|
"status": "green",
|
|
`
|
|
|
|
type runnerMock struct {
|
|
out []byte
|
|
errout []byte
|
|
err error
|
|
}
|
|
|
|
func (r runnerMock) run(string) (out, errout []byte, err error) {
|
|
return r.out, r.errout, r.err
|
|
}
|
|
|
|
func TestExec(t *testing.T) {
|
|
// Setup parser
|
|
parser := &json.Parser{MetricName: "exec"}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"testcommand arg1"},
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(parser)
|
|
require.NoError(t, plugin.Init())
|
|
plugin.runner = &runnerMock{out: []byte(validJSON)}
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"num_processes": float64(82),
|
|
"cpu_used": float64(8234),
|
|
"cpu_free": float64(32),
|
|
"percent": float64(0.81),
|
|
"users_0": float64(0),
|
|
"users_1": float64(1),
|
|
"users_2": float64(2),
|
|
"users_3": float64(3),
|
|
},
|
|
time.Unix(0, 0),
|
|
),
|
|
}
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
|
}
|
|
|
|
func TestExecMalformed(t *testing.T) {
|
|
// Setup parser
|
|
parser := &json.Parser{MetricName: "exec"}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"badcommand arg1"},
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(parser)
|
|
require.NoError(t, plugin.Init())
|
|
plugin.runner = &runnerMock{out: []byte(malformedJSON)}
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.ErrorContains(t, acc.GatherError(plugin.Gather), "unexpected end of JSON input")
|
|
require.Empty(t, acc.GetTelegrafMetrics())
|
|
}
|
|
|
|
func TestCommandError(t *testing.T) {
|
|
// Setup parser
|
|
parser := &json.Parser{MetricName: "exec"}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"badcommand"},
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(parser)
|
|
require.NoError(t, plugin.Init())
|
|
plugin.runner = &runnerMock{err: errors.New("exit status code 1")}
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.ErrorContains(t, acc.GatherError(plugin.Gather), "exit status code 1 for command")
|
|
require.Equal(t, 0, acc.NFields(), "No new points should have been added")
|
|
}
|
|
|
|
func TestCommandIgnoreError(t *testing.T) {
|
|
// Setup parser
|
|
parser := &json.Parser{MetricName: "exec"}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"badcommand"},
|
|
IgnoreError: true,
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(parser)
|
|
require.NoError(t, plugin.Init())
|
|
plugin.runner = &runnerMock{
|
|
out: []byte(validJSON),
|
|
errout: []byte("error"),
|
|
err: errors.New("exit status code 1"),
|
|
}
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"num_processes": float64(82),
|
|
"cpu_used": float64(8234),
|
|
"cpu_free": float64(32),
|
|
"percent": float64(0.81),
|
|
"users_0": float64(0),
|
|
"users_1": float64(1),
|
|
"users_2": float64(2),
|
|
"users_3": float64(3),
|
|
},
|
|
time.Unix(0, 0),
|
|
),
|
|
}
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
|
}
|
|
|
|
func TestExecCommandWithGlob(t *testing.T) {
|
|
// Setup parser
|
|
parser := value.Parser{
|
|
MetricName: "metric",
|
|
DataType: "string",
|
|
}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"/bin/ech* metric_value"},
|
|
Timeout: config.Duration(5 * time.Second),
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(&parser)
|
|
require.NoError(t, plugin.Init())
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"metric",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"value": "metric_value",
|
|
},
|
|
time.Unix(0, 0),
|
|
),
|
|
}
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
|
}
|
|
|
|
func TestExecCommandWithoutGlob(t *testing.T) {
|
|
// Setup parser
|
|
parser := value.Parser{
|
|
MetricName: "metric",
|
|
DataType: "string",
|
|
}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"/bin/echo metric_value"},
|
|
Timeout: config.Duration(5 * time.Second),
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(&parser)
|
|
require.NoError(t, plugin.Init())
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"metric",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"value": "metric_value",
|
|
},
|
|
time.Unix(0, 0),
|
|
),
|
|
}
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
|
}
|
|
|
|
func TestExecCommandWithoutGlobAndPath(t *testing.T) {
|
|
// Setup parser
|
|
parser := value.Parser{
|
|
MetricName: "metric",
|
|
DataType: "string",
|
|
}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"echo metric_value"},
|
|
Timeout: config.Duration(5 * time.Second),
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(&parser)
|
|
require.NoError(t, plugin.Init())
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"metric",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"value": "metric_value",
|
|
},
|
|
time.Unix(0, 0),
|
|
),
|
|
}
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
|
}
|
|
|
|
func TestExecCommandWithEnv(t *testing.T) {
|
|
// Setup parser
|
|
parser := value.Parser{
|
|
MetricName: "metric",
|
|
DataType: "string",
|
|
}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"/bin/sh -c 'echo ${METRIC_NAME}'"},
|
|
Environment: []string{"METRIC_NAME=metric_value"},
|
|
Timeout: config.Duration(5 * time.Second),
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(&parser)
|
|
require.NoError(t, plugin.Init())
|
|
|
|
// Gather the metrics and check the result
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"metric",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"value": "metric_value",
|
|
},
|
|
time.Unix(0, 0),
|
|
),
|
|
}
|
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
|
}
|
|
|
|
func TestTruncate(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
bufF func() *bytes.Buffer
|
|
expected string
|
|
}{
|
|
{
|
|
name: "should not truncate",
|
|
bufF: func() *bytes.Buffer {
|
|
return bytes.NewBufferString("hello world")
|
|
},
|
|
expected: "hello world",
|
|
},
|
|
{
|
|
name: "should truncate up to the new line",
|
|
bufF: func() *bytes.Buffer {
|
|
return bytes.NewBufferString("hello world\nand all the people")
|
|
},
|
|
expected: "hello world...",
|
|
},
|
|
{
|
|
name: "should truncate to the maxStderrBytes",
|
|
bufF: func() *bytes.Buffer {
|
|
var b bytes.Buffer
|
|
for i := 0; i < 2*maxStderrBytes; i++ {
|
|
b.WriteByte('b')
|
|
}
|
|
return &b
|
|
},
|
|
expected: strings.Repeat("b", maxStderrBytes) + "...",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
buf := tt.bufF()
|
|
truncate(buf)
|
|
require.Equal(t, tt.expected, buf.String())
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCSVBehavior(t *testing.T) {
|
|
// Setup the CSV parser
|
|
parser := &csv.Parser{
|
|
MetricName: "exec",
|
|
HeaderRowCount: 1,
|
|
ResetMode: "always",
|
|
}
|
|
require.NoError(t, parser.Init())
|
|
|
|
// Setup the plugin
|
|
plugin := &Exec{
|
|
Commands: []string{"echo \"a,b\n1,2\n3,4\""},
|
|
Timeout: config.Duration(5 * time.Second),
|
|
Log: testutil.Logger{},
|
|
}
|
|
plugin.SetParser(parser)
|
|
require.NoError(t, plugin.Init())
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(1),
|
|
"b": int64(2),
|
|
},
|
|
time.Unix(0, 1),
|
|
),
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(3),
|
|
"b": int64(4),
|
|
},
|
|
time.Unix(0, 2),
|
|
),
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(1),
|
|
"b": int64(2),
|
|
},
|
|
time.Unix(0, 3),
|
|
),
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(3),
|
|
"b": int64(4),
|
|
},
|
|
time.Unix(0, 4),
|
|
),
|
|
}
|
|
|
|
var acc testutil.Accumulator
|
|
// Run gather once
|
|
require.NoError(t, plugin.Gather(&acc))
|
|
// Run gather a second time
|
|
require.NoError(t, plugin.Gather(&acc))
|
|
require.Eventuallyf(t, func() bool {
|
|
acc.Lock()
|
|
defer acc.Unlock()
|
|
return acc.NMetrics() >= uint64(len(expected))
|
|
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
|
|
|
|
// Check the result
|
|
options := []cmp.Option{
|
|
testutil.SortMetrics(),
|
|
testutil.IgnoreTime(),
|
|
}
|
|
actual := acc.GetTelegrafMetrics()
|
|
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
|
}
|
|
|
|
func TestCases(t *testing.T) {
|
|
// Register the plugin
|
|
inputs.Add("exec", func() telegraf.Input {
|
|
return &Exec{
|
|
Timeout: config.Duration(5 * time.Second),
|
|
Log: testutil.Logger{},
|
|
}
|
|
})
|
|
|
|
// Setup the plugin
|
|
cfg := config.NewConfig()
|
|
require.NoError(t, cfg.LoadConfigData([]byte(`
|
|
[[inputs.exec]]
|
|
commands = [ "echo \"a,b\n1,2\n3,4\"" ]
|
|
data_format = "csv"
|
|
csv_header_row_count = 1
|
|
`), config.EmptySourcePath))
|
|
require.Len(t, cfg.Inputs, 1)
|
|
plugin := cfg.Inputs[0]
|
|
require.NoError(t, plugin.Init())
|
|
|
|
expected := []telegraf.Metric{
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(1),
|
|
"b": int64(2),
|
|
},
|
|
time.Unix(0, 1),
|
|
),
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(3),
|
|
"b": int64(4),
|
|
},
|
|
time.Unix(0, 2),
|
|
),
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(1),
|
|
"b": int64(2),
|
|
},
|
|
time.Unix(0, 3),
|
|
),
|
|
metric.New(
|
|
"exec",
|
|
map[string]string{},
|
|
map[string]interface{}{
|
|
"a": int64(3),
|
|
"b": int64(4),
|
|
},
|
|
time.Unix(0, 4),
|
|
),
|
|
}
|
|
|
|
var acc testutil.Accumulator
|
|
// Run gather once
|
|
require.NoError(t, plugin.Gather(&acc))
|
|
// Run gather a second time
|
|
require.NoError(t, plugin.Gather(&acc))
|
|
require.Eventuallyf(t, func() bool {
|
|
acc.Lock()
|
|
defer acc.Unlock()
|
|
return acc.NMetrics() >= uint64(len(expected))
|
|
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
|
|
|
|
// Check the result
|
|
options := []cmp.Option{
|
|
testutil.SortMetrics(),
|
|
testutil.IgnoreTime(),
|
|
}
|
|
actual := acc.GetTelegrafMetrics()
|
|
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
|
}
|