396 lines
10 KiB
Go
396 lines
10 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package converter
|
|
|
|
import (
|
|
_ "embed"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/big"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/filter"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/plugins/processors"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
type Conversion struct {
|
|
Measurement []string `toml:"measurement"`
|
|
Tag []string `toml:"tag"`
|
|
String []string `toml:"string"`
|
|
Integer []string `toml:"integer"`
|
|
Unsigned []string `toml:"unsigned"`
|
|
Boolean []string `toml:"boolean"`
|
|
Float []string `toml:"float"`
|
|
Timestamp []string `toml:"timestamp"`
|
|
TimestampFormat string `toml:"timestamp_format"`
|
|
Base64IEEEFloat32 []string `toml:"base64_ieee_float32"`
|
|
}
|
|
|
|
type Converter struct {
|
|
Tags *Conversion `toml:"tags"`
|
|
Fields *Conversion `toml:"fields"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
tagConversions *ConversionFilter
|
|
fieldConversions *ConversionFilter
|
|
}
|
|
|
|
type ConversionFilter struct {
|
|
Measurement filter.Filter
|
|
Tag filter.Filter
|
|
String filter.Filter
|
|
Integer filter.Filter
|
|
Unsigned filter.Filter
|
|
Boolean filter.Filter
|
|
Float filter.Filter
|
|
Timestamp filter.Filter
|
|
Base64IEEEFloat32 filter.Filter
|
|
}
|
|
|
|
func (*Converter) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (p *Converter) Init() error {
|
|
return p.compile()
|
|
}
|
|
|
|
func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
|
for _, metric := range metrics {
|
|
p.convertTags(metric)
|
|
p.convertFields(metric)
|
|
}
|
|
return metrics
|
|
}
|
|
|
|
func (p *Converter) compile() error {
|
|
tf, err := compileFilter(p.Tags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ff, err := compileFilter(p.Fields)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if tf == nil && ff == nil {
|
|
return errors.New("no filters found")
|
|
}
|
|
|
|
p.tagConversions = tf
|
|
p.fieldConversions = ff
|
|
return nil
|
|
}
|
|
|
|
func compileFilter(conv *Conversion) (*ConversionFilter, error) {
|
|
if conv == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
var err error
|
|
cf := &ConversionFilter{}
|
|
cf.Measurement, err = filter.Compile(conv.Measurement)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.Tag, err = filter.Compile(conv.Tag)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.String, err = filter.Compile(conv.String)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.Integer, err = filter.Compile(conv.Integer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.Unsigned, err = filter.Compile(conv.Unsigned)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.Boolean, err = filter.Compile(conv.Boolean)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.Float, err = filter.Compile(conv.Float)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.Timestamp, err = filter.Compile(conv.Timestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cf.Base64IEEEFloat32, err = filter.Compile(conv.Base64IEEEFloat32)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return cf, nil
|
|
}
|
|
|
|
// convertTags converts tags into measurements or fields.
|
|
func (p *Converter) convertTags(metric telegraf.Metric) {
|
|
if p.tagConversions == nil {
|
|
return
|
|
}
|
|
|
|
for key, value := range metric.Tags() {
|
|
switch {
|
|
case p.tagConversions.Measurement != nil && p.tagConversions.Measurement.Match(key):
|
|
metric.SetName(value)
|
|
case p.tagConversions.String != nil && p.tagConversions.String.Match(key):
|
|
metric.AddField(key, value)
|
|
case p.tagConversions.Integer != nil && p.tagConversions.Integer.Match(key):
|
|
if v, err := toInteger(value); err != nil {
|
|
p.Log.Errorf("Converting to integer [%T] failed: %v", value, err)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.tagConversions.Unsigned != nil && p.tagConversions.Unsigned.Match(key):
|
|
if v, err := toUnsigned(value); err != nil {
|
|
p.Log.Errorf("Converting to unsigned [%T] failed: %v", value, err)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.tagConversions.Boolean != nil && p.tagConversions.Boolean.Match(key):
|
|
if v, err := internal.ToBool(value); err != nil {
|
|
p.Log.Errorf("Converting to boolean [%T] failed: %v", value, err)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.tagConversions.Float != nil && p.tagConversions.Float.Match(key):
|
|
if v, err := toFloat(value); err != nil {
|
|
p.Log.Errorf("Converting to float [%T] failed: %v", value, err)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.tagConversions.Timestamp != nil && p.tagConversions.Timestamp.Match(key):
|
|
time, err := internal.ParseTimestamp(p.Tags.TimestampFormat, value, nil)
|
|
if err != nil {
|
|
p.Log.Errorf("Converting to timestamp [%T] failed: %v", value, err)
|
|
continue
|
|
}
|
|
metric.SetTime(time)
|
|
default:
|
|
continue
|
|
}
|
|
metric.RemoveTag(key)
|
|
}
|
|
}
|
|
|
|
// convertFields converts fields into measurements, tags, or other field types.
|
|
func (p *Converter) convertFields(metric telegraf.Metric) {
|
|
if p.fieldConversions == nil {
|
|
return
|
|
}
|
|
|
|
for key, value := range metric.Fields() {
|
|
switch {
|
|
case p.fieldConversions.Measurement != nil && p.fieldConversions.Measurement.Match(key):
|
|
if v, err := internal.ToString(value); err != nil {
|
|
p.Log.Errorf("Converting to measurement [%T] failed: %v", value, err)
|
|
} else {
|
|
metric.SetName(v)
|
|
}
|
|
metric.RemoveField(key)
|
|
case p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key):
|
|
if v, err := internal.ToString(value); err != nil {
|
|
p.Log.Errorf("Converting to tag [%T] failed: %v", value, err)
|
|
} else {
|
|
metric.AddTag(key, v)
|
|
}
|
|
metric.RemoveField(key)
|
|
case p.fieldConversions.Float != nil && p.fieldConversions.Float.Match(key):
|
|
if v, err := toFloat(value); err != nil {
|
|
p.Log.Errorf("Converting to float [%T] failed: %v", value, err)
|
|
metric.RemoveField(key)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.fieldConversions.Integer != nil && p.fieldConversions.Integer.Match(key):
|
|
if v, err := toInteger(value); err != nil {
|
|
p.Log.Errorf("Converting to integer [%T] failed: %v", value, err)
|
|
metric.RemoveField(key)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.fieldConversions.Unsigned != nil && p.fieldConversions.Unsigned.Match(key):
|
|
if v, err := toUnsigned(value); err != nil {
|
|
p.Log.Errorf("Converting to unsigned [%T] failed: %v", value, err)
|
|
metric.RemoveField(key)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.fieldConversions.Boolean != nil && p.fieldConversions.Boolean.Match(key):
|
|
if v, err := internal.ToBool(value); err != nil {
|
|
p.Log.Errorf("Converting to bool [%T] failed: %v", value, err)
|
|
metric.RemoveField(key)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.fieldConversions.String != nil && p.fieldConversions.String.Match(key):
|
|
if v, err := internal.ToString(value); err != nil {
|
|
p.Log.Errorf("Converting to string [%T] failed: %v", value, err)
|
|
metric.RemoveField(key)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
case p.fieldConversions.Timestamp != nil && p.fieldConversions.Timestamp.Match(key):
|
|
if time, err := internal.ParseTimestamp(p.Fields.TimestampFormat, value, nil); err != nil {
|
|
p.Log.Errorf("Converting to timestamp [%T] failed: %v", value, err)
|
|
} else {
|
|
metric.SetTime(time)
|
|
metric.RemoveField(key)
|
|
}
|
|
|
|
case p.fieldConversions.Base64IEEEFloat32 != nil && p.fieldConversions.Base64IEEEFloat32.Match(key):
|
|
if v, err := base64ToFloat32(value.(string)); err != nil {
|
|
p.Log.Errorf("Converting to base64_ieee_float32 [%T] failed: %v", value, err)
|
|
metric.RemoveField(key)
|
|
} else {
|
|
metric.AddField(key, v)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func toInteger(v interface{}) (int64, error) {
|
|
switch value := v.(type) {
|
|
case float32:
|
|
if value < float32(math.MinInt64) {
|
|
return math.MinInt64, nil
|
|
}
|
|
if value > float32(math.MaxInt64) {
|
|
return math.MaxInt64, nil
|
|
}
|
|
return int64(math.Round(float64(value))), nil
|
|
case float64:
|
|
if value < float64(math.MinInt64) {
|
|
return math.MinInt64, nil
|
|
}
|
|
if value > float64(math.MaxInt64) {
|
|
return math.MaxInt64, nil
|
|
}
|
|
return int64(math.Round(value)), nil
|
|
default:
|
|
if v, err := internal.ToInt64(value); err == nil {
|
|
return v, nil
|
|
}
|
|
|
|
v, err := internal.ToFloat64(value)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if v < float64(math.MinInt64) {
|
|
return math.MinInt64, nil
|
|
}
|
|
if v > float64(math.MaxInt64) {
|
|
return math.MaxInt64, nil
|
|
}
|
|
return int64(math.Round(v)), nil
|
|
}
|
|
}
|
|
|
|
func toUnsigned(v interface{}) (uint64, error) {
|
|
switch value := v.(type) {
|
|
case float32:
|
|
if value < 0 {
|
|
return 0, nil
|
|
}
|
|
if value > float32(math.MaxUint64) {
|
|
return math.MaxUint64, nil
|
|
}
|
|
return uint64(math.Round(float64(value))), nil
|
|
case float64:
|
|
if value < 0 {
|
|
return 0, nil
|
|
}
|
|
if value > float64(math.MaxUint64) {
|
|
return math.MaxUint64, nil
|
|
}
|
|
return uint64(math.Round(value)), nil
|
|
default:
|
|
if v, err := internal.ToUint64(value); err == nil {
|
|
return v, nil
|
|
}
|
|
|
|
v, err := internal.ToFloat64(value)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if v < 0 {
|
|
return 0, nil
|
|
}
|
|
if v > float64(math.MaxUint64) {
|
|
return math.MaxUint64, nil
|
|
}
|
|
return uint64(math.Round(v)), nil
|
|
}
|
|
}
|
|
|
|
func toFloat(v interface{}) (float64, error) {
|
|
if v, ok := v.(string); ok && strings.HasPrefix(v, "0x") {
|
|
var i big.Int
|
|
if _, success := i.SetString(v, 0); !success {
|
|
return 0, errors.New("unable to parse string to big int")
|
|
}
|
|
|
|
var f big.Float
|
|
f.SetInt(&i)
|
|
result, _ := f.Float64()
|
|
|
|
return result, nil
|
|
}
|
|
return internal.ToFloat64(v)
|
|
}
|
|
|
|
func base64ToFloat32(encoded string) (float32, error) {
|
|
// Decode the Base64 string to bytes
|
|
decodedBytes, err := base64.StdEncoding.DecodeString(encoded)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Check if the byte length matches a float32 (4 bytes)
|
|
if len(decodedBytes) != 4 {
|
|
return 0, errors.New("decoded byte length is not 4 bytes")
|
|
}
|
|
|
|
// Convert the bytes to a string representation as per IEEE 754 of the bits
|
|
bitsStrRepresentation := fmt.Sprintf("%08b%08b%08b%08b", decodedBytes[0], decodedBytes[1], decodedBytes[2], decodedBytes[3])
|
|
|
|
// Convert the bits to a uint32
|
|
bits, err := strconv.ParseUint(bitsStrRepresentation, 2, 32)
|
|
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Convert the uint32 (bits) to a float32 based on IEEE 754 binary representation
|
|
return math.Float32frombits(uint32(bits)), nil
|
|
}
|
|
|
|
func init() {
|
|
processors.Add("converter", func() telegraf.Processor {
|
|
return &Converter{}
|
|
})
|
|
}
|