package redistimeseries import ( "context" "fmt" "os" "path/filepath" "strings" "testing" "time" "github.com/docker/go-connections/nat" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) func TestConnectAndWriteIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } servicePort := "6379" container := testutil.Container{ Image: "redislabs/redistimeseries", ExposedPorts: []string{servicePort}, WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), } require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() redis := &RedisTimeSeries{ Address: fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]), ConvertStringFields: true, Timeout: config.Duration(10 * time.Second), } // Verify that we can connect to the RedisTimeSeries server require.NoError(t, redis.Connect()) // Verify that we can successfully write data to the RedisTimeSeries server require.NoError(t, redis.Write(testutil.MockMetrics())) } func TestCases(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } const servicePort = "6379" // Get all testcase directories folders, err := os.ReadDir("testcases") require.NoError(t, err) // Register the plugin outputs.Add("redistimeseries", func() telegraf.Output { return &RedisTimeSeries{ ConvertStringFields: true, Timeout: config.Duration(10 * time.Second), } }) for _, f := range folders { // Only handle folders if !f.IsDir() { continue } t.Run(f.Name(), func(t *testing.T) { testcasePath := filepath.Join("testcases", f.Name()) configFilename := filepath.Join(testcasePath, "telegraf.conf") inputFilename := filepath.Join(testcasePath, "input.influx") expectedFilename := filepath.Join(testcasePath, "expected.out") expectedErrorFilename := filepath.Join(testcasePath, "expected.err") // Get parser to parse input and expected output parser := &influx.Parser{} require.NoError(t, parser.Init()) // Load the input data input, err := testutil.ParseMetricsFromFile(inputFilename, parser) require.NoError(t, err) // Read the expected output if any var expected []string if _, err := os.Stat(expectedFilename); err == nil { expected, err = testutil.ParseLinesFromFile(expectedFilename) require.NoError(t, err) } // Read the expected output if any var expectedError string if _, err := os.Stat(expectedErrorFilename); err == nil { expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename) require.NoError(t, err) require.Len(t, expectedErrors, 1) expectedError = expectedErrors[0] } // Configure the plugin cfg := config.NewConfig() require.NoError(t, cfg.LoadConfig(configFilename)) require.Len(t, cfg.Outputs, 1) // Setup a test-container container := testutil.Container{ Image: "redis/redis-stack-server:latest", ExposedPorts: []string{servicePort}, Env: map[string]string{}, WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), } require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() address := container.Address + ":" + container.Ports[servicePort] // Setup the plugin plugin := cfg.Outputs[0].Output.(*RedisTimeSeries) plugin.Address = address plugin.Log = testutil.Logger{} // Connect and write the metric(s) require.NoError(t, plugin.Connect()) defer plugin.Close() err = plugin.Write(input) if expectedError != "" { require.ErrorContains(t, err, expectedError) return } require.NoError(t, err) // // Check the metric nevertheless as we might get some metrics despite errors. actual := getAllRecords(t.Context(), address) require.ElementsMatch(t, expected, actual) }) } } func getAllRecords(testContext context.Context, address string) []string { client := redis.NewClient(&redis.Options{Addr: address}) ctx, cancel := context.WithTimeout(testContext, 10*time.Second) defer cancel() var records []string keys := client.Keys(ctx, "*") for _, key := range keys.Val() { info := client.TSInfo(ctx, key) var labels string if l, found := info.Val()["labels"]; found { lmap := l.(map[interface{}]interface{}) collection := make([]string, 0, len(lmap)) for k, v := range lmap { collection = append(collection, fmt.Sprintf("%v=%v", k, v)) } if len(collection) > 0 { labels = " " + strings.Join(collection, " ") } } result := client.TSRange(ctx, key, 0, int(time.Now().UnixMilli())) for _, point := range result.Val() { records = append(records, fmt.Sprintf("%s: %f %d%s", result.Args()[1], point.Value, point.Timestamp, labels)) } } return records }