//go:generate ../../../tools/readme_config_includer/generator package opentelemetry import ( "context" ntls "crypto/tls" _ "embed" "sort" "time" "github.com/influxdata/influxdb-observability/common" "github.com/influxdata/influxdb-observability/influx2otel" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" _ "google.golang.org/grpc/encoding/gzip" // Blank import to allow gzip encoding "google.golang.org/grpc/metadata" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" ) var userAgent = internal.ProductToken() //go:embed sample.conf var sampleConfig string type OpenTelemetry struct { ServiceAddress string `toml:"service_address"` tls.ClientConfig Timeout config.Duration `toml:"timeout"` Compression string `toml:"compression"` Headers map[string]string `toml:"headers"` Attributes map[string]string `toml:"attributes"` Coralogix *CoralogixConfig `toml:"coralogix"` Log telegraf.Logger `toml:"-"` metricsConverter *influx2otel.LineProtocolToOtelMetrics grpcClientConn *grpc.ClientConn metricsServiceClient pmetricotlp.GRPCClient callOptions []grpc.CallOption } type CoralogixConfig struct { AppName string `toml:"application"` SubSystem string `toml:"subsystem"` PrivateKey string `toml:"private_key"` } func (*OpenTelemetry) SampleConfig() string { return sampleConfig } func (o *OpenTelemetry) Connect() error { logger := &otelLogger{o.Log} if o.ServiceAddress == "" { o.ServiceAddress = defaultServiceAddress } if o.Timeout <= 0 { o.Timeout = defaultTimeout } if o.Compression == "" { o.Compression = defaultCompression } if o.Coralogix != nil { if o.Headers == nil { o.Headers = make(map[string]string) } o.Headers["ApplicationName"] = o.Coralogix.AppName o.Headers["ApiName"] = o.Coralogix.SubSystem o.Headers["Authorization"] = "Bearer " + o.Coralogix.PrivateKey } metricsConverter, err := influx2otel.NewLineProtocolToOtelMetrics(logger) if err != nil { return err } var grpcTLSDialOption grpc.DialOption if tlsConfig, err := o.ClientConfig.TLSConfig(); err != nil { return err } else if tlsConfig != nil { grpcTLSDialOption = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } else if o.Coralogix != nil { // For coralogix, we enforce GRPC connection with TLS grpcTLSDialOption = grpc.WithTransportCredentials(credentials.NewTLS(&ntls.Config{})) } else { grpcTLSDialOption = grpc.WithTransportCredentials(insecure.NewCredentials()) } grpcClientConn, err := grpc.NewClient(o.ServiceAddress, grpcTLSDialOption, grpc.WithUserAgent(userAgent)) if err != nil { return err } metricsServiceClient := pmetricotlp.NewGRPCClient(grpcClientConn) o.metricsConverter = metricsConverter o.grpcClientConn = grpcClientConn o.metricsServiceClient = metricsServiceClient if o.Compression != "" && o.Compression != "none" { o.callOptions = append(o.callOptions, grpc.UseCompressor(o.Compression)) } return nil } func (o *OpenTelemetry) Close() error { if o.grpcClientConn != nil { err := o.grpcClientConn.Close() o.grpcClientConn = nil return err } return nil } // Split metrics up by timestamp and send to Google Cloud Stackdriver func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error { metricBatch := make(map[int64][]telegraf.Metric) timestamps := make([]int64, 0, len(metrics)) for _, metric := range metrics { timestamp := metric.Time().UnixNano() if existingSlice, ok := metricBatch[timestamp]; ok { metricBatch[timestamp] = append(existingSlice, metric) } else { metricBatch[timestamp] = []telegraf.Metric{metric} timestamps = append(timestamps, timestamp) } } // sort the timestamps we collected sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) o.Log.Debugf("Received %d metrics and split into %d groups by timestamp", len(metrics), len(metricBatch)) for _, timestamp := range timestamps { if err := o.sendBatch(metricBatch[timestamp]); err != nil { return err } } return nil } func (o *OpenTelemetry) sendBatch(metrics []telegraf.Metric) error { batch := o.metricsConverter.NewBatch() for _, metric := range metrics { var vType common.InfluxMetricValueType switch metric.Type() { case telegraf.Gauge: vType = common.InfluxMetricValueTypeGauge case telegraf.Untyped: vType = common.InfluxMetricValueTypeUntyped case telegraf.Counter: vType = common.InfluxMetricValueTypeSum case telegraf.Histogram: vType = common.InfluxMetricValueTypeHistogram case telegraf.Summary: vType = common.InfluxMetricValueTypeSummary default: o.Log.Warnf("Unrecognized metric type %v", metric.Type()) continue } err := batch.AddPoint(metric.Name(), metric.Tags(), metric.Fields(), metric.Time(), vType) if err != nil { o.Log.Warnf("Failed to add point: %v", err) continue } } md := pmetricotlp.NewExportRequestFromMetrics(batch.GetMetrics()) if md.Metrics().ResourceMetrics().Len() == 0 { return nil } if len(o.Attributes) > 0 { for i := 0; i < md.Metrics().ResourceMetrics().Len(); i++ { for k, v := range o.Attributes { md.Metrics().ResourceMetrics().At(i).Resource().Attributes().PutStr(k, v) } } } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout)) if len(o.Headers) > 0 { ctx = metadata.NewOutgoingContext(ctx, metadata.New(o.Headers)) } defer cancel() _, err := o.metricsServiceClient.Export(ctx, md, o.callOptions...) return err } const ( defaultServiceAddress = "localhost:4317" defaultTimeout = config.Duration(5 * time.Second) defaultCompression = "gzip" ) func init() { outputs.Add("opentelemetry", func() telegraf.Output { return &OpenTelemetry{ ServiceAddress: defaultServiceAddress, Timeout: defaultTimeout, Compression: defaultCompression, } }) }