//go:generate ../../../tools/readme_config_includer/generator
package azure_monitor

import (
"bytes"
"compress/gzip"
"context"
_ "embed"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"io"
"net/http"
"regexp"
"strings"
"time"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/selfstat"
)

//go:embed sample.conf
var sampleConfig string
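
// The instance metadata URL is the fixed link-local address of the Azure
// Instance Metadata Service (IMDS); the two templates build Azure resource
// IDs for regular virtual machines and for scale-set instances.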
const (
vmInstanceMetadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-12-01"
resourceIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s"
resourceIDScaleSetTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s"
maxRequestBodySize = 4000000
)
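
// invalidNameCharRE matches every character that is not allowed in an Azure
// Monitor metric name; such characters are replaced with "_".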
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
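
// dimension is a single Azure Monitor dimension (the equivalent of a
// Telegraf tag).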
type dimension struct {
name string
value string
}
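
// aggregate accumulates min/max/sum/count for one metric-field and
// dimension-set combination within a one-minute time bucket.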
type aggregate struct {
name string
min float64
max float64
sum float64
count int64
dimensions []dimension
updated bool
}
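
// AzureMonitor is the plugin instance, holding the configuration, the HTTP
// client and the in-memory aggregation cache.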
type AzureMonitor struct {
Timeout config.Duration `toml:"timeout"`
NamespacePrefix string `toml:"namespace_prefix"`
StringsAsDimensions bool `toml:"strings_as_dimensions"`
Region string `toml:"region"`
ResourceID string `toml:"resource_id"`
EndpointURL string `toml:"endpoint_url"`
TimestampLimitPast config.Duration `toml:"timestamp_limit_past"`
TimestampLimitFuture config.Duration `toml:"timestamp_limit_future"`
Log telegraf.Logger `toml:"-"`
url string
preparer autorest.Preparer
client *http.Client
cache map[time.Time]map[uint64]*aggregate
timeFunc func() time.Time
MetricOutsideWindow selfstat.Stat
}
func (*AzureMonitor) SampleConfig() string {
return sampleConfig
}
func (a *AzureMonitor) Init() error {
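// Pre-allocate the aggregation cache; 36 one-minute buckets comfortably
// cover the default 20-minute past window.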
a.cache = make(map[time.Time]map[uint64]*aggregate, 36)
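// Build an authorizer for the Azure Monitor resource from the environment
// (service principal credentials, client certificate, or managed identity,
// depending on which AZURE_* variables are set).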
authorizer, err := auth.NewAuthorizerFromEnvironmentWithResource("https://monitoring.azure.com/")
if err != nil {
return fmt.Errorf("creating authorizer failed: %w", err)
}
a.preparer = autorest.CreatePreparer(authorizer.WithAuthorization())
return nil
}
func (a *AzureMonitor) Connect() error {
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(a.Timeout),
}
// If the region or resource ID is missing, try to retrieve it from the Azure VM instance metadata service
if a.Region == "" || a.ResourceID == "" {
region, resourceID, err := vmInstanceMetadata(a.client)
if err != nil {
return fmt.Errorf("getting VM metadata failed: %w", err)
}
if a.Region == "" {
a.Region = region
}
if a.ResourceID == "" {
a.ResourceID = resourceID
}
}
if a.ResourceID == "" {
return errors.New("no resource ID configured or available via VM instance metadata")
}
if a.EndpointURL == "" {
if a.Region == "" {
return errors.New("no region configured or available via VM instance metadata")
}
a.url = fmt.Sprintf("https://%s.monitoring.azure.com%s/metrics", a.Region, a.ResourceID)
} else {
a.url = a.EndpointURL + a.ResourceID + "/metrics"
}
a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)
a.MetricOutsideWindow = selfstat.Register(
"azure_monitor",
"metric_outside_window",
map[string]string{
"region": a.Region,
"resource_id": a.ResourceID,
},
)
a.Reset()
return nil
}
// Close shuts down any active connections
func (a *AzureMonitor) Close() error {
a.client.CloseIdleConnections()
a.client = nil
return nil
}
// Add will append a metric to the output aggregate
func (a *AzureMonitor) Add(m telegraf.Metric) {
// Azure Monitor only accepts aggregates within a limited time window (by
// default 30 minutes into the past and 4 minutes into the future). Metrics
// older than the configured past limit are dropped here; buckets too far in
// the future are held back in Push until they fall inside the window.
tbucket := m.Time().Truncate(time.Minute)
if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) {
a.MetricOutsideWindow.Incr(1)
return
}
// Azure Monitor doesn't have a string value type, so convert string fields
// to dimensions (a.k.a. tags) if enabled.
if a.StringsAsDimensions {
for _, f := range m.FieldList() {
if v, ok := f.Value.(string); ok {
m.AddTag(f.Key, v)
}
}
}
for _, f := range m.FieldList() {
fv, err := internal.ToFloat64(f.Value)
if err != nil {
continue
}
// Azure Monitor does not support fields so the field name is appended
// to the metric name.
sanitizedKey := invalidNameCharRE.ReplaceAllString(f.Key, "_")
name := m.Name() + "-" + sanitizedKey
id := hashIDWithField(m.HashID(), f.Key)
// Create the time bucket if it doesn't exist yet
if _, ok := a.cache[tbucket]; !ok {
a.cache[tbucket] = make(map[uint64]*aggregate)
}
// Fetch existing aggregate
agg, ok := a.cache[tbucket][id]
if !ok {
dimensions := make([]dimension, 0, len(m.TagList()))
for _, tag := range m.TagList() {
dimensions = append(dimensions, dimension{
name: tag.Key,
value: tag.Value,
})
}
a.cache[tbucket][id] = &aggregate{
name: name,
dimensions: dimensions,
min: fv,
max: fv,
sum: fv,
count: 1,
updated: true,
}
continue
}
if fv < agg.min {
agg.min = fv
}
if fv > agg.max {
agg.max = fv
}
agg.sum += fv
agg.count++
agg.updated = true
}
}
// Push returns the aggregated metrics that are ready to be written
func (a *AzureMonitor) Push() []telegraf.Metric {
var metrics []telegraf.Metric
for tbucket, aggs := range a.cache {
// Do not send metrics early
if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) {
continue
}
for _, agg := range aggs {
// Only send aggregates that have had an update since the last push.
if !agg.updated {
continue
}
tags := make(map[string]string, len(agg.dimensions))
for _, tag := range agg.dimensions {
tags[tag.name] = tag.value
}
m := metric.New(agg.name,
tags,
map[string]interface{}{
"min": agg.min,
"max": agg.max,
"sum": agg.sum,
"count": agg.count,
},
tbucket,
)
metrics = append(metrics, m)
}
}
return metrics
}
// Reset prunes expired time buckets from the cache and marks the remaining
// aggregates as already pushed
func (a *AzureMonitor) Reset() {
for tbucket := range a.cache {
// Remove aggregates that fell out of the configured past time window
if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) {
delete(a.cache, tbucket)
continue
}
// Aggregates newer than the future limit (by default those of the most
// recent minute) have not been pushed yet and must not be marked clean.
if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) {
continue
}
for id := range a.cache[tbucket] {
a.cache[tbucket][id].updated = false
}
}
}
// Write writes metrics to the remote endpoint
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
now := a.timeFunc()
tsEarliest := now.Add(-time.Duration(a.TimestampLimitPast))
tsLatest := now.Add(time.Duration(a.TimestampLimitFuture))
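// Track per-metric outcomes so Telegraf can drop rejected metrics and
// retry everything that is neither accepted nor rejected.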
writeErr := &internal.PartialWriteError{
MetricsAccept: make([]int, 0, len(metrics)),
}
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
for i, m := range metrics {
// Skip metrics that are outside of the valid timespan
if m.Time().Before(tsEarliest) || m.Time().After(tsLatest) {
a.Log.Tracef("Metric outside acceptable time window: %v", m)
a.MetricOutsideWindow.Incr(1)
writeErr.Err = errors.New("metric(s) outside of acceptable time window")
writeErr.MetricsReject = append(writeErr.MetricsReject, i)
continue
}
amm, err := translate(m, a.NamespacePrefix)
if err != nil {
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
if writeErr.Err == nil {
writeErr.Err = errors.New("translating metric(s) failed")
}
writeErr.MetricsReject = append(writeErr.MetricsReject, i)
continue
}
id := hashIDWithTagKeysOnly(m)
if azm, ok := azmetrics[id]; !ok {
azmetrics[id] = amm
azmetrics[id].index = i
} else {
azmetrics[id].Data.BaseData.Series = append(
azm.Data.BaseData.Series,
amm.Data.BaseData.Series...,
)
azmetrics[id].index = i
}
}
if len(azmetrics) == 0 {
if writeErr.Err == nil {
return nil
}
return writeErr
}
var buffer bytes.Buffer
buffer.Grow(maxRequestBodySize)
batchIndices := make([]int, 0, len(azmetrics))
for _, m := range azmetrics {
// Azure Monitor accepts batches of points as newline-delimited JSON
// (NDJSON, see https://github.com/ndjson/ndjson-spec).
buf, err := json.Marshal(m)
if err != nil {
writeErr.MetricsReject = append(writeErr.MetricsReject, m.index)
writeErr.Err = err
continue
}
// Azure Monitor enforces a maximum request body size of 4MB. Send batches
// that would exceed this size via separate write requests.
if buffer.Len()+len(buf)+1 > maxRequestBodySize {
if retryable, err := a.send(buffer.Bytes()); err != nil {
writeErr.Err = err
if !retryable {
writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...)
}
return writeErr
}
writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...)
batchIndices = make([]int, 0, len(azmetrics))
buffer.Reset()
}
// Record the metric as part of the current batch only after any flush, so
// its index is not reported as accepted before its bytes are actually sent.
batchIndices = append(batchIndices, m.index)
if _, err := buffer.Write(buf); err != nil {
return fmt.Errorf("writing to buffer failed: %w", err)
}
if err := buffer.WriteByte('\n'); err != nil {
return fmt.Errorf("writing to buffer failed: %w", err)
}
}
if retryable, err := a.send(buffer.Bytes()); err != nil {
writeErr.Err = err
if !retryable {
writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...)
}
return writeErr
}
writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...)
if writeErr.Err == nil {
return nil
}
return writeErr
}
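
// send gzips the NDJSON body and posts it to the Azure Monitor endpoint.
// The boolean return value reports whether a failure is retryable.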
func (a *AzureMonitor) send(body []byte) (bool, error) {
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
if _, err := g.Write(body); err != nil {
return false, fmt.Errorf("zipping content failed: %w", err)
}
if err := g.Close(); err != nil {
return false, fmt.Errorf("closing gzip writer failed: %w", err)
}
req, err := http.NewRequest("POST", a.url, &buf)
if err != nil {
return false, fmt.Errorf("creating request failed: %w", err)
}
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Content-Type", "application/x-ndjson")
// Add the authorization header. WithAuthorization will automatically
// refresh the token if needed.
req, err = a.preparer.Prepare(req)
if err != nil {
return false, fmt.Errorf("unable to fetch authentication credentials: %w", err)
}
resp, err := a.client.Do(req)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
a.client.CloseIdleConnections()
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(a.Timeout),
}
}
return true, err
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
return false, nil
}
retryable := resp.StatusCode != 400
if respbody, err := io.ReadAll(resp.Body); err == nil {
return retryable, fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody))
}
return retryable, fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status)
}
// vmInstanceMetadata retrieves the region and resource ID of the current
// Azure VM from the instance metadata service
func vmInstanceMetadata(c *http.Client) (region, resourceID string, err error) {
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
if err != nil {
return "", "", fmt.Errorf("error creating request: %w", err)
}
req.Header.Set("Metadata", "true")
resp, err := c.Do(req)
if err != nil {
return "", "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", "", err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return "", "", fmt.Errorf("unable to fetch instance metadata: [%s] %d",
vmInstanceMetadataURL, resp.StatusCode)
}
var metadata virtualMachineMetadata
if err := json.Unmarshal(body, &metadata); err != nil {
return "", "", err
}
region = metadata.Compute.Location
resourceID = metadata.ResourceID()
return region, resourceID, nil
}
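
// hashIDWithField mixes the field key into the metric's series hash so that
// every metric-field combination gets its own aggregate.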
func hashIDWithField(id uint64, fk string) uint64 {
h := fnv.New64a()
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, id)
h.Write(b[:n])
h.Write([]byte("\n"))
h.Write([]byte(fk))
h.Write([]byte("\n"))
return h.Sum64()
}
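
// hashIDWithTagKeysOnly hashes the metric name, the tag keys (ignoring tag
// values) and the timestamp; metrics sharing all three are merged into one
// request body entry carrying multiple series.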
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
h := fnv.New64a()
h.Write([]byte(m.Name()))
h.Write([]byte("\n"))
for _, tag := range m.TagList() {
if tag.Key == "" || tag.Value == "" {
continue
}
h.Write([]byte(tag.Key))
h.Write([]byte("\n"))
}
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
h.Write(b[:n])
h.Write([]byte("\n"))
return h.Sum64()
}
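
// translate converts an aggregated Telegraf metric into the Azure Monitor
// custom metric format, splitting the combined "namespace-metric" name and
// keeping at most the first ten dimensions.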
func translate(m telegraf.Metric, prefix string) (*azureMonitorMetric, error) {
dimensionNames := make([]string, 0, len(m.TagList()))
dimensionValues := make([]string, 0, len(m.TagList()))
for _, tag := range m.TagList() {
// Azure custom metrics service supports up to 10 dimensions
if len(dimensionNames) >= 10 {
continue
}
if tag.Key == "" || tag.Value == "" {
continue
}
dimensionNames = append(dimensionNames, tag.Key)
dimensionValues = append(dimensionValues, tag.Value)
}
vmin, err := getFloatField(m, "min")
if err != nil {
return nil, err
}
vmax, err := getFloatField(m, "max")
if err != nil {
return nil, err
}
vsum, err := getFloatField(m, "sum")
if err != nil {
return nil, err
}
vcount, err := getIntField(m, "count")
if err != nil {
return nil, err
}
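// Add builds the name as "<measurement>-<field>"; split it back into the
// Azure namespace (measurement) and metric name (field).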
mn, ns := "Missing", "Missing"
names := strings.SplitN(m.Name(), "-", 2)
if len(names) > 1 {
mn = names[1]
}
if len(names) > 0 {
ns = names[0]
}
ns = prefix + ns
return &azureMonitorMetric{
Time: m.Time(),
Data: &azureMonitorData{
BaseData: &azureMonitorBaseData{
Metric: mn,
Namespace: ns,
DimensionNames: dimensionNames,
Series: []*azureMonitorSeries{
{
DimensionValues: dimensionValues,
Min: vmin,
Max: vmax,
Sum: vsum,
Count: vcount,
},
},
},
},
}, nil
}
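
// getFloatField returns the named field as float64, or an error if it is
// missing or of an unexpected type.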
func getFloatField(m telegraf.Metric, key string) (float64, error) {
fv, ok := m.GetField(key)
if !ok {
return 0, fmt.Errorf("missing field: %s", key)
}
if value, ok := fv.(float64); ok {
return value, nil
}
return 0, fmt.Errorf("unexpected type: %s: %T", key, fv)
}
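
// getIntField returns the named field as int64, or an error if it is
// missing or of an unexpected type.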
func getIntField(m telegraf.Metric, key string) (int64, error) {
fv, ok := m.GetField(key)
if !ok {
return 0, fmt.Errorf("missing field: %s", key)
}
if value, ok := fv.(int64); ok {
return value, nil
}
return 0, fmt.Errorf("unexpected type: %s: %T", key, fv)
}
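
// init registers the plugin with its defaults. The negative future limit of
// one minute holds back the most recent time bucket until it can no longer
// receive updates.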
func init() {
outputs.Add("azure_monitor", func() telegraf.Output {
return &AzureMonitor{
NamespacePrefix: "Telegraf/",
TimestampLimitPast: config.Duration(20 * time.Minute),
TimestampLimitFuture: config.Duration(-1 * time.Minute),
Timeout: config.Duration(5 * time.Second),
timeFunc: time.Now,
}
})
}