//go:generate ../../../tools/readme_config_includer/generator package signalfx import ( "context" _ "embed" "errors" "fmt" "strings" "github.com/signalfx/golib/v3/datapoint" "github.com/signalfx/golib/v3/datapoint/dpsink" "github.com/signalfx/golib/v3/event" "github.com/signalfx/golib/v3/sfxclient" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" ) //go:embed sample.conf var sampleConfig string // init initializes the plugin context func init() { outputs.Add("signalfx", func() telegraf.Output { return NewSignalFx() }) } // SignalFx plugin context type SignalFx struct { AccessToken config.Secret `toml:"access_token"` SignalFxRealm string `toml:"signalfx_realm"` IngestURL string `toml:"ingest_url"` IncludedEventNames []string `toml:"included_event_names"` Log telegraf.Logger `toml:"-"` includedEventSet map[string]bool client dpsink.Sink ctx context.Context cancel context.CancelFunc } // GetMetricType returns the equivalent telegraf ValueType for a signalfx metric type func GetMetricType(mtype telegraf.ValueType) (metricType datapoint.MetricType) { switch mtype { case telegraf.Counter: metricType = datapoint.Counter case telegraf.Gauge: metricType = datapoint.Gauge case telegraf.Summary: metricType = datapoint.Gauge case telegraf.Histogram: metricType = datapoint.Gauge case telegraf.Untyped: metricType = datapoint.Gauge default: metricType = datapoint.Gauge } return metricType } // NewSignalFx - returns a new context for the SignalFx output plugin func NewSignalFx() *SignalFx { ctx, cancel := context.WithCancel(context.Background()) return &SignalFx{ IncludedEventNames: []string{""}, ctx: ctx, cancel: cancel, client: sfxclient.NewHTTPSink(), } } func (*SignalFx) SampleConfig() string { return sampleConfig } // Connect establishes a connection to SignalFx func (s *SignalFx) Connect() error { client := s.client.(*sfxclient.HTTPSink) token, err := s.AccessToken.Get() if err != nil { return fmt.Errorf("getting token failed: %w", err) } client.AuthToken = token.String() token.Destroy() if s.IngestURL != "" { client.DatapointEndpoint = datapointEndpointForIngestURL(s.IngestURL) client.EventEndpoint = eventEndpointForIngestURL(s.IngestURL) } else if s.SignalFxRealm != "" { client.DatapointEndpoint = datapointEndpointForRealm(s.SignalFxRealm) client.EventEndpoint = eventEndpointForRealm(s.SignalFxRealm) } else { return errors.New("signalfx_realm or ingest_url must be configured") } return nil } // Close closes any connections to SignalFx func (s *SignalFx) Close() error { s.cancel() s.client.(*sfxclient.HTTPSink).Client.CloseIdleConnections() return nil } func (s *SignalFx) ConvertToSignalFx(metrics []telegraf.Metric) ([]*datapoint.Datapoint, []*event.Event) { var dps []*datapoint.Datapoint var events []*event.Event for _, metric := range metrics { s.Log.Debugf("Processing the following measurement: %v", metric) var timestamp = metric.Time() metricType := GetMetricType(metric.Type()) for field, val := range metric.Fields() { // Copy the metric tags because they are meant to be treated as // immutable var metricDims = metric.Tags() // Generate the metric name metricName := getMetricName(metric.Name(), field) // Get the metric value as a datapoint value if metricValue, err := datapoint.CastMetricValueWithBool(val); err == nil { var dp = datapoint.New(metricName, metricDims, metricValue, metricType, timestamp) s.Log.Debugf("Datapoint: %v", dp.String()) dps = append(dps, dp) } else { // Skip if it's not an explicitly included event if !s.isEventIncluded(metricName) { continue } // We've already type checked field, so set property with value metricProps := map[string]interface{}{"message": val} var ev = event.NewWithProperties(metricName, event.AGENT, metricDims, metricProps, timestamp) s.Log.Debugf("Event: %v", ev.String()) events = append(events, ev) } } } return dps, events } // Write call back for writing metrics func (s *SignalFx) Write(metrics []telegraf.Metric) error { dps, events := s.ConvertToSignalFx(metrics) if len(dps) > 0 { err := s.client.AddDatapoints(s.ctx, dps) if err != nil { return err } } if len(events) > 0 { if err := s.client.AddEvents(s.ctx, events); err != nil { // If events error out but we successfully sent some datapoints, // don't return an error so that it won't ever retry -- that way we // don't send the same datapoints twice. if len(dps) == 0 { return err } s.Log.Errorf("Failed to send SignalFx event: %v", err) } } return nil } // isEventIncluded - checks whether a metric name for an event was put on the whitelist func (s *SignalFx) isEventIncluded(name string) bool { if s.includedEventSet == nil { s.includedEventSet = make(map[string]bool, len(s.includedEventSet)) for _, include := range s.IncludedEventNames { s.includedEventSet[include] = true } } return s.includedEventSet[name] } // getMetricName combines telegraf fields and tags into a full metric name func getMetricName(metric, field string) string { name := metric // Include field in metric name when it adds to the metric name if field != "value" { name = fmt.Sprintf("%s.%s", name, field) } return name } // ingestURLForRealm returns the base ingest URL for a particular SignalFx // realm func ingestURLForRealm(realm string) string { return fmt.Sprintf("https://ingest.%s.signalfx.com", realm) } // datapointEndpointForRealm returns the endpoint to which datapoints should be // POSTed for a particular realm. func datapointEndpointForRealm(realm string) string { return datapointEndpointForIngestURL(ingestURLForRealm(realm)) } // datapointEndpointForRealm returns the endpoint to which datapoints should be // POSTed for a particular ingest base URL. func datapointEndpointForIngestURL(ingestURL string) string { return strings.TrimRight(ingestURL, "/") + "/v2/datapoint" } // eventEndpointForRealm returns the endpoint to which events should be // POSTed for a particular realm. func eventEndpointForRealm(realm string) string { return eventEndpointForIngestURL(ingestURLForRealm(realm)) } // eventEndpointForRealm returns the endpoint to which events should be // POSTed for a particular ingest base URL. func eventEndpointForIngestURL(ingestURL string) string { return strings.TrimRight(ingestURL, "/") + "/v2/event" }