1
0
Fork 0
telegraf/plugins/outputs/stackdriver/counter_cache.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

99 lines
2.6 KiB
Go

package stackdriver
import (
"path"
"sort"
"strings"
"sync"
"time"
monpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
tspb "google.golang.org/protobuf/types/known/timestamppb"
"github.com/influxdata/telegraf"
)
type counterCache struct {
sync.RWMutex
cache map[string]*counterCacheEntry
log telegraf.Logger
}
type counterCacheEntry struct {
LastValue *monpb.TypedValue
StartTime *tspb.Timestamp
}
func (cce *counterCacheEntry) Reset(ts *tspb.Timestamp) {
// always backdate a reset by -1ms, otherwise stackdriver's API will hate us
cce.StartTime = tspb.New(ts.AsTime().Add(time.Millisecond * -1))
}
func (cc *counterCache) get(key string) (*counterCacheEntry, bool) {
cc.RLock()
defer cc.RUnlock()
value, ok := cc.cache[key]
return value, ok
}
func (cc *counterCache) set(key string, value *counterCacheEntry) {
cc.Lock()
defer cc.Unlock()
cc.cache[key] = value
}
func (cc *counterCache) GetStartTime(key string, value *monpb.TypedValue, endTime *tspb.Timestamp) *tspb.Timestamp {
lastObserved, ok := cc.get(key)
// init: create a new key, backdate the state time to 1ms before the end time
if !ok {
newEntry := NewCounterCacheEntry(value, endTime)
cc.set(key, newEntry)
return newEntry.StartTime
}
// update of existing entry
if value.GetDoubleValue() < lastObserved.LastValue.GetDoubleValue() || value.GetInt64Value() < lastObserved.LastValue.GetInt64Value() {
// counter reset
lastObserved.Reset(endTime)
} else {
// counter increment
//
// ...but...
// start times cannot be over 25 hours old; reset after 1 day to be safe
age := endTime.GetSeconds() - lastObserved.StartTime.GetSeconds()
if age > 86400 {
lastObserved.Reset(endTime)
}
}
// update last observed value
lastObserved.LastValue = value
return lastObserved.StartTime
}
func NewCounterCache(log telegraf.Logger) *counterCache {
return &counterCache{
cache: make(map[string]*counterCacheEntry),
log: log}
}
func NewCounterCacheEntry(value *monpb.TypedValue, ts *tspb.Timestamp) *counterCacheEntry {
// Start times must be _before_ the end time, so backdate our original start time
// to 1ms before the observed time.
backDatedStart := ts.AsTime().Add(time.Millisecond * -1)
return &counterCacheEntry{LastValue: value, StartTime: tspb.New(backDatedStart)}
}
func GetCounterCacheKey(m telegraf.Metric, f *telegraf.Field) string {
// normalize tag list to form a predictable key
tags := make([]string, 0, len(m.TagList()))
for _, t := range m.TagList() {
tags = append(tags, strings.Join([]string{t.Key, t.Value}, "="))
}
sort.Strings(tags)
key := ""
if f != nil {
key = f.Key
}
return path.Join(m.Name(), strings.Join(tags, "/"), key)
}