1273 lines
29 KiB
Go
1273 lines
29 KiB
Go
|
package zabbix
|
||
|
|
||
|
import (
|
||
|
"encoding/binary"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"net"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"sort"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/datadope-io/go-zabbix/v2"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/config"
|
||
|
"github.com/influxdata/telegraf/metric"
|
||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||
|
"github.com/influxdata/telegraf/testutil"
|
||
|
)
|
||
|
|
||
|
func TestSuccessfulReceive(t *testing.T) {
|
||
|
hostname, err := os.Hostname()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
tests := []struct {
|
||
|
name string
|
||
|
prefix string
|
||
|
agentActive bool
|
||
|
skipMeasurementPrefix bool
|
||
|
input []telegraf.Metric
|
||
|
expected []zabbix.Packet
|
||
|
}{
|
||
|
{
|
||
|
name: "send one metric with one field and no extra tags, generates one zabbix metric",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "string values representing a float number should be sent in the exact same format",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": "3.1415",
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.value",
|
||
|
Value: "3.1415",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "send one metric with one string field and no extra tags, generates one zabbix metric",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": "some value",
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.value",
|
||
|
Value: "some value",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "boolean values are converted to 1 (true) or 0 (false)",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"valueTrue": true,
|
||
|
"valueFalse": false,
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.valueTrue",
|
||
|
Value: "true",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.valueFalse",
|
||
|
Value: "false",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "metrics without host tag use the system hostname",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{},
|
||
|
map[string]interface{}{
|
||
|
"value": "x",
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: hostname,
|
||
|
Key: "name.value",
|
||
|
Value: "x",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "send one metric with extra tags, zabbix metric should be generated with a parameter",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
"foo": "bar",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.value[bar]",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "send one metric with two extra tags, zabbix parameters should be alphabetically ordered",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
"zparam": "last",
|
||
|
"aparam": "first",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.value[first,last]",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "send one metric with two fields and no extra tags, generates two zabbix metrics",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"valueA": int64(0),
|
||
|
"valueB": int64(1),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.valueA",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.valueB",
|
||
|
Value: "1",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "send two metrics with one field and no extra tags, generates two zabbix metrics",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"nameA",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
metric.New(
|
||
|
"nameB",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "nameA.value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "nameB.value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "send two metrics with different hostname, generates two zabbix metrics for different hosts",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostnameA",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostnameB",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostnameA",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
{
|
||
|
Host: "hostnameB",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "if key_prefix is configured, zabbix metrics should have that prefix in the key",
|
||
|
prefix: "telegraf.",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "telegraf.name.value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "if skip_measurement_prefix is configured, zabbix metrics should have to skip that prefix in the key",
|
||
|
skipMeasurementPrefix: true,
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "if AgentActive is configured, zabbix metrics should be sent respecting that protocol",
|
||
|
agentActive: true,
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "agent data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostname",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 1522082244,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "metrics should be time sorted, oldest to newest, to avoid zabbix doing extra work when generating trends",
|
||
|
input: []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostnameD",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(4444444444, 0),
|
||
|
),
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostnameC",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(3333333333, 0),
|
||
|
),
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostnameA",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(1111111111, 0),
|
||
|
),
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostnameB",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": int64(0),
|
||
|
},
|
||
|
time.Unix(2222222222, 0),
|
||
|
),
|
||
|
},
|
||
|
expected: []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostnameA",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 1111111111,
|
||
|
},
|
||
|
{
|
||
|
Host: "hostnameB",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 2222222222,
|
||
|
},
|
||
|
{
|
||
|
Host: "hostnameC",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 3333333333,
|
||
|
},
|
||
|
{
|
||
|
Host: "hostnameD",
|
||
|
Key: "name.value",
|
||
|
Value: "0",
|
||
|
Clock: 4444444444,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
for _, tt := range tests {
|
||
|
t.Run(tt.name, func(t *testing.T) {
|
||
|
// Setup a Zabbix mock server and start listening
|
||
|
server, err := newZabbixMockServer()
|
||
|
require.NoError(t, err)
|
||
|
defer server.close()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
server.listen()
|
||
|
}()
|
||
|
|
||
|
// Setup the plugin
|
||
|
plugin := &Zabbix{
|
||
|
Address: server.addr(),
|
||
|
KeyPrefix: tt.prefix,
|
||
|
HostTag: "host",
|
||
|
SkipMeasurementPrefix: tt.skipMeasurementPrefix,
|
||
|
AgentActive: tt.agentActive,
|
||
|
LLDSendInterval: config.Duration(10 * time.Minute),
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
require.NoError(t, plugin.Init())
|
||
|
|
||
|
// Connect and write the data
|
||
|
require.NoError(t, plugin.Connect())
|
||
|
defer plugin.Close()
|
||
|
require.NoError(t, plugin.Write(tt.input))
|
||
|
|
||
|
// Wait for the data to arrive
|
||
|
require.Eventually(t, func() bool {
|
||
|
return server.count.Load() > 0
|
||
|
}, 3*time.Second, 100*time.Millisecond, "nothing received")
|
||
|
|
||
|
// Stop listening
|
||
|
server.listener.Close()
|
||
|
wg.Wait()
|
||
|
|
||
|
// Check the received data
|
||
|
server.Lock()
|
||
|
defer server.Unlock()
|
||
|
|
||
|
require.Empty(t, server.errs, "server had errors")
|
||
|
requireRequestDataEqual(t, tt.expected, server.received, false)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestInvalidData(t *testing.T) {
|
||
|
input := []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{
|
||
|
"host": "hostname",
|
||
|
},
|
||
|
map[string]interface{}{
|
||
|
"value": []int{1, 2},
|
||
|
},
|
||
|
time.Unix(1522082244, 0),
|
||
|
),
|
||
|
}
|
||
|
|
||
|
// Setup a Zabbix mock server and start listening
|
||
|
server, err := newZabbixMockServer()
|
||
|
require.NoError(t, err)
|
||
|
defer server.close()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
server.listen()
|
||
|
}()
|
||
|
|
||
|
// Setup the plugin
|
||
|
plugin := &Zabbix{
|
||
|
Address: server.addr(),
|
||
|
HostTag: "host",
|
||
|
LLDSendInterval: config.Duration(10 * time.Minute),
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
require.NoError(t, plugin.Init())
|
||
|
|
||
|
// Connect and write the data
|
||
|
require.NoError(t, plugin.Connect())
|
||
|
defer plugin.Close()
|
||
|
require.NoError(t, plugin.Write(input))
|
||
|
require.NoError(t, plugin.Close())
|
||
|
|
||
|
// Stop listening
|
||
|
server.listener.Close()
|
||
|
wg.Wait()
|
||
|
|
||
|
// Check the received data
|
||
|
server.Lock()
|
||
|
defer server.Unlock()
|
||
|
|
||
|
require.Empty(t, server.errs, "server had errors")
|
||
|
require.Empty(t, server.received)
|
||
|
}
|
||
|
|
||
|
// TestLLD tests how LLD metrics are sent simulating the time passing.
|
||
|
// LLD is sent each LLDSendInterval. Only new data.
|
||
|
// LLD data is cleared LLDClearInterval.
|
||
|
func TestLLD(t *testing.T) {
|
||
|
// Telegraf metric which will be sent repeatedly
|
||
|
m := metric.New(
|
||
|
"name",
|
||
|
map[string]string{"host": "hostA", "foo": "bar"},
|
||
|
map[string]interface{}{"value": int64(0)},
|
||
|
time.Unix(0, 0),
|
||
|
)
|
||
|
|
||
|
mNew := metric.New(
|
||
|
"name",
|
||
|
map[string]string{"host": "hostA", "foo": "moo"},
|
||
|
map[string]interface{}{"value": int64(0)},
|
||
|
time.Unix(0, 0),
|
||
|
)
|
||
|
|
||
|
// Expected Zabbix metric generated
|
||
|
expected := []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.lld.name.foo",
|
||
|
Value: `{"data":[{"{#FOO}":"bar"}]}`,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[moo]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.lld.name.foo",
|
||
|
Value: `{"data":[{"{#FOO}":"bar"},{"{#FOO}":"moo"}]}`,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value[bar]",
|
||
|
Value: "0",
|
||
|
},
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.lld.name.foo",
|
||
|
Value: `{"data":[{"{#FOO}":"bar"}]}`,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
// Setup a Zabbix mock server and start listening
|
||
|
server, err := newZabbixMockServer()
|
||
|
require.NoError(t, err)
|
||
|
defer server.close()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
server.listen()
|
||
|
}()
|
||
|
|
||
|
// Setup plugin
|
||
|
plugin := &Zabbix{
|
||
|
Address: server.addr(),
|
||
|
KeyPrefix: "telegraf.",
|
||
|
HostTag: "host",
|
||
|
LLDSendInterval: config.Duration(10 * time.Minute),
|
||
|
LLDClearInterval: config.Duration(1 * time.Hour),
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
require.NoError(t, plugin.Init())
|
||
|
|
||
|
// Connect and write the metrics
|
||
|
require.NoError(t, plugin.Connect())
|
||
|
defer plugin.Close()
|
||
|
|
||
|
// First packet
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Second packet, while time has not surpassed LLDSendInterval
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Simulate time passing for a new LLD send
|
||
|
plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond)
|
||
|
|
||
|
// Third packet, time has surpassed LLDSendInterval, metrics + LLD
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Fourth packet
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Simulate time passing for a new LLD send
|
||
|
plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond)
|
||
|
|
||
|
// Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new.
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Sixth packet, new LLD info, but time has not surpassed LLDSendInterval
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{mNew}))
|
||
|
|
||
|
// Simulate time passing for LLD clear
|
||
|
plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDClearInterval)).Add(-time.Millisecond)
|
||
|
|
||
|
// Seventh packet, time has surpassed LLDSendInterval and LLDClearInterval, metrics + LLD.
|
||
|
// LLD will be cleared.
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Eighth packet, time host not surpassed LLDSendInterval, just metrics.
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Simulate time passing for a new LLD send
|
||
|
plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond)
|
||
|
|
||
|
// Ninth packet, time has surpassed LLDSendInterval, metrics + LLD.
|
||
|
require.NoError(t, plugin.Write([]telegraf.Metric{m}))
|
||
|
|
||
|
// Wait for the metrics to be received
|
||
|
require.Eventuallyf(t, func() bool {
|
||
|
return server.count.Load() >= uint32(len(expected))
|
||
|
}, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load())
|
||
|
|
||
|
// Stop listening
|
||
|
require.NoError(t, plugin.Close())
|
||
|
server.listener.Close()
|
||
|
wg.Wait()
|
||
|
|
||
|
// Check the received metrics
|
||
|
server.Lock()
|
||
|
defer server.Unlock()
|
||
|
|
||
|
require.Empty(t, server.errs, "server had errors")
|
||
|
requireRequestDataEqual(t, expected, server.received, true)
|
||
|
}
|
||
|
|
||
|
// TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled
|
||
|
func TestAutoRegister(t *testing.T) {
|
||
|
now := time.Now()
|
||
|
input := []telegraf.Metric{
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{"host": "hostA"},
|
||
|
map[string]interface{}{"value": int64(0)},
|
||
|
now,
|
||
|
),
|
||
|
metric.New(
|
||
|
"name",
|
||
|
map[string]string{"host": "hostB"},
|
||
|
map[string]interface{}{"value": int64(42)},
|
||
|
now,
|
||
|
),
|
||
|
}
|
||
|
|
||
|
expected := []zabbix.Packet{
|
||
|
{
|
||
|
Request: "sender data",
|
||
|
Data: []*zabbix.Metric{
|
||
|
{
|
||
|
Host: "hostA",
|
||
|
Key: "telegraf.name.value",
|
||
|
Value: "0",
|
||
|
Clock: now.Unix(),
|
||
|
},
|
||
|
{
|
||
|
Host: "hostB",
|
||
|
Key: "telegraf.name.value",
|
||
|
Value: "42",
|
||
|
Clock: now.Unix(),
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
Request: "active checks",
|
||
|
Host: "hostA",
|
||
|
HostMetadata: "xxx",
|
||
|
},
|
||
|
{
|
||
|
Request: "active checks",
|
||
|
Host: "hostB",
|
||
|
HostMetadata: "xxx",
|
||
|
},
|
||
|
}
|
||
|
|
||
|
// Setup a Zabbix mock server and start listening
|
||
|
server, err := newZabbixMockServer()
|
||
|
require.NoError(t, err)
|
||
|
defer server.close()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
server.listen()
|
||
|
}()
|
||
|
|
||
|
// Setup plugin
|
||
|
plugin := &Zabbix{
|
||
|
Address: server.addr(),
|
||
|
KeyPrefix: "telegraf.",
|
||
|
HostTag: "host",
|
||
|
Autoregister: "xxx",
|
||
|
AutoregisterResendInterval: config.Duration(time.Minute * 5),
|
||
|
Log: testutil.Logger{},
|
||
|
}
|
||
|
require.NoError(t, plugin.Init())
|
||
|
|
||
|
// Connect and write the metrics
|
||
|
require.NoError(t, plugin.Connect())
|
||
|
require.NoError(t, plugin.Write(input))
|
||
|
|
||
|
// Wait for the metrics to be received
|
||
|
require.Eventuallyf(t, func() bool {
|
||
|
return server.count.Load() >= uint32(len(expected))
|
||
|
}, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load())
|
||
|
|
||
|
// Stop listening
|
||
|
require.NoError(t, plugin.Close())
|
||
|
server.listener.Close()
|
||
|
wg.Wait()
|
||
|
|
||
|
// Check the received metrics
|
||
|
server.Lock()
|
||
|
defer server.Unlock()
|
||
|
|
||
|
require.Empty(t, server.errs, "server had errors")
|
||
|
actual := server.received
|
||
|
sort.SliceStable(expected, func(i, j int) bool { return expected[i].Host < expected[j].Host })
|
||
|
sort.SliceStable(actual, func(i, j int) bool { return actual[i].Host < actual[j].Host })
|
||
|
requireRequestDataEqual(t, expected, actual, false)
|
||
|
}
|
||
|
|
||
|
func TestBuildZabbixMetric(t *testing.T) {
|
||
|
keyPrefix := "prefix."
|
||
|
hostTag := "host"
|
||
|
|
||
|
z := &Zabbix{
|
||
|
KeyPrefix: keyPrefix,
|
||
|
HostTag: hostTag,
|
||
|
}
|
||
|
|
||
|
zm, err := z.buildZabbixMetric(metric.New(
|
||
|
|
||
|
"name",
|
||
|
map[string]string{hostTag: "hostA", "foo": "bar", "a": "b"},
|
||
|
map[string]interface{}{},
|
||
|
time.Now()),
|
||
|
"value",
|
||
|
1,
|
||
|
)
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, keyPrefix+"name.value[b,bar]", zm.Key)
|
||
|
|
||
|
zm, err = z.buildZabbixMetric(metric.New(
|
||
|
|
||
|
"name",
|
||
|
map[string]string{hostTag: "hostA"},
|
||
|
map[string]interface{}{},
|
||
|
time.Now()),
|
||
|
"value",
|
||
|
1,
|
||
|
)
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, keyPrefix+"name.value", zm.Key)
|
||
|
}
|
||
|
|
||
|
func TestGetHostname(t *testing.T) {
|
||
|
hostname, err := os.Hostname()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
tests := map[string]struct {
|
||
|
HostTag string
|
||
|
Host string
|
||
|
Tags map[string]string
|
||
|
Result string
|
||
|
}{
|
||
|
"metric with host tag": {
|
||
|
HostTag: "host",
|
||
|
Tags: map[string]string{
|
||
|
"host": "bar",
|
||
|
},
|
||
|
Result: "bar",
|
||
|
},
|
||
|
"metric with host tag changed": {
|
||
|
HostTag: "source",
|
||
|
Tags: map[string]string{
|
||
|
"source": "bar",
|
||
|
},
|
||
|
Result: "bar",
|
||
|
},
|
||
|
"metric with no host tag": {
|
||
|
Tags: map[string]string{},
|
||
|
Result: hostname,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
for desc, test := range tests {
|
||
|
t.Run(desc, func(t *testing.T) {
|
||
|
metric := metric.New(
|
||
|
|
||
|
"name",
|
||
|
test.Tags,
|
||
|
map[string]interface{}{},
|
||
|
time.Now(),
|
||
|
)
|
||
|
|
||
|
host, err := getHostname(test.HostTag, metric)
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, test.Result, host)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestCases(t *testing.T) {
|
||
|
if testing.Short() {
|
||
|
t.Skip("Skipping integration test in short mode")
|
||
|
}
|
||
|
|
||
|
// Get all testcase directories
|
||
|
folders, err := os.ReadDir("testcases")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Register the plugin
|
||
|
outputs.Add("zabbix", func() telegraf.Output {
|
||
|
return &Zabbix{
|
||
|
KeyPrefix: "telegraf.",
|
||
|
HostTag: "host",
|
||
|
AutoregisterResendInterval: config.Duration(time.Minute * 30),
|
||
|
LLDSendInterval: config.Duration(time.Minute * 10),
|
||
|
LLDClearInterval: config.Duration(time.Hour),
|
||
|
}
|
||
|
})
|
||
|
|
||
|
for _, f := range folders {
|
||
|
// Only handle folders
|
||
|
if !f.IsDir() {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
t.Run(f.Name(), func(t *testing.T) {
|
||
|
testcasePath := filepath.Join("testcases", f.Name())
|
||
|
configFilename := filepath.Join(testcasePath, "telegraf.conf")
|
||
|
inputFilename := filepath.Join(testcasePath, "input.influx")
|
||
|
expectedFilename := filepath.Join(testcasePath, "expected.out")
|
||
|
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
|
||
|
|
||
|
// Get parser to parse input and expected output
|
||
|
parser := &influx.Parser{}
|
||
|
require.NoError(t, parser.Init())
|
||
|
|
||
|
// Load the input data
|
||
|
input, err := testutil.ParseMetricsFromFile(inputFilename, parser)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Read the expected output if any
|
||
|
var expected []zabbix.Packet
|
||
|
if _, err := os.Stat(expectedFilename); err == nil {
|
||
|
buf, err := os.ReadFile(expectedFilename)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, json.Unmarshal(buf, &expected))
|
||
|
}
|
||
|
|
||
|
// Read the expected output if any
|
||
|
var expectedError string
|
||
|
if _, err := os.Stat(expectedErrorFilename); err == nil {
|
||
|
expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename)
|
||
|
require.NoError(t, err)
|
||
|
require.Len(t, expectedErrors, 1)
|
||
|
expectedError = expectedErrors[0]
|
||
|
}
|
||
|
|
||
|
// Configure the plugin
|
||
|
cfg := config.NewConfig()
|
||
|
require.NoError(t, cfg.LoadConfig(configFilename))
|
||
|
require.Len(t, cfg.Outputs, 1)
|
||
|
|
||
|
// Setup a Zabbix mock server and start listening
|
||
|
server, err := newZabbixMockServer()
|
||
|
require.NoError(t, err)
|
||
|
defer server.close()
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
server.listen()
|
||
|
}()
|
||
|
defer server.listener.Close()
|
||
|
|
||
|
// Setup the plugin
|
||
|
plugin := cfg.Outputs[0].Output.(*Zabbix)
|
||
|
plugin.Address = server.addr()
|
||
|
plugin.Log = testutil.Logger{}
|
||
|
require.NoError(t, plugin.Init())
|
||
|
|
||
|
// Connect and write the metric(s)
|
||
|
require.NoError(t, plugin.Connect())
|
||
|
defer plugin.Close()
|
||
|
|
||
|
err = plugin.Write(input)
|
||
|
if expectedError != "" {
|
||
|
require.ErrorContains(t, err, expectedError)
|
||
|
return
|
||
|
}
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Wait for the data to arrive
|
||
|
require.Eventuallyf(t, func() bool {
|
||
|
return server.count.Load() >= uint32(len(expected))
|
||
|
}, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load())
|
||
|
|
||
|
server.listener.Close()
|
||
|
wg.Wait()
|
||
|
|
||
|
// Check the received data
|
||
|
server.Lock()
|
||
|
defer server.Unlock()
|
||
|
require.Empty(t, server.errs, "server had errors")
|
||
|
requireRequestDataEqual(t, expected, server.received, false)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type zabbixMockServer struct {
|
||
|
listener net.Listener
|
||
|
|
||
|
received []zabbix.Packet
|
||
|
errs []error
|
||
|
count atomic.Uint32
|
||
|
sync.Mutex
|
||
|
}
|
||
|
|
||
|
func newZabbixMockServer() (*zabbixMockServer, error) {
|
||
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &zabbixMockServer{listener: l}, nil
|
||
|
}
|
||
|
|
||
|
func (s *zabbixMockServer) addr() string {
|
||
|
return s.listener.Addr().String()
|
||
|
}
|
||
|
|
||
|
func (s *zabbixMockServer) close() error {
|
||
|
if s.listener != nil {
|
||
|
return s.listener.Close()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *zabbixMockServer) listen() {
|
||
|
for {
|
||
|
request, err := s.handle()
|
||
|
if err != nil {
|
||
|
if !errors.Is(err, net.ErrClosed) {
|
||
|
s.Lock()
|
||
|
s.errs = append(s.errs, err)
|
||
|
s.Unlock()
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
s.Lock()
|
||
|
s.received = append(s.received, request)
|
||
|
s.Unlock()
|
||
|
s.count.Store(uint32(len(s.received)))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *zabbixMockServer) handle() (zabbix.Packet, error) {
|
||
|
conn, err := s.listener.Accept()
|
||
|
if err != nil {
|
||
|
return zabbix.Packet{}, err
|
||
|
}
|
||
|
defer conn.Close()
|
||
|
|
||
|
if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
|
||
|
return zabbix.Packet{}, err
|
||
|
}
|
||
|
|
||
|
// Obtain request from the mock zabbix server
|
||
|
// Read protocol header and version
|
||
|
header := make([]byte, 5)
|
||
|
if _, err := conn.Read(header); err != nil {
|
||
|
return zabbix.Packet{}, err
|
||
|
}
|
||
|
|
||
|
// Read data length
|
||
|
dataLengthRaw := make([]byte, 8)
|
||
|
if _, err := conn.Read(dataLengthRaw); err != nil {
|
||
|
return zabbix.Packet{}, err
|
||
|
}
|
||
|
dataLength := binary.LittleEndian.Uint64(dataLengthRaw)
|
||
|
|
||
|
// Read data content
|
||
|
content := make([]byte, dataLength)
|
||
|
if _, err := conn.Read(content); err != nil {
|
||
|
return zabbix.Packet{}, err
|
||
|
}
|
||
|
|
||
|
// The zabbix output checks that there are not errors
|
||
|
// Simulated response from the server
|
||
|
resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n")
|
||
|
if _, err := conn.Write(resp); err != nil {
|
||
|
return zabbix.Packet{}, err
|
||
|
}
|
||
|
|
||
|
// Strip zabbix header and get JSON request
|
||
|
var request zabbix.Packet
|
||
|
if err := json.Unmarshal(content, &request); err != nil {
|
||
|
return zabbix.Packet{}, err
|
||
|
}
|
||
|
|
||
|
return request, nil
|
||
|
}
|
||
|
|
||
|
type lldValue struct {
|
||
|
Data []map[string]string `json:"data"`
|
||
|
}
|
||
|
|
||
|
func requireRequestDataEqual(t *testing.T, expected, actual []zabbix.Packet, ignoreClock bool) {
|
||
|
t.Helper()
|
||
|
require.Len(t, actual, len(expected))
|
||
|
for i, expectedReq := range expected {
|
||
|
actualReq := actual[i]
|
||
|
require.Equalf(t, expectedReq.Request, actualReq.Request, "different request types in request %d", i)
|
||
|
require.Equalf(t, expectedReq.Host, actualReq.Host, "different host in request %d", i)
|
||
|
require.Equalf(t, expectedReq.HostMetadata, actualReq.HostMetadata, "different hostmetadata in request %d", i)
|
||
|
|
||
|
// Check the elements
|
||
|
require.Len(t, actualReq.Data, len(expectedReq.Data))
|
||
|
|
||
|
less := func(a, b *zabbix.Metric) bool {
|
||
|
if a.Key == b.Key {
|
||
|
if a.Clock == b.Clock {
|
||
|
return a.Value < b.Value
|
||
|
}
|
||
|
return a.Clock < b.Clock
|
||
|
}
|
||
|
return a.Key < b.Key
|
||
|
}
|
||
|
sort.SliceStable(actualReq.Data, func(i, j int) bool { return less(actualReq.Data[i], actualReq.Data[j]) })
|
||
|
sort.SliceStable(expectedReq.Data, func(i, j int) bool { return less(expectedReq.Data[i], expectedReq.Data[j]) })
|
||
|
for j, expectedData := range expectedReq.Data {
|
||
|
actualData := actualReq.Data[j]
|
||
|
require.Equalf(t, expectedData.Key, actualData.Key, "different key in request %d, data %d", i, j)
|
||
|
require.Equalf(t, expectedData.Host, actualData.Host, "different host in request %d, data %d", i, j)
|
||
|
if !ignoreClock {
|
||
|
require.Equalf(t, expectedData.Clock, actualData.Clock, "different clock in request %d, data %d", i, j)
|
||
|
}
|
||
|
if strings.HasPrefix(expectedData.Value, "{") {
|
||
|
var actualValue, expectedValue lldValue
|
||
|
require.NoError(t, json.Unmarshal([]byte(actualData.Value), &actualValue))
|
||
|
require.NoError(t, json.Unmarshal([]byte(expectedData.Value), &expectedValue))
|
||
|
require.ElementsMatchf(t, expectedValue.Data, actualValue.Data, "different value in request %d, data %d", i, j)
|
||
|
} else {
|
||
|
require.Equalf(t, expectedData.Value, actualData.Value, "different value in request %d, data %d", i, j)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|