262 lines
6.3 KiB
Go
262 lines
6.3 KiB
Go
|
package csv
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
|
||
|
"github.com/influxdata/toml"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
|
||
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||
|
"github.com/influxdata/telegraf/testutil"
|
||
|
)
|
||
|
|
||
|
func TestInvalidTimestampFormat(t *testing.T) {
|
||
|
s := Serializer{
|
||
|
TimestampFormat: "garbage",
|
||
|
}
|
||
|
require.EqualError(t, s.Init(), `invalid timestamp format "garbage"`)
|
||
|
}
|
||
|
|
||
|
func TestInvalidSeparator(t *testing.T) {
|
||
|
s := Serializer{
|
||
|
Separator: "garbage",
|
||
|
}
|
||
|
require.EqualError(t, s.Init(), `invalid separator "garbage"`)
|
||
|
|
||
|
s = Serializer{
|
||
|
Separator: "\n",
|
||
|
}
|
||
|
require.NoError(t, s.Init())
|
||
|
|
||
|
_, err := s.Serialize(testutil.TestMetric(42.3, "test"))
|
||
|
require.EqualError(t, err, "writing data failed: csv: invalid field or comment delimiter")
|
||
|
}
|
||
|
|
||
|
func TestSerializeTransformationNonBatch(t *testing.T) {
|
||
|
var tests = []struct {
|
||
|
name string
|
||
|
filename string
|
||
|
}{
|
||
|
{
|
||
|
name: "basic",
|
||
|
filename: "testcases/basic.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "unix nanoseconds timestamp",
|
||
|
filename: "testcases/nanoseconds.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header",
|
||
|
filename: "testcases/header.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header with prefix",
|
||
|
filename: "testcases/prefix.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header and RFC3339 timestamp",
|
||
|
filename: "testcases/rfc3339.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header and semicolon",
|
||
|
filename: "testcases/semicolon.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered without header",
|
||
|
filename: "testcases/ordered.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered with header",
|
||
|
filename: "testcases/ordered_with_header.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered with header and prefix",
|
||
|
filename: "testcases/ordered_with_header_prefix.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered non-existing fields and tags",
|
||
|
filename: "testcases/ordered_not_exist.conf",
|
||
|
},
|
||
|
}
|
||
|
parser := &influx.Parser{}
|
||
|
require.NoError(t, parser.Init())
|
||
|
|
||
|
for _, tt := range tests {
|
||
|
t.Run(tt.name, func(t *testing.T) {
|
||
|
filename := filepath.FromSlash(tt.filename)
|
||
|
cfg, header, err := loadTestConfiguration(filename)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Get the input metrics
|
||
|
metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Get the expectations
|
||
|
expectedFn, err := testutil.ParseRawLinesFrom(header, "Output File:")
|
||
|
require.NoError(t, err)
|
||
|
require.Len(t, expectedFn, 1, "only a single output file is supported")
|
||
|
expected, err := loadCSV(expectedFn[0])
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Serialize
|
||
|
serializer := Serializer{
|
||
|
TimestampFormat: cfg.TimestampFormat,
|
||
|
Separator: cfg.Separator,
|
||
|
Header: cfg.Header,
|
||
|
Prefix: cfg.Prefix,
|
||
|
Columns: cfg.Columns,
|
||
|
}
|
||
|
require.NoError(t, serializer.Init())
|
||
|
// expected results use LF endings
|
||
|
serializer.writer.UseCRLF = false
|
||
|
var actual bytes.Buffer
|
||
|
for _, m := range metrics {
|
||
|
buf, err := serializer.Serialize(m)
|
||
|
require.NoError(t, err)
|
||
|
_, err = actual.ReadFrom(bytes.NewReader(buf))
|
||
|
require.NoError(t, err)
|
||
|
}
|
||
|
// Compare
|
||
|
require.EqualValues(t, string(expected), actual.String())
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestSerializeTransformationBatch(t *testing.T) {
|
||
|
var tests = []struct {
|
||
|
name string
|
||
|
filename string
|
||
|
}{
|
||
|
{
|
||
|
name: "basic",
|
||
|
filename: "testcases/basic.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "unix nanoseconds timestamp",
|
||
|
filename: "testcases/nanoseconds.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header",
|
||
|
filename: "testcases/header.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header with prefix",
|
||
|
filename: "testcases/prefix.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header and RFC3339 timestamp",
|
||
|
filename: "testcases/rfc3339.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "header and semicolon",
|
||
|
filename: "testcases/semicolon.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered without header",
|
||
|
filename: "testcases/ordered.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered with header",
|
||
|
filename: "testcases/ordered_with_header.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered with header and prefix",
|
||
|
filename: "testcases/ordered_with_header_prefix.conf",
|
||
|
},
|
||
|
{
|
||
|
name: "ordered non-existing fields and tags",
|
||
|
filename: "testcases/ordered_not_exist.conf",
|
||
|
},
|
||
|
}
|
||
|
parser := &influx.Parser{}
|
||
|
require.NoError(t, parser.Init())
|
||
|
|
||
|
for _, tt := range tests {
|
||
|
t.Run(tt.name, func(t *testing.T) {
|
||
|
filename := filepath.FromSlash(tt.filename)
|
||
|
cfg, header, err := loadTestConfiguration(filename)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Get the input metrics
|
||
|
metrics, err := testutil.ParseMetricsFrom(header, "Input:", parser)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Get the expectations
|
||
|
expectedFn, err := testutil.ParseRawLinesFrom(header, "Output File:")
|
||
|
require.NoError(t, err)
|
||
|
require.Len(t, expectedFn, 1, "only a single output file is supported")
|
||
|
expected, err := loadCSV(expectedFn[0])
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Serialize
|
||
|
serializer := Serializer{
|
||
|
TimestampFormat: cfg.TimestampFormat,
|
||
|
Separator: cfg.Separator,
|
||
|
Header: cfg.Header,
|
||
|
Prefix: cfg.Prefix,
|
||
|
Columns: cfg.Columns,
|
||
|
}
|
||
|
require.NoError(t, serializer.Init())
|
||
|
// expected results use LF endings
|
||
|
serializer.writer.UseCRLF = false
|
||
|
actual, err := serializer.SerializeBatch(metrics)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Compare
|
||
|
require.EqualValues(t, string(expected), string(actual))
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type Config Serializer
|
||
|
|
||
|
func loadTestConfiguration(filename string) (*Config, []string, error) {
|
||
|
buf, err := os.ReadFile(filename)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
|
||
|
header := make([]string, 0)
|
||
|
for _, line := range strings.Split(string(buf), "\n") {
|
||
|
line = strings.TrimSpace(line)
|
||
|
if strings.HasPrefix(line, "#") {
|
||
|
header = append(header, line)
|
||
|
}
|
||
|
}
|
||
|
var cfg Config
|
||
|
err = toml.Unmarshal(buf, &cfg)
|
||
|
return &cfg, header, err
|
||
|
}
|
||
|
|
||
|
func loadCSV(filename string) ([]byte, error) {
|
||
|
return os.ReadFile(filename)
|
||
|
}
|
||
|
|
||
|
func BenchmarkSerialize(b *testing.B) {
|
||
|
s := &Serializer{}
|
||
|
require.NoError(b, s.Init())
|
||
|
metrics := serializers.BenchmarkMetrics(b)
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
_, err := s.Serialize(metrics[i%len(metrics)])
|
||
|
require.NoError(b, err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkSerializeBatch(b *testing.B) {
|
||
|
s := &Serializer{}
|
||
|
require.NoError(b, s.Init())
|
||
|
m := serializers.BenchmarkMetrics(b)
|
||
|
metrics := m[:]
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
_, err := s.SerializeBatch(metrics)
|
||
|
require.NoError(b, err)
|
||
|
}
|
||
|
}
|