1
0
Fork 0
telegraf/plugins/inputs/supervisor/supervisor.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

179 lines
4.9 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package supervisor
import (
_ "embed"
"fmt"
"net/url"
"strings"
"github.com/kolo/xmlrpc"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
type Supervisor struct {
Server string `toml:"url"`
MetricsInc []string `toml:"metrics_include"`
MetricsExc []string `toml:"metrics_exclude"`
Log telegraf.Logger `toml:"-"`
rpcClient *xmlrpc.Client
fieldFilter filter.Filter
}
type processInfo struct {
Name string `xmlrpc:"name"`
Group string `xmlrpc:"group"`
Description string `xmlrpc:"description"`
Start int32 `xmlrpc:"start"`
Stop int32 `xmlrpc:"stop"`
Now int32 `xmlrpc:"now"`
State int16 `xmlrpc:"state"`
Statename string `xmlrpc:"statename"`
StdoutLogfile string `xmlrpc:"stdout_logfile"`
StderrLogfile string `xmlrpc:"stderr_logfile"`
SpawnErr string `xmlrpc:"spawnerr"`
ExitStatus int8 `xmlrpc:"exitstatus"`
Pid int32 `xmlrpc:"pid"`
}
type supervisorInfo struct {
StateCode int8 `xmlrpc:"statecode"`
StateName string `xmlrpc:"statename"`
Ident string
}
func (*Supervisor) SampleConfig() string {
return sampleConfig
}
func (s *Supervisor) Init() error {
// Using default server URL if none was specified in config
if s.Server == "" {
s.Server = "http://localhost:9001/RPC2"
}
var err error
// Initializing XML-RPC client
s.rpcClient, err = xmlrpc.NewClient(s.Server, nil)
if err != nil {
return fmt.Errorf("failed to initialize XML-RPC client: %w", err)
}
// Setting filter for additional metrics
s.fieldFilter, err = filter.NewIncludeExcludeFilter(s.MetricsInc, s.MetricsExc)
if err != nil {
return fmt.Errorf("metrics filter setup failed: %w", err)
}
return nil
}
func (s *Supervisor) Gather(acc telegraf.Accumulator) error {
// API call to get information about all running processes
var rawProcessData []processInfo
err := s.rpcClient.Call("supervisor.getAllProcessInfo", nil, &rawProcessData)
if err != nil {
return fmt.Errorf("failed to get processes info: %w", err)
}
// API call to get information about instance status
var status supervisorInfo
err = s.rpcClient.Call("supervisor.getState", nil, &status)
if err != nil {
return fmt.Errorf("failed to get processes info: %w", err)
}
// API call to get identification string
err = s.rpcClient.Call("supervisor.getIdentification", nil, &status.Ident)
if err != nil {
return fmt.Errorf("failed to get instance identification: %w", err)
}
// Iterating through array of structs with processes info and adding fields to accumulator
for _, process := range rawProcessData {
processTags, processFields, err := s.parseProcessData(process, status)
if err != nil {
acc.AddError(err)
continue
}
acc.AddFields("supervisor_processes", processFields, processTags)
}
// Adding instance info fields to accumulator
instanceTags, instanceFields, err := s.parseInstanceData(status)
if err != nil {
return fmt.Errorf("failed to parse instance data: %w", err)
}
acc.AddFields("supervisor_instance", instanceFields, instanceTags)
return nil
}
func (s *Supervisor) parseProcessData(pInfo processInfo, status supervisorInfo) (map[string]string, map[string]interface{}, error) {
tags := map[string]string{
"process": pInfo.Name,
"group": pInfo.Group,
}
fields := map[string]interface{}{
"uptime": pInfo.Now - pInfo.Start,
"state": pInfo.State,
}
if s.fieldFilter.Match("pid") {
fields["pid"] = pInfo.Pid
}
if s.fieldFilter.Match("rc") {
fields["exitCode"] = pInfo.ExitStatus
}
splittedURL, err := beautifyServerString(s.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse server string: %w", err)
}
tags["id"] = status.Ident
tags["source"] = splittedURL[0]
tags["port"] = splittedURL[1]
return tags, fields, nil
}
// Parsing of supervisor instance data
func (s *Supervisor) parseInstanceData(status supervisorInfo) (map[string]string, map[string]interface{}, error) {
splittedURL, err := beautifyServerString(s.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse server string: %w", err)
}
tags := map[string]string{
"id": status.Ident,
"source": splittedURL[0],
"port": splittedURL[1],
}
fields := map[string]interface{}{
"state": status.StateCode,
}
return tags, fields, nil
}
// Function to get only address and port from URL
func beautifyServerString(rawurl string) ([]string, error) {
parsedURL, err := url.Parse(rawurl)
splittedURL := strings.Split(parsedURL.Host, ":")
if err != nil {
return nil, err
}
if len(splittedURL) < 2 {
if parsedURL.Scheme == "https" {
splittedURL[1] = "443"
} else {
splittedURL[1] = "80"
}
}
return splittedURL, nil
}
func init() {
inputs.Add("supervisor", func() telegraf.Input {
return &Supervisor{
MetricsExc: []string{"pid", "rc"},
}
})
}