1
0
Fork 0
telegraf/plugins/inputs/cloudwatch_metric_streams/cloudwatch_metric_streams_test.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

382 lines
12 KiB
Go

package cloudwatch_metric_streams
import (
"bytes"
"crypto/tls"
"crypto/x509"
"net/http"
"net/url"
"os"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
)
// Shared fixtures used across the tests in this file.
const (
	// badMsg is a request body the tests expect the listener to reject as malformed.
	badMsg = "blahblahblah: 42\n"
	// emptyMsg is a zero-length request body.
	emptyMsg = ""
	// accessKey is the key configured on the plugin by the auth-enabled tests.
	accessKey = "super-secure-password!"
	// badAccessKey is a key that deliberately differs from accessKey.
	badAccessKey = "super-insecure-password!"
	// maxBodySize is the default body-size limit for test plugins (524288000).
	maxBodySize = 500 * 1024 * 1024
)
// pki gives the tests access to the TLS material stored under testutil/pki,
// addressed relative to this package's directory.
var pki = testutil.NewPKI("../../../testutil/pki")
// newTestCloudWatchMetricStreams returns the baseline plugin used by the
// tests: ServiceAddress localhost:8080, a single /write path, the default
// test body-size limit and a fresh close channel.
func newTestCloudWatchMetricStreams() *CloudWatchMetricStreams {
	return &CloudWatchMetricStreams{
		Log:            testutil.Logger{},
		ServiceAddress: "localhost:8080",
		Paths:          []string{"/write"},
		MaxBodySize:    config.Size(maxBodySize),
		close:          make(chan struct{}),
	}
}
// newTestMetricStreamAuth returns the baseline test plugin with AccessKey set
// to the shared accessKey fixture.
func newTestMetricStreamAuth() *CloudWatchMetricStreams {
	plugin := newTestCloudWatchMetricStreams()
	plugin.AccessKey = accessKey

	return plugin
}
// newTestMetricStreamHTTPS returns the baseline test plugin with its
// ServerConfig taken from the test PKI's TLS server configuration.
func newTestMetricStreamHTTPS() *CloudWatchMetricStreams {
	plugin := newTestCloudWatchMetricStreams()
	plugin.ServerConfig = *pki.TLSServerConfig()

	return plugin
}
// newTestCompatibleCloudWatchMetricStreams returns the baseline test plugin
// with the APICompatability option switched on.
func newTestCompatibleCloudWatchMetricStreams() *CloudWatchMetricStreams {
	plugin := newTestCloudWatchMetricStreams()
	plugin.APICompatability = true

	return plugin
}
// getHTTPSClient returns an HTTP client whose transport uses the TLS client
// configuration from the test PKI. It panics if that configuration cannot be
// turned into a *tls.Config.
func getHTTPSClient() *http.Client {
	clientTLS, err := pki.TLSClientConfig().TLSConfig()
	if err != nil {
		panic(err)
	}

	transport := &http.Transport{TLSClientConfig: clientTLS}
	return &http.Client{Transport: transport}
}
// createURL assembles a URL string for the test listener at localhost:8080
// from the given scheme and path, without any query string.
func createURL(scheme, path string) string {
	target := &url.URL{}
	target.Scheme = scheme
	target.Host = "localhost:8080"
	target.Path = path

	return target.String()
}
// readJSON returns the raw contents of the fixture file at jsonFilePath.
// The calling test is failed immediately if the file cannot be read.
func readJSON(t *testing.T, jsonFilePath string) []byte {
	// Mark this function as a helper so a read failure is reported at the
	// line of the test that asked for the fixture rather than in here.
	t.Helper()

	data, err := os.ReadFile(jsonFilePath)
	require.NoErrorf(t, err, "could not read from data file %s", jsonFilePath)
	return data
}
// TestInvalidListenerConfig checks that Start reports an error when the
// service address has no port, and that Stop can still be called afterwards.
func TestInvalidListenerConfig(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()
	plugin.ServiceAddress = "address_without_port"

	var acc testutil.Accumulator
	err := plugin.Start(&acc)
	require.Error(t, err)

	// Whenever a ServiceInput fails to start, Stop is still invoked, so it has
	// to succeed no matter what state the plugin is in.
	plugin.Stop()
}
// TestWriteHTTPSNoClientAuth posts a single record over HTTPS using a client
// that presents no client certificate and expects the write to be accepted.
func TestWriteHTTPSNoClientAuth(t *testing.T) {
	metricStream := newTestMetricStreamHTTPS()
	// Clear the allowed client CAs; presumably this stops the server from
	// requiring a client certificate, which is what this test relies on.
	metricStream.TLSAllowedCACerts = nil

	acc := &testutil.Accumulator{}
	require.NoError(t, metricStream.Init())
	require.NoError(t, metricStream.Start(acc))
	defer metricStream.Stop()

	// Trust only the test server certificate. AppendCertsFromPEM reports
	// whether anything was parsed; check it so an unreadable certificate fails
	// here instead of surfacing later as a TLS handshake error.
	cas := x509.NewCertPool()
	require.True(t, cas.AppendCertsFromPEM([]byte(pki.ReadServerCert())), "could not add server certificate to the root CA pool")

	noClientAuthClient := &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				RootCAs: cas,
			},
		},
	}

	// post single message to the metric stream listener
	record := readJSON(t, "testdata/record.json")
	resp, err := noClientAuthClient.Post(createURL("https", "/write"), "", bytes.NewBuffer(record))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusOK, resp.StatusCode)
}
// TestWriteHTTPSWithClientAuth posts a single record over HTTPS using the
// client built from the test PKI and expects a 200 response.
func TestWriteHTTPSWithClientAuth(t *testing.T) {
	plugin := newTestMetricStreamHTTPS()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send one record to the listener through the TLS-configured client.
	payload := readJSON(t, "testdata/record.json")
	endpoint := createURL("https", "/write")
	resp, err := getHTTPSClient().Post(endpoint, "", bytes.NewBuffer(payload))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusOK, resp.StatusCode)
}
// TestWriteHTTPSuccessfulAuth sends a record with the configured access key
// in the X-Amz-Firehose-Access-Key header and expects a 200 response.
func TestWriteHTTPSuccessfulAuth(t *testing.T) {
	plugin := newTestMetricStreamAuth()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Build the request carrying the matching access key.
	payload := readJSON(t, "testdata/record.json")
	req, err := http.NewRequest("POST", createURL("http", "/write"), bytes.NewBuffer(payload))
	require.NoError(t, err)
	req.Header.Set("X-Amz-Firehose-Access-Key", accessKey)

	// Send the single record to the listener.
	var client http.Client
	resp, err := client.Do(req)
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusOK, resp.StatusCode)
}
// TestWriteHTTPFailedAuth sends a record with an access key that differs from
// the configured one and expects a 401 response.
func TestWriteHTTPFailedAuth(t *testing.T) {
	plugin := newTestMetricStreamAuth()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Build the request carrying the wrong access key.
	payload := readJSON(t, "testdata/record.json")
	req, err := http.NewRequest("POST", createURL("http", "/write"), bytes.NewBuffer(payload))
	require.NoError(t, err)
	req.Header.Set("X-Amz-Firehose-Access-Key", badAccessKey)

	// Send the single record to the listener.
	var client http.Client
	resp, err := client.Do(req)
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusUnauthorized, resp.StatusCode)
}
// TestWriteHTTP posts a single record to /write over plain HTTP and expects a
// 200 response.
func TestWriteHTTP(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send one record to the listener.
	payload := readJSON(t, "testdata/record.json")
	endpoint := createURL("http", "/write")
	resp, err := http.Post(endpoint, "", bytes.NewBuffer(payload))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusOK, resp.StatusCode)
}
// TestWriteHTTPMultipleRecords posts the multi-record fixture to /write and
// expects a 200 response.
func TestWriteHTTPMultipleRecords(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send several records to the listener in one request.
	payload := readJSON(t, "testdata/records.json")
	endpoint := createURL("http", "/write")
	resp, err := http.Post(endpoint, "", bytes.NewBuffer(payload))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusOK, resp.StatusCode)
}
// TestWriteHTTPExactMaxBodySize sets the body-size limit to exactly the length
// of the record fixture and expects the write to be accepted with 200.
func TestWriteHTTPExactMaxBodySize(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	// The limit equals the payload length, so the request sits right on the boundary.
	payload := readJSON(t, "testdata/record.json")
	plugin.MaxBodySize = config.Size(len(payload))

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send the record to the listener.
	endpoint := createURL("http", "/write")
	resp, err := http.Post(endpoint, "", bytes.NewBuffer(payload))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusOK, resp.StatusCode)
}
// TestWriteHTTPVerySmallMaxBody lowers the body-size limit to 512 and expects
// posting the record fixture to be refused with a 413 response.
func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()
	plugin.MaxBodySize = config.Size(512)

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send one record to the listener; the test expects it to be over the limit.
	payload := readJSON(t, "testdata/record.json")
	endpoint := createURL("http", "/write")
	resp, err := http.Post(endpoint, "", bytes.NewBuffer(payload))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusRequestEntityTooLarge, resp.StatusCode)
}
// TestReceive404ForInvalidEndpoint posts a record to /foobar, a path that is
// not in the plugin's configured Paths, and expects a 404 response.
func TestReceive404ForInvalidEndpoint(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send one record to a path the listener was not configured with.
	payload := readJSON(t, "testdata/record.json")
	endpoint := createURL("http", "/foobar")
	resp, err := http.Post(endpoint, "", bytes.NewBuffer(payload))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusNotFound, resp.StatusCode)
}
// TestWriteHTTPInvalid posts the malformed badMsg body to /write and expects a
// 400 response.
func TestWriteHTTPInvalid(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send a badly formatted message to the listener.
	endpoint := createURL("http", "/write")
	resp, err := http.Post(endpoint, "", bytes.NewBufferString(badMsg))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusBadRequest, resp.StatusCode)
}
// TestWriteHTTPEmpty posts an empty body to /write and expects a 400 response.
func TestWriteHTTPEmpty(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Send an empty message to the listener.
	endpoint := createURL("http", "/write")
	resp, err := http.Post(endpoint, "", bytes.NewBufferString(emptyMsg))
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusBadRequest, resp.StatusCode)
}
// TestComposeMetrics feeds one data record straight into composeMetrics and
// checks that the accumulator receives the aws_ec2_cpuutilization measurement
// with the max/min/sum/count fields and the dimension, account and region tags.
func TestComposeMetrics(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Build the record to be converted into a metric.
	sample := data{
		MetricStreamName: "cloudwatch-metric-stream",
		AccountID:        "546734499701",
		Region:           "us-west-2",
		Namespace:        "AWS/EC2",
		MetricName:       "CPUUtilization",
		Dimensions:       map[string]string{"AutoScalingGroupName": "test-autoscaling-group"},
		Timestamp:        1651679400000,
		Value: map[string]float64{
			"max":   0.4366666666666666,
			"min":   0.3683333333333333,
			"sum":   1.9399999999999997,
			"count": 5.0,
		},
		Unit: "Percent",
	}

	// Convert the record and wait for the resulting metric to arrive.
	plugin.composeMetrics(sample)
	acc.Wait(1)

	wantFields := map[string]interface{}{
		"max":   0.4366666666666666,
		"min":   0.3683333333333333,
		"sum":   1.9399999999999997,
		"count": 5.0,
	}
	wantTags := map[string]string{
		"AutoScalingGroupName": "test-autoscaling-group",
		"accountId":            "546734499701",
		"region":               "us-west-2",
	}
	acc.AssertContainsTaggedFields(t, "aws_ec2_cpuutilization", wantFields, wantTags)
}
// TestComposeAPICompatibleMetrics feeds one data record into composeMetrics on
// a plugin with APICompatability enabled and checks that the fields arrive
// under the names maximum, minimum, sum and samplecount.
func TestComposeAPICompatibleMetrics(t *testing.T) {
	plugin := newTestCompatibleCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Build the record to be converted into a metric.
	sample := data{
		MetricStreamName: "cloudwatch-metric-stream",
		AccountID:        "546734499701",
		Region:           "us-west-2",
		Namespace:        "AWS/EC2",
		MetricName:       "CPUUtilization",
		Dimensions:       map[string]string{"AutoScalingGroupName": "test-autoscaling-group"},
		Timestamp:        1651679400000,
		Value: map[string]float64{
			"max":   0.4366666666666666,
			"min":   0.3683333333333333,
			"sum":   1.9399999999999997,
			"count": 5.0,
		},
		Unit: "Percent",
	}

	// Convert the record and wait for the resulting metric to arrive.
	plugin.composeMetrics(sample)
	acc.Wait(1)

	wantFields := map[string]interface{}{
		"maximum":     0.4366666666666666,
		"minimum":     0.3683333333333333,
		"sum":         1.9399999999999997,
		"samplecount": 5.0,
	}
	wantTags := map[string]string{
		"AutoScalingGroupName": "test-autoscaling-group",
		"accountId":            "546734499701",
		"region":               "us-west-2",
	}
	acc.AssertContainsTaggedFields(t, "aws_ec2_cpuutilization", wantFields, wantTags)
}
// TestWriteHTTPGzippedData posts the gzip fixture to /write with a
// "Content-Encoding: gzip" header and expects a 200 response.
func TestWriteHTTPGzippedData(t *testing.T) {
	plugin := newTestCloudWatchMetricStreams()

	var acc testutil.Accumulator
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Load the pre-compressed payload from disk.
	compressed, err := os.ReadFile("./testdata/records.gz")
	require.NoError(t, err)

	// Declare the encoding so the receiver knows the body is gzip data.
	req, err := http.NewRequest("POST", createURL("http", "/write"), bytes.NewBuffer(compressed))
	require.NoError(t, err)
	req.Header.Set("Content-Encoding", "gzip")

	var client http.Client
	resp, err := client.Do(req)
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, http.StatusOK, resp.StatusCode)
}