209 lines
5.4 KiB
Go
209 lines
5.4 KiB
Go
|
//go:generate ../../../tools/readme_config_includer/generator
|
||
|
package websocket
|
||
|
|
||
|
import (
|
||
|
_ "embed"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"time"
|
||
|
|
||
|
ws "github.com/gorilla/websocket"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/config"
|
||
|
"github.com/influxdata/telegraf/plugins/common/proxy"
|
||
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||
|
)
|
||
|
|
||
|
//go:embed sample.conf
|
||
|
var sampleConfig string
|
||
|
|
||
|
const (
|
||
|
defaultConnectTimeout = 30 * time.Second
|
||
|
defaultWriteTimeout = 30 * time.Second
|
||
|
defaultReadTimeout = 30 * time.Second
|
||
|
)
|
||
|
|
||
|
// WebSocket can output to WebSocket endpoint.
|
||
|
type WebSocket struct {
|
||
|
URL string `toml:"url"`
|
||
|
ConnectTimeout config.Duration `toml:"connect_timeout"`
|
||
|
WriteTimeout config.Duration `toml:"write_timeout"`
|
||
|
ReadTimeout config.Duration `toml:"read_timeout"`
|
||
|
Headers map[string]*config.Secret `toml:"headers"`
|
||
|
UseTextFrames bool `toml:"use_text_frames"`
|
||
|
Log telegraf.Logger `toml:"-"`
|
||
|
proxy.HTTPProxy
|
||
|
proxy.Socks5ProxyConfig
|
||
|
tls.ClientConfig
|
||
|
|
||
|
conn *ws.Conn
|
||
|
serializer telegraf.Serializer
|
||
|
}
|
||
|
|
||
|
func (*WebSocket) SampleConfig() string {
|
||
|
return sampleConfig
|
||
|
}
|
||
|
|
||
|
// SetSerializer implements serializers.SerializerOutput.
|
||
|
func (w *WebSocket) SetSerializer(serializer telegraf.Serializer) {
|
||
|
w.serializer = serializer
|
||
|
}
|
||
|
|
||
|
var errInvalidURL = errors.New("invalid websocket URL")
|
||
|
|
||
|
// Init the output plugin.
|
||
|
func (w *WebSocket) Init() error {
|
||
|
if parsedURL, err := url.Parse(w.URL); err != nil || (parsedURL.Scheme != "ws" && parsedURL.Scheme != "wss") {
|
||
|
return fmt.Errorf("%w: %q", errInvalidURL, w.URL)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Connect to the output endpoint.
|
||
|
func (w *WebSocket) Connect() error {
|
||
|
tlsCfg, err := w.ClientConfig.TLSConfig()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error creating TLS config: %w", err)
|
||
|
}
|
||
|
|
||
|
dialProxy, err := w.HTTPProxy.Proxy()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error creating proxy: %w", err)
|
||
|
}
|
||
|
|
||
|
dialer := &ws.Dialer{
|
||
|
Proxy: dialProxy,
|
||
|
HandshakeTimeout: time.Duration(w.ConnectTimeout),
|
||
|
TLSClientConfig: tlsCfg,
|
||
|
}
|
||
|
|
||
|
if w.Socks5ProxyEnabled {
|
||
|
netDialer, err := w.Socks5ProxyConfig.GetDialer()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error connecting to socks5 proxy: %w", err)
|
||
|
}
|
||
|
dialer.NetDial = netDialer.Dial
|
||
|
}
|
||
|
|
||
|
headers := http.Header{}
|
||
|
for k, v := range w.Headers {
|
||
|
secret, err := v.Get()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("getting header secret %q failed: %w", k, err)
|
||
|
}
|
||
|
|
||
|
headers.Set(k, secret.String())
|
||
|
secret.Destroy()
|
||
|
}
|
||
|
|
||
|
conn, resp, err := dialer.Dial(w.URL, headers)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error dial: %w", err)
|
||
|
}
|
||
|
_ = resp.Body.Close()
|
||
|
if resp.StatusCode != http.StatusSwitchingProtocols {
|
||
|
return fmt.Errorf("wrong status code while connecting to server: %d", resp.StatusCode)
|
||
|
}
|
||
|
|
||
|
w.conn = conn
|
||
|
go w.read(conn)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (w *WebSocket) read(conn *ws.Conn) {
|
||
|
defer func() { _ = conn.Close() }()
|
||
|
if w.ReadTimeout > 0 {
|
||
|
if err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout))); err != nil {
|
||
|
w.Log.Errorf("error setting read deadline: %v", err)
|
||
|
return
|
||
|
}
|
||
|
conn.SetPingHandler(func(string) error {
|
||
|
err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout)))
|
||
|
if err != nil {
|
||
|
w.Log.Errorf("error setting read deadline: %v", err)
|
||
|
return err
|
||
|
}
|
||
|
return conn.WriteControl(ws.PongMessage, nil, time.Now().Add(time.Duration(w.WriteTimeout)))
|
||
|
})
|
||
|
}
|
||
|
for {
|
||
|
// Need to read a connection (to properly process pings from a server).
|
||
|
_, _, err := conn.ReadMessage()
|
||
|
if err != nil {
|
||
|
// Websocket connection is not readable after first error, it's going to error state.
|
||
|
// In the beginning of this goroutine we have defer section that closes such connection.
|
||
|
// After that connection will be tried to reestablish on next Write.
|
||
|
if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
|
||
|
w.Log.Errorf("error reading websocket connection: %v", err)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
if w.ReadTimeout > 0 {
|
||
|
if err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Write writes the given metrics to the destination. Not thread-safe.
|
||
|
func (w *WebSocket) Write(metrics []telegraf.Metric) error {
|
||
|
if w.conn == nil {
|
||
|
// Previous write failed with error and ws conn was closed.
|
||
|
if err := w.Connect(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
messageData, err := w.serializer.SerializeBatch(metrics)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if w.WriteTimeout > 0 {
|
||
|
if err := w.conn.SetWriteDeadline(time.Now().Add(time.Duration(w.WriteTimeout))); err != nil {
|
||
|
return fmt.Errorf("error setting write deadline: %w", err)
|
||
|
}
|
||
|
}
|
||
|
messageType := ws.BinaryMessage
|
||
|
if w.UseTextFrames {
|
||
|
messageType = ws.TextMessage
|
||
|
}
|
||
|
err = w.conn.WriteMessage(messageType, messageData)
|
||
|
if err != nil {
|
||
|
_ = w.conn.Close()
|
||
|
w.conn = nil
|
||
|
return fmt.Errorf("error writing to connection: %w", err)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Close closes the connection. Noop if already closed.
|
||
|
func (w *WebSocket) Close() error {
|
||
|
if w.conn == nil {
|
||
|
return nil
|
||
|
}
|
||
|
err := w.conn.Close()
|
||
|
w.conn = nil
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func newWebSocket() *WebSocket {
|
||
|
return &WebSocket{
|
||
|
ConnectTimeout: config.Duration(defaultConnectTimeout),
|
||
|
WriteTimeout: config.Duration(defaultWriteTimeout),
|
||
|
ReadTimeout: config.Duration(defaultReadTimeout),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func init() {
|
||
|
outputs.Add("websocket", func() telegraf.Output {
|
||
|
return newWebSocket()
|
||
|
})
|
||
|
}
|