1
0
Fork 0
telegraf/plugins/outputs/nats/nats_test.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

184 lines
5.7 KiB
Go

package nats
import (
_ "embed"
"fmt"
"path/filepath"
"testing"
"time"
"github.com/docker/go-connections/nat"
"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
natsServicePort := "4222"
type testConfig struct {
name string
container testutil.Container
nats *NATS
streamConfigCompareFunc func(*testing.T, *jetstream.StreamInfo)
wantErr bool
}
testCases := []testConfig{
{
name: "valid without jetstream",
container: testutil.Container{
Image: "nats:latest",
ExposedPorts: []string{natsServicePort},
WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)),
},
nats: &NATS{
Name: "telegraf",
Subject: "telegraf",
serializer: &influx.Serializer{},
Log: testutil.Logger{},
},
},
{
name: "valid with jetstream",
container: testutil.Container{
Image: "nats:latest",
ExposedPorts: []string{natsServicePort},
Cmd: []string{"--js"},
WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)),
},
nats: &NATS{
Name: "telegraf",
Subject: "telegraf",
Jetstream: &StreamConfig{
Name: "my-telegraf-stream",
},
serializer: &influx.Serializer{},
Log: testutil.Logger{},
},
streamConfigCompareFunc: func(t *testing.T, si *jetstream.StreamInfo) {
require.Equal(t, "my-telegraf-stream", si.Config.Name)
require.Equal(t, []string{"telegraf"}, si.Config.Subjects)
},
},
{
name: "create stream with config",
container: testutil.Container{
Image: "nats:latest",
ExposedPorts: []string{natsServicePort},
Cmd: []string{"--js"},
WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)),
},
nats: &NATS{
Name: "telegraf",
Subject: "my-tel-sub-outer",
Jetstream: &StreamConfig{
Name: "telegraf-stream-with-cfg",
Subjects: []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2"},
Retention: "workqueue",
MaxConsumers: 10,
Discard: "new",
Storage: "memory",
MaxMsgs: 100_000,
MaxBytes: 104_857_600,
MaxAge: config.Duration(10 * time.Minute),
Duplicates: config.Duration(5 * time.Minute),
MaxMsgSize: 120,
MaxMsgsPerSubject: 500,
},
serializer: &influx.Serializer{},
Log: testutil.Logger{},
},
streamConfigCompareFunc: func(t *testing.T, si *jetstream.StreamInfo) {
require.Equal(t, "telegraf-stream-with-cfg", si.Config.Name)
require.Equal(t, []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2", "my-tel-sub-outer"}, si.Config.Subjects)
require.Equal(t, jetstream.WorkQueuePolicy, si.Config.Retention)
require.Equal(t, 10, si.Config.MaxConsumers)
require.Equal(t, jetstream.DiscardNew, si.Config.Discard)
require.Equal(t, jetstream.MemoryStorage, si.Config.Storage)
require.Equal(t, int64(100_000), si.Config.MaxMsgs)
require.Equal(t, int64(104_857_600), si.Config.MaxBytes)
require.Equal(t, 10*time.Minute, si.Config.MaxAge)
require.Equal(t, 5*time.Minute, si.Config.Duplicates)
require.Equal(t, int32(120), si.Config.MaxMsgSize)
require.Equal(t, int64(500), si.Config.MaxMsgsPerSubject)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.container.Start()
require.NoError(t, err, "failed to start container")
defer tc.container.Terminate()
server := []string{fmt.Sprintf("nats://%s:%s", tc.container.Address, tc.container.Ports[natsServicePort])}
tc.nats.Servers = server
// Verify that we can connect to the NATS daemon
require.NoError(t, tc.nats.Init())
err = tc.nats.Connect()
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
if tc.nats.Jetstream != nil {
stream, err := tc.nats.jetstreamClient.Stream(t.Context(), tc.nats.Jetstream.Name)
require.NoError(t, err)
si, err := stream.Info(t.Context())
require.NoError(t, err)
tc.streamConfigCompareFunc(t, si)
}
// Verify that we can successfully write data to the NATS daemon
err = tc.nats.Write(testutil.MockMetrics())
require.NoError(t, err)
})
}
}
func TestConfigParsing(t *testing.T) {
// Define test cases
testCases := []struct {
name string
path string
wantErr bool
}{
{name: "Valid Default", path: filepath.Join("testcases", "no-js.conf")},
{name: "Valid JS", path: filepath.Join("testcases", "js-default.conf")},
{name: "Valid JS Config", path: filepath.Join("testcases", "js-config.conf")},
{name: "Subjects warning", path: filepath.Join("testcases", "js-subjects.conf")},
{name: "Invalid JS", path: filepath.Join("testcases", "js-no-stream.conf"), wantErr: true},
}
// Register the plugin
outputs.Add("nats", func() telegraf.Output {
return &NATS{}
})
srl := &influx.Serializer{}
require.NoError(t, srl.Init())
// Run tests using the table-driven approach
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Configure the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfig(tc.path))
require.Len(t, cfg.Outputs, 1)
err := cfg.Outputs[0].Init()
if tc.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}