163 lines
3.7 KiB
Go
163 lines
3.7 KiB
Go
|
//go:generate ../../../tools/readme_config_includer/generator
|
||
|
package netflow
|
||
|
|
||
|
import (
|
||
|
_ "embed"
|
||
|
"encoding/hex"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"net/url"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/config"
|
||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||
|
)
|
||
|
|
||
|
//go:embed sample.conf
|
||
|
var sampleConfig string
|
||
|
|
||
|
type NetFlow struct {
|
||
|
ServiceAddress string `toml:"service_address"`
|
||
|
ReadBufferSize config.Size `toml:"read_buffer_size"`
|
||
|
Protocol string `toml:"protocol"`
|
||
|
DumpPackets bool `toml:"dump_packets" deprecated:"1.35.0;use 'log_level' 'trace' instead"`
|
||
|
PENFiles []string `toml:"private_enterprise_number_files"`
|
||
|
Log telegraf.Logger `toml:"-"`
|
||
|
|
||
|
conn *net.UDPConn
|
||
|
decoder protocolDecoder
|
||
|
wg sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
type protocolDecoder interface {
|
||
|
init() error
|
||
|
decode(net.IP, []byte) ([]telegraf.Metric, error)
|
||
|
}
|
||
|
|
||
|
func (*NetFlow) SampleConfig() string {
|
||
|
return sampleConfig
|
||
|
}
|
||
|
|
||
|
func (n *NetFlow) Init() error {
|
||
|
if n.ServiceAddress == "" {
|
||
|
return errors.New("service_address required")
|
||
|
}
|
||
|
u, err := url.Parse(n.ServiceAddress)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("invalid service address %q: %w", n.ServiceAddress, err)
|
||
|
}
|
||
|
switch u.Scheme {
|
||
|
case "udp", "udp4", "udp6":
|
||
|
default:
|
||
|
return fmt.Errorf("invalid scheme %q, should be 'udp', 'udp4' or 'udp6'", u.Scheme)
|
||
|
}
|
||
|
|
||
|
switch strings.ToLower(n.Protocol) {
|
||
|
case "netflow v9":
|
||
|
if len(n.PENFiles) != 0 {
|
||
|
n.Log.Warn("'private_enterprise_number_files' option will be ignored in 'netflow v9'")
|
||
|
}
|
||
|
n.decoder = &netflowDecoder{
|
||
|
log: n.Log,
|
||
|
}
|
||
|
case "", "ipfix":
|
||
|
n.decoder = &netflowDecoder{
|
||
|
penFiles: n.PENFiles,
|
||
|
log: n.Log,
|
||
|
}
|
||
|
case "netflow v5":
|
||
|
if len(n.PENFiles) != 0 {
|
||
|
n.Log.Warn("'private_enterprise_number_files' option will be ignored in 'netflow v5'")
|
||
|
}
|
||
|
n.decoder = &netflowv5Decoder{}
|
||
|
case "sflow", "sflow v5":
|
||
|
n.decoder = &sflowv5Decoder{log: n.Log}
|
||
|
default:
|
||
|
return fmt.Errorf("invalid protocol %q, only supports 'sflow', 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol)
|
||
|
}
|
||
|
|
||
|
return n.decoder.init()
|
||
|
}
|
||
|
|
||
|
func (n *NetFlow) Start(acc telegraf.Accumulator) error {
|
||
|
u, err := url.Parse(n.ServiceAddress)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
addr, err := net.ResolveUDPAddr(u.Scheme, u.Host)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
conn, err := net.ListenUDP(u.Scheme, addr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
n.conn = conn
|
||
|
|
||
|
if n.ReadBufferSize > 0 {
|
||
|
if err := conn.SetReadBuffer(int(n.ReadBufferSize)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
n.Log.Infof("Listening on %s://%s", n.conn.LocalAddr().Network(), n.conn.LocalAddr().String())
|
||
|
|
||
|
n.wg.Add(1)
|
||
|
go func() {
|
||
|
defer n.wg.Done()
|
||
|
n.read(acc)
|
||
|
}()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (*NetFlow) Gather(telegraf.Accumulator) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (n *NetFlow) Stop() {
|
||
|
if n.conn != nil {
|
||
|
_ = n.conn.Close()
|
||
|
}
|
||
|
n.wg.Wait()
|
||
|
}
|
||
|
|
||
|
func (n *NetFlow) read(acc telegraf.Accumulator) {
|
||
|
buf := make([]byte, 64*1024) // 64kB
|
||
|
for {
|
||
|
count, src, err := n.conn.ReadFromUDP(buf)
|
||
|
if err != nil {
|
||
|
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
|
||
|
acc.AddError(err)
|
||
|
}
|
||
|
break
|
||
|
}
|
||
|
n.Log.Debugf("received %d bytes\n", count)
|
||
|
if count < 1 {
|
||
|
continue
|
||
|
}
|
||
|
if n.Log.Level().Includes(telegraf.Trace) || n.DumpPackets { // for backward compatibility
|
||
|
n.Log.Tracef("raw data: %s", hex.EncodeToString(buf[:count]))
|
||
|
}
|
||
|
metrics, err := n.decoder.decode(src.IP, buf[:count])
|
||
|
if err != nil {
|
||
|
errWithData := fmt.Errorf("%w; raw data: %s", err, hex.EncodeToString(buf[:count]))
|
||
|
acc.AddError(errWithData)
|
||
|
continue
|
||
|
}
|
||
|
for _, m := range metrics {
|
||
|
acc.AddMetric(m)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Register the plugin
|
||
|
func init() {
|
||
|
inputs.Add("netflow", func() telegraf.Input {
|
||
|
return &NetFlow{}
|
||
|
})
|
||
|
}
|