1
0
Fork 0
telegraf/plugins/inputs/libvirt/libvirt.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

266 lines
6.9 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package libvirt
import (
_ "embed"
"errors"
"fmt"
"sync"
golibvirt "github.com/digitalocean/go-libvirt"
libvirtutils "github.com/thomasklein94/packer-plugin-libvirt/libvirt-utils"
"golang.org/x/sync/errgroup"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// sampleConfig holds the contents of sample.conf, embedded into the binary by
// the go:embed directive below; SampleConfig returns it verbatim.
//
//go:embed sample.conf
var sampleConfig string
// Statistics-group selectors. Each group owns a single bit so that several
// groups can be combined into one mask (see Libvirt.metricNumber).
// NOTE(review): the values presumably mirror libvirt's virDomainStatsTypes
// flags - confirm against the libvirt headers before changing any of them.
const (
	domainStatsState     uint32 = 1 << 0
	domainStatsCPUTotal  uint32 = 1 << 1
	domainStatsBalloon   uint32 = 1 << 2
	domainStatsVCPU      uint32 = 1 << 3
	domainStatsInterface uint32 = 1 << 4
	domainStatsBlock     uint32 = 1 << 5
	domainStatsPerf      uint32 = 1 << 6
	domainStatsIothread  uint32 = 1 << 7
	domainStatsMemory    uint32 = 1 << 8
	domainStatsDirtyrate uint32 = 1 << 9

	// domainStatsAll has the ten group bits above set (0x3FF).
	domainStatsAll uint32 = 1<<10 - 1
)

const (
	// defaultLibvirtURI is used by Init when libvirt_uri is not configured.
	defaultLibvirtURI = "qemu:///system"
	// pluginName is the name under which the plugin registers itself.
	pluginName = "libvirt"
)
// Libvirt holds the plugin's user configuration (the toml-tagged fields) and
// the runtime state that Init derives from it.
type Libvirt struct {
// LibvirtURI is the libvirt connection URI; Init replaces an empty value with defaultLibvirtURI.
LibvirtURI string `toml:"libvirt_uri"`
// Domains lists the domain names to collect from; when empty, all available domains are used.
Domains []string `toml:"domains"`
// StatisticsGroups names the statistics groups to gather; when nil, Init selects all groups.
StatisticsGroups []string `toml:"statistics_groups"`
// AdditionalStatistics enables extra statistics; "vcpu_mapping" is the only value accepted.
AdditionalStatistics []string `toml:"additional_statistics"`
// Log is the logger used for debug messages; it is not read from the configuration.
Log telegraf.Logger `toml:"-"`
// utils performs the libvirt calls (connect, list domains, gather stats, disconnect).
utils utils
// metricNumber is the combination of domainStats* flags passed to gatherStatsForDomains.
metricNumber uint32
// vcpuMappingEnabled is set when "vcpu_mapping" appears in AdditionalStatistics.
vcpuMappingEnabled bool
// domainsMap is the set of names from Domains, built by Init and consulted by filterDomains.
domainsMap map[string]struct{}
}
// SampleConfig returns the plugin's sample configuration, i.e. the embedded
// contents of sample.conf.
func (*Libvirt) SampleConfig() string {
return sampleConfig
}
// Init validates the configuration and derives the plugin's runtime state.
//
// In order, it builds the lookup set of configured domain names, falls back to
// defaultLibvirtURI when no URI is configured, validates the URI, resolves
// statistics_groups into l.metricNumber (every group when the option is
// missing) and validates additional_statistics. It returns the first
// validation error, or an error when the configuration leaves nothing to
// gather.
func (l *Libvirt) Init() error {
	if len(l.Domains) == 0 {
		l.Log.Debugf("No domains given. Collecting metrics from all available domains.")
	}

	domainSet := make(map[string]struct{}, len(l.Domains))
	for i := range l.Domains {
		domainSet[l.Domains[i]] = struct{}{}
	}
	l.domainsMap = domainSet

	if l.LibvirtURI == "" {
		l.Log.Debugf("Using default libvirt url - %q", defaultLibvirtURI)
		l.LibvirtURI = defaultLibvirtURI
	}
	if uriErr := l.validateLibvirtURI(); uriErr != nil {
		return uriErr
	}

	// Only a missing statistics_groups option (nil) selects everything; an
	// explicitly empty list goes through the regular calculation.
	if l.StatisticsGroups != nil {
		if groupsErr := l.calculateMetricNumber(); groupsErr != nil {
			return groupsErr
		}
	} else {
		l.Log.Debugf("Setting libvirt to gather all metrics.")
		l.metricNumber = domainStatsAll
	}

	if statsErr := l.validateAdditionalStatistics(); statsErr != nil {
		return statsErr
	}

	if l.isThereAnythingToGather() {
		return nil
	}
	return errors.New("all configuration options are empty or invalid. Did not find anything to gather")
}
// Gather collects one round of metrics and adds them to acc.
//
// It makes sure a libvirt connection exists, lists the available domains,
// narrows them to the configured ones, optionally collects the vCPU mapping and
// finally gathers the domain statistics. Finding no domains is not an error:
// a debug message is logged and nil is returned. Errors from the gathering
// steps go through handleError, which also calls disconnect on l.utils.
func (l *Libvirt) Gather(acc telegraf.Accumulator) error {
	if connErr := l.utils.ensureConnected(l.LibvirtURI); connErr != nil {
		return connErr
	}

	// Everything libvirt currently knows about.
	allDomains, err := l.utils.gatherAllDomains()
	if err != nil {
		return handleError(err, "error occurred while gathering all domains", l.utils)
	}
	if len(allDomains) == 0 {
		l.Log.Debug("Couldn't find any domains on system")
		return nil
	}

	// Keep only the domains the user asked for (all of them when none are configured).
	selected := l.filterDomains(allDomains)
	if len(selected) == 0 {
		l.Log.Debug("Configured domains are not available on system")
		return nil
	}

	var vcpuInfos map[string][]vcpuAffinity
	if l.vcpuMappingEnabled {
		vcpuInfos, err = l.getVcpuMapping(selected)
		if err != nil {
			return handleError(err, "error occurred while gathering vcpu mapping", l.utils)
		}
	}

	err = l.gatherMetrics(selected, vcpuInfos, acc)
	return handleError(err, "error occurred while gathering metrics", l.utils)
}
// validateLibvirtURI checks l.LibvirtURI by parsing it and by asking
// libvirtutils to build a dialer for it. The dialer itself is discarded; it is
// created only so that an unusable URI is reported as early as possible.
func (l *Libvirt) validateLibvirtURI() error {
	var parsed libvirtutils.LibvirtUri
	if parseErr := parsed.Unmarshal(l.LibvirtURI); parseErr != nil {
		return parseErr
	}

	_, dialerErr := libvirtutils.NewDialerFromLibvirtUri(parsed)
	return dialerErr
}
// calculateMetricNumber resolves the names in l.StatisticsGroups into the flag
// mask stored in l.metricNumber.
//
// The mask is built from scratch in a local variable and stored only on
// success. The result therefore depends solely on l.StatisticsGroups: calling
// the method again yields the same value instead of adding the flags a second
// time, and a failed call leaves l.metricNumber untouched.
//
// An error is returned when a name is not a known statistics group or when a
// group is listed more than once.
func (l *Libvirt) calculateMetricNumber() error {
	groupFlags := map[string]uint32{
		"state":     domainStatsState,
		"cpu_total": domainStatsCPUTotal,
		"balloon":   domainStatsBalloon,
		"vcpu":      domainStatsVCPU,
		"interface": domainStatsInterface,
		"block":     domainStatsBlock,
		"perf":      domainStatsPerf,
		"iothread":  domainStatsIothread,
		"memory":    domainStatsMemory,
		"dirtyrate": domainStatsDirtyrate,
	}

	var mask uint32
	for _, name := range l.StatisticsGroups {
		flag, known := groupFlags[name]
		if !known {
			return fmt.Errorf("unrecognized metrics name %q", name)
		}
		// Every group owns a distinct bit, so a bit that is already set means
		// the same group name was seen before.
		if mask&flag != 0 {
			return fmt.Errorf("duplicated statistics group in config: %q", name)
		}
		mask |= flag
	}

	l.metricNumber = mask
	return nil
}
// validateAdditionalStatistics checks l.AdditionalStatistics and records in
// l.vcpuMappingEnabled whether "vcpu_mapping" was requested.
//
// Duplicates are tracked in a local variable and the field is written only on
// success. A repeated call on a valid configuration therefore succeeds instead
// of reporting a duplicate because the flag was already set by an earlier call.
//
// An error is returned when "vcpu_mapping" is listed more than once or when an
// entry is not a supported additional statistic.
func (l *Libvirt) validateAdditionalStatistics() error {
	vcpuMapping := false
	for _, stat := range l.AdditionalStatistics {
		switch stat {
		case "vcpu_mapping":
			if vcpuMapping {
				return fmt.Errorf("duplicated additional statistic in config: %q", stat)
			}
			vcpuMapping = true
		default:
			return fmt.Errorf("additional statistics: %v is not supported by this plugin", stat)
		}
	}

	l.vcpuMappingEnabled = vcpuMapping
	return nil
}
// isThereAnythingToGather reports whether the configuration selects at least
// one statistics group or lists at least one additional statistic.
func (l *Libvirt) isThereAnythingToGather() bool {
	if l.metricNumber > 0 {
		return true
	}
	return len(l.AdditionalStatistics) > 0
}
// handleError turns a failure into the error the plugin reports.
//
// A nil err yields nil and nothing else happens. Otherwise utils.disconnect is
// called and err is wrapped with errMessage; when disconnecting fails as well,
// that error is wrapped into the result too.
func handleError(err error, errMessage string, utils utils) error {
	if err == nil {
		return nil
	}

	disconnectErr := utils.disconnect()
	if disconnectErr == nil {
		return fmt.Errorf("%s: %w", errMessage, err)
	}
	return fmt.Errorf("%s: %w; error occurred when disconnecting: %w", errMessage, err, disconnectErr)
}
// filterDomains narrows availableDomains to the domains named in the
// configuration. When no domain names are configured, availableDomains is
// returned as is; otherwise only the domains whose Name is present in
// l.domainsMap are returned, in their original order.
func (l *Libvirt) filterDomains(availableDomains []golibvirt.Domain) []golibvirt.Domain {
	if len(l.domainsMap) == 0 {
		return availableDomains
	}

	var selected []golibvirt.Domain
	for i := range availableDomains {
		if _, wanted := l.domainsMap[availableDomains[i].Name]; !wanted {
			continue
		}
		selected = append(selected, availableDomains[i])
	}
	return selected
}
// gatherMetrics requests the statistics selected by l.metricNumber for the
// given domains and hands them, together with vcpuInfos, to addMetrics.
// Nothing is added to acc when gathering the statistics fails; the error is
// returned instead.
func (l *Libvirt) gatherMetrics(domains []golibvirt.Domain, vcpuInfos map[string][]vcpuAffinity, acc telegraf.Accumulator) error {
	domainStats, err := l.utils.gatherStatsForDomains(domains, l.metricNumber)
	if err == nil {
		l.addMetrics(domainStats, vcpuInfos, acc)
	}
	return err
}
// getVcpuMapping collects the vCPU affinity information of every given domain
// and returns it keyed by domain name.
//
// The number of physical CPUs is gathered once up front. The per-domain
// lookups can take some time, so each one runs in its own goroutine; the
// first error reported by any of them is returned and the partial result is
// discarded.
func (l *Libvirt) getVcpuMapping(domains []golibvirt.Domain) (map[string][]vcpuAffinity, error) {
	pCPUs, err := l.utils.gatherNumberOfPCPUs()
	if err != nil {
		return nil, err
	}

	// The answer does not change between domains, so compute it once instead
	// of once per goroutine.
	withCurrentPCPU := l.shouldGetCurrentPCPU()

	var (
		group errgroup.Group
		// The goroutines only ever write to the map, so a plain mutex is
		// sufficient; there is no reader side that would justify an RWMutex.
		mu sync.Mutex
	)
	vcpuInfos := make(map[string][]vcpuAffinity, len(domains))

	for i := range domains {
		// Per-iteration copy so that each closure captures its own domain.
		domain := domains[i]

		group.Go(func() error {
			vcpuInfo, err := l.utils.gatherVcpuMapping(domain, pCPUs, withCurrentPCPU)
			if err != nil {
				return err
			}

			mu.Lock()
			vcpuInfos[domain.Name] = vcpuInfo
			mu.Unlock()
			return nil
		})
	}

	if err := group.Wait(); err != nil {
		return nil, err
	}
	return vcpuInfos, nil
}
// shouldGetCurrentPCPU reports whether vCPU mapping is enabled and the vcpu
// statistics group is part of l.metricNumber.
func (l *Libvirt) shouldGetCurrentPCPU() bool {
	if !l.vcpuMappingEnabled {
		return false
	}
	return l.metricNumber&domainStatsVCPU != 0
}
func init() {
inputs.Add(pluginName, func() telegraf.Input {
return &Libvirt{
utils: &utilsImpl{},
}
})
}