1
0
Fork 0
telegraf/plugins/outputs/opensearch/opensearch_v2_test.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

343 lines
9 KiB
Go

package opensearch
import (
"context"
"fmt"
"math"
"testing"
"text/template"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
)
func TestConnectAndWriteIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
Log: testutil.Logger{},
}
var err error
e.indexTmpl, err = template.New("index").Parse(e.IndexName)
require.NoError(t, err)
// Verify that we can connect to Opensearch
require.NoError(t, e.Connect())
// Verify that we can successfully write data to Opensearch
require.NoError(t, e.Write(testutil.MockMetrics()))
}
func TestConnectAndWriteMetricWithNaNValueEmptyIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
// Verify that we can connect to Opensearch
require.NoError(t, e.Init())
require.NoError(t, e.Connect())
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
require.Error(t, e.Write([]telegraf.Metric{m}), "error sending bulk request to Opensearch: "+
"json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueNoneIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: "none",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
var err error
e.indexTmpl, err = template.New("index").Parse(e.IndexName)
require.NoError(t, err)
// Verify that we can connect to Opensearch
err = e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.Error(t, err, "error sending bulk request to Opensearch: json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueDropIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: "drop",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
var err error
e.indexTmpl, err = template.New("index").Parse(e.IndexName)
require.NoError(t, err)
// Verify that we can connect to Opensearch
err = e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.NoError(t, err)
}
}
func TestConnectAndWriteMetricWithNaNValueReplacementIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tests := []struct {
floatHandle string
floatReplacement float64
expectError bool
}{
{
"none",
0.0,
true,
},
{
"drop",
0.0,
false,
},
{
"replace",
0.0,
false,
},
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
for _, test := range tests {
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: test.floatHandle,
FloatReplacement: test.floatReplacement,
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
var err error
e.indexTmpl, err = template.New("index").Parse(e.IndexName)
require.NoError(t, err)
err = e.Connect()
require.NoError(t, err)
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
if test.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
}
}
func TestTemplateManagementEmptyTemplateIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
err := e.Init()
require.Error(t, err)
}
func TestTemplateManagementIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
ctx, cancel := context.WithTimeout(t.Context(), time.Duration(e.Timeout))
defer cancel()
var err error
e.indexTmpl, err = template.New("index").Parse(e.IndexName)
require.NoError(t, err)
err = e.Connect()
require.NoError(t, err)
err = e.manageTemplate(ctx)
require.NoError(t, err)
}
func TestTemplateInvalidIndexPatternIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
var err error
e.indexTmpl, err = template.New("index").Parse(e.IndexName)
require.NoError(t, err)
err = e.Connect()
require.Error(t, err)
}