1
0
Fork 0
telegraf/plugins/inputs/intel_rdt/publisher.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

176 lines
4.6 KiB
Go

//go:build !windows
package intel_rdt
import (
"context"
"errors"
"strings"
"time"
"github.com/influxdata/telegraf"
)
// parsedCoresMeasurement is the result of parsing one cores measurement line
// (see parseCoresMeasurement).
type parsedCoresMeasurement struct {
// cores is the comma-joined core list with surrounding double quotes trimmed.
cores string
// values holds the parsed metric values, in the order they appear in the line.
values []float64
// time is the timestamp parsed from the line.
time time.Time
}
// parsedProcessMeasurement is the result of parsing one process measurement
// (see parseProcessesMeasurement).
type parsedProcessMeasurement struct {
// process is the name carried by the originating processMeasurement.
process string
// cores is the comma-joined list of the values that follow the PIDs in the
// line, with surrounding double quotes trimmed.
cores string
// values holds the parsed metric values, in the order they appear in the line.
values []float64
// time is the timestamp parsed from the line.
time time.Time
}
// publisher publishes new RDT metrics to the telegraf accumulator.
type publisher struct {
// acc receives every "rdt_metric" point built from a parsed measurement.
acc telegraf.Accumulator
// log is used by the publish loop to report errors received on errChan.
log telegraf.Logger
// shortenedMetrics, when true, makes the publisher skip the first two metric
// values of each measurement (index 0: IPC, index 1: LLC_Misses).
shortenedMetrics bool
// bufferChanProcess carries process measurements waiting to be published.
bufferChanProcess chan processMeasurement
// bufferChanCores carries raw cores measurement lines waiting to be published.
bufferChanCores chan string
// errChan carries errors that the publish loop logs.
errChan chan error
}
// newPublisher returns a publisher wired to the given accumulator and logger.
// The three internal channels are created unbuffered.
func newPublisher(acc telegraf.Accumulator, log telegraf.Logger, shortenedMetrics bool) publisher {
	var p publisher

	p.acc = acc
	p.log = log
	p.shortenedMetrics = shortenedMetrics

	p.bufferChanCores = make(chan string)
	p.bufferChanProcess = make(chan processMeasurement)
	p.errChan = make(chan error)

	return p
}
// publish starts a background goroutine that, until ctx is done, publishes
// every cores and process measurement received on the buffer channels and
// logs every error received on errChan. It returns immediately.
func (p *publisher) publish(ctx context.Context) {
	loop := func() {
		for {
			select {
			case <-ctx.Done():
				return
			case receivedErr := <-p.errChan:
				p.log.Error(receivedErr)
			case processData := <-p.bufferChanProcess:
				p.publishProcess(processData)
			case coresLine := <-p.bufferChanCores:
				p.publishCores(coresLine)
			}
		}
	}

	go loop()
}
// publishCores parses a raw cores measurement line and, when parsing succeeds,
// adds the resulting metrics to the accumulator.
//
// A parse error is logged directly and nothing is published. The error is not
// sent on errChan because, in this file, publishCores is called from the
// publish loop, which is also the visible receiver of the unbuffered errChan;
// sending from here would block that loop on itself.
func (p *publisher) publishCores(measurement string) {
	parsed, err := parseCoresMeasurement(measurement)
	if err != nil {
		p.log.Error(err)
		return
	}

	p.addToAccumulatorCores(parsed)
}
// publishProcess parses a process measurement and, when parsing succeeds,
// adds the resulting metrics to the accumulator.
//
// A parse error is logged directly and nothing is published. The error is not
// sent on errChan because, in this file, publishProcess is called from the
// publish loop, which is also the visible receiver of the unbuffered errChan;
// sending from here would block that loop on itself.
func (p *publisher) publishProcess(measurement processMeasurement) {
	parsed, err := parseProcessesMeasurement(measurement)
	if err != nil {
		p.log.Error(err)
		return
	}

	p.addToAccumulatorProcesses(parsed)
}
// parseCoresMeasurement converts one raw cores measurement line into a
// parsedCoresMeasurement.
//
// The line is split with splitCSVLineIntoValues; its time value is parsed with
// parseTime and each metric value with parseFloat. The first error from any of
// these steps is returned together with a zero-value result.
func parseCoresMeasurement(measurements string) (parsedCoresMeasurement, error) {
	var empty parsedCoresMeasurement

	csvLine, err := splitCSVLineIntoValues(measurements)
	if err != nil {
		return empty, err
	}

	when, err := parseTime(csvLine.timeValue)
	if err != nil {
		return empty, err
	}

	// Collapse the core columns into a single comma-separated string and
	// drop the double quotes that wrap it.
	cores := strings.Trim(strings.Join(csvLine.coreOrPIDsValues, ","), "\"")

	metrics := make([]float64, 0, len(csvLine.metricsValues))
	for _, raw := range csvLine.metricsValues {
		number, err := parseFloat(raw)
		if err != nil {
			return empty, err
		}
		metrics = append(metrics, number)
	}

	return parsedCoresMeasurement{cores: cores, values: metrics, time: when}, nil
}
// addToAccumulatorCores emits one "rdt_metric" point per value of the given
// measurement, tagged with the cores string and the metric name taken from
// pqosMetricOrder at the same index. In shortened mode the values at index 0
// (IPC) and index 1 (LLC_Misses) are not emitted.
func (p *publisher) addToAccumulatorCores(measurement parsedCoresMeasurement) {
	// Number of leading metrics (IPC, LLC_Misses) dropped in shortened mode.
	const skippedWhenShortened = 2

	for idx := range measurement.values {
		if p.shortenedMetrics && idx < skippedWhenShortened {
			continue
		}

		// Fresh maps are built for every point handed to the accumulator.
		pointTags := map[string]string{
			"cores": measurement.cores,
			"name":  pqosMetricOrder[idx],
		}
		pointFields := map[string]interface{}{
			"value": measurement.values[idx],
		}

		p.acc.AddFields("rdt_metric", pointFields, pointTags, measurement.time)
	}
}
// parseProcessesMeasurement converts a processMeasurement into a
// parsedProcessMeasurement.
//
// The raw line (measurement.measurement) is split with splitCSVLineIntoValues,
// and the PID list is extracted from the same line with findPIDsInMeasurement.
// The leading core-or-PID columns that correspond to the PIDs are skipped; the
// remaining ones become the cores string. An error is returned when any helper
// fails or when more PIDs are found than core-or-PID columns exist.
func parseProcessesMeasurement(measurement processMeasurement) (parsedProcessMeasurement, error) {
	var empty parsedProcessMeasurement

	csvLine, err := splitCSVLineIntoValues(measurement.measurement)
	if err != nil {
		return empty, err
	}

	pidList, err := findPIDsInMeasurement(measurement.measurement)
	if err != nil {
		return empty, err
	}

	// A comma-separated list has one more element than it has commas.
	pidCount := strings.Count(pidList, ",") + 1
	if len(csvLine.coreOrPIDsValues) < pidCount {
		return empty, errors.New("detected more pids (quoted) than actual number of pids in csv line")
	}

	when, err := parseTime(csvLine.timeValue)
	if err != nil {
		return empty, err
	}

	// Everything after the PID columns is the core list; join it and strip
	// the double quotes that wrap it.
	coreColumns := csvLine.coreOrPIDsValues[pidCount:]
	cores := strings.Trim(strings.Join(coreColumns, ","), `"`)

	metrics := make([]float64, 0, len(csvLine.metricsValues))
	for _, raw := range csvLine.metricsValues {
		number, err := parseFloat(raw)
		if err != nil {
			return empty, err
		}
		metrics = append(metrics, number)
	}

	return parsedProcessMeasurement{
		process: measurement.name,
		cores:   cores,
		values:  metrics,
		time:    when,
	}, nil
}
// addToAccumulatorProcesses emits one "rdt_metric" point per value of the
// given measurement, tagged with the process name, the cores string and the
// metric name taken from pqosMetricOrder at the same index. In shortened mode
// the values at index 0 (IPC) and index 1 (LLC_Misses) are not emitted.
func (p *publisher) addToAccumulatorProcesses(measurement parsedProcessMeasurement) {
	// Number of leading metrics (IPC, LLC_Misses) dropped in shortened mode.
	const skippedWhenShortened = 2

	for idx := range measurement.values {
		if p.shortenedMetrics && idx < skippedWhenShortened {
			continue
		}

		// Fresh maps are built for every point handed to the accumulator.
		pointTags := map[string]string{
			"process": measurement.process,
			"cores":   measurement.cores,
			"name":    pqosMetricOrder[idx],
		}
		pointFields := map[string]interface{}{
			"value": measurement.values[idx],
		}

		p.acc.AddFields("rdt_metric", pointFields, pointTags, measurement.time)
	}
}