407 lines
12 KiB
Go
407 lines
12 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package wavefront
|
|
|
|
import (
|
|
"context"
|
|
_ "embed"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
common_http "github.com/influxdata/telegraf/plugins/common/http"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
serializers_wavefront "github.com/influxdata/telegraf/plugins/serializers/wavefront"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
// maxTagLength is the byte budget enforced by buildTags when truncate_tags is
// on: a sanitized tag key longer than this is dropped, and a value is cut so
// that key plus value fit within it.
const maxTagLength = 254
|
|
|
|
type authCSPClientCredentials struct {
|
|
AppID config.Secret `toml:"app_id"`
|
|
AppSecret config.Secret `toml:"app_secret"`
|
|
OrgID *string `toml:"org_id"`
|
|
}
|
|
|
|
type Wavefront struct {
|
|
URL string `toml:"url"`
|
|
Token config.Secret `toml:"token"`
|
|
CSPBaseURL string `toml:"auth_csp_base_url"`
|
|
AuthCSPAPIToken config.Secret `toml:"auth_csp_api_token"`
|
|
AuthCSPClientCredentials *authCSPClientCredentials `toml:"auth_csp_client_credentials"`
|
|
Host string `toml:"host" deprecated:"1.28.0;1.35.0;use url instead"`
|
|
Port int `toml:"port" deprecated:"1.28.0;1.35.0;use url instead"`
|
|
Prefix string `toml:"prefix"`
|
|
SimpleFields bool `toml:"simple_fields"`
|
|
MetricSeparator string `toml:"metric_separator"`
|
|
ConvertPaths bool `toml:"convert_paths"`
|
|
ConvertBool bool `toml:"convert_bool"`
|
|
HTTPMaximumBatchSize int `toml:"http_maximum_batch_size"`
|
|
UseRegex bool `toml:"use_regex"`
|
|
UseStrict bool `toml:"use_strict"`
|
|
TruncateTags bool `toml:"truncate_tags"`
|
|
ImmediateFlush bool `toml:"immediate_flush"`
|
|
SendInternalMetrics bool `toml:"send_internal_metrics"`
|
|
SourceOverride []string `toml:"source_override"`
|
|
StringToNumber map[string][]map[string]float64 `toml:"string_to_number" deprecated:"1.9.0;1.35.0;use the enum processor instead"`
|
|
|
|
common_http.HTTPClientConfig
|
|
|
|
sender wavefront.Sender
|
|
Log telegraf.Logger `toml:"-"`
|
|
}
|
|
|
|
var (
	// sanitizedRegex matches every character that is not a letter, a digit,
	// "_", "." or "-". It is the use_regex alternative to Sanitize, which may
	// miss some special characters; the regex route is significantly slower.
	sanitizedRegex = regexp.MustCompile(`[^a-zA-Z\d_.-]`)

	// tagValueReplacer swaps "*" for "-" in sources and tag values.
	tagValueReplacer = strings.NewReplacer("*", "-")

	// pathReplacer starts as an identity mapping ("_" to "_"). Connect
	// reassigns it to map "_" to the metric separator when convert_paths is
	// in effect.
	pathReplacer = strings.NewReplacer("_", "_")
)
|
|
|
|
func (*Wavefront) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (w *Wavefront) parseConnectionURL() (string, error) {
|
|
if w.URL == "" {
|
|
if w.Host == "" || w.Port <= 0 {
|
|
return "", errors.New("no URL specified")
|
|
}
|
|
generatedURL := fmt.Sprintf("http://%s:%d", w.Host, w.Port)
|
|
w.Log.Warnf("translating host/port into url: %s\n", generatedURL)
|
|
return generatedURL, nil
|
|
}
|
|
|
|
u, err := url.ParseRequestURI(w.URL)
|
|
if err != nil {
|
|
return "", fmt.Errorf("could not parse the provided URL: %s", w.URL)
|
|
}
|
|
|
|
return u.String(), nil
|
|
}
|
|
|
|
func (w *Wavefront) createSender(connectionURL string, flushSeconds int) (wavefront.Sender, error) {
|
|
client, err := w.CreateClient(context.Background(), w.Log)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
options := []wavefront.Option{
|
|
wavefront.BatchSize(w.HTTPMaximumBatchSize),
|
|
wavefront.FlushIntervalSeconds(flushSeconds),
|
|
wavefront.HTTPClient(client),
|
|
wavefront.SendInternalMetrics(w.SendInternalMetrics),
|
|
}
|
|
|
|
authOptions, err := w.makeAuthOptions()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
options = append(options, authOptions...)
|
|
|
|
return wavefront.NewSender(connectionURL, options...)
|
|
}
|
|
|
|
func (w *Wavefront) Connect() error {
|
|
flushSeconds := 5
|
|
if w.ImmediateFlush {
|
|
flushSeconds = 86400 // Set a very long flush interval if we're flushing directly
|
|
}
|
|
connectionURL, err := w.parseConnectionURL()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sender, err := w.createSender(connectionURL, flushSeconds)
|
|
|
|
if err != nil {
|
|
return errors.New("could not create Wavefront Sender for the provided url")
|
|
}
|
|
|
|
w.sender = sender
|
|
|
|
if w.ConvertPaths && w.MetricSeparator == "_" {
|
|
w.ConvertPaths = false
|
|
}
|
|
if w.ConvertPaths {
|
|
pathReplacer = strings.NewReplacer("_", w.MetricSeparator)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *Wavefront) Write(metrics []telegraf.Metric) error {
|
|
for _, m := range metrics {
|
|
for _, point := range w.buildMetrics(m) {
|
|
err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
|
if err != nil {
|
|
if isRetryable(err) {
|
|
// The internal buffer in the Wavefront SDK is full. To prevent data loss,
|
|
// we flush the buffer (which is a blocking operation) and try again.
|
|
w.Log.Debug("SDK buffer overrun, forcibly flushing the buffer")
|
|
if err = w.sender.Flush(); err != nil {
|
|
return fmt.Errorf("wavefront flushing error: %w", err)
|
|
}
|
|
// Try again.
|
|
err = w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
|
if err != nil {
|
|
if isRetryable(err) {
|
|
return fmt.Errorf("wavefront sending error: %w", err)
|
|
}
|
|
}
|
|
}
|
|
w.Log.Errorf("Non-retryable error during Wavefront.Write: %v", err)
|
|
w.Log.Debugf("Non-retryable metric data: %+v", point)
|
|
}
|
|
}
|
|
}
|
|
if w.ImmediateFlush {
|
|
w.Log.Debugf("Flushing batch of %d points", len(metrics))
|
|
return w.sender.Flush()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *Wavefront) buildMetrics(m telegraf.Metric) []*serializers_wavefront.MetricPoint {
|
|
ret := make([]*serializers_wavefront.MetricPoint, 0)
|
|
|
|
for fieldName, value := range m.Fields() {
|
|
var name string
|
|
if !w.SimpleFields && fieldName == "value" {
|
|
name = fmt.Sprintf("%s%s", w.Prefix, m.Name())
|
|
} else {
|
|
name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName)
|
|
}
|
|
|
|
if w.UseRegex {
|
|
name = sanitizedRegex.ReplaceAllLiteralString(name, "-")
|
|
} else {
|
|
name = serializers_wavefront.Sanitize(w.UseStrict, name)
|
|
}
|
|
|
|
if w.ConvertPaths {
|
|
name = pathReplacer.Replace(name)
|
|
}
|
|
|
|
metric := &serializers_wavefront.MetricPoint{
|
|
Metric: name,
|
|
Timestamp: m.Time().Unix(),
|
|
}
|
|
|
|
metricValue, buildError := buildValue(value, metric.Metric, w)
|
|
if buildError != nil {
|
|
w.Log.Debugf("Error building tags: %s\n", buildError.Error())
|
|
continue
|
|
}
|
|
metric.Value = metricValue
|
|
|
|
source, tags := w.buildTags(m.Tags())
|
|
metric.Source = source
|
|
metric.Tags = tags
|
|
|
|
ret = append(ret, metric)
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func (w *Wavefront) buildTags(mTags map[string]string) (string, map[string]string) {
|
|
// Remove all empty tags.
|
|
for k, v := range mTags {
|
|
if v == "" {
|
|
delete(mTags, k)
|
|
}
|
|
}
|
|
|
|
// find source, use source_override property if needed
|
|
var source string
|
|
if s, ok := mTags["source"]; ok {
|
|
source = s
|
|
delete(mTags, "source")
|
|
} else {
|
|
sourceTagFound := false
|
|
for _, s := range w.SourceOverride {
|
|
for k, v := range mTags {
|
|
if k == s {
|
|
source = v
|
|
if mTags["host"] != "" {
|
|
mTags["telegraf_host"] = mTags["host"]
|
|
}
|
|
|
|
sourceTagFound = true
|
|
delete(mTags, k)
|
|
break
|
|
}
|
|
}
|
|
if sourceTagFound {
|
|
break
|
|
}
|
|
}
|
|
|
|
if !sourceTagFound {
|
|
source = mTags["host"]
|
|
}
|
|
}
|
|
source = tagValueReplacer.Replace(source)
|
|
|
|
// remove default host tag
|
|
delete(mTags, "host")
|
|
|
|
// sanitize tag keys and values
|
|
tags := make(map[string]string)
|
|
for k, v := range mTags {
|
|
var key string
|
|
if w.UseRegex {
|
|
key = sanitizedRegex.ReplaceAllLiteralString(k, "-")
|
|
} else {
|
|
key = serializers_wavefront.Sanitize(w.UseStrict, k)
|
|
}
|
|
val := tagValueReplacer.Replace(v)
|
|
if w.TruncateTags {
|
|
if len(key) > maxTagLength {
|
|
w.Log.Warnf("Tag key length > 254. Skipping tag: %s", key)
|
|
continue
|
|
}
|
|
if len(key)+len(val) > maxTagLength {
|
|
w.Log.Debugf("Key+value length > 254: %s", key)
|
|
val = val[:maxTagLength-len(key)]
|
|
}
|
|
}
|
|
tags[key] = val
|
|
}
|
|
|
|
return source, tags
|
|
}
|
|
|
|
func buildValue(v interface{}, name string, w *Wavefront) (float64, error) {
|
|
switch p := v.(type) {
|
|
case bool:
|
|
if w.ConvertBool {
|
|
if p {
|
|
return 1, nil
|
|
}
|
|
return 0, nil
|
|
}
|
|
case int64:
|
|
return float64(v.(int64)), nil
|
|
case uint64:
|
|
return float64(v.(uint64)), nil
|
|
case float64:
|
|
return v.(float64), nil
|
|
case string:
|
|
for prefix, mappings := range w.StringToNumber {
|
|
if strings.HasPrefix(name, prefix) {
|
|
for _, mapping := range mappings {
|
|
val, hasVal := mapping[p]
|
|
if hasVal {
|
|
return val, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
default:
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
}
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
}
|
|
|
|
func (w *Wavefront) Close() error {
|
|
w.sender.Close()
|
|
return nil
|
|
}
|
|
|
|
func (w *Wavefront) makeAuthOptions() ([]wavefront.Option, error) {
|
|
if !w.Token.Empty() {
|
|
tsecret, err := w.Token.Get()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse token value: %w", err)
|
|
}
|
|
token := tsecret.String()
|
|
tsecret.Destroy()
|
|
|
|
return []wavefront.Option{
|
|
wavefront.APIToken(token),
|
|
}, nil
|
|
}
|
|
|
|
if !w.AuthCSPAPIToken.Empty() {
|
|
tsecret, err := w.AuthCSPAPIToken.Get()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to CSP API token value: %w", err)
|
|
}
|
|
apiToken := tsecret.String()
|
|
tsecret.Destroy()
|
|
return []wavefront.Option{
|
|
wavefront.CSPAPIToken(apiToken, wavefront.CSPBaseURL(w.CSPBaseURL)),
|
|
}, nil
|
|
}
|
|
|
|
if w.AuthCSPClientCredentials != nil {
|
|
appIDSecret, err := w.AuthCSPClientCredentials.AppID.Get()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse Client Credentials App ID value: %w", err)
|
|
}
|
|
appID := appIDSecret.String()
|
|
appIDSecret.Destroy()
|
|
|
|
appSecret, err := w.AuthCSPClientCredentials.AppSecret.Get()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse Client Credentials App Secret value: %w", err)
|
|
}
|
|
cspAppSecret := appSecret.String()
|
|
appSecret.Destroy()
|
|
|
|
options := []wavefront.CSPOption{
|
|
wavefront.CSPBaseURL(w.CSPBaseURL),
|
|
}
|
|
|
|
if w.AuthCSPClientCredentials.OrgID != nil {
|
|
options = append(options, wavefront.CSPOrgID(*w.AuthCSPClientCredentials.OrgID))
|
|
}
|
|
return []wavefront.Option{
|
|
wavefront.CSPClientCredentials(appID, cspAppSecret, options...),
|
|
}, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("wavefront", func() telegraf.Output {
|
|
return &Wavefront{
|
|
MetricSeparator: ".",
|
|
ConvertPaths: true,
|
|
ConvertBool: true,
|
|
TruncateTags: false,
|
|
ImmediateFlush: true,
|
|
SendInternalMetrics: true,
|
|
HTTPMaximumBatchSize: 10000,
|
|
HTTPClientConfig: common_http.HTTPClientConfig{Timeout: config.Duration(10 * time.Second)},
|
|
CSPBaseURL: "https://console.cloud.vmware.com",
|
|
}
|
|
})
|
|
}
|
|
|
|
// isRetryable reports whether a send that failed with err is worth retrying.
//
// TODO: Wavefront currently offers no canonical way to tell retryable from
// non-retryable errors, so known non-retryable errors are recognized case by
// case and every other error, as well as a nil error, is treated as
// retryable. A support ticket asking Wavefront for a canonical distinction
// has been filed (the link is not public).
func isRetryable(err error) bool {
	if err == nil {
		return true
	}
	// "empty metric name" errors are non-retryable: a retry would only
	// produce the same error again and again.
	return !strings.Contains(err.Error(), "empty metric name")
}
|