//go:generate ../../../tools/readme_config_includer/generator package warp10 import ( "bytes" _ "embed" "errors" "fmt" "io" "math" "net/http" "net/url" "sort" "strconv" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" ) //go:embed sample.conf var sampleConfig string const ( defaultClientTimeout = 15 * time.Second ) // Warp10 output plugin type Warp10 struct { Prefix string `toml:"prefix"` WarpURL string `toml:"warp_url"` Token config.Secret `toml:"token"` Timeout config.Duration `toml:"timeout"` PrintErrorBody bool `toml:"print_error_body"` MaxStringErrorSize int `toml:"max_string_error_size"` client *http.Client tls.ClientConfig Log telegraf.Logger `toml:"-"` } // MetricLine Warp 10 metrics type MetricLine struct { Metric string Timestamp int64 Value string Tags string } func (w *Warp10) createClient() (*http.Client, error) { tlsCfg, err := w.ClientConfig.TLSConfig() if err != nil { return nil, err } if w.Timeout == 0 { w.Timeout = config.Duration(defaultClientTimeout) } client := &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, Proxy: http.ProxyFromEnvironment, }, Timeout: time.Duration(w.Timeout), } return client, nil } func (*Warp10) SampleConfig() string { return sampleConfig } // Connect to warp10 func (w *Warp10) Connect() error { client, err := w.createClient() if err != nil { return err } w.client = client return nil } // GenWarp10Payload compute Warp 10 metrics payload func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric) string { collectString := make([]string, 0) for _, mm := range metrics { for _, field := range mm.FieldList() { metric := &MetricLine{ Metric: fmt.Sprintf("%s%s", w.Prefix, mm.Name()+"."+field.Key), Timestamp: mm.Time().UnixNano() / 1000, } metricValue, err := buildValue(field.Value) if err != nil { w.Log.Errorf("Could not encode value: %v", err) continue } metric.Value = metricValue tagsSlice := buildTags(mm.TagList()) metric.Tags = strings.Join(tagsSlice, ",") messageLine := fmt.Sprintf("%d// %s{%s} %s\n", metric.Timestamp, metric.Metric, metric.Tags, metric.Value) collectString = append(collectString, messageLine) } } return strings.Join(collectString, "") } // Write metrics to Warp10 func (w *Warp10) Write(metrics []telegraf.Metric) error { payload := w.GenWarp10Payload(metrics) if payload == "" { return nil } addr := w.WarpURL + "/api/v0/update" req, err := http.NewRequest("POST", addr, bytes.NewBufferString(payload)) if err != nil { return fmt.Errorf("unable to create new request %q: %w", addr, err) } req.Header.Set("Content-Type", "text/plain") token, err := w.Token.Get() if err != nil { return fmt.Errorf("getting token failed: %w", err) } req.Header.Set("X-Warp10-Token", token.String()) token.Destroy() resp, err := w.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { if w.PrintErrorBody { //nolint:errcheck // err can be ignored since it is just for logging body, _ := io.ReadAll(resp.Body) return errors.New(w.WarpURL + ": " + HandleError(string(body), w.MaxStringErrorSize)) } if len(resp.Status) < w.MaxStringErrorSize { return errors.New(w.WarpURL + ": " + resp.Status) } return errors.New(w.WarpURL + ": " + resp.Status[0:w.MaxStringErrorSize]) } return nil } func buildTags(tags []*telegraf.Tag) []string { tagsString := make([]string, 0, len(tags)+1) for _, tag := range tags { key := url.QueryEscape(tag.Key) value := url.QueryEscape(tag.Value) tagsString = append(tagsString, fmt.Sprintf("%s=%s", key, value)) } tagsString = append(tagsString, "source=telegraf") sort.Strings(tagsString) return tagsString } func buildValue(v interface{}) (string, error) { var retv string switch p := v.(type) { case int64: retv = intToString(p) case string: retv = fmt.Sprintf("'%s'", strings.ReplaceAll(p, "'", "\\'")) case bool: retv = boolToString(p) case uint64: if p <= uint64(math.MaxInt64) { retv = strconv.FormatInt(int64(p), 10) } else { retv = strconv.FormatInt(math.MaxInt64, 10) } case float64: retv = floatToString(p) default: return "", fmt.Errorf("unsupported type: %T", v) } return retv, nil } func intToString(inputNum int64) string { return strconv.FormatInt(inputNum, 10) } func boolToString(inputBool bool) string { return strconv.FormatBool(inputBool) } /* Warp10 supports Infinity/-Infinity/NaN <' // class{label=value} 42.0 0// class-1{label=value}{attribute=value} 42 =1// Infinity '> PARSE <' // class{label=value} 42.0 0// class-1{label=value}{attribute=value} 42 =1// -Infinity '> PARSE <' // class{label=value} 42.0 0// class-1{label=value}{attribute=value} 42 =1// NaN '> PARSE */ func floatToString(inputNum float64) string { switch { case math.IsNaN(inputNum): return "NaN" case math.IsInf(inputNum, -1): return "-Infinity" case math.IsInf(inputNum, 1): return "Infinity" } return strconv.FormatFloat(inputNum, 'f', 6, 64) } // Close close func (*Warp10) Close() error { return nil } // Init Warp10 struct func (w *Warp10) Init() error { if w.MaxStringErrorSize <= 0 { w.MaxStringErrorSize = 511 } return nil } func init() { outputs.Add("warp10", func() telegraf.Output { return &Warp10{} }) } // HandleError read http error body and return a corresponding error func HandleError(body string, maxStringSize int) string { if body == "" { return "Empty return" } if strings.Contains(body, "Invalid token") { return "Invalid token" } if strings.Contains(body, "Write token missing") { return "Write token missing" } if strings.Contains(body, "Token Expired") { return "Token Expired" } if strings.Contains(body, "Token revoked") { return "Token revoked" } if strings.Contains(body, "exceed your Monthly Active Data Streams limit") || strings.Contains(body, "exceed the Monthly Active Data Streams limit") { return "Exceeded Monthly Active Data Streams limit" } if strings.Contains(body, "Daily Data Points limit being already exceeded") { return "Exceeded Daily Data Points limit" } if strings.Contains(body, "Application suspended or closed") { return "Application suspended or closed" } if strings.Contains(body, "broken pipe") { return "broken pipe" } if len(body) < maxStringSize { return body } return body[0:maxStringSize] }