296 lines
7.7 KiB
Go
296 lines
7.7 KiB
Go
package netflow
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/netsampler/goflow2/v2/decoders/netflow"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
|
"github.com/influxdata/telegraf/testutil"
|
|
)
|
|
|
|
func TestInit(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
address string
|
|
protocol string
|
|
errmsg string
|
|
}{
|
|
{
|
|
name: "Netflow v5",
|
|
address: "udp://:2055",
|
|
protocol: "netflow v5",
|
|
},
|
|
{
|
|
name: "Netflow v5 (uppercase)",
|
|
address: "udp://:2055",
|
|
protocol: "Netflow v5",
|
|
},
|
|
{
|
|
name: "Netflow v9",
|
|
address: "udp://:2055",
|
|
protocol: "netflow v9",
|
|
},
|
|
{
|
|
name: "Netflow v9 (uppercase)",
|
|
address: "udp://:2055",
|
|
protocol: "Netflow v9",
|
|
},
|
|
{
|
|
name: "IPFIX",
|
|
address: "udp://:2055",
|
|
protocol: "ipfix",
|
|
},
|
|
{
|
|
name: "IPFIX (uppercase)",
|
|
address: "udp://:2055",
|
|
protocol: "IPFIX",
|
|
},
|
|
{
|
|
name: "invalid protocol",
|
|
address: "udp://:2055",
|
|
protocol: "foo",
|
|
errmsg: "invalid protocol",
|
|
},
|
|
{
|
|
name: "UDP",
|
|
address: "udp://:2055",
|
|
protocol: "netflow v5",
|
|
},
|
|
{
|
|
name: "UDP4",
|
|
address: "udp4://:2055",
|
|
protocol: "netflow v5",
|
|
},
|
|
{
|
|
name: "UDP6",
|
|
address: "udp6://:2055",
|
|
protocol: "netflow v5",
|
|
},
|
|
{
|
|
name: "empty service address",
|
|
address: "",
|
|
protocol: "netflow v5",
|
|
errmsg: "service_address required",
|
|
},
|
|
{
|
|
name: "invalid address scheme",
|
|
address: "tcp://:2055",
|
|
protocol: "netflow v5",
|
|
errmsg: "invalid scheme",
|
|
},
|
|
{
|
|
name: "invalid service address",
|
|
address: "udp://198.168.1.290:la",
|
|
protocol: "netflow v5",
|
|
errmsg: "invalid service address",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
plugin := &NetFlow{
|
|
ServiceAddress: tt.address,
|
|
Protocol: tt.protocol,
|
|
Log: testutil.Logger{},
|
|
}
|
|
err := plugin.Init()
|
|
if tt.errmsg != "" {
|
|
require.ErrorContains(t, err, tt.errmsg)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestMissingTemplate(t *testing.T) {
|
|
raw := "000a00bc646b84c000000000000000e7010500ac000000000001dbe100000000"
|
|
raw += "0000038a060018bdeac0a802c8000000000001bb6810f9f90000000000000000"
|
|
raw += "000157b8c40155f28a00005056b3e365005056b3a7f804646b8471646b84e600"
|
|
raw += "00018843fd5cf60000018843ff232e000000000000000e00000000000007bc00"
|
|
raw += "000005000009560000000300dc00000000000000000000000000000e3130342e"
|
|
raw += "31362e3234392e3234390e3130342e31362e3234392e323439000000"
|
|
msg, err := hex.DecodeString(raw)
|
|
require.NoError(t, err)
|
|
|
|
var acc testutil.Accumulator
|
|
var logger testutil.CaptureLogger
|
|
plugin := &NetFlow{
|
|
ServiceAddress: "udp://127.0.0.1:0",
|
|
Log: &logger,
|
|
}
|
|
require.NoError(t, plugin.Init())
|
|
require.NoError(t, plugin.Start(&acc))
|
|
defer plugin.Stop()
|
|
|
|
// Create a client without TLS
|
|
addr := plugin.conn.LocalAddr()
|
|
client, err := createClient(plugin.ServiceAddress, addr)
|
|
require.NoError(t, err)
|
|
|
|
// Write the message
|
|
_, err = client.Write(msg)
|
|
require.NoErrorf(t, err, "writing message failed: %v", err)
|
|
require.NoError(t, client.Close())
|
|
|
|
// We expect a warning here
|
|
require.Eventually(t, func() bool {
|
|
return len(logger.Warnings()) > 0
|
|
}, 3*time.Second, 100*time.Millisecond, "did not receive expected warnings")
|
|
|
|
var found bool
|
|
for _, w := range logger.Warnings() {
|
|
found = found || strings.Contains(w, netflow.ErrorTemplateNotFound.Error())
|
|
}
|
|
require.True(t, found, "warning not found")
|
|
}
|
|
|
|
func TestWrongMapping(t *testing.T) {
|
|
var logger testutil.CaptureLogger
|
|
plugin := &NetFlow{
|
|
ServiceAddress: "udp://127.0.0.1:0",
|
|
Protocol: "ipfix",
|
|
PENFiles: []string{"testcases/netflow_mapping.csv"},
|
|
Log: &logger,
|
|
}
|
|
require.ErrorContains(t, plugin.Init(), "does not match pattern")
|
|
}
|
|
|
|
func TestCases(t *testing.T) {
|
|
// Get all directories in testdata
|
|
folders, err := os.ReadDir("testcases")
|
|
require.NoError(t, err)
|
|
|
|
// Register the plugin
|
|
inputs.Add("netflow", func() telegraf.Input {
|
|
return &NetFlow{}
|
|
})
|
|
|
|
// Prepare the influx parser for expectations
|
|
parser := &influx.Parser{}
|
|
require.NoError(t, parser.Init())
|
|
|
|
for _, f := range folders {
|
|
// Only handle folders
|
|
if !f.IsDir() {
|
|
continue
|
|
}
|
|
testcasePath := filepath.Join("testcases", f.Name())
|
|
configFilename := filepath.Join(testcasePath, "telegraf.conf")
|
|
inputFiles := filepath.Join(testcasePath, "*.bin")
|
|
expectedFilename := filepath.Join(testcasePath, "expected.out")
|
|
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
|
|
|
|
// Compare options
|
|
options := []cmp.Option{
|
|
testutil.IgnoreTime(),
|
|
testutil.SortMetrics(),
|
|
}
|
|
|
|
t.Run(f.Name(), func(t *testing.T) {
|
|
// Read the input data
|
|
var messages [][]byte
|
|
matches, err := filepath.Glob(inputFiles)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, matches)
|
|
sort.Strings(matches)
|
|
for _, fn := range matches {
|
|
m, err := os.ReadFile(fn)
|
|
require.NoError(t, err)
|
|
messages = append(messages, m)
|
|
}
|
|
|
|
// Read the expected output if any
|
|
var expected []telegraf.Metric
|
|
if _, err := os.Stat(expectedFilename); err == nil {
|
|
var err error
|
|
expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Read the expected output if any
|
|
var expectedErrors []string
|
|
if _, err := os.Stat(expectedErrorFilename); err == nil {
|
|
var err error
|
|
expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, expectedErrors)
|
|
}
|
|
|
|
// Configure the plugin
|
|
cfg := config.NewConfig()
|
|
require.NoError(t, cfg.LoadConfig(configFilename))
|
|
require.Len(t, cfg.Inputs, 1)
|
|
|
|
// Setup and start the plugin
|
|
var acc testutil.Accumulator
|
|
plugin := cfg.Inputs[0].Input.(*NetFlow)
|
|
require.NoError(t, plugin.Init())
|
|
require.NoError(t, plugin.Start(&acc))
|
|
defer plugin.Stop()
|
|
|
|
// Create a client without TLS
|
|
addr := plugin.conn.LocalAddr()
|
|
client, err := createClient(plugin.ServiceAddress, addr)
|
|
require.NoError(t, err)
|
|
|
|
// Write the given sequence
|
|
for i, msg := range messages {
|
|
_, err := client.Write(msg)
|
|
require.NoErrorf(t, err, "writing message from %q failed: %v", matches[i], err)
|
|
}
|
|
require.NoError(t, client.Close())
|
|
|
|
getNErrors := func() int {
|
|
acc.Lock()
|
|
defer acc.Unlock()
|
|
return len(acc.Errors)
|
|
}
|
|
require.Eventuallyf(t, func() bool {
|
|
return getNErrors() >= len(expectedErrors)
|
|
}, 3*time.Second, 100*time.Millisecond, "did not receive errors (%d/%d)", getNErrors(), len(expectedErrors))
|
|
|
|
require.Lenf(t, acc.Errors, len(expectedErrors), "got errors: %v", acc.Errors)
|
|
sort.SliceStable(acc.Errors, func(i, j int) bool {
|
|
return acc.Errors[i].Error() < acc.Errors[j].Error()
|
|
})
|
|
for i, err := range acc.Errors {
|
|
require.ErrorContains(t, err, expectedErrors[i])
|
|
}
|
|
|
|
require.Eventuallyf(t, func() bool {
|
|
acc.Lock()
|
|
defer acc.Unlock()
|
|
return acc.NMetrics() >= uint64(len(expected))
|
|
}, 3*time.Second, 100*time.Millisecond, "did not receive metrics (%d/%d)", acc.NMetrics(), len(expected))
|
|
|
|
// Check the metric nevertheless as we might get some metrics despite errors.
|
|
actual := acc.GetTelegrafMetrics()
|
|
testutil.RequireMetricsEqual(t, expected, actual, options...)
|
|
})
|
|
}
|
|
}
|
|
|
|
func createClient(endpoint string, addr net.Addr) (net.Conn, error) {
|
|
// Determine the protocol in a crude fashion
|
|
parts := strings.SplitN(endpoint, "://", 2)
|
|
if len(parts) != 2 {
|
|
return nil, fmt.Errorf("invalid endpoint %q", endpoint)
|
|
}
|
|
protocol := parts[0]
|
|
return net.Dial(protocol, addr.String())
|
|
}
|