1
0
Fork 0
telegraf/plugins/inputs/opentelemetry/grpc_services.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

103 lines
3.2 KiB
Go

package opentelemetry
import (
"context"
"fmt"
"github.com/influxdata/influxdb-observability/common"
"github.com/influxdata/influxdb-observability/otel2influx"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
)
type traceService struct {
ptraceotlp.UnimplementedGRPCServer
exporter *otel2influx.OtelTracesToLineProtocol
}
var _ ptraceotlp.GRPCServer = (*traceService)(nil)
func newTraceService(logger common.Logger, writer *writeToAccumulator, spanDimensions []string) (*traceService, error) {
expConfig := otel2influx.DefaultOtelTracesToLineProtocolConfig()
expConfig.Logger = logger
expConfig.Writer = writer
expConfig.SpanDimensions = spanDimensions
exp, err := otel2influx.NewOtelTracesToLineProtocol(expConfig)
if err != nil {
return nil, err
}
return &traceService{
exporter: exp,
}, nil
}
// Export processes and exports the trace data received in the request.
func (s *traceService) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
err := s.exporter.WriteTraces(ctx, req.Traces())
return ptraceotlp.NewExportResponse(), err
}
type metricsService struct {
pmetricotlp.UnimplementedGRPCServer
exporter *otel2influx.OtelMetricsToLineProtocol
}
var _ pmetricotlp.GRPCServer = (*metricsService)(nil)
var metricsSchemata = map[string]common.MetricsSchema{
"prometheus-v1": common.MetricsSchemaTelegrafPrometheusV1,
"prometheus-v2": common.MetricsSchemaTelegrafPrometheusV2,
}
func newMetricsService(logger common.Logger, writer *writeToAccumulator, schema string) (*metricsService, error) {
ms, found := metricsSchemata[schema]
if !found {
return nil, fmt.Errorf("schema %q not recognized", schema)
}
expConfig := otel2influx.DefaultOtelMetricsToLineProtocolConfig()
expConfig.Logger = logger
expConfig.Writer = writer
expConfig.Schema = ms
exp, err := otel2influx.NewOtelMetricsToLineProtocol(expConfig)
if err != nil {
return nil, err
}
return &metricsService{
exporter: exp,
}, nil
}
// Export processes and exports the metrics data received in the request.
func (s *metricsService) Export(ctx context.Context, req pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) {
err := s.exporter.WriteMetrics(ctx, req.Metrics())
return pmetricotlp.NewExportResponse(), err
}
type logsService struct {
plogotlp.UnimplementedGRPCServer
converter *otel2influx.OtelLogsToLineProtocol
}
var _ plogotlp.GRPCServer = (*logsService)(nil)
func newLogsService(logger common.Logger, writer *writeToAccumulator, logRecordDimensions []string) (*logsService, error) {
expConfig := otel2influx.DefaultOtelLogsToLineProtocolConfig()
expConfig.Logger = logger
expConfig.Writer = writer
expConfig.LogRecordDimensions = logRecordDimensions
exp, err := otel2influx.NewOtelLogsToLineProtocol(expConfig)
if err != nil {
return nil, err
}
return &logsService{
converter: exp,
}, nil
}
// Export processes and exports the logs data received in the request.
func (s *logsService) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) {
err := s.converter.WriteLogs(ctx, req.Logs())
return plogotlp.NewExportResponse(), err
}