1
0
Fork 0
telegraf/plugins/inputs/procstat/process.go

381 lines
10 KiB
Go
Raw Permalink Normal View History

package procstat
import (
"errors"
"fmt"
"runtime"
"strconv"
"syscall"
"time"
gopsnet "github.com/shirou/gopsutil/v4/net"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
type process interface {
Name() (string, error)
MemoryMaps(bool) (*[]gopsprocess.MemoryMapsStat, error)
pid() pid
setTag(string, string)
metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error)
}
// pidFinder abstracts the strategies used to locate processes; every method
// returns the PIDs matching its criterion.
// NOTE(review): the semantics below are inferred from the method names and
// depend on the implementation — confirm against the concrete finders.
type pidFinder interface {
	// pidFile presumably reads the PID(s) stored in the file at path.
	pidFile(path string) ([]pid, error)
	// pattern presumably matches processes by executable name.
	pattern(pattern string) ([]pid, error)
	// fullPattern presumably matches pattern against the full command line.
	fullPattern(pattern string) ([]pid, error)
	// uid presumably returns the processes owned by user.
	uid(user string) ([]pid, error)
	// children presumably returns the child processes of parent.
	children(parent pid) ([]pid, error)
}
// proc adapts a gopsutil process handle to the process interface used by
// this plugin. It also carries the tags attached to the process's metrics.
type proc struct {
	// hasCPUTimes is set by percent after the first successful CPU sample.
	// Until then, percent reports an error instead of a usage value.
	hasCPUTimes bool
	// tags holds the tags attached to the metrics emitted for this process.
	// It is written by setTag and by metrics.
	tags map[string]string
	// Process is the embedded gopsutil handle that provides the raw
	// per-process queries (Name, Cmdline, Times, ...).
	*gopsprocess.Process
}
// newProc opens the process with the given PID through gopsutil and wraps it
// in a proc with an empty tag set. Any error from gopsprocess.NewProcess is
// returned unchanged.
func newProc(pid pid) (process, error) {
	handle, err := gopsprocess.NewProcess(int32(pid))
	if err != nil {
		return nil, err
	}

	// hasCPUTimes starts out false (its zero value): no CPU sample yet.
	return &proc{
		Process: handle,
		tags:    make(map[string]string),
	}, nil
}
// pid returns the ID of the wrapped process, converted to the package's pid
// type. Pid is promoted from the embedded gopsutil Process.
func (p *proc) pid() pid {
	return pid(p.Pid)
}
// setTag stores value under key in the process's tag set, overwriting any
// previous value. The tag set is used for the metrics built by metrics.
func (p *proc) setTag(key, value string) {
	p.tags[key] = value
}
// percent returns the CPU usage of the process as computed by gopsutil's
// Percent with a zero interval, which compares against the previous call.
// The duration argument is ignored.
//
// The first successful sample only establishes the baseline. It is
// therefore reported as an error, and later calls return real values.
// gopsutil errors are passed through unchanged.
func (p *proc) percent(_ time.Duration) (float64, error) {
	usage, err := p.Process.Percent(0)
	if err != nil || p.hasCPUTimes {
		return usage, err
	}

	p.hasCPUTimes = true
	return 0, errors.New("must call Percent twice to compute percent cpu")
}
// metrics collects the statistics of a single process.
//
// The result always starts with one "procstat" metric carrying the process
// fields. When the "sockets" feature is enabled, it is followed by one
// "procstat_socket" metric per connection of the protocols listed in
// cfg.socketProtos.
//
// A non-empty prefix is joined with "_" and prepended to the field names,
// except "pid", which is never prefixed. cfg.features selects the optional
// field groups ("cpu", "memory", "mmap", "limits", "sockets"). cfg.tagging
// selects values emitted as tags instead of fields. All metrics are stamped
// with t.
//
// Failing process queries are not fatal: the affected fields are simply left
// out. Only socket collection errors are returned, together with the
// metrics gathered before the failure.
func (p *proc) metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
	if prefix != "" {
		prefix += "_"
	}

	fields := make(map[string]interface{})

	if numThreads, err := p.NumThreads(); err == nil {
		fields[prefix+"num_threads"] = numThreads
	}
	if fds, err := p.NumFDs(); err == nil {
		fields[prefix+"num_fds"] = fds
	}
	if switches, err := p.NumCtxSwitches(); err == nil {
		fields[prefix+"voluntary_context_switches"] = switches.Voluntary
		fields[prefix+"involuntary_context_switches"] = switches.Involuntary
	}
	if faults, err := p.PageFaults(); err == nil {
		fields[prefix+"minor_faults"] = faults.MinorFaults
		fields[prefix+"major_faults"] = faults.MajorFaults
		fields[prefix+"child_minor_faults"] = faults.ChildMinorFaults
		fields[prefix+"child_major_faults"] = faults.ChildMajorFaults
	}

	ioStats, ioErr := p.IOCounters()
	if ioErr == nil {
		fields[prefix+"read_count"] = ioStats.ReadCount
		fields[prefix+"write_count"] = ioStats.WriteCount
		fields[prefix+"read_bytes"] = ioStats.ReadBytes
		fields[prefix+"write_bytes"] = ioStats.WriteBytes
	}

	// Linux fixup for gopsutils exposing the disk-only-IO instead of the total
	// I/O as for example on Windows
	if rc, wc, err := collectTotalReadWrite(p); err == nil {
		fields[prefix+"read_bytes"] = rc
		fields[prefix+"write_bytes"] = wc
		// The disk-only counters come from IOCounters. If that call failed,
		// ioStats may be nil, and dereferencing it would panic.
		if ioErr == nil {
			fields[prefix+"disk_read_bytes"] = ioStats.ReadBytes
			fields[prefix+"disk_write_bytes"] = ioStats.WriteBytes
		}
	}

	if createdAt, err := p.CreateTime(); err == nil {
		fields[prefix+"created_at"] = createdAt * 1000000 // epoch ms to ns
	}

	if cfg.features["cpu"] {
		if cpuTime, err := p.Times(); err == nil {
			fields[prefix+"cpu_time_user"] = cpuTime.User
			fields[prefix+"cpu_time_system"] = cpuTime.System
			fields[prefix+"cpu_time_iowait"] = cpuTime.Iowait // only reported on Linux
		}
		if cpuPerc, err := p.percent(time.Duration(0)); err == nil {
			// Solaris mode reports the usage normalized by the number of CPUs.
			if cfg.solarisMode {
				cpuPerc /= float64(runtime.NumCPU())
			}
			fields[prefix+"cpu_usage"] = cpuPerc
		}
	}

	// This only returns values for RSS and VMS
	if cfg.features["memory"] {
		if mem, err := p.MemoryInfo(); err == nil {
			fields[prefix+"memory_rss"] = mem.RSS
			fields[prefix+"memory_vms"] = mem.VMS
		}
		if memPerc, err := p.MemoryPercent(); err == nil {
			fields[prefix+"memory_usage"] = memPerc
		}
	}

	if cfg.features["mmap"] {
		collectMemmap(p, prefix, fields)
	}

	if cfg.features["limits"] {
		if rlims, err := p.RlimitUsage(true); err == nil {
			for _, rlim := range rlims {
				var name string
				switch rlim.Resource {
				case gopsprocess.RLIMIT_CPU:
					name = "cpu_time"
				case gopsprocess.RLIMIT_DATA:
					name = "memory_data"
				case gopsprocess.RLIMIT_STACK:
					name = "memory_stack"
				case gopsprocess.RLIMIT_RSS:
					name = "memory_rss"
				case gopsprocess.RLIMIT_NOFILE:
					name = "num_fds"
				case gopsprocess.RLIMIT_MEMLOCK:
					name = "memory_locked"
				case gopsprocess.RLIMIT_AS:
					name = "memory_vms"
				case gopsprocess.RLIMIT_LOCKS:
					name = "file_locks"
				case gopsprocess.RLIMIT_SIGPENDING:
					name = "signals_pending"
				case gopsprocess.RLIMIT_NICE:
					name = "nice_priority"
				case gopsprocess.RLIMIT_RTPRIO:
					name = "realtime_priority"
				default:
					continue
				}

				fields[prefix+"rlimit_"+name+"_soft"] = rlim.Soft
				fields[prefix+"rlimit_"+name+"_hard"] = rlim.Hard
				if name != "file_locks" { // gopsutil doesn't currently track the used file locks count
					fields[prefix+name] = rlim.Used
				}
			}
		}
	}

	// Add the tags as requested by the user
	if cmdline, err := p.Cmdline(); err == nil {
		if cfg.tagging["cmdline"] {
			p.tags["cmdline"] = cmdline
		} else {
			fields[prefix+"cmdline"] = cmdline
		}
	}

	// Unlike the other fields, "pid" is never prefixed.
	if cfg.tagging["pid"] {
		p.tags["pid"] = strconv.Itoa(int(p.Pid))
	} else {
		fields["pid"] = p.Pid
	}

	if ppid, err := p.Ppid(); err == nil {
		if cfg.tagging["ppid"] {
			p.tags["ppid"] = strconv.Itoa(int(ppid))
		} else {
			fields[prefix+"ppid"] = ppid
		}
	}

	// Guard against an empty status slice before indexing it.
	if status, err := p.Status(); err == nil && len(status) > 0 {
		if cfg.tagging["status"] {
			p.tags["status"] = status[0]
		} else {
			fields[prefix+"status"] = status[0]
		}
	}

	if user, err := p.Username(); err == nil {
		if cfg.tagging["user"] {
			p.tags["user"] = user
		} else {
			fields[prefix+"user"] = user
		}
	}

	if _, exists := p.tags["process_name"]; !exists {
		if name, err := p.Name(); err == nil {
			p.tags["process_name"] = name
		}
	}

	metrics := []telegraf.Metric{metric.New("procstat", p.tags, fields, t)}

	// Collect the socket statistics if requested
	if !cfg.features["sockets"] {
		return metrics, nil
	}
	sockets, err := p.socketMetrics(cfg, t)
	return append(metrics, sockets...), err
}

// socketMetrics builds one "procstat_socket" metric per connection of every
// protocol in cfg.socketProtos. On error it returns the metrics built so far
// together with the error.
func (p *proc) socketMetrics(cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
	var metrics []telegraf.Metric
	for _, protocol := range cfg.socketProtos {
		fieldlist, err := p.connectionFields(protocol)
		if err != nil {
			return metrics, err
		}
		for _, fields := range fieldlist {
			tags := p.socketTags(fields, cfg)
			metrics = append(metrics, metric.New("procstat_socket", tags, fields, t))
		}
	}
	return metrics, nil
}

// connectionFields queries this process's connections for one protocol
// ("all", "tcp4", "tcp6", "udp4", "udp6" or "unix"). It converts them to one
// field map per connection. Any other protocol yields no fields and no error.
func (p *proc) connectionFields(protocol string) ([]map[string]interface{}, error) {
	switch protocol {
	case "all", "tcp4", "tcp6", "udp4", "udp6", "unix":
	default:
		return nil, nil
	}

	// Get the requested connections for the PID
	conns, err := gopsnet.ConnectionsPid(protocol, p.Pid)
	if err != nil {
		return nil, fmt.Errorf("cannot get connections for %q of PID %d: %w", protocol, p.Pid, err)
	}

	var fieldlist []map[string]interface{}
	switch protocol {
	case "all":
		return p.allConnectionFields(conns)
	case "tcp4":
		fieldlist, err = statsTCP(conns, syscall.AF_INET)
	case "tcp6":
		fieldlist, err = statsTCP(conns, syscall.AF_INET6)
	case "udp4":
		fieldlist, err = statsUDP(conns, syscall.AF_INET)
	case "udp6":
		fieldlist, err = statsUDP(conns, syscall.AF_INET6)
	case "unix":
		fieldlist, err = statsUnix(conns)
	}
	if err != nil {
		return nil, fmt.Errorf("cannot get statistics for %q of PID %d: %w", protocol, p.Pid, err)
	}
	return fieldlist, nil
}

// allConnectionFields splits the connections returned for protocol "all"
// into five groups: TCP over IPv4 and IPv6, UDP over IPv4 and IPv6, and unix
// sockets. It converts each group with the matching statistics helper, in
// that order.
func (p *proc) allConnectionFields(conns []gopsnet.ConnectionStat) ([]map[string]interface{}, error) {
	var connsTCPv4, connsTCPv6, connsUDPv4, connsUDPv6, connsUnix []gopsnet.ConnectionStat
	for _, c := range conns {
		switch {
		case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_STREAM:
			connsTCPv4 = append(connsTCPv4, c)
		case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_STREAM:
			connsTCPv6 = append(connsTCPv6, c)
		case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_DGRAM:
			connsUDPv4 = append(connsUDPv4, c)
		case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_DGRAM:
			connsUDPv6 = append(connsUDPv6, c)
		case c.Family == syscall.AF_UNIX:
			connsUnix = append(connsUnix, c)
		}
	}

	groups := []struct {
		name  string
		stats func() ([]map[string]interface{}, error)
	}{
		{"tcp4", func() ([]map[string]interface{}, error) { return statsTCP(connsTCPv4, syscall.AF_INET) }},
		{"tcp6", func() ([]map[string]interface{}, error) { return statsTCP(connsTCPv6, syscall.AF_INET6) }},
		{"udp4", func() ([]map[string]interface{}, error) { return statsUDP(connsUDPv4, syscall.AF_INET) }},
		{"udp6", func() ([]map[string]interface{}, error) { return statsUDP(connsUDPv6, syscall.AF_INET6) }},
		{"unix", func() ([]map[string]interface{}, error) { return statsUnix(connsUnix) }},
	}

	var fieldlist []map[string]interface{}
	for _, g := range groups {
		fl, err := g.stats()
		if err != nil {
			return nil, fmt.Errorf("cannot get statistics for %q of PID %d: %w", g.name, p.Pid, err)
		}
		fieldlist = append(fieldlist, fl...)
	}
	return fieldlist, nil
}

// socketTags returns the tags for one socket metric: a copy of the process
// tags, extended by the socket properties selected in cfg.tagging. Every
// property that becomes a tag is removed from fields.
//
// Working on a copy keeps per-socket tags out of p.tags. A socket without
// e.g. a "src" value therefore no longer inherits the previous socket's
// value. If the proc is reused across collections, the next "procstat"
// metric is also not polluted with socket tags.
func (p *proc) socketTags(fields map[string]interface{}, cfg *collectionConfig) map[string]string {
	tags := make(map[string]string, len(p.tags)+7)
	for k, v := range p.tags {
		tags[k] = v
	}

	// String-valued properties are moved over as-is.
	for _, key := range []string{"protocol", "state", "src", "dest", "name"} {
		if !cfg.tagging[key] {
			continue
		}
		if v, ok := fields[key].(string); ok {
			tags[key] = v
			delete(fields, key)
		}
	}

	// Ports are uint16 fields and are formatted as decimal tag values.
	for _, key := range []string{"src_port", "dest_port"} {
		if !cfg.tagging[key] {
			continue
		}
		if port, ok := fields[key].(uint16); ok {
			tags[key] = strconv.FormatUint(uint64(port), 10)
			delete(fields, key)
		}
	}
	return tags
}