package tail import ( "os" "path/filepath" "runtime" "strings" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/influxdata/tail" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/testutil" ) func newInfluxParser() (telegraf.Parser, error) { parser := &influx.Parser{} if err := parser.Init(); err != nil { return nil, err } return parser, nil } func newTestTail() *Tail { offsetsMutex.Lock() offsetsCopy := make(map[string]int64, len(offsets)) for k, v := range offsets { offsetsCopy[k] = v } offsetsMutex.Unlock() watchMethod := "inotify" if runtime.GOOS == "windows" { watchMethod = "poll" } return &Tail{ MaxUndeliveredLines: 1000, offsets: offsetsCopy, WatchMethod: watchMethod, PathTag: "path", } } func TestTailBadLine(t *testing.T) { content := ` cpu mytag= foo usage_idle= 100 cpu usage_idle=100 ` tmpfile := filepath.Join(t.TempDir(), "input.csv") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) logger := &testutil.CaptureLogger{} tt := newTestTail() tt.Log = logger tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) require.NoError(t, acc.GatherError(tt.Gather)) acc.Wait(1) tt.Stop() require.Len(t, logger.Errors(), 1) require.Contains(t, logger.Errors()[0], "Malformed log line") } func TestColoredLine(t *testing.T) { content := "cpu usage_idle=\033[4A\033[4A100\ncpu2 usage_idle=200\n" tmpfile := filepath.Join(t.TempDir(), "input.csv") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) tt := newTestTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Filters = []string{"ansi_color"} tt.Files = []string{tmpfile} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) acc.Wait(2) acc.AssertContainsFields(t, "cpu", map[string]interface{}{ "usage_idle": float64(100), }) acc.AssertContainsFields(t, "cpu2", map[string]interface{}{ "usage_idle": float64(200), }) } func TestTailDosLineEndings(t *testing.T) { content := "cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n" tmpfile := filepath.Join(t.TempDir(), "input.csv") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) tt := newTestTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) require.Eventually(t, func() bool { return acc.NMetrics() >= 2 }, time.Second, 100*time.Millisecond, "Did not receive 2 expected metrics") acc.AssertContainsFields(t, "cpu", map[string]interface{}{ "usage_idle": float64(100), }) acc.AssertContainsFields(t, "cpu2", map[string]interface{}{ "usage_idle": float64(200), }) } func TestGrokParseLogFilesWithMultiline(t *testing.T) { // we make sure the timeout won't kick in d, err := time.ParseDuration("100s") require.NoError(t, err) duration := config.Duration(d) tt := newTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Files = []string{filepath.Join("testdata", "test_multiline.log")} tt.MultilineConfig = multilineConfig{ Pattern: `^[^\[]`, MatchWhichLine: previous, InvertMatch: false, Timeout: &duration, } tt.SetParserFunc(createGrokParser) require.NoError(t, tt.Init()) var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() require.Eventually(t, func() bool { return acc.NMetrics() >= 3 }, time.Second, 100*time.Millisecond, "Did not receive expected metrics count") expectedPath := filepath.Join("testdata", "test_multiline.log") acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: This is debug", }, map[string]string{ "path": expectedPath, "loglevel": "DEBUG", }) acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: This is info", }, map[string]string{ "path": expectedPath, "loglevel": "INFO", }) acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: Sorry, something wrong! java.lang.ArithmeticException: / by zero\t" + "at com.foo.HelloExample2.divide(HelloExample2.java:24)\tat com.foo.HelloExample2.main(HelloExample2.java:14)", }, map[string]string{ "path": expectedPath, "loglevel": "ERROR", }) require.Equal(t, uint64(3), acc.NMetrics()) } func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) { tmpfile, err := os.CreateTemp(t.TempDir(), "") require.NoError(t, err) defer tmpfile.Close() // This seems necessary in order to get the test to read the following lines. _, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is fluff\r\n") require.NoError(t, err) require.NoError(t, tmpfile.Sync()) // set tight timeout for tests d := 10 * time.Millisecond duration := config.Duration(d) tt := newTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile.Name()} tt.MultilineConfig = multilineConfig{ Pattern: `^[^\[]`, MatchWhichLine: previous, InvertMatch: false, Timeout: &duration, } tt.SetParserFunc(createGrokParser) require.NoError(t, tt.Init()) var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) time.Sleep(11 * time.Millisecond) // will force timeout _, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is info\r\n") require.NoError(t, err) require.NoError(t, tmpfile.Sync()) require.Eventually(t, func() bool { return acc.NMetrics() >= 2 }, 100*time.Millisecond, 10*time.Millisecond, "Did not receive expected metrics count after first write") time.Sleep(11 * time.Millisecond) // will force timeout _, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] WARN HelloExample: This is warn\r\n") require.NoError(t, err) require.NoError(t, tmpfile.Sync()) require.Eventually(t, func() bool { return acc.NMetrics() >= 3 }, 100*time.Millisecond, 10*time.Millisecond, "Did not receive expected metrics count after second write") tt.Stop() require.Equal(t, uint64(3), acc.NMetrics()) expectedPath := tmpfile.Name() acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: This is info", }, map[string]string{ "path": expectedPath, "loglevel": "INFO", }) acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: This is warn", }, map[string]string{ "path": expectedPath, "loglevel": "WARN", }) } func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *testing.T) { // we make sure the timeout won't kick in duration := config.Duration(100 * time.Second) tt := newTestTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Files = []string{filepath.Join("testdata", "test_multiline.log")} tt.MultilineConfig = multilineConfig{ Pattern: `^[^\[]`, MatchWhichLine: previous, InvertMatch: false, Timeout: &duration, } tt.SetParserFunc(createGrokParser) require.NoError(t, tt.Init()) var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) // Wait for the initial metrics require.Eventually(t, func() bool { return acc.NMetrics() >= 3 }, time.Second, 100*time.Millisecond, "Did not receive initial 3 metrics") // Close tailer, so multiline buffer is flushed tt.Stop() // Wait for the additional metric after flush require.Eventually(t, func() bool { return acc.NMetrics() >= 4 }, time.Second, 100*time.Millisecond, "Did not receive additional metric after flushing multiline buffer") expectedPath := filepath.Join("testdata", "test_multiline.log") acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: This is warn", }, map[string]string{ "path": expectedPath, "loglevel": "WARN", }) } func createGrokParser() (telegraf.Parser, error) { parser := &grok.Parser{ Measurement: "tail_grok", Patterns: []string{"%{TEST_LOG_MULTILINE}"}, CustomPatternFiles: []string{filepath.Join("testdata", "test-patterns")}, Log: testutil.Logger{}, } err := parser.Init() return parser, err } // The csv parser should only parse the header line once per file. func TestCSVHeadersParsedOnce(t *testing.T) { content := ` measurement,time_idle cpu,42 cpu,42 ` tmpfile := filepath.Join(t.TempDir(), "input.csv") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) plugin := newTestTail() plugin.Log = testutil.Logger{} plugin.InitialReadOffset = "beginning" plugin.Files = []string{tmpfile} plugin.SetParserFunc(func() (telegraf.Parser, error) { parser := csv.Parser{ MeasurementColumn: "measurement", HeaderRowCount: 1, TimeFunc: func() time.Time { return time.Unix(0, 0) }, } err := parser.Init() return &parser, err }) require.NoError(t, plugin.Init()) expected := []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{ "path": tmpfile, }, map[string]interface{}{ "time_idle": 42, }, time.Unix(0, 0)), testutil.MustMetric("cpu", map[string]string{ "path": tmpfile, }, map[string]interface{}{ "time_idle": 42, }, time.Unix(0, 0)), } var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() require.NoError(t, plugin.Gather(&acc)) require.Eventuallyf(t, func() bool { return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } func TestCSVMultiHeaderWithSkipRowANDColumn(t *testing.T) { content := `garbage nonsense skip,measurement,value row,1,2 skip1,cpu,42 skip2,mem,100 ` tmpfile := filepath.Join(t.TempDir(), "input.csv") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) expected := []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{ "path": tmpfile, }, map[string]interface{}{ "value2": 42, }, time.Unix(0, 0)), testutil.MustMetric("mem", map[string]string{ "path": tmpfile, }, map[string]interface{}{ "value2": 100, }, time.Unix(0, 0)), } plugin := newTestTail() plugin.Log = testutil.Logger{} plugin.InitialReadOffset = "beginning" plugin.Files = []string{tmpfile} plugin.SetParserFunc(func() (telegraf.Parser, error) { parser := csv.Parser{ MeasurementColumn: "measurement1", HeaderRowCount: 2, SkipRows: 1, SkipColumns: 1, TimeFunc: func() time.Time { return time.Unix(0, 0) }, } err := parser.Init() return &parser, err }) require.NoError(t, plugin.Init()) var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() require.NoError(t, plugin.Gather(&acc)) require.Eventuallyf(t, func() bool { return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) plugin.Stop() testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } // Ensure that the first line can produce multiple metrics (#6138) func TestMultipleMetricsOnFirstLine(t *testing.T) { content := ` [{"time_idle": 42}, {"time_idle": 42}] ` tmpfile := filepath.Join(t.TempDir(), "input.csv") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) expected := []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{ "customPathTagMyFile": tmpfile, }, map[string]interface{}{ "time_idle": 42.0, }, time.Unix(0, 0)), testutil.MustMetric("cpu", map[string]string{ "customPathTagMyFile": tmpfile, }, map[string]interface{}{ "time_idle": 42.0, }, time.Unix(0, 0)), } plugin := newTestTail() plugin.Log = testutil.Logger{} plugin.InitialReadOffset = "beginning" plugin.Files = []string{tmpfile} plugin.PathTag = "customPathTagMyFile" plugin.SetParserFunc(func() (telegraf.Parser, error) { p := &json.Parser{MetricName: "cpu"} err := p.Init() return p, err }) require.NoError(t, plugin.Init()) var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() require.NoError(t, plugin.Gather(&acc)) require.Eventuallyf(t, func() bool { return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) plugin.Stop() testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestCharacterEncoding(t *testing.T) { full := []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{ "cpu": "cpu0", }, map[string]interface{}{ "usage_active": 11.9, }, time.Unix(0, 0), ), testutil.MustMetric("cpu", map[string]string{ "cpu": "cpu1", }, map[string]interface{}{ "usage_active": 26.0, }, time.Unix(0, 0), ), testutil.MustMetric("cpu", map[string]string{ "cpu": "cpu2", }, map[string]interface{}{ "usage_active": 14.0, }, time.Unix(0, 0), ), testutil.MustMetric("cpu", map[string]string{ "cpu": "cpu3", }, map[string]interface{}{ "usage_active": 20.4, }, time.Unix(0, 0), ), testutil.MustMetric("cpu", map[string]string{ "cpu": "cpu-total", }, map[string]interface{}{ "usage_active": 18.4, }, time.Unix(0, 0), ), } watchMethod := "inotify" if runtime.GOOS == "windows" { watchMethod = "poll" } tests := []struct { name string testfiles string initialReadOffset string characterEncoding string offset int64 expected []telegraf.Metric }{ { name: "utf-8", testfiles: "cpu-utf-8.influx", initialReadOffset: "beginning", characterEncoding: "utf-8", expected: full, }, { name: "utf-8 seek", testfiles: "cpu-utf-8.influx", characterEncoding: "utf-8", offset: 0x33, expected: full[1:], }, { name: "utf-16le", testfiles: "cpu-utf-16le.influx", initialReadOffset: "beginning", characterEncoding: "utf-16le", expected: full, }, { name: "utf-16le seek", testfiles: "cpu-utf-16le.influx", characterEncoding: "utf-16le", offset: 0x68, expected: full[1:], }, { name: "utf-16be", testfiles: "cpu-utf-16be.influx", initialReadOffset: "beginning", characterEncoding: "utf-16be", expected: full, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { plugin := &Tail{ Files: []string{filepath.Join("testdata", tt.testfiles)}, InitialReadOffset: tt.initialReadOffset, MaxUndeliveredLines: 1000, Log: testutil.Logger{}, CharacterEncoding: tt.characterEncoding, WatchMethod: watchMethod, } plugin.SetParserFunc(newInfluxParser) require.NoError(t, plugin.Init()) if tt.offset != 0 { plugin.offsets = map[string]int64{ plugin.Files[0]: tt.offset, } } var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) require.Eventuallyf(t, func() bool { return acc.NMetrics() >= uint64(len(tt.expected)) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(tt.expected), acc.NMetrics()) plugin.Stop() actual := acc.GetTelegrafMetrics() for _, m := range actual { m.RemoveTag("path") } testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime()) }) } } func TestTailEOF(t *testing.T) { tmpfile, err := os.CreateTemp(t.TempDir(), "") require.NoError(t, err) defer tmpfile.Close() _, err = tmpfile.WriteString("cpu usage_idle=100\r\n") require.NoError(t, err) require.NoError(t, tmpfile.Sync()) tt := newTestTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Files = []string{tmpfile.Name()} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) // Wait for initial metric require.Eventually(t, func() bool { require.NoError(t, acc.GatherError(tt.Gather)) return acc.NMetrics() >= 1 }, time.Second, 100*time.Millisecond, "Did not receive initial metric") _, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n") require.NoError(t, err) require.NoError(t, tmpfile.Sync()) // Wait for second metric require.Eventually(t, func() bool { require.NoError(t, acc.GatherError(tt.Gather)) return acc.NMetrics() >= 2 }, time.Second, 100*time.Millisecond, "Did not receive second metric") acc.AssertContainsFields(t, "cpu", map[string]interface{}{ "usage_idle": float64(100), }) acc.AssertContainsFields(t, "cpu2", map[string]interface{}{ "usage_idle": float64(200), }) require.NoError(t, tmpfile.Close()) } func TestCSVBehavior(t *testing.T) { // Prepare the input file input, err := os.CreateTemp(t.TempDir(), "") require.NoError(t, err) defer input.Close() // Write header _, err = input.WriteString("a,b\n") require.NoError(t, err) require.NoError(t, input.Sync()) // Setup the CSV parser creator function parserFunc := func() (telegraf.Parser, error) { parser := &csv.Parser{ MetricName: "tail", HeaderRowCount: 1, } err := parser.Init() return parser, err } // Setup the plugin plugin := &Tail{ Files: []string{input.Name()}, InitialReadOffset: "beginning", MaxUndeliveredLines: 1000, offsets: make(map[string]int64, 0), PathTag: "path", Log: testutil.Logger{}, } plugin.SetParserFunc(parserFunc) require.NoError(t, plugin.Init()) expected := []telegraf.Metric{ metric.New( "tail", map[string]string{ "path": input.Name(), }, map[string]interface{}{ "a": int64(1), "b": int64(2), }, time.Unix(0, 0), ), metric.New( "tail", map[string]string{ "path": input.Name(), }, map[string]interface{}{ "a": int64(3), "b": int64(4), }, time.Unix(0, 0), ), } var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() // Write the first line of data _, err = input.WriteString("1,2\n") require.NoError(t, err) require.NoError(t, input.Sync()) require.NoError(t, plugin.Gather(&acc)) // Write another line of data _, err = input.WriteString("3,4\n") require.NoError(t, err) require.NoError(t, input.Sync()) require.NoError(t, plugin.Gather(&acc)) require.Eventuallyf(t, func() bool { return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) // Check the result options := []cmp.Option{ testutil.SortMetrics(), testutil.IgnoreTime(), } actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, options...) // Close the input file require.NoError(t, input.Close()) } func TestStatePersistence(t *testing.T) { // Prepare the input file lines := []string{ "metric,tag=value foo=1i 1730478201000000000\n", "metric,tag=value foo=2i 1730478211000000000\n", "metric,tag=value foo=3i 1730478221000000000\n", } content := []byte(strings.Join(lines, "")) inputFilename := filepath.Join(t.TempDir(), "input.influx") require.NoError(t, os.WriteFile(inputFilename, content, 0600)) // Define the metrics and state to skip the first metric state := map[string]int64{inputFilename: int64(len(lines[0]))} expectedState := map[string]int64{inputFilename: int64(len(content))} expected := []telegraf.Metric{ metric.New("metric", map[string]string{"tag": "value"}, map[string]interface{}{"foo": 2}, time.Unix(1730478211, 0), ), metric.New("metric", map[string]string{"tag": "value"}, map[string]interface{}{"foo": 3}, time.Unix(1730478221, 0), ), } // Configure the plugin plugin := &Tail{ Files: []string{inputFilename}, MaxUndeliveredLines: 1000, offsets: make(map[string]int64, 0), Log: testutil.Logger{}, } plugin.SetParserFunc(newInfluxParser) require.NoError(t, plugin.Init()) require.Empty(t, plugin.offsets) // Setup the "persisted" state var pi telegraf.StatefulPlugin = plugin require.NoError(t, pi.SetState(state)) require.Len(t, plugin.offsets, 1) // Run the plugin var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() require.NoError(t, plugin.Gather(&acc)) require.Eventuallyf(t, func() bool { return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) plugin.Stop() // Check the result options := []cmp.Option{ testutil.SortMetrics(), testutil.IgnoreTime(), } actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, options...) // Check getting the persisted state actualState, ok := pi.GetState().(map[string]int64) require.True(t, ok, "state is not a map[string]int64") require.Equal(t, expectedState, actualState) } func TestGetSeekInfo(t *testing.T) { tests := []struct { name string offsets map[string]int64 initial string expected *tail.SeekInfo }{ { name: "beginning without offset", initial: "beginning", expected: &tail.SeekInfo{ Whence: 0, Offset: 0, }, }, { name: "beginning with offset", offsets: map[string]int64{"test.log": 100}, initial: "beginning", expected: &tail.SeekInfo{ Whence: 0, Offset: 0, }, }, { name: "end without offset", initial: "end", expected: &tail.SeekInfo{ Whence: 2, Offset: 0, }, }, { name: "end with offset", offsets: map[string]int64{"test.log": 100}, initial: "end", expected: &tail.SeekInfo{ Whence: 2, Offset: 0, }, }, { name: "saved-or-beginning without offset", initial: "saved-or-beginning", expected: &tail.SeekInfo{ Whence: 0, Offset: 0, }, }, { name: "saved-or-beginning with offset", offsets: map[string]int64{"test.log": 100}, initial: "saved-or-beginning", expected: &tail.SeekInfo{ Whence: 0, Offset: 100, }, }, { name: "saved-or-end without offset", initial: "saved-or-end", expected: &tail.SeekInfo{ Whence: 2, Offset: 0, }, }, { name: "saved-or-end with offset", offsets: map[string]int64{"test.log": 100}, initial: "saved-or-end", expected: &tail.SeekInfo{ Whence: 0, Offset: 100, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { plugin := &Tail{ MaxUndeliveredLines: 1000, InitialReadOffset: tt.initial, PathTag: "path", Log: &testutil.Logger{}, } require.NoError(t, plugin.Init()) plugin.offsets = tt.offsets seekInfo, err := plugin.getSeekInfo("test.log") require.NoError(t, err) require.Equal(t, tt.expected, seekInfo) }) } } func TestGetSeekInfoForPipes(t *testing.T) { tests := []struct { name string offsets map[string]int64 initial string }{ { name: "beginning without offset", initial: "beginning", }, { name: "beginning with offset", offsets: map[string]int64{"test.log": 100}, initial: "beginning", }, { name: "end without offset", initial: "end", }, { name: "end with offset", offsets: map[string]int64{"test.log": 100}, initial: "end", }, { name: "saved-or-end without offset", initial: "saved-or-end", }, { name: "saved-or-end with offset", offsets: map[string]int64{"test.log": 100}, initial: "saved-or-end", }, { name: "saved-or-beginning without offset", initial: "saved-or-beginning", }, { name: "saved-or-beginning with offset", initial: "saved-or-beginning", offsets: map[string]int64{"test.log": 100}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { plugin := &Tail{ InitialReadOffset: tt.initial, MaxUndeliveredLines: 1000, PathTag: "path", Pipe: true, Log: &testutil.Logger{}, } require.NoError(t, plugin.Init()) plugin.offsets = tt.offsets seekInfo, err := plugin.getSeekInfo("test.log") require.NoError(t, err) require.Nil(t, seekInfo) }) } } func TestInvalidInitialReadOffset(t *testing.T) { plugin := &Tail{ InitialReadOffset: "invalid", MaxUndeliveredLines: 1000, PathTag: "path", Log: &testutil.Logger{}, } require.ErrorContains(t, plugin.Init(), "invalid 'initial_read_offset' setting") } func TestSetInitialValueForInitialReadOffset(t *testing.T) { tests := []struct { name string InitialReadOffset string FromBeginning bool expected string }{ { name: "Set InitialReadOffset to beginning when from_beginning set to true and initial_read_offset not set", FromBeginning: true, expected: "beginning", }, { name: "Set InitialReadOffset to saved-or-end when from_beginning set to false and initial_read_offset not set", expected: "saved-or-end", }, { name: "Ignore from_beginning when initial_read_offset is set", InitialReadOffset: "end", expected: "end", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { tt := newTail() tt.FromBeginning = test.FromBeginning tt.InitialReadOffset = test.InitialReadOffset require.NoError(t, tt.Init()) require.Equal(t, test.expected, tt.InitialReadOffset) }) } } func TestInitInitialReadOffset(t *testing.T) { tests := []struct { name string InitialReadOffset string FromBeginning bool expected string }{ { name: "Set InitialReadOffset to beginning when from_beginning set to true and initial_read_offset not set", FromBeginning: true, expected: "beginning", }, { name: "Ignore from_beginning when initial_read_offset is set", FromBeginning: true, InitialReadOffset: "end", expected: "end", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { tt := newTail() tt.FromBeginning = test.FromBeginning tt.InitialReadOffset = test.InitialReadOffset require.NoError(t, tt.Init()) require.Equal(t, test.expected, tt.InitialReadOffset) }) } } // TestTailNoLeak tests that we don't leak file descriptors when repeatedly // tailing the same file across multiple Gather calls func TestTailNoLeak(t *testing.T) { // Create a temp directory for our test file tempDir := t.TempDir() logFile := filepath.Join(tempDir, "test.log") content := "cpu usage_idle=100\r\n" require.NoError(t, os.WriteFile(logFile, []byte(content), 0600)) // Setup the plugin tt := newTestTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Files = []string{logFile} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) // Start the plugin var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() // Wait for the plugin to process the file using Gather call require.NoError(t, acc.GatherError(tt.Gather)) // Wait for the initial metrics require.Eventually(t, func() bool { return acc.NMetrics() >= 1 }, time.Second, 100*time.Millisecond, "Did not receive initial metric") // Make sure we got the first metric acc.AssertContainsFields(t, "cpu", map[string]interface{}{ "usage_idle": float64(100), }) // Verify we have exactly one tailer after the first Gather tt.tailersMutex.RLock() initialTailerCount := len(tt.tailers) tt.tailersMutex.RUnlock() require.Equal(t, 1, initialTailerCount, "Expected exactly one tailer after first Gather") // Call Gather multiple times to simulate multiple collection intervals // This is where we test for file descriptor leaks during normal operation for i := 0; i < 10; i++ { require.NoError(t, acc.GatherError(tt.Gather)) // After each Gather, verify we still have exactly one tailer tt.tailersMutex.RLock() currentTailerCount := len(tt.tailers) tt.tailersMutex.RUnlock() require.Equal(t, 1, currentTailerCount, "Expected exactly one tailer after Gather #%d, but found %d", i+1, currentTailerCount) } // Append new content to the file to verify the tailer is still working appendContent := "cpu usage_idle=200\r\n" f, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0600) require.NoError(t, err) _, err = f.WriteString(appendContent) require.NoError(t, err) require.NoError(t, f.Close()) // Reset metrics to make it easier to test for the new value acc.ClearMetrics() // Call Gather to pick up the new content require.NoError(t, acc.GatherError(tt.Gather)) // Wait for the new metric require.Eventually(t, func() bool { return acc.NMetrics() >= 1 }, time.Second, 100*time.Millisecond, "Did not receive metric after appending to file") // Verify we got the new metric acc.AssertContainsFields(t, "cpu", map[string]interface{}{ "usage_idle": float64(200), }) // Final check: we should still have exactly one tailer tt.tailersMutex.RLock() finalTailerCount := len(tt.tailers) tt.tailersMutex.RUnlock() require.Equal(t, 1, finalTailerCount, "Expected exactly one tailer at the end of the test") } // TestTailCleanupUnusedTailers tests the fix for file descriptor leaks // by ensuring tailers for files that no longer match the glob pattern are cleaned up func TestTailCleanupUnusedTailers(t *testing.T) { // Create a temp directory for our test files tempDir := t.TempDir() // Create two test files file1 := filepath.Join(tempDir, "test1.log") file2 := filepath.Join(tempDir, "test2.log") content := "cpu usage_idle=100\r\n" require.NoError(t, os.WriteFile(file1, []byte(content), 0600)) require.NoError(t, os.WriteFile(file2, []byte(content), 0600)) // Setup the plugin with a glob pattern matching both files tt := newTestTail() tt.Log = testutil.Logger{} tt.InitialReadOffset = "beginning" tt.Files = []string{filepath.Join(tempDir, "*.log")} tt.SetParserFunc(newInfluxParser) require.NoError(t, tt.Init()) // Start the plugin var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() // Initially there should be 2 tailers for the two matching files require.Eventually(t, func() bool { tt.tailersMutex.RLock() tailerCount := len(tt.tailers) tt.tailersMutex.RUnlock() return tailerCount == 2 }, time.Second, 100*time.Millisecond, "Expected two tailers to be initialized") // Wait for metrics to be processed from both files require.Eventually(t, func() bool { require.NoError(t, acc.GatherError(tt.Gather)) return acc.NMetrics() >= 2 }, time.Second, 100*time.Millisecond, "Did not receive 2 metrics from the initial files") // Verify that both files have tailers tt.tailersMutex.RLock() _, hasFile1 := tt.tailers[file1] _, hasFile2 := tt.tailers[file2] tt.tailersMutex.RUnlock() require.True(t, hasFile1, "Expected to have tailer for file1") require.True(t, hasFile2, "Expected to have tailer for file2") // Rename one of the files so it no longer matches the glob pattern // This should trigger cleanup during the next Gather() call newFile2 := filepath.Join(tempDir, "test2.old") require.NoError(t, os.Rename(file2, newFile2)) // Give the plugin multiple chances to detect the change via Gather() // The cleanup should happen automatically without needing to stop/restart require.Eventually(t, func() bool { require.NoError(t, acc.GatherError(tt.Gather)) tt.tailersMutex.RLock() tailerCount := len(tt.tailers) _, stillHasFile2 := tt.tailers[file2] tt.tailersMutex.RUnlock() // The tailer for file2 should be removed since it no longer matches return tailerCount == 1 && !stillHasFile2 }, 2*time.Second, 100*time.Millisecond, "Expected tailer for renamed file to be cleaned up") // Verify that the correct tailer remains tt.tailersMutex.RLock() _, hasFile1 = tt.tailers[file1] tt.tailersMutex.RUnlock() require.True(t, hasFile1, "Expected to still have tailer for file1") // Create a new file that matches the pattern // This should add a new tailer during the next Gather() call file3 := filepath.Join(tempDir, "test3.log") require.NoError(t, os.WriteFile(file3, []byte(content), 0600)) // Wait for the new tailer to be created require.Eventually(t, func() bool { require.NoError(t, acc.GatherError(tt.Gather)) tt.tailersMutex.RLock() tailerCount := len(tt.tailers) _, hasFile3 := tt.tailers[file3] tt.tailersMutex.RUnlock() return tailerCount == 2 && hasFile3 }, 2*time.Second, 100*time.Millisecond, "Expected new tailer to be created for file3") // Delete file1 to test another cleanup scenario require.NoError(t, os.Remove(file1)) // The tailer for file1 should eventually be cleaned up require.Eventually(t, func() bool { require.NoError(t, acc.GatherError(tt.Gather)) tt.tailersMutex.RLock() tailerCount := len(tt.tailers) _, stillHasFile1 := tt.tailers[file1] tt.tailersMutex.RUnlock() return tailerCount == 1 && !stillHasFile1 }, 2*time.Second, 100*time.Millisecond, "Expected tailer for deleted file to be cleaned up") // Verify that only the file3 tailer remains tt.tailersMutex.RLock() _, hasFile3 := tt.tailers[file3] tt.tailersMutex.RUnlock() require.True(t, hasFile3, "Expected to have tailer for file3") // Test cleanup when changing the glob pattern // First, remove file3 and modify the pattern to match nothing require.NoError(t, os.Remove(file3)) // Create a new plugin instance with a pattern that matches nothing // Since we can't change the pattern on a running plugin, we need to test this differently // Let's just verify that when all files are removed, all tailers are cleaned up require.Eventually(t, func() bool { require.NoError(t, acc.GatherError(tt.Gather)) tt.tailersMutex.RLock() tailerCount := len(tt.tailers) tt.tailersMutex.RUnlock() return tailerCount == 0 }, 3*time.Second, 100*time.Millisecond, "Expected all tailers to be cleaned up when files are removed") } // TestTailCleanupGlobPatternChange tests cleanup when the glob pattern is changed // This is a separate test since we can't change the pattern on a running plugin func TestTailCleanupGlobPatternChange(t *testing.T) { // Create a temp directory for our test files tempDir := t.TempDir() // Create test files file1 := filepath.Join(tempDir, "test.log") file2 := filepath.Join(tempDir, "other.txt") content := "cpu usage_idle=100\r\n" require.NoError(t, os.WriteFile(file1, []byte(content), 0600)) require.NoError(t, os.WriteFile(file2, []byte(content), 0600)) // First test with *.log pattern tt1 := newTestTail() tt1.Log = testutil.Logger{} tt1.InitialReadOffset = "beginning" tt1.Files = []string{filepath.Join(tempDir, "*.log")} tt1.SetParserFunc(newInfluxParser) require.NoError(t, tt1.Init()) var acc1 testutil.Accumulator require.NoError(t, tt1.Start(&acc1)) // Should have 1 tailer for the .log file require.Eventually(t, func() bool { require.NoError(t, acc1.GatherError(tt1.Gather)) tt1.tailersMutex.RLock() tailerCount := len(tt1.tailers) tt1.tailersMutex.RUnlock() return tailerCount == 1 }, time.Second, 100*time.Millisecond, "Expected one tailer for .log file") tt1.Stop() // Now test with *.txt pattern tt2 := newTestTail() tt2.Log = testutil.Logger{} tt2.InitialReadOffset = "beginning" tt2.Files = []string{filepath.Join(tempDir, "*.txt")} tt2.SetParserFunc(newInfluxParser) require.NoError(t, tt2.Init()) var acc2 testutil.Accumulator require.NoError(t, tt2.Start(&acc2)) defer tt2.Stop() // Should have 1 tailer for the .txt file require.Eventually(t, func() bool { require.NoError(t, acc2.GatherError(tt2.Gather)) tt2.tailersMutex.RLock() tailerCount := len(tt2.tailers) tt2.tailersMutex.RUnlock() return tailerCount == 1 }, time.Second, 100*time.Millisecond, "Expected one tailer for .txt file") // Verify it's tailing the correct file tt2.tailersMutex.RLock() _, hasFile2 := tt2.tailers[file2] tt2.tailersMutex.RUnlock() require.True(t, hasFile2, "Expected to have tailer for .txt file") }