1
0
Fork 0
telegraf/plugins/outputs/quix/quix.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

172 lines
4.6 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package quix
import (
"crypto/tls"
"crypto/x509"
_ "embed"
"errors"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common_http "github.com/influxdata/telegraf/plugins/common/http"
common_kafka "github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/json"
)
//go:embed sample.conf
var sampleConfig string
type Quix struct {
APIURL string `toml:"url"`
Workspace string `toml:"workspace"`
Topic string `toml:"topic"`
Token config.Secret `toml:"token"`
Log telegraf.Logger `toml:"-"`
common_http.HTTPClientConfig
producer sarama.SyncProducer
serializer telegraf.Serializer
kakfaTopic string
}
func (*Quix) SampleConfig() string {
return sampleConfig
}
func (q *Quix) Init() error {
// Set defaults
if q.APIURL == "" {
q.APIURL = "https://portal-api.platform.quix.io"
}
q.APIURL = strings.TrimSuffix(q.APIURL, "/")
// Check input parameters
if q.Topic == "" {
return errors.New("option 'topic' must be set")
}
if q.Workspace == "" {
return errors.New("option 'workspace' must be set")
}
if q.Token.Empty() {
return errors.New("option 'token' must be set")
}
q.kakfaTopic = q.Workspace + "-" + q.Topic
// Create a JSON serializer for the output
q.serializer = &json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond), // Hardcoded nanoseconds precision
}
return nil
}
func (q *Quix) Connect() error {
// Fetch the Kafka broker configuration from the Quix HTTP endpoint
quixConfig, err := q.fetchBrokerConfig()
if err != nil {
return fmt.Errorf("fetching broker config failed: %w", err)
}
brokers := strings.Split(quixConfig.BootstrapServers, ",")
if len(brokers) == 0 {
return errors.New("no brokers received")
}
// Setup the Kakfa producer config
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
switch quixConfig.SecurityProtocol {
case "SASL_SSL":
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = quixConfig.SaslUsername
cfg.Net.SASL.Password = quixConfig.SaslPassword
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
}
switch quixConfig.SaslMechanism {
case "SCRAM-SHA-512":
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA512}
}
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "SCRAM-SHA-256":
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
}
case "PLAIN":
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return fmt.Errorf("unsupported SASL mechanism: %s", quixConfig.SaslMechanism)
}
cfg.Net.TLS.Enable = true
// Add the CA certificate sent by the server if there is any. Newer cloud
// instances do not need this and we can go with the system certificates.
if len(quixConfig.cert) > 0 {
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(quixConfig.cert) {
return errors.New("appending CA cert to pool failed")
}
cfg.Net.TLS.Config = &tls.Config{RootCAs: certPool}
}
case "PLAINTEXT":
// No additional configuration required for plaintext communication
default:
return fmt.Errorf("unsupported security protocol: %s", quixConfig.SecurityProtocol)
}
// Setup the Kakfa producer itself
producer, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return fmt.Errorf("creating producer failed: %w", err)
}
q.producer = producer
return nil
}
func (q *Quix) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
serialized, err := q.serializer.Serialize(m)
if err != nil {
q.Log.Errorf("Error serializing metric: %v", err)
continue
}
msg := &sarama.ProducerMessage{
Topic: q.kakfaTopic,
Value: sarama.ByteEncoder(serialized),
Timestamp: m.Time(),
Key: sarama.StringEncoder("telegraf"),
}
if _, _, err = q.producer.SendMessage(msg); err != nil {
q.Log.Errorf("Error sending message to Kafka: %v", err)
continue
}
}
return nil
}
func (q *Quix) Close() error {
if q.producer != nil {
return q.producer.Close()
}
return nil
}
func init() {
outputs.Add("quix", func() telegraf.Output { return &Quix{} })
}