//go:build linux package dpdk import ( "encoding/json" "errors" "fmt" "net" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" parsers_json "github.com/influxdata/telegraf/plugins/parsers/json" ) const ( maxInitMessageLength = 1024 // based on https://github.com/DPDK/dpdk/blob/v22.07/lib/telemetry/telemetry.c#L352 dpdkSocketTemplateName = "dpdk_telemetry" ) type initMessage struct { Version string `json:"version"` Pid int `json:"pid"` MaxOutputLen uint32 `json:"max_output_len"` } type dpdkConnector struct { pathToSocket string accessTimeout time.Duration connection net.Conn initMessage *initMessage } func newDpdkConnector(pathToSocket string, accessTimeout config.Duration) *dpdkConnector { return &dpdkConnector{ pathToSocket: pathToSocket, accessTimeout: time.Duration(accessTimeout), } } // Connects to the socket // Since DPDK is a local unix socket, it is instantly returns error or connection, so there's no need to set timeout for it func (conn *dpdkConnector) connect() (*initMessage, error) { if err := isSocket(conn.pathToSocket); err != nil { return nil, err } connection, err := net.Dial("unixpacket", conn.pathToSocket) if err != nil { return nil, fmt.Errorf("failed to connect to the socket: %w", err) } conn.connection = connection conn.initMessage, err = conn.readInitMessage() if err != nil { if closeErr := conn.tryClose(); closeErr != nil { return nil, fmt.Errorf("%w and failed to close connection: %w", err, closeErr) } return nil, err } return conn.initMessage, nil } // Add metadata fields to data func (conn *dpdkConnector) addMetadataFields(metadataFields []string, data map[string]interface{}) { if conn.initMessage == nil { return } for _, field := range metadataFields { switch field { case dpdkMetadataFieldPidName: data[dpdkMetadataFieldPidName] = conn.initMessage.Pid case dpdkMetadataFieldVersionName: data[dpdkMetadataFieldVersionName] = conn.initMessage.Version } } } // Fetches all identifiers of devices and then creates all possible combinations of commands for each device func (conn *dpdkConnector) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) { response, err := conn.getCommandResponse(listCommand) if err != nil { return nil, err } params, err := jsonToArray(response, listCommand) if err != nil { return nil, err } result := make([]string, 0, len(commands)*len(params)) for _, command := range commands { for _, param := range params { result = append(result, commandWithParams(command, param)) } } return result, nil } // Executes command using provided connection and returns response // If error (such as timeout) occurred, then connection is discarded and recreated // because otherwise behavior of connection is undefined (e.g. it could return result of timed out command instead of latest) func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error) { connection, err := conn.getConnection() if err != nil { return nil, fmt.Errorf("failed to get connection to execute %q command: %w", fullCommand, err) } err = conn.setTimeout() if err != nil { return nil, fmt.Errorf("failed to set timeout for %q command: %w", fullCommand, err) } _, err = connection.Write([]byte(fullCommand)) if err != nil { if closeErr := conn.tryClose(); closeErr != nil { return nil, fmt.Errorf("failed to send %q command: %w and failed to close connection: %w", fullCommand, err, closeErr) } return nil, fmt.Errorf("failed to send %q command: %w", fullCommand, err) } buf := make([]byte, conn.initMessage.MaxOutputLen) messageLength, err := connection.Read(buf) if err != nil { if closeErr := conn.tryClose(); closeErr != nil { return nil, fmt.Errorf("failed read response of %q command: %w and failed to close connection: %w", fullCommand, err, closeErr) } return nil, fmt.Errorf("failed to read response of %q command: %w", fullCommand, err) } if messageLength == 0 { return nil, fmt.Errorf("got empty response during execution of %q command", fullCommand) } return buf[:messageLength], nil } // Executes command, parses response and creates/writes metrics from response to accumulator func (conn *dpdkConnector) processCommand(acc telegraf.Accumulator, log telegraf.Logger, commandWithParams string, metadataFields []string) { buf, err := conn.getCommandResponse(commandWithParams) if err != nil { acc.AddError(err) return } var parsedResponse map[string]interface{} err = json.Unmarshal(buf, &parsedResponse) if err != nil { acc.AddError(fmt.Errorf("failed to unmarshal json response from %q command: %w", commandWithParams, err)) return } command := stripParams(commandWithParams) value := parsedResponse[command] if isEmpty(value) { log.Warnf("got empty json on %q command", commandWithParams) return } jf := parsers_json.JSONFlattener{} err = jf.FullFlattenJSON("", value, true, true) if err != nil { acc.AddError(fmt.Errorf("failed to flatten response: %w", err)) return } err = processCommandResponse(command, jf.Fields) if err != nil { log.Warnf("Failed to process a response of the command: %s. Error: %v. Continue to handle data", command, err) } // Add metadata fields if required conn.addMetadataFields(metadataFields, jf.Fields) // Add common fields acc.AddFields(pluginName, jf.Fields, map[string]string{ "command": command, "params": getParams(commandWithParams), }) } func (conn *dpdkConnector) tryClose() error { if conn.connection == nil { return nil } err := conn.connection.Close() conn.connection = nil if err != nil { return err } return nil } func (conn *dpdkConnector) setTimeout() error { if conn.connection == nil { return errors.New("connection had not been established before") } if conn.accessTimeout == 0 { return conn.connection.SetDeadline(time.Time{}) } return conn.connection.SetDeadline(time.Now().Add(conn.accessTimeout)) } // Returns connections, if connection is not created then function tries to recreate it func (conn *dpdkConnector) getConnection() (net.Conn, error) { if conn.connection == nil { _, err := conn.connect() if err != nil { return nil, err } } return conn.connection, nil } // Reads InitMessage for connection. Should be read for each connection, otherwise InitMessage is returned as response for first command. func (conn *dpdkConnector) readInitMessage() (*initMessage, error) { buf := make([]byte, maxInitMessageLength) err := conn.setTimeout() if err != nil { return nil, fmt.Errorf("failed to set timeout: %w", err) } messageLength, err := conn.connection.Read(buf) if err != nil { return nil, fmt.Errorf("failed to read InitMessage: %w", err) } var connectionInitMessage initMessage err = json.Unmarshal(buf[:messageLength], &connectionInitMessage) if err != nil { return nil, fmt.Errorf("failed to unmarshal response: %w", err) } if connectionInitMessage.MaxOutputLen == 0 { return nil, errors.New("failed to read maxOutputLen information") } return &connectionInitMessage, nil }