1
0
Fork 0
telegraf/plugins/outputs/librato/librato.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

254 lines
6.1 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package librato
import (
"bytes"
_ "embed"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
//go:embed sample.conf
var sampleConfig string
// Librato structure for configuration and client
type Librato struct {
APIUser config.Secret `toml:"api_user"`
APIToken config.Secret `toml:"api_token"`
Debug bool `toml:"debug"`
SourceTag string `toml:"source_tag" deprecated:"1.0.0;1.35.0;use 'template' instead"`
Timeout config.Duration `toml:"timeout"`
Template string `toml:"template"`
Log telegraf.Logger `toml:"-"`
APIUrl string
client *http.Client
}
// https://www.librato.com/docs/kb/faq/best_practices/naming_convention_metrics_sources.html#naming-limitations-for-sources-and-metrics
var reUnacceptedChar = regexp.MustCompile("[^.a-zA-Z0-9_-]")
// LMetrics is the default struct for Librato's API format
type LMetrics struct {
Gauges []*Gauge `json:"gauges"`
}
// Gauge is the gauge format for Librato's API format
type Gauge struct {
Name string `json:"name"`
Value float64 `json:"value"`
Source string `json:"source"`
MeasureTime int64 `json:"measure_time"`
}
const libratoAPI = "https://metrics-api.librato.com/v1/metrics"
// NewLibrato is the main constructor for librato output plugins
func NewLibrato(apiURL string) *Librato {
return &Librato{
APIUrl: apiURL,
Template: "host",
}
}
func (*Librato) SampleConfig() string {
return sampleConfig
}
// Connect is the default output plugin connection function who make sure it
// can connect to the endpoint
func (l *Librato) Connect() error {
if l.APIUser.Empty() || l.APIToken.Empty() {
return errors.New("api_user and api_token required")
}
l.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(l.Timeout),
}
return nil
}
func (l *Librato) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
if l.Template == "" {
l.Template = "host"
}
if l.SourceTag != "" {
l.Template = l.SourceTag
}
var tempGauges []*Gauge
for _, m := range metrics {
if gauges, err := l.buildGauges(m); err == nil {
for _, gauge := range gauges {
tempGauges = append(tempGauges, gauge)
l.Log.Debugf("Got a gauge: %v", gauge)
}
} else {
l.Log.Infof("Unable to build Gauge for %s, skipping", m.Name())
l.Log.Debugf("Couldn't build gauge: %v", err)
}
}
metricCounter := len(tempGauges)
// make sure we send a batch of maximum 300
sizeBatch := 300
for start := 0; start < metricCounter; start += sizeBatch {
err := l.writeBatch(start, sizeBatch, metricCounter, tempGauges)
if err != nil {
return err
}
}
return nil
}
func (l *Librato) writeBatch(start, sizeBatch, metricCounter int, tempGauges []*Gauge) error {
lmetrics := LMetrics{}
end := start + sizeBatch
if end > metricCounter {
end = metricCounter
sizeBatch = end - start
}
lmetrics.Gauges = make([]*Gauge, sizeBatch)
copy(lmetrics.Gauges, tempGauges[start:end])
metricsBytes, err := json.Marshal(lmetrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics: %w", err)
}
l.Log.Debugf("Librato request: %v", string(metricsBytes))
req, err := http.NewRequest(
"POST",
l.APIUrl,
bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request: %w", err)
}
req.Header.Add("Content-Type", "application/json")
user, err := l.APIUser.Get()
if err != nil {
return fmt.Errorf("getting user failed: %w", err)
}
token, err := l.APIToken.Get()
if err != nil {
user.Destroy()
return fmt.Errorf("getting token failed: %w", err)
}
req.SetBasicAuth(user.String(), token.String())
user.Destroy()
token.Destroy()
resp, err := l.client.Do(req)
if err != nil {
l.Log.Debugf("Error POSTing metrics: %v", err.Error())
return fmt.Errorf("error POSTing metrics: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 || l.Debug {
htmlData, err := io.ReadAll(resp.Body)
if err != nil {
l.Log.Debugf("Couldn't get response! (%v)", err)
}
if resp.StatusCode != 200 {
return fmt.Errorf(
"received bad status code, %d\n %s",
resp.StatusCode,
string(htmlData))
}
l.Log.Debugf("Librato response: %v", string(htmlData))
}
return nil
}
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
if m.Time().Unix() == 0 {
return nil, fmt.Errorf("time was zero %s", m.Name())
}
metricSource := graphite.InsertField(graphite.SerializeBucketName("", m.Tags(), l.Template, ""), "value")
if metricSource == "" {
return nil, fmt.Errorf("undeterminable Source type from Field, %s", l.Template)
}
gauges := make([]*Gauge, 0, len(m.Fields()))
for fieldName, value := range m.Fields() {
metricName := m.Name()
if fieldName != "value" {
metricName = fmt.Sprintf("%s.%s", m.Name(), fieldName)
}
gauge := &Gauge{
Source: reUnacceptedChar.ReplaceAllString(metricSource, "-"),
Name: reUnacceptedChar.ReplaceAllString(metricName, "-"),
MeasureTime: m.Time().Unix(),
}
if !verifyValue(value) {
continue
}
if err := gauge.setValue(value); err != nil {
return nil, fmt.Errorf("unable to extract value from Fields: %w", err)
}
gauges = append(gauges, gauge)
}
l.Log.Debugf("Built gauges: %v", gauges)
return gauges, nil
}
func verifyValue(v interface{}) bool {
switch v.(type) {
case string:
return false
default:
return true
}
}
func (g *Gauge) setValue(v interface{}) error {
switch d := v.(type) {
case int64:
g.Value = float64(d)
case uint64:
g.Value = float64(d)
case float64:
g.Value = d
case bool:
if d {
g.Value = float64(1.0)
} else {
g.Value = float64(0.0)
}
default:
return fmt.Errorf("undeterminable type %+v", d)
}
return nil
}
// Close is used to close the connection to librato Output
func (*Librato) Close() error {
return nil
}
func init() {
outputs.Add("librato", func() telegraf.Output {
return NewLibrato(libratoAPI)
})
}