1
0
Fork 0
telegraf/plugins/inputs/gnmi/gnmi.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

490 lines
15 KiB
Go

//go:generate ../../../tools/config_includer/generator
//go:generate ../../../tools/readme_config_includer/generator
package gnmi
import (
"context"
_ "embed"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/google/gnxi/utils/xpath"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/proto/gnmi_ext"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
// Currently supported GNMI Extensions
var supportedExtensions = []string{"juniper_header"}
// Define the warning to show if we cannot get a metric name.
const emptyNameWarning = `Got empty metric-name for response (field %q), usually
indicating configuration issues as the response cannot be related to any
subscription.Please open an issue on https://github.com/influxdata/telegraf
including your device model and the following response data:
%+v
This message is only printed once.`
type GNMI struct {
Addresses []string `toml:"addresses"`
Subscriptions []subscription `toml:"subscription"`
TagSubscriptions []tagSubscription `toml:"tag_subscription"`
Aliases map[string]string `toml:"aliases"`
Encoding string `toml:"encoding"`
Origin string `toml:"origin"`
Prefix string `toml:"prefix"`
Target string `toml:"target"`
UpdatesOnly bool `toml:"updates_only"`
VendorSpecific []string `toml:"vendor_specific"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
Redial config.Duration `toml:"redial"`
MaxMsgSize config.Size `toml:"max_msg_size"`
Depth int32 `toml:"depth"`
Trace bool `toml:"dump_responses"`
CanonicalFieldNames bool `toml:"canonical_field_names"`
TrimFieldNames bool `toml:"trim_field_names"`
PrefixTagKeyWithPath bool `toml:"prefix_tag_key_with_path"`
GuessPathTag bool `toml:"guess_path_tag" deprecated:"1.30.0;1.35.0;use 'path_guessing_strategy' instead"`
GuessPathStrategy string `toml:"path_guessing_strategy"`
EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;1.35.0;use 'tls_enable' instead"`
KeepaliveTime config.Duration `toml:"keepalive_time"`
KeepaliveTimeout config.Duration `toml:"keepalive_timeout"`
YangModelPaths []string `toml:"yang_model_paths"`
EnforceFirstNamespaceAsOrigin bool `toml:"enforce_first_namespace_as_origin"`
Log telegraf.Logger `toml:"-"`
common_tls.ClientConfig
// Internal state
internalAliases map[*pathInfo]string
decoder *yangmodel.Decoder
cancel context.CancelFunc
wg sync.WaitGroup
}
type subscription struct {
Name string `toml:"name"`
Origin string `toml:"origin"`
Path string `toml:"path"`
SubscriptionMode string `toml:"subscription_mode"`
SampleInterval config.Duration `toml:"sample_interval"`
SuppressRedundant bool `toml:"suppress_redundant"`
HeartbeatInterval config.Duration `toml:"heartbeat_interval"`
TagOnly bool `toml:"tag_only" deprecated:"1.25.0;1.35.0;please use 'tag_subscription's instead"`
fullPath *gnmi.Path
}
type tagSubscription struct {
subscription
Match string `toml:"match"`
Elements []string `toml:"elements"`
}
func (*GNMI) SampleConfig() string {
return sampleConfig
}
func (c *GNMI) Init() error {
// Check options
switch c.Encoding {
case "":
c.Encoding = "proto"
case "proto", "json", "json_ietf", "bytes":
// Do nothing, those are valid
default:
return fmt.Errorf("unsupported encoding %s", c.Encoding)
}
if time.Duration(c.Redial) <= 0 {
return errors.New("redial duration must be positive")
}
// Check vendor_specific options configured by user
if err := choice.CheckSlice(c.VendorSpecific, supportedExtensions); err != nil {
return fmt.Errorf("unsupported vendor_specific option: %w", err)
}
// Check path guessing and handle deprecated option
if c.GuessPathTag {
if c.GuessPathStrategy == "" {
c.GuessPathStrategy = "common path"
}
if c.GuessPathStrategy != "common path" {
return errors.New("conflicting settings between 'guess_path_tag' and 'path_guessing_strategy'")
}
}
switch c.GuessPathStrategy {
case "", "none", "common path", "subscription":
default:
return fmt.Errorf("invalid 'path_guessing_strategy' %q", c.GuessPathStrategy)
}
// Use the new TLS option for enabling
// Honor deprecated option
enable := (c.ClientConfig.Enable != nil && *c.ClientConfig.Enable) || c.EnableTLS
c.ClientConfig.Enable = &enable
// Split the subscriptions into "normal" and "tag" subscription
// and prepare them.
for i := len(c.Subscriptions) - 1; i >= 0; i-- {
subscription := c.Subscriptions[i]
// Check the subscription
if subscription.Name == "" {
return fmt.Errorf("empty 'name' found for subscription %d", i+1)
}
if subscription.Path == "" {
return fmt.Errorf("empty 'path' found for subscription %d", i+1)
}
// Support and convert legacy TagOnly subscriptions
if subscription.TagOnly {
tagSub := tagSubscription{
subscription: subscription,
Match: "name",
}
c.TagSubscriptions = append(c.TagSubscriptions, tagSub)
// Remove from the original subscriptions list
c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...)
continue
}
if err := subscription.buildFullPath(c); err != nil {
return err
}
}
for idx := range c.TagSubscriptions {
if err := c.TagSubscriptions[idx].buildFullPath(c); err != nil {
return err
}
if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly {
return errors.New("do not mix legacy tag_only subscriptions and tag subscriptions")
}
switch c.TagSubscriptions[idx].Match {
case "":
if len(c.TagSubscriptions[idx].Elements) > 0 {
c.TagSubscriptions[idx].Match = "elements"
} else {
c.TagSubscriptions[idx].Match = "name"
}
case "unconditional":
case "name":
case "elements":
if len(c.TagSubscriptions[idx].Elements) == 0 {
return errors.New("tag_subscription must have at least one element")
}
default:
return fmt.Errorf("unknown match type %q for tag-subscription %q", c.TagSubscriptions[idx].Match, c.TagSubscriptions[idx].Name)
}
}
// Invert explicit alias list and prefill subscription names
c.internalAliases = make(map[*pathInfo]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions))
for _, s := range c.Subscriptions {
if err := s.buildAlias(c.internalAliases, c.EnforceFirstNamespaceAsOrigin); err != nil {
return err
}
}
for _, s := range c.TagSubscriptions {
if err := s.buildAlias(c.internalAliases, c.EnforceFirstNamespaceAsOrigin); err != nil {
return err
}
}
for alias, encodingPath := range c.Aliases {
path := newInfoFromString(encodingPath)
if c.EnforceFirstNamespaceAsOrigin {
path.enforceFirstNamespaceAsOrigin()
}
c.internalAliases[path] = alias
}
c.Log.Debugf("Internal alias mapping: %+v", c.internalAliases)
// Warn about configures insecure cipher suites
insecure := common_tls.InsecureCiphers(c.ClientConfig.TLSCipherSuites)
if len(insecure) > 0 {
c.Log.Warnf("Configured insecure cipher suites: %s", strings.Join(insecure, ","))
}
// Check the TLS configuration
if _, err := c.ClientConfig.TLSConfig(); err != nil {
if errors.Is(err, common_tls.ErrCipherUnsupported) {
secure, insecure := common_tls.Ciphers()
c.Log.Info("Supported secure ciphers:")
for _, name := range secure {
c.Log.Infof(" %s", name)
}
c.Log.Info("Supported insecure ciphers:")
for _, name := range insecure {
c.Log.Infof(" %s", name)
}
}
return err
}
// Load the YANG models if specified by the user
if len(c.YangModelPaths) > 0 {
decoder, err := yangmodel.NewDecoder(c.YangModelPaths...)
if err != nil {
return fmt.Errorf("creating YANG model decoder failed: %w", err)
}
c.decoder = decoder
}
return nil
}
func (c *GNMI) Start(acc telegraf.Accumulator) error {
// Validate configuration
request, err := c.newSubscribeRequest()
if err != nil {
return err
}
// Generate TLS config if enabled
tlscfg, err := c.ClientConfig.TLSConfig()
if err != nil {
return err
}
// Prepare the context, optionally with credentials
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
if !c.Username.Empty() {
usernameSecret, err := c.Username.Get()
if err != nil {
return fmt.Errorf("getting username failed: %w", err)
}
username := usernameSecret.String()
usernameSecret.Destroy()
passwordSecret, err := c.Password.Get()
if err != nil {
return fmt.Errorf("getting password failed: %w", err)
}
password := passwordSecret.String()
passwordSecret.Destroy()
ctx = metadata.AppendToOutgoingContext(ctx, "username", username, "password", password)
}
// Create a goroutine for each device, dial and subscribe
c.wg.Add(len(c.Addresses))
for _, addr := range c.Addresses {
go func(addr string) {
defer c.wg.Done()
host, port, err := net.SplitHostPort(addr)
if err != nil {
acc.AddError(fmt.Errorf("unable to parse address %s: %w", addr, err))
return
}
h := handler{
host: host,
port: port,
aliases: c.internalAliases,
tagsubs: c.TagSubscriptions,
maxMsgSize: int(c.MaxMsgSize),
vendorExt: c.VendorSpecific,
tagStore: newTagStore(c.TagSubscriptions),
trace: c.Trace,
canonicalFieldNames: c.CanonicalFieldNames,
trimSlash: c.TrimFieldNames,
tagPathPrefix: c.PrefixTagKeyWithPath,
guessPathStrategy: c.GuessPathStrategy,
decoder: c.decoder,
enforceFirstNamespaceAsOrigin: c.EnforceFirstNamespaceAsOrigin,
log: c.Log,
ClientParameters: keepalive.ClientParameters{
Time: time.Duration(c.KeepaliveTime),
Timeout: time.Duration(c.KeepaliveTimeout),
PermitWithoutStream: false,
},
}
for ctx.Err() == nil {
if err := h.subscribeGNMI(ctx, acc, tlscfg, request); err != nil && ctx.Err() == nil {
acc.AddError(err)
}
select {
case <-ctx.Done():
case <-time.After(time.Duration(c.Redial)):
}
}
}(addr)
}
return nil
}
func (*GNMI) Gather(telegraf.Accumulator) error {
return nil
}
func (c *GNMI) Stop() {
c.cancel()
c.wg.Wait()
}
func (s *subscription) buildSubscription() (*gnmi.Subscription, error) {
gnmiPath, err := parsePath(s.Origin, s.Path, "")
if err != nil {
return nil, err
}
mode, ok := gnmi.SubscriptionMode_value[strings.ToUpper(s.SubscriptionMode)]
if !ok {
return nil, fmt.Errorf("invalid subscription mode %s", s.SubscriptionMode)
}
return &gnmi.Subscription{
Path: gnmiPath,
Mode: gnmi.SubscriptionMode(mode),
HeartbeatInterval: uint64(time.Duration(s.HeartbeatInterval).Nanoseconds()),
SampleInterval: uint64(time.Duration(s.SampleInterval).Nanoseconds()),
SuppressRedundant: s.SuppressRedundant,
}, nil
}
// Create a new gNMI SubscribeRequest
func (c *GNMI) newSubscribeRequest() (*gnmi.SubscribeRequest, error) {
// Create subscription objects
subscriptions := make([]*gnmi.Subscription, 0, len(c.Subscriptions)+len(c.TagSubscriptions))
for _, subscription := range c.TagSubscriptions {
sub, err := subscription.buildSubscription()
if err != nil {
return nil, err
}
subscriptions = append(subscriptions, sub)
}
for _, subscription := range c.Subscriptions {
sub, err := subscription.buildSubscription()
if err != nil {
return nil, err
}
subscriptions = append(subscriptions, sub)
}
// Construct subscribe request
gnmiPath, err := parsePath(c.Origin, c.Prefix, c.Target)
if err != nil {
return nil, err
}
// Do not provide an empty prefix. Required for Huawei NE40 router v8.21
// (and possibly others). See https://github.com/influxdata/telegraf/issues/12273.
if gnmiPath.Origin == "" && gnmiPath.Target == "" && len(gnmiPath.Elem) == 0 {
gnmiPath = nil
}
if c.Encoding != "proto" && c.Encoding != "json" && c.Encoding != "json_ietf" && c.Encoding != "bytes" {
return nil, fmt.Errorf("unsupported encoding %s", c.Encoding)
}
var extensions []*gnmi_ext.Extension
if c.Depth > 0 {
extensions = []*gnmi_ext.Extension{
{
Ext: &gnmi_ext.Extension_Depth{
Depth: &gnmi_ext.Depth{
Level: uint32(c.Depth),
},
},
},
}
}
return &gnmi.SubscribeRequest{
Request: &gnmi.SubscribeRequest_Subscribe{
Subscribe: &gnmi.SubscriptionList{
Prefix: gnmiPath,
Mode: gnmi.SubscriptionList_STREAM,
Encoding: gnmi.Encoding(gnmi.Encoding_value[strings.ToUpper(c.Encoding)]),
Subscription: subscriptions,
UpdatesOnly: c.UpdatesOnly,
},
},
Extension: extensions,
}, nil
}
// ParsePath from XPath-like string to gNMI path structure
func parsePath(origin, pathToParse, target string) (*gnmi.Path, error) {
gnmiPath, err := xpath.ToGNMIPath(pathToParse)
if err != nil {
return nil, err
}
gnmiPath.Origin = origin
gnmiPath.Target = target
return gnmiPath, err
}
func (s *subscription) buildFullPath(c *GNMI) error {
var err error
if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil {
return err
}
s.fullPath.Origin = s.Origin
s.fullPath.Target = c.Target
if c.Prefix != "" {
prefix, err := xpath.ToGNMIPath(c.Prefix)
if err != nil {
return err
}
s.fullPath.Elem = append(prefix.Elem, s.fullPath.Elem...)
if s.Origin == "" && c.Origin != "" {
s.fullPath.Origin = c.Origin
}
}
return nil
}
func (s *subscription) buildAlias(aliases map[*pathInfo]string, enforceFirstNamespaceAsOrigin bool) error {
// Build the subscription path without keys
path, err := parsePath(s.Origin, s.Path, "")
if err != nil {
return err
}
info := newInfoFromPathWithoutKeys(path)
if enforceFirstNamespaceAsOrigin {
info.enforceFirstNamespaceAsOrigin()
}
// If the user didn't provide a measurement name, use last path element
name := s.Name
if name == "" && len(info.segments) > 0 {
name = info.segments[len(info.segments)-1].id
}
if name != "" {
aliases[info] = name
}
return nil
}
func init() {
inputs.Add("gnmi", func() telegraf.Input {
return &GNMI{
Redial: config.Duration(10 * time.Second),
EnforceFirstNamespaceAsOrigin: true,
}
})
// Backwards compatible alias:
inputs.Add("cisco_telemetry_gnmi", func() telegraf.Input {
return &GNMI{
Redial: config.Duration(10 * time.Second),
EnforceFirstNamespaceAsOrigin: true,
}
})
}