1
0
Fork 0
telegraf/plugins/outputs/cloudwatch/cloudwatch.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

425 lines
11 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package cloudwatch
import (
"context"
_ "embed"
"math"
"net/http"
"sort"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/influxdata/telegraf"
common_aws "github.com/influxdata/telegraf/plugins/common/aws"
common_http "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/outputs"
)
//go:embed sample.conf
var sampleConfig string
// CloudWatch is the telegraf output plugin that publishes metrics to Amazon
// CloudWatch via the PutMetricData API.
type CloudWatch struct {
	Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace
	// HighResolutionMetrics selects 1-second storage resolution for the
	// emitted datums instead of the default 60 seconds.
	HighResolutionMetrics bool `toml:"high_resolution_metrics"`
	// svc is the CloudWatch service client, created in Connect.
	svc *cloudwatch.Client
	// WriteStatistics enables folding *_min/*_max/*_sum/*_count fields
	// into CloudWatch StatisticSets (see BuildMetricDatum).
	WriteStatistics bool            `toml:"write_statistics"`
	Log             telegraf.Logger `toml:"-"`

	common_aws.CredentialConfig
	common_http.HTTPClientConfig

	// client is the shared HTTP client handed to the service client;
	// closed down in Close.
	client *http.Client
}
// statisticType identifies which CloudWatch statistic a field-name suffix
// (_min/_max/_sum/_count) maps to; statisticTypeNone marks a plain value.
type statisticType int

const (
	statisticTypeNone statisticType = iota
	statisticTypeMax
	statisticTypeMin
	statisticTypeSum
	statisticTypeCount
)
// cloudwatchField is the common behavior shared by plain value fields and
// aggregated statistic fields collected from a single telegraf metric.
type cloudwatchField interface {
	// addValue records a value for the given statistic type.
	addValue(sType statisticType, value float64)
	// buildDatum renders the field as one or more CloudWatch datums.
	buildDatum() []types.MetricDatum
}
// statisticField accumulates the min/max/sum/count statistic values that
// belong to one logical field of a metric.
type statisticField struct {
	metricName string
	fieldName  string
	tags       map[string]string
	// values holds the statistic values collected so far, keyed by type.
	values    map[statisticType]float64
	timestamp time.Time
	// storageResolution is 1 when high-resolution metrics are enabled,
	// otherwise 60 (set in BuildMetricDatum).
	storageResolution int64
}
// addValue records the given statistic value. Non-statistic values are
// ignored because a statisticField only aggregates statistics.
func (f *statisticField) addValue(sType statisticType, value float64) {
	if sType == statisticTypeNone {
		return
	}
	f.values[sType] = value
}
// buildDatum converts the collected statistic values into CloudWatch metric
// data. When min, max, sum and count are all present they are emitted as a
// single datum carrying a StatisticSet; otherwise each collected value is
// emitted as an independent datum whose name carries the statistic suffix.
func (f *statisticField) buildDatum() []types.MetricDatum {
	if f.hasAllFields() {
		// Complete set: one datum with aggregated statistic values.
		return []types.MetricDatum{{
			MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
			Dimensions: BuildDimensions(f.tags),
			Timestamp:  aws.Time(f.timestamp),
			StatisticValues: &types.StatisticSet{
				Minimum:     aws.Float64(f.values[statisticTypeMin]),
				Maximum:     aws.Float64(f.values[statisticTypeMax]),
				Sum:         aws.Float64(f.values[statisticTypeSum]),
				SampleCount: aws.Float64(f.values[statisticTypeCount]),
			},
			StorageResolution: aws.Int32(int32(f.storageResolution)),
		}}
	}

	// Incomplete set: publish every collected statistic independently.
	suffixes := map[statisticType]string{
		statisticTypeMin:   "min",
		statisticTypeMax:   "max",
		statisticTypeSum:   "sum",
		statisticTypeCount: "count",
	}
	datums := make([]types.MetricDatum, 0, len(f.values))
	for sType, value := range f.values {
		suffix, known := suffixes[sType]
		if !known {
			// Unknown statistic type; should not be reachable.
			continue
		}
		datums = append(datums, types.MetricDatum{
			MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName, suffix}, "_")),
			Value:      aws.Float64(value),
			Dimensions: BuildDimensions(f.tags),
			Timestamp:  aws.Time(f.timestamp),
		})
	}
	return datums
}
// hasAllFields reports whether every statistic a CloudWatch StatisticSet
// requires (min, max, sum and count) has been collected.
func (f *statisticField) hasAllFields() bool {
	for _, required := range []statisticType{statisticTypeMin, statisticTypeMax, statisticTypeSum, statisticTypeCount} {
		if _, ok := f.values[required]; !ok {
			return false
		}
	}
	return true
}
// valueField represents a single plain (non-statistic) measurement of a
// metric field.
type valueField struct {
	metricName string
	fieldName  string
	tags       map[string]string
	value      float64
	timestamp  time.Time
	// storageResolution is 1 when high-resolution metrics are enabled,
	// otherwise 60 (set in BuildMetricDatum).
	storageResolution int64
}
// addValue stores a plain value. Statistic-typed values are ignored since a
// valueField holds exactly one measurement.
func (f *valueField) addValue(sType statisticType, value float64) {
	if sType != statisticTypeNone {
		return
	}
	f.value = value
}
// buildDatum renders this value field as one CloudWatch datum named
// "<metric>_<field>".
func (f *valueField) buildDatum() []types.MetricDatum {
	datum := types.MetricDatum{
		MetricName:        aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
		Value:             aws.Float64(f.value),
		Dimensions:        BuildDimensions(f.tags),
		Timestamp:         aws.Time(f.timestamp),
		StorageResolution: aws.Int32(int32(f.storageResolution)),
	}
	return []types.MetricDatum{datum}
}
// SampleConfig returns the embedded sample configuration for this plugin.
func (*CloudWatch) SampleConfig() string {
	return sampleConfig
}
// Connect resolves the AWS credentials, builds the shared HTTP client and
// creates the CloudWatch service client used for all subsequent writes.
func (c *CloudWatch) Connect() error {
	cfg, err := c.CredentialConfig.Credentials()
	if err != nil {
		return err
	}

	httpClient, err := c.HTTPClientConfig.CreateClient(context.Background(), c.Log)
	if err != nil {
		return err
	}
	c.client = httpClient

	c.svc = cloudwatch.NewFromConfig(cfg, func(options *cloudwatch.Options) {
		options.HTTPClient = httpClient
	})
	return nil
}
// Close releases any idle connections held by the shared HTTP client.
func (c *CloudWatch) Close() error {
	if c.client == nil {
		return nil
	}
	c.client.CloseIdleConnections()
	return nil
}
// Write converts the metrics into CloudWatch datums and publishes them in
// batches, stopping at the first failed batch.
func (c *CloudWatch) Write(metrics []telegraf.Metric) error {
	// PutMetricData only supports up to 1000 data metrics per call
	// https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
	const maxDatumsPerCall = 1000

	var datums []types.MetricDatum
	for _, m := range metrics {
		datums = append(datums, BuildMetricDatum(c.WriteStatistics, c.HighResolutionMetrics, m)...)
	}

	for _, batch := range PartitionDatums(maxDatumsPerCall, datums) {
		if err := c.WriteToCloudWatch(batch); err != nil {
			return err
		}
	}
	return nil
}
// WriteToCloudWatch sends one batch of datums to the configured namespace in
// a single PutMetricData call. Failures are logged and returned.
func (c *CloudWatch) WriteToCloudWatch(datums []types.MetricDatum) error {
	input := &cloudwatch.PutMetricDataInput{
		MetricData: datums,
		Namespace:  aws.String(c.Namespace),
	}
	if _, err := c.svc.PutMetricData(context.Background(), input); err != nil {
		c.Log.Errorf("Unable to write to CloudWatch : %+v", err.Error())
		return err
	}
	return nil
}
// PartitionDatums partitions the MetricDatums into smaller slices of a max size so that are under the limit
// for the AWS API calls.
func PartitionDatums(size int, datums []types.MetricDatum) [][]types.MetricDatum {
	// Round the partition count up so a final short batch is included.
	count := len(datums) / size
	if len(datums)%size != 0 {
		count++
	}

	partitions := make([][]types.MetricDatum, 0, count)
	for start := 0; start < len(datums); start += size {
		end := start + size
		if end > len(datums) {
			end = len(datums)
		}
		partitions = append(partitions, datums[start:end])
	}
	return partitions
}
// BuildMetricDatum makes a MetricDatum from telegraf.Metric. It would check if all required fields of
// cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values.
// Otherwise, fields would still been built independently.
func BuildMetricDatum(buildStatistic, highResolutionMetrics bool, point telegraf.Metric) []types.MetricDatum {
	// Standard metrics use 60s storage resolution; high-resolution uses 1s.
	resolution := int64(60)
	if highResolutionMetrics {
		resolution = 1
	}

	tags := point.Tags()
	fields := make(map[string]cloudwatchField)

	for name, raw := range point.Fields() {
		value, ok := convert(raw)
		if !ok {
			// Only fields whose values convert to float64 (and fall within
			// the CloudWatch boundary) are supported; skip the rest.
			continue
		}

		sType, fieldName := getStatisticType(name)

		if !buildStatistic || sType == statisticTypeNone {
			// Statistics disabled, or a plain field: keep it as a value field.
			fields[name] = &valueField{
				metricName:        point.Name(),
				fieldName:         name,
				tags:              tags,
				timestamp:         point.Time(),
				value:             value,
				storageResolution: resolution,
			}
			continue
		}

		if existing, cached := fields[fieldName]; cached {
			// Merge this statistic into the already-created field.
			existing.addValue(sType, value)
			continue
		}
		// First statistic seen for this field name.
		fields[fieldName] = &statisticField{
			metricName:        point.Name(),
			fieldName:         fieldName,
			tags:              tags,
			timestamp:         point.Time(),
			values:            map[statisticType]float64{sType: value},
			storageResolution: resolution,
		}
	}

	datums := make([]types.MetricDatum, 0, len(fields))
	for _, f := range fields {
		datums = append(datums, f.buildDatum()...)
	}
	return datums
}
// BuildDimensions makes a list of Dimensions by using a Point's tags. CloudWatch supports up to
// 10 dimensions per metric, so we only keep up to the first 10 alphabetically.
// This always includes the "host" tag if it exists and is non-empty.
// Tags with empty values are skipped entirely, since the CloudWatch API
// rejects dimensions whose value is empty.
func BuildDimensions(mTags map[string]string) []types.Dimension {
	const maxDimensions = 10
	dimensions := make([]types.Dimension, 0, maxDimensions)

	// This is pretty ugly, but we always want to include the "host" tag if it
	// exists. Like every other tag, it is skipped when its value is empty,
	// as CloudWatch would reject the whole datum otherwise.
	if host, ok := mTags["host"]; ok && host != "" {
		dimensions = append(dimensions, types.Dimension{
			Name:  aws.String("host"),
			Value: aws.String(host),
		})
	}

	// Collect and sort the remaining tag keys for deterministic selection.
	keys := make([]string, 0, len(mTags))
	for k := range mTags {
		if k != "host" {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)

	for _, k := range keys {
		if len(dimensions) >= maxDimensions {
			break
		}
		value := mTags[k]
		if value == "" {
			// Empty dimension values are invalid in CloudWatch; skip.
			continue
		}
		dimensions = append(dimensions, types.Dimension{
			Name:  aws.String(k),
			Value: aws.String(value),
		})
	}
	return dimensions
}
// getStatisticType inspects a field name for one of the statistic suffixes
// ("_max", "_min", "_sum", "_count") and returns the matching statistic type
// together with the base field name (suffix stripped). Names without a
// statistic suffix are reported as statisticTypeNone with the name unchanged.
func getStatisticType(name string) (sType statisticType, fieldName string) {
	candidates := []struct {
		suffix string
		sType  statisticType
	}{
		{"_max", statisticTypeMax},
		{"_min", statisticTypeMin},
		{"_sum", statisticTypeSum},
		{"_count", statisticTypeCount},
	}
	for _, c := range candidates {
		if strings.HasSuffix(name, c.suffix) {
			return c.sType, strings.TrimSuffix(name, c.suffix)
		}
	}
	return statisticTypeNone, name
}
func convert(v interface{}) (value float64, ok bool) {
ok = true
switch t := v.(type) {
case int:
value = float64(t)
case int32:
value = float64(t)
case int64:
value = float64(t)
case uint64:
value = float64(t)
case float64:
value = t
case bool:
if t {
value = 1
} else {
value = 0
}
case time.Time:
value = float64(t.Unix())
default:
// Skip unsupported type.
ok = false
return value, ok
}
// Do CloudWatch boundary checking
// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
switch {
case math.IsNaN(value):
return 0, false
case math.IsInf(value, 0):
return 0, false
case value > 0 && value < float64(8.515920e-109):
return 0, false
case value > float64(1.174271e+108):
return 0, false
}
return value, ok
}
// init registers this plugin with telegraf's output registry under the
// name "cloudwatch".
func init() {
	outputs.Add("cloudwatch", func() telegraf.Output {
		return &CloudWatch{}
	})
}