100 lines
2.6 KiB
Go
100 lines
2.6 KiB
Go
|
package stackdriver
|
||
|
|
||
|
import (
|
||
|
"path"
|
||
|
"sort"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
monpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
|
||
|
tspb "google.golang.org/protobuf/types/known/timestamppb"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
)
|
||
|
|
||
|
type counterCache struct {
|
||
|
sync.RWMutex
|
||
|
cache map[string]*counterCacheEntry
|
||
|
log telegraf.Logger
|
||
|
}
|
||
|
|
||
|
type counterCacheEntry struct {
|
||
|
LastValue *monpb.TypedValue
|
||
|
StartTime *tspb.Timestamp
|
||
|
}
|
||
|
|
||
|
func (cce *counterCacheEntry) Reset(ts *tspb.Timestamp) {
|
||
|
// always backdate a reset by -1ms, otherwise stackdriver's API will hate us
|
||
|
cce.StartTime = tspb.New(ts.AsTime().Add(time.Millisecond * -1))
|
||
|
}
|
||
|
|
||
|
func (cc *counterCache) get(key string) (*counterCacheEntry, bool) {
|
||
|
cc.RLock()
|
||
|
defer cc.RUnlock()
|
||
|
value, ok := cc.cache[key]
|
||
|
return value, ok
|
||
|
}
|
||
|
|
||
|
func (cc *counterCache) set(key string, value *counterCacheEntry) {
|
||
|
cc.Lock()
|
||
|
defer cc.Unlock()
|
||
|
cc.cache[key] = value
|
||
|
}
|
||
|
|
||
|
func (cc *counterCache) GetStartTime(key string, value *monpb.TypedValue, endTime *tspb.Timestamp) *tspb.Timestamp {
|
||
|
lastObserved, ok := cc.get(key)
|
||
|
|
||
|
// init: create a new key, backdate the state time to 1ms before the end time
|
||
|
if !ok {
|
||
|
newEntry := NewCounterCacheEntry(value, endTime)
|
||
|
cc.set(key, newEntry)
|
||
|
return newEntry.StartTime
|
||
|
}
|
||
|
|
||
|
// update of existing entry
|
||
|
if value.GetDoubleValue() < lastObserved.LastValue.GetDoubleValue() || value.GetInt64Value() < lastObserved.LastValue.GetInt64Value() {
|
||
|
// counter reset
|
||
|
lastObserved.Reset(endTime)
|
||
|
} else {
|
||
|
// counter increment
|
||
|
//
|
||
|
// ...but...
|
||
|
// start times cannot be over 25 hours old; reset after 1 day to be safe
|
||
|
age := endTime.GetSeconds() - lastObserved.StartTime.GetSeconds()
|
||
|
if age > 86400 {
|
||
|
lastObserved.Reset(endTime)
|
||
|
}
|
||
|
}
|
||
|
// update last observed value
|
||
|
lastObserved.LastValue = value
|
||
|
return lastObserved.StartTime
|
||
|
}
|
||
|
|
||
|
func NewCounterCache(log telegraf.Logger) *counterCache {
|
||
|
return &counterCache{
|
||
|
cache: make(map[string]*counterCacheEntry),
|
||
|
log: log}
|
||
|
}
|
||
|
|
||
|
func NewCounterCacheEntry(value *monpb.TypedValue, ts *tspb.Timestamp) *counterCacheEntry {
|
||
|
// Start times must be _before_ the end time, so backdate our original start time
|
||
|
// to 1ms before the observed time.
|
||
|
backDatedStart := ts.AsTime().Add(time.Millisecond * -1)
|
||
|
return &counterCacheEntry{LastValue: value, StartTime: tspb.New(backDatedStart)}
|
||
|
}
|
||
|
|
||
|
func GetCounterCacheKey(m telegraf.Metric, f *telegraf.Field) string {
|
||
|
// normalize tag list to form a predictable key
|
||
|
tags := make([]string, 0, len(m.TagList()))
|
||
|
for _, t := range m.TagList() {
|
||
|
tags = append(tags, strings.Join([]string{t.Key, t.Value}, "="))
|
||
|
}
|
||
|
sort.Strings(tags)
|
||
|
key := ""
|
||
|
if f != nil {
|
||
|
key = f.Key
|
||
|
}
|
||
|
return path.Join(m.Name(), strings.Join(tags, "/"), key)
|
||
|
}
|