139 lines
3.2 KiB
Go
139 lines
3.2 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1
|
|
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/logger"
|
|
)
|
|
|
|
type mqttv311Client struct {
|
|
client mqttv3.Client
|
|
timeout time.Duration
|
|
qos int
|
|
retain bool
|
|
}
|
|
|
|
func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) {
|
|
opts := mqttv3.NewClientOptions()
|
|
opts.KeepAlive = cfg.KeepAlive
|
|
opts.WriteTimeout = time.Duration(cfg.Timeout)
|
|
if time.Duration(cfg.ConnectionTimeout) >= 1*time.Second {
|
|
opts.ConnectTimeout = time.Duration(cfg.ConnectionTimeout)
|
|
}
|
|
opts.SetCleanSession(!cfg.PersistentSession)
|
|
if cfg.OnConnectionLost != nil {
|
|
onConnectionLost := func(_ mqttv3.Client, err error) {
|
|
cfg.OnConnectionLost(err)
|
|
}
|
|
opts.SetConnectionLostHandler(onConnectionLost)
|
|
}
|
|
opts.SetAutoReconnect(cfg.AutoReconnect)
|
|
|
|
if cfg.ClientID != "" {
|
|
opts.SetClientID(cfg.ClientID)
|
|
} else {
|
|
id, err := internal.RandomString(5)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("generating random client ID failed: %w", err)
|
|
}
|
|
opts.SetClientID("Telegraf-Output-" + id)
|
|
}
|
|
|
|
tlsCfg, err := cfg.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
opts.SetTLSConfig(tlsCfg)
|
|
|
|
if !cfg.Username.Empty() {
|
|
user, err := cfg.Username.Get()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting username failed: %w", err)
|
|
}
|
|
opts.SetUsername(user.String())
|
|
user.Destroy()
|
|
}
|
|
if !cfg.Password.Empty() {
|
|
password, err := cfg.Password.Get()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting password failed: %w", err)
|
|
}
|
|
opts.SetPassword(password.String())
|
|
password.Destroy()
|
|
}
|
|
|
|
servers, err := parseServers(cfg.Servers)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, server := range servers {
|
|
if tlsCfg != nil {
|
|
server.Scheme = "tls"
|
|
}
|
|
broker := server.String()
|
|
opts.AddBroker(broker)
|
|
}
|
|
|
|
if cfg.ClientTrace {
|
|
log := &mqttLogger{logger.New("paho", "", "")}
|
|
mqttv3.ERROR = log
|
|
mqttv3.CRITICAL = log
|
|
mqttv3.WARN = log
|
|
mqttv3.DEBUG = log
|
|
}
|
|
|
|
return &mqttv311Client{
|
|
client: mqttv3.NewClient(opts),
|
|
timeout: time.Duration(cfg.Timeout),
|
|
qos: cfg.QoS,
|
|
retain: cfg.Retain,
|
|
}, nil
|
|
}
|
|
|
|
func (m *mqttv311Client) Connect() (bool, error) {
|
|
token := m.client.Connect()
|
|
|
|
if token.Wait() && token.Error() != nil {
|
|
return false, token.Error()
|
|
}
|
|
|
|
// Persistent sessions should skip subscription if a session is present, as
|
|
// the subscriptions are stored by the server.
|
|
type sessionPresent interface {
|
|
SessionPresent() bool
|
|
}
|
|
if t, ok := token.(sessionPresent); ok {
|
|
return t.SessionPresent(), nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (m *mqttv311Client) Publish(topic string, body []byte) error {
|
|
token := m.client.Publish(topic, byte(m.qos), m.retain, body)
|
|
if !token.WaitTimeout(m.timeout) {
|
|
return internal.ErrTimeout
|
|
}
|
|
return token.Error()
|
|
}
|
|
|
|
func (m *mqttv311Client) SubscribeMultiple(filters map[string]byte, callback mqttv3.MessageHandler) error {
|
|
token := m.client.SubscribeMultiple(filters, callback)
|
|
token.Wait()
|
|
return token.Error()
|
|
}
|
|
|
|
func (m *mqttv311Client) AddRoute(topic string, callback mqttv3.MessageHandler) {
|
|
m.client.AddRoute(topic, callback)
|
|
}
|
|
|
|
func (m *mqttv311Client) Close() error {
|
|
if m.client.IsConnected() {
|
|
m.client.Disconnect(100)
|
|
}
|
|
return nil
|
|
}
|