172 lines
4.6 KiB
Go
172 lines
4.6 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package quix
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
_ "embed"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/IBM/sarama"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
common_http "github.com/influxdata/telegraf/plugins/common/http"
|
|
common_kafka "github.com/influxdata/telegraf/plugins/common/kafka"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
"github.com/influxdata/telegraf/plugins/serializers/json"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
type Quix struct {
|
|
APIURL string `toml:"url"`
|
|
Workspace string `toml:"workspace"`
|
|
Topic string `toml:"topic"`
|
|
Token config.Secret `toml:"token"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
common_http.HTTPClientConfig
|
|
|
|
producer sarama.SyncProducer
|
|
serializer telegraf.Serializer
|
|
kakfaTopic string
|
|
}
|
|
|
|
func (*Quix) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (q *Quix) Init() error {
|
|
// Set defaults
|
|
if q.APIURL == "" {
|
|
q.APIURL = "https://portal-api.platform.quix.io"
|
|
}
|
|
q.APIURL = strings.TrimSuffix(q.APIURL, "/")
|
|
|
|
// Check input parameters
|
|
if q.Topic == "" {
|
|
return errors.New("option 'topic' must be set")
|
|
}
|
|
if q.Workspace == "" {
|
|
return errors.New("option 'workspace' must be set")
|
|
}
|
|
if q.Token.Empty() {
|
|
return errors.New("option 'token' must be set")
|
|
}
|
|
q.kakfaTopic = q.Workspace + "-" + q.Topic
|
|
|
|
// Create a JSON serializer for the output
|
|
q.serializer = &json.Serializer{
|
|
TimestampUnits: config.Duration(time.Nanosecond), // Hardcoded nanoseconds precision
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Quix) Connect() error {
|
|
// Fetch the Kafka broker configuration from the Quix HTTP endpoint
|
|
quixConfig, err := q.fetchBrokerConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("fetching broker config failed: %w", err)
|
|
}
|
|
brokers := strings.Split(quixConfig.BootstrapServers, ",")
|
|
if len(brokers) == 0 {
|
|
return errors.New("no brokers received")
|
|
}
|
|
|
|
// Setup the Kakfa producer config
|
|
cfg := sarama.NewConfig()
|
|
cfg.Producer.Return.Successes = true
|
|
|
|
switch quixConfig.SecurityProtocol {
|
|
case "SASL_SSL":
|
|
cfg.Net.SASL.Enable = true
|
|
cfg.Net.SASL.User = quixConfig.SaslUsername
|
|
cfg.Net.SASL.Password = quixConfig.SaslPassword
|
|
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
|
|
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
|
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
|
|
}
|
|
|
|
switch quixConfig.SaslMechanism {
|
|
case "SCRAM-SHA-512":
|
|
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
|
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA512}
|
|
}
|
|
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
|
case "SCRAM-SHA-256":
|
|
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
|
|
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
|
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
|
|
}
|
|
case "PLAIN":
|
|
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
|
|
default:
|
|
return fmt.Errorf("unsupported SASL mechanism: %s", quixConfig.SaslMechanism)
|
|
}
|
|
|
|
cfg.Net.TLS.Enable = true
|
|
|
|
// Add the CA certificate sent by the server if there is any. Newer cloud
|
|
// instances do not need this and we can go with the system certificates.
|
|
if len(quixConfig.cert) > 0 {
|
|
certPool := x509.NewCertPool()
|
|
if !certPool.AppendCertsFromPEM(quixConfig.cert) {
|
|
return errors.New("appending CA cert to pool failed")
|
|
}
|
|
cfg.Net.TLS.Config = &tls.Config{RootCAs: certPool}
|
|
}
|
|
case "PLAINTEXT":
|
|
// No additional configuration required for plaintext communication
|
|
default:
|
|
return fmt.Errorf("unsupported security protocol: %s", quixConfig.SecurityProtocol)
|
|
}
|
|
|
|
// Setup the Kakfa producer itself
|
|
producer, err := sarama.NewSyncProducer(brokers, cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("creating producer failed: %w", err)
|
|
}
|
|
q.producer = producer
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Quix) Write(metrics []telegraf.Metric) error {
|
|
for _, m := range metrics {
|
|
serialized, err := q.serializer.Serialize(m)
|
|
if err != nil {
|
|
q.Log.Errorf("Error serializing metric: %v", err)
|
|
continue
|
|
}
|
|
|
|
msg := &sarama.ProducerMessage{
|
|
Topic: q.kakfaTopic,
|
|
Value: sarama.ByteEncoder(serialized),
|
|
Timestamp: m.Time(),
|
|
Key: sarama.StringEncoder("telegraf"),
|
|
}
|
|
|
|
if _, _, err = q.producer.SendMessage(msg); err != nil {
|
|
q.Log.Errorf("Error sending message to Kafka: %v", err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Quix) Close() error {
|
|
if q.producer != nil {
|
|
return q.producer.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("quix", func() telegraf.Output { return &Quix{} })
|
|
}
|