460 lines
12 KiB
Go
460 lines
12 KiB
Go
|
package execd
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"flag"
|
||
|
"fmt"
|
||
|
"os"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/stretchr/testify/require"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/agent"
|
||
|
"github.com/influxdata/telegraf/config"
|
||
|
"github.com/influxdata/telegraf/logger"
|
||
|
"github.com/influxdata/telegraf/metric"
|
||
|
"github.com/influxdata/telegraf/models"
|
||
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||
|
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
|
||
|
serializers_influx "github.com/influxdata/telegraf/plugins/serializers/influx"
|
||
|
"github.com/influxdata/telegraf/testutil"
|
||
|
)
|
||
|
|
||
|
func TestSettingConfigWorks(t *testing.T) {
|
||
|
cfg := `
|
||
|
[[inputs.execd]]
|
||
|
command = ["a", "b", "c"]
|
||
|
environment = ["d=e", "f=1"]
|
||
|
restart_delay = "1m"
|
||
|
signal = "SIGHUP"
|
||
|
`
|
||
|
conf := config.NewConfig()
|
||
|
require.NoError(t, conf.LoadConfigData([]byte(cfg), config.EmptySourcePath))
|
||
|
|
||
|
require.Len(t, conf.Inputs, 1)
|
||
|
inp, ok := conf.Inputs[0].Input.(*Execd)
|
||
|
require.True(t, ok)
|
||
|
require.EqualValues(t, []string{"a", "b", "c"}, inp.Command)
|
||
|
require.EqualValues(t, []string{"d=e", "f=1"}, inp.Environment)
|
||
|
require.EqualValues(t, 1*time.Minute, inp.RestartDelay)
|
||
|
require.EqualValues(t, "SIGHUP", inp.Signal)
|
||
|
}
|
||
|
|
||
|
func TestExternalInputWorks(t *testing.T) {
|
||
|
influxParser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
|
||
|
require.NoError(t, influxParser.Init())
|
||
|
|
||
|
exe, err := os.Executable()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
e := &Execd{
|
||
|
Command: []string{exe, "-mode", "counter"},
|
||
|
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
|
||
|
RestartDelay: config.Duration(5 * time.Second),
|
||
|
Signal: "STDIN",
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
e.SetParser(influxParser)
|
||
|
|
||
|
metrics := make(chan telegraf.Metric, 10)
|
||
|
defer close(metrics)
|
||
|
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
|
||
|
|
||
|
require.NoError(t, e.Start(acc))
|
||
|
require.NoError(t, e.Gather(acc))
|
||
|
|
||
|
// grab a metric and make sure it's a thing
|
||
|
m := readChanWithTimeout(t, metrics, 10*time.Second)
|
||
|
|
||
|
e.Stop()
|
||
|
|
||
|
require.Equal(t, "counter", m.Name())
|
||
|
val, ok := m.GetField("count")
|
||
|
require.True(t, ok)
|
||
|
require.EqualValues(t, 0, val)
|
||
|
}
|
||
|
|
||
|
func TestParsesLinesContainingNewline(t *testing.T) {
|
||
|
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
|
||
|
require.NoError(t, parser.Init())
|
||
|
|
||
|
metrics := make(chan telegraf.Metric, 10)
|
||
|
defer close(metrics)
|
||
|
acc := agent.NewAccumulator(&TestMetricMaker{}, metrics)
|
||
|
|
||
|
e := &Execd{
|
||
|
RestartDelay: config.Duration(5 * time.Second),
|
||
|
Signal: "STDIN",
|
||
|
acc: acc,
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
e.SetParser(parser)
|
||
|
|
||
|
cases := []struct {
|
||
|
Name string
|
||
|
Value string
|
||
|
}{
|
||
|
{
|
||
|
Name: "no-newline",
|
||
|
Value: "my message",
|
||
|
}, {
|
||
|
Name: "newline",
|
||
|
Value: "my\nmessage",
|
||
|
},
|
||
|
}
|
||
|
|
||
|
for _, test := range cases {
|
||
|
t.Run(test.Name, func(t *testing.T) {
|
||
|
line := fmt.Sprintf("event message=\"%v\" 1587128639239000000", test.Value)
|
||
|
|
||
|
e.outputReader(strings.NewReader(line))
|
||
|
|
||
|
m := readChanWithTimeout(t, metrics, 1*time.Second)
|
||
|
|
||
|
require.Equal(t, "event", m.Name())
|
||
|
val, ok := m.GetField("message")
|
||
|
require.True(t, ok)
|
||
|
require.Equal(t, test.Value, val)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestParsesPrometheus(t *testing.T) {
|
||
|
parser := models.NewRunningParser(&prometheus.Parser{}, &models.ParserConfig{})
|
||
|
require.NoError(t, parser.Init())
|
||
|
|
||
|
metrics := make(chan telegraf.Metric, 10)
|
||
|
defer close(metrics)
|
||
|
|
||
|
var acc testutil.Accumulator
|
||
|
|
||
|
e := &Execd{
|
||
|
RestartDelay: config.Duration(5 * time.Second),
|
||
|
Signal: "STDIN",
|
||
|
acc: &acc,
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
e.SetParser(parser)
|
||
|
|
||
|
lines := `# HELP This is just a test metric.
|
||
|
# TYPE test summary
|
||
|
test{handler="execd",quantile="0.5"} 42.0
|
||
|
`
|
||
|
expected := []telegraf.Metric{
|
||
|
testutil.MustMetric(
|
||
|
"prometheus",
|
||
|
map[string]string{"handler": "execd", "quantile": "0.5"},
|
||
|
map[string]interface{}{"test": float64(42.0)},
|
||
|
time.Unix(0, 0),
|
||
|
),
|
||
|
}
|
||
|
|
||
|
e.outputReader(strings.NewReader(lines))
|
||
|
check := func() bool { return acc.NMetrics() == uint64(len(expected)) }
|
||
|
require.Eventually(t, check, 1*time.Second, 100*time.Millisecond)
|
||
|
actual := acc.GetTelegrafMetrics()
|
||
|
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
|
||
|
}
|
||
|
|
||
|
func TestStopOnError(t *testing.T) {
|
||
|
exe, err := os.Executable()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
plugin := &Execd{
|
||
|
Command: []string{exe, "-mode", "fail"},
|
||
|
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
|
||
|
StopOnError: true,
|
||
|
RestartDelay: config.Duration(5 * time.Second),
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
|
||
|
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
|
||
|
require.NoError(t, parser.Init())
|
||
|
plugin.SetParser(parser)
|
||
|
|
||
|
var acc testutil.Accumulator
|
||
|
require.NoError(t, plugin.Start(&acc))
|
||
|
defer plugin.Stop()
|
||
|
|
||
|
require.Eventually(t, func() bool {
|
||
|
_, running := plugin.process.State()
|
||
|
return !running
|
||
|
}, 3*time.Second, 100*time.Millisecond)
|
||
|
|
||
|
state, running := plugin.process.State()
|
||
|
require.False(t, running)
|
||
|
require.Equal(t, 42, state.ExitCode())
|
||
|
}
|
||
|
|
||
|
func TestStopOnErrorSuccess(t *testing.T) {
|
||
|
exe, err := os.Executable()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
plugin := &Execd{
|
||
|
Command: []string{exe, "-mode", "success"},
|
||
|
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
|
||
|
StopOnError: true,
|
||
|
RestartDelay: config.Duration(100 * time.Millisecond),
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
|
||
|
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
|
||
|
require.NoError(t, parser.Init())
|
||
|
plugin.SetParser(parser)
|
||
|
|
||
|
var acc testutil.Accumulator
|
||
|
require.NoError(t, plugin.Start(&acc))
|
||
|
defer plugin.Stop()
|
||
|
|
||
|
// Wait for at least two metric as this indicates the process was restarted
|
||
|
require.Eventually(t, func() bool {
|
||
|
return acc.NMetrics() > 1
|
||
|
}, 3*time.Second, 100*time.Millisecond)
|
||
|
}
|
||
|
|
||
|
func TestLoggingNoPrefix(t *testing.T) {
|
||
|
// Use own test as mocking executable
|
||
|
exe, err := os.Executable()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Setup the plugin with a capturing logger
|
||
|
var l testutil.CaptureLogger
|
||
|
plugin := &Execd{
|
||
|
Command: []string{exe, "-mode", "logging"},
|
||
|
Environment: []string{
|
||
|
"PLUGINS_INPUTS_EXECD_MODE=application",
|
||
|
"MESSAGE=this is an error",
|
||
|
},
|
||
|
Signal: "STDIN",
|
||
|
StopOnError: true,
|
||
|
RestartDelay: config.Duration(100 * time.Millisecond),
|
||
|
Log: &l,
|
||
|
}
|
||
|
|
||
|
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
|
||
|
require.NoError(t, parser.Init())
|
||
|
plugin.SetParser(parser)
|
||
|
|
||
|
// Run the plugin and trigger a report
|
||
|
var acc testutil.Accumulator
|
||
|
require.NoError(t, plugin.Start(&acc))
|
||
|
defer plugin.Stop()
|
||
|
require.NoError(t, plugin.Gather(&acc))
|
||
|
plugin.Stop()
|
||
|
|
||
|
// Wait for at least two metric as this indicates the process was restarted
|
||
|
require.Eventually(t, func() bool {
|
||
|
return acc.NMetrics() > 0 && l.NMessages() > 0
|
||
|
}, 3*time.Second, 100*time.Millisecond)
|
||
|
|
||
|
// Check the metric
|
||
|
expected := []telegraf.Metric{
|
||
|
metric.New("test", map[string]string{}, map[string]interface{}{"value": int64(0)}, time.Unix(0, 0)),
|
||
|
}
|
||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
||
|
|
||
|
// Check the error message type
|
||
|
expectedLevel := byte(testutil.LevelError)
|
||
|
levels := make(map[byte]int, 0)
|
||
|
for _, m := range l.Messages() {
|
||
|
if strings.HasPrefix(m.Text, "Starting process") || strings.HasSuffix(m.Text, "shut down") {
|
||
|
continue
|
||
|
}
|
||
|
if m.Level != expectedLevel {
|
||
|
t.Logf("received msg %q (%s)", m.Text, string(m.Level))
|
||
|
} else {
|
||
|
require.Equal(t, "stderr: \"this is an error\"", m.Text)
|
||
|
}
|
||
|
levels[m.Level]++
|
||
|
}
|
||
|
require.Equal(t, 1, levels[testutil.LevelError])
|
||
|
require.Len(t, levels, 1)
|
||
|
}
|
||
|
|
||
|
func TestLoggingWithPrefix(t *testing.T) {
|
||
|
// Use own test as mocking executable
|
||
|
exe, err := os.Executable()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
tests := []struct {
|
||
|
name string
|
||
|
level byte
|
||
|
}{
|
||
|
{"error", testutil.LevelError},
|
||
|
{"warn", testutil.LevelWarn},
|
||
|
{"info", testutil.LevelInfo},
|
||
|
{"debug", testutil.LevelDebug},
|
||
|
{"trace", testutil.LevelTrace},
|
||
|
}
|
||
|
|
||
|
for _, tt := range tests {
|
||
|
t.Run(tt.name, func(t *testing.T) {
|
||
|
// Setup the plugin with a capturing logger
|
||
|
var l testutil.CaptureLogger
|
||
|
plugin := &Execd{
|
||
|
Command: []string{exe, "-mode", "logging"},
|
||
|
Environment: []string{
|
||
|
"PLUGINS_INPUTS_EXECD_MODE=application",
|
||
|
fmt.Sprintf("MESSAGE=%s! a log message", string(tt.level)),
|
||
|
},
|
||
|
Signal: "STDIN",
|
||
|
StopOnError: true,
|
||
|
RestartDelay: config.Duration(100 * time.Millisecond),
|
||
|
Log: &l,
|
||
|
}
|
||
|
|
||
|
parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
|
||
|
require.NoError(t, parser.Init())
|
||
|
plugin.SetParser(parser)
|
||
|
|
||
|
// Run the plugin and trigger a report
|
||
|
var acc testutil.Accumulator
|
||
|
require.NoError(t, plugin.Start(&acc))
|
||
|
defer plugin.Stop()
|
||
|
require.NoError(t, plugin.Gather(&acc))
|
||
|
plugin.Stop()
|
||
|
|
||
|
// Wait for at least two metric as this indicates the process was restarted
|
||
|
require.Eventually(t, func() bool {
|
||
|
return acc.NMetrics() > 0 && l.NMessages() > 0
|
||
|
}, 3*time.Second, 100*time.Millisecond)
|
||
|
|
||
|
// Check the metric
|
||
|
expected := []telegraf.Metric{
|
||
|
metric.New("test", map[string]string{}, map[string]interface{}{"value": int64(0)}, time.Unix(0, 0)),
|
||
|
}
|
||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
||
|
|
||
|
// Check the error message type
|
||
|
expectedLevel := tt.level
|
||
|
levels := make(map[byte]int, 0)
|
||
|
for _, m := range l.Messages() {
|
||
|
if strings.HasPrefix(m.Text, "Starting process") || strings.HasSuffix(m.Text, "shut down") {
|
||
|
continue
|
||
|
}
|
||
|
if m.Level != expectedLevel {
|
||
|
t.Logf("received msg %q (%s)", m.Text, string(m.Level))
|
||
|
} else {
|
||
|
require.Equal(t, "a log message", m.Text)
|
||
|
}
|
||
|
levels[m.Level]++
|
||
|
}
|
||
|
require.Equal(t, 1, levels[tt.level])
|
||
|
require.Len(t, levels, 1)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
|
||
|
to := time.NewTimer(timeout)
|
||
|
defer to.Stop()
|
||
|
select {
|
||
|
case m := <-metrics:
|
||
|
return m
|
||
|
case <-to.C:
|
||
|
require.Fail(t, "Timeout waiting for metric")
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type TestMetricMaker struct{}
|
||
|
|
||
|
func (*TestMetricMaker) Name() string {
|
||
|
return "TestPlugin"
|
||
|
}
|
||
|
|
||
|
func (tm *TestMetricMaker) LogName() string {
|
||
|
return tm.Name()
|
||
|
}
|
||
|
|
||
|
func (*TestMetricMaker) MakeMetric(aMetric telegraf.Metric) telegraf.Metric {
|
||
|
return aMetric
|
||
|
}
|
||
|
|
||
|
func (*TestMetricMaker) Log() telegraf.Logger {
|
||
|
return logger.New("TestPlugin", "test", "")
|
||
|
}
|
||
|
|
||
|
func TestMain(m *testing.M) {
|
||
|
var mode string
|
||
|
|
||
|
flag.StringVar(&mode, "mode", "counter", "determines the output when run as mockup program")
|
||
|
flag.Parse()
|
||
|
|
||
|
operationMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
|
||
|
if operationMode != "application" {
|
||
|
// Run the normal test mode
|
||
|
os.Exit(m.Run())
|
||
|
}
|
||
|
|
||
|
// Run as a mock program
|
||
|
switch mode {
|
||
|
case "counter":
|
||
|
if err := runCounterProgram(); err != nil {
|
||
|
os.Exit(1)
|
||
|
}
|
||
|
os.Exit(0)
|
||
|
case "fail":
|
||
|
os.Exit(42)
|
||
|
case "success":
|
||
|
fmt.Println("test value=42i")
|
||
|
os.Exit(0)
|
||
|
case "logging":
|
||
|
if err := runLoggingProgram(); err != nil {
|
||
|
os.Exit(1)
|
||
|
}
|
||
|
os.Exit(0)
|
||
|
}
|
||
|
os.Exit(23)
|
||
|
}
|
||
|
|
||
|
func runCounterProgram() error {
|
||
|
envMetricName := os.Getenv("METRIC_NAME")
|
||
|
serializer := &serializers_influx.Serializer{}
|
||
|
if err := serializer.Init(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
i := 0
|
||
|
scanner := bufio.NewScanner(os.Stdin)
|
||
|
for scanner.Scan() {
|
||
|
m := metric.New(envMetricName,
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{"count": i},
|
||
|
time.Now(),
|
||
|
)
|
||
|
i++
|
||
|
|
||
|
b, err := serializer.Serialize(m)
|
||
|
if err != nil {
|
||
|
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
||
|
return err
|
||
|
}
|
||
|
if _, err := fmt.Fprint(os.Stdout, string(b)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func runLoggingProgram() error {
|
||
|
msg := os.Getenv("MESSAGE")
|
||
|
|
||
|
i := 0
|
||
|
scanner := bufio.NewScanner(os.Stdin)
|
||
|
for scanner.Scan() {
|
||
|
if _, err := fmt.Fprintf(os.Stdout, "test value=%di\n", i); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if msg != "" {
|
||
|
if _, err := fmt.Fprintln(os.Stderr, msg); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|