1
0
Fork 0
telegraf/plugins/outputs/sumologic/sumologic.go

276 lines
6.6 KiB
Go
Raw Permalink Normal View History

//go:generate ../../../tools/readme_config_includer/generator
package sumologic
import (
"bytes"
"compress/gzip"
_ "embed"
"errors"
"fmt"
"net/http"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
)
//go:embed sample.conf
var sampleConfig string
const (
defaultClientTimeout = 5 * time.Second
defaultMethod = http.MethodPost
defaultMaxRequestBodySize = 1000000
contentTypeHeader = "Content-Type"
carbon2ContentType = "application/vnd.sumologic.carbon2"
graphiteContentType = "application/vnd.sumologic.graphite"
prometheusContentType = "application/vnd.sumologic.prometheus"
)
type header string
const (
sourceNameHeader header = `X-Sumo-Name`
sourceHostHeader header = `X-Sumo-Host`
sourceCategoryHeader header = `X-Sumo-Category`
dimensionsHeader header = `X-Sumo-Dimensions`
)
type SumoLogic struct {
URL string `toml:"url"`
Timeout config.Duration `toml:"timeout"`
MaxRequestBodySize config.Size `toml:"max_request_body_size"`
SourceName string `toml:"source_name"`
SourceHost string `toml:"source_host"`
SourceCategory string `toml:"source_category"`
Dimensions string `toml:"dimensions"`
Log telegraf.Logger `toml:"-"`
client *http.Client
serializer telegraf.Serializer
headers map[string]string
}
func (*SumoLogic) SampleConfig() string {
return sampleConfig
}
func (s *SumoLogic) SetSerializer(serializer telegraf.Serializer) {
s.serializer = serializer
}
func (s *SumoLogic) createClient() *http.Client {
return &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(s.Timeout),
}
}
func (s *SumoLogic) Connect() error {
s.headers = make(map[string]string)
var serializer telegraf.Serializer
if unwrapped, ok := s.serializer.(*models.RunningSerializer); ok {
serializer = unwrapped.Serializer
} else {
serializer = s.serializer
}
switch serializer.(type) {
case *carbon2.Serializer:
s.headers[contentTypeHeader] = carbon2ContentType
case *graphite.GraphiteSerializer:
s.headers[contentTypeHeader] = graphiteContentType
case *prometheus.Serializer:
s.headers[contentTypeHeader] = prometheusContentType
default:
return fmt.Errorf("unsupported serializer %T", serializer)
}
if s.Timeout == 0 {
s.Timeout = config.Duration(defaultClientTimeout)
}
s.client = s.createClient()
return nil
}
func (*SumoLogic) Close() error {
return nil
}
func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
if s.serializer == nil {
return errors.New("sumologic: serializer unset")
}
if len(metrics) == 0 {
return nil
}
reqBody, err := s.serializer.SerializeBatch(metrics)
if err != nil {
return err
}
if l := len(reqBody); l > int(s.MaxRequestBodySize) {
chunks, err := s.splitIntoChunks(metrics)
if err != nil {
return err
}
return s.writeRequestChunks(chunks)
}
return s.writeRequestChunk(reqBody)
}
func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
for _, reqChunk := range chunks {
if err := s.writeRequestChunk(reqChunk); err != nil {
s.Log.Errorf("Error sending chunk: %v", err)
}
}
return nil
}
func (s *SumoLogic) writeRequestChunk(reqBody []byte) error {
var (
err error
buff bytes.Buffer
gz = gzip.NewWriter(&buff)
)
if _, err = gz.Write(reqBody); err != nil {
return err
}
if err := gz.Close(); err != nil {
return err
}
req, err := http.NewRequest(defaultMethod, s.URL, &buff)
if err != nil {
return err
}
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("User-Agent", internal.ProductToken())
// Set headers coming from the configuration.
for k, v := range s.headers {
req.Header.Set(k, v)
}
setHeaderIfSetInConfig(req, sourceNameHeader, s.SourceName)
setHeaderIfSetInConfig(req, sourceHostHeader, s.SourceHost)
setHeaderIfSetInConfig(req, sourceCategoryHeader, s.SourceCategory)
setHeaderIfSetInConfig(req, dimensionsHeader, s.Dimensions)
resp, err := s.client.Do(req)
if err != nil {
return fmt.Errorf("sumologic: failed sending request to %q: %w", s.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("sumologic: when writing to %q received status code: %d", s.URL, resp.StatusCode)
}
return nil
}
// splitIntoChunks splits metrics to be sent into chunks so that every request
// is smaller than s.MaxRequestBodySize unless it was configured so small so that
// even a single metric cannot fit.
// In such a situation metrics will be sent one by one with a warning being logged
// for every request sent even though they don't fit in s.MaxRequestBodySize bytes.
func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) {
var (
numMetrics = len(metrics)
chunks = make([][]byte, 0)
)
for i := 0; i < numMetrics; {
var toAppend []byte
for i < numMetrics {
chunkBody, err := s.serializer.Serialize(metrics[i])
if err != nil {
return nil, err
}
la := len(toAppend)
if la != 0 {
// We already have something to append ...
if la+len(chunkBody) > int(s.MaxRequestBodySize) {
// ... and it's just the right size, without currently processed chunk.
break
}
// ... we can try appending more.
i++
toAppend = append(toAppend, chunkBody...)
continue
}
// la == 0
i++
toAppend = chunkBody
if len(chunkBody) > int(s.MaxRequestBodySize) {
s.Log.Warnf(
"max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
s.MaxRequestBodySize, len(chunkBody),
)
// The serialized metric is too big, but we have no choice
// but to send it.
// max_request_body_size was set so small that it wouldn't
// even accommodate a single metric.
break
}
continue
}
if toAppend == nil {
break
}
chunks = append(chunks, toAppend)
}
return chunks, nil
}
func setHeaderIfSetInConfig(r *http.Request, h header, value string) {
if value != "" {
r.Header.Set(string(h), value)
}
}
func Default() *SumoLogic {
return &SumoLogic{
Timeout: config.Duration(defaultClientTimeout),
MaxRequestBodySize: defaultMaxRequestBodySize,
headers: make(map[string]string),
}
}
func init() {
outputs.Add("sumologic", func() telegraf.Output {
return Default()
})
}