1085 lines
28 KiB
Go
1085 lines
28 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package docker
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
_ "embed"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Masterminds/semver/v3"
|
|
"github.com/docker/docker/api/types"
|
|
"github.com/docker/docker/api/types/container"
|
|
"github.com/docker/docker/api/types/filters"
|
|
"github.com/docker/docker/api/types/swarm"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/filter"
|
|
"github.com/influxdata/telegraf/internal/choice"
|
|
"github.com/influxdata/telegraf/internal/docker"
|
|
docker_stats "github.com/influxdata/telegraf/plugins/common/docker"
|
|
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
var (
	// sizeRegex matches the human-readable sizes reported in the daemon's
	// DriverStatus entries, e.g. "102.4 MB" or "64kB".
	sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`)
	// containerStates lists every state value accepted by the Docker API
	// "status" list filter.
	containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
	// containerMetricClasses are the valid entries for the
	// 'perdevice_include' and 'total_include' settings.
	containerMetricClasses = []string{"cpu", "network", "blkio"}
	// now is indirected through a variable so a fixed clock can be
	// substituted in tests.
	now = time.Now

	// minVersion is the lowest Docker API version this plugin supports.
	minVersion = semver.MustParse("1.23")
	// minDiskUsageVersion is the lowest API version that serves the
	// disk-usage endpoint used by gatherDiskUsage.
	minDiskUsageVersion = semver.MustParse("1.42")
)
|
|
|
|
// KB, MB, GB, TB, PB...human friendly
const (
	// decimal (SI, base-1000) byte multipliers used by parseSize
	KB = 1000
	MB = 1000 * KB
	GB = 1000 * MB
	TB = 1000 * GB
	PB = 1000 * TB

	// defaultEndpoint is the local daemon socket used when no endpoint
	// is configured.
	defaultEndpoint = "unix:///var/run/docker.sock"
)
|
|
|
|
// Docker is the docker input plugin. It talks to the Docker Engine API to
// collect daemon info, per-container stats, optional swarm service metrics
// and optional disk-usage data.
type Docker struct {
	Endpoint       string   `toml:"endpoint"`
	ContainerNames []string `toml:"container_names" deprecated:"1.4.0;1.35.0;use 'container_name_include' instead"`

	GatherServices bool `toml:"gather_services"`

	Timeout          config.Duration `toml:"timeout"`
	PerDevice        bool            `toml:"perdevice" deprecated:"1.18.0;1.35.0;use 'perdevice_include' instead"`
	PerDeviceInclude []string        `toml:"perdevice_include"`
	Total            bool            `toml:"total" deprecated:"1.18.0;1.35.0;use 'total_include' instead"`
	TotalInclude     []string        `toml:"total_include"`
	TagEnvironment   []string        `toml:"tag_env"`
	LabelInclude     []string        `toml:"docker_label_include"`
	LabelExclude     []string        `toml:"docker_label_exclude"`

	ContainerInclude []string `toml:"container_name_include"`
	ContainerExclude []string `toml:"container_name_exclude"`

	ContainerStateInclude []string `toml:"container_state_include"`
	ContainerStateExclude []string `toml:"container_state_exclude"`

	StorageObjects []string `toml:"storage_objects"`

	IncludeSourceTag bool `toml:"source_tag"`

	Log telegraf.Logger `toml:"-"`

	common_tls.ClientConfig

	// client constructors; unexported fields set in init() so tests can
	// presumably substitute mock clients — see getNewClient
	newEnvClient func() (dockerClient, error)
	newClient    func(string, *tls.Config) (dockerClient, error)

	// runtime state populated lazily during Gather
	client          dockerClient
	engineHost      string
	serverVersion   string
	filtersCreated  bool
	labelFilter     filter.Filter
	containerFilter filter.Filter
	stateFilter     filter.Filter
	objectTypes     []types.DiskUsageObject
}
|
|
|
|
// SampleConfig returns the example configuration embedded from sample.conf.
func (*Docker) SampleConfig() string {
	return sampleConfig
}
|
|
|
|
func (d *Docker) Init() error {
|
|
err := choice.CheckSlice(d.PerDeviceInclude, containerMetricClasses)
|
|
if err != nil {
|
|
return fmt.Errorf("error validating 'perdevice_include' setting: %w", err)
|
|
}
|
|
|
|
err = choice.CheckSlice(d.TotalInclude, containerMetricClasses)
|
|
if err != nil {
|
|
return fmt.Errorf("error validating 'total_include' setting: %w", err)
|
|
}
|
|
|
|
// Temporary logic needed for backwards compatibility until 'perdevice' setting is removed.
|
|
if d.PerDevice {
|
|
if !choice.Contains("network", d.PerDeviceInclude) {
|
|
d.PerDeviceInclude = append(d.PerDeviceInclude, "network")
|
|
}
|
|
if !choice.Contains("blkio", d.PerDeviceInclude) {
|
|
d.PerDeviceInclude = append(d.PerDeviceInclude, "blkio")
|
|
}
|
|
}
|
|
|
|
// Temporary logic needed for backwards compatibility until 'total' setting is removed.
|
|
if !d.Total {
|
|
if choice.Contains("cpu", d.TotalInclude) {
|
|
d.TotalInclude = []string{"cpu"}
|
|
} else {
|
|
d.TotalInclude = make([]string, 0)
|
|
}
|
|
}
|
|
|
|
d.objectTypes = make([]types.DiskUsageObject, 0, len(d.StorageObjects))
|
|
|
|
for _, object := range d.StorageObjects {
|
|
switch object {
|
|
case "container":
|
|
d.objectTypes = append(d.objectTypes, types.ContainerObject)
|
|
case "image":
|
|
d.objectTypes = append(d.objectTypes, types.ImageObject)
|
|
case "volume":
|
|
d.objectTypes = append(d.objectTypes, types.VolumeObject)
|
|
default:
|
|
d.Log.Warnf("Unrecognized storage object type: %s", object)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Gather implements telegraf.Input. On first call it lazily constructs the
// Docker client, checks the negotiated API version and builds the
// name/label/state filters; every call then collects daemon info, optional
// swarm metrics, per-container stats (one goroutine per container) and
// optional disk-usage data.
func (d *Docker) Gather(acc telegraf.Accumulator) error {
	if d.client == nil {
		c, err := d.getNewClient()
		if err != nil {
			return err
		}
		d.client = c

		version, err := semver.NewVersion(d.client.ClientVersion())
		if err != nil {
			return err
		}

		// Old API versions only produce warnings; collection still runs.
		if version.LessThan(minVersion) {
			d.Log.Warnf("Unsupported api version (%v.%v), upgrade to docker engine 1.12 or later (api version 1.24)",
				version.Major(), version.Minor())
		} else if version.LessThan(minDiskUsageVersion) && len(d.objectTypes) > 0 {
			d.Log.Warnf("Unsupported api version for disk usage (%v.%v), upgrade to docker engine 23.0 or later (api version 1.42)",
				version.Major(), version.Minor())
		}
	}

	// Close any idle connections in the end of gathering
	defer d.client.Close()

	// Create label filters if not already created
	if !d.filtersCreated {
		err := d.createLabelFilters()
		if err != nil {
			return err
		}
		err = d.createContainerFilters()
		if err != nil {
			return err
		}
		err = d.createContainerStateFilters()
		if err != nil {
			return err
		}
		d.filtersCreated = true
	}

	// Get daemon info; failures are accumulated, not fatal
	err := d.gatherInfo(acc)
	if err != nil {
		acc.AddError(err)
	}

	if d.GatherServices {
		err := d.gatherSwarmInfo(acc)
		if err != nil {
			acc.AddError(err)
		}
	}

	// Translate the state filter into an API "status" list filter
	filterArgs := filters.NewArgs()
	for _, state := range containerStates {
		if d.stateFilter.Match(state) {
			filterArgs.Add("status", state)
		}
	}

	// All container states were excluded
	if filterArgs.Len() == 0 {
		return nil
	}

	// List containers
	opts := container.ListOptions{
		Filters: filterArgs,
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout))
	defer cancel()

	containers, err := d.client.ContainerList(ctx, opts)
	if errors.Is(err, context.DeadlineExceeded) {
		return errListTimeout
	}
	if err != nil {
		return err
	}

	// Get container data; one goroutine per container, per-container
	// errors go through the accumulator rather than aborting the batch.
	var wg sync.WaitGroup
	wg.Add(len(containers))
	for _, cntnr := range containers {
		go func(c container.Summary) {
			defer wg.Done()
			if err := d.gatherContainer(c, acc); err != nil {
				acc.AddError(err)
			}
		}(cntnr)
	}
	wg.Wait()

	// Get disk usage data
	if len(d.objectTypes) > 0 {
		d.gatherDiskUsage(acc, types.DiskUsageOptions{Types: d.objectTypes})
	}

	return nil
}
|
|
|
|
// gatherSwarmInfo emits one "docker_swarm" metric per swarm service,
// tagged with the service mode and carrying running/desired task counts
// (or completion counts for job-mode services).
func (d *Docker) gatherSwarmInfo(acc telegraf.Accumulator) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout))
	defer cancel()

	services, err := d.client.ServiceList(ctx, types.ServiceListOptions{})
	if errors.Is(err, context.DeadlineExceeded) {
		return errServiceTimeout
	}
	if err != nil {
		return err
	}

	if len(services) > 0 {
		tasks, err := d.client.TaskList(ctx, types.TaskListOptions{})
		if err != nil {
			return err
		}

		nodes, err := d.client.NodeList(ctx, types.NodeListOptions{})
		if err != nil {
			return err
		}

		// NOTE(review): activeNodes is populated but never read below —
		// confirm whether filtering tasks by active node was intended.
		activeNodes := make(map[string]struct{})
		for _, n := range nodes {
			if n.Status.State != swarm.NodeStateDown {
				activeNodes[n.ID] = struct{}{}
			}
		}

		// Count, per service, the tasks not scheduled for shutdown and
		// the tasks currently running.
		tasksNoShutdown := make(map[string]uint64, len(tasks))
		running := make(map[string]int, len(tasks))
		for _, task := range tasks {
			if task.DesiredState != swarm.TaskStateShutdown {
				tasksNoShutdown[task.ServiceID]++
			}

			if task.Status.State == swarm.TaskStateRunning {
				running[task.ServiceID]++
			}
		}

		for _, service := range services {
			tags := make(map[string]string, 3)
			fields := make(map[string]interface{}, 2)
			// shadows the package-level clock variable; each service
			// metric gets its own timestamp
			now := time.Now()
			tags["service_id"] = service.ID
			tags["service_name"] = service.Spec.Name
			if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
				tags["service_mode"] = "replicated"
				fields["tasks_running"] = running[service.ID]
				fields["tasks_desired"] = *service.Spec.Mode.Replicated.Replicas
			} else if service.Spec.Mode.Global != nil {
				tags["service_mode"] = "global"
				fields["tasks_running"] = running[service.ID]
				fields["tasks_desired"] = tasksNoShutdown[service.ID]
			} else if service.Spec.Mode.ReplicatedJob != nil {
				tags["service_mode"] = "replicated_job"
				fields["tasks_running"] = running[service.ID]
				if service.Spec.Mode.ReplicatedJob.MaxConcurrent != nil {
					fields["max_concurrent"] = *service.Spec.Mode.ReplicatedJob.MaxConcurrent
				}
				if service.Spec.Mode.ReplicatedJob.TotalCompletions != nil {
					fields["total_completions"] = *service.Spec.Mode.ReplicatedJob.TotalCompletions
				}
			} else if service.Spec.Mode.GlobalJob != nil {
				tags["service_mode"] = "global_job"
				fields["tasks_running"] = running[service.ID]
			} else {
				d.Log.Error("Unknown replica mode")
			}
			// Add metrics
			acc.AddFields("docker_swarm",
				fields,
				tags,
				now)
		}
	}

	return nil
}
|
|
|
|
// gatherInfo collects daemon-level metrics: the "docker" measurement
// (counters and total memory) and, when the daemon reports devicemapper
// DriverStatus entries, the "docker_data", "docker_metadata" and
// "docker_devicemapper" storage measurements. It also caches the engine
// host and server version used as tags by the other gatherers.
func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
	// Init vars
	dataFields := make(map[string]interface{})
	metadataFields := make(map[string]interface{})
	now := time.Now()

	// Get info from docker daemon
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout))
	defer cancel()

	info, err := d.client.Info(ctx)
	if errors.Is(err, context.DeadlineExceeded) {
		return errInfoTimeout
	}
	if err != nil {
		return err
	}

	// cached for use as tags by the per-container and disk-usage gatherers
	d.engineHost = info.Name
	d.serverVersion = info.ServerVersion

	tags := map[string]string{
		"engine_host":    d.engineHost,
		"server_version": d.serverVersion,
	}

	fields := map[string]interface{}{
		"n_cpus":                  info.NCPU,
		"n_used_file_descriptors": info.NFd,
		"n_containers":            info.Containers,
		"n_containers_running":    info.ContainersRunning,
		"n_containers_stopped":    info.ContainersStopped,
		"n_containers_paused":     info.ContainersPaused,
		"n_images":                info.Images,
		"n_goroutines":            info.NGoroutines,
		"n_listener_events":       info.NEventsListener,
	}

	// Add metrics
	acc.AddFields("docker", fields, tags, now)
	acc.AddFields("docker",
		map[string]interface{}{"memory_total": info.MemTotal},
		tags,
		now)

	// Get storage metrics
	// NOTE: mutates the shared tags map — all storage metrics below carry
	// the unit=bytes tag.
	tags["unit"] = "bytes"

	var (
		// "docker_devicemapper" measurement fields
		poolName           string
		deviceMapperFields = make(map[string]interface{}, len(info.DriverStatus))
	)

	for _, rawData := range info.DriverStatus {
		// normalize e.g. "Data Space Used" -> "data_space_used"
		name := strings.ToLower(strings.ReplaceAll(rawData[0], " ", "_"))
		if name == "pool_name" {
			poolName = rawData[1]
			continue
		}

		// Try to convert string to int (bytes); non-size entries are skipped
		value, err := parseSize(rawData[1])
		if err != nil {
			continue
		}

		switch name {
		case "pool_blocksize",
			"base_device_size",
			"data_space_used",
			"data_space_total",
			"data_space_available",
			"metadata_space_used",
			"metadata_space_total",
			"metadata_space_available",
			"thin_pool_minimum_free_space":
			deviceMapperFields[name+"_bytes"] = value
		}

		// Legacy devicemapper measurements
		if name == "pool_blocksize" {
			// pool blocksize
			acc.AddFields("docker",
				map[string]interface{}{"pool_blocksize": value},
				tags,
				now)
		} else if strings.HasPrefix(name, "data_space_") {
			// data space
			fieldName := strings.TrimPrefix(name, "data_space_")
			dataFields[fieldName] = value
		} else if strings.HasPrefix(name, "metadata_space_") {
			// metadata space
			fieldName := strings.TrimPrefix(name, "metadata_space_")
			metadataFields[fieldName] = value
		}
	}

	if len(dataFields) > 0 {
		acc.AddFields("docker_data", dataFields, tags, now)
	}

	if len(metadataFields) > 0 {
		acc.AddFields("docker_metadata", metadataFields, tags, now)
	}

	if len(deviceMapperFields) > 0 {
		// fresh tag set without the unit=bytes tag added above
		tags := map[string]string{
			"engine_host":    d.engineHost,
			"server_version": d.serverVersion,
		}

		if poolName != "" {
			tags["pool_name"] = poolName
		}

		acc.AddFields("docker_devicemapper", deviceMapperFields, tags, now)
	}

	return nil
}
|
|
|
|
// hostnameFromID shortens a full container ID to the 12-character prefix
// Docker uses as the short ID; shorter IDs are returned unchanged.
func hostnameFromID(id string) string {
	const shortIDLen = 12
	if len(id) <= shortIDLen {
		return id
	}
	return id[:shortIDLen]
}
|
|
|
|
// parseContainerName returns the first top-level name of a container.
// Docker reports names with a leading slash; a name that still contains a
// slash after trimming belongs to a linked container and is skipped. The
// empty string is returned when no top-level name exists.
func parseContainerName(containerNames []string) string {
	for _, raw := range containerNames {
		candidate := strings.TrimPrefix(raw, "/")
		if strings.Contains(candidate, "/") {
			continue
		}
		return candidate
	}
	return ""
}
|
|
|
|
func (d *Docker) gatherContainer(
|
|
cntnr container.Summary,
|
|
acc telegraf.Accumulator,
|
|
) error {
|
|
var v *container.StatsResponse
|
|
|
|
cname := parseContainerName(cntnr.Names)
|
|
|
|
if cname == "" {
|
|
return nil
|
|
}
|
|
|
|
if !d.containerFilter.Match(cname) {
|
|
return nil
|
|
}
|
|
|
|
imageName, imageVersion := docker.ParseImage(cntnr.Image)
|
|
|
|
tags := map[string]string{
|
|
"engine_host": d.engineHost,
|
|
"server_version": d.serverVersion,
|
|
"container_name": cname,
|
|
"container_image": imageName,
|
|
"container_version": imageVersion,
|
|
}
|
|
|
|
if d.IncludeSourceTag {
|
|
tags["source"] = hostnameFromID(cntnr.ID)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout))
|
|
defer cancel()
|
|
|
|
r, err := d.client.ContainerStats(ctx, cntnr.ID, false)
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
return errStatsTimeout
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("error getting docker stats: %w", err)
|
|
}
|
|
|
|
defer r.Body.Close()
|
|
dec := json.NewDecoder(r.Body)
|
|
if err = dec.Decode(&v); err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("error decoding: %w", err)
|
|
}
|
|
daemonOSType := r.OSType
|
|
|
|
// Add labels to tags
|
|
for k, label := range cntnr.Labels {
|
|
if d.labelFilter.Match(k) {
|
|
tags[k] = label
|
|
}
|
|
}
|
|
|
|
return d.gatherContainerInspect(cntnr, acc, tags, daemonOSType, v)
|
|
}
|
|
|
|
// gatherContainerInspect inspects a container to enrich the tags with
// whitelisted environment variables and the container state, emits the
// "docker_container_status" and "docker_container_health" measurements,
// and finally forwards the stats snapshot to parseContainerStats.
func (d *Docker) gatherContainerInspect(
	cntnr container.Summary,
	acc telegraf.Accumulator,
	tags map[string]string,
	daemonOSType string,
	v *container.StatsResponse,
) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout))
	defer cancel()

	info, err := d.client.ContainerInspect(ctx, cntnr.ID)
	if errors.Is(err, context.DeadlineExceeded) {
		return errInspectTimeout
	}
	if err != nil {
		return fmt.Errorf("error inspecting docker container: %w", err)
	}

	// Add whitelisted environment variables to tags
	if len(d.TagEnvironment) > 0 {
		for _, envvar := range info.Config.Env {
			for _, configvar := range d.TagEnvironment {
				dockEnv := strings.SplitN(envvar, "=", 2)
				// check for presence of tag in whitelist; empty values
				// are not tagged
				if len(dockEnv) == 2 && len(strings.TrimSpace(dockEnv[1])) != 0 && configvar == dockEnv[0] {
					tags[dockEnv[0]] = dockEnv[1]
				}
			}
		}
	}

	if info.State != nil {
		tags["container_status"] = info.State.Status
		statefields := map[string]interface{}{
			"oomkilled":     info.State.OOMKilled,
			"pid":           info.State.Pid,
			"exitcode":      info.State.ExitCode,
			"restart_count": info.RestartCount,
			"container_id":  cntnr.ID,
		}

		// FinishedAt parses to the zero time while the container is still
		// running; only report it once non-zero.
		finished, err := time.Parse(time.RFC3339, info.State.FinishedAt)
		if err == nil && !finished.IsZero() {
			statefields["finished_at"] = finished.UnixNano()
		} else {
			// set finished to now for use in uptime
			finished = now()
		}

		started, err := time.Parse(time.RFC3339, info.State.StartedAt)
		if err == nil && !started.IsZero() {
			statefields["started_at"] = started.UnixNano()

			// finished < started means the container was restarted after
			// its last exit, so measure uptime against the current clock.
			uptime := finished.Sub(started)
			if finished.Before(started) {
				uptime = now().Sub(started)
			}
			statefields["uptime_ns"] = uptime.Nanoseconds()
		}

		acc.AddFields("docker_container_status", statefields, tags, now())

		if info.State.Health != nil {
			healthfields := map[string]interface{}{
				"health_status": info.State.Health.Status,
				// NOTE(review): reaches through ContainerJSONBase for the
				// same State checked above — presumably equivalent to
				// info.State.Health.FailingStreak; confirm.
				"failing_streak": info.ContainerJSONBase.State.Health.FailingStreak,
			}
			acc.AddFields("docker_container_health", healthfields, tags, now())
		}
	}

	d.parseContainerStats(v, acc, tags, cntnr.ID, daemonOSType)

	return nil
}
|
|
|
|
// parseContainerStats converts a raw stats snapshot into the
// "docker_container_mem", "docker_container_cpu" and
// "docker_container_net" measurements (respecting the per-device and
// total include settings) and delegates block IO to gatherBlockIOMetrics.
func (d *Docker) parseContainerStats(
	stat *container.StatsResponse,
	acc telegraf.Accumulator,
	tags map[string]string,
	id, daemonOSType string,
) {
	tm := stat.Read

	// A pre-epoch read time means the daemon did not set it; fall back to
	// the local clock.
	if tm.Before(time.Unix(0, 0)) {
		tm = time.Now()
	}

	memfields := map[string]interface{}{
		"container_id": id,
	}

	// cgroup memory counters copied verbatim when present in the snapshot
	memstats := []string{
		"active_anon",
		"active_file",
		"cache",
		"hierarchical_memory_limit",
		"inactive_anon",
		"inactive_file",
		"mapped_file",
		"pgfault",
		"pgmajfault",
		"pgpgin",
		"pgpgout",
		"rss",
		"rss_huge",
		"total_active_anon",
		"total_active_file",
		"total_cache",
		"total_inactive_anon",
		"total_inactive_file",
		"total_mapped_file",
		"total_pgfault",
		"total_pgmajfault",
		"total_pgpgin",
		"total_pgpgout",
		"total_rss",
		"total_rss_huge",
		"total_unevictable",
		"total_writeback",
		"unevictable",
		"writeback",
	}
	for _, field := range memstats {
		if value, ok := stat.MemoryStats.Stats[field]; ok {
			memfields[field] = value
		}
	}
	// only reported when non-zero
	if stat.MemoryStats.Failcnt != 0 {
		memfields["fail_count"] = stat.MemoryStats.Failcnt
	}

	if daemonOSType != "windows" {
		memfields["limit"] = stat.MemoryStats.Limit
		memfields["max_usage"] = stat.MemoryStats.MaxUsage

		mem := docker_stats.CalculateMemUsageUnixNoCache(stat.MemoryStats)
		memLimit := float64(stat.MemoryStats.Limit)
		memfields["usage"] = uint64(mem)
		memfields["usage_percent"] = docker_stats.CalculateMemPercentUnixNoCache(memLimit, mem)
	} else {
		// Windows daemons report commit/working-set numbers instead of
		// cgroup statistics.
		memfields["commit_bytes"] = stat.MemoryStats.Commit
		memfields["commit_peak_bytes"] = stat.MemoryStats.CommitPeak
		memfields["private_working_set"] = stat.MemoryStats.PrivateWorkingSet
	}

	acc.AddFields("docker_container_mem", memfields, tags, tm)

	if choice.Contains("cpu", d.TotalInclude) {
		cpufields := map[string]interface{}{
			"usage_total":                  stat.CPUStats.CPUUsage.TotalUsage,
			"usage_in_usermode":            stat.CPUStats.CPUUsage.UsageInUsermode,
			"usage_in_kernelmode":          stat.CPUStats.CPUUsage.UsageInKernelmode,
			"usage_system":                 stat.CPUStats.SystemUsage,
			"throttling_periods":           stat.CPUStats.ThrottlingData.Periods,
			"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
			"throttling_throttled_time":    stat.CPUStats.ThrottlingData.ThrottledTime,
			"container_id":                 id,
		}

		if daemonOSType != "windows" {
			previousCPU := stat.PreCPUStats.CPUUsage.TotalUsage
			previousSystem := stat.PreCPUStats.SystemUsage
			cpuPercent := docker_stats.CalculateCPUPercentUnix(previousCPU, previousSystem, stat)
			cpufields["usage_percent"] = cpuPercent
		} else {
			cpuPercent := docker_stats.CalculateCPUPercentWindows(stat)
			cpufields["usage_percent"] = cpuPercent
		}

		cputags := copyTags(tags)
		cputags["cpu"] = "cpu-total"
		acc.AddFields("docker_container_cpu", cpufields, cputags, tm)
	}

	if choice.Contains("cpu", d.PerDeviceInclude) && len(stat.CPUStats.CPUUsage.PercpuUsage) > 0 {
		// If we have OnlineCPUs field, then use it to restrict stats gathering to only Online CPUs
		// (https://github.com/moby/moby/commit/115f91d7575d6de6c7781a96a082f144fd17e400)
		var percpuusage []uint64
		if stat.CPUStats.OnlineCPUs > 0 {
			percpuusage = stat.CPUStats.CPUUsage.PercpuUsage[:stat.CPUStats.OnlineCPUs]
		} else {
			percpuusage = stat.CPUStats.CPUUsage.PercpuUsage
		}

		for i, percpu := range percpuusage {
			percputags := copyTags(tags)
			percputags["cpu"] = fmt.Sprintf("cpu%d", i)
			fields := map[string]interface{}{
				"usage_total":  percpu,
				"container_id": id,
			}
			acc.AddFields("docker_container_cpu", fields, percputags, tm)
		}
	}

	// Sum per-interface counters into the "total" pseudo-interface while
	// optionally emitting each interface individually.
	totalNetworkStatMap := make(map[string]interface{})
	for network, netstats := range stat.Networks {
		netfields := map[string]interface{}{
			"rx_dropped":   netstats.RxDropped,
			"rx_bytes":     netstats.RxBytes,
			"rx_errors":    netstats.RxErrors,
			"tx_packets":   netstats.TxPackets,
			"tx_dropped":   netstats.TxDropped,
			"rx_packets":   netstats.RxPackets,
			"tx_errors":    netstats.TxErrors,
			"tx_bytes":     netstats.TxBytes,
			"container_id": id,
		}
		// Create a new network tag dictionary for the "network" tag
		if choice.Contains("network", d.PerDeviceInclude) {
			nettags := copyTags(tags)
			nettags["network"] = network
			acc.AddFields("docker_container_net", netfields, nettags, tm)
		}
		if choice.Contains("network", d.TotalInclude) {
			for field, value := range netfields {
				if field == "container_id" {
					continue
				}

				// only integer counters are summed; anything else is skipped
				var uintV uint64
				switch v := value.(type) {
				case uint64:
					uintV = v
				case int64:
					uintV = uint64(v)
				default:
					continue
				}

				_, ok := totalNetworkStatMap[field]
				if ok {
					totalNetworkStatMap[field] = totalNetworkStatMap[field].(uint64) + uintV
				} else {
					totalNetworkStatMap[field] = uintV
				}
			}
		}
	}

	// totalNetworkStatMap could be empty if container is running with --net=host.
	if choice.Contains("network", d.TotalInclude) && len(totalNetworkStatMap) != 0 {
		nettags := copyTags(tags)
		nettags["network"] = "total"
		totalNetworkStatMap["container_id"] = id
		acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, tm)
	}

	d.gatherBlockIOMetrics(acc, stat, tags, tm, id)
}
|
|
|
|
// Make a map of devices to their block io stats
|
|
func getDeviceStatMap(blkioStats container.BlkioStats) map[string]map[string]interface{} {
|
|
deviceStatMap := make(map[string]map[string]interface{})
|
|
|
|
for _, metric := range blkioStats.IoServiceBytesRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
_, ok := deviceStatMap[device]
|
|
if !ok {
|
|
deviceStatMap[device] = make(map[string]interface{})
|
|
}
|
|
|
|
field := "io_service_bytes_recursive_" + strings.ToLower(metric.Op)
|
|
deviceStatMap[device][field] = metric.Value
|
|
}
|
|
|
|
for _, metric := range blkioStats.IoServicedRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
_, ok := deviceStatMap[device]
|
|
if !ok {
|
|
deviceStatMap[device] = make(map[string]interface{})
|
|
}
|
|
|
|
field := "io_serviced_recursive_" + strings.ToLower(metric.Op)
|
|
deviceStatMap[device][field] = metric.Value
|
|
}
|
|
|
|
for _, metric := range blkioStats.IoQueuedRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
field := "io_queue_recursive_" + strings.ToLower(metric.Op)
|
|
deviceStatMap[device][field] = metric.Value
|
|
}
|
|
|
|
for _, metric := range blkioStats.IoServiceTimeRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
field := "io_service_time_recursive_" + strings.ToLower(metric.Op)
|
|
deviceStatMap[device][field] = metric.Value
|
|
}
|
|
|
|
for _, metric := range blkioStats.IoWaitTimeRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
field := "io_wait_time_" + strings.ToLower(metric.Op)
|
|
deviceStatMap[device][field] = metric.Value
|
|
}
|
|
|
|
for _, metric := range blkioStats.IoMergedRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
field := "io_merged_recursive_" + strings.ToLower(metric.Op)
|
|
deviceStatMap[device][field] = metric.Value
|
|
}
|
|
|
|
for _, metric := range blkioStats.IoTimeRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
deviceStatMap[device]["io_time_recursive"] = metric.Value
|
|
}
|
|
|
|
for _, metric := range blkioStats.SectorsRecursive {
|
|
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
|
|
deviceStatMap[device]["sectors_recursive"] = metric.Value
|
|
}
|
|
return deviceStatMap
|
|
}
|
|
|
|
func (d *Docker) gatherBlockIOMetrics(
|
|
acc telegraf.Accumulator,
|
|
stat *container.StatsResponse,
|
|
tags map[string]string,
|
|
tm time.Time,
|
|
id string,
|
|
) {
|
|
perDeviceBlkio := choice.Contains("blkio", d.PerDeviceInclude)
|
|
totalBlkio := choice.Contains("blkio", d.TotalInclude)
|
|
blkioStats := stat.BlkioStats
|
|
deviceStatMap := getDeviceStatMap(blkioStats)
|
|
|
|
totalStatMap := make(map[string]interface{})
|
|
for device, fields := range deviceStatMap {
|
|
fields["container_id"] = id
|
|
if perDeviceBlkio {
|
|
iotags := copyTags(tags)
|
|
iotags["device"] = device
|
|
acc.AddFields("docker_container_blkio", fields, iotags, tm)
|
|
}
|
|
if totalBlkio {
|
|
for field, value := range fields {
|
|
if field == "container_id" {
|
|
continue
|
|
}
|
|
|
|
var uintV uint64
|
|
switch v := value.(type) {
|
|
case uint64:
|
|
uintV = v
|
|
case int64:
|
|
uintV = uint64(v)
|
|
default:
|
|
continue
|
|
}
|
|
|
|
_, ok := totalStatMap[field]
|
|
if ok {
|
|
totalStatMap[field] = totalStatMap[field].(uint64) + uintV
|
|
} else {
|
|
totalStatMap[field] = uintV
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if totalBlkio {
|
|
totalStatMap["container_id"] = id
|
|
iotags := copyTags(tags)
|
|
iotags["device"] = "total"
|
|
acc.AddFields("docker_container_blkio", totalStatMap, iotags, tm)
|
|
}
|
|
}
|
|
|
|
func (d *Docker) gatherDiskUsage(acc telegraf.Accumulator, opts types.DiskUsageOptions) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout))
|
|
defer cancel()
|
|
|
|
du, err := d.client.DiskUsage(ctx, opts)
|
|
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
}
|
|
|
|
now := time.Now()
|
|
duName := "docker_disk_usage"
|
|
|
|
// Layers size
|
|
fields := map[string]interface{}{
|
|
"layers_size": du.LayersSize,
|
|
}
|
|
|
|
tags := map[string]string{
|
|
"engine_host": d.engineHost,
|
|
"server_version": d.serverVersion,
|
|
}
|
|
|
|
acc.AddFields(duName, fields, tags, now)
|
|
|
|
// Containers
|
|
for _, cntnr := range du.Containers {
|
|
fields := map[string]interface{}{
|
|
"size_rw": cntnr.SizeRw,
|
|
"size_root_fs": cntnr.SizeRootFs,
|
|
}
|
|
|
|
imageName, imageVersion := docker.ParseImage(cntnr.Image)
|
|
|
|
tags := map[string]string{
|
|
"engine_host": d.engineHost,
|
|
"server_version": d.serverVersion,
|
|
"container_name": parseContainerName(cntnr.Names),
|
|
"container_image": imageName,
|
|
"container_version": imageVersion,
|
|
}
|
|
|
|
if d.IncludeSourceTag {
|
|
tags["source"] = hostnameFromID(cntnr.ID)
|
|
}
|
|
|
|
acc.AddFields(duName, fields, tags, now)
|
|
}
|
|
|
|
// Images
|
|
for _, image := range du.Images {
|
|
fields := map[string]interface{}{
|
|
"size": image.Size,
|
|
"shared_size": image.SharedSize,
|
|
}
|
|
|
|
tags := map[string]string{
|
|
"engine_host": d.engineHost,
|
|
"server_version": d.serverVersion,
|
|
"image_id": image.ID[7:19], // remove "sha256:" and keep the first 12 characters
|
|
}
|
|
|
|
if len(image.RepoTags) > 0 {
|
|
imageName, imageVersion := docker.ParseImage(image.RepoTags[0])
|
|
tags["image_name"] = imageName
|
|
tags["image_version"] = imageVersion
|
|
}
|
|
|
|
acc.AddFields(duName, fields, tags, now)
|
|
}
|
|
|
|
// Volumes
|
|
for _, volume := range du.Volumes {
|
|
fields := map[string]interface{}{
|
|
"size": volume.UsageData.Size,
|
|
}
|
|
|
|
tags := map[string]string{
|
|
"engine_host": d.engineHost,
|
|
"server_version": d.serverVersion,
|
|
"volume_name": volume.Name,
|
|
}
|
|
|
|
acc.AddFields(duName, fields, tags, now)
|
|
}
|
|
}
|
|
|
|
// copyTags returns an independent shallow copy of the given tag map.
func copyTags(in map[string]string) map[string]string {
	out := make(map[string]string, len(in))
	for key, value := range in {
		out[key] = value
	}
	return out
}
|
|
|
|
// Parses the human-readable size string into the amount it represents.
|
|
func parseSize(sizeStr string) (int64, error) {
|
|
matches := sizeRegex.FindStringSubmatch(sizeStr)
|
|
if len(matches) != 4 {
|
|
return -1, fmt.Errorf("invalid size: %s", sizeStr)
|
|
}
|
|
|
|
size, err := strconv.ParseFloat(matches[1], 64)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
uMap := map[string]int64{"k": KB, "m": MB, "g": GB, "t": TB, "p": PB}
|
|
unitPrefix := strings.ToLower(matches[3])
|
|
if mul, ok := uMap[unitPrefix]; ok {
|
|
size *= float64(mul)
|
|
}
|
|
|
|
return int64(size), nil
|
|
}
|
|
|
|
func (d *Docker) createContainerFilters() error {
|
|
// Backwards compatibility for deprecated `container_names` parameter.
|
|
if len(d.ContainerNames) > 0 {
|
|
d.ContainerInclude = append(d.ContainerInclude, d.ContainerNames...)
|
|
}
|
|
|
|
containerFilter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.containerFilter = containerFilter
|
|
return nil
|
|
}
|
|
|
|
func (d *Docker) createLabelFilters() error {
|
|
labelFilter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.labelFilter = labelFilter
|
|
return nil
|
|
}
|
|
|
|
func (d *Docker) createContainerStateFilters() error {
|
|
if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 {
|
|
d.ContainerStateInclude = []string{"running"}
|
|
}
|
|
stateFilter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.stateFilter = stateFilter
|
|
return nil
|
|
}
|
|
|
|
func (d *Docker) getNewClient() (dockerClient, error) {
|
|
if d.Endpoint == "ENV" {
|
|
return d.newEnvClient()
|
|
}
|
|
|
|
tlsConfig, err := d.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return d.newClient(d.Endpoint, tlsConfig)
|
|
}
|
|
|
|
// init registers the plugin with Telegraf under the name "docker",
// supplying the defaults for the deprecated and current include settings,
// the request timeout, the local socket endpoint and the real client
// constructors.
func init() {
	inputs.Add("docker", func() telegraf.Input {
		return &Docker{
			PerDevice:        true,
			PerDeviceInclude: []string{"cpu"},
			TotalInclude:     []string{"cpu", "blkio", "network"},
			Timeout:          config.Duration(time.Second * 5),
			Endpoint:         defaultEndpoint,
			newEnvClient:     newEnvClient,
			newClient:        newClient,
			filtersCreated:   false,
		}
	})
}
|