1
0
Fork 0
telegraf/plugins/aggregators/quantile/quantile.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

149 lines
3.3 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package quantile
import (
_ "embed"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)
//go:embed sample.conf
var sampleConfig string
type Quantile struct {
Quantiles []float64 `toml:"quantiles"`
Compression float64 `toml:"compression"`
AlgorithmType string `toml:"algorithm"`
newAlgorithm newAlgorithmFunc
cache map[uint64]aggregate
suffixes []string
Log telegraf.Logger `toml:"-"`
}
type aggregate struct {
name string
fields map[string]algorithm
tags map[string]string
}
type newAlgorithmFunc func(compression float64) (algorithm, error)
func (*Quantile) SampleConfig() string {
return sampleConfig
}
func (q *Quantile) Add(in telegraf.Metric) {
id := in.HashID()
if cached, ok := q.cache[id]; ok {
fields := in.Fields()
for k, algo := range cached.fields {
if field, ok := fields[k]; ok {
if v, isconvertible := convert(field); isconvertible {
err := algo.Add(v)
if err != nil {
q.Log.Errorf("adding cached field %s: %v", k, err)
}
}
}
}
return
}
// New metric, setup cache and init algorithm
a := aggregate{
name: in.Name(),
tags: in.Tags(),
fields: make(map[string]algorithm),
}
for k, field := range in.Fields() {
if v, isconvertible := convert(field); isconvertible {
algo, err := q.newAlgorithm(q.Compression)
if err != nil {
q.Log.Errorf("generating algorithm %s: %v", k, err)
}
err = algo.Add(v)
if err != nil {
q.Log.Errorf("adding field %s: %v", k, err)
}
a.fields[k] = algo
}
}
q.cache[id] = a
}
func (q *Quantile) Push(acc telegraf.Accumulator) {
for _, aggregate := range q.cache {
fields := make(map[string]interface{}, len(aggregate.fields)*len(q.Quantiles))
for k, algo := range aggregate.fields {
for i, qtl := range q.Quantiles {
fields[k+q.suffixes[i]] = algo.Quantile(qtl)
}
}
acc.AddFields(aggregate.name, fields, aggregate.tags)
}
}
func (q *Quantile) Reset() {
q.cache = make(map[uint64]aggregate)
}
func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
case int64:
return float64(v), true
case uint64:
return float64(v), true
default:
return 0, false
}
}
func (q *Quantile) Init() error {
switch q.AlgorithmType {
case "t-digest", "":
q.newAlgorithm = newTDigest
case "exact R7":
q.newAlgorithm = newExactR7
case "exact R8":
q.newAlgorithm = newExactR8
default:
return fmt.Errorf("unknown algorithm type %q", q.AlgorithmType)
}
if _, err := q.newAlgorithm(q.Compression); err != nil {
return fmt.Errorf("cannot create %q algorithm: %w", q.AlgorithmType, err)
}
if len(q.Quantiles) == 0 {
q.Quantiles = []float64{0.25, 0.5, 0.75}
}
duplicates := make(map[float64]bool)
q.suffixes = make([]string, 0, len(q.Quantiles))
for _, qtl := range q.Quantiles {
if qtl < 0.0 || qtl > 1.0 {
return fmt.Errorf("quantile %v out of range", qtl)
}
if _, found := duplicates[qtl]; found {
return fmt.Errorf("duplicate quantile %v", qtl)
}
duplicates[qtl] = true
q.suffixes = append(q.suffixes, fmt.Sprintf("_%03d", int(qtl*100.0)))
}
q.Reset()
return nil
}
func init() {
aggregators.Add("quantile", func() telegraf.Aggregator {
return &Quantile{Compression: 100}
})
}