//go:generate ../../../tools/readme_config_includer/generator package kubernetes import ( "context" _ "embed" "encoding/json" "fmt" "net/http" "os" "strings" "sync" "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string const ( defaultServiceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" ) // Kubernetes represents the config object for the plugin type Kubernetes struct { URL string `toml:"url"` BearerToken string `toml:"bearer_token"` BearerTokenString string `toml:"bearer_token_string" deprecated:"1.24.0;1.35.0;use 'BearerToken' with a file instead"` NodeMetricName string `toml:"node_metric_name"` LabelInclude []string `toml:"label_include"` LabelExclude []string `toml:"label_exclude"` ResponseTimeout config.Duration `toml:"response_timeout"` Log telegraf.Logger `toml:"-"` tls.ClientConfig labelFilter filter.Filter httpClient *http.Client } func (*Kubernetes) SampleConfig() string { return sampleConfig } func (k *Kubernetes) Init() error { // If neither are provided, use the default service account. if k.BearerToken == "" && k.BearerTokenString == "" { k.BearerToken = defaultServiceAccountPath } labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude) if err != nil { return err } k.labelFilter = labelFilter if k.URL == "" { k.InsecureSkipVerify = true } if k.NodeMetricName == "" { k.NodeMetricName = "kubernetes_node" } return nil } func (k *Kubernetes) Gather(acc telegraf.Accumulator) error { if k.URL != "" { acc.AddError(k.gatherSummary(k.URL, acc)) return nil } var wg sync.WaitGroup nodeBaseURLs, err := getNodeURLs(k.Log) if err != nil { return err } for _, url := range nodeBaseURLs { wg.Add(1) go func(url string) { defer wg.Done() acc.AddError(k.gatherSummary(url, acc)) }(url) } wg.Wait() return nil } func getNodeURLs(log telegraf.Logger) ([]string, error) { cfg, err := rest.InClusterConfig() if err != nil { return nil, err } client, err := kubernetes.NewForConfig(cfg) if err != nil { return nil, err } nodes, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) if err != nil { return nil, err } nodeUrls := make([]string, 0, len(nodes.Items)) for i := range nodes.Items { n := &nodes.Items[i] address := getNodeAddress(n.Status.Addresses) if address == "" { log.Warnf("Unable to node addresses for Node %q", n.Name) continue } nodeUrls = append(nodeUrls, "https://"+address+":10250") } return nodeUrls, nil } // Prefer internal addresses, if none found, use ExternalIP func getNodeAddress(addresses []v1.NodeAddress) string { extAddresses := make([]string, 0) for _, addr := range addresses { if addr.Type == v1.NodeInternalIP { return addr.Address } extAddresses = append(extAddresses, addr.Address) } if len(extAddresses) > 0 { return extAddresses[0] } return "" } func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error { summaryMetrics := &summaryMetrics{} err := k.loadJSON(baseURL+"/stats/summary", summaryMetrics) if err != nil { return err } podInfos, err := k.gatherPodInfo(baseURL) if err != nil { return err } buildSystemContainerMetrics(summaryMetrics, acc) buildNodeMetrics(summaryMetrics, acc, k.NodeMetricName) buildPodMetrics(summaryMetrics, podInfos, k.labelFilter, acc) return nil } func buildSystemContainerMetrics(summaryMetrics *summaryMetrics, acc telegraf.Accumulator) { for _, container := range summaryMetrics.Node.SystemContainers { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "container_name": container.Name, } fields := make(map[string]interface{}) fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds fields["memory_usage_bytes"] = container.Memory.UsageBytes fields["memory_working_set_bytes"] = container.Memory.WorkingSetBytes fields["memory_rss_bytes"] = container.Memory.RSSBytes fields["memory_page_faults"] = container.Memory.PageFaults fields["memory_major_page_faults"] = container.Memory.MajorPageFaults fields["rootfs_available_bytes"] = container.RootFS.AvailableBytes fields["rootfs_capacity_bytes"] = container.RootFS.CapacityBytes fields["logsfs_available_bytes"] = container.LogsFS.AvailableBytes fields["logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes acc.AddFields("kubernetes_system_container", fields, tags) } } func buildNodeMetrics(summaryMetrics *summaryMetrics, acc telegraf.Accumulator, metricName string) { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, } fields := make(map[string]interface{}) fields["cpu_usage_nanocores"] = summaryMetrics.Node.CPU.UsageNanoCores fields["cpu_usage_core_nanoseconds"] = summaryMetrics.Node.CPU.UsageCoreNanoSeconds fields["memory_available_bytes"] = summaryMetrics.Node.Memory.AvailableBytes fields["memory_usage_bytes"] = summaryMetrics.Node.Memory.UsageBytes fields["memory_working_set_bytes"] = summaryMetrics.Node.Memory.WorkingSetBytes fields["memory_rss_bytes"] = summaryMetrics.Node.Memory.RSSBytes fields["memory_page_faults"] = summaryMetrics.Node.Memory.PageFaults fields["memory_major_page_faults"] = summaryMetrics.Node.Memory.MajorPageFaults fields["network_rx_bytes"] = summaryMetrics.Node.Network.RXBytes fields["network_rx_errors"] = summaryMetrics.Node.Network.RXErrors fields["network_tx_bytes"] = summaryMetrics.Node.Network.TXBytes fields["network_tx_errors"] = summaryMetrics.Node.Network.TXErrors fields["fs_available_bytes"] = summaryMetrics.Node.FileSystem.AvailableBytes fields["fs_capacity_bytes"] = summaryMetrics.Node.FileSystem.CapacityBytes fields["fs_used_bytes"] = summaryMetrics.Node.FileSystem.UsedBytes fields["runtime_image_fs_available_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.AvailableBytes fields["runtime_image_fs_capacity_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.CapacityBytes fields["runtime_image_fs_used_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.UsedBytes acc.AddFields(metricName, fields, tags) } func (k *Kubernetes) gatherPodInfo(baseURL string) ([]item, error) { var podAPI pods err := k.loadJSON(baseURL+"/pods", &podAPI) if err != nil { return nil, err } podInfos := make([]item, 0, len(podAPI.Items)) podInfos = append(podInfos, podAPI.Items...) return podInfos, nil } func (k *Kubernetes) loadJSON(url string, v interface{}) error { var req, err = http.NewRequest("GET", url, nil) if err != nil { return err } var resp *http.Response tlsCfg, err := k.ClientConfig.TLSConfig() if err != nil { return err } if k.httpClient == nil { if k.ResponseTimeout < config.Duration(time.Second) { k.ResponseTimeout = config.Duration(time.Second * 5) } k.httpClient = &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, }, CheckRedirect: func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse }, Timeout: time.Duration(k.ResponseTimeout), } } if k.BearerToken != "" { token, err := os.ReadFile(k.BearerToken) if err != nil { return err } k.BearerTokenString = strings.TrimSpace(string(token)) } req.Header.Set("Authorization", "Bearer "+k.BearerTokenString) req.Header.Add("Accept", "application/json") resp, err = k.httpClient.Do(req) if err != nil { return fmt.Errorf("error making HTTP request to %q: %w", url, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) } err = json.NewDecoder(resp.Body).Decode(v) if err != nil { return fmt.Errorf("error parsing response: %w", err) } return nil } func buildPodMetrics(summaryMetrics *summaryMetrics, podInfo []item, labelFilter filter.Filter, acc telegraf.Accumulator) { for _, pod := range summaryMetrics.Pods { podLabels := make(map[string]string) containerImages := make(map[string]string) for _, info := range podInfo { if info.Metadata.Name == pod.PodRef.Name && info.Metadata.Namespace == pod.PodRef.Namespace { for _, v := range info.Spec.Containers { containerImages[v.Name] = v.Image } for k, v := range info.Metadata.Labels { if labelFilter.Match(k) { podLabels[k] = v } } } } for _, container := range pod.Containers { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "namespace": pod.PodRef.Namespace, "container_name": container.Name, "pod_name": pod.PodRef.Name, } for k, v := range containerImages { if k == container.Name { tags["image"] = v tok := strings.Split(v, ":") if len(tok) == 2 { tags["version"] = tok[1] } } } for k, v := range podLabels { tags[k] = v } fields := make(map[string]interface{}) fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds fields["memory_usage_bytes"] = container.Memory.UsageBytes fields["memory_working_set_bytes"] = container.Memory.WorkingSetBytes fields["memory_rss_bytes"] = container.Memory.RSSBytes fields["memory_page_faults"] = container.Memory.PageFaults fields["memory_major_page_faults"] = container.Memory.MajorPageFaults fields["rootfs_available_bytes"] = container.RootFS.AvailableBytes fields["rootfs_capacity_bytes"] = container.RootFS.CapacityBytes fields["rootfs_used_bytes"] = container.RootFS.UsedBytes fields["logsfs_available_bytes"] = container.LogsFS.AvailableBytes fields["logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes fields["logsfs_used_bytes"] = container.LogsFS.UsedBytes acc.AddFields("kubernetes_pod_container", fields, tags) } for _, volume := range pod.Volumes { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "pod_name": pod.PodRef.Name, "namespace": pod.PodRef.Namespace, "volume_name": volume.Name, } for k, v := range podLabels { tags[k] = v } fields := make(map[string]interface{}) fields["available_bytes"] = volume.AvailableBytes fields["capacity_bytes"] = volume.CapacityBytes fields["used_bytes"] = volume.UsedBytes acc.AddFields("kubernetes_pod_volume", fields, tags) } tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "pod_name": pod.PodRef.Name, "namespace": pod.PodRef.Namespace, } for k, v := range podLabels { tags[k] = v } fields := make(map[string]interface{}) fields["rx_bytes"] = pod.Network.RXBytes fields["rx_errors"] = pod.Network.RXErrors fields["tx_bytes"] = pod.Network.TXBytes fields["tx_errors"] = pod.Network.TXErrors acc.AddFields("kubernetes_pod_network", fields, tags) } } func init() { inputs.Add("kubernetes", func() telegraf.Input { return &Kubernetes{ LabelExclude: []string{"*"}, } }) }