106 lines
2.5 KiB
Go
106 lines
2.5 KiB
Go
package models
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
logging "github.com/influxdata/telegraf/logger"
|
|
"github.com/influxdata/telegraf/selfstat"
|
|
)
|
|
|
|
// SerializerConfig is the common config for all serializers.
|
|
type SerializerConfig struct {
|
|
Parent string
|
|
Alias string
|
|
DataFormat string
|
|
DefaultTags map[string]string
|
|
LogLevel string
|
|
}
|
|
|
|
type RunningSerializer struct {
|
|
Serializer telegraf.Serializer
|
|
Config *SerializerConfig
|
|
log telegraf.Logger
|
|
|
|
MetricsSerialized selfstat.Stat
|
|
BytesSerialized selfstat.Stat
|
|
SerializationTime selfstat.Stat
|
|
}
|
|
|
|
func NewRunningSerializer(serializer telegraf.Serializer, config *SerializerConfig) *RunningSerializer {
|
|
tags := map[string]string{"type": config.DataFormat}
|
|
if config.Alias != "" {
|
|
tags["alias"] = config.Alias
|
|
}
|
|
|
|
serializerErrorsRegister := selfstat.Register("serializer", "errors", tags)
|
|
logger := logging.New("serializers", config.DataFormat+"::"+config.Parent, config.Alias)
|
|
logger.RegisterErrorCallback(func() {
|
|
serializerErrorsRegister.Incr(1)
|
|
})
|
|
if err := logger.SetLogLevel(config.LogLevel); err != nil {
|
|
logger.Error(err)
|
|
}
|
|
SetLoggerOnPlugin(serializer, logger)
|
|
|
|
return &RunningSerializer{
|
|
Serializer: serializer,
|
|
Config: config,
|
|
MetricsSerialized: selfstat.Register(
|
|
"serializer",
|
|
"metrics_serialized",
|
|
tags,
|
|
),
|
|
BytesSerialized: selfstat.Register(
|
|
"serializer",
|
|
"bytes_serialized",
|
|
tags,
|
|
),
|
|
SerializationTime: selfstat.Register(
|
|
"serializer",
|
|
"serialization_time_ns",
|
|
tags,
|
|
),
|
|
log: logger,
|
|
}
|
|
}
|
|
|
|
func (r *RunningSerializer) LogName() string {
|
|
return logName("parsers", r.Config.DataFormat+"::"+r.Config.Parent, r.Config.Alias)
|
|
}
|
|
|
|
func (r *RunningSerializer) Init() error {
|
|
if p, ok := r.Serializer.(telegraf.Initializer); ok {
|
|
err := p.Init()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *RunningSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
|
start := time.Now()
|
|
buf, err := r.Serializer.Serialize(metric)
|
|
elapsed := time.Since(start)
|
|
r.SerializationTime.Incr(elapsed.Nanoseconds())
|
|
r.MetricsSerialized.Incr(1)
|
|
r.BytesSerialized.Incr(int64(len(buf)))
|
|
|
|
return buf, err
|
|
}
|
|
|
|
func (r *RunningSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
|
start := time.Now()
|
|
buf, err := r.Serializer.SerializeBatch(metrics)
|
|
elapsed := time.Since(start)
|
|
r.SerializationTime.Incr(elapsed.Nanoseconds())
|
|
r.MetricsSerialized.Incr(int64(len(metrics)))
|
|
r.BytesSerialized.Incr(int64(len(buf)))
|
|
|
|
return buf, err
|
|
}
|
|
|
|
func (r *RunningSerializer) Log() telegraf.Logger {
|
|
return r.log
|
|
}
|