// telegraf/plugins/outputs/postgresql/postgresql_test.go

package postgresql
import (
"context"
"fmt"
"math"
"math/rand"
"strings"
"sync"
"testing"
"time"

"github.com/docker/go-connections/nat"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
"github.com/influxdata/telegraf/testutil"
)
type Log struct {
level pgx.LogLevel
format string
args []interface{}
}
func (l Log) String() string {
// We have to use Errorf() as Sprintf() doesn't allow usage of %w.
return fmt.Errorf("%s: "+l.format, append([]interface{}{l.level}, l.args...)...).Error()
}
// LogAccumulator is a log collector that satisfies telegraf.Logger.
type LogAccumulator struct {
logs []Log
cond *sync.Cond
tb testing.TB
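// emitLevel echoes only entries at least as severe as this level to the
// test output; the zero value echoes everything (see append).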
emitLevel pgx.LogLevel
}
func NewLogAccumulator(tb testing.TB) *LogAccumulator {
return &LogAccumulator{
cond: sync.NewCond(&sync.Mutex{}),
tb: tb,
}
}
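// Level maps the accumulator's pgx emit level onto the corresponding
// telegraf log level, defaulting to debug.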
func (la *LogAccumulator) Level() telegraf.LogLevel {
switch la.emitLevel {
case pgx.LogLevelInfo:
return telegraf.Info
case pgx.LogLevelWarn:
return telegraf.Warn
case pgx.LogLevelError:
return telegraf.Error
case pgx.LogLevelNone:
return telegraf.None
}
return telegraf.Debug
}
// AddAttribute is unused; it exists only to satisfy the telegraf.Logger interface.
func (*LogAccumulator) AddAttribute(string, interface{}) {}
func (la *LogAccumulator) append(level pgx.LogLevel, format string, args []interface{}) {
la.tb.Helper()
la.cond.L.Lock()
log := Log{level, format, args}
la.logs = append(la.logs, log)
if la.emitLevel == 0 || log.level <= la.emitLevel {
la.tb.Log(log.String())
}
la.cond.Broadcast()
la.cond.L.Unlock()
}
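// HasLevel reports whether any recorded entry is at least as severe as level.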
func (la *LogAccumulator) HasLevel(level pgx.LogLevel) bool {
la.cond.L.Lock()
defer la.cond.L.Unlock()
for _, log := range la.logs {
if log.level > 0 && log.level <= level {
return true
}
}
return false
}
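// WaitLen blocks until at least n log entries have been recorded and returns
// a snapshot of them.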
func (la *LogAccumulator) WaitLen(n int) []Log {
la.cond.L.Lock()
defer la.cond.L.Unlock()
for len(la.logs) < n {
la.cond.Wait()
}
return la.logs[:]
}
// WaitFor blocks until a pgx log entry matching f shows up and, if waitCommit
// is set, until the transaction on the matched connection commits.
func (la *LogAccumulator) WaitFor(f func(l Log) bool, waitCommit bool) {
la.cond.L.Lock()
defer la.cond.L.Unlock()
i := 0
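// commitPid tracks the backend PID of the connection that produced the match,
// so we can follow that same connection until it commits or rolls back.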
var commitPid uint32
for {
for ; i < len(la.logs); i++ {
log := la.logs[i]
if commitPid == 0 {
if f(log) {
if !waitCommit {
return
}
commitPid = log.args[1].(MSI)["pid"].(uint32)
}
} else {
if len(log.args) < 2 {
continue
}
data, ok := log.args[1].(MSI)
if !ok || data["pid"] != commitPid {
continue
}
if log.args[0] == "Exec" && data["sql"] == "commit" {
return
} else if log.args[0] == "Exec" && data["sql"] == "rollback" {
// transaction aborted, start looking for another match
commitPid = 0
} else if log.level == pgx.LogLevelError {
commitPid = 0
}
}
}
la.cond.Wait()
}
}
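// WaitForQuery blocks until pgx logs a Query or Exec statement whose SQL
// contains str.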
func (la *LogAccumulator) WaitForQuery(str string, waitCommit bool) {
la.WaitFor(func(log Log) bool {
return log.format == "PG %s - %+v" &&
(log.args[0].(string) == "Query" || log.args[0].(string) == "Exec") &&
strings.Contains(log.args[1].(MSI)["sql"].(string), str)
}, waitCommit)
}
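// WaitForCopy blocks until pgx logs a CopyFrom into the given table.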
func (la *LogAccumulator) WaitForCopy(tableName string, waitCommit bool) {
la.WaitFor(func(log Log) bool {
return log.format == "PG %s - %+v" &&
log.args[0].(string) == "CopyFrom" &&
log.args[1].(MSI)["tableName"].(pgx.Identifier)[1] == tableName
}, waitCommit)
}
// Clear discards any stored logs.
// Do not call this while any WaitFor* operations are in progress.
func (la *LogAccumulator) Clear() {
la.cond.L.Lock()
if len(la.logs) > 0 {
la.logs = nil
}
la.cond.L.Unlock()
}
func (la *LogAccumulator) Logs() []Log {
la.cond.L.Lock()
defer la.cond.L.Unlock()
return la.logs[:]
}
func (la *LogAccumulator) Errorf(format string, args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelError, format, args)
}
func (la *LogAccumulator) Error(args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelError, "%v", args)
}
func (la *LogAccumulator) Warnf(format string, args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelWarn, format, args)
}
func (la *LogAccumulator) Warn(args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelWarn, "%v", args)
}
func (la *LogAccumulator) Infof(format string, args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelInfo, format, args)
}
func (la *LogAccumulator) Info(args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelInfo, "%v", args)
}
func (la *LogAccumulator) Debugf(format string, args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelDebug, format, args)
}
func (la *LogAccumulator) Debug(args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelDebug, "%v", args)
}
func (la *LogAccumulator) Tracef(format string, args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelDebug, format, args)
}
func (la *LogAccumulator) Trace(args ...interface{}) {
la.tb.Helper()
la.append(pgx.LogLevelDebug, "%v", args)
}
var ctx = context.Background()
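// PostgresqlTest wraps the plugin under test together with its log collector.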
type PostgresqlTest struct {
*Postgresql
Logger *LogAccumulator
}
func newPostgresqlTest(tb testing.TB) (*PostgresqlTest, error) {
if testing.Short() {
tb.Skip("Skipping integration test in short mode")
}
servicePort := "5432"
username := "postgres"
password := "postgres"
testDatabaseName := "telegraf_test"
container := testutil.Container{
Image: "postgres:alpine",
ExposedPorts: []string{servicePort},
Env: map[string]string{
"POSTGRES_USER": username,
"POSTGRES_PASSWORD": password,
"POSTGRES_DB": "telegraf_test",
},
WaitingFor: wait.ForAll(
// the database comes up twice: once right away, then a second time after
// the docker entrypoint runs its configuration
wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
wait.ForListeningPort(nat.Port(servicePort)),
),
}
tb.Cleanup(container.Terminate)
if err := container.Start(); err != nil {
return nil, fmt.Errorf("failed to start container: %w", err)
}
p := newPostgresql()
connection := fmt.Sprintf(
"host=%s port=%s user=%s password=%s dbname=%s",
container.Address,
container.Ports[servicePort],
username,
password,
testDatabaseName,
)
p.Connection = config.NewSecret([]byte(connection))
logger := NewLogAccumulator(tb)
p.Logger = logger
p.LogLevel = "debug"
if err := p.Init(); err != nil {
return nil, fmt.Errorf("failed to init plugin: %w", err)
}
pt := &PostgresqlTest{Postgresql: p}
pt.Logger = logger
return pt, nil
}
func TestPostgresqlConnectIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
require.EqualValues(t, 1, p.db.Stat().MaxConns())
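// Verify that pool options in the DSN are honored by appending
// pool_max_conns to the connection string.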
p, err = newPostgresqlTest(t)
require.NoError(t, err)
connection, err := p.Connection.Get()
require.NoError(t, err)
p.Connection = config.NewSecret([]byte(connection.String() + " pool_max_conns=2"))
connection.Destroy()
require.NoError(t, p.Init())
require.NoError(t, p.Connect())
require.EqualValues(t, 2, p.db.Stat().MaxConns())
}
func TestConnectionIssueAtStartup(t *testing.T) {
// Test case for https://github.com/influxdata/telegraf/issues/14365
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
servicePort := "5432"
username := "postgres"
password := "postgres"
testDatabaseName := "telegraf_test"
container := testutil.Container{
Image: "postgres:alpine",
ExposedPorts: []string{servicePort},
Env: map[string]string{
"POSTGRES_USER": username,
"POSTGRES_PASSWORD": password,
"POSTGRES_DB": "telegraf_test",
},
WaitingFor: wait.ForAll(
// the database comes up twice: once right away, then a second time after
// the docker entrypoint runs its configuration
wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
wait.ForListeningPort(nat.Port(servicePort)),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Pause the container to simulate connectivity issues
require.NoError(t, container.Pause())
// Create a model to be able to use the startup retry strategy
dsn := config.NewSecret([]byte(fmt.Sprintf(
"host=%s port=%s user=%s password=%s dbname=%s connect_timeout=1",
container.Address,
container.Ports[servicePort],
username,
password,
testDatabaseName,
)))
defer dsn.Destroy()
plugin := newPostgresql()
plugin.Connection = dsn
plugin.Logger = testutil.Logger{}
plugin.LogLevel = "debug"
model := models.NewRunningOutput(
plugin,
&models.OutputConfig{
Name: "postgres",
StartupErrorBehavior: "retry",
},
1000, 1000,
)
require.NoError(t, model.Init())
// The connect call should succeed even though the server cannot be reached
// yet, due to the "retry" strategy
require.NoError(t, model.Connect())
// Writing the metrics in this state should fail because we are not fully
// started up
metrics := testutil.MockMetrics()
for _, m := range metrics {
model.AddMetric(m)
}
require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected)
// Unpause the container, now writes should succeed
require.NoError(t, container.Resume())
require.NoError(t, model.WriteBatch())
model.Close()
}
func newMetric(
t *testing.T,
suffix string,
tags map[string]string,
fields map[string]interface{},
) telegraf.Metric {
return testutil.MustMetric(t.Name()+suffix, tags, fields, time.Now())
}
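// Shorthand aliases for the map types used throughout these tests.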
type MSS = map[string]string
type MSI = map[string]interface{}
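// dbTableDump reads every row of the test's table (t.Name()+suffix) and
// returns them as column-name to value maps.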
func dbTableDump(t *testing.T, db *pgxpool.Pool, suffix string) []MSI {
rows, err := db.Query(ctx, "SELECT * FROM "+pgx.Identifier{t.Name() + suffix}.Sanitize())
require.NoError(t, err)
defer rows.Close()
var dump []MSI
for rows.Next() {
msi := MSI{}
vals, err := rows.Values()
require.NoError(t, err)
for i, fd := range rows.FieldDescriptions() {
msi[string(fd.Name)] = vals[i]
}
dump = append(dump, msi)
}
require.NoError(t, rows.Err())
return dump
}
func TestWriteIntegration_sequential(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
newMetric(t, "_b", MSS{}, MSI{"v": 2}),
newMetric(t, "_a", MSS{}, MSI{"v": 3}),
}
require.NoError(t, p.Write(metrics))
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
require.Len(t, dumpA, 2)
require.EqualValues(t, 1, dumpA[0]["v"])
require.EqualValues(t, 3, dumpA[1]["v"])
require.Len(t, dumpB, 1)
require.EqualValues(t, 2, dumpB[0]["v"])
p.Logger.Clear()
require.NoError(t, p.Write(metrics))
stmtCount := 0
for _, log := range p.Logger.Logs() {
if strings.Contains(log.String(), "info: PG ") {
stmtCount++
}
}
require.Equal(t, 6, stmtCount) // BEGIN, SAVEPOINT, COPY table _a, SAVEPOINT, COPY table _b, COMMIT
}
func TestWriteIntegration_concurrent(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.dbConfig.MaxConns = 3
require.NoError(t, p.Connect())
// Write a metric so it creates a table we can lock.
metrics := []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))
p.Logger.WaitForCopy(t.Name()+"_a", false)
// clear so that the WaitForCopy calls below don't pick up this one
p.Logger.Clear()
// Lock the table to ensure the write hangs and the plugin has to open another connection.
tx, err := p.db.Begin(ctx)
require.NoError(t, err)
defer tx.Rollback(ctx) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
_, err = tx.Exec(ctx, "LOCK TABLE "+utils.QuoteIdentifier(t.Name()+"_a"))
require.NoError(t, err)
metrics = []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": 2}),
}
require.NoError(t, p.Write(metrics))
// Note: there is technically a possible race here where the plugin doesn't try to insert into _a until after _b.
// However, this should be practically impossible, and engineering a solution for it would add even more
// complexity than the test already has.
metrics = []telegraf.Metric{
newMetric(t, "_b", MSS{}, MSI{"v": 3}),
}
require.NoError(t, p.Write(metrics))
p.Logger.WaitForCopy(t.Name()+"_b", false)
// release the lock on table _a
require.NoError(t, tx.Rollback(ctx))
p.Logger.WaitForCopy(t.Name()+"_a", false)
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
require.Len(t, dumpA, 2)
require.EqualValues(t, 1, dumpA[0]["v"])
require.EqualValues(t, 2, dumpA[1]["v"])
require.Len(t, dumpB, 1)
require.EqualValues(t, 3, dumpB[0]["v"])
// We should have had 3 connections. One for the lock, and one for each table.
require.EqualValues(t, 3, p.db.Stat().TotalConns())
}
// Test that the bad metric is dropped, and the rest of the batch succeeds.
func TestWriteIntegration_sequentialPermError(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
newMetric(t, "_b", MSS{}, MSI{"v": 2}),
}
require.NoError(t, p.Write(metrics))
metrics = []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": "a"}),
newMetric(t, "_b", MSS{}, MSI{"v": 3}),
}
require.NoError(t, p.Write(metrics))
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
require.Len(t, dumpA, 1)
require.Len(t, dumpB, 2)
haveError := false
for _, l := range p.Logger.Logs() {
if strings.Contains(l.String(), "write error") {
haveError = true
break
}
}
require.True(t, haveError, "write error not found in log")
}
// Test that a batch with only one sub-batch does not return an error.
func TestWriteIntegration_sequentialSinglePermError(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))
metrics = []telegraf.Metric{
newMetric(t, "", MSS{}, MSI{"v": "a"}),
}
require.NoError(t, p.Write(metrics))
}
// Test that the bad metric is dropped, and the rest of the batch succeeds.
func TestWriteIntegration_concurrentPermError(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.dbConfig.MaxConns = 2
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))
p.Logger.WaitForCopy(t.Name()+"_a", false)
metrics = []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": "a"}),
newMetric(t, "_b", MSS{}, MSI{"v": 2}),
}
require.NoError(t, p.Write(metrics))
p.Logger.WaitFor(func(l Log) bool {
return strings.Contains(l.String(), "write error")
}, false)
p.Logger.WaitForCopy(t.Name()+"_b", false)
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
require.Len(t, dumpA, 1)
require.Len(t, dumpB, 1)
}
// Verify that in sequential mode errors are returned, allowing the telegraf agent to handle & retry them
func TestWriteIntegration_sequentialTempError(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
// To avoid a race condition, we need to know when our goroutine has started listening to the log.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// Wait for the CREATE TABLE, and then kill the connection.
// The WaitFor callback holds a lock on the log, meaning it blocks logging of the next action. So we trigger
// on CREATE TABLE so that there are still a few statements to go before the COMMIT.
p.Logger.WaitFor(func(log Log) bool {
if strings.Contains(log.String(), "release wg") {
wg.Done()
}
if !strings.Contains(log.String(), "CREATE TABLE") {
return false
}
pid := log.args[1].(MSI)["pid"].(uint32)
conf := p.db.Config().ConnConfig
conf.Logger = nil
c, err := pgx.ConnectConfig(t.Context(), conf)
if err != nil {
t.Error(err)
return true
}
_, err = c.Exec(t.Context(), "SELECT pg_terminate_backend($1)", pid)
if err != nil {
t.Error(err)
}
return true
}, false)
}()
p.Logger.Infof("release wg")
wg.Wait()
metrics := []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
}
require.Error(t, p.Write(metrics))
}
// Verify that when using concurrency, errors are not returned, but instead logged and automatically retried
func TestWriteIntegration_concurrentTempError(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.dbConfig.MaxConns = 2
require.NoError(t, p.Connect())
// To avoid a race condition, we need to know when our goroutine has started listening to the log.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// Wait for the CREATE TABLE, and then kill the connection.
// The WaitFor callback holds a lock on the log, meaning it blocks logging of the next action. So we trigger
// on CREATE TABLE so that there are still a few statements to go before the COMMIT.
p.Logger.WaitFor(func(log Log) bool {
if strings.Contains(log.String(), "release wg") {
wg.Done()
}
if !strings.Contains(log.String(), "CREATE TABLE") {
return false
}
pid := log.args[1].(MSI)["pid"].(uint32)
conf := p.db.Config().ConnConfig
conf.Logger = nil
c, err := pgx.ConnectConfig(t.Context(), conf)
if err != nil {
t.Error(err)
return true
}
_, err = c.Exec(t.Context(), "SELECT pg_terminate_backend($1)", pid)
if err != nil {
t.Error(err)
}
return true
}, false)
}()
p.Logger.Infof("release wg")
wg.Wait()
metrics := []telegraf.Metric{
newMetric(t, "_a", MSS{}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))
p.Logger.WaitForCopy(t.Name()+"_a", false)
dumpA := dbTableDump(t, p.db, "_a")
require.Len(t, dumpA, 1)
haveError := false
for _, l := range p.Logger.Logs() {
if strings.Contains(l.String(), "write error") {
haveError = true
break
}
}
require.True(t, haveError, "write error not found in log")
}
func TestTimestampColumnNameIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TimestampColumnName = "timestamp"
require.NoError(t, p.Init())
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
metric.New(t.Name(), map[string]string{}, map[string]interface{}{"v": 42}, time.Unix(1691747345, 0)),
}
require.NoError(t, p.Write(metrics))
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 1)
require.EqualValues(t, 42, dump[0]["v"])
require.EqualValues(t, time.Unix(1691747345, 0).UTC(), dump[0]["timestamp"])
require.NotContains(t, dump[0], "time")
p.Logger.Clear()
require.NoError(t, p.Write(metrics))
stmtCount := 0
for _, log := range p.Logger.Logs() {
if strings.Contains(log.String(), "info: PG ") {
stmtCount++
}
}
require.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT
}
func TestWriteTagTableIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 1)
require.EqualValues(t, 1, dump[0]["v"])
dumpTags := dbTableDump(t, p.db, p.TagTableSuffix)
require.Len(t, dumpTags, 1)
require.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
require.EqualValues(t, "foo", dumpTags[0]["tag"])
p.Logger.Clear()
require.NoError(t, p.Write(metrics))
stmtCount := 0
for _, log := range p.Logger.Logs() {
if strings.Contains(log.String(), "info: PG ") {
stmtCount++
}
}
require.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT
}
// Verify that when using TagsAsForeignKeys and a tag can't be written, the metrics are still added.
func TestWriteIntegration_tagError(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))
// The plugin still has the table cached, so it won't know we dropped it; it will try the insert and hit an error.
_, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
require.NoError(t, err)
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 2}),
}
require.NoError(t, p.Write(metrics))
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 2)
require.EqualValues(t, 1, dump[0]["v"])
require.EqualValues(t, 2, dump[1]["v"])
}
// Verify that when using TagsAsForeignKeys and ForeignTagConstraint, and a tag can't be written, the metrics are dropped.
func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.ForeignTagConstraint = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))
// The plugin still has the table cached, so it won't know we dropped it; it will try the insert and hit an error.
_, err = p.db.Exec(ctx, "DROP TABLE \""+t.Name()+"_tag\"")
require.NoError(t, err)
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "bar"}, MSI{"v": 2}),
}
require.NoError(t, p.Write(metrics))
haveError := false
for _, l := range p.Logger.Logs() {
if strings.Contains(l.String(), "write error") {
haveError = true
break
}
}
require.True(t, haveError, "write error not found in log")
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 1)
require.EqualValues(t, 1, dump[0]["v"])
}
func TestWriteIntegration_utf8(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "Ѧ𝙱Ƈᗞ",
MSS{"ăѣ𝔠ծ": "𝘈Ḇ𝖢𝕯٤ḞԍНǏ𝙅ƘԸⲘ𝙉০Ρ𝗤Ɍ𝓢ȚЦ𝒱Ѡ𝓧ƳȤ"},
MSI{"АḂⲤ𝗗": "𝘢ƀ𝖼ḋếᵮℊ𝙝Ꭵ𝕛кιṃդⱺ𝓅𝘲𝕣𝖘ŧ𝑢ṽẉ𝘅ყž𝜡"},
),
}
require.NoError(t, p.Write(metrics))
dump := dbTableDump(t, p.db, "Ѧ𝙱Ƈᗞ")
require.Len(t, dump, 1)
require.EqualValues(t, "𝘢ƀ𝖼ḋếᵮℊ𝙝Ꭵ𝕛кιṃդⱺ𝓅𝘲𝕣𝖘ŧ𝑢ṽẉ𝘅ყž𝜡", dump[0]["АḂⲤ𝗗"])
dumpTags := dbTableDump(t, p.db, "Ѧ𝙱Ƈᗞ"+p.TagTableSuffix)
require.Len(t, dumpTags, 1)
require.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
require.EqualValues(t, "𝘈Ḇ𝖢𝕯٤ḞԍНǏ𝙅ƘԸⲘ𝙉০Ρ𝗤Ɍ𝓢ȚЦ𝒱Ѡ𝓧ƳȤ", dumpTags[0]["ăѣ𝔠ծ"])
}
func TestWriteIntegration_UnsignedIntegers(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.Uint64Type = PgUint8
require.NoError(t, p.Init())
if err := p.Connect(); err != nil {
if strings.Contains(err.Error(), "retrieving OID for uint8 data type") {
// t.Skip already stops the test; no separate SkipNow call is needed.
t.Skip("pguint extension is not installed")
}
require.NoError(t, err)
}
metrics := []telegraf.Metric{
newMetric(t, "", MSS{}, MSI{"v": uint64(math.MaxUint64)}),
}
require.NoError(t, p.Write(metrics))
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 1)
require.EqualValues(t, uint64(math.MaxUint64), dump[0]["v"])
}
// Last-ditch effort to find any concurrency issues.
func TestStressConcurrencyIntegration(t *testing.T) {
t.Skip("Skipping very long test - run locally with no timeout")
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"foo": "bar"}, MSI{"a": 1}),
newMetric(t, "", MSS{"pop": "tart"}, MSI{"b": 1}),
newMetric(t, "", MSS{"foo": "bar", "pop": "tart"}, MSI{"a": 2, "b": 2}),
newMetric(t, "_b", MSS{"foo": "bar"}, MSI{"a": 1}),
}
concurrency := 4
loops := 100
pctl, err := newPostgresqlTest(t)
require.NoError(t, err)
pctl.Logger.emitLevel = pgx.LogLevelWarn
require.NoError(t, pctl.Connect())
for i := 0; i < loops; i++ {
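// wgStart acts as a barrier so all goroutines start writing at the same
// moment; wgDone waits for all of them to finish.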
var wgStart, wgDone sync.WaitGroup
wgStart.Add(concurrency)
wgDone.Add(concurrency)
for j := 0; j < concurrency; j++ {
go func() {
mShuf := make([]telegraf.Metric, len(metrics))
copy(mShuf, metrics)
rand.Shuffle(len(mShuf), func(a, b int) { mShuf[a], mShuf[b] = mShuf[b], mShuf[a] })
p, err := newPostgresqlTest(t)
if err != nil {
t.Error(err)
}
p.TagsAsForeignKeys = true
p.Logger.emitLevel = pgx.LogLevelWarn
p.dbConfig.MaxConns = int32(rand.Intn(3) + 1)
if err := p.Connect(); err != nil {
t.Error(err)
}
wgStart.Done()
wgStart.Wait()
if err := p.Write(mShuf); err != nil {
t.Error(err)
}
if err := p.Close(); err != nil {
t.Error(err)
}
if p.Logger.HasLevel(pgx.LogLevelWarn) {
t.Errorf("logger mustn't have a warning level")
}
wgDone.Done()
}()
}
wgDone.Wait()
if t.Failed() {
break
}
}
}
func TestLongColumnNamesErrorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Set up the plugin
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Init())
require.NoError(t, p.Connect())
// Define the metrics to send
metrics := []telegraf.Metric{
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
"value": 42,
},
time.Unix(0, 0).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
"value": 43,
},
time.Unix(0, 1).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
"value": 44,
},
time.Unix(0, 2).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
"value": 45,
},
time.Unix(0, 9).UTC(),
),
}
require.NoError(t, p.Write(metrics))
require.NoError(t, p.Write(metrics))
// Check if the logging is restricted to once per field and all columns are
// mentioned
var longColLogErrs []string
for _, l := range p.Logger.logs {
msg := l.String()
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
}
}
expectedLongColumns := []string{
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
}
require.ElementsMatch(t, expectedLongColumns, longColLogErrs)
// Define the expected data in the table
expected := []map[string]interface{}{
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
{"time": time.Unix(0, 0).Unix(), "value": int64(42)},
{"time": time.Unix(0, 1).Unix(), "value": int64(43)},
{"time": time.Unix(0, 2).Unix(), "value": int64(44)},
{"time": time.Unix(0, 9).Unix(), "value": int64(45)},
}
// Get the actual table data and convert the time to a timestamp for
// easier comparison
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, len(expected))
for i, actual := range dump {
if raw, found := actual["time"]; found {
if t, ok := raw.(time.Time); ok {
actual["time"] = t.Unix()
}
}
require.EqualValues(t, expected[i], actual)
}
}
func TestLongColumnNamesClipIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Set up the plugin
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.ColumnNameLenLimit = 63
require.NoError(t, p.Init())
require.NoError(t, p.Connect())
// Define the metrics to send
metrics := []telegraf.Metric{
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(0),
"value": 42,
},
time.Unix(0, 0).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(1),
"value": 43,
},
time.Unix(0, 1).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(2),
"value": 44,
},
time.Unix(0, 2).UTC(),
),
metric.New(
t.Name(),
map[string]string{},
map[string]interface{}{
"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63": int64(99),
"value": 45,
},
time.Unix(0, 9).UTC(),
),
}
require.NoError(t, p.Write(metrics))
require.NoError(t, p.Write(metrics))
// Check if the logging is restricted to once per field and all columns are mentioned
var longColLogWarns []string
var longColLogErrs []string
for _, l := range p.Logger.logs {
msg := l.String()
if l.level == pgx.LogLevelWarn && strings.Contains(msg, "Limiting too long column name") {
longColLogWarns = append(longColLogWarns, strings.TrimPrefix(msg, "warn: Limiting too long column name: "))
continue
}
if l.level == pgx.LogLevelError && strings.Contains(msg, "Column name too long") {
longColLogErrs = append(longColLogErrs, strings.TrimPrefix(msg, "error: Column name too long: "))
continue
}
}
expectedLongColumns := []string{
`"a_field_with_a_some_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
`"a_field_with_another_very_long_name_exceeding_the_column_name_limit_of_postgres_of_63"`,
}
require.ElementsMatch(t, expectedLongColumns, longColLogWarns)
require.Empty(t, longColLogErrs)
// Define the expected data in the table
expected := []map[string]interface{}{
{
"time": time.Unix(0, 0).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(42),
},
{
"time": time.Unix(0, 1).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(43),
},
{
"time": time.Unix(0, 2).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(44),
},
{
"time": time.Unix(0, 9).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
"value": int64(45),
},
{
"time": time.Unix(0, 0).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(0),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(42),
},
{
"time": time.Unix(0, 1).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(1),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(43),
},
{
"time": time.Unix(0, 2).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": int64(2),
"a_field_with_another_very_long_name_exceeding_the_column_name_l": nil,
"value": int64(44),
},
{
"time": time.Unix(0, 9).Unix(),
"a_field_with_a_some_very_long_name_exceeding_the_column_name_li": nil,
"a_field_with_another_very_long_name_exceeding_the_column_name_l": int64(99),
"value": int64(45),
},
}
// Get the actual table data and convert the time to a timestamp for
// easier comparison
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, len(expected))
for i, actual := range dump {
if raw, found := actual["time"]; found {
if t, ok := raw.(time.Time); ok {
actual["time"] = t.Unix()
}
}
require.EqualValues(t, expected[i], actual)
}
}