1
0
Fork 0
telegraf/plugins/outputs/postgresql/table_source_test.go

327 lines
8.2 KiB
Go
Raw Permalink Normal View History

package postgresql
import (
"encoding/json"
"testing"
"time"
"github.com/coocood/freecache"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)
func TestTableSource(_ *testing.T) {
}
type source interface {
pgx.CopyFromSource
ColumnNames() []string
}
func nextSrcRow(src source) MSI {
if !src.Next() {
return nil
}
row := MSI{}
vals, err := src.Values()
if err != nil {
panic(err)
}
for i, name := range src.ColumnNames() {
row[name] = vals[i]
}
return row
}
func TestTableSourceIntegration_tagJSONB(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsJsonb = true
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
row := nextSrcRow(tsrc)
require.NoError(t, tsrc.Err())
require.IsType(t, time.Time{}, row["time"])
var tags MSI
require.NoError(t, json.Unmarshal(row["tags"].([]byte), &tags))
require.EqualValues(t, MSI{"a": "one", "b": "two"}, tags)
require.EqualValues(t, 1, row["v"])
}
func TestTableSourceIntegration_tagTable(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
ttsrc := NewTagTableSource(tsrc)
ttrow := nextSrcRow(ttsrc)
require.EqualValues(t, "one", ttrow["a"])
require.EqualValues(t, "two", ttrow["b"])
row := nextSrcRow(tsrc)
require.Equal(t, row["tag_id"], ttrow["tag_id"])
}
func TestTableSourceIntegration_tagTableJSONB(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.TagsAsJsonb = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
ttsrc := NewTagTableSource(tsrc)
ttrow := nextSrcRow(ttsrc)
var tags MSI
require.NoError(t, json.Unmarshal(ttrow["tags"].([]byte), &tags))
require.EqualValues(t, MSI{"a": "one", "b": "two"}, tags)
}
func TestTableSourceIntegration_fieldsJSONB(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.FieldsAsJsonb = true
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1, "b": 2}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
row := nextSrcRow(tsrc)
var fields MSI
require.NoError(t, json.Unmarshal(row["fields"].([]byte), &fields))
// json unmarshals numbers as floats
require.EqualValues(t, MSI{"a": 1.0, "b": 2.0}, fields)
}
// TagsAsForeignKeys=false
// Test that when a tag column is dropped, all metrics containing that tag are dropped.
func TestTableSourceIntegration_DropColumn_tag(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}),
newMetric(t, "", MSS{"a": "one"}, MSI{"v": 2}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
// Drop column "b"
var col utils.Column
for _, c := range tsrc.TagColumns() {
if c.Name == "b" {
col = c
break
}
}
require.NoError(t, tsrc.DropColumn(col))
row := nextSrcRow(tsrc)
require.EqualValues(t, "one", row["a"])
require.EqualValues(t, 2, row["v"])
require.False(t, tsrc.Next())
}
// TagsAsForeignKeys=true, ForeignTagConstraint=true
// Test that when a tag column is dropped, all metrics containing that tag are dropped.
func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcTrue(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.ForeignTagConstraint = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}),
newMetric(t, "", MSS{"a": "one"}, MSI{"v": 2}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
// Drop column "b"
var col utils.Column
for _, c := range tsrc.TagColumns() {
if c.Name == "b" {
col = c
break
}
}
require.NoError(t, tsrc.DropColumn(col))
ttsrc := NewTagTableSource(tsrc)
row := nextSrcRow(ttsrc)
require.EqualValues(t, "one", row["a"])
require.False(t, ttsrc.Next())
row = nextSrcRow(tsrc)
require.EqualValues(t, 2, row["v"])
require.False(t, tsrc.Next())
}
// TagsAsForeignKeys=true, ForeignTagConstraint=false
// Test that when a tag column is dropped, metrics are still added while the tag is not.
func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcFalse(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.ForeignTagConstraint = false
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "one", "b": "two"}, MSI{"v": 1}),
newMetric(t, "", MSS{"a": "one"}, MSI{"v": 2}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
// Drop column "b"
var col utils.Column
for _, c := range tsrc.TagColumns() {
if c.Name == "b" {
col = c
break
}
}
require.NoError(t, tsrc.DropColumn(col))
ttsrc := NewTagTableSource(tsrc)
row := nextSrcRow(ttsrc)
require.EqualValues(t, "one", row["a"])
require.False(t, ttsrc.Next())
row = nextSrcRow(tsrc)
require.EqualValues(t, 1, row["v"])
row = nextSrcRow(tsrc)
require.EqualValues(t, 2, row["v"])
}
// Test that when a field is dropped, only the field is dropped, and all rows remain, unless it was the only field.
func TestTableSourceIntegration_DropColumn_field(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2, "b": 3}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
// Drop column "a"
var col utils.Column
for _, c := range tsrc.FieldColumns() {
if c.Name == "a" {
col = c
break
}
}
require.NoError(t, tsrc.DropColumn(col))
row := nextSrcRow(tsrc)
require.EqualValues(t, "foo", row["tag"])
require.EqualValues(t, 3, row["b"])
require.False(t, tsrc.Next())
}
func TestTableSourceIntegration_InconsistentTags(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "1"}, MSI{"b": 2}),
newMetric(t, "", MSS{"c": "3"}, MSI{"d": 4}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
trow := nextSrcRow(tsrc)
require.EqualValues(t, "1", trow["a"])
require.Nil(t, trow["c"])
trow = nextSrcRow(tsrc)
require.Nil(t, trow["a"])
require.EqualValues(t, "3", trow["c"])
}
func TestTagTableSourceIntegration_InconsistentTags(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p, err := newPostgresqlTest(t)
require.NoError(t, err)
p.TagsAsForeignKeys = true
p.tagsCache = freecache.NewCache(5 * 1024 * 1024)
metrics := []telegraf.Metric{
newMetric(t, "", MSS{"a": "1"}, MSI{"b": 2}),
newMetric(t, "", MSS{"c": "3"}, MSI{"d": 4}),
}
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
ttsrc := NewTagTableSource(tsrc)
// ttsrc is in non-deterministic order
expected := []MSI{
{"a": "1", "c": nil},
{"a": nil, "c": "3"},
}
var actual []MSI
for row := nextSrcRow(ttsrc); row != nil; row = nextSrcRow(ttsrc) {
delete(row, "tag_id")
actual = append(actual, row)
}
require.ElementsMatch(t, expected, actual)
}