1
0
Fork 0
telegraf/plugins/outputs/wavefront/wavefront.go

408 lines
12 KiB
Go
Raw Permalink Normal View History

//go:generate ../../../tools/readme_config_includer/generator
package wavefront
import (
"context"
_ "embed"
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"time"
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common_http "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/outputs"
serializers_wavefront "github.com/influxdata/telegraf/plugins/serializers/wavefront"
)
//go:embed sample.conf
var sampleConfig string
const maxTagLength = 254
type authCSPClientCredentials struct {
AppID config.Secret `toml:"app_id"`
AppSecret config.Secret `toml:"app_secret"`
OrgID *string `toml:"org_id"`
}
type Wavefront struct {
URL string `toml:"url"`
Token config.Secret `toml:"token"`
CSPBaseURL string `toml:"auth_csp_base_url"`
AuthCSPAPIToken config.Secret `toml:"auth_csp_api_token"`
AuthCSPClientCredentials *authCSPClientCredentials `toml:"auth_csp_client_credentials"`
Host string `toml:"host" deprecated:"1.28.0;1.35.0;use url instead"`
Port int `toml:"port" deprecated:"1.28.0;1.35.0;use url instead"`
Prefix string `toml:"prefix"`
SimpleFields bool `toml:"simple_fields"`
MetricSeparator string `toml:"metric_separator"`
ConvertPaths bool `toml:"convert_paths"`
ConvertBool bool `toml:"convert_bool"`
HTTPMaximumBatchSize int `toml:"http_maximum_batch_size"`
UseRegex bool `toml:"use_regex"`
UseStrict bool `toml:"use_strict"`
TruncateTags bool `toml:"truncate_tags"`
ImmediateFlush bool `toml:"immediate_flush"`
SendInternalMetrics bool `toml:"send_internal_metrics"`
SourceOverride []string `toml:"source_override"`
StringToNumber map[string][]map[string]float64 `toml:"string_to_number" deprecated:"1.9.0;1.35.0;use the enum processor instead"`
common_http.HTTPClientConfig
sender wavefront.Sender
Log telegraf.Logger `toml:"-"`
}
// instead of Sanitize which may miss some special characters we can use a regex pattern, but this is significantly slower than Sanitize
var sanitizedRegex = regexp.MustCompile(`[^a-zA-Z\d_.-]`)
var tagValueReplacer = strings.NewReplacer("*", "-")
var pathReplacer = strings.NewReplacer("_", "_")
func (*Wavefront) SampleConfig() string {
return sampleConfig
}
func (w *Wavefront) parseConnectionURL() (string, error) {
if w.URL == "" {
if w.Host == "" || w.Port <= 0 {
return "", errors.New("no URL specified")
}
generatedURL := fmt.Sprintf("http://%s:%d", w.Host, w.Port)
w.Log.Warnf("translating host/port into url: %s\n", generatedURL)
return generatedURL, nil
}
u, err := url.ParseRequestURI(w.URL)
if err != nil {
return "", fmt.Errorf("could not parse the provided URL: %s", w.URL)
}
return u.String(), nil
}
func (w *Wavefront) createSender(connectionURL string, flushSeconds int) (wavefront.Sender, error) {
client, err := w.CreateClient(context.Background(), w.Log)
if err != nil {
return nil, err
}
options := []wavefront.Option{
wavefront.BatchSize(w.HTTPMaximumBatchSize),
wavefront.FlushIntervalSeconds(flushSeconds),
wavefront.HTTPClient(client),
wavefront.SendInternalMetrics(w.SendInternalMetrics),
}
authOptions, err := w.makeAuthOptions()
if err != nil {
return nil, err
}
options = append(options, authOptions...)
return wavefront.NewSender(connectionURL, options...)
}
func (w *Wavefront) Connect() error {
flushSeconds := 5
if w.ImmediateFlush {
flushSeconds = 86400 // Set a very long flush interval if we're flushing directly
}
connectionURL, err := w.parseConnectionURL()
if err != nil {
return err
}
sender, err := w.createSender(connectionURL, flushSeconds)
if err != nil {
return errors.New("could not create Wavefront Sender for the provided url")
}
w.sender = sender
if w.ConvertPaths && w.MetricSeparator == "_" {
w.ConvertPaths = false
}
if w.ConvertPaths {
pathReplacer = strings.NewReplacer("_", w.MetricSeparator)
}
return nil
}
func (w *Wavefront) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
for _, point := range w.buildMetrics(m) {
err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
if err != nil {
if isRetryable(err) {
// The internal buffer in the Wavefront SDK is full. To prevent data loss,
// we flush the buffer (which is a blocking operation) and try again.
w.Log.Debug("SDK buffer overrun, forcibly flushing the buffer")
if err = w.sender.Flush(); err != nil {
return fmt.Errorf("wavefront flushing error: %w", err)
}
// Try again.
err = w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
if err != nil {
if isRetryable(err) {
return fmt.Errorf("wavefront sending error: %w", err)
}
}
}
w.Log.Errorf("Non-retryable error during Wavefront.Write: %v", err)
w.Log.Debugf("Non-retryable metric data: %+v", point)
}
}
}
if w.ImmediateFlush {
w.Log.Debugf("Flushing batch of %d points", len(metrics))
return w.sender.Flush()
}
return nil
}
func (w *Wavefront) buildMetrics(m telegraf.Metric) []*serializers_wavefront.MetricPoint {
ret := make([]*serializers_wavefront.MetricPoint, 0)
for fieldName, value := range m.Fields() {
var name string
if !w.SimpleFields && fieldName == "value" {
name = fmt.Sprintf("%s%s", w.Prefix, m.Name())
} else {
name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName)
}
if w.UseRegex {
name = sanitizedRegex.ReplaceAllLiteralString(name, "-")
} else {
name = serializers_wavefront.Sanitize(w.UseStrict, name)
}
if w.ConvertPaths {
name = pathReplacer.Replace(name)
}
metric := &serializers_wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
}
metricValue, buildError := buildValue(value, metric.Metric, w)
if buildError != nil {
w.Log.Debugf("Error building tags: %s\n", buildError.Error())
continue
}
metric.Value = metricValue
source, tags := w.buildTags(m.Tags())
metric.Source = source
metric.Tags = tags
ret = append(ret, metric)
}
return ret
}
func (w *Wavefront) buildTags(mTags map[string]string) (string, map[string]string) {
// Remove all empty tags.
for k, v := range mTags {
if v == "" {
delete(mTags, k)
}
}
// find source, use source_override property if needed
var source string
if s, ok := mTags["source"]; ok {
source = s
delete(mTags, "source")
} else {
sourceTagFound := false
for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
source = v
if mTags["host"] != "" {
mTags["telegraf_host"] = mTags["host"]
}
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}
if !sourceTagFound {
source = mTags["host"]
}
}
source = tagValueReplacer.Replace(source)
// remove default host tag
delete(mTags, "host")
// sanitize tag keys and values
tags := make(map[string]string)
for k, v := range mTags {
var key string
if w.UseRegex {
key = sanitizedRegex.ReplaceAllLiteralString(k, "-")
} else {
key = serializers_wavefront.Sanitize(w.UseStrict, k)
}
val := tagValueReplacer.Replace(v)
if w.TruncateTags {
if len(key) > maxTagLength {
w.Log.Warnf("Tag key length > 254. Skipping tag: %s", key)
continue
}
if len(key)+len(val) > maxTagLength {
w.Log.Debugf("Key+value length > 254: %s", key)
val = val[:maxTagLength-len(key)]
}
}
tags[key] = val
}
return source, tags
}
func buildValue(v interface{}, name string, w *Wavefront) (float64, error) {
switch p := v.(type) {
case bool:
if w.ConvertBool {
if p {
return 1, nil
}
return 0, nil
}
case int64:
return float64(v.(int64)), nil
case uint64:
return float64(v.(uint64)), nil
case float64:
return v.(float64), nil
case string:
for prefix, mappings := range w.StringToNumber {
if strings.HasPrefix(name, prefix) {
for _, mapping := range mappings {
val, hasVal := mapping[p]
if hasVal {
return val, nil
}
}
}
}
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
default:
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
}
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
}
func (w *Wavefront) Close() error {
w.sender.Close()
return nil
}
func (w *Wavefront) makeAuthOptions() ([]wavefront.Option, error) {
if !w.Token.Empty() {
tsecret, err := w.Token.Get()
if err != nil {
return nil, fmt.Errorf("failed to parse token value: %w", err)
}
token := tsecret.String()
tsecret.Destroy()
return []wavefront.Option{
wavefront.APIToken(token),
}, nil
}
if !w.AuthCSPAPIToken.Empty() {
tsecret, err := w.AuthCSPAPIToken.Get()
if err != nil {
return nil, fmt.Errorf("failed to CSP API token value: %w", err)
}
apiToken := tsecret.String()
tsecret.Destroy()
return []wavefront.Option{
wavefront.CSPAPIToken(apiToken, wavefront.CSPBaseURL(w.CSPBaseURL)),
}, nil
}
if w.AuthCSPClientCredentials != nil {
appIDSecret, err := w.AuthCSPClientCredentials.AppID.Get()
if err != nil {
return nil, fmt.Errorf("failed to parse Client Credentials App ID value: %w", err)
}
appID := appIDSecret.String()
appIDSecret.Destroy()
appSecret, err := w.AuthCSPClientCredentials.AppSecret.Get()
if err != nil {
return nil, fmt.Errorf("failed to parse Client Credentials App Secret value: %w", err)
}
cspAppSecret := appSecret.String()
appSecret.Destroy()
options := []wavefront.CSPOption{
wavefront.CSPBaseURL(w.CSPBaseURL),
}
if w.AuthCSPClientCredentials.OrgID != nil {
options = append(options, wavefront.CSPOrgID(*w.AuthCSPClientCredentials.OrgID))
}
return []wavefront.Option{
wavefront.CSPClientCredentials(appID, cspAppSecret, options...),
}, nil
}
return nil, nil
}
func init() {
outputs.Add("wavefront", func() telegraf.Output {
return &Wavefront{
MetricSeparator: ".",
ConvertPaths: true,
ConvertBool: true,
TruncateTags: false,
ImmediateFlush: true,
SendInternalMetrics: true,
HTTPMaximumBatchSize: 10000,
HTTPClientConfig: common_http.HTTPClientConfig{Timeout: config.Duration(10 * time.Second)},
CSPBaseURL: "https://console.cloud.vmware.com",
}
})
}
// TODO: Currently there's no canonical way to exhaust all
// retryable/non-retryable errors from wavefront, so this implementation just
// handles known non-retryable errors in a case-by-case basis and assumes all
// other errors are retryable.
// A support ticket has been filed against wavefront to provide a canonical way
// to distinguish between retryable and non-retryable errors (link is not
// public).
func isRetryable(err error) bool {
if err != nil {
// "empty metric name" errors are non-retryable as retry will just keep
// getting the same error again and again.
if strings.Contains(err.Error(), "empty metric name") {
return false
}
}
return true
}