259 lines
5.8 KiB
Go
259 lines
5.8 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package opentsdb
|
|
|
|
import (
|
|
_ "embed"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"net/url"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
var (
|
|
allowedChars = regexp.MustCompile(`[^a-zA-Z0-9-_./\p{L}]`)
|
|
hyphenChars = strings.NewReplacer(
|
|
"@", "-",
|
|
"*", "-",
|
|
`%`, "-",
|
|
"#", "-",
|
|
"$", "-")
|
|
defaultHTTPPath = "/api/put"
|
|
defaultSeparator = "_"
|
|
)
|
|
|
|
type OpenTSDB struct {
|
|
Prefix string `toml:"prefix"`
|
|
|
|
Host string `toml:"host"`
|
|
Port int `toml:"port"`
|
|
|
|
HTTPBatchSize int `toml:"http_batch_size"`
|
|
HTTPPath string `toml:"http_path"`
|
|
|
|
Debug bool `toml:"debug"`
|
|
|
|
Separator string `toml:"separator"`
|
|
|
|
Log telegraf.Logger `toml:"-"`
|
|
}
|
|
|
|
func ToLineFormat(tags map[string]string) string {
|
|
tagsArray := make([]string, 0, len(tags))
|
|
for k, v := range tags {
|
|
tagsArray = append(tagsArray, fmt.Sprintf("%s=%s", k, v))
|
|
}
|
|
sort.Strings(tagsArray)
|
|
return strings.Join(tagsArray, " ")
|
|
}
|
|
|
|
func (*OpenTSDB) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (o *OpenTSDB) Connect() error {
|
|
if !strings.HasPrefix(o.Host, "http") && !strings.HasPrefix(o.Host, "tcp") {
|
|
o.Host = "tcp://" + o.Host
|
|
}
|
|
// Test Connection to OpenTSDB Server
|
|
u, err := url.Parse(o.Host)
|
|
if err != nil {
|
|
return fmt.Errorf("error in parsing host url: %w", err)
|
|
}
|
|
|
|
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
|
|
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve TCP address: %w", err)
|
|
}
|
|
connection, err := net.DialTCP("tcp", nil, tcpAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to OpenTSDB: %w", err)
|
|
}
|
|
defer connection.Close()
|
|
return nil
|
|
}
|
|
|
|
func (o *OpenTSDB) Write(metrics []telegraf.Metric) error {
|
|
if len(metrics) == 0 {
|
|
return nil
|
|
}
|
|
|
|
u, err := url.Parse(o.Host)
|
|
if err != nil {
|
|
return fmt.Errorf("error in parsing host url: %w", err)
|
|
}
|
|
|
|
if u.Scheme == "" || u.Scheme == "tcp" {
|
|
return o.WriteTelnet(metrics, u)
|
|
} else if u.Scheme == "http" || u.Scheme == "https" {
|
|
return o.WriteHTTP(metrics, u)
|
|
}
|
|
return errors.New("unknown scheme in host parameter")
|
|
}
|
|
|
|
func (o *OpenTSDB) WriteHTTP(metrics []telegraf.Metric, u *url.URL) error {
|
|
http := openTSDBHttp{
|
|
Host: u.Host,
|
|
Port: o.Port,
|
|
Scheme: u.Scheme,
|
|
User: u.User,
|
|
BatchSize: o.HTTPBatchSize,
|
|
Path: o.HTTPPath,
|
|
Debug: o.Debug,
|
|
log: o.Log,
|
|
}
|
|
|
|
for _, m := range metrics {
|
|
now := m.Time().UnixNano() / 1000000000
|
|
tags := cleanTags(m.Tags())
|
|
|
|
for fieldName, value := range m.Fields() {
|
|
switch fv := value.(type) {
|
|
case int64:
|
|
case uint64:
|
|
case float64:
|
|
// JSON does not support these special values
|
|
if math.IsNaN(fv) || math.IsInf(fv, 0) {
|
|
continue
|
|
}
|
|
default:
|
|
o.Log.Debugf("OpenTSDB does not support metric value: [%s] of type [%T].", value, value)
|
|
continue
|
|
}
|
|
|
|
metric := &HTTPMetric{
|
|
Metric: sanitize(fmt.Sprintf("%s%s%s%s",
|
|
o.Prefix, m.Name(), o.Separator, fieldName)),
|
|
Tags: tags,
|
|
Timestamp: now,
|
|
Value: value,
|
|
}
|
|
|
|
if err := http.sendDataPoint(metric); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return http.flush()
|
|
}
|
|
|
|
func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
|
|
// Send Data with telnet / socket communication
|
|
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
|
|
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve TCP address: %w", err)
|
|
}
|
|
connection, err := net.DialTCP("tcp", nil, tcpAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to OpenTSDB: %w", err)
|
|
}
|
|
defer connection.Close()
|
|
|
|
for _, m := range metrics {
|
|
now := m.Time().UnixNano() / 1000000000
|
|
tags := ToLineFormat(cleanTags(m.Tags()))
|
|
|
|
for fieldName, value := range m.Fields() {
|
|
switch fv := value.(type) {
|
|
case int64:
|
|
case uint64:
|
|
case float64:
|
|
// JSON does not support these special values
|
|
if math.IsNaN(fv) || math.IsInf(fv, 0) {
|
|
continue
|
|
}
|
|
default:
|
|
o.Log.Debugf("OpenTSDB does not support metric value: [%s] of type [%T].", value, value)
|
|
continue
|
|
}
|
|
|
|
metricValue, buildError := buildValue(value)
|
|
if buildError != nil {
|
|
o.Log.Errorf("OpenTSDB: %s", buildError.Error())
|
|
continue
|
|
}
|
|
|
|
messageLine := fmt.Sprintf("put %s %v %s %s\n",
|
|
sanitize(fmt.Sprintf("%s%s%s%s", o.Prefix, m.Name(), o.Separator, fieldName)),
|
|
now, metricValue, tags)
|
|
|
|
_, err = connection.Write([]byte(messageLine))
|
|
if err != nil {
|
|
return fmt.Errorf("telnet writing error: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func cleanTags(tags map[string]string) map[string]string {
|
|
tagSet := make(map[string]string, len(tags))
|
|
for k, v := range tags {
|
|
val := sanitize(v)
|
|
if val != "" {
|
|
tagSet[sanitize(k)] = val
|
|
}
|
|
}
|
|
return tagSet
|
|
}
|
|
|
|
func buildValue(v interface{}) (string, error) {
|
|
var retv string
|
|
switch p := v.(type) {
|
|
case int64:
|
|
retv = IntToString(p)
|
|
case uint64:
|
|
retv = UIntToString(p)
|
|
case float64:
|
|
retv = FloatToString(p)
|
|
default:
|
|
return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v)
|
|
}
|
|
return retv, nil
|
|
}
|
|
|
|
func IntToString(inputNum int64) string {
|
|
return strconv.FormatInt(inputNum, 10)
|
|
}
|
|
|
|
func UIntToString(inputNum uint64) string {
|
|
return strconv.FormatUint(inputNum, 10)
|
|
}
|
|
|
|
func FloatToString(inputNum float64) string {
|
|
return strconv.FormatFloat(inputNum, 'f', 6, 64)
|
|
}
|
|
|
|
func (*OpenTSDB) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func sanitize(value string) string {
|
|
// Apply special hyphenation rules to preserve backwards compatibility
|
|
value = hyphenChars.Replace(value)
|
|
// Replace any remaining illegal chars
|
|
return allowedChars.ReplaceAllLiteralString(value, "_")
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("opentsdb", func() telegraf.Output {
|
|
return &OpenTSDB{
|
|
HTTPPath: defaultHTTPPath,
|
|
Separator: defaultSeparator,
|
|
}
|
|
})
|
|
}
|