// telegraf/plugins/inputs/vsphere/endpoint.go (upstream version 1.34.4)
package vsphere
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/performance"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
)
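// isolateLUN captures the last path component of a datastore URL, which is used as the LUN ID.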
var isolateLUN = regexp.MustCompile(`.*/([^/]+)/?$`)
var isIPv4 = regexp.MustCompile(`^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$`)
var isIPv6 = regexp.MustCompile(`^(?:[A-Fa-f0-9]{0,4}:){1,7}[A-Fa-f0-9]{1,4}$`)
const maxSampleConst = 10 // Absolute maximum number of samples regardless of period
const maxMetadataSamples = 100 // Number of resources to sample for metric metadata
const maxRealtimeMetrics = 50000 // Absolute maximum metrics per realtime query
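// hwMarkTTL is the time-to-live for entries in the high-water mark timestamp cache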
const hwMarkTTL = 4 * time.Hour
type queryChunk []types.PerfQuerySpec
type queryJob func(queryChunk)
// endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower-level client type.
type endpoint struct {
parent *VSphere
url *url.URL
resourceKinds map[string]*resourceKind
hwMarks *tsCache
lun2ds map[string]string
discoveryTicker *time.Ticker
collectMux sync.RWMutex
initialized bool
clientFactory *clientFactory
busy sync.Mutex
customFields map[int32]string
customAttrFilter filter.Filter
customAttrEnabled bool
metricNameLookup map[int32]string
metricNameMux sync.RWMutex
log telegraf.Logger
apiVersion string
}
type resourceKind struct {
name string
vcName string
pKey string
parentTag string
enabled bool
realTime bool
sampling int32
objects objectMap
filters filter.Filter
paths []string
excludePaths []string
collectInstances bool
getObjects func(context.Context, *endpoint, *resourceFilter) (objectMap, error)
include []string
simple bool
metrics performance.MetricList
parent string
latestSample time.Time
lastColl time.Time
}
type metricEntry struct {
tags map[string]string
name string
ts time.Time
fields map[string]interface{}
}
type objectMap map[string]*objectRef
type objectRef struct {
name string
altID string
ref types.ManagedObjectReference
parentRef *types.ManagedObjectReference // Pointer because it must be nillable
guest string
memorySizeMB int32
memoryReservation int32
dcname string
rpname string
customValues map[string]string
lookup map[string]string
}
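// getParent looks up the parent of obj among the discovered objects of the resource kind named by res.parent.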
func (e *endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, bool) {
if pKind, ok := e.resourceKinds[res.parent]; ok {
if p, ok := pKind.objects[obj.parentRef.Value]; ok {
return p, true
}
}
return nil, false
}
// newEndpoint returns a new connection to a vCenter based on the URL and configuration passed
// as parameters.
func newEndpoint(ctx context.Context, parent *VSphere, address *url.URL, log telegraf.Logger) (*endpoint, error) {
e := endpoint{
url: address,
parent: parent,
hwMarks: newTSCache(hwMarkTTL, log),
lun2ds: make(map[string]string),
initialized: false,
clientFactory: newClientFactory(address, parent),
customAttrFilter: newFilterOrPanic(parent.CustomAttributeInclude, parent.CustomAttributeExclude),
customAttrEnabled: anythingEnabled(parent.CustomAttributeExclude),
log: log,
}
e.resourceKinds = map[string]*resourceKind{
"datacenter": {
name: "datacenter",
vcName: "Datacenter",
pKey: "dcname",
parentTag: "",
enabled: anythingEnabled(parent.DatacenterMetricExclude),
realTime: false,
sampling: int32(time.Duration(parent.HistoricalInterval).Seconds()),
objects: make(objectMap),
filters: newFilterOrPanic(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude),
paths: parent.DatacenterInclude,
excludePaths: parent.DatacenterExclude,
simple: isSimple(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude),
include: parent.DatacenterMetricInclude,
collectInstances: parent.DatacenterInstances,
getObjects: getDatacenters,
parent: "",
},
"cluster": {
name: "cluster",
vcName: "ClusterComputeResource",
pKey: "clustername",
parentTag: "dcname",
enabled: anythingEnabled(parent.ClusterMetricExclude),
realTime: false,
sampling: int32(time.Duration(parent.HistoricalInterval).Seconds()),
objects: make(objectMap),
filters: newFilterOrPanic(parent.ClusterMetricInclude, parent.ClusterMetricExclude),
paths: parent.ClusterInclude,
excludePaths: parent.ClusterExclude,
simple: isSimple(parent.ClusterMetricInclude, parent.ClusterMetricExclude),
include: parent.ClusterMetricInclude,
collectInstances: parent.ClusterInstances,
getObjects: getClusters,
parent: "datacenter",
},
"resourcepool": {
name: "resourcepool",
vcName: "ResourcePool",
pKey: "rpname",
parentTag: "clustername",
enabled: anythingEnabled(parent.ResourcePoolMetricExclude),
realTime: false,
sampling: int32(time.Duration(parent.HistoricalInterval).Seconds()),
objects: make(objectMap),
filters: newFilterOrPanic(parent.ResourcePoolMetricInclude, parent.ResourcePoolMetricExclude),
paths: parent.ResourcePoolInclude,
excludePaths: parent.ResourcePoolExclude,
simple: isSimple(parent.ResourcePoolMetricInclude, parent.ResourcePoolMetricExclude),
include: parent.ResourcePoolMetricInclude,
collectInstances: parent.ResourcePoolInstances,
getObjects: getResourcePools,
parent: "cluster",
},
"host": {
name: "host",
vcName: "HostSystem",
pKey: "esxhostname",
parentTag: "clustername",
enabled: anythingEnabled(parent.HostMetricExclude),
realTime: true,
sampling: 20,
objects: make(objectMap),
filters: newFilterOrPanic(parent.HostMetricInclude, parent.HostMetricExclude),
paths: parent.HostInclude,
excludePaths: parent.HostExclude,
simple: isSimple(parent.HostMetricInclude, parent.HostMetricExclude),
include: parent.HostMetricInclude,
collectInstances: parent.HostInstances,
getObjects: getHosts,
parent: "cluster",
},
"vm": {
name: "vm",
vcName: "VirtualMachine",
pKey: "vmname",
parentTag: "esxhostname",
enabled: anythingEnabled(parent.VMMetricExclude),
realTime: true,
sampling: 20,
objects: make(objectMap),
filters: newFilterOrPanic(parent.VMMetricInclude, parent.VMMetricExclude),
paths: parent.VMInclude,
excludePaths: parent.VMExclude,
simple: isSimple(parent.VMMetricInclude, parent.VMMetricExclude),
include: parent.VMMetricInclude,
collectInstances: parent.VMInstances,
getObjects: getVMs,
parent: "host",
},
"datastore": {
name: "datastore",
vcName: "Datastore",
pKey: "dsname",
enabled: anythingEnabled(parent.DatastoreMetricExclude),
realTime: false,
sampling: int32(time.Duration(parent.HistoricalInterval).Seconds()),
objects: make(objectMap),
filters: newFilterOrPanic(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude),
paths: parent.DatastoreInclude,
excludePaths: parent.DatastoreExclude,
simple: isSimple(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude),
include: parent.DatastoreMetricInclude,
collectInstances: parent.DatastoreInstances,
getObjects: getDatastores,
parent: "",
},
"vsan": {
name: "vsan",
vcName: "ClusterComputeResource",
pKey: "clustername",
parentTag: "dcname",
enabled: anythingEnabled(parent.VSANMetricExclude),
realTime: false,
sampling: int32(time.Duration(parent.VSANInterval).Seconds()),
objects: make(objectMap),
filters: newFilterOrPanic(parent.VSANMetricInclude, parent.VSANMetricExclude),
paths: parent.VSANClusterInclude,
simple: parent.VSANMetricSkipVerify,
include: parent.VSANMetricInclude,
collectInstances: false,
getObjects: getClusters,
parent: "datacenter",
},
}
// Start discovery and other goodness
err := e.init(ctx)
return &e, err
}
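// anythingEnabled returns false if the exclude list contains the "*" wildcard, i.e. if everything is excluded.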
func anythingEnabled(ex []string) bool {
for _, s := range ex {
if s == "*" {
return false
}
}
return true
}
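// newFilterOrPanic builds an include/exclude filter and panics if the filter expressions are invalid.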
func newFilterOrPanic(include, exclude []string) filter.Filter {
f, err := filter.NewIncludeExcludeFilter(include, exclude)
if err != nil {
panic(fmt.Sprintf("Include/exclude filters are invalid: %v", err))
}
return f
}
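// isSimple reports whether a metric selection is a plain list of names (no excludes, no wildcards),
// which allows the faster metadata lookup by name.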
func isSimple(include, exclude []string) bool {
if len(exclude) > 0 || len(include) == 0 {
return false
}
for _, s := range include {
if strings.Contains(s, "*") {
return false
}
}
return true
}
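// startDiscovery kicks off a goroutine that periodically rediscovers objects until the context is cancelled.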
func (e *endpoint) startDiscovery(ctx context.Context) {
e.discoveryTicker = time.NewTicker(time.Duration(e.parent.ObjectDiscoveryInterval))
go func() {
for {
select {
case <-e.discoveryTicker.C:
err := e.discover(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
e.log.Errorf("Discovery for %s: %s", e.url.Host, err.Error())
}
case <-ctx.Done():
e.log.Debugf("Exiting discovery goroutine for %s", e.url.Host)
e.discoveryTicker.Stop()
return
}
}
}()
}
func (e *endpoint) initialDiscovery(ctx context.Context) {
err := e.discover(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
e.log.Errorf("Discovery for %s: %s", e.url.Host, err.Error())
}
e.startDiscovery(ctx)
}
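// init connects to vCenter, loads custom field metadata and runs the initial object discovery. If the
// connection fails and disconnected_servers_behavior is "ignore", initialization is retried on the next
// collection cycle.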
func (e *endpoint) init(ctx context.Context) error {
client, err := e.clientFactory.getClient(ctx)
if err != nil {
switch e.parent.DisconnectedServersBehavior {
case "error":
return err
case "ignore":
// Ignore the error and postpone the init until next collection cycle
e.log.Warnf("Error connecting to vCenter on init: %s", err)
return nil
default:
return fmt.Errorf("%q is not a valid value for disconnected_servers_behavior",
e.parent.DisconnectedServersBehavior)
}
}
// Initial load of custom field metadata
if e.customAttrEnabled {
fields, err := client.getCustomFields(ctx)
if err != nil {
e.log.Warn("Could not load custom field metadata")
} else {
e.customFields = fields
}
}
if time.Duration(e.parent.ObjectDiscoveryInterval) > 0 {
e.parent.Log.Debug("Running initial discovery")
e.initialDiscovery(ctx)
}
e.initialized = true
return nil
}
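// getMetricNameForID translates a numeric counter ID to its metric name using the cached lookup table.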
func (e *endpoint) getMetricNameForID(id int32) string {
e.metricNameMux.RLock()
defer e.metricNameMux.RUnlock()
return e.metricNameLookup[id]
}
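// reloadMetricNameMap refreshes the counter ID to metric name lookup table from vCenter.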
func (e *endpoint) reloadMetricNameMap(ctx context.Context) error {
e.metricNameMux.Lock()
defer e.metricNameMux.Unlock()
client, err := e.clientFactory.getClient(ctx)
if err != nil {
return err
}
mn, err := client.counterInfoByKey(ctx)
if err != nil {
return err
}
e.metricNameLookup = make(map[int32]string)
for key, m := range mn {
e.metricNameLookup[key] = m.Name()
}
return nil
}
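// getMetadata asks vCenter which metrics are available for the given object at the given sampling interval.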
func (e *endpoint) getMetadata(ctx context.Context, obj *objectRef, sampling int32) (performance.MetricList, error) {
client, err := e.clientFactory.getClient(ctx)
if err != nil {
return nil, err
}
ctx1, cancel1 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel1()
metrics, err := client.perf.AvailableMetric(ctx1, obj.ref.Reference(), sampling)
if err != nil {
return nil, err
}
return metrics, nil
}
func (e *endpoint) getDatacenterName(ctx context.Context, client *client, cache map[string]string, r types.ManagedObjectReference) (string, bool) {
return e.getAncestorName(ctx, client, "Datacenter", cache, r)
}
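// getAncestorName walks up the parent chain of r until it finds an object of the given resource type,
// returning its name and populating the cache for every object visited along the way.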
func (e *endpoint) getAncestorName(
ctx context.Context,
client *client,
resourceType string,
cache map[string]string,
r types.ManagedObjectReference,
) (string, bool) {
path := make([]string, 0)
returnVal := ""
here := r
done := false
for !done {
done = func() bool {
if name, ok := cache[here.Reference().String()]; ok {
// Populate cache for the entire chain of objects leading here.
returnVal = name
return true
}
path = append(path, here.Reference().String())
o := object.NewCommon(client.client.Client, r)
var result mo.ManagedEntity
ctx1, cancel1 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel1()
err := o.Properties(ctx1, here, []string{"parent", "name"}, &result)
if err != nil {
e.parent.Log.Warnf("Error while resolving parent. Assuming no parent exists. Error: %s", err.Error())
return true
}
if result.Reference().Type == resourceType {
// Populate cache for the entire chain of objects leading here.
returnVal = result.Name
return true
}
if result.Parent == nil {
e.parent.Log.Debugf("No parent found for %s (ascending from %s)", here.Reference(), r.Reference())
return true
}
here = result.Parent.Reference()
return false
}()
}
for _, s := range path {
cache[s] = returnVal
}
return returnVal, returnVal != ""
}
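// discover finds all monitored objects on the vCenter, resolves their metric metadata and atomically
// swaps the results into the endpoint.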
func (e *endpoint) discover(ctx context.Context) error {
e.busy.Lock()
defer e.busy.Unlock()
if ctx.Err() != nil {
return ctx.Err()
}
err := e.reloadMetricNameMap(ctx)
if err != nil {
return err
}
sw := newStopwatch("discover", e.url.Host)
client, err := e.clientFactory.getClient(ctx)
if err != nil {
return err
}
// get the vSphere API version
e.apiVersion = client.client.ServiceContent.About.ApiVersion
e.parent.Log.Debugf("Discover new objects for %s", e.url.Host)
dcNameCache := make(map[string]string)
numRes := int64(0)
// Populate resource objects, and endpoint instance info.
newObjects := make(map[string]objectMap)
for k, res := range e.resourceKinds {
e.log.Debugf("Discovering resources for %s", res.name)
// Need to do this even for disabled resource types (except VMs and vSAN), since other resources depend on the discovered objects for parent lookups
if res.enabled || (k != "vm" && k != "vsan") {
rf := resourceFilter{
finder: &finder{client},
resType: res.vcName,
paths: res.paths,
excludePaths: res.excludePaths}
ctx1, cancel1 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
objects, err := res.getObjects(ctx1, e, &rf)
cancel1()
if err != nil {
return err
}
// Fill in datacenter names where available (no need to do it for Datacenters)
if res.name != "datacenter" {
for k, obj := range objects {
if obj.parentRef != nil {
obj.dcname, _ = e.getDatacenterName(ctx, client, dcNameCache, *obj.parentRef)
objects[k] = obj
}
}
}
// No need to collect metric metadata if resource type is not enabled
// vSAN is also skipped since vSAN metadata follows its own format
if res.enabled && k != "vsan" {
if res.simple {
e.simpleMetadataSelect(ctx, client, res)
} else {
e.complexMetadataSelect(ctx, res, objects)
}
}
newObjects[k] = objects
sendInternalCounterWithTags("discovered_objects", e.url.Host, map[string]string{"type": res.name}, int64(len(objects)))
numRes += int64(len(objects))
}
}
// Build lun2ds map
dss := newObjects["datastore"]
l2d := make(map[string]string)
for _, ds := range dss {
lunID := ds.altID
m := isolateLUN.FindStringSubmatch(lunID)
if m != nil {
l2d[m[1]] = ds.name
}
}
// Load custom field metadata
var fields map[int32]string
if e.customAttrEnabled {
fields, err = client.getCustomFields(ctx)
if err != nil {
e.log.Warn("Could not load custom field metadata")
fields = nil
}
}
// Atomically swap maps
e.collectMux.Lock()
defer e.collectMux.Unlock()
for k, v := range newObjects {
e.resourceKinds[k].objects = v
}
e.lun2ds = l2d
if fields != nil {
e.customFields = fields
}
sw.stop()
sendInternalCounterWithTags("discovered_objects", e.url.Host, map[string]string{"type": "instance-total"}, numRes)
return nil
}
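// simpleMetadataSelect resolves metric metadata for a plain list of metric names by looking each name up directly.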
func (e *endpoint) simpleMetadataSelect(ctx context.Context, client *client, res *resourceKind) {
e.log.Debugf("Using fast metric metadata selection for %s", res.name)
m, err := client.counterInfoByName(ctx)
if err != nil {
e.log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
return
}
res.metrics = make(performance.MetricList, 0, len(res.include))
for _, s := range res.include {
if pci, ok := m[s]; ok {
cnt := types.PerfMetricId{
CounterId: pci.Key,
}
if res.collectInstances {
cnt.Instance = "*"
} else {
cnt.Instance = ""
}
res.metrics = append(res.metrics, cnt)
} else {
e.log.Warnf("Metric name %s is unknown. Will not be collected", s)
}
}
}
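// complexMetadataSelect resolves metric metadata for wildcard selections by querying the available
// metrics on a random sample of resources.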
func (e *endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap) {
// We're only going to get metadata from maxMetadataSamples resources. If we have
// more resources than that, we pick maxMetadataSamples samples at random.
sampledObjects := make([]*objectRef, 0, len(objects))
for _, obj := range objects {
sampledObjects = append(sampledObjects, obj)
}
n := len(sampledObjects)
if n > maxMetadataSamples {
// Shuffle samples into the maxMetadataSamples positions
for i := 0; i < maxMetadataSamples; i++ {
j := int(rand.Int31n(int32(i + 1))) //nolint:gosec // G404: not security critical
t := sampledObjects[i]
sampledObjects[i] = sampledObjects[j]
sampledObjects[j] = t
}
sampledObjects = sampledObjects[0:maxMetadataSamples]
}
instInfoMux := sync.Mutex{}
te := newThrottledExecutor(e.parent.DiscoverConcurrency)
for _, obj := range sampledObjects {
func(obj *objectRef) {
te.run(ctx, func() {
metrics, err := e.getMetadata(ctx, obj, res.sampling)
if err != nil {
e.log.Errorf("Getting metric metadata. Discovery will be incomplete. Error: %s", err.Error())
}
mMap := make(map[string]types.PerfMetricId)
for _, m := range metrics {
if m.Instance != "" && res.collectInstances {
m.Instance = "*"
} else {
m.Instance = ""
}
if res.filters.Match(e.getMetricNameForID(m.CounterId)) {
mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m
}
}
e.log.Debugf("Found %d metrics for %s", len(mMap), obj.name)
instInfoMux.Lock()
defer instInfoMux.Unlock()
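// Keep the metric set from the sampled object that exposes the most metrics seen so far.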
if len(mMap) > len(res.metrics) {
res.metrics = make(performance.MetricList, len(mMap))
i := 0
for _, m := range mMap {
res.metrics[i] = m
i++
}
}
})
}(obj)
}
te.wait()
}
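// getDatacenters discovers all datacenters matching the configured include/exclude paths.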
func getDatacenters(ctx context.Context, e *endpoint, resourceFilter *resourceFilter) (objectMap, error) {
var resources []mo.Datacenter
ctx1, cancel1 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel1()
err := resourceFilter.findAll(ctx1, &resources)
if err != nil {
return nil, err
}
m := make(objectMap, len(resources))
for i := range resources {
r := &resources[i]
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name,
ref: r.ExtensibleManagedObject.Reference(),
parentRef: r.Parent,
dcname: r.Name,
customValues: e.loadCustomAttributes(r.ManagedEntity),
}
}
return m, nil
}
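// getClusters discovers all clusters matching the configured paths. Since the immediate parent of a
// cluster is a folder, the folder's parent (the datacenter) is resolved and cached.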
func getClusters(ctx context.Context, e *endpoint, resourceFilter *resourceFilter) (objectMap, error) {
var resources []mo.ClusterComputeResource
ctx1, cancel1 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel1()
err := resourceFilter.findAll(ctx1, &resources)
if err != nil {
return nil, err
}
cache := make(map[string]*types.ManagedObjectReference)
m := make(objectMap, len(resources))
for i := range resources {
r := &resources[i]
// Wrap in a function to make defer work correctly.
err := func() error {
// We're not interested in the immediate parent (a folder), but the data center.
p, ok := cache[r.Parent.Value]
if !ok {
ctx2, cancel2 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel2()
client, err := e.clientFactory.getClient(ctx2)
if err != nil {
return err
}
o := object.NewFolder(client.client.Client, *r.Parent)
var folder mo.Folder
ctx3, cancel3 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel3()
err = o.Properties(ctx3, *r.Parent, []string{"parent"}, &folder)
if err != nil {
e.parent.Log.Warnf("Error while getting folder parent: %s", err.Error())
p = nil
} else {
pp := folder.Parent.Reference()
p = &pp
cache[r.Parent.Value] = p
}
}
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name,
ref: r.ExtensibleManagedObject.Reference(),
parentRef: p,
customValues: e.loadCustomAttributes(r.ManagedEntity),
}
return nil
}()
if err != nil {
return nil, err
}
}
return m, nil
}
// noinspection GoUnusedParameter
func getResourcePools(ctx context.Context, e *endpoint, resourceFilter *resourceFilter) (objectMap, error) {
var resources []mo.ResourcePool
err := resourceFilter.findAll(ctx, &resources)
if err != nil {
return nil, err
}
m := make(objectMap)
for i := range resources {
r := &resources[i]
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name,
ref: r.ExtensibleManagedObject.Reference(),
parentRef: r.Parent,
customValues: e.loadCustomAttributes(r.ManagedEntity),
}
}
return m, nil
}
func getResourcePoolName(rp types.ManagedObjectReference, rps objectMap) string {
// Loop through the Resource Pools objectmap to find the corresponding one
for _, r := range rps {
if r.ref == rp {
return r.name
}
}
return "Resources" // Default value
}
// noinspection GoUnusedParameter
func getHosts(ctx context.Context, e *endpoint, resourceFilter *resourceFilter) (objectMap, error) {
var resources []mo.HostSystem
err := resourceFilter.findAll(ctx, &resources)
if err != nil {
return nil, err
}
m := make(objectMap)
for i := range resources {
r := &resources[i]
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name,
ref: r.ExtensibleManagedObject.Reference(),
parentRef: r.Parent,
customValues: e.loadCustomAttributes(r.ManagedEntity),
}
}
return m, nil
}
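// getVMs discovers all powered-on VMs matching the configured paths, along with guest, network,
// resource pool and custom attribute metadata.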
func getVMs(ctx context.Context, e *endpoint, rf *resourceFilter) (objectMap, error) {
var resources []mo.VirtualMachine
ctx1, cancel1 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel1()
err := rf.findAll(ctx1, &resources)
if err != nil {
return nil, err
}
m := make(objectMap)
client, err := e.clientFactory.getClient(ctx)
if err != nil {
return nil, err
}
// Create a ResourcePool Filter and get the list of Resource Pools
rprf := resourceFilter{
finder: &finder{client},
resType: "ResourcePool",
paths: []string{"/*/host/**"},
excludePaths: nil}
resourcePools, err := getResourcePools(ctx, e, &rprf)
if err != nil {
return nil, err
}
for i := range resources {
r := &resources[i]
if r.Runtime.PowerState != "poweredOn" {
continue
}
guest := "unknown"
uuid := ""
lookup := make(map[string]string)
// get the name of the VM resource pool
rpname := getResourcePoolName(*r.ResourcePool, resourcePools)
// Extract host name
if r.Guest != nil && r.Guest.HostName != "" {
lookup["guesthostname"] = r.Guest.HostName
}
// Collect network information (Guest may be nil if VMware Tools has not reported in yet)
var guestNets []types.GuestNicInfo
if r.Guest != nil {
guestNets = r.Guest.Net
}
for _, net := range guestNets {
if net.DeviceConfigId == -1 {
continue
}
if net.IpConfig == nil || net.IpConfig.IpAddress == nil {
continue
}
ips := make(map[string][]string)
for _, ip := range net.IpConfig.IpAddress {
addr := ip.IpAddress
for _, ipType := range e.parent.IPAddresses {
if (ipType != "ipv4" || !isIPv4.MatchString(addr)) &&
(ipType != "ipv6" || !isIPv6.MatchString(addr)) {
continue
}
// By convention, we want the preferred addresses to appear first in the array.
if _, ok := ips[ipType]; !ok {
ips[ipType] = make([]string, 0)
}
if ip.State == "preferred" {
ips[ipType] = append([]string{addr}, ips[ipType]...)
} else {
ips[ipType] = append(ips[ipType], addr)
}
}
}
for ipType, ipList := range ips {
lookup["nic/"+strconv.Itoa(int(net.DeviceConfigId))+"/"+ipType] = strings.Join(ipList, ",")
}
}
// Sometimes Config is unknown and returns a nil pointer
if r.Config != nil {
guest = cleanGuestID(r.Config.GuestId)
if r.Guest != nil && r.Guest.GuestId != "" {
guest = cleanGuestID(r.Guest.GuestId)
}
uuid = r.Config.Uuid
}
cvs := make(map[string]string)
if e.customAttrEnabled {
for _, cv := range r.Summary.CustomValue {
val, ok := cv.(*types.CustomFieldStringValue)
if !ok || val.Value == "" {
continue
}
key, ok := e.customFields[val.Key]
if !ok {
e.log.Warnf("Metadata for custom field %d not found. Skipping", val.Key)
continue
}
if e.customAttrFilter.Match(key) {
cvs[key] = val.Value
}
}
}
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name,
ref: r.ExtensibleManagedObject.Reference(),
parentRef: r.Runtime.Host,
guest: guest,
memorySizeMB: r.Summary.Config.MemorySizeMB,
memoryReservation: r.Summary.Config.MemoryReservation,
altID: uuid,
rpname: rpname,
customValues: e.loadCustomAttributes(r.ManagedEntity),
lookup: lookup,
}
}
return m, nil
}
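// getDatastores discovers all datastores matching the configured paths, recording the datastore URL
// as the alternative ID used for LUN mapping.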
func getDatastores(ctx context.Context, e *endpoint, resourceFilter *resourceFilter) (objectMap, error) {
var resources []mo.Datastore
ctx1, cancel1 := context.WithTimeout(ctx, time.Duration(e.parent.Timeout))
defer cancel1()
err := resourceFilter.findAll(ctx1, &resources)
if err != nil {
return nil, err
}
m := make(objectMap)
for i := range resources {
r := &resources[i]
lunID := ""
if r.Info != nil {
info := r.Info.GetDatastoreInfo()
if info != nil {
lunID = info.Url
}
}
m[r.ExtensibleManagedObject.Reference().Value] = &objectRef{
name: r.Name,
ref: r.ExtensibleManagedObject.Reference(),
parentRef: r.Parent,
altID: lunID,
customValues: e.loadCustomAttributes(r.ManagedEntity),
}
}
return m, nil
}
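// loadCustomAttributes returns the custom attribute key/value pairs of an entity that pass the configured attribute filter.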
func (e *endpoint) loadCustomAttributes(entity mo.ManagedEntity) map[string]string {
if !e.customAttrEnabled {
return make(map[string]string)
}
cvs := make(map[string]string)
for _, v := range entity.CustomValue {
cv, ok := v.(*types.CustomFieldStringValue)
if !ok {
e.parent.Log.Warnf("Metadata for custom field %d not of string type. Skipping", cv.Key)
continue
}
key, ok := e.customFields[cv.Key]
if !ok {
e.parent.Log.Warnf("Metadata for custom field %d not found. Skipping", cv.Key)
continue
}
if e.customAttrFilter.Match(key) {
cvs[key] = cv.Value
}
}
return cvs
}
// close shuts down an endpoint and releases any resources associated with it.
func (e *endpoint) close() {
e.clientFactory.close()
}
// collect runs a round of data collections as specified in the configuration.
func (e *endpoint) collect(ctx context.Context, acc telegraf.Accumulator) error {
// Connection could have failed on init, so we need to check for a deferred
// init request.
if !e.initialized {
e.log.Debug("Performing deferred init")
err := e.init(ctx)
if err != nil {
return err
}
}
// If we never managed to do a discovery, collection will be a no-op. Therefore,
// we need to check that a connection is available, or the collection will
// silently fail.
if _, err := e.clientFactory.getClient(ctx); err != nil {
return err
}
e.collectMux.RLock()
defer e.collectMux.RUnlock()
if ctx.Err() != nil {
return ctx.Err()
}
// If discovery interval is disabled (0), discover on each collection cycle
if time.Duration(e.parent.ObjectDiscoveryInterval) == 0 {
err := e.discover(ctx)
if err != nil {
return err
}
}
var wg sync.WaitGroup
for k, res := range e.resourceKinds {
if res.enabled {
wg.Add(1)
go func(k string) {
defer wg.Done()
var err error
if k == "vsan" {
err = e.collectVsan(ctx, acc)
} else {
err = e.collectResource(ctx, k, acc)
}
if err != nil {
acc.AddError(err)
}
}(k)
}
}
wg.Wait()
// purge old timestamps from the cache
e.hwMarks.purge()
return nil
}
// Workaround to make sure pqs is a copy of the loop variable and won't change.
func submitChunkJob(ctx context.Context, te *throttledExecutor, job queryJob, pqs queryChunk) {
te.run(ctx, func() {
job(pqs)
})
}
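// chunkify splits the metrics to be collected for a resource kind into query chunks that respect the
// configured size limits, grouping the metrics of each object by the start time of their collection
// window, and submits each chunk to the supplied job on a throttled executor.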
func (e *endpoint) chunkify(ctx context.Context, res *resourceKind, now, latest time.Time, job queryJob) {
te := newThrottledExecutor(e.parent.CollectConcurrency)
maxMetrics := e.parent.MaxQueryMetrics
if maxMetrics < 1 {
maxMetrics = 1
}
// Workaround for vCenter weirdness. Cluster metrics seem to be counted multiple times
// when checking query size, so keep the limit at a low value.
// Revisit this once we better understand why vCenter counts them this way!
if res.name == "cluster" && maxMetrics > 10 {
maxMetrics = 10
}
pqs := make(queryChunk, 0, e.parent.MaxQueryObjects)
numQs := 0
for _, obj := range res.objects {
timeBuckets := make(map[int64]*types.PerfQuerySpec)
for metricIdx, metric := range res.metrics {
// Determine time of last successful collection
metricName := e.getMetricNameForID(metric.CounterId)
if metricName == "" {
e.log.Debugf("Unable to find metric name for id %d. Skipping!", metric.CounterId)
continue
}
start, ok := e.hwMarks.get(obj.ref.Value, metricName)
if !ok {
start = latest.Add(time.Duration(-res.sampling) * time.Second * (time.Duration(e.parent.MetricLookback) - 1))
}
if !start.Truncate(time.Second).Before(now.Truncate(time.Second)) {
// this happens when the hiwater mark was estimated using a larger estInterval than is currently used
// the estInterval is reset to 1m in case of some errors
// there are no new metrics to be expected here and querying this gets us an error, so: skip!
e.log.Debugf("Start >= end (rounded to seconds): %s > %s. Skipping!", start, now)
continue
}
// Create bucket if we don't already have it
bucket, ok := timeBuckets[start.Unix()]
if !ok {
bucket = &types.PerfQuerySpec{
Entity: obj.ref,
MaxSample: maxSampleConst,
MetricId: make([]types.PerfMetricId, 0),
IntervalId: res.sampling,
Format: "normal",
}
bucket.StartTime = &start
bucket.EndTime = &now
timeBuckets[start.Unix()] = bucket
}
// Add this metric to the bucket
bucket.MetricId = append(bucket.MetricId, metric)
// Bucket filled to capacity?
// OR if we're past the absolute maximum limit
if (!res.realTime && len(bucket.MetricId) >= maxMetrics) || len(bucket.MetricId) > maxRealtimeMetrics {
e.log.Debugf("Submitting partial query: %d metrics (%d remaining) of type %s for %s. Total objects %d",
len(bucket.MetricId), len(res.metrics)-metricIdx, res.name, e.url.Host, len(res.objects))
// Don't send work items if the context has been cancelled.
if errors.Is(ctx.Err(), context.Canceled) {
return
}
// run collection job
delete(timeBuckets, start.Unix())
submitChunkJob(ctx, te, job, queryChunk{*bucket})
}
}
// Handle data in the time buckets and submit a job once we've reached the maximum number of objects.
for _, bucket := range timeBuckets {
pqs = append(pqs, *bucket)
numQs += len(bucket.MetricId)
if (!res.realTime && numQs > e.parent.MaxQueryObjects) || numQs > maxRealtimeMetrics {
e.log.Debugf("Submitting final bucket job for %s: %d metrics", res.name, numQs)
submitChunkJob(ctx, te, job, pqs)
pqs = make(queryChunk, 0, e.parent.MaxQueryObjects)
numQs = 0
}
}
}
// Submit any jobs left in the queue
if len(pqs) > 0 {
e.log.Debugf("Submitting job for %s: %d objects, %d metrics", res.name, len(pqs), numQs)
submitChunkJob(ctx, te, job, pqs)
}
// Wait for background collection to finish
te.wait()
}
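// collectResource collects metrics for a single resource kind, estimating the collection interval and
// skipping collection if no new samples can be available yet.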
func (e *endpoint) collectResource(ctx context.Context, resourceType string, acc telegraf.Accumulator) error {
res := e.resourceKinds[resourceType]
client, err := e.clientFactory.getClient(ctx)
if err != nil {
return err
}
now, err := client.getServerTime(ctx)
if err != nil {
return err
}
// Estimate the interval at which we're invoked. Use local time (not server time)
// since this is about how we got invoked locally.
localNow := time.Now()
estInterval := time.Minute
if !res.lastColl.IsZero() {
s := time.Duration(res.sampling) * time.Second
rawInterval := localNow.Sub(res.lastColl)
paddedInterval := rawInterval + time.Duration(res.sampling/2)*time.Second
estInterval = paddedInterval.Truncate(s)
if estInterval < s {
estInterval = s
}
e.log.Debugf("Raw interval %s, padded: %s, estimated: %s", rawInterval, paddedInterval, estInterval)
}
e.log.Debugf("Interval estimated to %s", estInterval)
res.lastColl = localNow
latest := res.latestSample
if !latest.IsZero() {
elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter.
e.log.Debugf("Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
if !res.realTime && elapsed < float64(res.sampling) {
// No new data would be available. We're outta here!
e.log.Debugf("Sampling period for %s of %d has not elapsed on %s",
resourceType, res.sampling, e.url.Host)
return nil
}
} else {
latest = now.Add(time.Duration(-res.sampling) * time.Second)
}
internalTags := map[string]string{"resourcetype": resourceType}
sw := newStopwatchWithTags("gather_duration", e.url.Host, internalTags)
e.log.Debugf("Collecting metrics for %d objects of type %s for %s",
len(res.objects), resourceType, e.url.Host)
count := int64(0)
var tsMux sync.Mutex
latestSample := time.Time{}
// Divide workload into chunks and process them concurrently
e.chunkify(ctx, res, now, latest,
func(chunk queryChunk) {
n, localLatest, err := e.collectChunk(ctx, chunk, res, acc, estInterval)
e.log.Debugf("CollectChunk for %s returned %d metrics", resourceType, n)
if err != nil {
acc.AddError(errors.New("while collecting " + res.name + " for vcenter " + e.url.Host + ": " + err.Error()))
return
}
atomic.AddInt64(&count, int64(n))
tsMux.Lock()
defer tsMux.Unlock()
if localLatest.After(latestSample) && !localLatest.IsZero() {
latestSample = localLatest
}
})
e.log.Debugf("Latest sample for %s set to %s", resourceType, latestSample)
if !latestSample.IsZero() {
res.latestSample = latestSample
}
sw.stop()
sendInternalCounterWithTags("gather_count", e.url.Host, internalTags, count)
return nil
}
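// alignSamples truncates sample timestamps to the given interval and averages all samples that fall
// into the same interval bucket, skipping negative (invalid) values.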
func (e *endpoint) alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) {
rInfo := make([]types.PerfSampleInfo, 0, len(info))
rValues := make([]float64, 0, len(values))
bi := 1.0
var lastBucket time.Time
for idx := range info {
// According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
// data coming back with missing values. Take care of that gracefully!
if idx >= len(values) {
e.log.Debugf("len(SampleInfo)>len(Value) %d > %d during alignment", len(info), len(values))
break
}
v := float64(values[idx])
if v < 0 {
continue
}
ts := info[idx].Timestamp
roundedTS := ts.Truncate(interval)
// Are we still working on the same bucket?
if roundedTS == lastBucket {
bi++
p := len(rValues) - 1
rValues[p] = ((bi-1)/bi)*rValues[p] + v/bi
} else {
rValues = append(rValues, v)
roundedInfo := types.PerfSampleInfo{
Timestamp: roundedTS,
Interval: info[idx].Interval,
}
rInfo = append(rInfo, roundedInfo)
bi = 1.0
lastBucket = roundedTS
}
}
return rInfo, rValues
}
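// collectChunk runs a single performance query and converts the results into Telegraf metrics, returning
// the number of values collected and the timestamp of the latest sample seen.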
func (e *endpoint) collectChunk(
ctx context.Context,
pqs queryChunk,
res *resourceKind,
acc telegraf.Accumulator,
interval time.Duration,
) (int, time.Time, error) {
e.log.Debugf("Query for %s has %d QuerySpecs", res.name, len(pqs))
latestSample := time.Time{}
count := 0
resourceType := res.name
prefix := "vsphere" + e.parent.Separator + resourceType
client, err := e.clientFactory.getClient(ctx)
if err != nil {
return count, latestSample, err
}
metricInfo, err := client.counterInfoByName(ctx)
if err != nil {
return count, latestSample, err
}
ems, err := client.queryMetrics(ctx, pqs)
if err != nil {
return count, latestSample, err
}
e.log.Debugf("Query for %s returned metrics for %d objects", resourceType, len(ems))
// Iterate through results
for _, em := range ems {
moid := em.Entity.Reference().Value
instInfo, found := res.objects[moid]
if !found {
e.log.Errorf("MOID %s not found in cache. Skipping! (This should not happen!)", moid)
continue
}
buckets := make(map[string]metricEntry)
for _, v := range em.Value {
name := v.Name
t := map[string]string{
"vcenter": e.url.Host,
"source": instInfo.name,
"moid": moid,
}
// Populate tags
objectRef, ok := res.objects[moid]
if !ok {
e.log.Errorf("MOID %s not found in cache. Skipping", moid)
continue
}
e.populateTags(objectRef, resourceType, res, t, v)
nValues := 0
alignedInfo, alignedValues := e.alignSamples(em.SampleInfo, v.Value, interval)
globalFields := e.populateGlobalFields(objectRef, resourceType, prefix)
for idx, sample := range alignedInfo {
// According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted
// data coming back with missing values. Take care of that gracefully!
if idx >= len(alignedValues) {
e.log.Debugf("Len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues))
break
}
ts := sample.Timestamp
if ts.After(latestSample) {
latestSample = ts
}
nValues++
// Organize the metrics into a bucket per measurement.
mn, fn := e.makeMetricIdentifier(prefix, name)
bKey := mn + " " + v.Instance + " " + strconv.FormatInt(ts.UnixNano(), 10)
bucket, found := buckets[bKey]
if !found {
fields := make(map[string]interface{})
for k, v := range globalFields {
fields[k] = v
}
bucket = metricEntry{name: mn, ts: ts, fields: fields, tags: t}
buckets[bKey] = bucket
}
// Percentage values must be scaled down by 100.
info, ok := metricInfo[name]
if !ok {
e.log.Errorf("Could not determine unit for %s. Skipping", name)
continue
}
v := alignedValues[idx]
if info.UnitInfo.GetElementDescription().Key == "percent" {
bucket.fields[fn] = v / 100.0
} else {
if e.parent.UseIntSamples {
bucket.fields[fn] = int64(round(v))
} else {
bucket.fields[fn] = v
}
}
count++
// Update hiwater marks
adjTS := ts.Add(interval).Truncate(interval).Add(-time.Second)
e.hwMarks.put(moid, name, adjTS)
}
if nValues == 0 {
e.log.Debugf("Missing value for: %s, %s", name, objectRef.name)
continue
}
}
// We've iterated through all the metrics and collected buckets for each
// measurement name. Now emit them!
for _, bucket := range buckets {
acc.AddFields(bucket.name, bucket.fields, bucket.tags, bucket.ts)
}
}
return count, latestSample, nil
}
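// populateTags fills in the tag map for a metric series: object name, parent chain, datacenter, the
// instance-specific tag derived from the metric name prefix, and any custom attribute values.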
func (e *endpoint) populateTags(objectRef *objectRef, resourceType string, resource *resourceKind, t map[string]string, v performance.MetricSeries) {
// Map name of object.
if resource.pKey != "" {
t[resource.pKey] = objectRef.name
}
if resourceType == "vm" && objectRef.altID != "" {
t["uuid"] = objectRef.altID
}
if resourceType == "vm" && objectRef.rpname != "" {
t["rpname"] = objectRef.rpname
}
// Map parent reference
parent, found := e.getParent(objectRef, resource)
if found {
t[resource.parentTag] = parent.name
if resourceType == "vm" {
if objectRef.guest != "" {
t["guest"] = objectRef.guest
}
if gh := objectRef.lookup["guesthostname"]; gh != "" {
t["guesthostname"] = gh
}
if c, ok := e.resourceKinds["cluster"].objects[parent.parentRef.Value]; ok {
t["clustername"] = c.name
}
}
}
// Fill in Datacenter name
if objectRef.dcname != "" {
t["dcname"] = objectRef.dcname
}
// Determine which point tag to map to the instance
name := v.Name
instance := "instance-total"
if v.Instance != "" {
instance = v.Instance
}
if strings.HasPrefix(name, "cpu.") {
t["cpu"] = instance
} else if strings.HasPrefix(name, "datastore.") {
t["lun"] = instance
if ds, ok := e.lun2ds[instance]; ok {
t["dsname"] = ds
} else {
t["dsname"] = instance
}
} else if strings.HasPrefix(name, "disk.") {
t["disk"] = cleanDiskTag(instance)
} else if strings.HasPrefix(name, "net.") {
t["interface"] = instance
// Add IP addresses to NIC data.
if resourceType == "vm" && objectRef.lookup != nil {
key := "nic/" + t["interface"] + "/"
if ip, ok := objectRef.lookup[key+"ipv6"]; ok {
t["ipv6"] = ip
}
if ip, ok := objectRef.lookup[key+"ipv4"]; ok {
t["ipv4"] = ip
}
}
} else if strings.HasPrefix(name, "storageAdapter.") {
t["adapter"] = instance
} else if strings.HasPrefix(name, "storagePath.") {
t["path"] = instance
} else if strings.HasPrefix(name, "sys.resource") {
t["resource"] = instance
} else if strings.HasPrefix(name, "vflashModule.") {
t["module"] = instance
} else if strings.HasPrefix(name, "virtualDisk.") {
t["disk"] = instance
} else if v.Instance != "" {
// default
t["instance"] = v.Instance
}
// Fill in custom values if they exist
for k, v := range objectRef.customValues {
if v != "" {
t[k] = v
}
}
}
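// populateGlobalFields returns fields attached to every bucket of a VM, currently the configured memory size and reservation.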
func (e *endpoint) populateGlobalFields(objectRef *objectRef, resourceType, prefix string) map[string]interface{} {
globalFields := make(map[string]interface{})
if resourceType == "vm" && objectRef.memorySizeMB != 0 {
_, fieldName := e.makeMetricIdentifier(prefix, "memorySizeMB")
globalFields[fieldName] = strconv.Itoa(int(objectRef.memorySizeMB))
}
if resourceType == "vm" && objectRef.memoryReservation != 0 {
_, fieldName := e.makeMetricIdentifier(prefix, "memoryReservation")
globalFields[fieldName] = strconv.Itoa(int(objectRef.memoryReservation))
}
return globalFields
}
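// makeMetricIdentifier splits a dotted metric path such as "cpu.usage.average" into a measurement name
// (e.g. "vsphere_vm_cpu") and a field name (e.g. "usage_average"), using the configured separator.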
func (e *endpoint) makeMetricIdentifier(prefix, metric string) (metricName, fieldName string) {
parts := strings.Split(metric, ".")
if len(parts) == 1 {
return prefix, parts[0]
}
return prefix + e.parent.Separator + parts[0], strings.Join(parts[1:], e.parent.Separator)
}
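// cleanGuestID strips the "Guest" suffix from vSphere guest IDs, e.g. "ubuntu64Guest" becomes "ubuntu64".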
func cleanGuestID(id string) string {
return strings.TrimSuffix(id, "Guest")
}
func cleanDiskTag(disk string) string {
// Remove enclosing "<>"
return strings.TrimSuffix(strings.TrimPrefix(disk, "<"), ">")
}
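// round rounds to the nearest integer, with ties rounded away from zero.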
func round(x float64) float64 {
t := math.Trunc(x)
if math.Abs(x-t) >= 0.5 {
return t + math.Copysign(1, x)
}
return t
}