1
0
Fork 0
telegraf/plugins/parsers/xpath/parser.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

703 lines
20 KiB
Go

package xpath
import (
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"reflect"
"slices"
"strconv"
"strings"
"time"
"github.com/antchfx/jsonquery"
path "github.com/antchfx/xpath"
"github.com/srebhan/cborquery"
"github.com/srebhan/protobufquery"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
// dataNode is an opaque handle to a node of a parsed document. The concrete
// type depends on the dataDocument implementation that produced it.
type dataNode interface{}
// dataDocument abstracts the format-specific document backends the parser
// can query with XPath expressions. Init selects an implementation based on
// the configured data format.
type dataDocument interface {
// Parse decodes buf and returns the node the parser uses as document root.
Parse(buf []byte) (dataNode, error)
// QueryAll evaluates expr starting at node and returns the matching nodes.
QueryAll(node dataNode, expr string) ([]dataNode, error)
// CreateXPathNavigator returns a navigator positioned at node for
// evaluating compiled XPath expressions.
CreateXPathNavigator(node dataNode) path.NodeNavigator
// GetNodePath returns the path from relativeTo to node with the elements
// joined by sep. It is used for field- and tag-name expansion.
GetNodePath(node, relativeTo dataNode, sep string) string
// GetNodeName returns the name of node; it is used when a name query
// yields an empty string.
// NOTE(review): withParent presumably prefixes the parent's name using
// sep -- confirm against the implementations.
GetNodeName(node dataNode, sep string, withParent bool) string
// OutputXML renders node as XML text; used for debug output only.
OutputXML(node dataNode) string
}
// Parser extracts metrics from structured documents (XML, JSON, CBOR,
// MessagePack, protocol-buffers) using XPath queries.
type Parser struct {
// Format selects the document backend in Init; empty is treated as "xml".
Format string `toml:"-"`
// Protocol-buffer specific settings, handed to the protobuf document
// backend in Init.
ProtobufMessageFiles []string `toml:"xpath_protobuf_files"`
ProtobufMessageDef string `toml:"xpath_protobuf_file" deprecated:"1.32.0;1.40.0;use 'xpath_protobuf_files' instead"`
ProtobufMessageType string `toml:"xpath_protobuf_type"`
ProtobufImportPaths []string `toml:"xpath_protobuf_import_paths"`
ProtobufSkipBytes int64 `toml:"xpath_protobuf_skip_bytes"`
// PrintDocument logs the XML equivalent of each parsed document at debug
// level.
PrintDocument bool `toml:"xpath_print_document"`
// AllowEmptySelection suppresses the error Parse returns when a metric
// selection matches no node.
AllowEmptySelection bool `toml:"xpath_allow_empty_selection"`
// NativeTypes makes node queries return the backend's native value
// (CBOR, JSON and protobuf navigators) instead of the string value.
NativeTypes bool `toml:"xpath_native_types"`
// Trace enables tracing of empty query results regardless of log level.
Trace bool `toml:"xpath_trace" deprecated:"1.35.0;use 'log_level' 'trace' instead"`
// Configs holds the metric definitions; each one is evaluated per document.
Configs []Config `toml:"xpath"`
// DefaultMetricName is used when a config has no metric-name query. Init
// fails if it is empty.
DefaultMetricName string `toml:"-"`
// DefaultTags are added to every metric and override queried tags of the
// same name.
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`
// Required for backward compatibility
ConfigsXML []Config `toml:"xml" deprecated:"1.23.1;1.35.0;use 'xpath' instead"`
ConfigsJSON []Config `toml:"xpath_json" deprecated:"1.23.1;1.35.0;use 'xpath' instead"`
ConfigsMsgPack []Config `toml:"xpath_msgpack" deprecated:"1.23.1;1.35.0;use 'xpath' instead"`
ConfigsProto []Config `toml:"xpath_protobuf" deprecated:"1.23.1;1.35.0;use 'xpath' instead"`
// document is the format-specific backend set up by Init.
document dataDocument
}
// Config describes how to build metrics from the nodes matched by one
// selection. All string settings ending in "Query"/"Selection" as well as the
// map values are XPath expressions.
type Config struct {
// MetricQuery yields the metric name; must evaluate to a string.
MetricQuery string `toml:"metric_name"`
// Selection picks the nodes that each produce one metric; defaults to "/".
Selection string `toml:"metric_selection"`
// Timestamp yields the metric time; the parse time is used if unset or
// if the query returns nothing.
Timestamp string `toml:"timestamp"`
// TimestampFmt is the format of the queried timestamp; defaults to "unix".
TimestampFmt string `toml:"timestamp_format"`
// Timezone names the location used for timestamp parsing; UTC if empty.
Timezone string `toml:"timezone"`
// Tags maps tag names to the queries producing their values.
Tags map[string]string `toml:"tags"`
// Fields maps field names to the queries producing their values.
Fields map[string]string `toml:"fields"`
// FieldsInt is like Fields but converts the results to int64.
FieldsInt map[string]string `toml:"fields_int"`
// FieldsHex and FieldsBase64 are name patterns selecting byte-slice
// fields to be encoded as hex or base64 strings respectively.
FieldsHex []string `toml:"fields_bytes_as_hex"`
FieldsBase64 []string `toml:"fields_bytes_as_base64"`
// Batch field definition: FieldSelection picks the nodes, the name and
// value queries (defaults "name()" and ".") are evaluated on each of them.
FieldSelection string `toml:"field_selection"`
FieldNameQuery string `toml:"field_name"`
FieldValueQuery string `toml:"field_value"`
// FieldNameExpand prefixes field names with the path from the selected
// metric node to the field node.
FieldNameExpand bool `toml:"field_name_expansion"`
// Batch tag definition, analogous to the batch field definition above.
TagSelection string `toml:"tag_selection"`
TagNameQuery string `toml:"tag_name"`
TagValueQuery string `toml:"tag_value"`
TagNameExpand bool `toml:"tag_name_expansion"`
// The following values are derived from the settings above in Parser.Init.
FieldsHexFilter filter.Filter
FieldsBase64Filter filter.Filter
Location *time.Location
}
// Init selects the document backend for the configured format, merges the
// deprecated per-format config sections into Configs and fills in defaults
// (selection, timestamp format, location, byte-encoding filters) for every
// config.
//
// It returns an error for an unknown format, a failing protocol-buffer setup,
// a missing default metric name, an unknown timezone or an invalid filter
// pattern.
func (p *Parser) Init() error {
	// addLegacy merges a deprecated config section into Configs and tells
	// the user which option to migrate away from. The option name must match
	// the TOML key of the deprecated section.
	addLegacy := func(option string, legacy []Config) {
		if len(legacy) == 0 {
			return
		}
		p.Configs = append(p.Configs, legacy...)
		config.PrintOptionDeprecationNotice("parsers.xpath", option, telegraf.DeprecationInfo{
			Since:     "1.23.1",
			RemovalIn: "1.35.0",
			Notice:    "use 'xpath' instead",
		})
	}

	switch p.Format {
	case "", "xml":
		p.document = &xmlDocument{}
		addLegacy("xml", p.ConfigsXML)
	case "xpath_cbor":
		p.document = &cborDocument{}
	case "xpath_json":
		p.document = &jsonDocument{}
		addLegacy("xpath_json", p.ConfigsJSON)
	case "xpath_msgpack":
		p.document = &msgpackDocument{}
		addLegacy("xpath_msgpack", p.ConfigsMsgPack)
	case "xpath_protobuf":
		// Fold the deprecated single-file option into the file list
		if p.ProtobufMessageDef != "" && !slices.Contains(p.ProtobufMessageFiles, p.ProtobufMessageDef) {
			p.ProtobufMessageFiles = append(p.ProtobufMessageFiles, p.ProtobufMessageDef)
		}
		pbdoc := protobufDocument{
			MessageFiles: p.ProtobufMessageFiles,
			MessageType:  p.ProtobufMessageType,
			ImportPaths:  p.ProtobufImportPaths,
			SkipBytes:    p.ProtobufSkipBytes,
			Log:          p.Log,
		}
		if err := pbdoc.Init(); err != nil {
			return err
		}
		p.document = &pbdoc
		// The deprecated section is configured as 'xpath_protobuf'
		addLegacy("xpath_protobuf", p.ConfigsProto)
	default:
		return fmt.Errorf("unknown data-format %q for xpath parser", p.Format)
	}

	// Make sure we do have a metric name
	if p.DefaultMetricName == "" {
		return errors.New("missing default metric name")
	}

	// Update the configs with default values. We work on a copy and only
	// store it back once the config was processed completely.
	for i, cfg := range p.Configs {
		if cfg.Selection == "" {
			cfg.Selection = "/"
		}
		if cfg.TimestampFmt == "" {
			cfg.TimestampFmt = "unix"
		}
		if cfg.Timezone == "" {
			cfg.Location = time.UTC
		} else {
			loc, err := time.LoadLocation(cfg.Timezone)
			if err != nil {
				return fmt.Errorf("invalid location in config %d: %w", i+1, err)
			}
			cfg.Location = loc
		}

		hexFilter, err := filter.Compile(cfg.FieldsHex)
		if err != nil {
			return fmt.Errorf("creating hex-fields filter failed: %w", err)
		}
		cfg.FieldsHexFilter = hexFilter

		base64Filter, err := filter.Compile(cfg.FieldsBase64)
		if err != nil {
			return fmt.Errorf("creating base64-fields filter failed: %w", err)
		}
		cfg.FieldsBase64Filter = base64Filter

		p.Configs[i] = cfg
	}

	return nil
}
// Parse decodes buf with the configured document backend and evaluates every
// config against it, producing one metric per selected node.
//
// Unless AllowEmptySelection is set, a selection without matches is an error.
// Metrics created before a selection or metric failure are returned together
// with the error.
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
	// Fallback timestamp for all metrics without a queried time
	now := time.Now()

	// Decode the raw data into a queryable document
	root, err := p.document.Parse(buf)
	if err != nil {
		return nil, err
	}
	if p.PrintDocument {
		p.Log.Debugf("XML document equivalent: %q", p.document.OutputXML(root))
	}

	// Run all configured queries
	result := make([]telegraf.Metric, 0)
	p.Log.Debugf("Number of configs: %d", len(p.Configs))
	for i := range p.Configs {
		cfg := p.Configs[i]

		nodes, err := p.document.QueryAll(root, cfg.Selection)
		if err != nil {
			return nil, err
		}

		nothingSelected := len(nodes) == 0 || nodes[0] == nil
		if nothingSelected && !p.AllowEmptySelection {
			p.debugEmptyQuery("metric selection", root, cfg.Selection)
			return result, errors.New("cannot parse with empty selection node")
		}
		p.Log.Debugf("Number of selected metric nodes: %d", len(nodes))

		for _, node := range nodes {
			m, err := p.parseQuery(now, root, node, cfg)
			if err != nil {
				return result, err
			}
			result = append(result, m)
		}
	}

	return result, nil
}
// ParseLine parses a single line and returns the resulting metric. No metric
// yields (nil, nil); more than one metric yields the first metric together
// with an error.
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
	metrics, err := p.Parse([]byte(line))
	if err != nil {
		return nil, err
	}

	count := len(metrics)
	if count == 0 {
		return nil, nil
	}
	if count > 1 {
		return metrics[0], fmt.Errorf("cannot parse line with multiple (%d) metrics", count)
	}
	return metrics[0], nil
}
// SetDefaultTags sets the tags added to every metric produced by the parser.
// The map is stored as-is, not copied.
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
// parseQuery builds one metric for the selected node according to cfg.
//
// starttime is used as the metric time unless cfg specifies a timestamp query
// returning a value. Queries starting with "/" are evaluated relative to doc,
// all others relative to selected. Tags and fields are collected from the
// batch definitions (selection plus name/value queries) first and from the
// explicit definitions afterwards; default tags are applied last and thus
// override queried tags with the same name.
//
// An error is returned if a query fails, if a name query does not evaluate to
// a string or if a value cannot be converted to the requested type.
func (p *Parser) parseQuery(starttime time.Time, doc, selected dataNode, cfg Config) (telegraf.Metric, error) {
	// convertComplex handles container types which would be dropped
	// otherwise for native type handling. Byte-slices are encoded as hex or
	// base64 if the field name matches the corresponding filter (base64 wins
	// if both match) and are kept as-is otherwise. All other arrays, slices
	// and maps are converted to their string representation.
	convertComplex := func(name string, v interface{}) interface{} {
		if v == nil {
			return v
		}
		switch reflect.TypeOf(v).Kind() {
		case reflect.Array, reflect.Slice, reflect.Map:
			b, ok := v.([]byte)
			if !ok {
				return fmt.Sprintf("%v", v)
			}
			if cfg.FieldsHexFilter != nil && cfg.FieldsHexFilter.Match(name) {
				v = hex.EncodeToString(b)
			}
			if cfg.FieldsBase64Filter != nil && cfg.FieldsBase64Filter.Match(name) {
				v = base64.StdEncoding.EncodeToString(b)
			}
		}
		return v
	}

	// Determine the metric name. If a query was specified, use the result of
	// this query and the default metric name otherwise.
	metricname := p.DefaultMetricName
	if len(cfg.MetricQuery) > 0 {
		v, err := p.executeQuery(doc, selected, cfg.MetricQuery)
		if err != nil {
			return nil, fmt.Errorf("failed to query metric name: %w", err)
		}
		var ok bool
		if metricname, ok = v.(string); !ok {
			if v == nil {
				p.Log.Infof("Hint: Empty metric-name-node. If you wanted to set a constant please use `metric_name = \"'name'\"`.")
			}
			return nil, fmt.Errorf("failed to query metric name: query result is of type %T not 'string'", v)
		}
	}

	// By default take the time the parser was invoked and override the value
	// with the queried timestamp if an expression was specified.
	timestamp := starttime
	if len(cfg.Timestamp) > 0 {
		v, err := p.executeQuery(doc, selected, cfg.Timestamp)
		if err != nil {
			return nil, fmt.Errorf("failed to query timestamp: %w", err)
		}
		if v != nil {
			timestamp, err = internal.ParseTimestamp(cfg.TimestampFmt, v, cfg.Location)
			if err != nil {
				return nil, fmt.Errorf("failed to parse timestamp: %w", err)
			}
		}
	}

	// Query tags and add default ones
	tags := make(map[string]string)

	// Handle the tag batch definitions if any.
	if len(cfg.TagSelection) > 0 {
		tagnamequery := "name()"
		tagvaluequery := "."
		if len(cfg.TagNameQuery) > 0 {
			tagnamequery = cfg.TagNameQuery
		}
		if len(cfg.TagValueQuery) > 0 {
			tagvaluequery = cfg.TagValueQuery
		}

		// Query all tags
		selectedTagNodes, err := p.document.QueryAll(selected, cfg.TagSelection)
		if err != nil {
			return nil, err
		}
		p.Log.Debugf("Number of selected tag nodes: %d", len(selectedTagNodes))
		if len(selectedTagNodes) > 0 && selectedTagNodes[0] != nil {
			for _, selectedtag := range selectedTagNodes {
				n, err := p.executeQuery(doc, selectedtag, tagnamequery)
				if err != nil {
					return nil, fmt.Errorf("failed to query tag name with query %q: %w", tagnamequery, err)
				}
				name, ok := n.(string)
				if !ok {
					return nil, fmt.Errorf("failed to query tag name with query %q: result is not a string (%v)", tagnamequery, n)
				}
				name = p.constructFieldName(selected, selectedtag, name, cfg.TagNameExpand)

				v, err := p.executeQuery(doc, selectedtag, tagvaluequery)
				if err != nil {
					return nil, fmt.Errorf("failed to query tag value for %q: %w", name, err)
				}

				// Check if the tag name already exists and if so, append
				// the first free index number.
				if _, ok := tags[name]; ok {
					for i := 1; ; i++ {
						candidate := name + "_" + strconv.Itoa(i)
						if _, ok := tags[candidate]; !ok {
							name = candidate
							break
						}
					}
				}

				// Convert the tag to be a string
				s, err := internal.ToString(v)
				if err != nil {
					return nil, fmt.Errorf("failed to query tag value for %q: result is not a string (%v)", name, v)
				}
				tags[name] = s
			}
		} else {
			p.debugEmptyQuery("tag selection", selected, cfg.TagSelection)
		}
	}

	// Handle explicitly defined tags
	for name, query := range cfg.Tags {
		// Execute the query and cast the returned values into strings
		v, err := p.executeQuery(doc, selected, query)
		if err != nil {
			return nil, fmt.Errorf("failed to query tag %q: %w", name, err)
		}
		switch v := v.(type) {
		case string:
			tags[name] = v
		case bool:
			tags[name] = strconv.FormatBool(v)
		case float64:
			tags[name] = strconv.FormatFloat(v, 'G', -1, 64)
		case nil:
			// The query matched nothing, skip the tag
			continue
		default:
			return nil, fmt.Errorf("unknown format '%T' for tag %q", v, name)
		}
	}

	// Add default tags
	for name, v := range p.DefaultTags {
		tags[name] = v
	}

	// Query fields
	fields := make(map[string]interface{})

	// Handle the field batch definitions if any.
	if len(cfg.FieldSelection) > 0 {
		fieldnamequery := "name()"
		fieldvaluequery := "."
		if len(cfg.FieldNameQuery) > 0 {
			fieldnamequery = cfg.FieldNameQuery
		}
		if len(cfg.FieldValueQuery) > 0 {
			fieldvaluequery = cfg.FieldValueQuery
		}

		// Query all fields
		selectedFieldNodes, err := p.document.QueryAll(selected, cfg.FieldSelection)
		if err != nil {
			return nil, err
		}
		p.Log.Debugf("Number of selected field nodes: %d", len(selectedFieldNodes))
		if len(selectedFieldNodes) > 0 && selectedFieldNodes[0] != nil {
			for _, selectedfield := range selectedFieldNodes {
				n, err := p.executeQuery(doc, selectedfield, fieldnamequery)
				if err != nil {
					return nil, fmt.Errorf("failed to query field name with query %q: %w", fieldnamequery, err)
				}
				name, ok := n.(string)
				if !ok {
					return nil, fmt.Errorf("failed to query field name with query %q: result is not a string (%v)", fieldnamequery, n)
				}
				name = p.constructFieldName(selected, selectedfield, name, cfg.FieldNameExpand)

				v, err := p.executeQuery(doc, selectedfield, fieldvaluequery)
				if err != nil {
					return nil, fmt.Errorf("failed to query field value for %q: %w", name, err)
				}

				// Check if the field name already exists and if so, append
				// the first free index number.
				if _, ok := fields[name]; ok {
					for i := 1; ; i++ {
						candidate := name + "_" + strconv.Itoa(i)
						if _, ok := fields[candidate]; !ok {
							name = candidate
							break
						}
					}
				}

				fields[name] = convertComplex(name, v)
			}
		} else {
			p.debugEmptyQuery("field selection", selected, cfg.FieldSelection)
		}
	}

	// Handle explicitly defined integer fields
	for name, query := range cfg.FieldsInt {
		// Execute the query and cast the returned values into integers
		v, err := p.executeQuery(doc, selected, query)
		if err != nil {
			return nil, fmt.Errorf("failed to query field (int) %q: %w", name, err)
		}
		switch v := v.(type) {
		case string:
			// The field is stored as int64, so accept the full 64-bit range
			fields[name], err = strconv.ParseInt(v, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("failed to parse field (int) %q: %w", name, err)
			}
		case bool:
			fields[name] = int64(0)
			if v {
				fields[name] = int64(1)
			}
		case float64:
			fields[name] = int64(v)
		case nil:
			// The query matched nothing, skip the field
			continue
		default:
			return nil, fmt.Errorf("unknown format '%T' for field (int) %q", v, name)
		}
	}

	// Handle explicitly defined fields of arbitrary type
	for name, query := range cfg.Fields {
		// Execute the query and store the result in fields
		v, err := p.executeQuery(doc, selected, query)
		if err != nil {
			return nil, fmt.Errorf("failed to query field %q: %w", name, err)
		}
		fields[name] = convertComplex(name, v)
	}

	return metric.New(metricname, tags, fields, timestamp), nil
}
// executeQuery compiles and evaluates the XPath query. Queries starting with
// a slash are evaluated relative to doc, all others relative to selected.
//
// Expressions evaluating to a plain value return that value directly. For
// expressions referencing nodes, the first matching node determines the
// result: its native value if NativeTypes is enabled and the backend supports
// it, its string value otherwise. If no node matches, nil is returned.
func (p *Parser) executeQuery(doc, selected dataNode, query string) (r interface{}, err error) {
	compiled, err := path.Compile(query)
	if err != nil {
		return nil, fmt.Errorf("failed to compile query %q: %w", query, err)
	}

	// Absolute queries start at the document, relative ones at the selection
	start := selected
	if strings.HasPrefix(query, "/") {
		start = doc
	}

	// Everything but a node-iterator is already the final result
	result := compiled.Evaluate(p.document.CreateXPathNavigator(start))
	nodes, isIterator := result.(*path.NodeIterator)
	if !isIterator {
		return result, nil
	}

	// Only the first match is of interest
	if !nodes.MoveNext() {
		return nil, nil
	}
	first := nodes.Current()

	// Return the native type of the data if this is requested and the
	// dataformat supports it
	if p.NativeTypes {
		switch nav := first.(type) {
		case *cborquery.NodeNavigator:
			return nav.GetValue(), nil
		case *jsonquery.NodeNavigator:
			return nav.GetValue(), nil
		case *protobufquery.NodeNavigator:
			return nav.GetValue(), nil
		}
	}

	// Fall back to the string representation of the node
	return first.Value(), nil
}
// splitLastPathElement is a rudimentary xpath splitter. It cuts off the last
// path element of query and returns the remaining base path followed by
// progressively longer prefixes of query: the path with a wildcard axis (if
// the last element uses an axis operator) and the path up to the first
// predicate or, if there is no predicate, the first attribute marker. Each
// returned element is a full path. The result is nil if nothing is left to
// split.
func splitLastPathElement(query string) []string {
	// Nothing left
	switch query {
	case "", "/", "//", ".":
		return nil
	}

	// Locate the separator in front of the last element. Queries without a
	// separator are treated as being relative to the current node.
	sep := strings.LastIndex(query, "/")
	if sep < 0 {
		query = "./" + query
		sep = 1
	}

	// For double slash we want to split at the first slash
	if sep > 0 && query[sep-1] == '/' {
		sep--
	}

	// An empty base means the last element is located directly below root
	base := "/"
	if sep > 0 {
		base = query[:sep]
	}
	parts := append(make([]string, 0, 3), base)

	// Check for axis operator
	pos := sep
	if i := strings.Index(query[pos:], "::"); i >= 0 {
		pos += i
		parts = append(parts, query[:pos]+"::*")
	}

	// Check for predicates and only look for attributes if there are none
	cut := strings.Index(query[pos:], "[")
	if cut < 0 {
		cut = strings.Index(query[pos:], "@")
	}
	if cut >= 0 {
		parts = append(parts, query[:pos+cut])
	}

	return parts
}
// constructFieldName determines the final name of a field or tag. An empty
// name is replaced by the name of node. If expand is set, the path from root
// to node is prepended to the name, with all elements joined by underscores.
func (p *Parser) constructFieldName(root, node dataNode, name string, expand bool) string {
	// Without a name fall back to the node's own name. The parent is only
	// included if we do not expand, as the expansion already encodes it.
	if name == "" {
		name = p.document.GetNodeName(node, "_", !expand)
	}
	if !expand {
		return name
	}

	// Prefix the name with the path between the selection root and the node
	prefix := p.document.GetNodePath(node, root, "_")
	if prefix == "" {
		return name
	}
	return prefix + "_" + name
}
// debugEmptyQuery helps locating the reason for a query returning no nodes.
// Starting from initialquery, it repeatedly strips the last path element and
// logs the number of nodes matched by the shortened queries until one of
// them matches, a query fails or nothing is left to strip. It does nothing
// unless trace logging is active.
func (p *Parser) debugEmptyQuery(operation string, root dataNode, initialquery string) {
	if p.Log == nil {
		return
	}
	// The trace option is still honored for backward compatibility
	if !p.Log.Level().Includes(telegraf.Trace) && !p.Trace {
		return
	}

	// We already know that the initial query did not match any node
	p.Log.Tracef("got 0 nodes for query %q in %s", initialquery, operation)

	// Continue with the base path (first part) as long as nothing matches
	for parts := splitLastPathElement(initialquery); len(parts) > 0; parts = splitLastPathElement(parts[0]) {
		// Test the most specific sub-query first
		for idx := len(parts) - 1; idx >= 0; idx-- {
			subquery := parts[idx]
			matches, err := p.document.QueryAll(root, subquery)
			if err != nil {
				p.Log.Tracef("executing query %q in %s failed: %v", subquery, operation, err)
				return
			}
			p.Log.Tracef("got %d nodes for query %q in %s", len(matches), subquery, operation)
			if len(matches) > 0 && matches[0] != nil {
				return
			}
		}
	}
}
// init registers the parser for all supported data formats.
func init() {
	// register adds a parser creator for the given data format
	register := func(format string) {
		parsers.Add(format,
			func(defaultMetricName string) telegraf.Parser {
				return &Parser{
					Format:            format,
					DefaultMetricName: defaultMetricName,
				}
			},
		)
	}

	// Register all variants
	register("xml")
	register("xpath_cbor")
	register("xpath_json")
	register("xpath_msgpack")
	register("xpath_protobuf")
}