1215 lines
33 KiB
Go
1215 lines
33 KiB
Go
|
package postgresql
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"math"
|
||
|
"math/rand"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/docker/go-connections/nat"
|
||
|
"github.com/jackc/pgx/v4"
|
||
|
"github.com/jackc/pgx/v4/pgxpool"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
"github.com/testcontainers/testcontainers-go/wait"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/config"
|
||
|
"github.com/influxdata/telegraf/internal"
|
||
|
"github.com/influxdata/telegraf/metric"
|
||
|
"github.com/influxdata/telegraf/models"
|
||
|
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
|
||
|
"github.com/influxdata/telegraf/testutil"
|
||
|
)
|
||
|
|
||
|
type Log struct {
|
||
|
level pgx.LogLevel
|
||
|
format string
|
||
|
args []interface{}
|
||
|
}
|
||
|
|
||
|
func (l Log) String() string {
|
||
|
// We have to use Errorf() as Sprintf() doesn't allow usage of %w.
|
||
|
return fmt.Errorf("%s: "+l.format, append([]interface{}{l.level}, l.args...)...).Error()
|
||
|
}
|
||
|
|
||
|
// LogAccumulator is a log collector that satisfies telegraf.Logger.
|
||
|
type LogAccumulator struct {
|
||
|
logs []Log
|
||
|
cond *sync.Cond
|
||
|
tb testing.TB
|
||
|
emitLevel pgx.LogLevel
|
||
|
}
|
||
|
|
||
|
func NewLogAccumulator(tb testing.TB) *LogAccumulator {
|
||
|
return &LogAccumulator{
|
||
|
cond: sync.NewCond(&sync.Mutex{}),
|
||
|
tb: tb,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Level() telegraf.LogLevel {
|
||
|
switch la.emitLevel {
|
||
|
case pgx.LogLevelInfo:
|
||
|
return telegraf.Info
|
||
|
case pgx.LogLevelWarn:
|
||
|
return telegraf.Warn
|
||
|
case pgx.LogLevelError:
|
||
|
return telegraf.Error
|
||
|
case pgx.LogLevelNone:
|
||
|
return telegraf.None
|
||
|
}
|
||
|
return telegraf.Debug
|
||
|
}
|
||
|
|
||
|
// Unused
|
||
|
func (*LogAccumulator) AddAttribute(string, interface{}) {}
|
||
|
|
||
|
func (la *LogAccumulator) append(level pgx.LogLevel, format string, args []interface{}) {
|
||
|
la.tb.Helper()
|
||
|
|
||
|
la.cond.L.Lock()
|
||
|
log := Log{level, format, args}
|
||
|
la.logs = append(la.logs, log)
|
||
|
|
||
|
if la.emitLevel == 0 || log.level <= la.emitLevel {
|
||
|
la.tb.Log(log.String())
|
||
|
}
|
||
|
|
||
|
la.cond.Broadcast()
|
||
|
la.cond.L.Unlock()
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) HasLevel(level pgx.LogLevel) bool {
|
||
|
la.cond.L.Lock()
|
||
|
defer la.cond.L.Unlock()
|
||
|
for _, log := range la.logs {
|
||
|
if log.level > 0 && log.level <= level {
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) WaitLen(n int) []Log {
|
||
|
la.cond.L.Lock()
|
||
|
defer la.cond.L.Unlock()
|
||
|
for len(la.logs) < n {
|
||
|
la.cond.Wait()
|
||
|
}
|
||
|
return la.logs[:]
|
||
|
}
|
||
|
|
||
|
// Waits for a specific query log from pgx to show up.
|
||
|
func (la *LogAccumulator) WaitFor(f func(l Log) bool, waitCommit bool) {
|
||
|
la.cond.L.Lock()
|
||
|
defer la.cond.L.Unlock()
|
||
|
i := 0
|
||
|
var commitPid uint32
|
||
|
for {
|
||
|
for ; i < len(la.logs); i++ {
|
||
|
log := la.logs[i]
|
||
|
if commitPid == 0 {
|
||
|
if f(log) {
|
||
|
if !waitCommit {
|
||
|
return
|
||
|
}
|
||
|
commitPid = log.args[1].(MSI)["pid"].(uint32)
|
||
|
}
|
||
|
} else {
|
||
|
if len(log.args) < 2 {
|
||
|
continue
|
||
|
}
|
||
|
data, ok := log.args[1].(MSI)
|
||
|
if !ok || data["pid"] != commitPid {
|
||
|
continue
|
||
|
}
|
||
|
if log.args[0] == "Exec" && data["sql"] == "commit" {
|
||
|
return
|
||
|
} else if log.args[0] == "Exec" && data["sql"] == "rollback" {
|
||
|
// transaction aborted, start looking for another match
|
||
|
commitPid = 0
|
||
|
} else if log.level == pgx.LogLevelError {
|
||
|
commitPid = 0
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
la.cond.Wait()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) WaitForQuery(str string, waitCommit bool) {
|
||
|
la.WaitFor(func(log Log) bool {
|
||
|
return log.format == "PG %s - %+v" &&
|
||
|
(log.args[0].(string) == "Query" || log.args[0].(string) == "Exec") &&
|
||
|
strings.Contains(log.args[1].(MSI)["sql"].(string), str)
|
||
|
}, waitCommit)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) WaitForCopy(tableName string, waitCommit bool) {
|
||
|
la.WaitFor(func(log Log) bool {
|
||
|
return log.format == "PG %s - %+v" &&
|
||
|
log.args[0].(string) == "CopyFrom" &&
|
||
|
log.args[1].(MSI)["tableName"].(pgx.Identifier)[1] == tableName
|
||
|
}, waitCommit)
|
||
|
}
|
||
|
|
||
|
// Clear any stored logs.
|
||
|
// Do not run this while any WaitFor* operations are in progress.
|
||
|
func (la *LogAccumulator) Clear() {
|
||
|
la.cond.L.Lock()
|
||
|
if len(la.logs) > 0 {
|
||
|
la.logs = nil
|
||
|
}
|
||
|
la.cond.L.Unlock()
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Logs() []Log {
|
||
|
la.cond.L.Lock()
|
||
|
defer la.cond.L.Unlock()
|
||
|
return la.logs[:]
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Errorf(format string, args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelError, format, args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Error(args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelError, "%v", args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Warnf(format string, args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelWarn, format, args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Warn(args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelWarn, "%v", args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Infof(format string, args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelInfo, format, args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Info(args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelInfo, "%v", args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Debugf(format string, args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelDebug, format, args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Debug(args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelDebug, "%v", args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Tracef(format string, args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelDebug, format, args)
|
||
|
}
|
||
|
|
||
|
func (la *LogAccumulator) Trace(args ...interface{}) {
|
||
|
la.tb.Helper()
|
||
|
la.append(pgx.LogLevelDebug, "%v", args)
|
||
|
}
|
||
|
|
||
|
var ctx = context.Background()
|
||
|
|
||
|
type PostgresqlTest struct {
|
||
|
*Postgresql
|
||
|
Logger *LogAccumulator
|
||
|
}
|
||
|
|
||
|
func newPostgresqlTest(tb testing.TB) (*PostgresqlTest, error) {
|
||
|
if testing.Short() {
|
||
|
tb.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
servicePort := "5432"
|
||
|
username := "postgres"
|
||
|
password := "postgres"
|
||
|
testDatabaseName := "telegraf_test"
|
||
|
|
||
|
container := testutil.Container{
|
||
|
Image: "postgres:alpine",
|
||
|
ExposedPorts: []string{servicePort},
|
||
|
Env: map[string]string{
|
||
|
"POSTGRES_USER": username,
|
||
|
"POSTGRES_PASSWORD": password,
|
||
|
"POSTGRES_DB": "telegraf_test",
|
||
|
},
|
||
|
WaitingFor: wait.ForAll(
|
||
|
// the database comes up twice, once right away, then again a second
|
||
|
// time after the docker entrypoint starts configuration
|
||
|
wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
|
||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||
|
),
|
||
|
}
|
||
|
tb.Cleanup(container.Terminate)
|
||
|
|
||
|
if err := container.Start(); err != nil {
|
||
|
return nil, fmt.Errorf("failed to start container: %w", err)
|
||
|
}
|
||
|
|
||
|
p := newPostgresql()
|
||
|
connection := fmt.Sprintf(
|
||
|
"host=%s port=%s user=%s password=%s dbname=%s",
|
||
|
container.Address,
|
||
|
container.Ports[servicePort],
|
||
|
username,
|
||
|
password,
|
||
|
testDatabaseName,
|
||
|
)
|
||
|
p.Connection = config.NewSecret([]byte(connection))
|
||
|
logger := NewLogAccumulator(tb)
|
||
|
p.Logger = logger
|
||
|
p.LogLevel = "debug"
|
||
|
|
||
|
if err := p.Init(); err != nil {
|
||
|
return nil, fmt.Errorf("failed to init plugin: %w", err)
|
||
|
}
|
||
|
|
||
|
pt := &PostgresqlTest{Postgresql: p}
|
||
|
pt.Logger = logger
|
||
|
|
||
|
return pt, nil
|
||
|
}
|
||
|
|
||
|
func TestPostgresqlConnectIntegration(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, p.Connect())
|
||
|
require.EqualValues(t, 1, p.db.Stat().MaxConns())
|
||
|
|
||
|
p, err = newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
connection, err := p.Connection.Get()
|
||
|
require.NoError(t, err)
|
||
|
p.Connection = config.NewSecret([]byte(connection.String() + " pool_max_conns=2"))
|
||
|
connection.Destroy()
|
||
|
|
||
|
require.NoError(t, p.Init())
|
||
|
require.NoError(t, p.Connect())
|
||
|
require.EqualValues(t, 2, p.db.Stat().MaxConns())
|
||
|
}
|
||
|
|
||
|
func TestConnectionIssueAtStartup(t *testing.T) {
|
||
|
// Test case for https://github.com/influxdata/telegraf/issues/14365
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
servicePort := "5432"
|
||
|
username := "postgres"
|
||
|
password := "postgres"
|
||
|
testDatabaseName := "telegraf_test"
|
||
|
|
||
|
container := testutil.Container{
|
||
|
Image: "postgres:alpine",
|
||
|
ExposedPorts: []string{servicePort},
|
||
|
Env: map[string]string{
|
||
|
"POSTGRES_USER": username,
|
||
|
"POSTGRES_PASSWORD": password,
|
||
|
"POSTGRES_DB": "telegraf_test",
|
||
|
},
|
||
|
WaitingFor: wait.ForAll(
|
||
|
// the database comes up twice, once right away, then again a second
|
||
|
// time after the docker entrypoint starts configuration
|
||
|
wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
|
||
|
wait.ForListeningPort(nat.Port(servicePort)),
|
||
|
),
|
||
|
}
|
||
|
require.NoError(t, container.Start(), "failed to start container")
|
||
|
defer container.Terminate()
|
||
|
|
||
|
// Pause the container for connectivity issues
|
||
|
require.NoError(t, container.Pause())
|
||
|
|
||
|
// Create a model to be able to use the startup retry strategy
|
||
|
dsn := config.NewSecret([]byte(fmt.Sprintf(
|
||
|
"host=%s port=%s user=%s password=%s dbname=%s connect_timeout=1",
|
||
|
container.Address,
|
||
|
container.Ports[servicePort],
|
||
|
username,
|
||
|
password,
|
||
|
testDatabaseName,
|
||
|
)))
|
||
|
defer dsn.Destroy()
|
||
|
plugin := newPostgresql()
|
||
|
plugin.Connection = dsn
|
||
|
plugin.Logger = testutil.Logger{}
|
||
|
plugin.LogLevel = "debug"
|
||
|
model := models.NewRunningOutput(
|
||
|
plugin,
|
||
|
&models.OutputConfig{
|
||
|
Name: "postgres",
|
||
|
StartupErrorBehavior: "retry",
|
||
|
},
|
||
|
1000, 1000,
|
||
|
)
|
||
|
require.NoError(t, model.Init())
|
||
|
|
||
|
// The connect call should succeed even though the table creation was not
|
||
|
// successful due to the "retry" strategy
|
||
|
require.NoError(t, model.Connect())
|
||
|
|
||
|
// Writing the metrics in this state should fail because we are not fully
|
||
|
// started up
|
||
|
metrics := testutil.MockMetrics()
|
||
|
for _, m := range metrics {
|
||
|
model.AddMetric(m)
|
||
|
}
|
||
|
require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected)
|
||
|
|
||
|
// Unpause the container, now writes should succeed
|
||
|
require.NoError(t, container.Resume())
|
||
|
require.NoError(t, model.WriteBatch())
|
||
|
model.Close()
|
||
|
}
|
||
|
|
||
|
func newMetric(
|
||
|
t *testing.T,
|
||
|
suffix string,
|
||
|
tags map[string]string,
|
||
|
fields map[string]interface{},
|
||
|
) telegraf.Metric {
|
||
|
return testutil.MustMetric(t.Name()+suffix, tags, fields, time.Now())
|
||
|
}
|
||
|
|
||
|
type MSS = map[string]string
|
||
|
type MSI = map[string]interface{}
|
||
|
|
||
|
func dbTableDump(t *testing.T, db *pgxpool.Pool, suffix string) []MSI {
|
||
|
rows, err := db.Query(ctx, "SELECT * FROM "+pgx.Identifier{t.Name() + suffix}.Sanitize())
|
||
|
require.NoError(t, err)
|
||
|
defer rows.Close()
|
||
|
|
||
|
var dump []MSI
|
||
|
for rows.Next() {
|
||
|
msi := MSI{}
|
||
|
vals, err := rows.Values()
|
||
|
require.NoError(t, err)
|
||
|
for i, fd := range rows.FieldDescriptions() {
|
||
|
msi[string(fd.Name)] = vals[i]
|
||
|
}
|
||
|
dump = append(dump, msi)
|
||
|
}
|
||
|
require.NoError(t, rows.Err())
|
||
|
return dump
|
||
|
}
|
||
|
|
||
|
func TestWriteIntegration_sequential(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
|
||
|
newMetric(t, "_b", MSS{}, MSI{"v": 2}),
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 3}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
dumpA := dbTableDump(t, p.db, "_a")
|
||
|
dumpB := dbTableDump(t, p.db, "_b")
|
||
|
|
||
|
require.Len(t, dumpA, 2)
|
||
|
require.EqualValues(t, 1, dumpA[0]["v"])
|
||
|
require.EqualValues(t, 3, dumpA[1]["v"])
|
||
|
|
||
|
require.Len(t, dumpB, 1)
|
||
|
require.EqualValues(t, 2, dumpB[0]["v"])
|
||
|
|
||
|
p.Logger.Clear()
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
stmtCount := 0
|
||
|
for _, log := range p.Logger.Logs() {
|
||
|
if strings.Contains(log.String(), "info: PG ") {
|
||
|
stmtCount++
|
||
|
}
|
||
|
}
|
||
|
require.Equal(t, 6, stmtCount) // BEGIN, SAVEPOINT, COPY table _a, SAVEPOINT, COPY table _b, COMMIT
|
||
|
}
|
||
|
|
||
|
func TestWriteIntegration_concurrent(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.dbConfig.MaxConns = 3
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
// Write a metric so it creates a table we can lock.
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
p.Logger.WaitForCopy(t.Name()+"_a", false)
|
||
|
// clear so that the WaitForCopy calls below don't pick up this one
|
||
|
p.Logger.Clear()
|
||
|
|
||
|
// Lock the table so that we ensure the writes hangs and the plugin has to open another connection.
|
||
|
tx, err := p.db.Begin(ctx)
|
||
|
require.NoError(t, err)
|
||
|
defer tx.Rollback(ctx) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
|
||
|
_, err = tx.Exec(ctx, "LOCK TABLE "+utils.QuoteIdentifier(t.Name()+"_a"))
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
metrics = []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 2}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
// Note, there is technically a possible race here, where it doesn't try to insert into _a until after _b. However
|
||
|
// this should be practically impossible, and trying to engineer a solution to account for it would be even more
|
||
|
// complex than we already are.
|
||
|
|
||
|
metrics = []telegraf.Metric{
|
||
|
newMetric(t, "_b", MSS{}, MSI{"v": 3}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
p.Logger.WaitForCopy(t.Name()+"_b", false)
|
||
|
// release the lock on table _a
|
||
|
require.NoError(t, tx.Rollback(ctx))
|
||
|
p.Logger.WaitForCopy(t.Name()+"_a", false)
|
||
|
|
||
|
dumpA := dbTableDump(t, p.db, "_a")
|
||
|
dumpB := dbTableDump(t, p.db, "_b")
|
||
|
|
||
|
require.Len(t, dumpA, 2)
|
||
|
require.EqualValues(t, 1, dumpA[0]["v"])
|
||
|
require.EqualValues(t, 2, dumpA[1]["v"])
|
||
|
|
||
|
require.Len(t, dumpB, 1)
|
||
|
require.EqualValues(t, 3, dumpB[0]["v"])
|
||
|
|
||
|
// We should have had 3 connections. One for the lock, and one for each table.
|
||
|
require.EqualValues(t, 3, p.db.Stat().TotalConns())
|
||
|
}
|
||
|
|
||
|
// Test that the bad metric is dropped, and the rest of the batch succeeds.
|
||
|
func TestWriteIntegration_sequentialPermError(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
|
||
|
newMetric(t, "_b", MSS{}, MSI{"v": 2}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
metrics = []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": "a"}),
|
||
|
newMetric(t, "_b", MSS{}, MSI{"v": 3}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
dumpA := dbTableDump(t, p.db, "_a")
|
||
|
dumpB := dbTableDump(t, p.db, "_b")
|
||
|
require.Len(t, dumpA, 1)
|
||
|
require.Len(t, dumpB, 2)
|
||
|
|
||
|
haveError := false
|
||
|
for _, l := range p.Logger.Logs() {
|
||
|
if strings.Contains(l.String(), "write error") {
|
||
|
haveError = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
require.True(t, haveError, "write error not found in log")
|
||
|
}
|
||
|
|
||
|
// Test that in a bach with only 1 sub-batch, that we don't return an error.
|
||
|
func TestWriteIntegration_sequentialSinglePermError(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
metrics = []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{}, MSI{"v": "a"}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
}
|
||
|
|
||
|
// Test that the bad metric is dropped, and the rest of the batch succeeds.
|
||
|
func TestWriteIntegration_concurrentPermError(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.dbConfig.MaxConns = 2
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
p.Logger.WaitForCopy(t.Name()+"_a", false)
|
||
|
|
||
|
metrics = []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": "a"}),
|
||
|
newMetric(t, "_b", MSS{}, MSI{"v": 2}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
p.Logger.WaitFor(func(l Log) bool {
|
||
|
return strings.Contains(l.String(), "write error")
|
||
|
}, false)
|
||
|
p.Logger.WaitForCopy(t.Name()+"_b", false)
|
||
|
|
||
|
dumpA := dbTableDump(t, p.db, "_a")
|
||
|
dumpB := dbTableDump(t, p.db, "_b")
|
||
|
require.Len(t, dumpA, 1)
|
||
|
require.Len(t, dumpB, 1)
|
||
|
}
|
||
|
|
||
|
// Verify that in sequential mode, errors are returned allowing telegraf agent to handle & retry
|
||
|
func TestWriteIntegration_sequentialTempError(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
// To avoid a race condition, we need to know when our goroutine has started listening to the log.
|
||
|
wg := sync.WaitGroup{}
|
||
|
wg.Add(1)
|
||
|
|
||
|
go func() {
|
||
|
// Wait for the CREATE TABLE, and then kill the connection.
|
||
|
// The WaitFor callback holds a lock on the log. Meaning it will block logging of the next action. So we trigger
|
||
|
// on CREATE TABLE so that there's a few statements to go before the COMMIT.
|
||
|
p.Logger.WaitFor(func(log Log) bool {
|
||
|
if strings.Contains(log.String(), "release wg") {
|
||
|
wg.Done()
|
||
|
}
|
||
|
|
||
|
if !strings.Contains(log.String(), "CREATE TABLE") {
|
||
|
return false
|
||
|
}
|
||
|
pid := log.args[1].(MSI)["pid"].(uint32)
|
||
|
|
||
|
conf := p.db.Config().ConnConfig
|
||
|
conf.Logger = nil
|
||
|
c, err := pgx.ConnectConfig(t.Context(), conf)
|
||
|
if err != nil {
|
||
|
t.Error(err)
|
||
|
return true
|
||
|
}
|
||
|
_, err = c.Exec(t.Context(), "SELECT pg_terminate_backend($1)", pid)
|
||
|
if err != nil {
|
||
|
t.Error(err)
|
||
|
}
|
||
|
return true
|
||
|
}, false)
|
||
|
}()
|
||
|
|
||
|
p.Logger.Infof("release wg")
|
||
|
wg.Wait()
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.Error(t, p.Write(metrics))
|
||
|
}
|
||
|
|
||
|
// Verify that when using concurrency, errors are not returned, but instead logged and automatically retried
|
||
|
func TestWriteIntegration_concurrentTempError(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.dbConfig.MaxConns = 2
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
// To avoid a race condition, we need to know when our goroutine has started listening to the log.
|
||
|
wg := sync.WaitGroup{}
|
||
|
wg.Add(1)
|
||
|
|
||
|
go func() {
|
||
|
// Wait for the CREATE TABLE, and then kill the connection.
|
||
|
// The WaitFor callback holds a lock on the log. Meaning it will block logging of the next action. So we trigger
|
||
|
// on CREATE TABLE so that there's a few statements to go before the COMMIT.
|
||
|
p.Logger.WaitFor(func(log Log) bool {
|
||
|
if strings.Contains(log.String(), "release wg") {
|
||
|
wg.Done()
|
||
|
}
|
||
|
|
||
|
if !strings.Contains(log.String(), "CREATE TABLE") {
|
||
|
return false
|
||
|
}
|
||
|
pid := log.args[1].(MSI)["pid"].(uint32)
|
||
|
|
||
|
conf := p.db.Config().ConnConfig
|
||
|
conf.Logger = nil
|
||
|
c, err := pgx.ConnectConfig(t.Context(), conf)
|
||
|
if err != nil {
|
||
|
t.Error(err)
|
||
|
return true
|
||
|
}
|
||
|
_, err = c.Exec(t.Context(), "SELECT pg_terminate_backend($1)", pid)
|
||
|
if err != nil {
|
||
|
t.Error(err)
|
||
|
}
|
||
|
return true
|
||
|
}, false)
|
||
|
}()
|
||
|
p.Logger.Infof("release wg")
|
||
|
wg.Wait()
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
p.Logger.WaitForCopy(t.Name()+"_a", false)
|
||
|
dumpA := dbTableDump(t, p.db, "_a")
|
||
|
require.Len(t, dumpA, 1)
|
||
|
|
||
|
haveError := false
|
||
|
for _, l := range p.Logger.Logs() {
|
||
|
if strings.Contains(l.String(), "write error") {
|
||
|
haveError = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
require.True(t, haveError, "write error not found in log")
|
||
|
}
|
||
|
|
||
|
func TestTimestampColumnNameIntegration(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.TimestampColumnName = "timestamp"
|
||
|
require.NoError(t, p.Init())
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
metric.New(t.Name(), map[string]string{}, map[string]interface{}{"v": 42}, time.Unix(1691747345, 0)),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
dump := dbTableDump(t, p.db, "")
|
||
|
require.Len(t, dump, 1)
|
||
|
require.EqualValues(t, 42, dump[0]["v"])
|
||
|
require.EqualValues(t, time.Unix(1691747345, 0).UTC(), dump[0]["timestamp"])
|
||
|
require.NotContains(t, dump[0], "time")
|
||
|
|
||
|
p.Logger.Clear()
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
stmtCount := 0
|
||
|
for _, log := range p.Logger.Logs() {
|
||
|
if strings.Contains(log.String(), "info: PG ") {
|
||
|
stmtCount++
|
||
|
}
|
||
|
}
|
||
|
require.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT
|
||
|
}
|
||
|
|
||
|
func TestWriteTagTableIntegration(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.TagsAsForeignKeys = true
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
dump := dbTableDump(t, p.db, "")
|
||
|
require.Len(t, dump, 1)
|
||
|
require.EqualValues(t, 1, dump[0]["v"])
|
||
|
|
||
|
dumpTags := dbTableDump(t, p.db, p.TagTableSuffix)
|
||
|
require.Len(t, dumpTags, 1)
|
||
|
require.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
|
||
|
require.EqualValues(t, "foo", dumpTags[0]["tag"])
|
||
|
|
||
|
p.Logger.Clear()
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
stmtCount := 0
|
||
|
for _, log := range p.Logger.Logs() {
|
||
|
if strings.Contains(log.String(), "info: PG ") {
|
||
|
stmtCount++
|
||
|
}
|
||
|
}
|
||
|
require.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT
|
||
|
}
|
||
|
|
||
|
// Verify that when using TagsAsForeignKeys and a tag can't be written, that we still add the metrics.
|
||
|
func TestWriteIntegration_tagError(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.TagsAsForeignKeys = true
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
// It'll have the table cached, so won't know we dropped it, will try insert, and get error.
|
||
|
_, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
metrics = []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 2}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
dump := dbTableDump(t, p.db, "")
|
||
|
require.Len(t, dump, 2)
|
||
|
require.EqualValues(t, 1, dump[0]["v"])
|
||
|
require.EqualValues(t, 2, dump[1]["v"])
|
||
|
}
|
||
|
|
||
|
// Verify that when using TagsAsForeignKeys and ForeignTagConstraint and a tag can't be written, that we drop the metrics.
|
||
|
func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.TagsAsForeignKeys = true
|
||
|
p.ForeignTagConstraint = true
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 1}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
// It'll have the table cached, so won't know we dropped it, will try insert, and get error.
|
||
|
_, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
metrics = []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{"tag": "bar"}, MSI{"v": 2}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
haveError := false
|
||
|
for _, l := range p.Logger.Logs() {
|
||
|
if strings.Contains(l.String(), "write error") {
|
||
|
haveError = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
require.True(t, haveError, "write error not found in log")
|
||
|
|
||
|
dump := dbTableDump(t, p.db, "")
|
||
|
require.Len(t, dump, 1)
|
||
|
require.EqualValues(t, 1, dump[0]["v"])
|
||
|
}
|
||
|
|
||
|
func TestWriteIntegration_utf8(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.TagsAsForeignKeys = true
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "Ѧ𝙱Ƈᗞ",
|
||
|
MSS{"ăѣ𝔠ծ": "𝘈Ḇ𝖢𝕯٤ḞԍНǏ𝙅ƘԸⲘ𝙉০Ρ𝗤Ɍ𝓢ȚЦ𝒱Ѡ𝓧ƳȤ"},
|
||
|
MSI{"АḂⲤ𝗗": "𝘢ƀ𝖼ḋếᵮℊ𝙝Ꭵ𝕛кιṃդⱺ𝓅𝘲𝕣𝖘ŧ𝑢ṽẉ𝘅ყž𝜡"},
|
||
|
),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
dump := dbTableDump(t, p.db, "Ѧ𝙱Ƈᗞ")
|
||
|
require.Len(t, dump, 1)
|
||
|
require.EqualValues(t, "𝘢ƀ𝖼ḋếᵮℊ𝙝Ꭵ𝕛кιṃդⱺ𝓅𝘲𝕣𝖘ŧ𝑢ṽẉ𝘅ყž𝜡", dump[0]["АḂⲤ𝗗"])
|
||
|
|
||
|
dumpTags := dbTableDump(t, p.db, "Ѧ𝙱Ƈᗞ"+p.TagTableSuffix)
|
||
|
require.Len(t, dumpTags, 1)
|
||
|
require.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
|
||
|
require.EqualValues(t, "𝘈Ḇ𝖢𝕯٤ḞԍНǏ𝙅ƘԸⲘ𝙉০Ρ𝗤Ɍ𝓢ȚЦ𝒱Ѡ𝓧ƳȤ", dumpTags[0]["ăѣ𝔠ծ"])
|
||
|
}
|
||
|
|
||
|
func TestWriteIntegration_UnsignedIntegers(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.Uint64Type = PgUint8
|
||
|
require.NoError(t, p.Init())
|
||
|
if err := p.Connect(); err != nil {
|
||
|
if strings.Contains(err.Error(), "retrieving OID for uint8 data type") {
|
||
|
t.Skipf("pguint extension is not installed")
|
||
|
t.SkipNow()
|
||
|
}
|
||
|
require.NoError(t, err)
|
||
|
}
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{}, MSI{"v": uint64(math.MaxUint64)}),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
dump := dbTableDump(t, p.db, "")
|
||
|
|
||
|
require.Len(t, dump, 1)
|
||
|
require.EqualValues(t, uint64(math.MaxUint64), dump[0]["v"])
|
||
|
}
|
||
|
|
||
|
// Last ditch effort to find any concurrency issues.
|
||
|
func TestStressConcurrencyIntegration(t *testing.T) {
|
||
|
t.Skip("Skipping very long test - run locally with no timeout")
|
||
|
|
||
|
metrics := []telegraf.Metric{
|
||
|
newMetric(t, "", MSS{"foo": "bar"}, MSI{"a": 1}),
|
||
|
newMetric(t, "", MSS{"pop": "tart"}, MSI{"b": 1}),
|
||
|
newMetric(t, "", MSS{"foo": "bar", "pop": "tart"}, MSI{"a": 2, "b": 2}),
|
||
|
newMetric(t, "_b", MSS{"foo": "bar"}, MSI{"a": 1}),
|
||
|
}
|
||
|
|
||
|
concurrency := 4
|
||
|
loops := 100
|
||
|
|
||
|
pctl, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
pctl.Logger.emitLevel = pgx.LogLevelWarn
|
||
|
require.NoError(t, pctl.Connect())
|
||
|
|
||
|
for i := 0; i < loops; i++ {
|
||
|
var wgStart, wgDone sync.WaitGroup
|
||
|
wgStart.Add(concurrency)
|
||
|
wgDone.Add(concurrency)
|
||
|
for j := 0; j < concurrency; j++ {
|
||
|
go func() {
|
||
|
mShuf := make([]telegraf.Metric, len(metrics))
|
||
|
copy(mShuf, metrics)
|
||
|
rand.Shuffle(len(mShuf), func(a, b int) { mShuf[a], mShuf[b] = mShuf[b], mShuf[a] })
|
||
|
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
if err != nil {
|
||
|
t.Error(err)
|
||
|
}
|
||
|
|
||
|
p.TagsAsForeignKeys = true
|
||
|
p.Logger.emitLevel = pgx.LogLevelWarn
|
||
|
p.dbConfig.MaxConns = int32(rand.Intn(3) + 1)
|
||
|
if err := p.Connect(); err != nil {
|
||
|
t.Error(err)
|
||
|
}
|
||
|
wgStart.Done()
|
||
|
wgStart.Wait()
|
||
|
|
||
|
if err := p.Write(mShuf); err != nil {
|
||
|
t.Error(err)
|
||
|
}
|
||
|
if err := p.Close(); err != nil {
|
||
|
t.Error(err)
|
||
|
}
|
||
|
if p.Logger.HasLevel(pgx.LogLevelWarn) {
|
||
|
t.Errorf("logger mustn't have a warning level")
|
||
|
}
|
||
|
|
||
|
wgDone.Done()
|
||
|
}()
|
||
|
}
|
||
|
wgDone.Wait()
|
||
|
|
||
|
if t.Failed() {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestLongColumnNamesErrorIntegration(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
// Setup the plugin
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, p.Init())
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
// Define the metric to send
|
||
|
metrics := []telegraf.Metric{
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
|
||
|
"value": 42,
|
||
|
},
|
||
|
time.Unix(0, 0).UTC(),
|
||
|
),
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
|
||
|
"value": 43,
|
||
|
},
|
||
|
time.Unix(0, 1).UTC(),
|
||
|
),
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
|
||
|
"value": 44,
|
||
|
},
|
||
|
time.Unix(0, 2).UTC(),
|
||
|
),
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
|
||
|
"value": 45,
|
||
|
},
|
||
|
time.Unix(0, 9).UTC(),
|
||
|
),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
// Check if the logging is restricted to once per field and all columns are
|
||
|
// mentioned
|
||
|
var longColLogErrs []string
|
||
|
for _, l := range p.Logger.logs {
|
||
|
msg := l.String()
|
||
|
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
|
||
|
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
|
||
|
}
|
||
|
}
|
||
|
excpectedLongColumns := []string{
|
||
|
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||
|
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||
|
}
|
||
|
require.ElementsMatch(t, excpectedLongColumns, longColLogErrs)
|
||
|
|
||
|
// Denote the expected data in the table
|
||
|
expected := []map[string]interface{}{
|
||
|
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
|
||
|
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
|
||
|
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
|
||
|
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
|
||
|
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
|
||
|
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
|
||
|
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
|
||
|
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
|
||
|
}
|
||
|
|
||
|
// Get the actual table data nd convert the time to a timestamp for
|
||
|
// easier comparison
|
||
|
dump := dbTableDump(t, p.db, "")
|
||
|
require.Len(t, dump, len(expected))
|
||
|
for i, actual := range dump {
|
||
|
if raw, found := actual["time"]; found {
|
||
|
if t, ok := raw.(time.Time); ok {
|
||
|
actual["time"] = t.Unix()
|
||
|
}
|
||
|
}
|
||
|
require.EqualValues(t, expected[i], actual)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestLongColumnNamesClipIntegration(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
// Setup the plugin
|
||
|
p, err := newPostgresqlTest(t)
|
||
|
require.NoError(t, err)
|
||
|
p.ColumnNameLenLimit = 63
|
||
|
require.NoError(t, p.Init())
|
||
|
require.NoError(t, p.Connect())
|
||
|
|
||
|
// Define the metric to send
|
||
|
metrics := []telegraf.Metric{
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
|
||
|
"value": 42,
|
||
|
},
|
||
|
time.Unix(0, 0).UTC(),
|
||
|
),
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
|
||
|
"value": 43,
|
||
|
},
|
||
|
time.Unix(0, 1).UTC(),
|
||
|
),
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
|
||
|
"value": 44,
|
||
|
},
|
||
|
time.Unix(0, 2).UTC(),
|
||
|
),
|
||
|
metric.New(
|
||
|
t.Name(),
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
|
||
|
"value": 45,
|
||
|
},
|
||
|
time.Unix(0, 9).UTC(),
|
||
|
),
|
||
|
}
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
require.NoError(t, p.Write(metrics))
|
||
|
|
||
|
// Check if the logging is restricted to once per field and all columns are mentioned
|
||
|
var longColLogWarns []string
|
||
|
var longColLogErrs []string
|
||
|
for _, l := range p.Logger.logs {
|
||
|
msg := l.String()
|
||
|
if l.level == pgx.LogLevelWarn && strings.Contains(msg, "Limiting too long column name") {
|
||
|
longColLogWarns = append(longColLogWarns, strings.TrimPrefix(msg, "warn: Limiting too long column name: "))
|
||
|
continue
|
||
|
}
|
||
|
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
|
||
|
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
|
||
|
excpectedLongColumns := []string{
|
||
|
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||
|
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
|
||
|
}
|
||
|
require.ElementsMatch(t, excpectedLongColumns, longColLogWarns)
|
||
|
require.Empty(t, longColLogErrs)
|
||
|
|
||
|
// Denote the expected data in the table
|
||
|
expected := []map[string]interface{}{
|
||
|
{
|
||
|
"time": time.Unix(0, 0).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||
|
"value": int64(42),
|
||
|
},
|
||
|
{
|
||
|
"time": time.Unix(0, 1).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||
|
"value": int64(43),
|
||
|
},
|
||
|
{
|
||
|
"time": time.Unix(0, 2).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||
|
"value": int64(44),
|
||
|
},
|
||
|
{
|
||
|
"time": time.Unix(0, 9).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
|
||
|
"value": int64(45),
|
||
|
},
|
||
|
{
|
||
|
"time": time.Unix(0, 0).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||
|
"value": int64(42),
|
||
|
},
|
||
|
{
|
||
|
"time": time.Unix(0, 1).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||
|
"value": int64(43),
|
||
|
},
|
||
|
{
|
||
|
"time": time.Unix(0, 2).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
|
||
|
"value": int64(44),
|
||
|
},
|
||
|
{
|
||
|
"time": time.Unix(0, 9).Unix(),
|
||
|
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
|
||
|
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
|
||
|
"value": int64(45),
|
||
|
},
|
||
|
}
|
||
|
|
||
|
// Get the actual table data nd convert the time to a timestamp for
|
||
|
// easier comparison
|
||
|
dump := dbTableDump(t, p.db, "")
|
||
|
require.Len(t, dump, len(expected))
|
||
|
for i, actual := range dump {
|
||
|
if raw, found := actual["time"]; found {
|
||
|
if t, ok := raw.(time.Time); ok {
|
||
|
actual["time"] = t.Unix()
|
||
|
}
|
||
|
}
|
||
|
require.EqualValues(t, expected[i], actual)
|
||
|
}
|
||
|
}
|