179 lines
3.8 KiB
Go
179 lines
3.8 KiB
Go
package kinesis_consumer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
)
|
|
|
|
var errNotFound = errors.New("no iterator found")
|
|
|
|
type iterator struct {
|
|
stream string
|
|
shard string
|
|
seqnr string
|
|
modified bool
|
|
}
|
|
|
|
type store struct {
|
|
app string
|
|
table string
|
|
interval time.Duration
|
|
log telegraf.Logger
|
|
|
|
client *dynamodb.Client
|
|
iterators map[string]iterator
|
|
|
|
wg sync.WaitGroup
|
|
cancel context.CancelFunc
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
func newStore(app, table string, interval time.Duration, log telegraf.Logger) *store {
|
|
s := &store{
|
|
app: app,
|
|
table: table,
|
|
interval: interval,
|
|
log: log,
|
|
}
|
|
|
|
// Initialize the iterator states
|
|
s.iterators = make(map[string]iterator)
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *store) run(ctx context.Context) error {
|
|
rctx, cancel := context.WithCancel(ctx)
|
|
s.cancel = cancel
|
|
|
|
// Create a client to connect to DynamoDB
|
|
cfg, err := config.LoadDefaultConfig(rctx)
|
|
if err != nil {
|
|
return fmt.Errorf("loading default config failed: %w", err)
|
|
}
|
|
s.client = dynamodb.NewFromConfig(cfg)
|
|
|
|
// Start the go-routine that pushes the states out to DynamoDB on a
|
|
// regular interval
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer s.wg.Done()
|
|
|
|
ticker := time.NewTicker(s.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-rctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.write(rctx)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *store) stop() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), s.interval)
|
|
defer cancel()
|
|
s.write(ctx)
|
|
|
|
s.cancel()
|
|
s.wg.Wait()
|
|
}
|
|
|
|
func (s *store) write(ctx context.Context) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
for k, iter := range s.iterators {
|
|
// Only write iterators modified since the last write
|
|
if !iter.modified {
|
|
continue
|
|
}
|
|
|
|
if _, err := s.client.PutItem(
|
|
ctx,
|
|
&dynamodb.PutItemInput{
|
|
TableName: aws.String(s.table),
|
|
Item: map[string]types.AttributeValue{
|
|
"namespace": &types.AttributeValueMemberS{Value: s.app + "-" + iter.stream},
|
|
"shard_id": &types.AttributeValueMemberS{Value: iter.shard},
|
|
"sequence_number": &types.AttributeValueMemberS{Value: iter.seqnr},
|
|
},
|
|
}); err != nil {
|
|
s.log.Errorf("storing iterator %s-%s/%s/%s failed: %v", s.app, iter.stream, iter.shard, iter.seqnr, err)
|
|
}
|
|
|
|
// Mark state as saved
|
|
iter.modified = false
|
|
s.iterators[k] = iter
|
|
}
|
|
}
|
|
|
|
func (s *store) set(stream, shard, seqnr string) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
s.iterators[stream+"/"+shard] = iterator{
|
|
stream: stream,
|
|
shard: shard,
|
|
seqnr: seqnr,
|
|
modified: true,
|
|
}
|
|
}
|
|
|
|
func (s *store) get(ctx context.Context, stream, shard string) (string, error) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
// Return the cached result if possible
|
|
if iter, found := s.iterators[stream+"/"+shard]; found {
|
|
return iter.seqnr, nil
|
|
}
|
|
|
|
// Retrieve the information from the database
|
|
resp, err := s.client.GetItem(ctx, &dynamodb.GetItemInput{
|
|
TableName: aws.String(s.table),
|
|
ConsistentRead: aws.Bool(true),
|
|
Key: map[string]types.AttributeValue{
|
|
"namespace": &types.AttributeValueMemberS{Value: s.app + "-" + stream},
|
|
"shard_id": &types.AttributeValueMemberS{Value: shard},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Extract the sequence number
|
|
raw, found := resp.Item["sequence_number"]
|
|
if !found {
|
|
return "", fmt.Errorf("%w for %s-%s/%s", errNotFound, s.app, stream, shard)
|
|
}
|
|
seqnr, ok := raw.(*types.AttributeValueMemberS)
|
|
if !ok {
|
|
return "", fmt.Errorf("sequence number for %s-%s/%s is of unexpected type %T", s.app, stream, shard, raw)
|
|
}
|
|
|
|
// Fill the cache
|
|
s.iterators[stream+"/"+shard] = iterator{
|
|
stream: stream,
|
|
shard: shard,
|
|
seqnr: seqnr.Value,
|
|
}
|
|
|
|
return seqnr.Value, nil
|
|
}
|