1
0
Fork 0
telegraf/plugins/outputs/nsq/nsq.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

75 lines
1.3 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package nsq
import (
_ "embed"
"fmt"
"github.com/nsqio/go-nsq"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
)
//go:embed sample.conf
var sampleConfig string
type NSQ struct {
Server string
Topic string
Log telegraf.Logger `toml:"-"`
producer *nsq.Producer
serializer telegraf.Serializer
}
func (*NSQ) SampleConfig() string {
return sampleConfig
}
func (n *NSQ) SetSerializer(serializer telegraf.Serializer) {
n.serializer = serializer
}
func (n *NSQ) Connect() error {
config := nsq.NewConfig()
producer, err := nsq.NewProducer(n.Server, config)
if err != nil {
return err
}
n.producer = producer
return nil
}
func (n *NSQ) Close() error {
n.producer.Stop()
return nil
}
func (n *NSQ) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
for _, metric := range metrics {
buf, err := n.serializer.Serialize(metric)
if err != nil {
n.Log.Debugf("Could not serialize metric: %v", err)
continue
}
err = n.producer.Publish(n.Topic, buf)
if err != nil {
return fmt.Errorf("failed to send NSQD message: %w", err)
}
}
return nil
}
func init() {
outputs.Add("nsq", func() telegraf.Output {
return &NSQ{}
})
}