//go:generate ../../../tools/readme_config_includer/generator package cloudwatch import ( "context" _ "embed" "math" "net/http" "sort" "strings" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/influxdata/telegraf" common_aws "github.com/influxdata/telegraf/plugins/common/aws" common_http "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/outputs" ) //go:embed sample.conf var sampleConfig string type CloudWatch struct { Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace HighResolutionMetrics bool `toml:"high_resolution_metrics"` svc *cloudwatch.Client WriteStatistics bool `toml:"write_statistics"` Log telegraf.Logger `toml:"-"` common_aws.CredentialConfig common_http.HTTPClientConfig client *http.Client } type statisticType int const ( statisticTypeNone statisticType = iota statisticTypeMax statisticTypeMin statisticTypeSum statisticTypeCount ) type cloudwatchField interface { addValue(sType statisticType, value float64) buildDatum() []types.MetricDatum } type statisticField struct { metricName string fieldName string tags map[string]string values map[statisticType]float64 timestamp time.Time storageResolution int64 } func (f *statisticField) addValue(sType statisticType, value float64) { if sType != statisticTypeNone { f.values[sType] = value } } func (f *statisticField) buildDatum() []types.MetricDatum { var datums []types.MetricDatum if f.hasAllFields() { // If we have all required fields, we build datum with StatisticValues vmin := f.values[statisticTypeMin] vmax := f.values[statisticTypeMax] vsum := f.values[statisticTypeSum] vcount := f.values[statisticTypeCount] datum := types.MetricDatum{ MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")), Dimensions: BuildDimensions(f.tags), Timestamp: aws.Time(f.timestamp), StatisticValues: &types.StatisticSet{ Minimum: aws.Float64(vmin), Maximum: aws.Float64(vmax), Sum: aws.Float64(vsum), SampleCount: aws.Float64(vcount), }, StorageResolution: aws.Int32(int32(f.storageResolution)), } datums = append(datums, datum) } else { // If we don't have all required fields, we build each field as independent datum for sType, value := range f.values { datum := types.MetricDatum{ Value: aws.Float64(value), Dimensions: BuildDimensions(f.tags), Timestamp: aws.Time(f.timestamp), } switch sType { case statisticTypeMin: datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "min"}, "_")) case statisticTypeMax: datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "max"}, "_")) case statisticTypeSum: datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "sum"}, "_")) case statisticTypeCount: datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "count"}, "_")) default: // should not be here continue } datums = append(datums, datum) } } return datums } func (f *statisticField) hasAllFields() bool { _, hasMin := f.values[statisticTypeMin] _, hasMax := f.values[statisticTypeMax] _, hasSum := f.values[statisticTypeSum] _, hasCount := f.values[statisticTypeCount] return hasMin && hasMax && hasSum && hasCount } type valueField struct { metricName string fieldName string tags map[string]string value float64 timestamp time.Time storageResolution int64 } func (f *valueField) addValue(sType statisticType, value float64) { if sType == statisticTypeNone { f.value = value } } func (f *valueField) buildDatum() []types.MetricDatum { return []types.MetricDatum{ { MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")), Value: aws.Float64(f.value), Dimensions: BuildDimensions(f.tags), Timestamp: aws.Time(f.timestamp), StorageResolution: aws.Int32(int32(f.storageResolution)), }, } } func (*CloudWatch) SampleConfig() string { return sampleConfig } func (c *CloudWatch) Connect() error { cfg, err := c.CredentialConfig.Credentials() if err != nil { return err } ctx := context.Background() client, err := c.HTTPClientConfig.CreateClient(ctx, c.Log) if err != nil { return err } c.client = client c.svc = cloudwatch.NewFromConfig(cfg, func(options *cloudwatch.Options) { options.HTTPClient = c.client }) return nil } func (c *CloudWatch) Close() error { if c.client != nil { c.client.CloseIdleConnections() } return nil } func (c *CloudWatch) Write(metrics []telegraf.Metric) error { var datums []types.MetricDatum for _, m := range metrics { d := BuildMetricDatum(c.WriteStatistics, c.HighResolutionMetrics, m) datums = append(datums, d...) } // PutMetricData only supports up to 1000 data metrics per call // https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html const maxDatumsPerCall = 1000 for _, partition := range PartitionDatums(maxDatumsPerCall, datums) { err := c.WriteToCloudWatch(partition) if err != nil { return err } } return nil } func (c *CloudWatch) WriteToCloudWatch(datums []types.MetricDatum) error { params := &cloudwatch.PutMetricDataInput{ MetricData: datums, Namespace: aws.String(c.Namespace), } _, err := c.svc.PutMetricData(context.Background(), params) if err != nil { c.Log.Errorf("Unable to write to CloudWatch : %+v", err.Error()) } return err } // PartitionDatums partitions the MetricDatums into smaller slices of a max size so that are under the limit // for the AWS API calls. func PartitionDatums(size int, datums []types.MetricDatum) [][]types.MetricDatum { numberOfPartitions := len(datums) / size if len(datums)%size != 0 { numberOfPartitions++ } partitions := make([][]types.MetricDatum, numberOfPartitions) for i := 0; i < numberOfPartitions; i++ { start := size * i end := size * (i + 1) if end > len(datums) { end = len(datums) } partitions[i] = datums[start:end] } return partitions } // BuildMetricDatum makes a MetricDatum from telegraf.Metric. It would check if all required fields of // cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values. // Otherwise, fields would still been built independently. func BuildMetricDatum(buildStatistic, highResolutionMetrics bool, point telegraf.Metric) []types.MetricDatum { fields := make(map[string]cloudwatchField) tags := point.Tags() storageResolution := int64(60) if highResolutionMetrics { storageResolution = 1 } for k, v := range point.Fields() { val, ok := convert(v) if !ok { // Only fields with values that can be converted to float64 (and within CloudWatch boundary) are supported. // Non-supported fields are skipped. continue } sType, fieldName := getStatisticType(k) // If statistic metric is not enabled or non-statistic type, just take current field as a value field. if !buildStatistic || sType == statisticTypeNone { fields[k] = &valueField{ metricName: point.Name(), fieldName: k, tags: tags, timestamp: point.Time(), value: val, storageResolution: storageResolution, } continue } // Otherwise, it shall be a statistic field. if _, ok := fields[fieldName]; !ok { // Hit an uncached field, create statisticField for first time fields[fieldName] = &statisticField{ metricName: point.Name(), fieldName: fieldName, tags: tags, timestamp: point.Time(), values: map[statisticType]float64{ sType: val, }, storageResolution: storageResolution, } } else { // Add new statistic value to this field fields[fieldName].addValue(sType, val) } } var datums []types.MetricDatum for _, f := range fields { d := f.buildDatum() datums = append(datums, d...) } return datums } // BuildDimensions makes a list of Dimensions by using a Point's tags. CloudWatch supports up to // 10 dimensions per metric, so we only keep up to the first 10 alphabetically. // This always includes the "host" tag if it exists. func BuildDimensions(mTags map[string]string) []types.Dimension { const maxDimensions = 10 dimensions := make([]types.Dimension, 0, maxDimensions) // This is pretty ugly, but we always want to include the "host" tag if it exists. if host, ok := mTags["host"]; ok { dimensions = append(dimensions, types.Dimension{ Name: aws.String("host"), Value: aws.String(host), }) } var keys []string for k := range mTags { if k != "host" { keys = append(keys, k) } } sort.Strings(keys) for _, k := range keys { if len(dimensions) >= maxDimensions { break } value := mTags[k] if value == "" { continue } dimensions = append(dimensions, types.Dimension{ Name: aws.String(k), Value: aws.String(mTags[k]), }) } return dimensions } func getStatisticType(name string) (sType statisticType, fieldName string) { switch { case strings.HasSuffix(name, "_max"): sType = statisticTypeMax fieldName = strings.TrimSuffix(name, "_max") case strings.HasSuffix(name, "_min"): sType = statisticTypeMin fieldName = strings.TrimSuffix(name, "_min") case strings.HasSuffix(name, "_sum"): sType = statisticTypeSum fieldName = strings.TrimSuffix(name, "_sum") case strings.HasSuffix(name, "_count"): sType = statisticTypeCount fieldName = strings.TrimSuffix(name, "_count") default: sType = statisticTypeNone fieldName = name } return sType, fieldName } func convert(v interface{}) (value float64, ok bool) { ok = true switch t := v.(type) { case int: value = float64(t) case int32: value = float64(t) case int64: value = float64(t) case uint64: value = float64(t) case float64: value = t case bool: if t { value = 1 } else { value = 0 } case time.Time: value = float64(t.Unix()) default: // Skip unsupported type. ok = false return value, ok } // Do CloudWatch boundary checking // Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html switch { case math.IsNaN(value): return 0, false case math.IsInf(value, 0): return 0, false case value > 0 && value < float64(8.515920e-109): return 0, false case value > float64(1.174271e+108): return 0, false } return value, ok } func init() { outputs.Add("cloudwatch", func() telegraf.Output { return &CloudWatch{} }) }