215 lines
4.7 KiB
Go
215 lines
4.7 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package lookup
|
|
|
|
import (
|
|
"bytes"
|
|
_ "embed"
|
|
"encoding/csv"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"text/template"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/plugins/processors"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
type Processor struct {
|
|
Filenames []string `toml:"files"`
|
|
Fileformat string `toml:"format"`
|
|
KeyTemplate string `toml:"key"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
tmpl *template.Template
|
|
mappings map[string][]telegraf.Tag
|
|
}
|
|
|
|
func (*Processor) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (p *Processor) Init() error {
|
|
if len(p.Filenames) < 1 {
|
|
return errors.New("missing 'files'")
|
|
}
|
|
|
|
if p.KeyTemplate == "" {
|
|
return errors.New("missing 'key_template'")
|
|
}
|
|
|
|
tmpl, err := template.New("key").Parse(p.KeyTemplate)
|
|
if err != nil {
|
|
return fmt.Errorf("creating template failed: %w", err)
|
|
}
|
|
p.tmpl = tmpl
|
|
|
|
p.mappings = make(map[string][]telegraf.Tag)
|
|
switch strings.ToLower(p.Fileformat) {
|
|
case "", "json":
|
|
return p.loadJSONFiles()
|
|
case "csv_key_name_value":
|
|
return p.loadCSVKeyNameValueFiles()
|
|
case "csv_key_values":
|
|
return p.loadCSVKeyValuesFiles()
|
|
}
|
|
|
|
return fmt.Errorf("invalid format %q", p.Fileformat)
|
|
}
|
|
|
|
func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
|
out := make([]telegraf.Metric, 0, len(in))
|
|
for _, raw := range in {
|
|
m := raw
|
|
if wm, ok := raw.(telegraf.UnwrappableMetric); ok {
|
|
m = wm.Unwrap()
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
if err := p.tmpl.Execute(&buf, m); err != nil {
|
|
p.Log.Errorf("generating key failed: %v", err)
|
|
p.Log.Debugf("metric was %v", m)
|
|
} else if tags, found := p.mappings[buf.String()]; found {
|
|
for _, tag := range tags {
|
|
m.AddTag(tag.Key, tag.Value)
|
|
}
|
|
}
|
|
out = append(out, raw)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (p *Processor) loadJSONFiles() error {
|
|
for _, fn := range p.Filenames {
|
|
buf, err := os.ReadFile(fn)
|
|
if err != nil {
|
|
return fmt.Errorf("loading %q failed: %w", fn, err)
|
|
}
|
|
|
|
var data map[string]map[string]string
|
|
if err := json.Unmarshal(buf, &data); err != nil {
|
|
return fmt.Errorf("parsing %q failed: %w", fn, err)
|
|
}
|
|
|
|
for key, tags := range data {
|
|
for k, v := range tags {
|
|
p.mappings[key] = append(p.mappings[key], telegraf.Tag{Key: k, Value: v})
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Processor) loadCSVKeyNameValueFiles() error {
|
|
for _, fn := range p.Filenames {
|
|
if err := p.loadCSVKeyNameValueFile(fn); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Processor) loadCSVKeyNameValueFile(fn string) error {
|
|
f, err := os.Open(fn)
|
|
if err != nil {
|
|
return fmt.Errorf("loading %q failed: %w", fn, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
reader := csv.NewReader(f)
|
|
reader.Comment = '#'
|
|
reader.FieldsPerRecord = -1
|
|
reader.TrimLeadingSpace = true
|
|
|
|
line := 0
|
|
for {
|
|
line++
|
|
data, err := reader.Read()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
return fmt.Errorf("reading line %d in %q failed: %w", line, fn, err)
|
|
}
|
|
if len(data) < 3 {
|
|
return fmt.Errorf("line %d in %q has not enough columns, requiring at least `key,name,value`", line, fn)
|
|
}
|
|
if len(data)%2 != 1 {
|
|
return fmt.Errorf("line %d in %q has a tag-name without value", line, fn)
|
|
}
|
|
|
|
key := data[0]
|
|
for i := 1; i < len(data)-1; i += 2 {
|
|
k, v := data[i], data[i+1]
|
|
p.mappings[key] = append(p.mappings[key], telegraf.Tag{Key: k, Value: v})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Processor) loadCSVKeyValuesFiles() error {
|
|
for _, fn := range p.Filenames {
|
|
if err := p.loadCSVKeyValuesFile(fn); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Processor) loadCSVKeyValuesFile(fn string) error {
|
|
f, err := os.Open(fn)
|
|
if err != nil {
|
|
return fmt.Errorf("loading %q failed: %w", fn, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
reader := csv.NewReader(f)
|
|
reader.Comment = '#'
|
|
reader.TrimLeadingSpace = true
|
|
|
|
// Read the first line which should be the header
|
|
header, err := reader.Read()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return fmt.Errorf("missing header in %q", fn)
|
|
}
|
|
return fmt.Errorf("reading header in %q failed: %w", fn, err)
|
|
}
|
|
if len(header) < 2 {
|
|
return fmt.Errorf("header in %q has not enough columns, requiring at least `key,value`", fn)
|
|
}
|
|
header = header[1:]
|
|
|
|
line := 1
|
|
for {
|
|
line++
|
|
data, err := reader.Read()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
return fmt.Errorf("reading line %d in %q failed: %w", line, fn, err)
|
|
}
|
|
|
|
key := data[0]
|
|
for i, v := range data[1:] {
|
|
v = strings.TrimSpace(v)
|
|
if v != "" {
|
|
p.mappings[key] = append(p.mappings[key], telegraf.Tag{Key: header[i], Value: v})
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
func init() {
|
|
processors.Add("lookup", func() telegraf.Processor {
|
|
return &Processor{}
|
|
})
|
|
}
|