1
0
Fork 0
telegraf/plugins/processors/converter/converter.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

396 lines
10 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package converter
import (
_ "embed"
"encoding/base64"
"errors"
"fmt"
"math"
"math/big"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/processors"
)
//go:embed sample.conf
var sampleConfig string
type Conversion struct {
Measurement []string `toml:"measurement"`
Tag []string `toml:"tag"`
String []string `toml:"string"`
Integer []string `toml:"integer"`
Unsigned []string `toml:"unsigned"`
Boolean []string `toml:"boolean"`
Float []string `toml:"float"`
Timestamp []string `toml:"timestamp"`
TimestampFormat string `toml:"timestamp_format"`
Base64IEEEFloat32 []string `toml:"base64_ieee_float32"`
}
type Converter struct {
Tags *Conversion `toml:"tags"`
Fields *Conversion `toml:"fields"`
Log telegraf.Logger `toml:"-"`
tagConversions *ConversionFilter
fieldConversions *ConversionFilter
}
type ConversionFilter struct {
Measurement filter.Filter
Tag filter.Filter
String filter.Filter
Integer filter.Filter
Unsigned filter.Filter
Boolean filter.Filter
Float filter.Filter
Timestamp filter.Filter
Base64IEEEFloat32 filter.Filter
}
func (*Converter) SampleConfig() string {
return sampleConfig
}
func (p *Converter) Init() error {
return p.compile()
}
func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
for _, metric := range metrics {
p.convertTags(metric)
p.convertFields(metric)
}
return metrics
}
func (p *Converter) compile() error {
tf, err := compileFilter(p.Tags)
if err != nil {
return err
}
ff, err := compileFilter(p.Fields)
if err != nil {
return err
}
if tf == nil && ff == nil {
return errors.New("no filters found")
}
p.tagConversions = tf
p.fieldConversions = ff
return nil
}
func compileFilter(conv *Conversion) (*ConversionFilter, error) {
if conv == nil {
return nil, nil
}
var err error
cf := &ConversionFilter{}
cf.Measurement, err = filter.Compile(conv.Measurement)
if err != nil {
return nil, err
}
cf.Tag, err = filter.Compile(conv.Tag)
if err != nil {
return nil, err
}
cf.String, err = filter.Compile(conv.String)
if err != nil {
return nil, err
}
cf.Integer, err = filter.Compile(conv.Integer)
if err != nil {
return nil, err
}
cf.Unsigned, err = filter.Compile(conv.Unsigned)
if err != nil {
return nil, err
}
cf.Boolean, err = filter.Compile(conv.Boolean)
if err != nil {
return nil, err
}
cf.Float, err = filter.Compile(conv.Float)
if err != nil {
return nil, err
}
cf.Timestamp, err = filter.Compile(conv.Timestamp)
if err != nil {
return nil, err
}
cf.Base64IEEEFloat32, err = filter.Compile(conv.Base64IEEEFloat32)
if err != nil {
return nil, err
}
return cf, nil
}
// convertTags converts tags into measurements or fields.
func (p *Converter) convertTags(metric telegraf.Metric) {
if p.tagConversions == nil {
return
}
for key, value := range metric.Tags() {
switch {
case p.tagConversions.Measurement != nil && p.tagConversions.Measurement.Match(key):
metric.SetName(value)
case p.tagConversions.String != nil && p.tagConversions.String.Match(key):
metric.AddField(key, value)
case p.tagConversions.Integer != nil && p.tagConversions.Integer.Match(key):
if v, err := toInteger(value); err != nil {
p.Log.Errorf("Converting to integer [%T] failed: %v", value, err)
} else {
metric.AddField(key, v)
}
case p.tagConversions.Unsigned != nil && p.tagConversions.Unsigned.Match(key):
if v, err := toUnsigned(value); err != nil {
p.Log.Errorf("Converting to unsigned [%T] failed: %v", value, err)
} else {
metric.AddField(key, v)
}
case p.tagConversions.Boolean != nil && p.tagConversions.Boolean.Match(key):
if v, err := internal.ToBool(value); err != nil {
p.Log.Errorf("Converting to boolean [%T] failed: %v", value, err)
} else {
metric.AddField(key, v)
}
case p.tagConversions.Float != nil && p.tagConversions.Float.Match(key):
if v, err := toFloat(value); err != nil {
p.Log.Errorf("Converting to float [%T] failed: %v", value, err)
} else {
metric.AddField(key, v)
}
case p.tagConversions.Timestamp != nil && p.tagConversions.Timestamp.Match(key):
time, err := internal.ParseTimestamp(p.Tags.TimestampFormat, value, nil)
if err != nil {
p.Log.Errorf("Converting to timestamp [%T] failed: %v", value, err)
continue
}
metric.SetTime(time)
default:
continue
}
metric.RemoveTag(key)
}
}
// convertFields converts fields into measurements, tags, or other field types.
func (p *Converter) convertFields(metric telegraf.Metric) {
if p.fieldConversions == nil {
return
}
for key, value := range metric.Fields() {
switch {
case p.fieldConversions.Measurement != nil && p.fieldConversions.Measurement.Match(key):
if v, err := internal.ToString(value); err != nil {
p.Log.Errorf("Converting to measurement [%T] failed: %v", value, err)
} else {
metric.SetName(v)
}
metric.RemoveField(key)
case p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key):
if v, err := internal.ToString(value); err != nil {
p.Log.Errorf("Converting to tag [%T] failed: %v", value, err)
} else {
metric.AddTag(key, v)
}
metric.RemoveField(key)
case p.fieldConversions.Float != nil && p.fieldConversions.Float.Match(key):
if v, err := toFloat(value); err != nil {
p.Log.Errorf("Converting to float [%T] failed: %v", value, err)
metric.RemoveField(key)
} else {
metric.AddField(key, v)
}
case p.fieldConversions.Integer != nil && p.fieldConversions.Integer.Match(key):
if v, err := toInteger(value); err != nil {
p.Log.Errorf("Converting to integer [%T] failed: %v", value, err)
metric.RemoveField(key)
} else {
metric.AddField(key, v)
}
case p.fieldConversions.Unsigned != nil && p.fieldConversions.Unsigned.Match(key):
if v, err := toUnsigned(value); err != nil {
p.Log.Errorf("Converting to unsigned [%T] failed: %v", value, err)
metric.RemoveField(key)
} else {
metric.AddField(key, v)
}
case p.fieldConversions.Boolean != nil && p.fieldConversions.Boolean.Match(key):
if v, err := internal.ToBool(value); err != nil {
p.Log.Errorf("Converting to bool [%T] failed: %v", value, err)
metric.RemoveField(key)
} else {
metric.AddField(key, v)
}
case p.fieldConversions.String != nil && p.fieldConversions.String.Match(key):
if v, err := internal.ToString(value); err != nil {
p.Log.Errorf("Converting to string [%T] failed: %v", value, err)
metric.RemoveField(key)
} else {
metric.AddField(key, v)
}
case p.fieldConversions.Timestamp != nil && p.fieldConversions.Timestamp.Match(key):
if time, err := internal.ParseTimestamp(p.Fields.TimestampFormat, value, nil); err != nil {
p.Log.Errorf("Converting to timestamp [%T] failed: %v", value, err)
} else {
metric.SetTime(time)
metric.RemoveField(key)
}
case p.fieldConversions.Base64IEEEFloat32 != nil && p.fieldConversions.Base64IEEEFloat32.Match(key):
if v, err := base64ToFloat32(value.(string)); err != nil {
p.Log.Errorf("Converting to base64_ieee_float32 [%T] failed: %v", value, err)
metric.RemoveField(key)
} else {
metric.AddField(key, v)
}
}
}
}
func toInteger(v interface{}) (int64, error) {
switch value := v.(type) {
case float32:
if value < float32(math.MinInt64) {
return math.MinInt64, nil
}
if value > float32(math.MaxInt64) {
return math.MaxInt64, nil
}
return int64(math.Round(float64(value))), nil
case float64:
if value < float64(math.MinInt64) {
return math.MinInt64, nil
}
if value > float64(math.MaxInt64) {
return math.MaxInt64, nil
}
return int64(math.Round(value)), nil
default:
if v, err := internal.ToInt64(value); err == nil {
return v, nil
}
v, err := internal.ToFloat64(value)
if err != nil {
return 0, err
}
if v < float64(math.MinInt64) {
return math.MinInt64, nil
}
if v > float64(math.MaxInt64) {
return math.MaxInt64, nil
}
return int64(math.Round(v)), nil
}
}
func toUnsigned(v interface{}) (uint64, error) {
switch value := v.(type) {
case float32:
if value < 0 {
return 0, nil
}
if value > float32(math.MaxUint64) {
return math.MaxUint64, nil
}
return uint64(math.Round(float64(value))), nil
case float64:
if value < 0 {
return 0, nil
}
if value > float64(math.MaxUint64) {
return math.MaxUint64, nil
}
return uint64(math.Round(value)), nil
default:
if v, err := internal.ToUint64(value); err == nil {
return v, nil
}
v, err := internal.ToFloat64(value)
if err != nil {
return 0, err
}
if v < 0 {
return 0, nil
}
if v > float64(math.MaxUint64) {
return math.MaxUint64, nil
}
return uint64(math.Round(v)), nil
}
}
func toFloat(v interface{}) (float64, error) {
if v, ok := v.(string); ok && strings.HasPrefix(v, "0x") {
var i big.Int
if _, success := i.SetString(v, 0); !success {
return 0, errors.New("unable to parse string to big int")
}
var f big.Float
f.SetInt(&i)
result, _ := f.Float64()
return result, nil
}
return internal.ToFloat64(v)
}
func base64ToFloat32(encoded string) (float32, error) {
// Decode the Base64 string to bytes
decodedBytes, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return 0, err
}
// Check if the byte length matches a float32 (4 bytes)
if len(decodedBytes) != 4 {
return 0, errors.New("decoded byte length is not 4 bytes")
}
// Convert the bytes to a string representation as per IEEE 754 of the bits
bitsStrRepresentation := fmt.Sprintf("%08b%08b%08b%08b", decodedBytes[0], decodedBytes[1], decodedBytes[2], decodedBytes[3])
// Convert the bits to a uint32
bits, err := strconv.ParseUint(bitsStrRepresentation, 2, 32)
if err != nil {
return 0, err
}
// Convert the uint32 (bits) to a float32 based on IEEE 754 binary representation
return math.Float32frombits(uint32(bits)), nil
}
func init() {
processors.Add("converter", func() telegraf.Processor {
return &Converter{}
})
}