1
0
Fork 0
telegraf/metric/deserialize.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

85 lines
2.1 KiB
Go

package metric
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"sync"
"github.com/influxdata/telegraf"
)
// In-memory state for tracking information, which cannot be serialized to disk
// together with the metric itself.
var (
	// trackingStore maps a tracking ID to its tracking data. With grouped
	// tracking metrics several metrics may share a single tracking ID, so the
	// ID->Data association is not one to one and the store must not simply be
	// cleared on every FromBytes call.
	trackingStore = map[telegraf.TrackingID]telegraf.TrackingData{}

	// mu guards every access to trackingStore.
	mu sync.Mutex

	// ErrSkipTracking is returned when no tracking information can be found
	// for a metric that was deserialized from bytes. Callers should skip such
	// a metric and carry on as if it did not exist.
	ErrSkipTracking = errors.New("metric tracking data not found")
)
// serializedMetric is the envelope that ToBytes gob-encodes and FromBytes
// decodes. The fields are exported because encoding/gob only transmits
// exported struct fields.
type serializedMetric struct {
	// M is the metric payload; ToBytes stores the unwrapped metric here when
	// the input implements telegraf.UnwrappableMetric.
	M telegraf.Metric
	// TID is the tracking ID of the metric. It is left at its zero value when
	// the metric is not a telegraf.TrackingMetric, which FromBytes treats as
	// "no tracking data to restore".
	TID telegraf.TrackingID
}
// ToBytes gob-encodes the given metric and returns the resulting bytes.
//
// Metrics implementing telegraf.UnwrappableMetric are encoded in their
// unwrapped form. For metrics implementing telegraf.TrackingMetric the
// tracking ID is encoded alongside the metric, while the tracking data itself
// is kept in the in-memory tracking store so that FromBytes can re-attach it.
//
// An error wrapping the encoder failure is returned if encoding fails.
func ToBytes(m telegraf.Metric) ([]byte, error) {
	payload := serializedMetric{M: m}
	if wrapped, isWrapped := m.(telegraf.UnwrappableMetric); isWrapped {
		payload.M = wrapped.Unwrap()
	}

	if tracked, isTracked := m.(telegraf.TrackingMetric); isTracked {
		id := tracked.TrackingID()
		payload.TID = id

		// Remember the tracking data under its ID; only the ID is encoded.
		mu.Lock()
		trackingStore[id] = tracked.TrackingData()
		mu.Unlock()
	}

	var out bytes.Buffer
	if err := gob.NewEncoder(&out).Encode(&payload); err != nil {
		return nil, fmt.Errorf("failed to encode metric to bytes: %w", err)
	}
	return out.Bytes(), nil
}
// FromBytes decodes a metric from bytes previously produced by ToBytes.
//
// If the encoded metric carries a non-zero tracking ID, the matching tracking
// data is looked up in the in-memory tracking store and the metric is rebuilt
// with it via rebuildTrackingMetric. When no tracking data is stored for that
// ID, ErrSkipTracking is returned and the caller should skip the metric.
//
// An error wrapping the decoder failure is returned if decoding fails.
func FromBytes(b []byte) (telegraf.Metric, error) {
	// Decode into a value rather than a nil pointer so the result can be
	// dereferenced without relying on the decoder to allocate it.
	var sm serializedMetric
	if err := gob.NewDecoder(bytes.NewBuffer(b)).Decode(&sm); err != nil {
		return nil, fmt.Errorf("failed to decode metric from bytes: %w", err)
	}

	// A zero tracking ID means there is no tracking data to restore.
	if sm.TID == 0 {
		return sm.M, nil
	}

	mu.Lock()
	td := trackingStore[sm.TID]
	if td == nil {
		mu.Unlock()
		return nil, ErrSkipTracking
	}
	if td.RefCount() <= 1 {
		// Only 1 metric left referencing this tracking ID, so it can be
		// removed here since no subsequent metric read can use this ID. If
		// another metric in a metric group with this ID gets added later, it
		// will simply be added back into the tracking store again.
		//
		// Use delete rather than assigning nil: a nil assignment keeps the key
		// in the map forever, so the store would grow by one dead entry per
		// tracking ID ever seen.
		delete(trackingStore, sm.TID)
	}
	mu.Unlock()

	// Rebuild outside the lock, as the store is no longer needed.
	return rebuildTrackingMetric(sm.M, td), nil
}