1
0
Fork 0
telegraf/plugins/inputs/kinesis_consumer/store.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

179 lines
3.8 KiB
Go

package kinesis_consumer
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/influxdata/telegraf"
)
var errNotFound = errors.New("no iterator found")
type iterator struct {
stream string
shard string
seqnr string
modified bool
}
type store struct {
app string
table string
interval time.Duration
log telegraf.Logger
client *dynamodb.Client
iterators map[string]iterator
wg sync.WaitGroup
cancel context.CancelFunc
sync.Mutex
}
func newStore(app, table string, interval time.Duration, log telegraf.Logger) *store {
s := &store{
app: app,
table: table,
interval: interval,
log: log,
}
// Initialize the iterator states
s.iterators = make(map[string]iterator)
return s
}
func (s *store) run(ctx context.Context) error {
rctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
// Create a client to connect to DynamoDB
cfg, err := config.LoadDefaultConfig(rctx)
if err != nil {
return fmt.Errorf("loading default config failed: %w", err)
}
s.client = dynamodb.NewFromConfig(cfg)
// Start the go-routine that pushes the states out to DynamoDB on a
// regular interval
s.wg.Add(1)
go func() {
defer s.wg.Done()
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-rctx.Done():
return
case <-ticker.C:
s.write(rctx)
}
}
}()
return nil
}
func (s *store) stop() {
ctx, cancel := context.WithTimeout(context.Background(), s.interval)
defer cancel()
s.write(ctx)
s.cancel()
s.wg.Wait()
}
func (s *store) write(ctx context.Context) {
s.Lock()
defer s.Unlock()
for k, iter := range s.iterators {
// Only write iterators modified since the last write
if !iter.modified {
continue
}
if _, err := s.client.PutItem(
ctx,
&dynamodb.PutItemInput{
TableName: aws.String(s.table),
Item: map[string]types.AttributeValue{
"namespace": &types.AttributeValueMemberS{Value: s.app + "-" + iter.stream},
"shard_id": &types.AttributeValueMemberS{Value: iter.shard},
"sequence_number": &types.AttributeValueMemberS{Value: iter.seqnr},
},
}); err != nil {
s.log.Errorf("storing iterator %s-%s/%s/%s failed: %v", s.app, iter.stream, iter.shard, iter.seqnr, err)
}
// Mark state as saved
iter.modified = false
s.iterators[k] = iter
}
}
func (s *store) set(stream, shard, seqnr string) {
s.Lock()
defer s.Unlock()
s.iterators[stream+"/"+shard] = iterator{
stream: stream,
shard: shard,
seqnr: seqnr,
modified: true,
}
}
func (s *store) get(ctx context.Context, stream, shard string) (string, error) {
s.Lock()
defer s.Unlock()
// Return the cached result if possible
if iter, found := s.iterators[stream+"/"+shard]; found {
return iter.seqnr, nil
}
// Retrieve the information from the database
resp, err := s.client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(s.table),
ConsistentRead: aws.Bool(true),
Key: map[string]types.AttributeValue{
"namespace": &types.AttributeValueMemberS{Value: s.app + "-" + stream},
"shard_id": &types.AttributeValueMemberS{Value: shard},
},
})
if err != nil {
return "", err
}
// Extract the sequence number
raw, found := resp.Item["sequence_number"]
if !found {
return "", fmt.Errorf("%w for %s-%s/%s", errNotFound, s.app, stream, shard)
}
seqnr, ok := raw.(*types.AttributeValueMemberS)
if !ok {
return "", fmt.Errorf("sequence number for %s-%s/%s is of unexpected type %T", s.app, stream, shard, raw)
}
// Fill the cache
s.iterators[stream+"/"+shard] = iterator{
stream: stream,
shard: shard,
seqnr: seqnr.Value,
}
return seqnr.Value, nil
}