1
0
Fork 0
telegraf/plugins/outputs/opentsdb/opentsdb.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

259 lines
5.8 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package opentsdb
import (
_ "embed"
"errors"
"fmt"
"math"
"net"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
)
// sampleConfig holds the contents of sample.conf, embedded by the go:embed
// directive below. It is the value returned by (*OpenTSDB).SampleConfig.
//
//go:embed sample.conf
var sampleConfig string
// allowedChars matches every single character that is NOT allowed in a metric
// name, tag key or tag value: anything other than ASCII letters and digits,
// '-', '_', '.', '/' and Unicode letters. sanitize replaces each match with "_".
var allowedChars = regexp.MustCompile(`[^a-zA-Z0-9-_./\p{L}]`)

// hyphenChars rewrites a small set of special characters to "-" instead of
// the generic "_" replacement; sanitize applies it before allowedChars.
var hyphenChars = strings.NewReplacer(
	"@", "-",
	"*", "-",
	"%", "-",
	"#", "-",
	"$", "-",
)

// Defaults that init assigns to a freshly created plugin instance.
var (
	defaultHTTPPath  = "/api/put"
	defaultSeparator = "_"
)
// OpenTSDB is the output plugin configuration and state. The exported fields
// carrying a toml tag are filled from the plugin configuration.
type OpenTSDB struct {
// Prefix is prepended verbatim to every metric name before sanitizing.
Prefix string `toml:"prefix"`
// Host is parsed as a URL; its scheme selects the telnet ("tcp" or none)
// or HTTP ("http"/"https") writer. Connect rewrites it in place when it
// adds the "tcp://" prefix.
Host string `toml:"host"`
// Port is joined with the URL host to form the TCP address and is also
// handed to the HTTP writer.
Port int `toml:"port"`
// HTTPBatchSize is passed to the HTTP writer as its BatchSize.
// NOTE(review): presumably the number of data points per request — confirm
// against openTSDBHttp.
HTTPBatchSize int `toml:"http_batch_size"`
// HTTPPath is passed to the HTTP writer as its Path; init sets it to
// "/api/put" by default.
HTTPPath string `toml:"http_path"`
// Debug is passed through to the HTTP writer.
Debug bool `toml:"debug"`
// Separator is placed between the metric name and the field name; init
// sets it to "_" by default.
Separator string `toml:"separator"`
// Log is the plugin logger; the "-" tag keeps it out of the TOML config.
Log telegraf.Logger `toml:"-"`
}
func ToLineFormat(tags map[string]string) string {
tagsArray := make([]string, 0, len(tags))
for k, v := range tags {
tagsArray = append(tagsArray, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(tagsArray)
return strings.Join(tagsArray, " ")
}
// SampleConfig returns the sample configuration held in the package-level
// sampleConfig variable.
func (*OpenTSDB) SampleConfig() string {
return sampleConfig
}
// Connect normalizes the configured host and checks that the OpenTSDB server
// accepts TCP connections on host:port.
//
// A host that does not start with one of the recognized scheme prefixes
// ("http://", "https://" or "tcp://") is treated as a bare host name and gets
// "tcp://" prepended; the normalized value is stored back into o.Host so that
// Write later parses the same URL. The probe connection is closed before
// returning. An error is returned when the URL cannot be parsed, the address
// cannot be resolved, or the TCP dial fails.
func (o *OpenTSDB) Connect() error {
	// Match complete scheme prefixes rather than the bare words "http"/"tcp":
	// otherwise a plain host name such as "httpd01" or "tcp-gw" would be
	// mistaken for a URL with a scheme and end up with an empty URL host.
	hasScheme := false
	for _, prefix := range []string{"http://", "https://", "tcp://"} {
		if strings.HasPrefix(o.Host, prefix) {
			hasScheme = true
			break
		}
	}
	if !hasScheme {
		o.Host = "tcp://" + o.Host
	}

	// Test connection to the OpenTSDB server.
	u, err := url.Parse(o.Host)
	if err != nil {
		return fmt.Errorf("error in parsing host url: %w", err)
	}

	uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
	tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
	if err != nil {
		return fmt.Errorf("failed to resolve TCP address: %w", err)
	}

	connection, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		return fmt.Errorf("failed to connect to OpenTSDB: %w", err)
	}
	// The connection is only a reachability probe; nothing is sent over it.
	defer connection.Close()

	return nil
}
// Write sends the metrics to OpenTSDB using the transport selected by the
// scheme of o.Host: no scheme or "tcp" uses WriteTelnet, "http" and "https"
// use WriteHTTP. An empty batch is a no-op. Any other scheme, or a host that
// cannot be parsed as a URL, results in an error.
func (o *OpenTSDB) Write(metrics []telegraf.Metric) error {
	if len(metrics) == 0 {
		return nil
	}

	target, err := url.Parse(o.Host)
	if err != nil {
		return fmt.Errorf("error in parsing host url: %w", err)
	}

	switch target.Scheme {
	case "", "tcp":
		return o.WriteTelnet(metrics, target)
	case "http", "https":
		return o.WriteHTTP(metrics, target)
	default:
		return errors.New("unknown scheme in host parameter")
	}
}
// WriteHTTP converts the metrics into HTTPMetric data points and hands them
// to an openTSDBHttp client built from u and the plugin settings.
//
// Each int64, uint64 and finite float64 field becomes one data point whose
// name is the sanitized concatenation of Prefix, metric name, Separator and
// field name, with a timestamp in whole seconds. Fields of any other type,
// and NaN or infinite floats, are skipped. The first error returned by
// sendDataPoint aborts the write; otherwise the result of flush is returned.
func (o *OpenTSDB) WriteHTTP(metrics []telegraf.Metric, u *url.URL) error {
	http := openTSDBHttp{
		Host:      u.Host,
		Port:      o.Port,
		Scheme:    u.Scheme,
		User:      u.User,
		BatchSize: o.HTTPBatchSize,
		Path:      o.HTTPPath,
		Debug:     o.Debug,
		log:       o.Log,
	}

	for _, m := range metrics {
		// Timestamp in seconds.
		now := m.Time().UnixNano() / 1000000000
		tags := cleanTags(m.Tags())

		for fieldName, value := range m.Fields() {
			switch fv := value.(type) {
			case int64, uint64:
				// Sent unchanged.
			case float64:
				// JSON does not support these special values
				if math.IsNaN(fv) || math.IsInf(fv, 0) {
					continue
				}
			default:
				// %v, not %s: the value can be of any type (e.g. bool), and
				// %s would render non-strings as "%!s(bool=true)".
				o.Log.Debugf("OpenTSDB does not support metric value: [%v] of type [%T].", value, value)
				continue
			}

			metric := &HTTPMetric{
				Metric: sanitize(fmt.Sprintf("%s%s%s%s",
					o.Prefix, m.Name(), o.Separator, fieldName)),
				Tags:      tags,
				Timestamp: now,
				Value:     value,
			}

			if err := http.sendDataPoint(metric); err != nil {
				return err
			}
		}
	}

	return http.flush()
}
// WriteTelnet sends the metrics over a fresh TCP connection to u.Host:o.Port
// as text lines of the form
//
//	put <metric> <timestamp> <value> <tags>\n
//
// where <metric> is the sanitized concatenation of Prefix, metric name,
// Separator and field name, <timestamp> is in whole seconds and <tags> is the
// output of ToLineFormat on the cleaned tags. Only int64, uint64 and finite
// float64 fields are sent; everything else is skipped. The connection is
// closed before returning. Resolve, dial and write failures are returned as
// errors; a write failure aborts the remaining metrics.
func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
	// Send data with telnet / socket communication.
	uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
	tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
	if err != nil {
		return fmt.Errorf("failed to resolve TCP address: %w", err)
	}

	connection, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		return fmt.Errorf("failed to connect to OpenTSDB: %w", err)
	}
	defer connection.Close()

	for _, m := range metrics {
		// Timestamp in seconds.
		now := m.Time().UnixNano() / 1000000000
		tags := ToLineFormat(cleanTags(m.Tags()))

		for fieldName, value := range m.Fields() {
			switch fv := value.(type) {
			case int64, uint64:
				// Sent unchanged.
			case float64:
				// NaN and +/-Inf are skipped here exactly as in WriteHTTP,
				// even though this path does not go through JSON.
				if math.IsNaN(fv) || math.IsInf(fv, 0) {
					continue
				}
			default:
				// %v, not %s: the value can be of any type (e.g. bool), and
				// %s would render non-strings as "%!s(bool=true)".
				o.Log.Debugf("OpenTSDB does not support metric value: [%v] of type [%T].", value, value)
				continue
			}

			metricValue, buildError := buildValue(value)
			if buildError != nil {
				o.Log.Errorf("OpenTSDB: %s", buildError.Error())
				continue
			}

			messageLine := fmt.Sprintf("put %s %v %s %s\n",
				sanitize(fmt.Sprintf("%s%s%s%s", o.Prefix, m.Name(), o.Separator, fieldName)),
				now, metricValue, tags)

			if _, err := connection.Write([]byte(messageLine)); err != nil {
				return fmt.Errorf("telnet writing error: %w", err)
			}
		}
	}

	return nil
}
// cleanTags returns a new map holding the sanitized form of every tag key and
// value. Tags whose sanitized value is empty are left out. The input map is
// not modified.
func cleanTags(tags map[string]string) map[string]string {
	cleaned := make(map[string]string, len(tags))
	for key, raw := range tags {
		value := sanitize(raw)
		if value == "" {
			continue
		}
		cleaned[sanitize(key)] = value
	}
	return cleaned
}
// buildValue renders a field value as text using the package's number
// formatters. Only int64, uint64 and float64 are accepted; any other dynamic
// type yields an empty string and an error naming the type and value.
func buildValue(v interface{}) (string, error) {
	switch num := v.(type) {
	case int64:
		return IntToString(num), nil
	case uint64:
		return UIntToString(num), nil
	case float64:
		return FloatToString(num), nil
	}
	return "", fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v)
}
// IntToString formats a signed 64-bit integer in base 10.
func IntToString(inputNum int64) string {
	digits := strconv.AppendInt(nil, inputNum, 10)
	return string(digits)
}
// UIntToString formats an unsigned 64-bit integer in base 10.
func UIntToString(inputNum uint64) string {
	digits := strconv.AppendUint(nil, inputNum, 10)
	return string(digits)
}
// FloatToString formats a float64 in plain decimal notation (no exponent)
// with exactly six digits after the decimal point.
func FloatToString(inputNum float64) string {
	digits := strconv.AppendFloat(nil, inputNum, 'f', 6, 64)
	return string(digits)
}
// Close does nothing and always returns nil. The TCP connections opened by
// Connect and WriteTelnet are closed inside those methods, so nothing is
// released here.
func (*OpenTSDB) Close() error {
return nil
}
// sanitize makes value safe to use as a metric name, tag key or tag value.
//
// It runs in two passes: the characters handled by hyphenChars are turned
// into "-" first (kept for backwards compatibility), then every character
// still matched by allowedChars is replaced with "_".
func sanitize(value string) string {
	hyphenated := hyphenChars.Replace(value)
	return allowedChars.ReplaceAllLiteralString(hyphenated, "_")
}
func init() {
outputs.Add("opentsdb", func() telegraf.Output {
return &OpenTSDB{
HTTPPath: defaultHTTPPath,
Separator: defaultSeparator,
}
})
}