306 lines
8.3 KiB
Go
306 lines
8.3 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package graphite
|
|
|
|
import (
|
|
"crypto/tls"
|
|
_ "embed"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
|
)
|
|
|
|
// sampleConfig is the example configuration embedded from sample.conf at
// build time; SampleConfig returns it.
//
//go:embed sample.conf
var sampleConfig string

// ErrNotConnected is returned when a batch could not be written to any server
// in the cluster.
var ErrNotConnected = errors.New("could not write to any server in cluster")
|
|
|
|
// connection tracks the state of one configured Graphite server.
type connection struct {
	// name is the server address as configured in Servers; it is used as the
	// dial target.
	name string

	// conn is the network connection; it stays nil until the first successful
	// dial.
	conn net.Conn

	// connected reports whether conn is considered usable. It is cleared when
	// a write or a connection check fails so that Connect redials the server.
	connected bool
}
|
|
|
|
// Graphite is the Telegraf output plugin that sends serialized metrics to one
// of the configured Graphite servers over TCP, optionally wrapped in TLS.
type Graphite struct {
	// GraphiteTagSupport is passed to the serializer as TagSupport.
	GraphiteTagSupport bool `toml:"graphite_tag_support"`

	// GraphiteTagSanitizeMode is passed to the serializer as TagSanitizeMode.
	GraphiteTagSanitizeMode string `toml:"graphite_tag_sanitize_mode"`

	// GraphiteSeparator is passed to the serializer as Separator.
	GraphiteSeparator string `toml:"graphite_separator"`

	// GraphiteStrictRegex is passed to the serializer as StrictRegex.
	GraphiteStrictRegex string `toml:"graphite_strict_sanitize_regex"`

	// Servers lists the addresses ("host:port") to send to; Init defaults it to
	// "localhost:2003" when empty. Each batch is written to one randomly chosen
	// connected server.
	Servers []string `toml:"servers"`

	// LocalAddr optionally sets the local address ("host" or "host:port")
	// that outgoing connections are bound to.
	LocalAddr string `toml:"local_address"`

	// Prefix, Template and Templates are passed through to the serializer.
	Prefix    string   `toml:"prefix"`
	Template  string   `toml:"template"`
	Templates []string `toml:"templates"`

	// Timeout is used as the dial timeout and, when positive, as the write
	// deadline applied before each send.
	Timeout config.Duration `toml:"timeout"`

	// Log is not read from the configuration (toml:"-"); presumably it is set
	// by the Telegraf runtime.
	Log telegraf.Logger `toml:"-"`

	// ClientConfig holds the TLS client settings; when they produce a TLS
	// configuration, servers are dialed with TLS instead of plain TCP.
	common_tls.ClientConfig

	// connections holds one entry per server in Servers, created by Init.
	connections []connection

	// serializer is created by Init from the Graphite*/template options.
	serializer *graphite.GraphiteSerializer
}
|
|
|
|
func (*Graphite) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (g *Graphite) Init() error {
|
|
s := &graphite.GraphiteSerializer{
|
|
Prefix: g.Prefix,
|
|
Template: g.Template,
|
|
StrictRegex: g.GraphiteStrictRegex,
|
|
TagSupport: g.GraphiteTagSupport,
|
|
TagSanitizeMode: g.GraphiteTagSanitizeMode,
|
|
Separator: g.GraphiteSeparator,
|
|
Templates: g.Templates,
|
|
}
|
|
if err := s.Init(); err != nil {
|
|
return err
|
|
}
|
|
g.serializer = s
|
|
|
|
// Set default values
|
|
if len(g.Servers) == 0 {
|
|
g.Servers = append(g.Servers, "localhost:2003")
|
|
}
|
|
|
|
// Fill in the connections from the server
|
|
g.connections = make([]connection, 0, len(g.Servers))
|
|
for _, server := range g.Servers {
|
|
g.connections = append(g.connections, connection{
|
|
name: server,
|
|
connected: false,
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *Graphite) Connect() error {
|
|
// Set tls config
|
|
tlsConfig, err := g.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Find all non-connected servers and try to reconnect
|
|
var newConnection bool
|
|
var connectedServers int
|
|
var failedServers []string
|
|
for i, server := range g.connections {
|
|
if server.connected {
|
|
connectedServers++
|
|
continue
|
|
}
|
|
newConnection = true
|
|
|
|
// Dialer with timeout
|
|
d := net.Dialer{Timeout: time.Duration(g.Timeout)}
|
|
if g.LocalAddr != "" {
|
|
// Resolve the local address into IP address and the given port if any
|
|
addr, sPort, err := net.SplitHostPort(g.LocalAddr)
|
|
if err != nil {
|
|
if !strings.Contains(err.Error(), "missing port") {
|
|
return fmt.Errorf("invalid local address: %w", err)
|
|
}
|
|
addr = g.LocalAddr
|
|
}
|
|
local, err := net.ResolveIPAddr("ip", addr)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot resolve local address: %w", err)
|
|
}
|
|
|
|
var port int
|
|
if sPort != "" {
|
|
p, err := strconv.ParseUint(sPort, 10, 16)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid port: %w", err)
|
|
}
|
|
port = int(p)
|
|
}
|
|
|
|
d.LocalAddr = &net.TCPAddr{IP: local.IP, Port: port, Zone: local.Zone}
|
|
}
|
|
|
|
// Get secure connection if tls config is set
|
|
var conn net.Conn
|
|
if tlsConfig != nil {
|
|
conn, err = tls.DialWithDialer(&d, "tcp", server.name, tlsConfig)
|
|
} else {
|
|
conn, err = d.Dial("tcp", server.name)
|
|
}
|
|
|
|
if err == nil {
|
|
g.connections[i].conn = conn
|
|
g.connections[i].connected = true
|
|
connectedServers++
|
|
} else {
|
|
g.Log.Debugf("Failed to establish connection: %v", err)
|
|
failedServers = append(failedServers, server.name)
|
|
}
|
|
}
|
|
|
|
if newConnection {
|
|
g.Log.Debugf("Successful connections: %d of %d", connectedServers, len(g.connections))
|
|
}
|
|
if len(failedServers) > 0 {
|
|
g.Log.Debugf("Failed servers: %d", len(failedServers))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *Graphite) Close() error {
|
|
// Closing all connections
|
|
for _, c := range g.connections {
|
|
_ = c.conn.Close()
|
|
c.connected = false
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// We need check eof as we can write to nothing without noticing anything is wrong
|
|
// the connection stays in a close_wait
|
|
// We can detect that by finding an eof
|
|
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
|
|
// props to Tv via the authors of carbon-relay-ng` for this trick.
|
|
func (g *Graphite) checkEOF(conn net.Conn) error {
|
|
b := make([]byte, 1024)
|
|
|
|
if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
|
|
g.Log.Debugf(
|
|
"Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly",
|
|
err,
|
|
conn.RemoteAddr().String(),
|
|
)
|
|
err = conn.Close()
|
|
g.Log.Debugf("Failed to close the connection: %v", err)
|
|
return err
|
|
}
|
|
num, err := conn.Read(b)
|
|
if errors.Is(err, io.EOF) {
|
|
g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String())
|
|
err = conn.Close()
|
|
g.Log.Debugf("Failed to close the connection: %v", err)
|
|
return err
|
|
}
|
|
// just in case i misunderstand something or the remote behaves badly
|
|
if num != 0 {
|
|
g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num])
|
|
}
|
|
// Log non-timeout errors and close.
|
|
var netErr net.Error
|
|
if !errors.As(err, &netErr) || !netErr.Timeout() {
|
|
g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
|
|
err = conn.Close()
|
|
g.Log.Debugf("Failed to close the connection: %v", err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Choose a random server in the cluster to write to until a successful write
|
|
// occurs, logging each unsuccessful. If all servers fail, return error.
|
|
func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
|
// Prepare data
|
|
var batch []byte
|
|
for _, metric := range metrics {
|
|
buf, err := g.serializer.Serialize(metric)
|
|
if err != nil {
|
|
g.Log.Errorf("Error serializing some metrics to graphite: %s", err.Error())
|
|
}
|
|
batch = append(batch, buf...)
|
|
}
|
|
|
|
// Try to connect to all servers not yet connected if any
|
|
if err := g.Connect(); err != nil {
|
|
return fmt.Errorf("failed to reconnect: %w", err)
|
|
}
|
|
|
|
// Return on success of if we encounter a non-retryable error
|
|
if err := g.send(batch); err == nil || !errors.Is(err, ErrNotConnected) {
|
|
return err
|
|
}
|
|
|
|
// Try to reconnect and resend
|
|
failedServers := make([]string, 0, len(g.connections))
|
|
for _, c := range g.connections {
|
|
if !c.connected {
|
|
failedServers = append(failedServers, c.name)
|
|
}
|
|
}
|
|
if len(failedServers) > 0 {
|
|
g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(failedServers, ","))
|
|
if err := g.Connect(); err != nil {
|
|
return fmt.Errorf("failed to reconnect: %w", err)
|
|
}
|
|
}
|
|
|
|
return g.send(batch)
|
|
}
|
|
|
|
func (g *Graphite) send(batch []byte) error {
|
|
// Try sending the data to a server. Try them in random order
|
|
p := rand.Perm(len(g.connections))
|
|
for i, n := range p {
|
|
server := g.connections[n]
|
|
|
|
// Skip unconnected servers
|
|
if !server.connected {
|
|
continue
|
|
}
|
|
|
|
if g.Timeout > 0 {
|
|
deadline := time.Now().Add(time.Duration(g.Timeout))
|
|
if err := server.conn.SetWriteDeadline(deadline); err != nil {
|
|
g.Log.Warnf("failed to set write deadline for %q: %v", server.name, err)
|
|
g.connections[n].connected = false
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Check the connection state
|
|
if err := g.checkEOF(server.conn); err != nil {
|
|
// Mark server as failed so a new connection will be made
|
|
g.connections[n].connected = false
|
|
continue
|
|
}
|
|
_, err := server.conn.Write(batch)
|
|
if err == nil {
|
|
// Sending the data was successfully
|
|
return nil
|
|
}
|
|
|
|
g.Log.Errorf("Writing to %q failed: %v", server.name, err)
|
|
if i < len(p)-1 {
|
|
g.Log.Info("Trying next server...")
|
|
}
|
|
// Mark server as failed so a new connection will be made
|
|
if server.conn != nil {
|
|
if err := server.conn.Close(); err != nil {
|
|
g.Log.Debugf("Failed to close connection to %q: %v", server.name, err)
|
|
}
|
|
}
|
|
g.connections[n].connected = false
|
|
}
|
|
|
|
// If we end here, none of the writes were successful
|
|
return ErrNotConnected
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("graphite", func() telegraf.Output {
|
|
return &Graphite{Timeout: config.Duration(2 * time.Second)}
|
|
})
|
|
}
|