1
0
Fork 0
telegraf/plugins/outputs/iotdb/iotdb_test.go

642 lines
19 KiB
Go
Raw Permalink Normal View History

package iotdb
import (
"math"
"strconv"
"testing"
"time"
"github.com/apache/iotdb-client-go/client"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
// newMetricWithOrderedFields builds a metric from slices instead of maps.
// The metric is created empty and the tags and fields are then added one at
// a time in slice order, which lets a test spell out the field order it
// expects to see in the converted output.
func newMetricWithOrderedFields(
	name string,
	tags []telegraf.Tag,
	fields []telegraf.Field,
	timestamp time.Time,
) telegraf.Metric {
	result := metric.New(name, map[string]string{}, map[string]interface{}{}, timestamp)
	for i := range tags {
		result.AddTag(tags[i].Key, tags[i].Value)
	}
	for i := range fields {
		result.AddField(fields[i].Key, fields[i].Value)
	}
	return result
}
// TestInitInvalid checks that Init rejects invalid settings and reports the
// exact error message expected for each of them.
func TestInitInvalid(t *testing.T) {
	// invalidPlugin starts from the default plugin, applies the setting under
	// test and then attaches a test logger.
	invalidPlugin := func(breakIt func(p *IoTDB)) *IoTDB {
		p := newIoTDB()
		breakIt(p)
		p.Log = &testutil.Logger{}
		return p
	}

	cases := []struct {
		name     string
		plugin   *IoTDB
		expected string
	}{
		{
			name:     "empty tag-conversion",
			plugin:   invalidPlugin(func(p *IoTDB) { p.TreatTagsAs = "" }),
			expected: `unknown 'convert_tags_to' method ""`,
		},
		{
			name:     "empty uint-conversion",
			plugin:   invalidPlugin(func(p *IoTDB) { p.ConvertUint64To = "" }),
			expected: `unknown 'uint64_conversion' method ""`,
		},
		{
			name:     "empty timestamp precision",
			plugin:   invalidPlugin(func(p *IoTDB) { p.TimeStampUnit = "" }),
			expected: `unknown 'timestamp_precision' method ""`,
		},
		{
			name:     "invalid tag-conversion",
			plugin:   invalidPlugin(func(p *IoTDB) { p.TreatTagsAs = "garbage" }),
			expected: `unknown 'convert_tags_to' method "garbage"`,
		},
		{
			name:     "invalid uint-conversion",
			plugin:   invalidPlugin(func(p *IoTDB) { p.ConvertUint64To = "garbage" }),
			expected: `unknown 'uint64_conversion' method "garbage"`,
		},
		{
			name:     "invalid timestamp precision",
			plugin:   invalidPlugin(func(p *IoTDB) { p.TimeStampUnit = "garbage" }),
			expected: `unknown 'timestamp_precision' method "garbage"`,
		},
		{
			name:     "negative timeout",
			plugin:   invalidPlugin(func(p *IoTDB) { p.Timeout = config.Duration(-5 * time.Second) }),
			expected: `negative timeout`,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.plugin.Init()
			require.EqualError(t, err, tc.expected)
		})
	}
}
// TestMetricConversionToRecordsWithTags exercises
// `convertMetricsToRecordsWithTags` under several plugin settings and compares
// the produced records with the expectation. The collected tags are not part
// of the comparison.
func TestMetricConversionToRecordsWithTags(t *testing.T) {
	// A uint64 value just beyond the int64 range, used to trigger the
	// configured uint64 conversion.
	const bigUint = uint64(math.MaxInt64 + 1000)

	testTimestamp := time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)
	nanos := testTimestamp.UnixNano()

	// configured returns a default plugin after applying the optional settings.
	configured := func(apply func(p *IoTDB)) *IoTDB {
		p := newIoTDB()
		if apply != nil {
			apply(p)
		}
		return p
	}

	cases := []struct {
		name     string
		plugin   *IoTDB
		expected recordsWithTags
		metrics  []telegraf.Metric
	}{
		{
			name:   "default config",
			plugin: configured(nil),
			expected: recordsWithTags{
				DeviceIDList: []string{
					"root.computer.fan",
					"root.computer.fan",
					"root.computer.keyboard",
				},
				MeasurementsList: [][]string{
					{"temperature", "counter"},
					{"counter", "temperature"},
					{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
				},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321)},
					{int64(123456789), float64(56.24)},
					{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", false, "123456789011"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
					{client.INT64, client.DOUBLE},
					{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
				},
				TimestampList: []int64{nanos, nanos, nanos},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.fan",
					[]telegraf.Tag{
						{Key: "price", Value: "expensive"},
						{Key: "owner", Value: "cpu"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: float64(42.55)},
						{Key: "counter", Value: int64(987654321)},
					},
					testTimestamp,
				),
				// Same tag and field keys as the first metric, but in a
				// different order.
				newMetricWithOrderedFields(
					"root.computer.fan",
					[]telegraf.Tag{
						{Key: "owner", Value: "gpu"},
						{Key: "price", Value: "cheap"},
					},
					[]telegraf.Field{
						{Key: "counter", Value: int64(123456789)},
						{Key: "temperature", Value: float64(56.24)},
					},
					testTimestamp,
				),
				newMetricWithOrderedFields(
					"root.computer.keyboard",
					nil,
					[]telegraf.Field{
						{Key: "temperature", Value: float64(30.33)},
						{Key: "counter", Value: int64(123456789)},
						{Key: "unsigned_big", Value: bigUint},
						{Key: "string", Value: "Made in China."},
						{Key: "bool", Value: false},
						{Key: "int_text", Value: "123456789011"},
					},
					testTimestamp,
				),
			},
		},
		{
			name:   "unsigned int to text",
			plugin: configured(func(p *IoTDB) { p.ConvertUint64To = "text" }),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.uint_to_text"},
				MeasurementsList: [][]string{{"unsigned_big"}},
				ValuesList:       [][]interface{}{{strconv.FormatUint(bigUint, 10)}},
				DataTypesList:    [][]client.TSDataType{{client.TEXT}},
				TimestampList:    []int64{nanos},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.uint_to_text",
					nil,
					[]telegraf.Field{{Key: "unsigned_big", Value: bigUint}},
					testTimestamp,
				),
			},
		},
		{
			name:   "unsigned int to int with overflow",
			plugin: configured(func(p *IoTDB) { p.ConvertUint64To = "int64" }),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.overflow"},
				MeasurementsList: [][]string{{"unsigned_big"}},
				ValuesList:       [][]interface{}{{int64(-9223372036854774809)}},
				DataTypesList:    [][]client.TSDataType{{client.INT64}},
				TimestampList:    []int64{nanos},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.overflow",
					nil,
					[]telegraf.Field{{Key: "unsigned_big", Value: bigUint}},
					testTimestamp,
				),
			},
		},
		{
			name:   "second timestamp precision",
			plugin: configured(func(p *IoTDB) { p.TimeStampUnit = "second" }),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.second"},
				MeasurementsList: [][]string{{"unsigned_big"}},
				ValuesList:       [][]interface{}{{int64(math.MaxInt64)}},
				DataTypesList:    [][]client.TSDataType{{client.INT64}},
				TimestampList:    []int64{testTimestamp.Unix()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.second",
					nil,
					[]telegraf.Field{{Key: "unsigned_big", Value: bigUint}},
					testTimestamp,
				),
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			tc.plugin.Log = &testutil.Logger{}
			require.NoError(t, tc.plugin.Init())

			got, err := tc.plugin.convertMetricsToRecordsWithTags(tc.metrics)
			require.NoError(t, err)

			// The tags list is not part of the expectation; drop it before
			// comparing.
			got.TagsList = nil
			want := tc.expected
			require.EqualValues(t, &want, got)
		})
	}
}
// TestTagSanitization feeds individual tag values through `validateTag` for
// both supported `SanitizeTags` settings ("1.3" and "0.13") and compares the
// returned strings with the expectation.
//
// Every case has a unique name, so each subtest can be selected with
// `go test -run` and shows up in the output without a `#01` suffix.
func TestTagSanitization(t *testing.T) {
	tests := []struct {
		name     string
		plugin   *IoTDB
		expected []string
		input    []string
	}{
		{ // tags expected to come back unchanged with the 1.3 setting
			name:     "1.3 don't sanitize tags",
			plugin:   func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
			expected: []string{"word", "`word`", "word_"},
			input:    []string{"word", "`word`", "word_"},
		},
		{ // tags containing unsupported characters are expected to come back enclosed in backticks with the 1.3 setting
			name:     "1.3 sanitize tags",
			plugin:   func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
			expected: []string{"`wo rd`", "`@`", "`$`", "`#`", "`:`", "`{`", "`}`", "`1`", "`1234`"},
			input:    []string{"wo rd", "@", "$", "#", ":", "{", "}", "1", "1234"},
		},
		{ // forbidden word and forbidden syntax: an empty string is expected with the 1.3 setting
			name:     "1.3 errors",
			plugin:   func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
			expected: []string{"", ""},
			input:    []string{"root", "wo`rd"},
		},
		{ // tags expected to come back unchanged with the 0.13 setting
			name:     "0.13 don't sanitize tags",
			plugin:   func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "0.13"; return s }(),
			expected: []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
			input:    []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
		},
		{ // tags containing unsupported characters are expected to come back enclosed in backticks with the 0.13 setting
			name:     "0.13 sanitize tags",
			plugin:   func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "0.13"; return s }(),
			expected: []string{"`wo rd`", "`\\`"},
			input:    []string{"wo rd", "\\"},
		},
		{ // forbidden word and forbidden syntax: an empty string is expected with the 0.13 setting
			name:     "0.13 errors",
			plugin:   func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "0.13"; return s }(),
			expected: []string{"", ""},
			input:    []string{"root", "wo`rd"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.plugin.Log = &testutil.Logger{}
			require.NoError(t, tt.plugin.Init())

			// Collect the returned string for every input; the returned
			// error is deliberately dropped and only the string is compared.
			actuals := make([]string, 0, len(tt.input))
			for _, input := range tt.input {
				//nolint:errcheck // error cases handled by expected vs actual comparison
				actual, _ := tt.plugin.validateTag(input)
				actuals = append(actuals, actual)
			}

			require.EqualValues(t, tt.expected, actuals)
		})
	}
}
// TestTagsHandling exercises `modifyRecordsWithTags`: records that carry a
// tags list are handed to the plugin and the modified records are compared
// with the expectation for each `TreatTagsAs` setting.
func TestTagsHandling(t *testing.T) {
	var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)

	tests := []struct {
		name     string
		plugin   *IoTDB
		expected recordsWithTags
		input    recordsWithTags
	}{
		{ // tags as fields: tag keys and values are expected after the existing measurements, typed as TEXT
			name:   "treat tags as fields",
			plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "fields"; return s }(),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.fields"},
				MeasurementsList: [][]string{{"temperature", "counter", "owner", "price"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321), "cpu", "expensive"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64, client.TEXT, client.TEXT},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			input: recordsWithTags{
				DeviceIDList:     []string{"root.computer.fields"},
				MeasurementsList: [][]string{{"temperature", "counter"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
				TagsList: [][]*telegraf.Tag{{
					{Key: "owner", Value: "cpu"},
					{Key: "price", Value: "expensive"},
				}},
			},
		},
		{ // tags as device IDs: tag values are expected appended to the device ID, measurements untouched
			name:   "treat tags as device IDs",
			plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "device_id"; return s }(),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.deviceID.cpu.expensive"},
				MeasurementsList: [][]string{{"temperature", "counter"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			input: recordsWithTags{
				DeviceIDList:     []string{"root.computer.deviceID"},
				MeasurementsList: [][]string{{"temperature", "counter"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
				TagsList: [][]*telegraf.Tag{{
					{Key: "owner", Value: "cpu"},
					{Key: "price", Value: "expensive"},
				}},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Hand a local copy to the plugin instead of the address of the
			// loop variable's field.
			input := tt.input
			tt.plugin.Log = &testutil.Logger{}
			require.NoError(t, tt.plugin.Init())
			require.NoError(t, tt.plugin.modifyRecordsWithTags(&input))

			// Assert on the value that was actually passed to the plugin.
			// Checking tt.input would only work as long as the plugin mutates
			// the shared slice backing arrays in place.
			// Ignore the tags-list for comparison
			input.TagsList = nil
			require.EqualValues(t, tt.expected, input)
		})
	}
}
// TestEntireMetricConversion runs the whole conversion pipeline, from metrics
// to records which are ready to insert: `convertMetricsToRecordsWithTags`
// followed by `modifyRecordsWithTags`.
//
// Cases with `requireEqual` set must match the expectation exactly; the
// remaining cases hold a deliberately wrong expectation and must NOT match.
// Every case has a unique name so each subtest can be selected on its own.
func TestEntireMetricConversion(t *testing.T) {
	var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)

	tests := []struct {
		name         string
		plugin       *IoTDB
		expected     recordsWithTags
		metrics      []telegraf.Metric
		requireEqual bool
	}{
		{
			name:   "default config",
			plugin: func() *IoTDB { s := newIoTDB(); return s }(),
			expected: recordsWithTags{
				DeviceIDList: []string{"root.computer.screen.high.LED"},
				MeasurementsList: [][]string{
					{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
				},
				ValuesList: [][]interface{}{
					{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.screen",
					[]telegraf.Tag{
						{Key: "brightness", Value: "high"},
						{Key: "type", Value: "LED"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: float64(30.33)},
						{Key: "counter", Value: int64(123456789)},
						{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
						{Key: "string", Value: "Made in China."},
						{Key: "bool", Value: bool(false)},
						{Key: "int_text", Value: "123456789011"},
					},
					testTimestamp,
				),
			},
			requireEqual: true,
		},
		{ // expectation lists the tag values in swapped order and must not match
			name:   "wrong order of tags",
			plugin: func() *IoTDB { s := newIoTDB(); return s }(),
			expected: recordsWithTags{
				DeviceIDList: []string{"root.computer.screen.LED.high"},
				MeasurementsList: [][]string{
					{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
				},
				ValuesList: [][]interface{}{
					{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.screen",
					[]telegraf.Tag{
						{Key: "brightness", Value: "high"},
						{Key: "type", Value: "LED"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: float64(30.33)},
						{Key: "counter", Value: int64(123456789)},
						{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
						{Key: "string", Value: "Made in China."},
						{Key: "bool", Value: bool(false)},
						{Key: "int_text", Value: "123456789011"},
					},
					testTimestamp,
				),
			},
			requireEqual: false,
		},
		{ // same swapped-order expectation, with a metric that has only two fields
			name:   "wrong order of tags with two fields",
			plugin: func() *IoTDB { s := newIoTDB(); return s }(),
			expected: recordsWithTags{
				DeviceIDList: []string{"root.computer.screen.LED.high"},
				MeasurementsList: [][]string{
					{"temperature", "counter"},
				},
				ValuesList: [][]interface{}{
					{float64(30.33), int64(123456789)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.screen",
					[]telegraf.Tag{
						{Key: "brightness", Value: "high"},
						{Key: "type", Value: "LED"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: float64(30.33)},
						{Key: "counter", Value: int64(123456789)},
					},
					testTimestamp,
				),
			},
			requireEqual: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.plugin.Log = &testutil.Logger{}
			require.NoError(t, tt.plugin.Init())

			// Stage one: metrics to records with their tags attached.
			actual, err := tt.plugin.convertMetricsToRecordsWithTags(tt.metrics)
			require.NoError(t, err)

			// Stage two: fold the tags into the records.
			require.NoError(t, tt.plugin.modifyRecordsWithTags(actual))

			// Ignore the tags-list for comparison
			actual.TagsList = nil
			expected := tt.expected
			if tt.requireEqual {
				require.EqualValues(t, &expected, actual)
			} else {
				require.NotEqualValues(t, &expected, actual)
			}
		})
	}
}
// Start a container and do integration test.
func TestIntegrationInserts(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
const iotdbPort = "6667"
container := testutil.Container{
Image: "apache/iotdb:0.13.0-node",
ExposedPorts: []string{iotdbPort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(iotdbPort)),
wait.ForLog("IoTDB has started."),
),
}
err := container.Start()
require.NoError(t, err, "failed to start IoTDB container")
defer container.Terminate()
t.Logf("Container Address:%q, ExposedPorts:[%v:%v]", container.Address, container.Ports[iotdbPort], iotdbPort)
// create a client and tests two groups of insertion
testClient := &IoTDB{
Host: container.Address,
Port: container.Ports[iotdbPort],
User: config.NewSecret([]byte("root")),
Password: config.NewSecret([]byte("root")),
Timeout: config.Duration(time.Second * 5),
ConvertUint64To: "int64_clip",
TimeStampUnit: "nanosecond",
TreatTagsAs: "device_id",
}
testClient.Log = &testutil.Logger{}
// generate Metrics to input
metrics := []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.unsigned_big",
nil,
[]telegraf.Field{
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
},
time.Now(),
),
newMetricWithOrderedFields(
"root.computer.fan",
[]telegraf.Tag{
{Key: "price", Value: "expensive"},
{Key: "owner", Value: "cpu"},
},
[]telegraf.Field{
{Key: "temperature", Value: float64(42.55)},
{Key: "counter", Value: int64(987654321)},
},
time.Now(),
),
newMetricWithOrderedFields(
"root.computer.fan",
[]telegraf.Tag{ // same keys in different order
{Key: "owner", Value: "gpu"},
{Key: "price", Value: "cheap"},
},
[]telegraf.Field{ // same keys in different order
{Key: "counter", Value: int64(123456789)},
{Key: "temperature", Value: float64(56.24)},
},
time.Now(),
),
newMetricWithOrderedFields(
"root.computer.keyboard",
nil,
[]telegraf.Field{
{Key: "temperature", Value: float64(30.33)},
{Key: "counter", Value: int64(123456789)},
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
{Key: "string", Value: "Made in China."},
{Key: "bool", Value: bool(false)},
{Key: "int_text", Value: "123456789011"},
},
time.Now(),
),
}
require.NoError(t, testClient.Connect())
require.NoError(t, testClient.Write(metrics))
require.NoError(t, testClient.Close())
}