1
0
Fork 0
telegraf/plugins/common/mqtt/mqtt_v3.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

139 lines
3.2 KiB
Go

package mqtt
import (
"fmt"
"time"
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/logger"
)
type mqttv311Client struct {
client mqttv3.Client
timeout time.Duration
qos int
retain bool
}
func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) {
opts := mqttv3.NewClientOptions()
opts.KeepAlive = cfg.KeepAlive
opts.WriteTimeout = time.Duration(cfg.Timeout)
if time.Duration(cfg.ConnectionTimeout) >= 1*time.Second {
opts.ConnectTimeout = time.Duration(cfg.ConnectionTimeout)
}
opts.SetCleanSession(!cfg.PersistentSession)
if cfg.OnConnectionLost != nil {
onConnectionLost := func(_ mqttv3.Client, err error) {
cfg.OnConnectionLost(err)
}
opts.SetConnectionLostHandler(onConnectionLost)
}
opts.SetAutoReconnect(cfg.AutoReconnect)
if cfg.ClientID != "" {
opts.SetClientID(cfg.ClientID)
} else {
id, err := internal.RandomString(5)
if err != nil {
return nil, fmt.Errorf("generating random client ID failed: %w", err)
}
opts.SetClientID("Telegraf-Output-" + id)
}
tlsCfg, err := cfg.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
opts.SetTLSConfig(tlsCfg)
if !cfg.Username.Empty() {
user, err := cfg.Username.Get()
if err != nil {
return nil, fmt.Errorf("getting username failed: %w", err)
}
opts.SetUsername(user.String())
user.Destroy()
}
if !cfg.Password.Empty() {
password, err := cfg.Password.Get()
if err != nil {
return nil, fmt.Errorf("getting password failed: %w", err)
}
opts.SetPassword(password.String())
password.Destroy()
}
servers, err := parseServers(cfg.Servers)
if err != nil {
return nil, err
}
for _, server := range servers {
if tlsCfg != nil {
server.Scheme = "tls"
}
broker := server.String()
opts.AddBroker(broker)
}
if cfg.ClientTrace {
log := &mqttLogger{logger.New("paho", "", "")}
mqttv3.ERROR = log
mqttv3.CRITICAL = log
mqttv3.WARN = log
mqttv3.DEBUG = log
}
return &mqttv311Client{
client: mqttv3.NewClient(opts),
timeout: time.Duration(cfg.Timeout),
qos: cfg.QoS,
retain: cfg.Retain,
}, nil
}
func (m *mqttv311Client) Connect() (bool, error) {
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
return false, token.Error()
}
// Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
type sessionPresent interface {
SessionPresent() bool
}
if t, ok := token.(sessionPresent); ok {
return t.SessionPresent(), nil
}
return false, nil
}
func (m *mqttv311Client) Publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.qos), m.retain, body)
if !token.WaitTimeout(m.timeout) {
return internal.ErrTimeout
}
return token.Error()
}
func (m *mqttv311Client) SubscribeMultiple(filters map[string]byte, callback mqttv3.MessageHandler) error {
token := m.client.SubscribeMultiple(filters, callback)
token.Wait()
return token.Error()
}
func (m *mqttv311Client) AddRoute(topic string, callback mqttv3.MessageHandler) {
m.client.AddRoute(topic, callback)
}
func (m *mqttv311Client) Close() error {
if m.client.IsConnected() {
m.client.Disconnect(100)
}
return nil
}