1
0
Fork 0
telegraf/plugins/inputs/dpdk/dpdk.go

293 lines
9 KiB
Go
Raw Permalink Normal View History

//go:generate ../../../tools/readme_config_includer/generator
//go:build linux
package dpdk
import (
_ "embed"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
// Defaults, validation limits and option names used across the plugin.
const (
// Socket path applied when socket_path is left empty in the configuration.
defaultPathToSocket = "/var/run/dpdk/rte/dpdk_telemetry.v2"
// Socket access timeout preset on a plugin instance when it is created.
defaultAccessTimeout = config.Duration(200 * time.Millisecond)
// Exclusive upper bound on the length of a command without its parameters.
maxCommandLength = 56
// Exclusive upper bound on the length of a command including its parameters.
maxCommandLengthWithParams = 1024
// Name under which the plugin is registered.
pluginName = "dpdk"
// Commands passed to the connector to obtain the parameters for the
// ethdev and rawdev commands respectively.
ethdevListCommand = "/ethdev/list"
rawdevListCommand = "/rawdev/list"
// Metadata field names enabled when metadata_fields is not configured.
dpdkMetadataFieldPidName = "pid"
dpdkMetadataFieldVersionName = "version"
// Plugin option enabled when plugin_options is not configured; while it is
// set, the sockets to connect to are looked up with a glob on socket_path.
dpdkPluginOptionInMemory = "in_memory"
// Accepted values of unreachable_socket_behavior.
unreachableSocketBehaviorIgnore = "ignore"
unreachableSocketBehaviorError = "error"
)
// Dpdk is the input plugin state: the user-facing configuration (fields with
// a toml tag) followed by the runtime state derived from it.
type Dpdk struct {
// Path to the telemetry socket; defaults to defaultPathToSocket. It is also
// the prefix of the glob (SocketPath + "*") compiled in Init.
SocketPath string `toml:"socket_path"`
// Timeout handed to every new connector. Negative values are rejected by
// Init; per its error message, 0 disables timeouts.
AccessTimeout config.Duration `toml:"socket_access_timeout"`
// Device types to query; "ethdev" and "rawdev" are recognised. Defaults to
// ["ethdev"] only when the option is absent (nil).
DeviceTypes []string `toml:"device_types"`
// Settings specific to ethdev commands.
EthdevConfig ethdevConfig `toml:"ethdev"`
// Extra commands supplied by the user; deduplicated and validated in Init.
AdditionalCommands []string `toml:"additional_commands"`
// Metadata fields passed along when a command is processed; defaults to
// pid and version when absent (nil).
MetadataFields []string `toml:"metadata_fields"`
// Plugin options; defaults to ["in_memory"] when absent (nil).
PluginOptions []string `toml:"plugin_options"`
// Either "error" or "ignore"; defaults to "error". Decides whether a socket
// that cannot be reached results in an error or only in a warning.
UnreachableSocketBehavior string `toml:"unreachable_socket_behavior"`
// Logger used for warnings and debug messages.
Log telegraf.Logger `toml:"-"`
// Connectors for the sockets the plugin is currently connected to.
connectors []*dpdkConnector
// Built-in command lists, populated by setupDefaultValues.
rawdevCommands []string
ethdevCommands []string
// Filter compiled from EthdevConfig.EthdevExcludeCommands.
ethdevExcludedCommandsFilter filter.Filter
// Glob compiled from SocketPath + "*", used to discover in-memory sockets.
socketGlobPath *globpath.GlobPath
}
// ethdevConfig holds the options of the "ethdev" configuration table.
type ethdevConfig struct {
// Patterns compiled into the filter that removes matching entries from the
// built-in ethdev command list.
EthdevExcludeCommands []string `toml:"exclude_commands"`
}
// SampleConfig returns the sample configuration embedded from sample.conf.
func (*Dpdk) SampleConfig() string {
return sampleConfig
}
// Init applies the default values and validates the configuration.
//
// It returns an error when an additional command is malformed, when
// socket_access_timeout is negative, when neither device types nor additional
// commands are configured, when the ethdev exclude patterns or the socket glob
// cannot be compiled, or when unreachable_socket_behavior holds an unknown
// value. On success the exclude filter and the socket glob are stored on the
// plugin for later use.
func (dpdk *Dpdk) Init() error {
	dpdk.setupDefaultValues()

	if err := dpdk.validateAdditionalCommands(); err != nil {
		return err
	}

	if dpdk.AccessTimeout < 0 {
		return errors.New("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)")
	}

	if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 {
		return errors.New("plugin was configured with nothing to read")
	}

	var err error
	dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands)
	if err != nil {
		return fmt.Errorf("error occurred during filter preparation for ethdev excluded commands: %w", err)
	}

	allowedBehaviors := []string{unreachableSocketBehaviorError, unreachableSocketBehaviorIgnore}
	if err = choice.Check(dpdk.UnreachableSocketBehavior, allowedBehaviors); err != nil {
		return fmt.Errorf("unreachable_socket_behavior: %w", err)
	}

	// The trailing wildcard lets the glob match every path that starts with
	// the configured socket path.
	glob, err := globpath.Compile(dpdk.SocketPath + "*")
	if err != nil {
		// Wrap like the other failures above so the error names the option
		// that caused it; %w keeps the underlying error inspectable.
		return fmt.Errorf("error occurred during glob preparation for socket_path %q: %w", dpdk.SocketPath, err)
	}
	dpdk.socketGlobPath = glob

	return nil
}
// Start brings the socket connections up to date by delegating to
// maintainConnections. The accumulator argument is not used.
func (dpdk *Dpdk) Start(telegraf.Accumulator) error {
return dpdk.maintainConnections()
}
// Gather refreshes the socket connections and then, for each connected socket,
// collects its unique commands and processes them one after another.
// Parallel processing could be achieved by running several instances of this
// plugin with different settings.
//
// An error is returned only when refreshing the connections fails.
func (dpdk *Dpdk) Gather(acc telegraf.Accumulator) error {
	// Start re-runs the connection maintenance, so the connector list is
	// brought up to date before anything is collected.
	err := dpdk.Start(acc)
	if err != nil {
		return err
	}

	for _, conn := range dpdk.connectors {
		for _, cmd := range dpdk.gatherCommands(acc, conn) {
			conn.processCommand(acc, dpdk.Log, cmd, dpdk.MetadataFields)
		}
	}

	return nil
}
// Stop closes every open connection and forgets all connectors. A connection
// that fails to close is logged as a warning and does not prevent the
// remaining ones from being closed.
func (dpdk *Dpdk) Stop() {
	for _, conn := range dpdk.connectors {
		err := conn.tryClose()
		if err == nil {
			continue
		}
		dpdk.Log.Warnf("Couldn't close connection for %q: %v", conn.pathToSocket, err)
	}

	dpdk.connectors = nil
}
// Fills in defaults for options that were left unset and (re)initialises the
// built-in rawdev and ethdev command lists.
func (dpdk *Dpdk) setupDefaultValues() {
	// The built-in command lists do not depend on the configuration.
	dpdk.rawdevCommands = []string{"/rawdev/xstats"}
	dpdk.ethdevCommands = []string{
		"/ethdev/stats",
		"/ethdev/xstats",
		"/ethdev/info",
		ethdevLinkStatusCommand,
	}

	if len(dpdk.SocketPath) == 0 {
		dpdk.SocketPath = defaultPathToSocket
	}

	if dpdk.UnreachableSocketBehavior == "" {
		dpdk.UnreachableSocketBehavior = unreachableSocketBehaviorError
	}

	// The slice options are compared against nil on purpose: an explicitly
	// empty list is kept as is and only a missing option receives a default.
	if dpdk.DeviceTypes == nil {
		dpdk.DeviceTypes = []string{"ethdev"}
	}

	if dpdk.MetadataFields == nil {
		dpdk.MetadataFields = []string{
			dpdkMetadataFieldPidName,
			dpdkMetadataFieldVersionName,
		}
	}

	if dpdk.PluginOptions == nil {
		dpdk.PluginOptions = []string{dpdkPluginOptionInMemory}
	}
}
// getDpdkInMemorySocketPaths returns those matches of the socket glob that can
// be stat'ed, are not directories, contain dpdkSocketTemplateName and are
// accepted by isInMemorySocketPath for the configured socket path.
// The result is nil when nothing qualifies.
func (dpdk *Dpdk) getDpdkInMemorySocketPaths() []string {
	var socketPaths []string

	for _, candidate := range dpdk.socketGlobPath.Match() {
		info, err := os.Stat(candidate)
		if err != nil {
			// The path cannot be inspected, so it is skipped.
			continue
		}
		if info.IsDir() {
			continue
		}
		if !strings.Contains(candidate, dpdkSocketTemplateName) {
			continue
		}
		if !isInMemorySocketPath(candidate, dpdk.SocketPath) {
			continue
		}

		socketPaths = append(socketPaths, candidate)
	}

	return socketPaths
}
// Deduplicates the user-supplied commands and verifies that each of them is
// non-empty, starts with a slash and stays below the length limits, both
// without parameters (maxCommandLength) and with them
// (maxCommandLengthWithParams). The first violation is returned as an error.
func (dpdk *Dpdk) validateAdditionalCommands() error {
	dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands)

	for _, command := range dpdk.AdditionalCommands {
		switch {
		case command == "":
			return errors.New("got empty command")
		case command[0] != '/':
			return fmt.Errorf("%q command should start with slash", command)
		}

		// The command name itself has a tighter limit than the full command.
		bare := stripParams(command)
		if len(bare) >= maxCommandLength {
			return fmt.Errorf("%q command is too long. It shall be less than %v characters", bare, maxCommandLength)
		}

		if len(command) >= maxCommandLengthWithParams {
			return fmt.Errorf("command with parameters %q shall be less than %v characters", command, maxCommandLengthWithParams)
		}
	}

	return nil
}
// Reconciles the connected sockets with the sockets that should be connected.
//
// The candidates are the in-memory socket paths when the in_memory plugin
// option is set, and the configured socket path otherwise. Connections to
// sockets that are no longer candidates are closed and dropped; candidates
// without a connection are connected. A failed connection attempt, or ending
// up without any connection, is returned as an error when
// unreachable_socket_behavior is "error" and is only logged otherwise.
func (dpdk *Dpdk) maintainConnections() error {
	var candidates []string
	if choice.Contains(dpdkPluginOptionInMemory, dpdk.PluginOptions) {
		candidates = dpdk.getDpdkInMemorySocketPaths()
	} else {
		candidates = []string{dpdk.SocketPath}
	}

	// Keep only the connectors whose socket is still a candidate, filtering
	// in place; every other connector is closed and dropped.
	kept := dpdk.connectors[:0]
	for _, conn := range dpdk.connectors {
		if choice.Contains(conn.pathToSocket, candidates) {
			kept = append(kept, conn)
			continue
		}

		dpdk.Log.Debugf("Close unused connection: %s", conn.pathToSocket)
		if err := conn.tryClose(); err != nil {
			dpdk.Log.Warnf("Failed to close unused connection: %v", err)
		}
	}
	dpdk.connectors = kept

	// Connect to every candidate that has no connector yet.
nextCandidate:
	for _, socketPath := range candidates {
		for _, conn := range dpdk.connectors {
			if conn.pathToSocket == socketPath {
				continue nextCandidate
			}
		}

		conn := newDpdkConnector(socketPath, dpdk.AccessTimeout)
		initMessage, err := conn.connect()
		if err != nil {
			if dpdk.UnreachableSocketBehavior == unreachableSocketBehaviorError {
				return fmt.Errorf("couldn't connect to socket %s: %w", socketPath, err)
			}
			dpdk.Log.Warnf("Couldn't connect to socket %s: %v", socketPath, err)
			continue
		}

		dpdk.Log.Debugf("Successfully connected to the socket: %s. Version: %v running as process with PID %v with len %v",
			socketPath, initMessage.Version, initMessage.Pid, initMessage.MaxOutputLen)
		dpdk.connectors = append(dpdk.connectors, conn)
	}

	if len(dpdk.connectors) > 0 {
		return nil
	}

	// Nothing is connected: fail or warn, depending on the configuration.
	const errMsg = "no active sockets connections present"
	if dpdk.UnreachableSocketBehavior == unreachableSocketBehaviorError {
		return errors.New(errMsg)
	}
	dpdk.Log.Warnf("Unreachable socket issue occurred: %v", errMsg)

	return nil
}
// Builds the deduplicated list of commands to run against one connector: the
// non-excluded ethdev commands and the rawdev commands (each only when its
// device type is enabled), expanded through the connector with the matching
// list command, followed by the user-supplied additional commands.
//
// A failure to fetch the parameters is reported to the accumulator; whatever
// the connector returned is still added to the list.
func (dpdk *Dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkConnector) []string {
	var all []string

	if choice.Contains("ethdev", dpdk.DeviceTypes) {
		allowed := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter)
		expanded, err := dpdkConnector.appendCommandsWithParamsFromList(ethdevListCommand, allowed)
		if err != nil {
			acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", ethdevListCommand, err))
		}
		all = append(all, expanded...)
	}

	if choice.Contains("rawdev", dpdk.DeviceTypes) {
		expanded, err := dpdkConnector.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands)
		if err != nil {
			acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", rawdevListCommand, err))
		}
		all = append(all, expanded...)
	}

	all = append(all, dpdk.AdditionalCommands...)

	return uniqueValues(all)
}
// init registers the plugin under pluginName.
func init() {
	inputs.Add(pluginName, func() telegraf.Input {
		return &Dpdk{
			// The timeout default is set at construction time rather than in
			// Init() so that a value of zero given in the configuration can be
			// told apart from the option not being present at all.
			AccessTimeout: defaultAccessTimeout,
		}
	})
}