199 lines
4.6 KiB
Go
199 lines
4.6 KiB
Go
package cloudevents
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
cloudevents "github.com/cloudevents/sdk-go/v2"
|
|
"github.com/cloudevents/sdk-go/v2/event"
|
|
"github.com/gofrs/uuid/v5"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/plugins/serializers"
|
|
)
|
|
|
|
const (
|
|
EventTypeSingle = "com.influxdata.telegraf.metric"
|
|
EventTypeBatch = "com.influxdata.telegraf.metrics"
|
|
)
|
|
|
|
type Serializer struct {
|
|
Version string `toml:"cloudevents_version"`
|
|
Source string `toml:"cloudevents_source"`
|
|
SourceTag string `toml:"cloudevents_source_tag"`
|
|
EventType string `toml:"cloudevents_event_type"`
|
|
EventTime string `toml:"cloudevents_event_time"`
|
|
BatchFormat string `toml:"cloudevents_batch_format"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
idgen uuid.Generator
|
|
}
|
|
|
|
func (s *Serializer) Init() error {
|
|
switch s.Version {
|
|
case "":
|
|
s.Version = event.CloudEventsVersionV1
|
|
case event.CloudEventsVersionV03, event.CloudEventsVersionV1:
|
|
default:
|
|
return errors.New("invalid 'cloudevents_version'")
|
|
}
|
|
|
|
switch s.EventTime {
|
|
case "":
|
|
s.EventTime = "latest"
|
|
case "none", "earliest", "latest", "creation":
|
|
default:
|
|
return errors.New("invalid 'cloudevents_event_time'")
|
|
}
|
|
|
|
switch s.BatchFormat {
|
|
case "":
|
|
s.BatchFormat = "events"
|
|
case "metrics", "events":
|
|
default:
|
|
return errors.New("invalid 'cloudevents_batch_format'")
|
|
}
|
|
|
|
if s.Source == "" {
|
|
s.Source = "telegraf"
|
|
}
|
|
|
|
s.idgen = uuid.NewGen()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
|
|
// Create the event that forms the envelop around the metric
|
|
evt, err := s.createEvent(m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return evt.MarshalJSON()
|
|
}
|
|
|
|
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
|
switch s.BatchFormat {
|
|
case "metrics":
|
|
return s.batchMetrics(metrics)
|
|
case "events":
|
|
return s.batchEvents(metrics)
|
|
}
|
|
return nil, fmt.Errorf("unexpected batch-format %q", s.BatchFormat)
|
|
}
|
|
|
|
func (s *Serializer) batchMetrics(metrics []telegraf.Metric) ([]byte, error) {
|
|
// Determine the necessary information
|
|
eventType := EventTypeBatch
|
|
if s.EventType != "" {
|
|
eventType = s.EventType
|
|
}
|
|
id, err := s.idgen.NewV1()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("generating ID failed: %w", err)
|
|
}
|
|
|
|
// Serialize the metrics
|
|
var earliest, latest time.Time
|
|
data := make([]map[string]interface{}, 0, len(metrics))
|
|
for _, m := range metrics {
|
|
ts := m.Time()
|
|
data = append(data, map[string]interface{}{
|
|
"name": m.Name(),
|
|
"tags": m.Tags(),
|
|
"fields": m.Fields(),
|
|
"timestamp": ts.UnixNano(),
|
|
})
|
|
if ts.Before(earliest) {
|
|
earliest = ts
|
|
}
|
|
if ts.After(latest) {
|
|
latest = ts
|
|
}
|
|
}
|
|
|
|
// Create the event that forms the envelop around the metric
|
|
evt := cloudevents.NewEvent(s.Version)
|
|
evt.SetSource(s.Source)
|
|
evt.SetID(id.String())
|
|
evt.SetType(eventType)
|
|
if err := evt.SetData(cloudevents.ApplicationJSON, data); err != nil {
|
|
return nil, fmt.Errorf("setting data failed: %w", err)
|
|
}
|
|
switch s.EventTime {
|
|
case "creation":
|
|
evt.SetTime(time.Now())
|
|
case "earliest":
|
|
evt.SetTime(earliest)
|
|
case "latest":
|
|
evt.SetTime(latest)
|
|
}
|
|
|
|
return json.Marshal(evt)
|
|
}
|
|
|
|
func (s *Serializer) batchEvents(metrics []telegraf.Metric) ([]byte, error) {
|
|
events := make([]*cloudevents.Event, 0, len(metrics))
|
|
for _, m := range metrics {
|
|
e, err := s.createEvent(m)
|
|
if err != nil {
|
|
s.Log.Errorf("Creating event for %v failed: %v", m, err)
|
|
continue
|
|
}
|
|
events = append(events, e)
|
|
}
|
|
return json.Marshal(events)
|
|
}
|
|
|
|
func (s *Serializer) createEvent(m telegraf.Metric) (*cloudevents.Event, error) {
|
|
// Determine the necessary information
|
|
source := s.Source
|
|
if s.SourceTag != "" {
|
|
if v, ok := m.GetTag(s.SourceTag); ok {
|
|
source = v
|
|
}
|
|
}
|
|
eventType := EventTypeSingle
|
|
if s.EventType != "" {
|
|
eventType = s.EventType
|
|
}
|
|
id, err := s.idgen.NewV1()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("generating ID failed: %w", err)
|
|
}
|
|
|
|
// Serialize the metric
|
|
data := map[string]interface{}{
|
|
"name": m.Name(),
|
|
"tags": m.Tags(),
|
|
"fields": m.Fields(),
|
|
"timestamp": m.Time().UnixNano(),
|
|
}
|
|
|
|
// Create the event that forms the envelop around the metric
|
|
evt := cloudevents.NewEvent(s.Version)
|
|
evt.SetSource(source)
|
|
evt.SetID(id.String())
|
|
evt.SetType(eventType)
|
|
if err := evt.SetData(cloudevents.ApplicationJSON, data); err != nil {
|
|
return nil, fmt.Errorf("setting data failed: %w", err)
|
|
}
|
|
switch s.EventTime {
|
|
case "creation":
|
|
evt.SetTime(time.Now())
|
|
case "earliest", "latest":
|
|
evt.SetTime(m.Time())
|
|
}
|
|
|
|
return &evt, nil
|
|
}
|
|
|
|
func init() {
|
|
serializers.Add("cloudevents",
|
|
func() telegraf.Serializer {
|
|
return &Serializer{}
|
|
},
|
|
)
|
|
}
|