1
0
Fork 0
telegraf/plugins/outputs/postgresql/table_manager_test.go

516 lines
15 KiB
Go
Raw Permalink Normal View History

package postgresql
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)
func TestTableManagerIntegration_EnsureStructure(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
cols := []utils.Column{
p.columnFromTag("foo", ""),
p.columnFromField("baz", 0),
}
missingCols, err := p.tableManager.EnsureStructure(
ctx,
p.db,
p.tableManager.table(t.Name()),
cols,
p.CreateTemplates,
p.AddColumnTemplates,
p.tableManager.table(t.Name()),
nil,
)
require.NoError(t, err)
require.Empty(t, missingCols)
tblCols := p.tableManager.table(t.Name()).columns
require.EqualValues(t, cols[0], tblCols["foo"])
require.EqualValues(t, cols[1], tblCols["baz"])
}
func TestTableManagerIntegration_EnsureStructure_alter(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
cols := []utils.Column{
p.columnFromTag("foo", ""),
p.columnFromField("bar", 0),
}
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
p.tableManager.table(t.Name()),
cols,
p.CreateTemplates,
p.AddColumnTemplates,
p.tableManager.table(t.Name()),
nil,
)
require.NoError(t, err)
cols = append(cols, p.columnFromField("baz", 0))
missingCols, err := p.tableManager.EnsureStructure(
ctx,
p.db,
p.tableManager.table(t.Name()),
cols,
p.CreateTemplates,
p.AddColumnTemplates,
p.tableManager.table(t.Name()),
nil,
)
require.NoError(t, err)
require.Empty(t, missingCols)
tblCols := p.tableManager.table(t.Name()).columns
require.EqualValues(t, cols[0], tblCols["foo"])
require.EqualValues(t, cols[1], tblCols["bar"])
require.EqualValues(t, cols[2], tblCols["baz"])
}
func TestTableManagerIntegration_EnsureStructure_overflowTableName(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
tbl := p.tableManager.table("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă") // 32 2-byte unicode characters = 64 bytes
cols := []utils.Column{
p.columnFromField("foo", 0),
}
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
tbl,
cols,
p.CreateTemplates,
p.AddColumnTemplates,
tbl,
nil,
)
require.Error(t, err)
require.Contains(t, err.Error(), "table name too long")
require.False(t, isTempError(err))
}
func TestTableManagerIntegration_EnsureStructure_overflowTagName(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
tbl := p.tableManager.table(t.Name())
cols := []utils.Column{
p.columnFromTag("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă", "a"), // 32 2-byte unicode characters = 64 bytes
p.columnFromField("foo", 0),
}
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
tbl,
cols,
p.CreateTemplates,
p.AddColumnTemplates,
tbl,
nil,
)
require.Error(t, err)
require.False(t, isTempError(err))
}
func TestTableManagerIntegration_EnsureStructure_overflowFieldName(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
tbl := p.tableManager.table(t.Name())
cols := []utils.Column{
p.columnFromField("foo", 0),
p.columnFromField("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă", 0),
}
missingCols, err := p.tableManager.EnsureStructure(
ctx,
p.db,
tbl,
cols,
p.CreateTemplates,
p.AddColumnTemplates,
tbl,
nil,
)
require.NoError(t, err)
require.Len(t, missingCols, 1)
require.Equal(t, cols[1], missingCols[0])
}
func TestTableManagerIntegration_getColumns(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
require.NoError(t, p.Connect())
cols := []utils.Column{
p.columnFromTag("foo", ""),
p.columnFromField("baz", 0),
}
_, err = p.tableManager.EnsureStructure(
ctx,
p.db,
p.tableManager.table(t.Name()),
cols,
p.CreateTemplates,
p.AddColumnTemplates,
p.tableManager.table(t.Name()),
nil,
)
require.NoError(t, err)
p.tableManager.ClearTableCache()
require.Empty(t, p.tableManager.table(t.Name()).columns)
curCols, err := p.tableManager.getColumns(ctx, p.db, t.Name())
require.NoError(t, err)
require.EqualValues(t, cols[0], curCols["foo"])
require.EqualValues(t, cols[1], curCols["baz"])
}
func TestTableManagerIntegration_MatchSource(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
require.Contains(t, p.tableManager.table(t.Name()+p.TagTableSuffix).columns, "tag")
require.Contains(t, p.tableManager.table(t.Name()).columns, "a")
}
func TestTableManagerIntegration_MatchSource_UnsignedIntegers(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.Uint64Type = PgUint8
require.NoError(t, p.Init())
if err := p.Connect(); err != nil {
if strings.Contains(err.Error(), "retrieving OID for uint8 data type") {
t.Skipf("pguint extension is not installed")
t.SkipNow()
}
require.NoError(t, err)
}
metrics := []telegraf.Metric{
newMetric(t, "", nil, MSI{"a": uint64(1)}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
require.Equal(t, PgUint8, p.tableManager.table(t.Name()).columns["a"].Type)
}
func TestTableManagerIntegration_noCreateTable(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.CreateTemplates = nil
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.Error(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
}
func TestTableManagerIntegration_noCreateTagTable(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagTableCreateTemplates = nil
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.Error(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
}
// verify that TableManager updates & caches the DB table structure unless the incoming metric can't fit.
func TestTableManagerIntegration_cache(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
}
// Verify that when alter statements are disabled and a metric comes in with a new tag key, that the tag is omitted.
func TestTableManagerIntegration_noAlterMissingTag(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.AddColumnTemplates = make([]*sqltemplate.Template, 0)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
newMetric(t, "", MSS{"tag": "foo", "bar": "baz"}, MSI{"a": 3}),
}
tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
require.NotContains(t, tsrc.ColumnNames(), "bar")
}
// Verify that when using foreign tags and alter statements are disabled and a metric comes in with a new tag key, that
// the tag is omitted.
func TestTableManagerIntegration_noAlterMissingTagTableTag(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.TagTableAddColumnTemplates = make([]*sqltemplate.Template, 0)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
newMetric(t, "", MSS{"tag": "foo", "bar": "baz"}, MSI{"a": 3}),
}
tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
ttsrc := NewTagTableSource(tsrc)
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
require.NotContains(t, ttsrc.ColumnNames(), "bar")
}
// Verify that when using foreign tags and alter statements generate a permanent error and a metric comes in with a new
// tag key, that the tag is omitted.
func TestTableManagerIntegration_badAlterTagTable(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
tmpl := &sqltemplate.Template{}
require.NoError(t, tmpl.UnmarshalText([]byte("bad")))
p.TagTableAddColumnTemplates = []*sqltemplate.Template{tmpl}
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
newMetric(t, "", MSS{"tag": "foo", "bar": "baz"}, MSI{"a": 3}),
}
tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
ttsrc := NewTagTableSource(tsrc)
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
require.NotContains(t, ttsrc.ColumnNames(), "bar")
}
// verify that when alter statements are disabled and a metric comes in with a new field key, that the field is omitted.
func TestTableManagerIntegration_noAlterMissingField(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.AddColumnTemplates = make([]*sqltemplate.Template, 0)
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 3, "b": 3}),
}
tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
require.NotContains(t, tsrc.ColumnNames(), "b")
}
// verify that when alter statements generate a permanent error and a metric comes in with a new field key, that the field is omitted.
func TestTableManagerIntegration_badAlterField(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
tmpl := &sqltemplate.Template{}
require.NoError(t, tmpl.UnmarshalText([]byte("bad")))
p.AddColumnTemplates = []*sqltemplate.Template{tmpl}
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 3, "b": 3}),
}
tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
require.NotContains(t, tsrc.ColumnNames(), "b")
}
func TestTableManager_addColumnTemplates(t *testing.T) {
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"foo": "bar"}, MSI{"a": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
p, err = newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
tmpl := &sqltemplate.Template{}
require.NoError(t, tmpl.UnmarshalText([]byte(`-- addColumnTemplate: {{ . }}`)))
p.AddColumnTemplates = []*sqltemplate.Template{tmpl}
require.NoError(t, p.Connect())
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"pop": "tart"}, MSI{"a": 1, "b": 2}),
}
tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
p.Logger.Info("ok")
expected := `CREATE TABLE "public"."TestTableManager_addColumnTemplates" ("time" timestamp without time zone, "tag_id" bigint, "a" bigint, "b" bigint)`
stmtCount := 0
for _, log := range p.Logger.Logs() {
if strings.Contains(log.String(), expected) {
stmtCount++
}
}
require.Equal(t, 1, stmtCount)
}
func TestTableManager_TimeWithTimezone(t *testing.T) {
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.TimestampColumnType = "timestamp with time zone"
require.NoError(t, p.Init())
require.NoError(t, p.Connect())
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"pop": "tart"}, MSI{"a": 1, "b": 2}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
p.Logger.Info("ok")
expected := `CREATE TABLE "public"."TestTableManager_TimeWithTimezone" ("time" timestamp with time zone, "tag_id" bigint, "a" bigint, "b" bigint)`
stmtCount := 0
for _, log := range p.Logger.Logs() {
if strings.Contains(log.String(), expected) {
stmtCount++
}
}
require.Equal(t, 1, stmtCount)
}