1
0
Fork 0
telegraf/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go

301 lines
7.3 KiB
Go
Raw Permalink Normal View History

package cloud_pubsub
import (
"encoding/base64"
"errors"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
const (
msgInflux = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
)
// Test ingesting InfluxDB-format PubSub message
func TestRunParse(t *testing.T) {
subID := "sub-run-parse"
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
}
acc := &testutil.Accumulator{}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
require.NotNil(t, ps.sub)
testTracker := &testTracker{}
msg := &testMsg{
value: msgInflux,
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
require.Equal(t, 1, acc.NFields())
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
// Test ingesting InfluxDB-format PubSub message
func TestRunBase64(t *testing.T) {
subID := "sub-run-base64"
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Base64Data: true,
decoder: decoder,
}
acc := &testutil.Accumulator{}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
require.NotNil(t, ps.sub)
testTracker := &testTracker{}
msg := &testMsg{
value: base64.StdEncoding.EncodeToString([]byte(msgInflux)),
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
require.Equal(t, 1, acc.NFields())
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
func TestRunGzipDecode(t *testing.T) {
subID := "sub-run-gzip"
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("gzip")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
ContentEncoding: "gzip",
decoder: decoder,
}
acc := &testutil.Accumulator{}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
require.NotNil(t, ps.sub)
testTracker := &testTracker{}
enc, err := internal.NewGzipEncoder()
require.NoError(t, err)
gzippedMsg, err := enc.Encode([]byte(msgInflux))
require.NoError(t, err)
msg := &testMsg{
value: string(gzippedMsg),
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
require.Equal(t, 1, acc.NFields())
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
func TestRunInvalidMessages(t *testing.T) {
subID := "sub-invalid-messages"
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
}
acc := &testutil.Accumulator{}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
require.NotNil(t, ps.sub)
testTracker := &testTracker{}
msg := &testMsg{
value: "~invalidInfluxMsg~",
tracker: testTracker,
}
sub.messages <- msg
acc.WaitError(1)
// Make sure we acknowledged message so we don't receive it again.
testTracker.waitForAck(1)
require.Equal(t, 0, acc.NFields())
}
func TestRunOverlongMessages(t *testing.T) {
subID := "sub-message-too-long"
acc := &testutil.Accumulator{}
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
// Add MaxMessageLen Param
MaxMessageLen: 1,
}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
require.NotNil(t, ps.sub)
testTracker := &testTracker{}
msg := &testMsg{
value: msgInflux,
tracker: testTracker,
}
sub.messages <- msg
acc.WaitError(1)
// Make sure we acknowledged message so we don't receive it again.
testTracker.waitForAck(1)
require.Equal(t, 0, acc.NFields())
}
func TestRunErrorInSubscriber(t *testing.T) {
subID := "sub-unexpected-error"
acc := &testutil.Accumulator{}
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
fakeErrStr := "a fake error"
sub.receiver = testMessagesError(errors.New("a fake error"))
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
RetryReceiveDelaySeconds: 1,
}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
require.NotNil(t, ps.sub)
acc.WaitError(1)
require.Regexp(t, fakeErrStr, acc.Errors[0])
}
func validateTestInfluxMetric(t *testing.T, m *testutil.Metric) {
require.Equal(t, "cpu_load_short", m.Measurement)
require.Equal(t, "server01", m.Tags["host"])
require.InDelta(t, 23422.0, m.Fields["value"], testutil.DefaultDelta)
require.Equal(t, int64(1422568543702900257), m.Time.UnixNano())
}