1
0
Fork 0
telegraf/plugins/processors/lookup/lookup.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

215 lines
4.7 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package lookup
import (
"bytes"
_ "embed"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strings"
"text/template"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
//go:embed sample.conf
var sampleConfig string
// Processor adds tags to metrics by rendering a key from each metric and
// looking that key up in tables loaded from files.
type Processor struct {
	// Filenames lists the lookup files to load; Init fails when it is empty.
	Filenames []string `toml:"files"`
	// Fileformat selects the file parser (compared case-insensitively):
	// "" or "json", "csv_key_name_value", or "csv_key_values".
	Fileformat string `toml:"format"`
	// KeyTemplate is the text/template source executed against each metric
	// to produce the lookup key; Init fails when it is empty.
	KeyTemplate string `toml:"key"`
	// Log receives errors raised while generating keys in Apply.
	Log telegraf.Logger `toml:"-"`

	// tmpl is KeyTemplate after parsing; set by Init.
	tmpl *template.Template
	// mappings maps a lookup key to the tags added to matching metrics.
	mappings map[string][]telegraf.Tag
}
// SampleConfig returns the sample configuration embedded from sample.conf.
func (*Processor) SampleConfig() string {
	return sampleConfig
}
// Init validates the configuration, parses the key template and loads the
// lookup tables from the configured files.
//
// It returns an error when no files are configured, when the key template is
// empty or fails to parse, when the format is not one of "json" (or empty),
// "csv_key_name_value" or "csv_key_values" (compared case-insensitively), or
// when loading any of the files fails.
func (p *Processor) Init() error {
	if len(p.Filenames) < 1 {
		return errors.New("missing 'files'")
	}
	if p.KeyTemplate == "" {
		// Report the option under the name used in the configuration file
		// (the field is tagged `toml:"key"`).
		return errors.New("missing 'key'")
	}

	tmpl, err := template.New("key").Parse(p.KeyTemplate)
	if err != nil {
		return fmt.Errorf("creating template failed: %w", err)
	}
	p.tmpl = tmpl

	// Start from an empty table so that the loaders can append to it.
	p.mappings = make(map[string][]telegraf.Tag)

	switch strings.ToLower(p.Fileformat) {
	case "", "json":
		return p.loadJSONFiles()
	case "csv_key_name_value":
		return p.loadCSVKeyNameValueFiles()
	case "csv_key_values":
		return p.loadCSVKeyValuesFiles()
	default:
		return fmt.Errorf("invalid format %q", p.Fileformat)
	}
}
// Apply renders the key template for every metric and, if the resulting key
// exists in the lookup table, adds the mapped tags to the metric.
//
// Metrics implementing telegraf.UnwrappableMetric are unwrapped before the
// template is executed and the tags are added; the returned slice always
// holds the metrics as they were passed in, in the same order. A metric whose
// key cannot be generated is passed through unchanged and the failure is
// logged.
func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric {
	result := make([]telegraf.Metric, 0, len(in))
	for _, original := range in {
		result = append(result, original)

		target := original
		if wrapped, isWrapped := original.(telegraf.UnwrappableMetric); isWrapped {
			target = wrapped.Unwrap()
		}

		var key bytes.Buffer
		if err := p.tmpl.Execute(&key, target); err != nil {
			p.Log.Errorf("generating key failed: %v", err)
			p.Log.Debugf("metric was %v", target)
			continue
		}

		// A missing key yields a nil slice, so the loop simply does nothing.
		for _, tag := range p.mappings[key.String()] {
			target.AddTag(tag.Key, tag.Value)
		}
	}
	return result
}
// loadJSONFiles reads every configured file as a JSON object that maps lookup
// keys to objects of tag-name/tag-value strings and appends the tags to the
// lookup table. The order of the tags of one key follows Go's map iteration
// and is therefore not deterministic.
//
// It stops at and returns the first read or parse error.
func (p *Processor) loadJSONFiles() error {
	for _, path := range p.Filenames {
		content, err := os.ReadFile(path)
		if err != nil {
			return fmt.Errorf("loading %q failed: %w", path, err)
		}

		var entries map[string]map[string]string
		if err := json.Unmarshal(content, &entries); err != nil {
			return fmt.Errorf("parsing %q failed: %w", path, err)
		}

		for key, tags := range entries {
			// Keys without any tag must not create a table entry.
			if len(tags) == 0 {
				continue
			}
			collected := p.mappings[key]
			for name, value := range tags {
				collected = append(collected, telegraf.Tag{Key: name, Value: value})
			}
			p.mappings[key] = collected
		}
	}
	return nil
}
// loadCSVKeyNameValueFiles loads all configured files in the
// "key,name,value[,name,value...]" CSV layout, returning the first error.
func (p *Processor) loadCSVKeyNameValueFiles() error {
	for i := range p.Filenames {
		err := p.loadCSVKeyNameValueFile(p.Filenames[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// loadCSVKeyNameValueFile reads a CSV file whose records consist of a lookup
// key followed by one or more tag-name/tag-value pairs and appends those tags
// to the lookup table.
//
// Lines starting with '#' are treated as comments, leading whitespace of
// fields is trimmed and records may differ in length. An error is returned if
// the file cannot be opened or read, if a record has fewer than three fields
// or if a tag name has no value. The number in those errors counts the
// records read so far, so it can differ from the physical line number when
// the file contains comment lines.
func (p *Processor) loadCSVKeyNameValueFile(fn string) error {
	file, err := os.Open(fn)
	if err != nil {
		return fmt.Errorf("loading %q failed: %w", fn, err)
	}
	defer file.Close()

	r := csv.NewReader(file)
	r.Comment = '#'
	r.FieldsPerRecord = -1
	r.TrimLeadingSpace = true

	for n := 1; ; n++ {
		record, err := r.Read()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("reading line %d in %q failed: %w", n, fn, err)
		}

		switch {
		case len(record) < 3:
			return fmt.Errorf("line %d in %q has not enough columns, requiring at least `key,name,value`", n, fn)
		case len(record)%2 == 0:
			// The key plus complete pairs always gives an odd field count.
			return fmt.Errorf("line %d in %q has a tag-name without value", n, fn)
		}

		key, pairs := record[0], record[1:]
		for i := 0; i+1 < len(pairs); i += 2 {
			tag := telegraf.Tag{Key: pairs[i], Value: pairs[i+1]}
			p.mappings[key] = append(p.mappings[key], tag)
		}
	}
}
// loadCSVKeyValuesFiles loads all configured files in the header-based
// "key,value[,value...]" CSV layout, returning the first error.
func (p *Processor) loadCSVKeyValuesFiles() error {
	for i := range p.Filenames {
		err := p.loadCSVKeyValuesFile(p.Filenames[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// loadCSVKeyValuesFile reads a CSV file with a header record. The first
// column holds the lookup key and every further column holds the value of the
// tag named by that column's header. Values that are empty after trimming
// whitespace are skipped; all others are appended to the lookup table.
//
// Lines starting with '#' are treated as comments and leading whitespace of
// fields is trimmed. An error is returned if the file cannot be opened or
// read, or if the header is missing or has fewer than two columns.
func (p *Processor) loadCSVKeyValuesFile(fn string) error {
	file, err := os.Open(fn)
	if err != nil {
		return fmt.Errorf("loading %q failed: %w", fn, err)
	}
	defer file.Close()

	// FieldsPerRecord keeps its default of 0, so the reader requires every
	// record to have as many fields as the header; this keeps the indexing
	// of the record and the tag names below in range.
	r := csv.NewReader(file)
	r.Comment = '#'
	r.TrimLeadingSpace = true

	// The first record is expected to be the header.
	columns, err := r.Read()
	switch {
	case errors.Is(err, io.EOF):
		return fmt.Errorf("missing header in %q", fn)
	case err != nil:
		return fmt.Errorf("reading header in %q failed: %w", fn, err)
	case len(columns) < 2:
		return fmt.Errorf("header in %q has not enough columns, requiring at least `key,value`", fn)
	}
	names := columns[1:]

	// The header was record one, data records are counted from two.
	for n := 2; ; n++ {
		record, err := r.Read()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("reading line %d in %q failed: %w", n, fn, err)
		}

		key := record[0]
		for i, cell := range record[1:] {
			value := strings.TrimSpace(cell)
			if value == "" {
				continue
			}
			p.mappings[key] = append(p.mappings[key], telegraf.Tag{Key: names[i], Value: value})
		}
	}
}
func init() {
processors.Add("lookup", func() telegraf.Processor {
return &Processor{}
})
}