1
0
Fork 0
telegraf/plugins/serializers/carbon2/carbon2.go

120 lines
2.8 KiB
Go
Raw Permalink Normal View History

package carbon2
import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers"
)
const sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
type Serializer struct {
Format string `toml:"carbon2_format"`
SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"`
Log telegraf.Logger `toml:"-"`
sanitizeReplacer *strings.Replacer
template string
}
func (s *Serializer) Init() error {
if s.SanitizeReplaceChar == "" {
s.SanitizeReplaceChar = ":"
}
if len(s.SanitizeReplaceChar) > 1 {
return errors.New("sanitize replace char has to be a singular character")
}
// Create replacer to replacing all characters requiring sanitization with the user-specified replacement
pairs := make([]string, 0, 2*len(sanitizedChars))
for _, c := range sanitizedChars {
pairs = append(pairs, string(c), s.SanitizeReplaceChar)
}
s.sanitizeReplacer = strings.NewReplacer(pairs...)
switch s.Format {
case "", "field_separate":
s.Format = "field_separate"
s.template = "metric=%s field=%s "
case "metric_includes_field":
s.template = "metric=%s_%s "
default:
return fmt.Errorf("unknown carbon2 format: %s", s.Format)
}
return nil
}
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.createObject(metric), nil
}
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, metric := range metrics {
batch.Write(s.createObject(metric))
}
return batch.Bytes(), nil
}
func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
for fieldName, fieldValue := range metric.Fields() {
if _, ok := fieldValue.(string); ok {
continue
}
name := s.sanitizeReplacer.Replace(metric.Name())
var value string
if v, ok := fieldValue.(bool); ok {
if v {
value = "1"
} else {
value = "0"
}
} else {
var err error
value, err = internal.ToString(fieldValue)
if err != nil {
s.Log.Warnf("Cannot convert %v (%T) to string", fieldValue, fieldValue)
continue
}
}
m.WriteString(fmt.Sprintf(s.template, strings.ReplaceAll(name, " ", "_"), strings.ReplaceAll(fieldName, " ", "_")))
for _, tag := range metric.TagList() {
m.WriteString(strings.ReplaceAll(tag.Key, " ", "_"))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.ReplaceAll(value, " ", "_"))
m.WriteString(" ")
}
m.WriteString(" ")
m.WriteString(value)
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
}
return m.Bytes()
}
func init() {
serializers.Add("carbon2",
func() telegraf.Serializer {
return &Serializer{}
},
)
}