1
0
Fork 0
telegraf/plugins/outputs/timestream/timestream_test.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

1414 lines
44 KiB
Go

package timestream
import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sort"
"strconv"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
common_aws "github.com/influxdata/telegraf/plugins/common/aws"
"github.com/influxdata/telegraf/testutil"
)
// Database, table and dimension names shared by the tests in this file.
const (
	tsDBName            = "testDb"
	testSingleTableName = "SingleTableName"
	testSingleTableDim  = "namespace"
)

// Fixed reference timestamps used when building test metrics.
var (
	time1 = time.Date(2009, time.November, 10, 22, 0, 0, 0, time.UTC)
	time2 = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
)

// time1Epoch and time2Epoch are time1 and time2 expressed as Unix seconds,
// rendered as strings.
const (
	time1Epoch = "1257890400"
	time2Epoch = "1257894000"
)

// timeUnit is the Timestream time unit that matches the epoch strings above.
const timeUnit = types.TimeUnitSeconds

// Measurement names used for the telegraf metrics created by the tests.
const (
	metricName1 = "metricName1"
	metricName2 = "metricName2"
)
// mockTimestreamClient is a test double for the Timestream write client.
// It records how many times WriteRecords has been called.
type mockTimestreamClient struct {
// WriteRecordsRequestCount is incremented on every WriteRecords call.
WriteRecordsRequestCount int
}
// CreateTable is a stub that ignores every argument and returns a nil output
// together with a nil error.
func (*mockTimestreamClient) CreateTable(
	_ context.Context,
	_ *timestreamwrite.CreateTableInput,
	_ ...func(*timestreamwrite.Options),
) (*timestreamwrite.CreateTableOutput, error) {
	return nil, nil
}
// WriteRecords counts the invocation on the receiver and otherwise ignores
// its arguments, returning a nil output and a nil error.
//
// NOTE(review): the counter is updated without synchronization; confirm
// whether this is related to the tests in this file that are skipped with a
// "data race" message.
func (m *mockTimestreamClient) WriteRecords(
	_ context.Context,
	_ *timestreamwrite.WriteRecordsInput,
	_ ...func(*timestreamwrite.Options),
) (*timestreamwrite.WriteRecordsOutput, error) {
	m.WriteRecordsRequestCount += 1
	return nil, nil
}
// DescribeDatabase always fails with a fixed, recognizable error message so
// that tests can detect that it has been invoked.
func (*mockTimestreamClient) DescribeDatabase(
	_ context.Context,
	_ *timestreamwrite.DescribeDatabaseInput,
	_ ...func(*timestreamwrite.Options),
) (*timestreamwrite.DescribeDatabaseOutput, error) {
	err := errors.New("hello from DescribeDatabase")
	return nil, err
}
// TestConnectValidatesConfigParameters runs Connect against a series of
// plugin configurations and checks that each one is either accepted or
// rejected with an error mentioning the offending option.
//
// All negative cases use require.ErrorContains, which reports a regular test
// failure when Connect unexpectedly returns nil instead of panicking on a
// nil error value.
func TestConnectValidatesConfigParameters(t *testing.T) {
	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return &mockTimestreamClient{}, nil
	}

	// checking base arguments
	noDatabaseName := Timestream{Log: testutil.Logger{}}
	require.ErrorContains(t, noDatabaseName.Connect(), "'database_name' key is required")

	noMappingMode := Timestream{
		DatabaseName: tsDBName,
		Log:          testutil.Logger{},
	}
	require.ErrorContains(t, noMappingMode.Connect(), "'mapping_mode' key is required")

	incorrectMappingMode := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  "foo",
		Log:          testutil.Logger{},
	}
	require.ErrorContains(t, incorrectMappingMode.Connect(), "single-table")

	// multi-measure config validation multi table mode
	validConfigMultiMeasureMultiTableMode := Timestream{
		DatabaseName:                      tsDBName,
		MappingMode:                       MappingModeMultiTable,
		UseMultiMeasureRecords:            true,
		MeasureNameForMultiMeasureRecords: "multi-measure-name",
		Log:                               testutil.Logger{},
	}
	require.NoError(t, validConfigMultiMeasureMultiTableMode.Connect())

	invalidConfigMultiMeasureMultiTableMode := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		UseMultiMeasureRecords: true,
		// without MeasureNameForMultiMeasureRecords set we expect validation failure
		Log: testutil.Logger{},
	}
	require.ErrorContains(t, invalidConfigMultiMeasureMultiTableMode.Connect(), "MeasureNameForMultiMeasureRecords")

	// multi-measure config validation single table mode
	validConfigMultiMeasureSingleTableMode := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeSingleTable,
		SingleTableName:        testSingleTableName,
		UseMultiMeasureRecords: true, // MeasureNameForMultiMeasureRecords is not needed as
		// measurement name (from telegraf metric) is used as multi-measure name in TS
		Log: testutil.Logger{},
	}
	require.NoError(t, validConfigMultiMeasureSingleTableMode.Connect())

	invalidConfigMultiMeasureSingleTableMode := Timestream{
		DatabaseName:                      tsDBName,
		MappingMode:                       MappingModeSingleTable,
		SingleTableName:                   testSingleTableName,
		UseMultiMeasureRecords:            true,
		MeasureNameForMultiMeasureRecords: "multi-measure-name",
		// value of MeasureNameForMultiMeasureRecords will be ignored and
		// measurement name (from telegraf metric) is used as multi-measure name in TS
		Log: testutil.Logger{},
	}
	require.ErrorContains(t, invalidConfigMultiMeasureSingleTableMode.Connect(), "MeasureNameForMultiMeasureRecords")

	// multi-table arguments
	validMappingModeMultiTable := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  MappingModeMultiTable,
		Log:          testutil.Logger{},
	}
	require.NoError(t, validMappingModeMultiTable.Connect())

	singleTableNameWithMultiTable := Timestream{
		DatabaseName:    tsDBName,
		MappingMode:     MappingModeMultiTable,
		SingleTableName: testSingleTableName,
		Log:             testutil.Logger{},
	}
	require.ErrorContains(t, singleTableNameWithMultiTable.Connect(), "SingleTableName")

	singleTableDimensionWithMultiTable := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  MappingModeMultiTable,
		SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
		Log: testutil.Logger{},
	}
	require.ErrorContains(t, singleTableDimensionWithMultiTable.Connect(),
		"SingleTableDimensionNameForTelegrafMeasurementName")

	// single-table arguments
	noTableNameMappingModeSingleTable := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  MappingModeSingleTable,
		Log:          testutil.Logger{},
	}
	require.ErrorContains(t, noTableNameMappingModeSingleTable.Connect(), "SingleTableName")

	noDimensionNameMappingModeSingleTable := Timestream{
		DatabaseName:    tsDBName,
		MappingMode:     MappingModeSingleTable,
		SingleTableName: testSingleTableName,
		Log:             testutil.Logger{},
	}
	require.ErrorContains(t, noDimensionNameMappingModeSingleTable.Connect(),
		"SingleTableDimensionNameForTelegrafMeasurementName")

	validConfigurationMappingModeSingleTable := Timestream{
		DatabaseName:    tsDBName,
		MappingMode:     MappingModeSingleTable,
		SingleTableName: testSingleTableName,
		SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
		Log: testutil.Logger{},
	}
	require.NoError(t, validConfigurationMappingModeSingleTable.Connect())

	// create table arguments
	createTableNoMagneticRetention := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		CreateTableIfNotExists: true,
		Log:                    testutil.Logger{},
	}
	require.ErrorContains(t, createTableNoMagneticRetention.Connect(),
		"CreateTableMagneticStoreRetentionPeriodInDays")

	createTableNoMemoryRetention := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		CreateTableIfNotExists: true,
		CreateTableMagneticStoreRetentionPeriodInDays: 3,
		Log: testutil.Logger{},
	}
	require.ErrorContains(t, createTableNoMemoryRetention.Connect(),
		"CreateTableMemoryStoreRetentionPeriodInHours")

	createTableValid := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		CreateTableIfNotExists: true,
		CreateTableMagneticStoreRetentionPeriodInDays: 3,
		CreateTableMemoryStoreRetentionPeriodInHours:  3,
		Log: testutil.Logger{},
	}
	require.NoError(t, createTableValid.Connect())

	// describe database on start arguments: the mock's DescribeDatabase error
	// must surface through Connect
	describeDatabaseInvoked := Timestream{
		DatabaseName:            tsDBName,
		MappingMode:             MappingModeMultiTable,
		DescribeDatabaseOnStart: true,
		Log:                     testutil.Logger{},
	}
	require.ErrorContains(t, describeDatabaseInvoked.Connect(), "hello from DescribeDatabase")
}
// TestWriteMultiMeasuresSingleTableMode feeds 101 metrics with distinct
// timestamps through a single-table, multi-measure plugin and checks that
// they are transformed into, and written as, two WriteRecords requests whose
// measure name is taken from the metric name.
func TestWriteMultiMeasuresSingleTableMode(t *testing.T) {
	const recordCount = 100
	const total = recordCount + 1

	client := &mockTimestreamClient{WriteRecordsRequestCount: 0}
	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return client, nil
	}

	baseEpoch, err := strconv.Atoi(time1Epoch)
	require.NoError(t, err)

	// Each metric is one second later than the previous one and carries two
	// uniquely named fields.
	inputs := make([]telegraf.Metric, 0, total)
	for i := 1; i <= total; i++ {
		suffix := strconv.Itoa(i)
		fields := map[string]interface{}{
			"value_supported1" + suffix: float64(10),
			"value_supported2" + suffix: float64(20),
		}
		tags := map[string]string{"tag1": "value1"}
		stamp := time.Unix(int64(baseEpoch+i), 0)
		inputs = append(inputs, testutil.MustMetric("multi_measure_name", tags, fields, stamp))
	}

	plugin := Timestream{
		MappingMode:            MappingModeSingleTable,
		SingleTableName:        "test-multi-single-table-mode",
		DatabaseName:           tsDBName,
		UseMultiMeasureRecords: true, // use multi
		Log:                    testutil.Logger{},
	}

	// validate config correctness
	require.NoError(t, plugin.Connect())

	// validate multi-record generation:
	// 'inputs' has a total of 101 metrics transformed to 2 writeRecord calls to TS
	requests := plugin.TransformMetrics(inputs)
	require.Len(t, requests, 2, "Expected 2 WriteRecordsInput requests")

	var allRecords []types.Record
	for _, req := range requests {
		allRecords = append(allRecords, req.Records...)
		// Assert that we use measure name from input
		require.Equal(t, "multi_measure_name", *req.Records[0].MeasureName)
	}

	// Expected 101 records
	require.Len(t, allRecords, total, "Expected 101 records after transforming")

	// validate write to TS
	require.NoError(t, plugin.Write(inputs), "Write to Timestream failed")
	require.Equal(t, 2, client.WriteRecordsRequestCount, "Expected 2 WriteRecords calls")
}
// TestWriteMultiMeasuresMultiTableMode feeds 100 metrics with distinct
// timestamps through a multi-table, multi-measure plugin and checks that
// they end up in exactly one WriteRecords request whose measure name comes
// from the plugin configuration.
func TestWriteMultiMeasuresMultiTableMode(t *testing.T) {
	const recordCount = 100
	mockClient := &mockTimestreamClient{0}

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}

	localTime, err := strconv.Atoi(time1Epoch)
	require.NoError(t, err)

	inputs := make([]telegraf.Metric, 0, recordCount)
	for i := 1; i <= recordCount; i++ {
		localTime++

		fieldName1 := "value_supported1" + strconv.Itoa(i)
		fieldName2 := "value_supported2" + strconv.Itoa(i)
		inputs = append(inputs, testutil.MustMetric(
			"multi_measure_name",
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName1: float64(10),
				fieldName2: float64(20),
			},
			time.Unix(int64(localTime), 0),
		))
	}

	plugin := Timestream{
		MappingMode:                       MappingModeMultiTable,
		DatabaseName:                      tsDBName,
		UseMultiMeasureRecords:            true, // use multi
		MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
		Log:                               testutil.Logger{},
	}

	// validate config correctness
	err = plugin.Connect()
	require.NoError(t, err, "Invalid configuration")

	// validate multi-record generation
	result := plugin.TransformMetrics(inputs)
	// 'inputs' has a total of 100 metrics transformed to 1 writeRecord call to TS
	require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")

	// Fail with a readable message instead of an index-out-of-range panic
	// should the single request come back without records.
	require.NotEmpty(t, result[0].Records, "Expected at least one record in the request")

	// Assert that we use measure name from config
	require.Equal(t, "config-multi-measure-name", *result[0].Records[0].MeasureName)

	var transformedRecords []types.Record
	for _, r := range result {
		transformedRecords = append(transformedRecords, r.Records...)
	}
	// Expected 100 records
	require.Len(t, transformedRecords, recordCount, "Expected 100 records after transforming")

	// Print the first input and the resulting measure name. This used to be a
	// range loop that broke out after its first iteration; direct indexing
	// produces the same output.
	fmt.Println("Input", inputs[0])
	fmt.Println(*result[0].Records[0].MeasureName)

	// validate successful write to TS
	err = plugin.Write(inputs)
	require.NoError(t, err, "Write to Timestream failed")
	require.Equal(t, 1, mockClient.WriteRecordsRequestCount, "Expected 1 WriteRecords call")
}
func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
input1 := testutil.MustMetric(
metricName1,
map[string]string{"tag1": "value1"},
map[string]interface{}{
"measureDouble": aws.Float64(10),
},
time1,
)
input2 := testutil.MustMetric(
metricName1,
map[string]string{"tag2": "value2"},
map[string]interface{}{
"measureBigint": aws.Int32(20),
},
time1,
)
input3 := testutil.MustMetric(
metricName1,
map[string]string{"tag3": "value3"},
map[string]interface{}{
"measureVarchar": "DUMMY",
},
time1,
)
input4 := testutil.MustMetric(
metricName1,
map[string]string{"tag4": "value4"},
map[string]interface{}{
"measureBool": true,
},
time1,
)
input5 := testutil.MustMetric(
metricName1,
map[string]string{"tag5": "value5"},
map[string]interface{}{
"measureMaxUint64": uint64(math.MaxUint64),
},
time1,
)
input6 := testutil.MustMetric(
metricName1,
map[string]string{"tag6": "value6"},
map[string]interface{}{
"measureSmallUint64": uint64(123456),
},
time1,
)
expectedResultMultiTable := buildExpectedMultiRecords("config-multi-measure-name", metricName1)
plugin := Timestream{
MappingMode: MappingModeMultiTable,
DatabaseName: tsDBName,
UseMultiMeasureRecords: true, // use multi
MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
Log: testutil.Logger{},
}
// validate config correctness
err := plugin.Connect()
require.NoError(t, err, "Invalid configuration")
// validate multi-record generation with MappingModeMultiTable
result := plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4, input5, input6})
require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")
require.EqualValues(t, expectedResultMultiTable, result[0])
require.True(t, arrayContains(result, expectedResultMultiTable), "Expected that the list of requests to Timestream: %+v\n "+
"will contain request: %+v\n\n", result, expectedResultMultiTable)
// singleTableMode
plugin = Timestream{
MappingMode: MappingModeSingleTable,
SingleTableName: "singleTableName",
DatabaseName: tsDBName,
UseMultiMeasureRecords: true, // use multi
Log: testutil.Logger{},
}
// validate config correctness
err = plugin.Connect()
require.NoError(t, err, "Invalid configuration")
expectedResultSingleTable := buildExpectedMultiRecords(metricName1, "singleTableName")
// validate multi-record generation with MappingModeSingleTable
result = plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4, input5, input6})
require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")
require.EqualValues(t, expectedResultSingleTable, result[0])
require.True(t, arrayContains(result, expectedResultSingleTable), "Expected that the list of requests to Timestream: %+v\n "+
"will contain request: %+v\n\n", result, expectedResultSingleTable)
}
// buildExpectedMultiRecords assembles the WriteRecordsInput expected for the
// six typed metrics of TestBuildMultiMeasuresInSingleAndMultiTableMode.
//
// multiMeasureName is passed on to buildMultiRecords for every record and
// tableName becomes the TableName of the returned request. The records are
// appended in a fixed order: double, bigint, varchar, boolean, max uint64 and
// small uint64.
func buildExpectedMultiRecords(multiMeasureName, tableName string) *timestreamwrite.WriteRecordsInput {
	// One entry per expected record: its dimensions, its single measure
	// value rendered as a string, and the Timestream value type.
	specs := []struct {
		dimensions    map[string]string
		measureValues map[string]string
		valueType     types.MeasureValueType
	}{
		{
			dimensions:    map[string]string{"tag1": "value1"},
			measureValues: map[string]string{"measureDouble": "10"},
			valueType:     types.MeasureValueTypeDouble,
		},
		{
			dimensions:    map[string]string{"tag2": "value2"},
			measureValues: map[string]string{"measureBigint": "20"},
			valueType:     types.MeasureValueTypeBigint,
		},
		{
			dimensions:    map[string]string{"tag3": "value3"},
			measureValues: map[string]string{"measureVarchar": "DUMMY"},
			valueType:     types.MeasureValueTypeVarchar,
		},
		{
			dimensions:    map[string]string{"tag4": "value4"},
			measureValues: map[string]string{"measureBool": "true"},
			valueType:     types.MeasureValueTypeBoolean,
		},
		{
			// The uint64 maximum is expected as the int64 maximum.
			dimensions:    map[string]string{"tag5": "value5"},
			measureValues: map[string]string{"measureMaxUint64": "9223372036854775807"},
			valueType:     types.MeasureValueTypeBigint,
		},
		{
			dimensions:    map[string]string{"tag6": "value6"},
			measureValues: map[string]string{"measureSmallUint64": "123456"},
			valueType:     types.MeasureValueTypeBigint,
		},
	}

	var records []types.Record
	for _, spec := range specs {
		input := SimpleInput{
			t:             time1Epoch,
			tableName:     metricName1,
			dimensions:    spec.dimensions,
			measureValues: spec.measureValues,
		}
		records = append(records, buildMultiRecords([]SimpleInput{input}, multiMeasureName, spec.valueType)...)
	}

	return &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(tableName),
		Records:          records,
		CommonAttributes: &types.Record{},
	}
}
// mockTimestreamErrorClient is a test double for the Timestream write client
// whose WriteRecords call returns a preconfigured error.
type mockTimestreamErrorClient struct {
// ErrorToReturnOnWriteRecords is returned verbatim by WriteRecords.
ErrorToReturnOnWriteRecords error
}
// CreateTable is a stub that ignores every argument and returns a nil output
// together with a nil error.
func (*mockTimestreamErrorClient) CreateTable(
	_ context.Context,
	_ *timestreamwrite.CreateTableInput,
	_ ...func(*timestreamwrite.Options),
) (*timestreamwrite.CreateTableOutput, error) {
	return nil, nil
}
// WriteRecords ignores its arguments and returns a nil output along with the
// error configured in ErrorToReturnOnWriteRecords.
func (m *mockTimestreamErrorClient) WriteRecords(
	_ context.Context,
	_ *timestreamwrite.WriteRecordsInput,
	_ ...func(*timestreamwrite.Options),
) (*timestreamwrite.WriteRecordsOutput, error) {
	configured := m.ErrorToReturnOnWriteRecords
	return nil, configured
}
// DescribeDatabase is a stub that ignores every argument and returns a nil
// output together with a nil error.
func (*mockTimestreamErrorClient) DescribeDatabase(
	_ context.Context,
	_ *timestreamwrite.DescribeDatabaseInput,
	_ ...func(*timestreamwrite.Options),
) (*timestreamwrite.DescribeDatabaseOutput, error) {
	return nil, nil
}
// TestThrottlingErrorIsReturnedToTelegraf checks that a ThrottlingException
// returned by the write client is propagated by Write.
func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) {
	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		failing := &mockTimestreamErrorClient{
			ErrorToReturnOnWriteRecords: &types.ThrottlingException{Message: aws.String("Throttling Test")},
		}
		return failing, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		Log:          testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())

	tags := map[string]string{"tag1": "value1"}
	fields := map[string]interface{}{"value": float64(1)}
	batch := []telegraf.Metric{testutil.MustMetric(metricName1, tags, fields, time1)}

	require.Error(t, plugin.Write(batch), "Expected an error to be returned to Telegraf, "+
		"so that the write will be retried by Telegraf later.")
}
// TestRejectedRecordsErrorResultsInMetricsBeingSkipped checks that a
// RejectedRecordsException returned by the write client does not surface as
// an error from Write.
func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		rejecting := &mockTimestreamErrorClient{
			ErrorToReturnOnWriteRecords: &types.RejectedRecordsException{Message: aws.String("RejectedRecords Test")},
		}
		return rejecting, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		Log:          testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())

	tags := map[string]string{"tag1": "value1"}
	fields := map[string]interface{}{"value": float64(1)}
	batch := []telegraf.Metric{testutil.MustMetric(metricName1, tags, fields, time1)}

	require.NoError(t, plugin.Write(batch), "Expected to silently swallow the RejectedRecordsException, "+
		"as retrying this error doesn't make sense.")
}
// TestWriteWhenRequestsGreaterThanMaxWriteGoRoutinesCount writes 500 metrics
// (five requests of 100 records) with MaxWriteGoRoutinesCount set to 2 and
// expects five WriteRecords calls. The test is currently skipped.
func TestWriteWhenRequestsGreaterThanMaxWriteGoRoutinesCount(t *testing.T) {
	t.Skip("Skipping test due to data race, will be re-visited")

	const (
		maxWriteRecordsCalls         = 5
		maxRecordsInWriteRecordsCall = 100
		totalRecords                 = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
	)

	client := &mockTimestreamClient{WriteRecordsRequestCount: 0}
	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return client, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		// Allow fewer writer go routines (2) than there are write requests (5).
		// NOTE(review): this comment previously said "only one go routine"
		// although the value is 2 - confirm which one is intended.
		MaxWriteGoRoutinesCount: 2,
		Log:                     testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())

	// Every metric carries a uniquely named field but the same tag and timestamp.
	inputs := make([]telegraf.Metric, 0, totalRecords)
	for i := 1; i <= totalRecords; i++ {
		fields := map[string]interface{}{
			"value_supported" + strconv.Itoa(i): float64(10),
		}
		inputs = append(inputs, testutil.MustMetric(metricName1, map[string]string{"tag1": "value1"}, fields, time1))
	}

	require.NoError(t, plugin.Write(inputs), "Expected to write without any errors ")
	require.Equal(t, maxWriteRecordsCalls, client.WriteRecordsRequestCount, "Expected 5 calls to WriteRecords")
}
// TestWriteWhenRequestsLesserThanMaxWriteGoRoutinesCount writes 200 metrics
// (two requests of 100 records) with MaxWriteGoRoutinesCount set to 5 and
// expects two WriteRecords calls. The test is currently skipped.
func TestWriteWhenRequestsLesserThanMaxWriteGoRoutinesCount(t *testing.T) {
	t.Skip("Skipping test due to data race, will be re-visited")

	const maxWriteRecordsCalls = 2
	const maxRecordsInWriteRecordsCall = 100
	const totalRecords = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
	mockClient := &mockTimestreamClient{0}

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		// Spawn 5 parallel go routines to serve 2 write requests
		// In this case only 2 of the 5 go routines will process the write requests
		MaxWriteGoRoutinesCount: 5,
		Log:                     testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())

	inputs := make([]telegraf.Metric, 0, totalRecords)
	for i := 1; i <= totalRecords; i++ {
		fieldName := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, testutil.MustMetric(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName: float64(10),
			},
			time1,
		))
	}

	err := plugin.Write(inputs)
	require.NoError(t, err, "Expected to write without any errors ")
	// The failure message now matches the expected count (2); it previously
	// claimed 5, copied from the sibling test.
	require.Equal(t, maxWriteRecordsCalls, mockClient.WriteRecordsRequestCount, "Expected 2 calls to WriteRecords")
}
func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
input1 := testutil.MustMetric(
metricName1,
map[string]string{"tag1": "value1"},
map[string]interface{}{}, // no fields here
time1,
)
input2 := testutil.MustMetric(
metricName1,
map[string]string{"tag2": "value2"},
map[string]interface{}{
"value": float64(10),
},
time1,
)
input3 := testutil.MustMetric(
metricName1,
map[string]string{}, // record with no dimensions should appear in the results
map[string]interface{}{
"value": float64(20),
},
time1,
)
records := buildRecords([]SimpleInput{
{
t: time1Epoch,
tableName: metricName1,
dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName1},
measureValues: map[string]string{"value": "10"},
},
{
t: time1Epoch,
tableName: metricName1,
dimensions: map[string]string{testSingleTableDim: metricName1},
measureValues: map[string]string{"value": "20"},
},
})
expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDBName),
TableName: aws.String(testSingleTableName),
Records: records,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeSingleTable,
[]telegraf.Metric{input1, input2, input3},
[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
recordsMulti := buildRecords([]SimpleInput{
{
t: time1Epoch,
tableName: metricName1,
dimensions: map[string]string{"tag2": "value2"},
measureValues: map[string]string{"value": "10"},
},
{
t: time1Epoch,
tableName: metricName1,
dimensions: map[string]string{},
measureValues: map[string]string{"value": "20"},
},
})
expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDBName),
TableName: aws.String(metricName1),
Records: recordsMulti,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeMultiTable,
[]telegraf.Metric{input1, input2, input3},
[]*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
}
// TestTransformMetricsRequestsAboveLimitAreSplit builds 101 metrics that
// share tag and timestamp but have unique field names. In both mapping modes
// it expects two requests: one with the first 100 fields and one with the
// remaining field.
func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
	const maxRecordsInWriteRecordsCall = 100
	const total = maxRecordsInWriteRecordsCall + 1

	inputs := make([]telegraf.Metric, 0, total)
	// Measure values expected in the first (full) request.
	resultFields := make(map[string]string)
	// Name of the one field that spills over into the second request.
	var overflowField string
	for i := 1; i <= total; i++ {
		name := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, testutil.MustMetric(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{name: float64(10)},
			time1,
		))
		if i <= maxRecordsInWriteRecordsCall {
			resultFields[name] = "10"
		} else {
			overflowField = name
		}
	}

	wantSingleFirst := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     testSingleTableName,
		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: resultFields,
	})
	wantSingleSecond := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     testSingleTableName,
		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: map[string]string{overflowField: "10"},
	})
	comparisonTest(t, MappingModeSingleTable,
		inputs,
		[]*timestreamwrite.WriteRecordsInput{wantSingleFirst, wantSingleSecond})

	wantMultiFirst := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: resultFields,
	})
	wantMultiSecond := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{overflowField: "10"},
	})
	comparisonTest(t, MappingModeMultiTable,
		inputs,
		[]*timestreamwrite.WriteRecordsInput{wantMultiFirst, wantMultiSecond})
}
// TestTransformMetricsRequestsAboveLimitAreSplitSingleTable builds 101
// metrics, each one second apart and with a unique field name. In
// single-table mode it expects two requests: the first with 100 records and
// the second with the remaining record.
func TestTransformMetricsRequestsAboveLimitAreSplitSingleTable(t *testing.T) {
	const maxRecordsInWriteRecordsCall = 100
	const total = maxRecordsInWriteRecordsCall + 1

	baseEpoch, err := strconv.Atoi(time1Epoch)
	require.NoError(t, err)

	inputs := make([]telegraf.Metric, 0, total)
	var firstBatch, secondBatch []types.Record
	for i := 1; i <= total; i++ {
		epoch := baseEpoch + i
		field := "value_supported" + strconv.Itoa(i)

		inputs = append(inputs, testutil.MustMetric(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{field: float64(10)},
			time.Unix(int64(epoch), 0),
		))

		expected := buildRecord(SimpleInput{
			t:             strconv.Itoa(epoch),
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{field: "10"},
		})
		// The first 100 records belong to the first request, the rest to the second.
		if i <= maxRecordsInWriteRecordsCall {
			firstBatch = append(firstBatch, expected...)
		} else {
			secondBatch = append(secondBatch, expected...)
		}
	}

	wantFirst := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          firstBatch,
		CommonAttributes: &types.Record{},
	}
	wantSecond := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          secondBatch,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		inputs,
		[]*timestreamwrite.WriteRecordsInput{wantFirst, wantSecond})
}
// TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate
// uses two metrics with different names and tags but the same timestamp.
// In single-table mode it expects one request holding the records of both
// metrics; in multi-table mode it expects one request per metric name.
func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) {
	first := testutil.MustMetric(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{"value_supported1": float64(10), "value_supported2": float64(20)},
		time1,
	)
	second := testutil.MustMetric(
		metricName2,
		map[string]string{"tag2": "value2"},
		map[string]interface{}{"value_supported3": float64(30)},
		time1,
	)

	wantSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName: aws.String(tsDBName),
		TableName:    aws.String(testSingleTableName),
		Records: buildRecords([]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     testSingleTableName,
				dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
				measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
			},
			{
				t:             time1Epoch,
				tableName:     testSingleTableName,
				dimensions:    map[string]string{"tag2": "value2", testSingleTableDim: metricName2},
				measureValues: map[string]string{"value_supported3": "30"},
			},
		}),
		CommonAttributes: &types.Record{},
	}
	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{first, second},
		[]*timestreamwrite.WriteRecordsInput{wantSingleTable})

	wantMultiFirst := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
	})
	wantMultiSecond := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName2,
		dimensions:    map[string]string{"tag2": "value2"},
		measureValues: map[string]string{"value_supported3": "30"},
	})
	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{first, second},
		[]*timestreamwrite.WriteRecordsInput{wantMultiFirst, wantMultiSecond})
}
// TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparate
// uses two metrics with different names that share the tag key and field
// name but differ in tag value. In single-table mode it expects one request
// with two records; in multi-table mode it expects one request per metric
// name.
func TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparate(t *testing.T) {
	first := testutil.MustMetric(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{"value_supported1": float64(10)},
		time1,
	)
	second := testutil.MustMetric(
		metricName2,
		map[string]string{"tag1": "value2"},
		map[string]interface{}{"value_supported1": float64(20)},
		time1,
	)

	wantSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName: aws.String(tsDBName),
		TableName:    aws.String(testSingleTableName),
		Records: buildRecords([]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     testSingleTableName,
				dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
				measureValues: map[string]string{"value_supported1": "10"},
			},
			{
				t:             time1Epoch,
				tableName:     testSingleTableName,
				dimensions:    map[string]string{"tag1": "value2", testSingleTableDim: metricName2},
				measureValues: map[string]string{"value_supported1": "20"},
			},
		}),
		CommonAttributes: &types.Record{},
	}
	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{first, second},
		[]*timestreamwrite.WriteRecordsInput{wantSingleTable})

	wantMultiFirst := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10"},
	})
	wantMultiSecond := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName2,
		dimensions:    map[string]string{"tag1": "value2"},
		measureValues: map[string]string{"value_supported1": "20"},
	})
	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{first, second},
		[]*timestreamwrite.WriteRecordsInput{wantMultiFirst, wantMultiSecond})
}
func TestTransformMetricsSameDimensionsDifferentTimestampsAreWrittenSeparate(t *testing.T) {
input1 := testutil.MustMetric(
metricName1,
map[string]string{"tag1": "value1"},
map[string]interface{}{
"value_supported1": float64(10), "value_supported2": float64(20),
},
time1,
)
input2 := testutil.MustMetric(
metricName1,
map[string]string{"tag1": "value1"},
map[string]interface{}{
"value_supported3": float64(30),
},
time2,
)
recordsSingle := buildRecords([]SimpleInput{
{
t: time1Epoch,
tableName: testSingleTableName,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
},
{
t: time2Epoch,
tableName: testSingleTableName,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
measureValues: map[string]string{"value_supported3": "30"},
},
})
expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDBName),
TableName: aws.String(testSingleTableName),
Records: recordsSingle,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeSingleTable,
[]telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
recordsMultiTable := buildRecords([]SimpleInput{
{
t: time1Epoch,
tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"},
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
},
{
t: time2Epoch,
tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"},
measureValues: map[string]string{"value_supported3": "30"},
},
})
expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDBName),
TableName: aws.String(metricName1),
Records: recordsMultiTable,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeMultiTable,
[]telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
}
// TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether feeds two metrics with the
// same name, tags and timestamp and expects their fields to show up together in one write request,
// in both mapping modes.
func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testing.T) {
	firstMetric := testutil.MustMetric(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10),
			"value_supported2": float64(20),
		},
		time1,
	)
	secondMetric := testutil.MustMetric(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{"value_supported3": float64(30)},
		time1,
	)

	// Single-table mode: all three fields land in the shared table, with the measurement name
	// added as a dimension.
	wantSingleTable := buildExpectedInput(SimpleInput{
		t:          time1Epoch,
		tableName:  testSingleTableName,
		dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: map[string]string{
			"value_supported1": "10",
			"value_supported2": "20",
			"value_supported3": "30",
		},
	})
	comparisonTest(
		t,
		MappingModeSingleTable,
		[]telegraf.Metric{firstMetric, secondMetric},
		[]*timestreamwrite.WriteRecordsInput{wantSingleTable},
	)

	// Multi-table mode: all three fields land in the table named after the measurement.
	wantMultiTable := buildExpectedInput(SimpleInput{
		t:          time1Epoch,
		tableName:  metricName1,
		dimensions: map[string]string{"tag1": "value1"},
		measureValues: map[string]string{
			"value_supported1": "10",
			"value_supported2": "20",
			"value_supported3": "30",
		},
	})
	comparisonTest(
		t,
		MappingModeMultiTable,
		[]telegraf.Metric{firstMetric, secondMetric},
		[]*timestreamwrite.WriteRecordsInput{wantMultiTable},
	)
}
// TestTransformMetricsDifferentMetricsAreWrittenToDifferentTablesInMultiTableMapping feeds two
// metrics with different names but identical tags and timestamp. Single-table mode is expected to
// yield one request whose records differ in the measurement-name dimension; multi-table mode is
// expected to yield one request per measurement name.
func TestTransformMetricsDifferentMetricsAreWrittenToDifferentTablesInMultiTableMapping(t *testing.T) {
	metricOne := testutil.MustMetric(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10),
			"value_supported2": float64(20),
		},
		time1,
	)
	metricTwo := testutil.MustMetric(
		metricName2,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{"value_supported3": float64(30)},
		time1,
	)

	// Single-table mode: one request, records told apart by the measurement-name dimension.
	wantSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName: aws.String(tsDBName),
		TableName:    aws.String(testSingleTableName),
		Records: buildRecords([]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     testSingleTableName,
				dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
				measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
			},
			{
				t:             time1Epoch,
				tableName:     testSingleTableName,
				dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName2},
				measureValues: map[string]string{"value_supported3": "30"},
			},
		}),
		CommonAttributes: &types.Record{},
	}
	comparisonTest(
		t,
		MappingModeSingleTable,
		[]telegraf.Metric{metricOne, metricTwo},
		[]*timestreamwrite.WriteRecordsInput{wantSingleTable},
	)

	// Multi-table mode: one request per measurement, each addressed to its own table.
	wantTableOne := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
	})
	wantTableTwo := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName2,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported3": "30"},
	})
	comparisonTest(
		t,
		MappingModeMultiTable,
		[]telegraf.Metric{metricOne, metricTwo},
		[]*timestreamwrite.WriteRecordsInput{wantTableOne, wantTableTwo},
	)
}
// TestTransformMetricsUnsupportedFieldsAreSkipped feeds a metric that mixes a float64 field with a
// time.Time field and expects only the float64 field to appear in the transformed request, in both
// mapping modes.
func TestTransformMetricsUnsupportedFieldsAreSkipped(t *testing.T) {
	mixedMetric := testutil.MustMetric(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1":  float64(10),
			"value_unsupported": time.Now(),
		},
		time1,
	)

	// Single-table mode.
	wantSingleTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     testSingleTableName,
		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: map[string]string{"value_supported1": "10"},
	})
	comparisonTest(
		t,
		MappingModeSingleTable,
		[]telegraf.Metric{mixedMetric},
		[]*timestreamwrite.WriteRecordsInput{wantSingleTable},
	)

	// Multi-table mode.
	wantMultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10"},
	})
	comparisonTest(
		t,
		MappingModeMultiTable,
		[]telegraf.Metric{mixedMetric},
		[]*timestreamwrite.WriteRecordsInput{wantMultiTable},
	)
}
// TestCustomEndpoint checks that Connect succeeds when a custom endpoint URL is set in the
// credential configuration and that the plugin still reports that URL afterwards.
func TestCustomEndpoint(t *testing.T) {
	customEndpoint := "http://test.custom.endpoint.com"
	plugin := Timestream{
		MappingMode:      MappingModeMultiTable,
		DatabaseName:     tsDBName,
		Log:              testutil.Logger{},
		CredentialConfig: common_aws.CredentialConfig{EndpointURL: customEndpoint},
	}

	// validate config correctness
	err := plugin.Connect()
	require.NoError(t, err, "Invalid configuration")

	// Check customURL is used. testify expects (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, customEndpoint, plugin.EndpointURL)
}
// comparisonTest builds a Timestream plugin configured for mappingMode and delegates to comparison
// to check that transforming telegrafMetrics yields exactly the requests in timestreamRecords.
//
// mappingMode must be MappingModeSingleTable or MappingModeMultiTable; any other value fails the
// test immediately instead of silently running the comparison against a zero-value plugin.
func comparisonTest(t *testing.T,
	mappingMode string,
	telegrafMetrics []telegraf.Metric,
	timestreamRecords []*timestreamwrite.WriteRecordsInput,
) {
	// Attribute failures to the calling test rather than to this shared helper.
	t.Helper()

	var plugin Timestream
	switch mappingMode {
	case MappingModeSingleTable:
		plugin = Timestream{
			MappingMode:     mappingMode,
			DatabaseName:    tsDBName,
			SingleTableName: testSingleTableName,
			SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
			Log: testutil.Logger{},
		}
	case MappingModeMultiTable:
		plugin = Timestream{
			MappingMode:  mappingMode,
			DatabaseName: tsDBName,
			Log:          testutil.Logger{},
		}
	default:
		t.Fatalf("unsupported mapping mode %q", mappingMode)
	}

	comparison(t, plugin, mappingMode, telegrafMetrics, timestreamRecords)
}
// comparison runs plugin.TransformMetrics on telegrafMetrics and requires that the result holds
// exactly len(timestreamRecords) requests and that every expected request is found among them
// (membership is decided by arrayContains, so request order is irrelevant).
//
// mappingMode is only used to annotate the failure message.
func comparison(t *testing.T,
	plugin Timestream,
	mappingMode string,
	telegrafMetrics []telegraf.Metric,
	timestreamRecords []*timestreamwrite.WriteRecordsInput) {
	// Attribute failures to the calling test rather than to this shared helper.
	t.Helper()

	result := plugin.TransformMetrics(telegrafMetrics)

	require.Len(t, result, len(timestreamRecords), "The number of transformed records was expected to be different")
	for _, tsRecord := range timestreamRecords {
		require.True(t, arrayContains(result, tsRecord), "Expected that the list of requests to Timestream: \n%s\n\n "+
			"will contain request: \n%s\n\nUsed MappingMode: %s", result, tsRecord, mappingMode)
	}
}
// arrayContains reports whether array holds a request that is deeply equal to element once both
// have been put into canonical order by sortWriteInputForComparison.
//
// The sorting happens in place, so element and the inspected entries of array are reordered as a
// side effect.
func arrayContains(
	array []*timestreamwrite.WriteRecordsInput,
	element *timestreamwrite.WriteRecordsInput,
) bool {
	sortWriteInputForComparison(*element)

	for idx := range array {
		candidate := array[idx]
		sortWriteInputForComparison(*candidate)
		if !reflect.DeepEqual(candidate, element) {
			continue
		}
		return true
	}
	return false
}
// sortWriteInputForComparison puts the slices inside element into a canonical order so that two
// requests holding the same data in a different order compare equal with reflect.DeepEqual.
//
// element is passed by value, but its slices share their backing arrays with the caller's value,
// so the reordering is visible to the caller.
func sortWriteInputForComparison(element timestreamwrite.WriteRecordsInput) {
	// deref reads an optional string, treating nil as "". A record with a missing field then
	// produces an ordinary comparison mismatch instead of a panic inside the sort callback.
	deref := func(s *string) string {
		if s == nil {
			return ""
		}
		return *s
	}

	// Sort the records by MeasureName, as they are kept in an array, but the order of records
	// doesn't matter. MeasureName alone is not unique (the same measure can appear at several
	// timestamps), so Time is used as a tie-break and the sort is stable: remaining ties keep their
	// input order instead of an arbitrary one. Time is compared as a string; only a consistent
	// order is needed here, not a chronological one.
	sort.SliceStable(element.Records, func(i, j int) bool {
		a, b := &element.Records[i], &element.Records[j]
		if c := strings.Compare(deref(a.MeasureName), deref(b.MeasureName)); c != 0 {
			return c < 0
		}
		return strings.Compare(deref(a.Time), deref(b.Time)) < 0
	})

	// Sort the dimensions in CommonAttributes.
	if element.CommonAttributes != nil {
		common := element.CommonAttributes.Dimensions
		sort.SliceStable(common, func(i, j int) bool {
			return strings.Compare(deref(common[i].Name), deref(common[j].Name)) < 0
		})
	}

	// Sort the dimensions in Records.
	for idx := range element.Records {
		dims := element.Records[idx].Dimensions
		sort.SliceStable(dims, func(i, j int) bool {
			return strings.Compare(deref(dims[i].Name), deref(dims[j].Name)) < 0
		})
	}
}
// SimpleInput is a compact description of the data the test helpers expand into Timestream
// records: every entry of measureValues becomes a measure that carries the same time and the same
// dimensions.
type SimpleInput struct {
	// t is the record time as a string; the tests pass epoch values such as time1Epoch.
	t string
	// tableName is the target table; buildExpectedInput uses it as the request's TableName.
	tableName string
	// dimensions maps dimension name to dimension value.
	dimensions map[string]string
	// measureValues maps measure name to the measure value in its string form.
	measureValues map[string]string
}
// buildExpectedInput expands i into a complete write request for database tsDBName and table
// i.tableName. The request carries one DOUBLE record per entry of i.measureValues, each stamped
// with i.t and the dimensions from i.dimensions, plus empty CommonAttributes.
//
// Dimensions and records follow Go's map iteration order, so their order is not deterministic;
// callers are expected to compare through an order-insensitive helper.
func buildExpectedInput(i SimpleInput) *timestreamwrite.WriteRecordsInput {
	dims := make([]types.Dimension, 0, len(i.dimensions))
	for name, value := range i.dimensions {
		dims = append(dims, types.Dimension{Name: aws.String(name), Value: aws.String(value)})
	}

	records := make([]types.Record, 0, len(i.measureValues))
	for name, value := range i.measureValues {
		rec := types.Record{
			MeasureName:      aws.String(name),
			MeasureValue:     aws.String(value),
			MeasureValueType: types.MeasureValueTypeDouble,
			Dimensions:       dims, // every record shares the same dimension slice
			Time:             aws.String(i.t),
			TimeUnit:         timeUnit,
		}
		records = append(records, rec)
	}

	return &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(i.tableName),
		Records:          records,
		CommonAttributes: &types.Record{},
	}
}
// buildRecords concatenates the records produced by buildRecord for each input, in input order.
// The result is nil when no records were produced.
func buildRecords(inputs []SimpleInput) []types.Record {
	var all []types.Record
	for idx := range inputs {
		all = append(all, buildRecord(inputs[idx])...)
	}
	return all
}
// buildRecord expands input into one DOUBLE record per entry of input.measureValues. Every record
// is stamped with input.t and carries the dimensions from input.dimensions; input.tableName is not
// used.
//
// Dimensions and records follow Go's map iteration order, so their order is not deterministic.
func buildRecord(input SimpleInput) []types.Record {
	dims := make([]types.Dimension, 0, len(input.dimensions))
	for name, value := range input.dimensions {
		dims = append(dims, types.Dimension{Name: aws.String(name), Value: aws.String(value)})
	}

	records := make([]types.Record, 0, len(input.measureValues))
	for name, value := range input.measureValues {
		rec := types.Record{
			MeasureName:      aws.String(name),
			MeasureValue:     aws.String(value),
			MeasureValueType: types.MeasureValueTypeDouble,
			Dimensions:       dims, // every record shares the same dimension slice
			Time:             aws.String(input.t),
			TimeUnit:         timeUnit,
		}
		records = append(records, rec)
	}
	return records
}
// buildMultiRecords builds one multi-measure record per input, in input order. Each record is
// named multiMeasureName, has value type "MULTI", is stamped with the input's time and dimensions,
// and lists every entry of the input's measureValues as a MeasureValue of type measureType.
//
// Dimensions and measure values follow Go's map iteration order, so with more than one entry their
// order inside a record is not deterministic.
func buildMultiRecords(inputs []SimpleInput, multiMeasureName string, measureType types.MeasureValueType) []types.Record {
	records := make([]types.Record, 0, len(inputs))
	for idx := range inputs {
		in := inputs[idx]

		dims := make([]types.Dimension, 0, len(in.dimensions))
		for name, value := range in.dimensions {
			dims = append(dims, types.Dimension{Name: aws.String(name), Value: aws.String(value)})
		}

		values := make([]types.MeasureValue, 0, len(in.measureValues))
		for name, value := range in.measureValues {
			values = append(values, types.MeasureValue{
				Name:  aws.String(name),
				Value: aws.String(value),
				Type:  measureType,
			})
		}

		rec := types.Record{
			MeasureName:      aws.String(multiMeasureName),
			MeasureValueType: "MULTI",
			MeasureValues:    values,
			Dimensions:       dims,
			Time:             aws.String(in.t),
			TimeUnit:         timeUnit,
		}
		records = append(records, rec)
	}
	return records
}