1
0
Fork 0
telegraf/plugins/outputs/graylog/graylog.go

539 lines
10 KiB
Go
Raw Permalink Normal View History

//go:generate ../../../tools/readme_config_includer/generator
package graylog
import (
"bytes"
"compress/zlib"
"crypto/rand"
"crypto/tls"
_ "embed"
"encoding/binary"
ejson "encoding/json"
"fmt"
"io"
"math"
"net"
"os"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
)
// sampleConfig holds the contents of sample.conf, embedded at build time, and
// is what SampleConfig returns.
//
//go:embed sample.conf
var sampleConfig string
// Defaults applied when the corresponding setting is left empty or zero.
const (
// defaultEndpoint is used by newGelfWriter when no endpoint is given.
defaultEndpoint = "127.0.0.1:12201"
// defaultConnection is used by newGelfWriter when no connection type is given.
defaultConnection = "wan"
// defaultMaxChunkSizeWan is used by newGelfWriter when MaxChunkSizeWan is zero.
defaultMaxChunkSizeWan = 1420
// defaultMaxChunkSizeLan is used by newGelfWriter when MaxChunkSizeLan is zero.
defaultMaxChunkSizeLan = 8154
// defaultScheme is assumed when the endpoint carries no "<scheme>://" prefix.
defaultScheme = "udp"
// defaultTimeout is the initial value of Graylog.Timeout set in init.
defaultTimeout = 5 * time.Second
// defaultReconnectionTime is the initial value of Graylog.ReconnectionTime set in init.
defaultReconnectionTime = 15 * time.Second
)
// defaultSpecFields lists the keys that serialize emits as-is; any other tag or
// field key is written with a leading "_" (see fieldInSpec).
var defaultSpecFields = []string{"version", "host", "short_message", "full_message", "timestamp", "level", "facility", "line", "file"}
// gelfConfig holds the settings of a single GELF writer. Empty or zero values
// are replaced with the package defaults by newGelfWriter.
type gelfConfig struct {
// Endpoint is the server address, optionally prefixed with "<scheme>://".
Endpoint string
// Connection is the connection type; getChunksize returns MaxChunkSizeLan
// for "lan" and MaxChunkSizeWan otherwise.
Connection string
// MaxChunkSizeWan is the chunk size in bytes for non-"lan" connections.
MaxChunkSizeWan int
// MaxChunkSizeLan is the chunk size in bytes for "lan" connections.
MaxChunkSizeLan int
}
// gelf is a writer towards a single endpoint that can be connected explicitly,
// written to and closed. It is implemented by gelfUDP and gelfTCP.
type gelf interface {
io.WriteCloser
// Connect dials the configured endpoint.
Connect() error
}
// gelfCommon carries the state shared by the UDP and TCP writers.
type gelfCommon struct {
gelfConfig
// dialer is used to open the connection to the endpoint.
dialer *net.Dialer
// conn is the current connection; nil while disconnected.
conn net.Conn
}
// gelfUDP sends zlib-compressed messages over UDP, splitting them into chunks
// when they exceed the chunk size.
type gelfUDP struct {
gelfCommon
}
// gelfTCP sends messages over TCP, each followed by a zero byte delimiter.
type gelfTCP struct {
gelfCommon
// tlsConfig enables TLS for the connection when non-nil.
tlsConfig *tls.Config
}
// newGelfWriter builds a writer for the endpoint described by cfg. Unset
// settings fall back to the package defaults. An endpoint of the form
// "<scheme>://<address>" selects the transport: "tcp" (case-insensitive)
// yields a TCP writer, everything else a UDP writer. The returned writer is
// not connected yet.
func newGelfWriter(cfg gelfConfig, dialer *net.Dialer, tlsConfig *tls.Config) gelf {
	// Fill in whatever the caller left unset.
	if cfg.Endpoint == "" {
		cfg.Endpoint = defaultEndpoint
	}
	if cfg.Connection == "" {
		cfg.Connection = defaultConnection
	}
	if cfg.MaxChunkSizeWan == 0 {
		cfg.MaxChunkSizeWan = defaultMaxChunkSizeWan
	}
	if cfg.MaxChunkSizeLan == 0 {
		cfg.MaxChunkSizeLan = defaultMaxChunkSizeLan
	}

	// Strip an optional scheme prefix off the endpoint; only the first
	// separator counts.
	const sep = "://"
	scheme := defaultScheme
	if idx := strings.Index(cfg.Endpoint, sep); idx >= 0 {
		scheme = strings.ToLower(cfg.Endpoint[:idx])
		cfg.Endpoint = cfg.Endpoint[idx+len(sep):]
	}

	base := gelfCommon{gelfConfig: cfg, dialer: dialer}
	if scheme == "tcp" {
		return &gelfTCP{gelfCommon: base, tlsConfig: tlsConfig}
	}
	return &gelfUDP{gelfCommon: base}
}
// Write zlib-compresses message and sends it to the endpoint. If the
// compressed payload is larger than the chunk size of the configured
// connection type, it is split into chunks that share a random 8-byte message
// id. On success the length of the uncompressed message is returned; on any
// failure the result is 0 and the error.
func (g *gelfUDP) Write(message []byte) (n int, err error) {
	compressed, err := g.compress(message)
	if err != nil {
		return 0, err
	}

	// Use the same chunk size createChunkedMessage consumes per packet.
	// Deriving the chunk count from a different size would advertise a wrong
	// count and emit empty trailing chunks for "lan" connections.
	chunksize := g.getChunksize()
	length := compressed.Len()

	if length <= chunksize {
		if err := g.send(compressed.Bytes()); err != nil {
			return 0, err
		}
		return len(message), nil
	}

	chunkCount := int(math.Ceil(float64(length) / float64(chunksize)))

	id := make([]byte, 8)
	if _, err := rand.Read(id); err != nil {
		return 0, err
	}

	// Each call drains up to chunksize bytes from the compressed buffer.
	for index := 0; index < chunkCount; index++ {
		packet, err := g.createChunkedMessage(index, chunkCount, id, &compressed)
		if err != nil {
			return 0, err
		}
		if err := g.send(packet.Bytes()); err != nil {
			return 0, err
		}
	}

	return len(message), nil
}
// Close closes the current connection, if any, and forgets it. Closing a
// writer that is not connected is a no-op.
func (g *gelfUDP) Close() (err error) {
	if g.conn == nil {
		return nil
	}
	err = g.conn.Close()
	g.conn = nil
	return err
}
// createChunkedMessage assembles one chunk packet: the two marker bytes 30 and
// 15 (0x1e, 0x0f), the message id, the chunk index, the total chunk count and
// then the next slice of payload taken from compressed. The payload bytes are
// consumed from compressed, so successive calls walk through the message.
func (g *gelfUDP) createChunkedMessage(index, chunkCountInt int, id []byte, compressed *bytes.Buffer) (bytes.Buffer, error) {
	var packet bytes.Buffer
	payloadSize := g.getChunksize()

	// Chunk marker bytes.
	for _, v := range []int{30, 15} {
		encoded, err := g.intToBytes(v)
		if err != nil {
			return packet, err
		}
		packet.Write(encoded)
	}

	packet.Write(id)

	// Sequence number followed by sequence count.
	for _, v := range []int{index, chunkCountInt} {
		encoded, err := g.intToBytes(v)
		if err != nil {
			return packet, err
		}
		packet.Write(encoded)
	}

	packet.Write(compressed.Next(payloadSize))
	return packet, nil
}
// getChunksize returns the chunk size for the configured connection type:
// MaxChunkSizeLan for "lan", MaxChunkSizeWan for "wan" and any other value.
func (g *gelfUDP) getChunksize() int {
	switch g.gelfConfig.Connection {
	case "lan":
		return g.gelfConfig.MaxChunkSizeLan
	default:
		return g.gelfConfig.MaxChunkSizeWan
	}
}
// intToBytes encodes i, truncated to 8 bits, as a single byte.
func (*gelfUDP) intToBytes(i int) ([]byte, error) {
	var encoded bytes.Buffer
	if err := binary.Write(&encoded, binary.LittleEndian, int8(i)); err != nil {
		return nil, err
	}
	return encoded.Bytes(), nil
}
// compress returns b compressed with zlib. On failure an empty buffer and the
// error are returned.
func (*gelfUDP) compress(b []byte) (bytes.Buffer, error) {
	var out bytes.Buffer
	zw := zlib.NewWriter(&out)

	_, err := zw.Write(b)
	if err == nil {
		// Close flushes the remaining compressed data into out.
		err = zw.Close()
	}
	if err != nil {
		return bytes.Buffer{}, err
	}
	return out, nil
}
// Connect dials the endpoint over UDP and keeps the connection for subsequent
// writes. The stored connection is left untouched when dialing fails.
func (g *gelfUDP) Connect() error {
	c, err := g.dialer.Dial("udp", g.gelfConfig.Endpoint)
	if err == nil {
		g.conn = c
	}
	return err
}
// send writes b to the endpoint, connecting first if there is no connection.
// A failed write drops the connection so the next send dials again.
func (g *gelfUDP) send(b []byte) error {
	if g.conn == nil {
		if err := g.Connect(); err != nil {
			return err
		}
	}

	if _, err := g.conn.Write(b); err != nil {
		_ = g.conn.Close() // best effort, the write error is what gets reported
		g.conn = nil
		return err
	}
	return nil
}
// Write sends message to the endpoint and reports its full length on success,
// or 0 and the error on failure.
func (g *gelfTCP) Write(message []byte) (n int, err error) {
	if err = g.send(message); err != nil {
		return 0, err
	}
	return len(message), nil
}
// Close closes the current connection, if any, and forgets it. Closing a
// writer that is not connected is a no-op.
func (g *gelfTCP) Close() (err error) {
	if g.conn == nil {
		return nil
	}
	err = g.conn.Close()
	g.conn = nil
	return err
}
// Connect dials the endpoint over TCP, using TLS when a TLS configuration was
// supplied, and keeps the connection for subsequent writes. The stored
// connection is left untouched when dialing fails.
func (g *gelfTCP) Connect() error {
	endpoint := g.gelfConfig.Endpoint

	if g.tlsConfig != nil {
		secured, err := tls.DialWithDialer(g.dialer, "tcp", endpoint, g.tlsConfig)
		if err != nil {
			return err
		}
		g.conn = secured
		return nil
	}

	plain, err := g.dialer.Dial("tcp", endpoint)
	if err != nil {
		return err
	}
	g.conn = plain
	return nil
}
// send writes b followed by a zero byte delimiter, connecting first if there
// is no connection. If either write fails the connection is dropped so the
// next send dials again.
func (g *gelfTCP) send(b []byte) error {
	if g.conn == nil {
		if err := g.Connect(); err != nil {
			return err
		}
	}

	// Payload first, then the message delimiter, as two separate writes.
	for _, part := range [][]byte{b, {0}} {
		if _, err := g.conn.Write(part); err != nil {
			_ = g.conn.Close() // best effort, the write error is what gets reported
			g.conn = nil
			return err
		}
	}
	return nil
}
// Graylog is the output plugin that serializes metrics to JSON messages and
// writes them to every configured server.
type Graylog struct {
// Servers lists the endpoints to send to; Connect adds "localhost:12201"
// when the list is empty.
Servers []string `toml:"servers"`
// ShortMessageField names the metric field whose value replaces the
// default "short_message" of "telegraf".
ShortMessageField string `toml:"short_message_field"`
// NameFieldNoPrefix emits the metric name under "name" instead of "_name".
NameFieldNoPrefix bool `toml:"name_field_noprefix"`
// Timeout is the dial timeout used when connecting to a server.
Timeout config.Duration `toml:"timeout"`
// Reconnection makes Connect return immediately and keep retrying failed
// endpoints in the background.
Reconnection bool `toml:"connection_retry"`
// ReconnectionTime is the pause between two background connection attempts.
ReconnectionTime config.Duration `toml:"connection_retry_wait_time"`
Log telegraf.Logger `toml:"-"`
common_tls.ClientConfig
// writer fans a message out to all connected endpoints; nil until connected.
writer io.Writer
// closers are the endpoint connections released by Close.
closers []io.WriteCloser
// unconnected holds the endpoints the last background attempt could not reach.
unconnected []string
// stopRetry asks the background retry loop to stop; set by Close.
stopRetry bool
// wg is what Close waits on for the background retry loop to finish.
wg sync.WaitGroup
// The embedded mutex guards writer, unconnected and stopRetry.
sync.Mutex
}
// SampleConfig returns the embedded sample configuration of the plugin.
func (*Graylog) SampleConfig() string {
return sampleConfig
}
// Connect opens a connection to every configured server, defaulting to
// "localhost:12201" when none is configured.
//
// With connection retry enabled it starts a background retry loop and returns
// nil immediately; until that loop has finished, Write reports that the plugin
// is not connected. Without retry, every server must be reachable right away:
// if any of them is not, the connections that were opened are closed again and
// an error naming the unreachable servers is returned.
func (g *Graylog) Connect() error {
	if len(g.Servers) == 0 {
		g.Servers = append(g.Servers, "localhost:12201")
	}

	tlsCfg, err := g.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}

	if g.Reconnection {
		// Register the goroutine before starting it; doing so inside the
		// goroutine would let Close's wg.Wait return before the retry loop
		// has been accounted for.
		g.wg.Add(1)
		go g.connectRetry(tlsCfg)
		return nil
	}

	unconnected, gelfs := g.connectEndpoints(g.Servers, tlsCfg)
	if len(unconnected) > 0 {
		// The plugin is not usable in this state, so do not leak the
		// connections that did succeed.
		for _, w := range gelfs {
			_ = w.Close()
		}
		servers := strings.Join(unconnected, ",")
		return fmt.Errorf("connect: connection failed for %s", servers)
	}

	writers := make([]io.Writer, 0, len(gelfs))
	closers := make([]io.WriteCloser, 0, len(gelfs))
	for _, w := range gelfs {
		writers = append(writers, w)
		closers = append(closers, w)
	}

	g.Lock()
	defer g.Unlock()
	g.writer = io.MultiWriter(writers...)
	g.closers = closers

	return nil
}

// connectRetry keeps dialing the servers that are not connected yet, pausing
// ReconnectionTime between attempts, until all of them are connected or Close
// asks it to stop. Whatever was connected by then is published as the
// plugin's writer and closers. The caller must have called g.wg.Add(1) before
// starting it; connectRetry calls g.wg.Done when it returns.
func (g *Graylog) connectRetry(tlsCfg *tls.Config) {
	defer g.wg.Done()

	var writers []io.Writer
	var closers []io.WriteCloser
	var attempt int64

	// pending shrinks to the endpoints that still need a connection, so that
	// endpoints connected in an earlier attempt are not dialed (and added as
	// writers) a second time.
	pending := make([]string, 0, len(g.Servers))
	pending = append(pending, g.Servers...)

	connected := false
	for {
		unconnected, gelfs := g.connectEndpoints(pending, tlsCfg)
		for _, w := range gelfs {
			writers = append(writers, w)
			closers = append(closers, w)
		}
		pending = unconnected

		g.Lock()
		g.unconnected = unconnected
		stopRetry := g.stopRetry
		g.Unlock()

		if stopRetry {
			g.Log.Info("Stopping connection retries...")
			break
		}
		if len(unconnected) == 0 {
			connected = true
			break
		}

		attempt++
		servers := strings.Join(unconnected, ",")
		g.Log.Infof("Not connected to endpoints %s after attempt #%d...", servers, attempt)
		time.Sleep(time.Duration(g.ReconnectionTime))
	}

	if connected {
		g.Log.Info("Connected!")
	}

	// Publish even when stopped early so Close can release the connections
	// that were opened.
	g.Lock()
	g.writer = io.MultiWriter(writers...)
	g.closers = closers
	g.Unlock()
}
// connectEndpoints tries to connect to each of servers once, using the plugin
// timeout as dial timeout. It returns the servers that could not be reached
// and the writers of those that could; every failure is logged as a warning.
func (g *Graylog) connectEndpoints(servers []string, tlsCfg *tls.Config) ([]string, []gelf) {
	var (
		connected = make([]gelf, 0, len(servers))
		failed    = make([]string, 0, len(servers))
		dialer    = &net.Dialer{Timeout: time.Duration(g.Timeout)}
	)

	for i := range servers {
		addr := servers[i]
		client := newGelfWriter(gelfConfig{Endpoint: addr}, dialer, tlsCfg)

		err := client.Connect()
		if err == nil {
			connected = append(connected, client)
			continue
		}

		g.Log.Warnf("failed to connect to server [%s]: %v", addr, err)
		failed = append(failed, addr)
	}

	return failed, connected
}
// Close asks the background retry loop to stop, waits for it to finish and
// then closes every endpoint connection. Errors from closing individual
// connections are ignored; Close always returns nil.
func (g *Graylog) Close() error {
	func() {
		g.Lock()
		defer g.Unlock()
		g.stopRetry = true
	}()

	g.wg.Wait()

	for i := range g.closers {
		_ = g.closers[i].Close() // best effort on shutdown
	}
	return nil
}
// Write serializes each metric and sends the resulting messages to the
// connected endpoints. It fails with an error naming the unconnected servers
// while no writer is available, and stops at the first serialization or write
// error.
func (g *Graylog) Write(metrics []telegraf.Metric) error {
	g.Lock()
	out := g.writer
	pending := g.unconnected
	g.Unlock()

	if out == nil {
		return fmt.Errorf("not connected to %s", strings.Join(pending, ","))
	}

	for i := range metrics {
		messages, err := g.serialize(metrics[i])
		if err != nil {
			return err
		}

		for _, msg := range messages {
			if _, err := out.Write([]byte(msg)); err != nil {
				return fmt.Errorf("error writing message: %q: %w", msg, err)
			}
		}
	}

	return nil
}
// serialize converts metric into a single JSON message.
//
// The message always carries "version", "timestamp" (seconds, as a float),
// "short_message", "host" and the metric name ("name" or "_name", depending on
// NameFieldNoPrefix). The host comes from the "host" tag or, failing that,
// from the local hostname. Remaining tags and fields are added under their own
// key when it is one of defaultSpecFields and under "_"+key otherwise; the
// field named by ShortMessageField overrides "short_message".
func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
	nameKey := "_name"
	if g.NameFieldNoPrefix {
		nameKey = "name"
	}

	m := map[string]interface{}{
		"version":       "1.1",
		"timestamp":     float64(metric.Time().UnixNano()) / 1_000_000_000,
		"short_message": "telegraf",
		nameKey:         metric.Name(),
	}

	host, ok := metric.GetTag("host")
	if !ok {
		var err error
		if host, err = os.Hostname(); err != nil {
			return nil, err
		}
	}
	m["host"] = host

	// messageKey maps a tag or field key to the key used in the message.
	messageKey := func(key string) string {
		if fieldInSpec(key) {
			return key
		}
		return "_" + key
	}

	for _, tag := range metric.TagList() {
		// The host tag has already been handled above.
		if tag.Key != "host" {
			m[messageKey(tag.Key)] = tag.Value
		}
	}

	for _, field := range metric.FieldList() {
		key := "short_message"
		if field.Key != g.ShortMessageField {
			key = messageKey(field.Key)
		}
		m[key] = field.Value
	}

	serialized, err := ejson.Marshal(m)
	if err != nil {
		return nil, err
	}
	return []string{string(serialized)}, nil
}
// fieldInSpec reports whether field is one of defaultSpecFields.
func fieldInSpec(field string) bool {
	for i := 0; i < len(defaultSpecFields); i++ {
		if defaultSpecFields[i] == field {
			return true
		}
	}
	return false
}
// init registers the "graylog" output; new instances start with the default
// timeout and reconnection wait time.
func init() {
outputs.Add("graylog", func() telegraf.Output {
return &Graylog{
Timeout: config.Duration(defaultTimeout),
ReconnectionTime: config.Duration(defaultReconnectionTime),
}
})
}