1
0
Fork 0
telegraf/plugins/processors/ifname/ifname.go

331 lines
7.1 KiB
Go
Raw Permalink Normal View History

//go:generate ../../../tools/readme_config_includer/generator
package ifname
import (
_ "embed"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/snmp"
"github.com/influxdata/telegraf/plugins/common/parallel"
"github.com/influxdata/telegraf/plugins/processors"
)
// sampleConfig holds the contents of sample.conf, embedded into the binary
// by the directive below. It is the value returned by SampleConfig.
//
//go:embed sample.conf
var sampleConfig string
// nameMap maps an interface index to the name of that interface, as built
// from one agent's table by buildMap.
type nameMap map[uint64]string
// keyType and valType alias the key (string) and value (nameMap) types.
// NOTE(review): presumably these are the types stored by TTLCache, which is
// defined outside this file — confirm.
type keyType = string
type valType = nameMap
// mapFunc is the signature of a function that retrieves the nameMap for the
// given agent.
type mapFunc func(agent string) (nameMap, error)
// sigMap holds one channel per agent with a table fetch in flight. getMap
// closes and removes the channel when that fetch finishes.
type sigMap map[string]chan struct{}
// IfName is the "ifname" streaming processor registered in init. For each
// metric it looks up the name of the interface identified by the metric's
// source tag on the agent identified by the metric's agent tag, and adds the
// name to the metric as a new tag.
type IfName struct {
// SourceTag is the name of the tag holding the interface number; its value
// is parsed as an unsigned integer.
SourceTag string `toml:"tag"`
// DestTag is the name of the tag that receives the resolved interface name.
DestTag string `toml:"dest"`
// AgentTag is the name of the tag holding the agent to query.
AgentTag string `toml:"agent"`
// ClientConfig carries the SNMP client settings used to build the SNMP
// wrapper in Init and for every remote lookup.
snmp.ClientConfig
// CacheSize is passed to NewTTLCache in Init together with CacheTTL.
CacheSize uint `toml:"max_cache_entries"`
// MaxParallelLookups is passed to the parallel helper created in Start.
// NOTE(review): presumably the number of concurrent workers — confirm
// against the parallel package.
MaxParallelLookups int `toml:"max_parallel_lookups"`
// Ordered selects parallel.NewOrdered instead of parallel.NewUnordered in
// Start.
Ordered bool `toml:"ordered"`
// CacheTTL is converted to a time.Duration and passed to NewTTLCache in
// Init.
CacheTTL config.Duration `toml:"cache_ttl"`
// Log is the logger used for warnings and debug messages; it is excluded
// from the TOML configuration.
Log telegraf.Logger `toml:"-"`
// ifTable and ifXTable are the table definitions prepared in Start; the
// remote lookup tries ifXTable first and falls back to ifTable.
ifTable *snmp.Table
ifXTable *snmp.Table
// cache stores the per-agent nameMap; created in Init.
cache *TTLCache
// lock is held around every access to cache and sigs in this file.
lock sync.Mutex
// parallel is the worker helper created in Start; Add enqueues metrics on
// it and Stop stops it.
parallel parallel.Parallel
// sigs tracks, per agent, the channel that waiting lookups block on while
// another lookup fetches that agent's table.
sigs sigMap
// getMapRemote performs the remote table fetch. Init sets it to
// getMapRemoteNoMock.
// NOTE(review): the name suggests tests substitute a mock here — confirm.
getMapRemote mapFunc
}
// minRetry is the minimum age a cached interface table must have before
// addTag will discard it and query the agent again because an interface
// number was not found in it.
const minRetry = 5 * time.Minute
// SampleConfig returns the embedded sample configuration for this processor.
func (*IfName) SampleConfig() string {
	return sampleConfig
}
// Init prepares the processor's internal state: it selects the real remote
// lookup function, creates the TTL cache from CacheTTL and CacheSize, and
// allocates the per-agent signal map. It then builds an SNMP wrapper from the
// client configuration purely to validate it, discarding the wrapper, and
// returns a wrapped error if that fails.
func (d *IfName) Init() error {
	d.getMapRemote = d.getMapRemoteNoMock

	ttlCache := NewTTLCache(time.Duration(d.CacheTTL), d.CacheSize)
	d.cache = &ttlCache

	d.sigs = make(sigMap)

	// Only the error matters here; lookups build their own wrapper.
	_, err := snmp.NewWrapper(d.ClientConfig)
	if err != nil {
		return fmt.Errorf("parsing SNMP client config: %w", err)
	}
	return nil
}
// addTag resolves the interface number found in the metric's source tag to
// an interface name on the agent named by the metric's agent tag and, when
// found, adds the name to the metric under DestTag.
//
// A metric that lacks the agent tag or the source tag is logged at warn level
// and left unchanged; nil is returned in that case. An error is returned when
// the source tag is not an unsigned integer, when the agent's interface table
// cannot be obtained, or when the table does not contain the interface.
func (d *IfName) addTag(metric telegraf.Metric) error {
	agent, ok := metric.GetTag(d.AgentTag)
	if !ok {
		d.Log.Warn("Agent tag missing.")
		return nil
	}

	numS, ok := metric.GetTag(d.SourceTag)
	if !ok {
		d.Log.Warn("Source tag missing.")
		return nil
	}

	num, err := strconv.ParseUint(numS, 10, 64)
	if err != nil {
		// Keep the strconv error in the chain: it carries the offending
		// value and the reason (syntax vs. range), which is the only
		// diagnostic the caller's debug log will show.
		return fmt.Errorf("couldn't parse source tag as uint: %w", err)
	}

	firstTime := true
	for {
		m, age, err := d.getMap(agent)
		if err != nil {
			return fmt.Errorf("couldn't retrieve the table of interface names for %s: %w", agent, err)
		}

		name, found := m[num]
		if found {
			// success
			metric.AddTag(d.DestTag, name)
			return nil
		}

		// We have the agent's interface map but it doesn't contain
		// the interface we're interested in. If the entry is old
		// enough, retrieve it from the agent once more.
		if age < minRetry {
			return fmt.Errorf("interface number %d isn't in the table of interface names on %s", num, agent)
		}

		if firstTime {
			// Drop the stale entry so the next getMap goes to the agent;
			// only do this once per metric.
			d.invalidate(agent)
			firstTime = false
			continue
		}

		// not found, cache hit, retrying
		return fmt.Errorf("missing interface but couldn't retrieve table for %v", agent)
	}
}
// invalidate removes the cached interface table for agent, holding the lock
// for the duration of the delete.
func (d *IfName) invalidate(agent string) {
	d.lock.Lock()
	defer d.lock.Unlock()

	d.cache.Delete(agent)
}
// Start prepares the two SNMP table definitions used for lookups and creates
// the parallel helper that runs addTag on each enqueued metric. A failure to
// add the tag is logged at debug level only; the metric is passed on either
// way. It returns a wrapped error if either table cannot be prepared.
func (d *IfName) Start(acc telegraf.Accumulator) error {
	const (
		ifDescrOID = "1.3.6.1.2.1.2.2.1.2"    // column used for ifTable
		ifNameOID  = "1.3.6.1.2.1.31.1.1.1.1" // column used for ifXTable
	)

	var err error
	if d.ifTable, err = makeTable(ifDescrOID); err != nil {
		return fmt.Errorf("preparing ifTable: %w", err)
	}
	if d.ifXTable, err = makeTable(ifNameOID); err != nil {
		return fmt.Errorf("preparing ifXTable: %w", err)
	}

	// Per-metric work item: tag the metric if possible, then always emit it.
	tagMetric := func(m telegraf.Metric) []telegraf.Metric {
		if tagErr := d.addTag(m); tagErr != nil {
			d.Log.Debugf("Error adding tag: %v", tagErr)
		}
		return []telegraf.Metric{m}
	}

	if !d.Ordered {
		d.parallel = parallel.NewUnordered(acc, tagMetric, d.MaxParallelLookups)
		return nil
	}
	d.parallel = parallel.NewOrdered(acc, tagMetric, 10000, d.MaxParallelLookups)
	return nil
}
// Add hands the metric to the parallel helper created in Start. The
// accumulator argument is unused here, and Add always returns nil.
func (d *IfName) Add(metric telegraf.Metric, _ telegraf.Accumulator) error {
	d.parallel.Enqueue(metric)
	return nil
}
// Stop stops the parallel helper created in Start.
func (d *IfName) Stop() {
	d.parallel.Stop()
}
// getMap returns the interface-name table for agent, taken from the cache
// when present and fetched from the SNMP agent otherwise. The returned age is
// the value reported by the cache for a cached entry and 0 for a table that
// this call fetched itself.
//
// Only one remote fetch per agent runs at a time: the first caller to miss
// the cache registers a signal channel and performs the fetch; callers that
// miss while it is in flight wait for that channel to be closed and then read
// the cache again. If the fetch failed, those waiters get an error because
// nothing was cached.
func (d *IfName) getMap(agent string) (entry nameMap, age time.Duration, err error) {
	d.lock.Lock()

	// Fast path: the table is already cached.
	cached, hit, age := d.cache.Get(agent)
	if hit {
		d.lock.Unlock()
		return cached, age, nil
	}

	// Cache miss. An existing channel means another caller is already
	// fetching this agent's table; otherwise register one so that later
	// callers wait on this call.
	done, inFlight := d.sigs[agent]
	if !inFlight {
		done = make(chan struct{})
		d.sigs[agent] = done
	}
	d.lock.Unlock()

	if inFlight {
		// Wait for the fetching caller to finish, then consult the cache
		// once more.
		<-done

		d.lock.Lock()
		cached, hit, age = d.cache.Get(agent)
		d.lock.Unlock()

		if !hit {
			return nil, 0, errors.New("getting remote table from cache")
		}
		return cached, age, nil
	}

	// This call owns the fetch. The SNMP request runs without the lock held.
	fetched, err := d.getMapRemote(agent)

	// Publish the result (on success only), then wake the waiters and drop
	// the signal entry, all under the lock.
	d.lock.Lock()
	if err == nil {
		d.cache.Put(agent, fetched)
	}
	close(done)
	delete(d.sigs, agent)
	d.lock.Unlock()

	if err != nil {
		return nil, 0, fmt.Errorf("getting remote table: %w", err)
	}
	return fetched, 0, nil
}
// getMapRemoteNoMock fetches the interface-name table for agent over SNMP.
// It builds a wrapper from the client configuration, points it at the agent,
// connects, and then tries ifXTable (ifName) first, falling back to ifTable
// (ifDescr). If both attempts fail, the error from the last attempt is
// returned, wrapped.
//
// NOTE(review): nothing in this function closes the connection opened by
// Connect — confirm whether the wrapper needs an explicit close.
func (d *IfName) getMapRemoteNoMock(agent string) (nameMap, error) {
	gs, err := snmp.NewWrapper(d.ClientConfig)
	if err != nil {
		return nil, fmt.Errorf("parsing SNMP client config: %w", err)
	}

	err = gs.SetAgent(agent)
	if err != nil {
		return nil, fmt.Errorf("parsing agent tag: %w", err)
	}

	err = gs.Connect()
	if err != nil {
		return nil, fmt.Errorf("connecting when fetching interface names: %w", err)
	}

	// Tables in order of preference; the first one that builds wins.
	for _, tab := range []*snmp.Table{d.ifXTable, d.ifTable} {
		var names nameMap
		if names, err = buildMap(gs, tab); err == nil {
			return names, nil
		}
	}

	return nil, fmt.Errorf("fetching interface names: %w", err)
}
func init() {
processors.AddStreaming("ifname", func() telegraf.StreamingProcessor {
return &IfName{
SourceTag: "ifIndex",
DestTag: "ifName",
AgentTag: "agent",
CacheSize: 100,
MaxParallelLookups: 100,
ClientConfig: *snmp.DefaultClientConfig(),
CacheTTL: config.Duration(8 * time.Hour),
}
})
}
// makeTable returns an initialised single-column table definition named
// "ifTable" whose only field, "ifName", is read from oid, with the row index
// exposed as a tag. The error from the table's Init is returned unchanged.
func makeTable(oid string) (*snmp.Table, error) {
	nameField := snmp.Field{Oid: oid, Name: "ifName"}
	tab := &snmp.Table{
		Name:       "ifTable",
		IndexAsTag: true,
		Fields:     []snmp.Field{nameField},
	}

	// Init already wraps its error, so pass it through as is.
	if err := tab.Init(nil); err != nil {
		return nil, err
	}
	return tab, nil
}
// buildMap builds tab through gs and converts the resulting rows into a
// nameMap, keyed by each row's "index" tag parsed as an unsigned integer and
// holding the row's "ifName" field as the value.
//
// It returns an error if the build fails, if the table has no rows, or if any
// row lacks a numeric "index" tag or a string "ifName" field. A single bad
// row fails the whole table.
func buildMap(gs snmp.GosnmpWrapper, tab *snmp.Table) (nameMap, error) {
	rtab, err := tab.Build(gs, true)
	if err != nil {
		// Build already wraps
		return nil, err
	}

	if len(rtab.Rows) == 0 {
		return nil, errors.New("empty table")
	}

	t := make(nameMap, len(rtab.Rows))
	for _, v := range rtab.Rows {
		iStr, ok := v.Tags["index"]
		if !ok {
			// should always have an index tag because the table should
			// always have IndexAsTag true
			return nil, errors.New("no index tag")
		}
		i, err := strconv.ParseUint(iStr, 10, 64)
		if err != nil {
			// Keep the strconv error in the chain so the offending index
			// value and the reason it was rejected are not lost.
			return nil, fmt.Errorf("index tag isn't a uint: %w", err)
		}
		nameIf, ok := v.Fields["ifName"]
		if !ok {
			return nil, errors.New("ifName field is missing")
		}
		name, ok := nameIf.(string)
		if !ok {
			return nil, errors.New("ifName field isn't a string")
		}

		t[i] = name
	}
	return t, nil
}