222 lines
5.9 KiB
Go
222 lines
5.9 KiB
Go
|
//go:generate ../../../tools/readme_config_includer/generator
|
||
|
package influxdb_v2
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/tls"
|
||
|
_ "embed"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"math/rand"
|
||
|
"net"
|
||
|
"net/url"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/config"
|
||
|
"github.com/influxdata/telegraf/internal"
|
||
|
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
|
||
|
commontls "github.com/influxdata/telegraf/plugins/common/tls"
|
||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
||
|
)
|
||
|
|
||
|
//go:embed sample.conf
|
||
|
var sampleConfig string
|
||
|
|
||
|
type InfluxDB struct {
|
||
|
URLs []string `toml:"urls"`
|
||
|
LocalAddr string `toml:"local_address"`
|
||
|
Token config.Secret `toml:"token"`
|
||
|
Organization string `toml:"organization"`
|
||
|
Bucket string `toml:"bucket"`
|
||
|
BucketTag string `toml:"bucket_tag"`
|
||
|
ExcludeBucketTag bool `toml:"exclude_bucket_tag"`
|
||
|
Timeout config.Duration `toml:"timeout"`
|
||
|
HTTPHeaders map[string]string `toml:"http_headers"`
|
||
|
HTTPProxy string `toml:"http_proxy"`
|
||
|
UserAgent string `toml:"user_agent"`
|
||
|
ContentEncoding string `toml:"content_encoding"`
|
||
|
UintSupport bool `toml:"influx_uint_support"`
|
||
|
OmitTimestamp bool `toml:"influx_omit_timestamp"`
|
||
|
PingTimeout config.Duration `toml:"ping_timeout"`
|
||
|
ReadIdleTimeout config.Duration `toml:"read_idle_timeout"`
|
||
|
Log telegraf.Logger `toml:"-"`
|
||
|
commontls.ClientConfig
|
||
|
ratelimiter.RateLimitConfig
|
||
|
|
||
|
clients []*httpClient
|
||
|
encoder internal.ContentEncoder
|
||
|
serializer ratelimiter.Serializer
|
||
|
tlsCfg *tls.Config
|
||
|
}
|
||
|
|
||
|
func (*InfluxDB) SampleConfig() string {
|
||
|
return sampleConfig
|
||
|
}
|
||
|
|
||
|
func (i *InfluxDB) Init() error {
|
||
|
// Set defaults
|
||
|
if i.UserAgent == "" {
|
||
|
i.UserAgent = internal.ProductToken()
|
||
|
}
|
||
|
|
||
|
if len(i.URLs) == 0 {
|
||
|
i.URLs = append(i.URLs, "http://localhost:8086")
|
||
|
}
|
||
|
|
||
|
// Init encoding if configured
|
||
|
switch i.ContentEncoding {
|
||
|
case "", "gzip":
|
||
|
i.ContentEncoding = "gzip"
|
||
|
enc, err := internal.NewGzipEncoder()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("setting up gzip encoder failed: %w", err)
|
||
|
}
|
||
|
i.encoder = enc
|
||
|
case "identity":
|
||
|
default:
|
||
|
return fmt.Errorf("invalid content encoding %q", i.ContentEncoding)
|
||
|
}
|
||
|
|
||
|
// Setup the limited serializer
|
||
|
serializer := &influx.Serializer{
|
||
|
UintSupport: i.UintSupport,
|
||
|
OmitTimestamp: i.OmitTimestamp,
|
||
|
}
|
||
|
if err := serializer.Init(); err != nil {
|
||
|
return fmt.Errorf("setting up serializer failed: %w", err)
|
||
|
}
|
||
|
i.serializer = ratelimiter.NewIndividualSerializer(serializer)
|
||
|
|
||
|
// Setup the client config
|
||
|
tlsCfg, err := i.ClientConfig.TLSConfig()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("setting up TLS failed: %w", err)
|
||
|
}
|
||
|
i.tlsCfg = tlsCfg
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (i *InfluxDB) Connect() error {
|
||
|
for _, u := range i.URLs {
|
||
|
parts, err := url.Parse(u)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error parsing url [%q]: %w", u, err)
|
||
|
}
|
||
|
|
||
|
var proxy *url.URL
|
||
|
if len(i.HTTPProxy) > 0 {
|
||
|
proxy, err = url.Parse(i.HTTPProxy)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error parsing proxy_url [%s]: %w", i.HTTPProxy, err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var localAddr *net.TCPAddr
|
||
|
if i.LocalAddr != "" {
|
||
|
// Resolve the local address into IP address and the given port if any
|
||
|
addr, sPort, err := net.SplitHostPort(i.LocalAddr)
|
||
|
if err != nil {
|
||
|
if !strings.Contains(err.Error(), "missing port") {
|
||
|
return fmt.Errorf("invalid local address: %w", err)
|
||
|
}
|
||
|
addr = i.LocalAddr
|
||
|
}
|
||
|
local, err := net.ResolveIPAddr("ip", addr)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("cannot resolve local address: %w", err)
|
||
|
}
|
||
|
|
||
|
var port int
|
||
|
if sPort != "" {
|
||
|
p, err := strconv.ParseUint(sPort, 10, 16)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("invalid port: %w", err)
|
||
|
}
|
||
|
port = int(p)
|
||
|
}
|
||
|
|
||
|
localAddr = &net.TCPAddr{IP: local.IP, Port: port, Zone: local.Zone}
|
||
|
}
|
||
|
|
||
|
switch parts.Scheme {
|
||
|
case "http", "https", "unix":
|
||
|
limiter, err := i.RateLimitConfig.CreateRateLimiter()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
c := &httpClient{
|
||
|
url: parts,
|
||
|
localAddr: localAddr,
|
||
|
token: i.Token,
|
||
|
organization: i.Organization,
|
||
|
bucket: i.Bucket,
|
||
|
bucketTag: i.BucketTag,
|
||
|
excludeBucketTag: i.ExcludeBucketTag,
|
||
|
timeout: time.Duration(i.Timeout),
|
||
|
headers: i.HTTPHeaders,
|
||
|
proxy: proxy,
|
||
|
userAgent: i.UserAgent,
|
||
|
contentEncoding: i.ContentEncoding,
|
||
|
tlsConfig: i.tlsCfg,
|
||
|
pingTimeout: i.PingTimeout,
|
||
|
readIdleTimeout: i.ReadIdleTimeout,
|
||
|
encoder: i.encoder,
|
||
|
rateLimiter: limiter,
|
||
|
serializer: i.serializer,
|
||
|
log: i.Log,
|
||
|
}
|
||
|
|
||
|
if err := c.Init(); err != nil {
|
||
|
return fmt.Errorf("error creating HTTP client [%s]: %w", parts, err)
|
||
|
}
|
||
|
|
||
|
i.clients = append(i.clients, c)
|
||
|
default:
|
||
|
return fmt.Errorf("unsupported scheme [%q]: %q", u, parts.Scheme)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (i *InfluxDB) Close() error {
|
||
|
for _, client := range i.clients {
|
||
|
client.Close()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Write sends metrics to one of the configured servers, logging each
|
||
|
// unsuccessful. If all servers fail, return an error.
|
||
|
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||
|
ctx := context.Background()
|
||
|
|
||
|
for _, n := range rand.Perm(len(i.clients)) {
|
||
|
client := i.clients[n]
|
||
|
if err := client.Write(ctx, metrics); err != nil {
|
||
|
i.Log.Errorf("When writing to [%s]: %v", client.url, err)
|
||
|
var werr *internal.PartialWriteError
|
||
|
if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) {
|
||
|
return err
|
||
|
}
|
||
|
continue
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
return errors.New("failed to send metrics to any configured server(s)")
|
||
|
}
|
||
|
|
||
|
func init() {
|
||
|
outputs.Add("influxdb_v2", func() telegraf.Output {
|
||
|
return &InfluxDB{
|
||
|
Timeout: config.Duration(time.Second * 5),
|
||
|
}
|
||
|
})
|
||
|
}
|