package inputs_nats_consumer_test

import (
	"os"
	"path/filepath"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf/config"
	_ "github.com/influxdata/telegraf/migrations/inputs_nats_consumer" // register migration
	_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"    // register plugin
	_ "github.com/influxdata/telegraf/plugins/parsers/all"             // register parsers
)

func TestNoMigration(t *testing.T) {
	defaultCfg := []byte(`
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
  ## urls of NATS servers
  servers = ["nats://localhost:4222"]

  ## subject(s) to consume
  ## If you use jetstream you need to set the subjects
  ## in jetstream_subjects
  subjects = ["telegraf"]

  ## jetstream subjects
  ## jetstream is a streaming technology inside of nats.
  ## With jetstream the nats-server persists messages and
  ## a consumer can consume historical messages. This is
  ## useful so that telegraf does not miss messages when it
  ## needs to restart. You need to configure the nats-server.
  ## https://docs.nats.io/nats-concepts/jetstream.
  jetstream_subjects = ["js_telegraf"]

  ## name a queue group
  queue_group = "telegraf_consumers"

  ## Optional credentials
  # username = ""
  # password = ""

  ## Optional NATS 2.0 and NATS NGS compatible user credentials
  # credentials = "/etc/telegraf/nats.creds"

  ## Use Transport Layer Security
  # secure = false

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Sets the limits for pending msgs and bytes for each subscription
  ## These shouldn't need to be adjusted except in very high throughput scenarios
  # pending_message_limit = 65536
  # pending_bytes_limit = 67108864

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too
  ## high can result in a constant stream of data batches to the output,
  ## while setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
`)

	// Migrate and check that nothing changed
	output, n, err := config.ApplyMigrations(defaultCfg)
	require.NoError(t, err)
	require.NotEmpty(t, output)
	require.Zero(t, n)
	require.Equal(t, string(defaultCfg), string(output))
}

func TestCases(t *testing.T) {
	// Get all directories in testdata
	folders, err := os.ReadDir("testcases")
	require.NoError(t, err)

	for _, f := range folders {
		// Only handle folders
		if !f.IsDir() {
			continue
		}

		t.Run(f.Name(), func(t *testing.T) {
			testcasePath := filepath.Join("testcases", f.Name())
			inputFile := filepath.Join(testcasePath, "telegraf.conf")
			expectedFile := filepath.Join(testcasePath, "expected.conf")

			// Read the expected output
			expected := config.NewConfig()
			require.NoError(t, expected.LoadConfig(expectedFile))
			require.NotEmpty(t, expected.Inputs)

			// Read the input data
			input, remote, err := config.LoadConfigFile(inputFile)
			require.NoError(t, err)
			require.False(t, remote)
			require.NotEmpty(t, input)

			// Migrate
			output, n, err := config.ApplyMigrations(input)
			require.NoError(t, err)
			require.NotEmpty(t, output)
			require.GreaterOrEqual(t, n, uint64(1))

			actual := config.NewConfig()
			require.NoError(t, actual.LoadConfigData(output, config.EmptySourcePath))

			// Test the output
			require.Len(t, actual.Inputs, len(expected.Inputs))
			actualIDs := make([]string, 0, len(expected.Inputs))
			expectedIDs := make([]string, 0, len(expected.Inputs))
			for i := range actual.Inputs {
				actualIDs = append(actualIDs, actual.Inputs[i].ID())
				expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
			}
			require.ElementsMatch(t, expectedIDs, actualIDs, string(output))
		})
	}
}