package sumologic import ( "bufio" "bytes" "compress/gzip" "errors" "fmt" "io" "net/http" "net/http/httptest" "net/url" "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/serializers/carbon2" "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/prometheus" "github.com/influxdata/telegraf/testutil" ) func getMetric() telegraf.Metric { m := metric.New( "cpu", map[string]string{}, map[string]interface{}{ "value": 42.0, }, time.Unix(0, 0), ) return m } func getMetrics() []telegraf.Metric { const count = 100 var metrics = make([]telegraf.Metric, count) for i := 0; i < count; i++ { m := metric.New( fmt.Sprintf("cpu-%d", i), map[string]string{ "ec2_instance": "aws-129038123", "image": "aws-ami-1234567890", }, map[string]interface{}{ "idle": 5876876, "steal": 5876876, "system": 5876876, "user": 5876876, "temp": 70.0, }, time.Unix(0, 0), ) metrics[i] = m } return metrics } func TestMethod(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) tests := []struct { name string plugin func() *SumoLogic expectedMethod string connectError bool }{ { name: "default method is POST", plugin: func() *SumoLogic { s := Default() s.URL = u.String() return s }, expectedMethod: http.MethodPost, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != tt.expectedMethod { w.WriteHeader(http.StatusInternalServerError) t.Errorf("Not equal, expected: %q, actual: %q", tt.expectedMethod, r.Method) return } w.WriteHeader(http.StatusOK) }) serializer := &carbon2.Serializer{ Format: "field_separate", } require.NoError(t, serializer.Init()) plugin := tt.plugin() plugin.SetSerializer(serializer) err = plugin.Connect() if tt.connectError { require.Error(t, err) return } require.NoError(t, err) err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) }) } } func TestStatusCode(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) pluginFn := func() *SumoLogic { s := Default() s.URL = u.String() return s } tests := []struct { name string plugin *SumoLogic statusCode int errFunc func(t *testing.T, err error) }{ { name: "success", plugin: pluginFn(), statusCode: http.StatusOK, errFunc: func(t *testing.T, err error) { require.NoError(t, err) }, }, { name: "1xx status is an error", plugin: pluginFn(), statusCode: http.StatusSwitchingProtocols, errFunc: func(t *testing.T, err error) { require.Error(t, err) }, }, { name: "3xx status is an error", plugin: pluginFn(), statusCode: http.StatusMultipleChoices, errFunc: func(t *testing.T, err error) { require.Error(t, err) }, }, { name: "4xx status is an error", plugin: pluginFn(), statusCode: http.StatusBadRequest, errFunc: func(t *testing.T, err error) { require.Error(t, err) }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(tt.statusCode) }) serializer := &carbon2.Serializer{ Format: "field_separate", } require.NoError(t, serializer.Init()) tt.plugin.SetSerializer(serializer) err = tt.plugin.Connect() require.NoError(t, err) err = tt.plugin.Write([]telegraf.Metric{getMetric()}) tt.errFunc(t, err) }) } } func TestContentType(t *testing.T) { tests := []struct { name string plugin func() *SumoLogic expectedBody []byte }{ { name: "carbon2 (data format = field separate) is supported", plugin: func() *SumoLogic { s := Default() s.headers = map[string]string{ contentTypeHeader: carbon2ContentType, } serializer := &carbon2.Serializer{ Format: "field_separate", } require.NoError(t, serializer.Init()) s.SetSerializer(serializer) return s }, expectedBody: []byte("metric=cpu field=value 42 0\n"), }, { name: "carbon2 (data format = metric includes field) is supported", plugin: func() *SumoLogic { s := Default() s.headers = map[string]string{ contentTypeHeader: carbon2ContentType, } serializer := &carbon2.Serializer{ Format: "metric_includes_field", } require.NoError(t, serializer.Init()) s.SetSerializer(serializer) return s }, expectedBody: []byte("metric=cpu_value 42 0\n"), }, { name: "graphite is supported", plugin: func() *SumoLogic { s := Default() s.headers = map[string]string{ contentTypeHeader: graphiteContentType, } serializer := &graphite.GraphiteSerializer{} require.NoError(t, serializer.Init()) s.SetSerializer(serializer) return s }, }, { name: "prometheus is supported", plugin: func() *SumoLogic { s := Default() s.headers = map[string]string{ contentTypeHeader: prometheusContentType, } s.SetSerializer(&prometheus.Serializer{}) return s }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var body bytes.Buffer ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { gz, err := gzip.NewReader(r.Body) if err != nil { w.WriteHeader(http.StatusInternalServerError) t.Error(err) return } var maxDecompressionSize int64 = 500 * 1024 * 1024 n, err := io.CopyN(&body, gz, maxDecompressionSize) if errors.Is(err, io.EOF) { err = nil } if err != nil { w.WriteHeader(http.StatusInternalServerError) t.Error(err) return } if n > maxDecompressionSize { w.WriteHeader(http.StatusInternalServerError) t.Errorf("Size of decoded data exceeds (%v) allowed size (%v)", n, maxDecompressionSize) return } w.WriteHeader(http.StatusOK) })) defer ts.Close() u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) plugin := tt.plugin() plugin.URL = u.String() require.NoError(t, plugin.Connect()) err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) if tt.expectedBody != nil { require.Equal(t, string(tt.expectedBody), body.String()) } }) } } func TestContentEncodingGzip(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) tests := []struct { name string plugin func() *SumoLogic }{ { name: "default content_encoding=gzip works", plugin: func() *SumoLogic { s := Default() s.URL = u.String() return s }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Content-Encoding") != "gzip" { w.WriteHeader(http.StatusInternalServerError) t.Errorf("Not equal, expected: %q, actual: %q", "gzip", r.Header.Get("Content-Encoding")) return } body, err := gzip.NewReader(r.Body) if err != nil { w.WriteHeader(http.StatusInternalServerError) t.Error(err) return } payload, err := io.ReadAll(body) if err != nil { w.WriteHeader(http.StatusInternalServerError) t.Error(err) return } if string(payload) != "metric=cpu field=value 42 0\n" { w.WriteHeader(http.StatusInternalServerError) t.Errorf("Not equal, expected: %q, actual: %q", "metric=cpu field=value 42 0\n", string(payload)) return } w.WriteHeader(http.StatusNoContent) }) serializer := &carbon2.Serializer{ Format: "field_separate", } require.NoError(t, serializer.Init()) plugin := tt.plugin() plugin.SetSerializer(serializer) err = plugin.Connect() require.NoError(t, err) err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) }) } } func TestDefaultUserAgent(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) t.Run("default-user-agent", func(t *testing.T) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Header.Get("User-Agent") != internal.ProductToken() { w.WriteHeader(http.StatusInternalServerError) t.Errorf("Not equal, expected: %q, actual: %q", internal.ProductToken(), r.Header.Get("User-Agent")) return } w.WriteHeader(http.StatusOK) }) plugin := &SumoLogic{ URL: u.String(), MaxRequestBodySize: Default().MaxRequestBodySize, } serializer := &carbon2.Serializer{ Format: "field_separate", } require.NoError(t, serializer.Init()) plugin.SetSerializer(serializer) err = plugin.Connect() require.NoError(t, err) err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) }) } func TestTOMLConfig(t *testing.T) { testcases := []struct { name string configBytes []byte expectedError bool }{ { name: "carbon2 content type is supported", configBytes: []byte(` [[outputs.sumologic]] url = "https://localhost:3000" data_format = "carbon2" `), expectedError: false, }, { name: "graphite content type is supported", configBytes: []byte(` [[outputs.sumologic]] url = "https://localhost:3000" data_format = "graphite" `), expectedError: false, }, { name: "prometheus content type is supported", configBytes: []byte(` [[outputs.sumologic]] url = "https://localhost:3000" data_format = "prometheus" `), expectedError: false, }, { name: "setting extra headers is not possible", configBytes: []byte(` [[outputs.sumologic]] url = "https://localhost:3000" data_format = "carbon2" [outputs.sumologic.headers] X-Sumo-Name = "dummy" X-Sumo-Host = "dummy" X-Sumo-Category = "dummy" X-Sumo-Dimensions = "dummy" `), expectedError: true, }, { name: "full example from sample config is correct", configBytes: []byte(` [[outputs.sumologic]] url = "https://localhost:3000" data_format = "carbon2" timeout = "5s" source_name = "name" source_host = "hosta" source_category = "category" dimensions = "dimensions" `), expectedError: false, }, { name: "unknown key - sumo_metadata - in config fails", configBytes: []byte(` [[outputs.sumologic]] url = "https://localhost:3000" data_format = "carbon2" timeout = "5s" source_name = "name" sumo_metadata = "metadata" `), expectedError: true, }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { c := config.NewConfig() if tt.expectedError { require.Error(t, c.LoadConfigData(tt.configBytes, config.EmptySourcePath)) } else { require.NoError(t, c.LoadConfigData(tt.configBytes, config.EmptySourcePath)) } }) } } func TestMaxRequestBodySize(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) testcases := []struct { name string plugin func() *SumoLogic metrics []telegraf.Metric expectedError bool expectedRequestCount int32 expectedMetricLinesCount int32 }{ { name: "default max request body size is 1MB and doesn't split small enough metric slices", plugin: func() *SumoLogic { s := Default() s.URL = u.String() return s }, metrics: []telegraf.Metric{getMetric()}, expectedError: false, expectedRequestCount: 1, expectedMetricLinesCount: 1, }, { name: "default max request body size is 1MB and doesn't split small even medium sized metrics", plugin: func() *SumoLogic { s := Default() s.URL = u.String() return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 1, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "when short by at least 1B the request is split", plugin: func() *SumoLogic { s := Default() s.URL = u.String() // getMetrics returns metrics that serialized (using carbon2), // uncompressed size is 43750B s.MaxRequestBodySize = 43_749 return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 2, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "max request body size properly splits requests - max 10_000", plugin: func() *SumoLogic { s := Default() s.URL = u.String() s.MaxRequestBodySize = 10_000 return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 5, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "max request body size properly splits requests - max 5_000", plugin: func() *SumoLogic { s := Default() s.URL = u.String() s.MaxRequestBodySize = 5_000 return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 10, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "max request body size properly splits requests - max 2_500", plugin: func() *SumoLogic { s := Default() s.URL = u.String() s.MaxRequestBodySize = 2_500 return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 20, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "max request body size properly splits requests - max 1_000", plugin: func() *SumoLogic { s := Default() s.URL = u.String() s.MaxRequestBodySize = 1_000 return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 50, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "max request body size properly splits requests - max 500", plugin: func() *SumoLogic { s := Default() s.URL = u.String() s.MaxRequestBodySize = 500 return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 100, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "max request body size properly splits requests - max 300", plugin: func() *SumoLogic { s := Default() s.URL = u.String() s.MaxRequestBodySize = 300 return s }, metrics: getMetrics(), expectedError: false, expectedRequestCount: 100, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { var ( requestCount int32 linesCount int32 ) ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { atomic.AddInt32(&requestCount, 1) if tt.expectedMetricLinesCount != 0 { atomic.AddInt32(&linesCount, int32(countLines(t, r.Body))) } w.WriteHeader(http.StatusOK) }) serializer := &carbon2.Serializer{ Format: "field_separate", } require.NoError(t, serializer.Init()) plugin := tt.plugin() plugin.SetSerializer(serializer) plugin.Log = testutil.Logger{} err = plugin.Connect() require.NoError(t, err) err = plugin.Write(tt.metrics) if tt.expectedError { require.Error(t, err) } else { require.NoError(t, err) require.Equal(t, tt.expectedRequestCount, atomic.LoadInt32(&requestCount)) require.Equal(t, tt.expectedMetricLinesCount, atomic.LoadInt32(&linesCount)) } }) } } func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) metrics := make([]telegraf.Metric, 0) plugin := Default() plugin.URL = u.String() serializer := &carbon2.Serializer{ Format: "field_separate", } require.NoError(t, serializer.Init()) plugin.SetSerializer(serializer) err = plugin.Connect() require.NoError(t, err) err = plugin.Write(metrics) require.NoError(t, err) } func countLines(t *testing.T, body io.Reader) int { // All requests coming from Sumo Logic output plugin are gzipped. gz, err := gzip.NewReader(body) require.NoError(t, err) var linesCount int for s := bufio.NewScanner(gz); s.Scan(); { linesCount++ } return linesCount }