405 lines
11 KiB
Go
405 lines
11 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
//go:build linux && amd64
|
|
|
|
package intel_pmt
|
|
|
|
import (
|
|
_ "embed"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"html"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/PaesslerAG/gval"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
var hexToDecRegex = regexp.MustCompile(`0x[0-9a-fA-F]+`)
|
|
|
|
const (
|
|
defaultPmtBasePath = "/sys/class/intel_pmt"
|
|
pluginName = "intel_pmt"
|
|
)
|
|
|
|
type IntelPMT struct {
|
|
PmtSpec string `toml:"spec"`
|
|
DatatypeFilter []string `toml:"datatypes_enabled"`
|
|
SampleFilter []string `toml:"samples_enabled"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
pmtBasePath string
|
|
reader sourceReader
|
|
pmtTelemetryFiles map[string]pmtFileInfo
|
|
pmtMetadata *pmt
|
|
pmtAggregator map[string]aggregator
|
|
pmtAggregatorInterface map[string]aggregatorInterface
|
|
pmtTransformations map[string]map[string]transformation
|
|
}
|
|
|
|
type pmtFileInfo []fileInfo
|
|
|
|
type fileInfo struct {
|
|
path string
|
|
numaNode string
|
|
pciBdf string // PCI Bus:Device.Function (BDF)
|
|
}
|
|
|
|
func (*IntelPMT) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (p *IntelPMT) Init() error {
|
|
err := p.checkPmtSpec()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = p.explorePmtInSysfs()
|
|
if err != nil {
|
|
return fmt.Errorf("error while exploring pmt sysfs: %w", err)
|
|
}
|
|
|
|
return p.parseXMLs()
|
|
}
|
|
|
|
func (p *IntelPMT) Gather(acc telegraf.Accumulator) error {
|
|
var wg sync.WaitGroup
|
|
var hasError atomic.Bool
|
|
for guid := range p.pmtTelemetryFiles {
|
|
wg.Add(1)
|
|
go func(guid string, fileInfo []fileInfo) {
|
|
defer wg.Done()
|
|
for _, info := range fileInfo {
|
|
data, err := os.ReadFile(info.path)
|
|
if err != nil {
|
|
hasError.Store(true)
|
|
acc.AddError(fmt.Errorf("gathering metrics failed: %w", err))
|
|
return
|
|
}
|
|
|
|
err = p.aggregateSamples(acc, guid, data, info.numaNode, info.pciBdf)
|
|
if err != nil {
|
|
hasError.Store(true)
|
|
acc.AddError(fmt.Errorf("gathering metrics failed: %w", err))
|
|
return
|
|
}
|
|
}
|
|
}(guid, p.pmtTelemetryFiles[guid])
|
|
}
|
|
wg.Wait()
|
|
|
|
if hasError.Load() {
|
|
return errors.New("error(s) occurred while gathering metrics")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkPmtSpec checks if provided PmtSpec is correct and readable.
|
|
//
|
|
// PmtSpec is expected to be an absolute filepath.
|
|
//
|
|
// Returns:
|
|
//
|
|
// error - error if PmtSpec is invalid, not readable, or not absolute.
|
|
func (p *IntelPMT) checkPmtSpec() error {
|
|
if p.PmtSpec == "" {
|
|
return errors.New("pmt spec is empty")
|
|
}
|
|
|
|
if !isFileReadable(p.PmtSpec) {
|
|
return fmt.Errorf("provided pmt spec is not readable %q", p.PmtSpec)
|
|
}
|
|
|
|
lastSlash := strings.LastIndex(p.PmtSpec, "/")
|
|
// if PmtSpec contains no "/"
|
|
if lastSlash == -1 {
|
|
return errors.New("provided pmt spec is not an absolute path")
|
|
}
|
|
p.pmtBasePath = p.PmtSpec[:lastSlash]
|
|
p.reader = fileReader{}
|
|
|
|
return nil
|
|
}
|
|
|
|
// explorePmtInSysfs finds necessary paths in pmt sysfs.
|
|
//
|
|
// This method finds "telem" files, used to retrieve telemetry values
|
|
// and saves them under their corresponding GUID.
|
|
// It also finds which NUMA node and PCI BDF the samples belong to.
|
|
//
|
|
// Returns:
|
|
//
|
|
// error - error if any of the operations failed.
|
|
func (p *IntelPMT) explorePmtInSysfs() error {
|
|
pmtDirectories, err := os.ReadDir(defaultPmtBasePath)
|
|
if err != nil {
|
|
return fmt.Errorf("error reading pmt directory: %w", err)
|
|
}
|
|
p.pmtTelemetryFiles = make(map[string]pmtFileInfo)
|
|
for _, dir := range pmtDirectories {
|
|
if !strings.HasPrefix(dir.Name(), "telem") {
|
|
continue
|
|
}
|
|
telemDirPath := filepath.Join(defaultPmtBasePath, dir.Name())
|
|
symlinkInfo, err := os.Stat(telemDirPath)
|
|
if err != nil {
|
|
return fmt.Errorf("error resolving symlink for directory %q: %w", telemDirPath, err)
|
|
}
|
|
if !symlinkInfo.IsDir() {
|
|
continue
|
|
}
|
|
|
|
pmtGUIDPath := filepath.Join(telemDirPath, "guid")
|
|
rawGUID, err := os.ReadFile(pmtGUIDPath)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read GUID: %w", err)
|
|
}
|
|
// cut the newline char
|
|
tID := strings.TrimSpace(string(rawGUID))
|
|
|
|
telemPath := filepath.Join(telemDirPath, "telem")
|
|
if !isFileReadable(telemPath) {
|
|
p.Log.Warnf("telem file is not readable %q", telemPath)
|
|
continue
|
|
}
|
|
|
|
telemDevicePath := filepath.Join(telemDirPath, "device")
|
|
telemDeviceSymlink, err := filepath.EvalSymlinks(telemDevicePath)
|
|
if err != nil {
|
|
return fmt.Errorf("error while evaluating symlink %q: %w", telemDeviceSymlink, err)
|
|
}
|
|
|
|
telemDevicePciBdf := filepath.Base(filepath.Join(telemDeviceSymlink, ".."))
|
|
|
|
numaNodePath := filepath.Join(telemDeviceSymlink, "..", "numa_node")
|
|
|
|
numaNode, err := os.ReadFile(numaNodePath)
|
|
if err != nil {
|
|
return fmt.Errorf("error while reading numa_node file %q: %w", numaNodePath, err)
|
|
}
|
|
numaNodeString := strings.TrimSpace(string(numaNode))
|
|
if numaNodeString == "" {
|
|
return fmt.Errorf("numa_node file %q is empty", numaNodePath)
|
|
}
|
|
|
|
fi := fileInfo{
|
|
path: telemPath,
|
|
numaNode: numaNodeString,
|
|
pciBdf: telemDevicePciBdf,
|
|
}
|
|
p.pmtTelemetryFiles[tID] = append(p.pmtTelemetryFiles[tID], fi)
|
|
}
|
|
if len(p.pmtTelemetryFiles) == 0 {
|
|
return errors.New("no telemetry sources found - current platform doesn't support PMT or proper permissions needed to read them")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func isFileReadable(path string) bool {
|
|
if _, err := os.Stat(path); err != nil {
|
|
return false
|
|
}
|
|
|
|
file, err := os.Open(path)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
file.Close()
|
|
|
|
return true
|
|
}
|
|
|
|
// getSampleValues reads all sample values for all sample groups.
|
|
//
|
|
// This method reads all telemetry samples for given GUID from given data
|
|
// and saves it in results map.
|
|
//
|
|
// Parameters:
|
|
//
|
|
// guid - GUID saying which Aggregator XML will be read.
|
|
// data - data read from "telem" file.
|
|
//
|
|
// Returns:
|
|
//
|
|
// map[string]uint64 - results map with read data.
|
|
// error - error if getting any of the values failed.
|
|
func (p *IntelPMT) getSampleValues(guid string, data []byte) (map[string]uint64, error) {
|
|
results := make(map[string]uint64)
|
|
for _, group := range p.pmtAggregator[guid].SampleGroup {
|
|
// Determine starting position of the Sample Group.
|
|
// Each Sample Group occupies 8 bytes.
|
|
offset := 8 * group.SampleID
|
|
for _, sample := range group.Sample {
|
|
var err error
|
|
results[sample.SampleID], err = getTelemSample(sample, data, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// getTelemSample extracts a telemetry sample from a given buffer.
|
|
//
|
|
// This function uses offset as a starting position.
|
|
// Then it uses LSB and MSB from sample to determine which bits
|
|
// to read from the given buffer.
|
|
//
|
|
// Parameters:
|
|
//
|
|
// s - sample from Aggregator XML containing LSB and MSB info.
|
|
// buf - the byte buffer containing the telemetry data.
|
|
// offset - the starting position (in bytes) in the buffer.
|
|
//
|
|
// Returns:
|
|
//
|
|
// uint64 - the extracted sample as a 64-bit unsigned integer.
|
|
// error - error if offset+8 exceeds the size of the buffer.
|
|
func getTelemSample(s sample, buf []byte, offset uint64) (uint64, error) {
|
|
if len(buf) < int(offset+8) {
|
|
return 0, fmt.Errorf("error reading telemetry sample: insufficient bytes from offset %d in buffer of size %d", offset, len(buf))
|
|
}
|
|
data := binary.LittleEndian.Uint64(buf[offset : offset+8])
|
|
|
|
// Apply mask and shift right
|
|
value := (data & s.mask) >> s.Lsb
|
|
return value, nil
|
|
}
|
|
|
|
// aggregateSamples outputs transformed metrics to Telegraf.
|
|
//
|
|
// This method transforms low level samples
|
|
// into high-level samples with appropriate transformation equation.
|
|
// Then it creates fields and tags and adds them to Telegraf Accumulator.
|
|
//
|
|
// Parameters:
|
|
//
|
|
// guid - GUID saying which Aggregator Interface will be read.
|
|
// data - contents of the "telem" file.
|
|
// numaNode - which NUMA node this sample belongs to.
|
|
// pciBdf - PCI Bus:Device.Function (BDF) this sample belongs to.
|
|
// acc - Telegraf Accumulator.
|
|
//
|
|
// Returns:
|
|
//
|
|
// error - error if getting values has failed, if sample IDref is missing or if equation evaluation has failed.
|
|
func (p *IntelPMT) aggregateSamples(acc telegraf.Accumulator, guid string, data []byte, numaNode, pciBdf string) error {
|
|
results, err := p.getSampleValues(guid, data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, sample := range p.pmtAggregatorInterface[guid].AggregatorSamples.AggregatorSample {
|
|
parameters := make(map[string]interface{})
|
|
for _, input := range sample.TransformInputs.TransformInput {
|
|
if _, ok := results[input.SampleIDREF]; !ok {
|
|
return fmt.Errorf("sample with IDREF %q has not been found", input.SampleIDREF)
|
|
}
|
|
parameters[input.VarName] = results[input.SampleIDREF]
|
|
}
|
|
eq := transformEquation(p.pmtTransformations[guid][sample.TransformREF].Transform)
|
|
res, err := eval(eq, parameters)
|
|
if err != nil {
|
|
return fmt.Errorf("error during eval of sample %q: %w", sample.SampleName, err)
|
|
}
|
|
fields := map[string]interface{}{
|
|
"value": res,
|
|
}
|
|
tags := map[string]string{
|
|
"guid": guid,
|
|
"numa_node": numaNode,
|
|
"pci_bdf": pciBdf,
|
|
"sample_name": sample.SampleName,
|
|
"sample_group": sample.SampleGroup,
|
|
"datatype_idref": sample.DatatypeIDRef,
|
|
}
|
|
if sample.core != "" {
|
|
tags["core"] = sample.core
|
|
}
|
|
if sample.cha != "" {
|
|
tags["cha"] = sample.cha
|
|
}
|
|
|
|
acc.AddFields(pluginName, fields, tags)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// transformEquation changes the equation string to be ready for eval.
|
|
//
|
|
// This function removes "$" signs, which prefixes every parameter in equations.
|
|
// Then escapes special characters from XML
|
|
// like "<" into "<", "&" into "&" and ">" into ">"
|
|
// so they can be used in evaluation.
|
|
//
|
|
// Parameters:
|
|
//
|
|
// eq - string which should be transformed.
|
|
//
|
|
// Returns:
|
|
//
|
|
// string - transformed string.
|
|
func transformEquation(eq string) string {
|
|
withoutDollar := strings.ReplaceAll(eq, "$", "")
|
|
decoded := html.UnescapeString(withoutDollar)
|
|
return decoded
|
|
}
|
|
|
|
// eval calculates the value of given equation for given parameters.
|
|
//
|
|
// This function evaluates arbitrary equations with parameters.
|
|
// It substitutes the parameters in the equation with their values
|
|
// and calculates its value.
|
|
// Example: equation "a + b", with params: a: 2, b: 3.
|
|
// a and b will be substituted with their values so the equation becomes "2 + 3".
|
|
// If any of the parameters are missing then the equation is invalid and returns an error.
|
|
// Parameters:
|
|
//
|
|
// eq - equation which should be calculated.
|
|
// params - parameters to substitute in the equation.
|
|
//
|
|
// Returns:
|
|
//
|
|
// interface - the value of calculation.
|
|
// error - error if the equation is empty, if hex to dec conversion failed or if the equation is invalid.
|
|
func eval(eq string, params map[string]interface{}) (interface{}, error) {
|
|
if eq == "" {
|
|
return nil, errors.New("no transformation equation found")
|
|
}
|
|
// gval doesn't support hexadecimals
|
|
eq = hexToDecRegex.ReplaceAllStringFunc(eq, hexToDec)
|
|
if eq == "" {
|
|
return nil, errors.New("error during hex to decimal conversion")
|
|
}
|
|
result, err := gval.Evaluate(eq, params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func hexToDec(hexStr string) string {
|
|
dec, err := strconv.ParseInt(hexStr, 0, 64)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
return strconv.FormatInt(dec, 10)
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add(pluginName, func() telegraf.Input {
|
|
return &IntelPMT{}
|
|
})
|
|
}
|