1
0
Fork 0
telegraf/plugins/serializers/cloudevents/cloudevents.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

199 lines
4.6 KiB
Go

package cloudevents
import (
"encoding/json"
"errors"
"fmt"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/gofrs/uuid/v5"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
)
// Default values for the CloudEvents "type" attribute. They are used when
// the serializer's EventType option is left empty.
const (
// EventTypeSingle is the default event type for an event wrapping a
// single metric.
EventTypeSingle = "com.influxdata.telegraf.metric"
// EventTypeBatch is the default event type for an event whose data is a
// list of metrics (the "metrics" batch format).
EventTypeBatch = "com.influxdata.telegraf.metrics"
)
// Serializer converts Telegraf metrics into JSON-encoded CloudEvents.
// Init must be called before serializing: it validates the options, fills
// in defaults and creates the ID generator.
type Serializer struct {
// Version is the CloudEvents spec version of the produced events. Init
// accepts the SDK's V03 and V1 constants and defaults to V1 when empty.
Version string `toml:"cloudevents_version"`
// Source is the static "source" attribute; Init defaults it to "telegraf".
Source string `toml:"cloudevents_source"`
// SourceTag optionally names a metric tag whose value, when present on
// the metric, replaces Source for single-metric events.
SourceTag string `toml:"cloudevents_source_tag"`
// EventType overrides the default "type" attribute when non-empty.
EventType string `toml:"cloudevents_event_type"`
// EventTime selects the event's "time" attribute: "none", "earliest",
// "latest" or "creation". Init defaults it to "latest".
EventTime string `toml:"cloudevents_event_time"`
// BatchFormat selects how batches are encoded: "events" (a JSON array of
// one event per metric, the default) or "metrics" (one event carrying
// all metrics as its data).
BatchFormat string `toml:"cloudevents_batch_format"`
// Log is used to report metrics that could not be converted to an event
// while serializing a batch in the "events" format.
Log telegraf.Logger `toml:"-"`
// idgen produces the UUIDs used as event IDs; it is set by Init.
idgen uuid.Generator
}
// Init validates the configured options, substitutes defaults for the ones
// left empty and prepares the UUID generator used for event IDs.
//
// Options are checked in the order version, event time, batch format; the
// first invalid one aborts initialization with an error, leaving the
// remaining options untouched.
func (s *Serializer) Init() error {
	// CloudEvents spec version: default to v1.0, accept v0.3 and v1.0.
	if s.Version == "" {
		s.Version = event.CloudEventsVersionV1
	} else if s.Version != event.CloudEventsVersionV03 && s.Version != event.CloudEventsVersionV1 {
		return errors.New("invalid 'cloudevents_version'")
	}

	// Timestamp strategy for the event's "time" attribute.
	if s.EventTime == "" {
		s.EventTime = "latest"
	} else if s.EventTime != "none" && s.EventTime != "earliest" &&
		s.EventTime != "latest" && s.EventTime != "creation" {
		return errors.New("invalid 'cloudevents_event_time'")
	}

	// Layout used when serializing a batch of metrics.
	if s.BatchFormat == "" {
		s.BatchFormat = "events"
	} else if s.BatchFormat != "metrics" && s.BatchFormat != "events" {
		return errors.New("invalid 'cloudevents_batch_format'")
	}

	// An unset source falls back to a fixed identifier.
	if s.Source == "" {
		s.Source = "telegraf"
	}

	s.idgen = uuid.NewGen()
	return nil
}
// Serialize wraps a single metric in a CloudEvent and returns the event's
// JSON encoding. Errors from building or encoding the event are returned
// unchanged.
func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
	envelope, buildErr := s.createEvent(m)
	if buildErr != nil {
		return nil, buildErr
	}

	payload, encodeErr := envelope.MarshalJSON()
	return payload, encodeErr
}
// SerializeBatch encodes a list of metrics according to BatchFormat:
// "metrics" produces a single event carrying all metrics, "events" produces
// a JSON array with one event per metric. Any other format is an error.
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	if s.BatchFormat == "metrics" {
		return s.batchMetrics(metrics)
	}
	if s.BatchFormat == "events" {
		return s.batchEvents(metrics)
	}
	return nil, fmt.Errorf("unexpected batch-format %q", s.BatchFormat)
}
// batchMetrics serializes all given metrics into ONE CloudEvent whose data
// is a JSON array of {name, tags, fields, timestamp} objects (timestamp in
// Unix nanoseconds).
//
// The event uses the configured Source (the per-metric SourceTag is not
// consulted here), a freshly generated V1 UUID as ID and either EventType
// or EventTypeBatch as type. The event time follows EventTime:
//   - "creation": the current wall-clock time
//   - "earliest": the oldest timestamp among the metrics
//   - "latest":   the newest timestamp among the metrics
//   - "none":     no time is set
//
// It returns the JSON encoding of the event, or an error if ID generation,
// data assignment or encoding fails.
func (s *Serializer) batchMetrics(metrics []telegraf.Metric) ([]byte, error) {
	// Determine the necessary information
	eventType := EventTypeBatch
	if s.EventType != "" {
		eventType = s.EventType
	}
	id, err := s.idgen.NewV1()
	if err != nil {
		return nil, fmt.Errorf("generating ID failed: %w", err)
	}

	// Serialize the metrics and track the timestamp range on the way
	var earliest, latest time.Time
	data := make([]map[string]interface{}, 0, len(metrics))
	for i, m := range metrics {
		ts := m.Time()
		data = append(data, map[string]interface{}{
			"name":      m.Name(),
			"tags":      m.Tags(),
			"fields":    m.Fields(),
			"timestamp": ts.UnixNano(),
		})
		// Seed both bounds from the first metric. Comparing against the
		// zero time.Time would never update "earliest", because no real
		// timestamp lies before it.
		if i == 0 || ts.Before(earliest) {
			earliest = ts
		}
		if i == 0 || ts.After(latest) {
			latest = ts
		}
	}

	// Create the event that forms the envelope around the metrics
	evt := cloudevents.NewEvent(s.Version)
	evt.SetSource(s.Source)
	evt.SetID(id.String())
	evt.SetType(eventType)
	if err := evt.SetData(cloudevents.ApplicationJSON, data); err != nil {
		return nil, fmt.Errorf("setting data failed: %w", err)
	}

	// For an empty batch both bounds stay at the zero time.
	// NOTE(review): the CloudEvents SDK appears to treat a zero time as
	// "unset" in SetTime -- confirm against the SDK documentation.
	switch s.EventTime {
	case "creation":
		evt.SetTime(time.Now())
	case "earliest":
		evt.SetTime(earliest)
	case "latest":
		evt.SetTime(latest)
	}

	return json.Marshal(evt)
}
// batchEvents converts every metric into its own CloudEvent and returns the
// JSON array of those events. A metric that cannot be converted is logged
// and left out of the result instead of failing the whole batch.
func (s *Serializer) batchEvents(metrics []telegraf.Metric) ([]byte, error) {
	envelopes := make([]*cloudevents.Event, 0, len(metrics))
	for idx := range metrics {
		metric := metrics[idx]
		envelope, err := s.createEvent(metric)
		if err == nil {
			envelopes = append(envelopes, envelope)
			continue
		}
		// Best effort: report the failure and carry on with the rest.
		s.Log.Errorf("Creating event for %v failed: %v", metric, err)
	}
	return json.Marshal(envelopes)
}
// createEvent builds the CloudEvent wrapping a single metric.
//
// The source is the value of the SourceTag tag when that option is set and
// the metric carries the tag, otherwise the configured Source. The type is
// EventType when set, otherwise EventTypeSingle. The ID is a freshly
// generated V1 UUID. The data is a JSON object with the metric's name,
// tags, fields and timestamp (Unix nanoseconds).
//
// The event time is the current time for "creation", the metric's own
// timestamp for "earliest" and "latest", and left unset otherwise.
func (s *Serializer) createEvent(m telegraf.Metric) (*cloudevents.Event, error) {
	// Resolve the source, letting the metric's tag take precedence
	origin := s.Source
	if s.SourceTag != "" {
		tagValue, found := m.GetTag(s.SourceTag)
		if found {
			origin = tagValue
		}
	}

	// Resolve the type, falling back to the single-metric default
	kind := s.EventType
	if kind == "" {
		kind = EventTypeSingle
	}

	eventID, err := s.idgen.NewV1()
	if err != nil {
		return nil, fmt.Errorf("generating ID failed: %w", err)
	}

	// Payload carried in the event's data
	payload := map[string]interface{}{
		"name":      m.Name(),
		"tags":      m.Tags(),
		"fields":    m.Fields(),
		"timestamp": m.Time().UnixNano(),
	}

	// Assemble the envelope
	envelope := cloudevents.NewEvent(s.Version)
	envelope.SetSource(origin)
	envelope.SetID(eventID.String())
	envelope.SetType(kind)
	if err := envelope.SetData(cloudevents.ApplicationJSON, payload); err != nil {
		return nil, fmt.Errorf("setting data failed: %w", err)
	}

	// With a single metric, "earliest" and "latest" are the same instant
	if s.EventTime == "creation" {
		envelope.SetTime(time.Now())
	} else if s.EventTime == "earliest" || s.EventTime == "latest" {
		envelope.SetTime(m.Time())
	}

	return &envelope, nil
}
func init() {
serializers.Add("cloudevents",
func() telegraf.Serializer {
return &Serializer{}
},
)
}