276 lines
6.6 KiB
Go
276 lines
6.6 KiB
Go
|
//go:generate ../../../tools/readme_config_includer/generator
|
||
|
package sumologic
|
||
|
|
||
|
import (
	"bytes"
	"compress/gzip"
	_ "embed"
	"errors"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/models"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers/carbon2"
	"github.com/influxdata/telegraf/plugins/serializers/graphite"
	"github.com/influxdata/telegraf/plugins/serializers/prometheus"
)
|
||
|
|
||
|
//go:embed sample.conf
|
||
|
var sampleConfig string
|
||
|
|
||
|
// Defaults applied to the plugin and to every outgoing request.
const (
	// defaultClientTimeout is used when no timeout is configured.
	defaultClientTimeout = 5 * time.Second
	// defaultMethod is the HTTP method used for every request.
	defaultMethod = http.MethodPost
	// defaultMaxRequestBodySize is the request body size limit, in bytes,
	// set by Default.
	defaultMaxRequestBodySize = 1000000
)

// Content-Type header name and the values chosen per serializer in Connect.
const (
	contentTypeHeader     = "Content-Type"
	carbon2ContentType    = "application/vnd.sumologic.carbon2"
	graphiteContentType   = "application/vnd.sumologic.graphite"
	prometheusContentType = "application/vnd.sumologic.prometheus"
)
|
||
|
|
||
|
// header is the name of an HTTP request header that is populated from a
// plugin configuration option.
type header string

// Request headers that are set only when the matching option is non-empty.
const (
	sourceNameHeader     header = "X-Sumo-Name"
	sourceHostHeader     header = "X-Sumo-Host"
	sourceCategoryHeader header = "X-Sumo-Category"
	dimensionsHeader     header = "X-Sumo-Dimensions"
)
|
||
|
|
||
|
type SumoLogic struct {
|
||
|
URL string `toml:"url"`
|
||
|
Timeout config.Duration `toml:"timeout"`
|
||
|
MaxRequestBodySize config.Size `toml:"max_request_body_size"`
|
||
|
|
||
|
SourceName string `toml:"source_name"`
|
||
|
SourceHost string `toml:"source_host"`
|
||
|
SourceCategory string `toml:"source_category"`
|
||
|
Dimensions string `toml:"dimensions"`
|
||
|
|
||
|
Log telegraf.Logger `toml:"-"`
|
||
|
|
||
|
client *http.Client
|
||
|
serializer telegraf.Serializer
|
||
|
|
||
|
headers map[string]string
|
||
|
}
|
||
|
|
||
|
func (*SumoLogic) SampleConfig() string {
|
||
|
return sampleConfig
|
||
|
}
|
||
|
|
||
|
func (s *SumoLogic) SetSerializer(serializer telegraf.Serializer) {
|
||
|
s.serializer = serializer
|
||
|
}
|
||
|
|
||
|
func (s *SumoLogic) createClient() *http.Client {
|
||
|
return &http.Client{
|
||
|
Transport: &http.Transport{
|
||
|
Proxy: http.ProxyFromEnvironment,
|
||
|
},
|
||
|
Timeout: time.Duration(s.Timeout),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *SumoLogic) Connect() error {
|
||
|
s.headers = make(map[string]string)
|
||
|
|
||
|
var serializer telegraf.Serializer
|
||
|
if unwrapped, ok := s.serializer.(*models.RunningSerializer); ok {
|
||
|
serializer = unwrapped.Serializer
|
||
|
} else {
|
||
|
serializer = s.serializer
|
||
|
}
|
||
|
|
||
|
switch serializer.(type) {
|
||
|
case *carbon2.Serializer:
|
||
|
s.headers[contentTypeHeader] = carbon2ContentType
|
||
|
case *graphite.GraphiteSerializer:
|
||
|
s.headers[contentTypeHeader] = graphiteContentType
|
||
|
case *prometheus.Serializer:
|
||
|
s.headers[contentTypeHeader] = prometheusContentType
|
||
|
default:
|
||
|
return fmt.Errorf("unsupported serializer %T", serializer)
|
||
|
}
|
||
|
|
||
|
if s.Timeout == 0 {
|
||
|
s.Timeout = config.Duration(defaultClientTimeout)
|
||
|
}
|
||
|
|
||
|
s.client = s.createClient()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (*SumoLogic) Close() error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
|
||
|
if s.serializer == nil {
|
||
|
return errors.New("sumologic: serializer unset")
|
||
|
}
|
||
|
if len(metrics) == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
reqBody, err := s.serializer.SerializeBatch(metrics)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if l := len(reqBody); l > int(s.MaxRequestBodySize) {
|
||
|
chunks, err := s.splitIntoChunks(metrics)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return s.writeRequestChunks(chunks)
|
||
|
}
|
||
|
|
||
|
return s.writeRequestChunk(reqBody)
|
||
|
}
|
||
|
|
||
|
func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
|
||
|
for _, reqChunk := range chunks {
|
||
|
if err := s.writeRequestChunk(reqChunk); err != nil {
|
||
|
s.Log.Errorf("Error sending chunk: %v", err)
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *SumoLogic) writeRequestChunk(reqBody []byte) error {
|
||
|
var (
|
||
|
err error
|
||
|
buff bytes.Buffer
|
||
|
gz = gzip.NewWriter(&buff)
|
||
|
)
|
||
|
|
||
|
if _, err = gz.Write(reqBody); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if err := gz.Close(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
req, err := http.NewRequest(defaultMethod, s.URL, &buff)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
req.Header.Set("Content-Encoding", "gzip")
|
||
|
req.Header.Set("User-Agent", internal.ProductToken())
|
||
|
|
||
|
// Set headers coming from the configuration.
|
||
|
for k, v := range s.headers {
|
||
|
req.Header.Set(k, v)
|
||
|
}
|
||
|
|
||
|
setHeaderIfSetInConfig(req, sourceNameHeader, s.SourceName)
|
||
|
setHeaderIfSetInConfig(req, sourceHostHeader, s.SourceHost)
|
||
|
setHeaderIfSetInConfig(req, sourceCategoryHeader, s.SourceCategory)
|
||
|
setHeaderIfSetInConfig(req, dimensionsHeader, s.Dimensions)
|
||
|
|
||
|
resp, err := s.client.Do(req)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("sumologic: failed sending request to %q: %w", s.URL, err)
|
||
|
}
|
||
|
defer resp.Body.Close()
|
||
|
|
||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||
|
return fmt.Errorf("sumologic: when writing to %q received status code: %d", s.URL, resp.StatusCode)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// splitIntoChunks splits metrics to be sent into chunks so that every request
|
||
|
// is smaller than s.MaxRequestBodySize unless it was configured so small so that
|
||
|
// even a single metric cannot fit.
|
||
|
// In such a situation metrics will be sent one by one with a warning being logged
|
||
|
// for every request sent even though they don't fit in s.MaxRequestBodySize bytes.
|
||
|
func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) {
|
||
|
var (
|
||
|
numMetrics = len(metrics)
|
||
|
chunks = make([][]byte, 0)
|
||
|
)
|
||
|
|
||
|
for i := 0; i < numMetrics; {
|
||
|
var toAppend []byte
|
||
|
for i < numMetrics {
|
||
|
chunkBody, err := s.serializer.Serialize(metrics[i])
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
la := len(toAppend)
|
||
|
if la != 0 {
|
||
|
// We already have something to append ...
|
||
|
if la+len(chunkBody) > int(s.MaxRequestBodySize) {
|
||
|
// ... and it's just the right size, without currently processed chunk.
|
||
|
break
|
||
|
}
|
||
|
// ... we can try appending more.
|
||
|
i++
|
||
|
toAppend = append(toAppend, chunkBody...)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// la == 0
|
||
|
i++
|
||
|
toAppend = chunkBody
|
||
|
|
||
|
if len(chunkBody) > int(s.MaxRequestBodySize) {
|
||
|
s.Log.Warnf(
|
||
|
"max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
|
||
|
s.MaxRequestBodySize, len(chunkBody),
|
||
|
)
|
||
|
|
||
|
// The serialized metric is too big, but we have no choice
|
||
|
// but to send it.
|
||
|
// max_request_body_size was set so small that it wouldn't
|
||
|
// even accommodate a single metric.
|
||
|
break
|
||
|
}
|
||
|
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if toAppend == nil {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
chunks = append(chunks, toAppend)
|
||
|
}
|
||
|
|
||
|
return chunks, nil
|
||
|
}
|
||
|
|
||
|
func setHeaderIfSetInConfig(r *http.Request, h header, value string) {
|
||
|
if value != "" {
|
||
|
r.Header.Set(string(h), value)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func Default() *SumoLogic {
|
||
|
return &SumoLogic{
|
||
|
Timeout: config.Duration(defaultClientTimeout),
|
||
|
MaxRequestBodySize: defaultMaxRequestBodySize,
|
||
|
headers: make(map[string]string),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func init() {
|
||
|
outputs.Add("sumologic", func() telegraf.Output {
|
||
|
return Default()
|
||
|
})
|
||
|
}
|