1
0
Fork 0
telegraf/plugins/outputs/cloud_pubsub/topic_stubbed.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

238 lines
5.2 KiB
Go

package cloud_pubsub
import (
"context"
"encoding/base64"
"errors"
"runtime"
"sync"
"testing"
"time"
"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/require"
"google.golang.org/api/support/bundler"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx"
serializers_influx "github.com/influxdata/telegraf/plugins/serializers/influx"
)
// errMockFail is the message of the error delivered on a stubResult when the
// stub topic is told to fail a publish.
const errMockFail = "this is an error"
// testMetric pairs a metric with a flag saying whether publishing it should
// be reported as failed by the stub topic.
type testMetric struct {
	m         telegraf.Metric
	returnErr bool
}

// bundledMsg is the item type handed to the bundler: the published message
// together with the result that must be completed once its bundle is handled.
// Field order matters because Publish builds it with a positional literal.
type bundledMsg struct {
	*pubsub.Message
	*stubResult
}

// stubResult is the value returned by stubTopic.Publish. Exactly one of err
// or done receives a value when the bundle holding the message is handled.
type stubResult struct {
	// metricIDs holds the "value" field of every metric parsed from the message.
	metricIDs []string
	// sendError is true when any of metricIDs is listed in stubTopic.ReturnErr.
	sendError bool

	err  chan error
	done chan struct{}
}

// stubTopic is an in-memory stand-in for a Pub/Sub topic. It batches
// published messages with a bundler and records what was sent.
type stubTopic struct {
	// Settings is stored by SetPublishSettings and drives the bundler thresholds.
	Settings pubsub.PublishSettings
	// ReturnErr lists metric IDs whose publish must be reported as failed.
	ReturnErr map[string]bool
	// NOTE(review): the embedded parser is not referenced by the methods in
	// this file; presumably it is set or used by the tests - confirm.
	telegraf.Parser
	// The embedded T lets the stub fail the running test directly.
	*testing.T

	// Base64Data makes parseIDs base64-decode the payload before parsing it.
	Base64Data bool
	// ContentEncoding selects the content decoder parseIDs applies to the payload.
	ContentEncoding string

	stopped bool
	// pLock serializes Publish and Stop.
	pLock sync.Mutex

	// published maps a metric ID to the message that carried it.
	published map[string]*pubsub.Message

	bundler *bundler.Bundler
	// bLock is held by the bundle handler while it records published messages
	// and increments bundleCount, and by getBundleCount while reading it.
	bLock       sync.Mutex
	bundleCount int
}
// getTestResources builds the fixtures shared by the output tests: a PubSub
// plugin wired to a stubTopic, the stub itself, and the metrics taken from
// testM in their original order.
//
// For every testMetric with returnErr set, the string held in the metric's
// "value" field is added to the stub's ReturnErr set so that publishing a
// message containing that metric is reported as failed. The test fails
// immediately if such a metric has no "value" field or the field is not a
// string. settings supplies the publish thresholds, goroutine count and
// timeout copied onto the plugin.
func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []testMetric) (*PubSub, *stubTopic, []telegraf.Metric) {
	tT.Helper()

	// Instantiate an Influx line-protocol serializer.
	s := &serializers_influx.Serializer{}
	require.NoError(tT, s.Init())

	metrics := make([]telegraf.Metric, 0, len(testM))
	t := &stubTopic{
		T:               tT,
		ReturnErr:       make(map[string]bool),
		published:       make(map[string]*pubsub.Message),
		ContentEncoding: "identity",
	}

	for _, tm := range testM {
		metrics = append(metrics, tm.m)
		if !tm.returnErr {
			continue
		}
		// The stub identifies metrics by their "value" field, so a fixture
		// without a string value cannot be registered as failing.
		v, ok := tm.m.GetField("value")
		require.True(tT, ok, `test metric is missing the "value" field`)
		id, ok := v.(string)
		require.Truef(tT, ok, `test metric "value" field must be a string, got %T`, v)
		t.ReturnErr[id] = true
	}

	ps := &PubSub{
		Project:               "test-project",
		Topic:                 "test-topic",
		stubTopic:             func(string) topic { return t },
		PublishCountThreshold: settings.CountThreshold,
		PublishByteThreshold:  settings.ByteThreshold,
		PublishNumGoroutines:  settings.NumGoroutines,
		PublishTimeout:        config.Duration(settings.Timeout),
		ContentEncoding:       "identity",
	}
	require.NoError(tT, ps.Init())

	var err error
	ps.encoder, err = internal.NewContentEncoder(ps.ContentEncoding)
	require.NoError(tT, err)
	ps.SetSerializer(s)

	return ps, t, metrics
}
// ID returns the fixed identifier of the stub topic.
func (*stubTopic) ID() string {
	const topicID = "test-topic"
	return topicID
}
// Stop marks the topic as stopped, so that any later Publish call fails the
// test, and flushes the bundler. Both steps run while holding pLock, the same
// lock Publish takes.
func (t *stubTopic) Stop() {
	t.pLock.Lock()
	defer t.pLock.Unlock()

	t.stopped = true
	// Push out whatever is still buffered in the bundler.
	t.bundler.Flush()
}
// Publish queues msg on the bundler and returns a result that is completed
// once the bundle containing the message has been handled.
//
// The metric IDs carried by msg are extracted with parseIDs; if any of them
// is present in ReturnErr the result reports errMockFail instead of success.
// The test is failed immediately when Publish is called after Stop, when ctx
// is already cancelled or expired, or when the bundler rejects the message.
func (t *stubTopic) Publish(ctx context.Context, msg *pubsub.Message) publishResult {
	t.pLock.Lock()
	defer t.pLock.Unlock()

	// Report the two misuse cases separately so a failing test shows which
	// one actually happened.
	if t.stopped {
		t.Fatalf("publish called after stop")
	}
	if err := ctx.Err(); err != nil {
		t.Fatalf("publish called with a finished context: %v", err)
	}

	ids := t.parseIDs(msg)
	r := &stubResult{
		metricIDs: ids,
		// Capacity 1 lets the bundle handler complete the result without
		// waiting for a reader.
		err:  make(chan error, 1),
		done: make(chan struct{}, 1),
	}
	// One failing metric is enough to fail the whole message.
	for _, id := range ids {
		if _, fail := t.ReturnErr[id]; fail {
			r.sendError = true
			break
		}
	}

	bundled := &bundledMsg{msg, r}
	if err := t.bundler.Add(bundled, len(msg.Data)); err != nil {
		t.Fatalf("unexpected error while adding to bundle: %v", err)
	}
	return r
}
// PublishSettings returns the settings most recently stored on the stub.
func (t *stubTopic) PublishSettings() pubsub.PublishSettings {
	current := t.Settings
	return current
}
// SetPublishSettings stores settings on the stub and replaces the bundler
// with a new one configured from them.
func (t *stubTopic) SetPublishSettings(settings pubsub.PublishSettings) {
	t.Settings = settings
	// initBundler returns its receiver for chaining; nothing to keep here.
	_ = t.initBundler()
}
// initBundler installs a new bundler on the stub, configured from t.Settings,
// and returns the stub. Completed bundles are passed to the handler produced
// by sendBundle.
func (t *stubTopic) initBundler() *stubTopic {
	// The count threshold is capped at pubsub.MaxPublishRequestCount.
	countThreshold := t.Settings.CountThreshold
	if countThreshold > pubsub.MaxPublishRequestCount {
		countThreshold = pubsub.MaxPublishRequestCount
	}

	b := bundler.NewBundler(&bundledMsg{}, t.sendBundle())
	b.DelayThreshold = 10 * time.Second
	b.BundleCountThreshold = countThreshold
	b.BundleByteThreshold = t.Settings.ByteThreshold
	b.BundleByteLimit = pubsub.MaxPublishRequestBytes
	b.HandlerLimit = 25 * runtime.GOMAXPROCS(0)

	t.bundler = b
	return t
}
// sendBundle returns the handler that the bundler calls with each completed
// bundle. Under bLock the handler records every message in t.published, keyed
// by each metric ID it carries, then completes the message's result: an
// errMockFail error when the result is flagged sendError, a done signal
// otherwise. Each handled bundle increments bundleCount by one.
func (t *stubTopic) sendBundle() func(items interface{}) {
	handler := func(items interface{}) {
		t.bLock.Lock()
		defer t.bLock.Unlock()

		for _, bm := range items.([]*bundledMsg) {
			res := bm.stubResult
			for _, metricID := range res.metricIDs {
				t.published[metricID] = bm.Message
			}

			if !res.sendError {
				res.done <- struct{}{}
				continue
			}
			res.err <- errors.New(errMockFail)
		}
		t.bundleCount++
	}
	return handler
}
// parseIDs decodes the payload of msg and returns the "value" field of every
// metric it contains, in parse order.
//
// The payload is first passed through the content decoder selected by
// t.ContentEncoding, then base64-decoded when t.Base64Data is set, and finally
// parsed as Influx line protocol. Any failure along the way, including a
// metric without a string "value" field, fails the test immediately; there is
// nothing meaningful to parse once a decoding step has failed.
func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
	p := influx.Parser{}
	require.NoError(t, p.Init())

	decoder, err := internal.NewContentDecoder(t.ContentEncoding)
	require.NoError(t, err)

	d, err := decoder.Decode(msg.Data)
	if err != nil {
		t.Fatalf("unable to decode message: %v", err)
	}
	if t.Base64Data {
		d, err = base64.StdEncoding.DecodeString(string(d))
		if err != nil {
			t.Fatalf("unable to base64 decode message: %v", err)
		}
	}

	metrics, err := p.Parse(d)
	if err != nil {
		t.Fatalf("unexpected parsing error: %v", err)
	}

	ids := make([]string, 0, len(metrics))
	for i, met := range metrics {
		// Check both lookups explicitly so a malformed payload produces a
		// readable test failure instead of a type-assertion panic.
		v, ok := met.GetField("value")
		if !ok {
			t.Fatalf("published metric %d has no \"value\" field", i)
		}
		id, ok := v.(string)
		if !ok {
			t.Fatalf("published metric %d has a non-string \"value\" field of type %T", i, v)
		}
		ids = append(ids, id)
	}
	return ids
}
// Get blocks until the result has been completed or ctx is done.
//
// It returns ctx.Err() when the context finishes first, the delivered error
// when the publish was reported as failed, and otherwise an ID built from
// "id-" plus the first metric ID of the message. A message that carried no
// metrics yields the bare "id-" prefix rather than panicking on an empty
// slice.
func (r *stubResult) Get(ctx context.Context) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case err := <-r.err:
		return "", err
	case <-r.done:
		if len(r.metricIDs) == 0 {
			// Nothing was parsed from the message, so there is no ID suffix.
			return "id-", nil
		}
		return "id-" + r.metricIDs[0], nil
	}
}
// getBundleCount returns the number of bundles handled so far, read under
// bLock, the lock the bundle handler holds while incrementing it.
func (t *stubTopic) getBundleCount() int {
	t.bLock.Lock()
	count := t.bundleCount
	t.bLock.Unlock()
	return count
}