1
0
Fork 0
telegraf/plugins/outputs/influxdb_v2/http.go

435 lines
12 KiB
Go
Raw Permalink Normal View History

package influxdb_v2
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"golang.org/x/net/http2"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
)
// APIError wraps an error returned by the InfluxDB v2 API together with the
// HTTP status code of the response and a flag telling the caller whether the
// failed write may be retried.
type APIError struct {
	Err        error
	StatusCode int
	Retryable  bool
}

// Error implements the error interface by delegating to the wrapped error.
func (e APIError) Error() string {
	return e.Err.Error()
}

// Unwrap exposes the wrapped error to errors.Is / errors.As.
func (e APIError) Unwrap() error {
	return e.Err
}
// Caps for the retry back-off behavior (see getRetryDuration).
const (
	// defaultMaxWaitSeconds limits the locally computed exponential back-off.
	defaultMaxWaitSeconds = 60
	// defaultMaxWaitRetryAfterSeconds limits the server-supplied
	// Retry-After header value to guard against excessive waits.
	defaultMaxWaitRetryAfterSeconds = 10 * 60
)
// httpClient handles the communication with a single InfluxDB v2 write
// endpoint, including serialization, rate limiting and retry back-off.
type httpClient struct {
	url              *url.URL     // write endpoint; rewritten by Init via prepareWriteURL
	localAddr        *net.TCPAddr // optional local address to bind outgoing connections to
	token            config.Secret // API token sent in the Authorization header
	organization     string        // value of the "org" query parameter
	bucket           string        // default destination bucket
	bucketTag        string        // metric tag selecting the destination bucket, if non-empty
	excludeBucketTag bool          // strip the bucket tag from metrics before writing
	timeout          time.Duration // overall HTTP client timeout
	headers          map[string]string // extra request headers; "Host" is applied to req.Host
	proxy            *url.URL      // explicit proxy; environment proxy is used when nil
	userAgent        string        // User-Agent header default
	contentEncoding  string        // Content-Encoding header value when an encoder is set
	pingTimeout      config.Duration // HTTP/2 ping timeout (0 = not configured)
	readIdleTimeout  config.Duration // HTTP/2 read-idle timeout (0 = not configured)
	tlsConfig        *tls.Config
	encoder          internal.ContentEncoder  // optional request-body encoder (presumably gzip — confirm against plugin config)
	serializer       ratelimiter.Serializer   // serializes metric batches under the remaining rate limit
	rateLimiter      *ratelimiter.RateLimiter // accounts outbound bytes
	client           *http.Client             // constructed in Init
	params           url.Values               // precomputed query parameters ("org"; "bucket" added per request)

	// Retry state shared across Write calls.
	retryTime  time.Time // writes are refused until this instant has passed
	retryCount int       // consecutive retryable failures; drives the back-off curve

	log telegraf.Logger
}
// Init prepares the client for use: it fills in default headers, selects the
// proxy, builds the transport for the configured URL scheme (http, https or
// unix socket) and precomputes the write endpoint URL and query parameters.
// It must be called before Write.
func (c *httpClient) Init() error {
	if c.headers == nil {
		c.headers = make(map[string]string, 1)
	}
	if _, ok := c.headers["User-Agent"]; !ok {
		c.headers["User-Agent"] = c.userAgent
	}

	// Use the explicitly configured proxy if given, otherwise fall back to
	// the process environment (HTTP_PROXY, HTTPS_PROXY, NO_PROXY).
	var proxy func(*http.Request) (*url.URL, error)
	if c.proxy != nil {
		proxy = http.ProxyURL(c.proxy)
	} else {
		proxy = http.ProxyFromEnvironment
	}

	var transport *http.Transport
	switch c.url.Scheme {
	case "http", "https":
		// Bind outgoing connections to the configured local address, if any.
		var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)
		if c.localAddr != nil {
			dialer := &net.Dialer{LocalAddr: c.localAddr}
			dialerFunc = dialer.DialContext
		}
		transport = &http.Transport{
			Proxy:           proxy,
			TLSClientConfig: c.tlsConfig,
			DialContext:     dialerFunc,
		}
		if c.readIdleTimeout != 0 || c.pingTimeout != 0 {
			http2Trans, err := http2.ConfigureTransports(transport)
			if err != nil {
				// Do not silently drop the user's HTTP/2 keep-alive
				// settings; surface the failure but keep the plain
				// transport working (best effort, as before).
				c.log.Warnf("Configuring HTTP/2 transport failed, ping/read-idle timeouts are ignored: %v", err)
			} else {
				http2Trans.ReadIdleTimeout = time.Duration(c.readIdleTimeout)
				http2Trans.PingTimeout = time.Duration(c.pingTimeout)
			}
		}
	case "unix":
		transport = &http.Transport{
			// Use DialContext instead of the deprecated Dial field so the
			// connection attempt honors request cancellation; the timeout
			// matches the previous net.DialTimeout behavior.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				dialer := &net.Dialer{Timeout: c.timeout}
				return dialer.DialContext(ctx, c.url.Scheme, c.url.Path)
			},
		}
	default:
		return fmt.Errorf("unsupported scheme %q", c.url.Scheme)
	}

	preppedURL, params, err := prepareWriteURL(*c.url, c.organization)
	if err != nil {
		return err
	}
	c.url = preppedURL

	c.client = &http.Client{
		Timeout:   c.timeout,
		Transport: transport,
	}
	c.params = params
	return nil
}
// genericRespError is the JSON error body returned by the InfluxDB v2 API.
// Line and MaxLength are optional and only present for certain failures.
type genericRespError struct {
	Code      string
	Message   string
	Line      *int32
	MaxLength *int32
}

// Error renders the response as "<code>: <message>", appending the offending
// line number or the maximum allowed length when the server reported one.
func (g genericRespError) Error() string {
	errString := fmt.Sprintf("%s: %s", g.Code, g.Message)
	// The optional fields must be dereferenced: handing a *int32 to %d
	// formats the pointer address as an integer, not the value.
	if g.Line != nil {
		return fmt.Sprintf("%s - line[%d]", errString, *g.Line)
	}
	if g.MaxLength != nil {
		return fmt.Sprintf("%s - maxlen[%d]", errString, *g.MaxLength)
	}
	return errString
}
// Write sends the metrics to InfluxDB, grouping them into per-bucket batches
// when a bucket tag is configured. On failure it returns an
// internal.PartialWriteError whose accept/reject index lists refer to the
// original metrics slice, so the caller can drop rejected metrics instead of
// retrying them forever.
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
	// Respect a server-requested back-off from a previous failure.
	if c.retryTime.After(time.Now()) {
		return errors.New("retry time has not elapsed")
	}

	// Group the metrics per destination bucket; batchIndices maps each batch
	// entry back to its index in the original metrics slice.
	batches := make(map[string][]telegraf.Metric)
	batchIndices := make(map[string][]int)
	if c.bucketTag == "" {
		// No bucket tag configured: everything goes to the default bucket.
		batches[c.bucket] = metrics
		batchIndices[c.bucket] = make([]int, len(metrics))
		for i := range metrics {
			batchIndices[c.bucket][i] = i
		}
	} else {
		for i, metric := range metrics {
			bucket, ok := metric.GetTag(c.bucketTag)
			if !ok {
				bucket = c.bucket
			} else if c.excludeBucketTag {
				// Avoid modifying the metric if we do remove the tag
				metric = metric.Copy()
				metric.Accept()
				metric.RemoveTag(c.bucketTag)
			}
			batches[bucket] = append(batches[bucket], metric)
			batchIndices[bucket] = append(batchIndices[bucket], i)
		}
	}

	var wErr internal.PartialWriteError
	for bucket, batch := range batches {
		err := c.writeBatch(ctx, bucket, batch)
		if err == nil {
			wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket]...)
			continue
		}

		// Check if the request was too large and split it
		var apiErr *APIError
		if errors.As(err, &apiErr) {
			if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
				// Split and resend only the failing bucket's batch. Using
				// the default bucket and the full metrics slice here would
				// misroute tagged metrics and resend already-written batches.
				// TODO: Need a testcase to verify rejected metrics are not retried...
				return c.splitAndWriteBatch(ctx, bucket, batch)
			}
			wErr.Err = err
			if !apiErr.Retryable {
				wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
			}
			// TODO: Clarify if we should continue here to try the remaining buckets?
			return &wErr
		}

		// Check if we got a write error and if so, translate the returned
		// metric indices to return the original indices in case of bucketing
		var writeErr *internal.PartialWriteError
		if errors.As(err, &writeErr) {
			wErr.Err = writeErr.Err
			for _, idx := range writeErr.MetricsAccept {
				wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket][idx])
			}
			for _, idx := range writeErr.MetricsReject {
				wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket][idx])
			}
			if !errors.Is(writeErr.Err, internal.ErrSizeLimitReached) {
				continue
			}
			return &wErr
		}

		// Return the error without special treatment
		wErr.Err = err
		return &wErr
	}
	return nil
}
// splitAndWriteBatch halves an oversized payload and writes the two halves
// separately, giving up as soon as the first half fails.
func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
	c.log.Warnf("Retrying write after splitting metric payload in half to reduce batch size")

	half := len(metrics) / 2
	if err := c.writeBatch(ctx, bucket, metrics[:half]); err != nil {
		return err
	}
	return c.writeBatch(ctx, bucket, metrics[half:])
}
// writeBatch serializes the metrics within the remaining rate limit, POSTs
// them to the write endpoint for the given bucket and translates the HTTP
// response into an error that tells the caller whether the metrics were
// accepted, must be dropped, or may be retried later.
func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
	// Get the current limit for the outbound data
	ratets := time.Now()
	limit := c.rateLimiter.Remaining(ratets)

	// Serialize the metrics with the remaining limit, exit early if nothing was serialized
	body, werr := c.serializer.SerializeBatch(metrics, limit)
	if werr != nil && !errors.Is(werr, internal.ErrSizeLimitReached) || len(body) == 0 {
		return werr
	}
	// Account the uncompressed payload size against the rate limit below.
	used := int64(len(body))

	// Encode the content if requested
	if c.encoder != nil {
		var err error
		if body, err = c.encoder.Encode(body); err != nil {
			return fmt.Errorf("encoding failed: %w", err)
		}
	}

	// Setup the request
	address := makeWriteURL(*c.url, c.params, bucket)
	req, err := http.NewRequest("POST", address, io.NopCloser(bytes.NewBuffer(body)))
	if err != nil {
		return fmt.Errorf("creating request failed: %w", err)
	}
	if c.encoder != nil {
		req.Header.Set("Content-Encoding", c.contentEncoding)
	}
	req.Header.Set("Content-Type", "text/plain; charset=utf-8")

	// Set authorization
	token, err := c.token.Get()
	if err != nil {
		return fmt.Errorf("getting token failed: %w", err)
	}
	req.Header.Set("Authorization", "Token "+token.String())
	// Wipe the secret from memory as soon as the header is set.
	token.Destroy()
	c.addHeaders(req)

	// Execute the request
	// Record the bytes against the rate limit before sending them out.
	c.rateLimiter.Accept(ratets, used)
	resp, err := c.client.Do(req.WithContext(ctx))
	if err != nil {
		internal.OnClientError(c.client, err)
		return err
	}
	defer resp.Body.Close()

	// Check for success
	switch resp.StatusCode {
	case
		// this is the expected response:
		http.StatusNoContent,
		// but if we get these we should still accept it as delivered:
		http.StatusOK,
		http.StatusCreated,
		http.StatusAccepted,
		http.StatusPartialContent,
		http.StatusMultiStatus,
		http.StatusAlreadyReported:
		c.retryCount = 0
		// Even on success, propagate werr: it may carry the serializer's
		// size-limit error so the caller learns about truncated metrics.
		return werr
	}

	// We got an error and now try to decode further
	var desc string
	writeResp := &genericRespError{}
	if json.NewDecoder(resp.Body).Decode(writeResp) == nil {
		desc = ": " + writeResp.Error()
	}

	switch resp.StatusCode {
	// request was too large, send back to try again
	case http.StatusRequestEntityTooLarge:
		c.log.Errorf("Failed to write metric to %s, request was too large (413)", bucket)
		return &APIError{
			Err:        fmt.Errorf("%s: %s", resp.Status, desc),
			StatusCode: resp.StatusCode,
		}
	case
		// request was malformed:
		http.StatusBadRequest,
		// request was received but server refused to process it due to a semantic problem with the request.
		// for example, submitting metrics outside the retention period.
		http.StatusUnprocessableEntity,
		http.StatusNotAcceptable:

		// Clients should *not* repeat the request and the metrics should be rejected.
		// Retryable is left false so the caller drops these metrics.
		return &APIError{
			Err:        fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc),
			StatusCode: resp.StatusCode,
		}
	case http.StatusUnauthorized, http.StatusForbidden:
		return fmt.Errorf("failed to write metric to %s (%s)%s", bucket, resp.Status, desc)
	case http.StatusTooManyRequests,
		http.StatusServiceUnavailable,
		http.StatusBadGateway,
		http.StatusGatewayTimeout:
		// ^ these handle the cases where the server is likely overloaded, and may not be able to say so.
		// Arm the back-off: Write refuses to send anything until retryTime.
		c.retryCount++
		retryDuration := c.getRetryDuration(resp.Header)
		c.retryTime = time.Now().Add(retryDuration)
		c.log.Warnf("Failed to write to %s; will retry in %s. (%s)\n", bucket, retryDuration, resp.Status)
		return fmt.Errorf("waiting %s for server (%s) before sending metric again", retryDuration, bucket)
	}

	// if it's any other 4xx code, the client should not retry as it's the client's mistake.
	// retrying will not make the request magically work.
	if len(resp.Status) > 0 && resp.Status[0] == '4' {
		return &APIError{
			Err:        fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc),
			StatusCode: resp.StatusCode,
		}
	}

	// This is only until platform spec is fully implemented. As of the
	// time of writing, there is no error body returned.
	if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" {
		desc = fmt.Sprintf(": %s; %s", desc, xErr)
	}

	// Everything else (e.g. 5xx) is considered transient and retryable.
	return &APIError{
		Err:        fmt.Errorf("failed to write metric to bucket %q: %s%s", bucket, resp.Status, desc),
		StatusCode: resp.StatusCode,
		Retryable:  true,
	}
}
// getRetryDuration returns the longer of the server's Retry-After header and
// the client's own exponential back-off, as a duration to wait before the
// next write attempt.
func (c *httpClient) getRetryDuration(headers http.Header) time.Duration {
	// Exponential back-off: retryCount² / 40 seconds. The denominator of 40
	// widens the slope so it takes 49 retries to reach the 60s ceiling.
	computed := math.Min(math.Pow(float64(c.retryCount), 2)/40, defaultMaxWaitSeconds)

	// Honor the Retry-After header when present, capped to a sane maximum.
	var fromHeader float64
	if raw := headers.Get("Retry-After"); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			// A value was sent but is unparsable — assume a 10s minimum.
			parsed = 10
		}
		fromHeader = math.Min(parsed, defaultMaxWaitRetryAfterSeconds)
	}

	wait := math.Max(computed, fromHeader)
	return time.Duration(wait*1000) * time.Millisecond
}
// addHeaders copies the configured custom headers onto the request. A header
// named "host" (any case) is special: Go sends it via req.Host rather than
// the header map.
func (c *httpClient) addHeaders(req *http.Request) {
	for name, value := range c.headers {
		if strings.EqualFold(name, "host") {
			req.Host = value
			continue
		}
		req.Header.Set(name, value)
	}
}
func makeWriteURL(loc url.URL, params url.Values, bucket string) string {
params.Set("bucket", bucket)
loc.RawQuery = params.Encode()
return loc.String()
}
func prepareWriteURL(loc url.URL, org string) (*url.URL, url.Values, error) {
switch loc.Scheme {
case "unix":
loc.Scheme = "http"
loc.Host = "127.0.0.1"
loc.Path = "/api/v2/write"
case "http", "https":
loc.Path = path.Join(loc.Path, "/api/v2/write")
default:
return nil, nil, fmt.Errorf("unsupported scheme: %q", loc.Scheme)
}
params := loc.Query()
params.Set("org", org)
return &loc, params, nil
}
// Close releases idle connections held by the underlying HTTP client. It
// does not abort in-flight requests.
func (c *httpClient) Close() {
	c.client.CloseIdleConnections()
}