//go:generate ../../../tools/readme_config_includer/generator package quantile import ( _ "embed" "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/aggregators" ) //go:embed sample.conf var sampleConfig string type Quantile struct { Quantiles []float64 `toml:"quantiles"` Compression float64 `toml:"compression"` AlgorithmType string `toml:"algorithm"` newAlgorithm newAlgorithmFunc cache map[uint64]aggregate suffixes []string Log telegraf.Logger `toml:"-"` } type aggregate struct { name string fields map[string]algorithm tags map[string]string } type newAlgorithmFunc func(compression float64) (algorithm, error) func (*Quantile) SampleConfig() string { return sampleConfig } func (q *Quantile) Add(in telegraf.Metric) { id := in.HashID() if cached, ok := q.cache[id]; ok { fields := in.Fields() for k, algo := range cached.fields { if field, ok := fields[k]; ok { if v, isconvertible := convert(field); isconvertible { err := algo.Add(v) if err != nil { q.Log.Errorf("adding cached field %s: %v", k, err) } } } } return } // New metric, setup cache and init algorithm a := aggregate{ name: in.Name(), tags: in.Tags(), fields: make(map[string]algorithm), } for k, field := range in.Fields() { if v, isconvertible := convert(field); isconvertible { algo, err := q.newAlgorithm(q.Compression) if err != nil { q.Log.Errorf("generating algorithm %s: %v", k, err) } err = algo.Add(v) if err != nil { q.Log.Errorf("adding field %s: %v", k, err) } a.fields[k] = algo } } q.cache[id] = a } func (q *Quantile) Push(acc telegraf.Accumulator) { for _, aggregate := range q.cache { fields := make(map[string]interface{}, len(aggregate.fields)*len(q.Quantiles)) for k, algo := range aggregate.fields { for i, qtl := range q.Quantiles { fields[k+q.suffixes[i]] = algo.Quantile(qtl) } } acc.AddFields(aggregate.name, fields, aggregate.tags) } } func (q *Quantile) Reset() { q.cache = make(map[uint64]aggregate) } func convert(in interface{}) (float64, bool) { switch v := in.(type) { case float64: return v, true case int64: return float64(v), true case uint64: return float64(v), true default: return 0, false } } func (q *Quantile) Init() error { switch q.AlgorithmType { case "t-digest", "": q.newAlgorithm = newTDigest case "exact R7": q.newAlgorithm = newExactR7 case "exact R8": q.newAlgorithm = newExactR8 default: return fmt.Errorf("unknown algorithm type %q", q.AlgorithmType) } if _, err := q.newAlgorithm(q.Compression); err != nil { return fmt.Errorf("cannot create %q algorithm: %w", q.AlgorithmType, err) } if len(q.Quantiles) == 0 { q.Quantiles = []float64{0.25, 0.5, 0.75} } duplicates := make(map[float64]bool) q.suffixes = make([]string, 0, len(q.Quantiles)) for _, qtl := range q.Quantiles { if qtl < 0.0 || qtl > 1.0 { return fmt.Errorf("quantile %v out of range", qtl) } if _, found := duplicates[qtl]; found { return fmt.Errorf("duplicate quantile %v", qtl) } duplicates[qtl] = true q.suffixes = append(q.suffixes, fmt.Sprintf("_%03d", int(qtl*100.0))) } q.Reset() return nil } func init() { aggregators.Add("quantile", func() telegraf.Aggregator { return &Quantile{Compression: 100} }) }