1
0
Fork 0
telegraf/plugins/outputs/zabbix/zabbix.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

240 lines
7.3 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package zabbix
import (
_ "embed"
"fmt"
"net"
"os"
"sort"
"strings"
"time"
"github.com/datadope-io/go-zabbix/v2"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)
// zabbixSender is an interface to send autoregister data to Zabbix.
// It is implemented by Zabbix.Sender.
// Created to be able to mock Zabbix.Sender in tests.
type zabbixSender interface {
Send(packet *zabbix.Packet) (res zabbix.Response, err error)
SendMetrics(metrics []*zabbix.Metric) (resActive zabbix.Response, resTrapper zabbix.Response, err error)
RegisterHost(hostname string, hostMetadata string) error
}
// Zabbix allows pushing metrics to Zabbix software
type Zabbix struct {
Address string `toml:"address"`
AgentActive bool `toml:"agent_active"`
KeyPrefix string `toml:"key_prefix"`
HostTag string `toml:"host_tag"`
SkipMeasurementPrefix bool `toml:"skip_measurement_prefix"`
LLDSendInterval config.Duration `toml:"lld_send_interval"`
LLDClearInterval config.Duration `toml:"lld_clear_interval"`
Autoregister string `toml:"autoregister"`
AutoregisterResendInterval config.Duration `toml:"autoregister_resend_interval"`
Log telegraf.Logger `toml:"-"`
// lldHandler handles low level discovery data
lldHandler zabbixLLD
// lldLastSend store the last LLD send to known where to send it again
lldLastSend time.Time
// autoregisterLastSend stores the last time autoregister data was sent to Zabbix for each host.
autoregisterLastSend map[string]time.Time
// sender is the interface to send data to Zabbix.
sender zabbixSender
}
//go:embed sample.conf
var sampleConfig string
func (*Zabbix) SampleConfig() string {
return sampleConfig
}
// Connect does nothing, Write() would initiate connection in each call.
// Checking if Zabbix server is alive in this step does not allow Telegraf
// to start if there is a temporal connection problem with the server.
func (*Zabbix) Connect() error {
return nil
}
// Init initializes LLD and autoregister maps. Copy config values to them. Configure Logger.
func (z *Zabbix) Init() error {
// Add port to address if not present
if _, _, err := net.SplitHostPort(z.Address); err != nil {
z.Address = net.JoinHostPort(z.Address, "10051")
}
z.sender = zabbix.NewSender(z.Address)
// Initialize autoregisterLastSend map with size one, as the most common scenario is to have one host.
z.autoregisterLastSend = make(map[string]time.Time, 1)
z.lldLastSend = time.Now()
z.lldHandler = zabbixLLD{
log: z.Log,
hostTag: z.HostTag,
clearInterval: z.LLDClearInterval,
lastClear: time.Now(),
current: make(map[uint64]lldInfo, 100),
}
return nil
}
func (*Zabbix) Close() error {
return nil
}
// Write sends metrics to Zabbix server
func (z *Zabbix) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
zbxMetrics := make([]*zabbix.Metric, 0, len(metrics))
for _, metric := range metrics {
hostname, err := getHostname(z.HostTag, metric)
if err != nil {
z.Log.Errorf("Error getting hostname for metric %v: %v", metric, err)
continue
}
zbxMetrics = append(zbxMetrics, z.processMetric(metric)...)
// Handle hostname for autoregister
z.autoregisterAdd(hostname)
// Process LLD data
err = z.lldHandler.Add(metric)
if err != nil {
z.Log.Errorf("Error processing LLD for metric %v: %v", metric, err)
}
}
// Send LLD data if enough time has passed
if time.Since(z.lldLastSend) > time.Duration(z.LLDSendInterval) {
z.lldLastSend = time.Now()
for _, lldMetric := range z.lldHandler.Push() {
zbxMetrics = append(zbxMetrics, z.processMetric(lldMetric)...)
}
}
// Send metrics to Zabbix server
err := z.sendZabbixMetrics(zbxMetrics)
// Send autoregister data after sending metrics.
z.autoregisterPush()
return err
}
// sendZabbixMetrics sends metrics to Zabbix server
func (z *Zabbix) sendZabbixMetrics(zbxMetrics []*zabbix.Metric) error {
if len(zbxMetrics) == 0 {
return nil
}
// Sort metrics by time.
// Avoid extra work in Zabbix when generating the trends.
// If values are not sent in clock order, trend generation is forced to
// make more database operations.
// When a value is received with a new hour, trend is flushed to the
// database.
// If later a value is received with the previous hour, new trend is
// flushed, old one is retrieved from database and updated.
// When a new value with the new hour is received, old trend is flushed,
// new trend retrieved from database and updated.
sort.Slice(zbxMetrics, func(i, j int) bool {
return zbxMetrics[i].Clock < zbxMetrics[j].Clock
})
packet := zabbix.NewPacket(zbxMetrics, z.AgentActive)
_, err := z.sender.Send(packet)
return err
}
// processMetric converts a Telegraf metric to a list of Zabbix metrics.
// Ignore metrics with no hostname.
func (z Zabbix) processMetric(metric telegraf.Metric) []*zabbix.Metric {
zbxMetrics := make([]*zabbix.Metric, 0, len(metric.FieldList()))
for _, field := range metric.FieldList() {
zbxMetric, err := z.buildZabbixMetric(metric, field.Key, field.Value)
if err != nil {
z.Log.Errorf("Error converting telegraf metric to Zabbix format: %v", err)
continue
}
zbxMetrics = append(zbxMetrics, zbxMetric)
}
return zbxMetrics
}
// buildZabbixMetric builds a Zabbix metric from a Telegraf metric, for one particular value.
func (z Zabbix) buildZabbixMetric(metric telegraf.Metric, fieldName string, value interface{}) (*zabbix.Metric, error) {
hostname, err := getHostname(z.HostTag, metric)
if err != nil {
return nil, fmt.Errorf("error getting hostname: %w", err)
}
metricValue, err := internal.ToString(value)
if err != nil {
return nil, fmt.Errorf("error converting value: %w", err)
}
key := z.KeyPrefix + metric.Name() + "." + fieldName
if z.SkipMeasurementPrefix {
key = z.KeyPrefix + fieldName
}
// Ignore host tag.
// We want to add tags to the key in alphabetical order. Eg.:
// lld.dns_query.query_time_ms[DOMAIN,RECORD_TYPE,SERVER]
// TagList already return the tags in alphabetical order.
tagValues := make([]string, 0, len(metric.TagList()))
for _, tag := range metric.TagList() {
if tag.Key == z.HostTag {
continue
}
// Get tag values in the same order as the tag keys in the tags slice.
tagValues = append(tagValues, tag.Value)
}
if len(tagValues) != 0 {
key = fmt.Sprintf("%v[%v]", key, strings.Join(tagValues, ","))
}
return zabbix.NewMetric(hostname, key, metricValue, z.AgentActive, metric.Time().Unix()), nil
}
func init() {
outputs.Add("zabbix", func() telegraf.Output {
return &Zabbix{
KeyPrefix: "telegraf.",
HostTag: "host",
AutoregisterResendInterval: config.Duration(time.Minute * 30),
LLDSendInterval: config.Duration(time.Minute * 10),
LLDClearInterval: config.Duration(time.Hour),
}
})
}
// getHostname returns the hostname from the tags, or the system hostname if not found.
func getHostname(hostTag string, metric telegraf.Metric) (string, error) {
if hostname, ok := metric.GetTag(hostTag); ok {
return hostname, nil
}
return os.Hostname()
}