1
0
Fork 0
telegraf/plugins/inputs/nginx_sts/nginx_sts.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

291 lines
8.6 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package nginx_sts
import (
"bufio"
_ "embed"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
type NginxSTS struct {
Urls []string `toml:"urls"`
ResponseTimeout config.Duration `toml:"response_timeout"`
tls.ClientConfig
client *http.Client
}
func (*NginxSTS) SampleConfig() string {
return sampleConfig
}
func (n *NginxSTS) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Create an HTTP client that is re-used for each
// collection interval
if n.client == nil {
client, err := n.createHTTPClient()
if err != nil {
return err
}
n.client = client
}
for _, u := range n.Urls {
addr, err := url.Parse(u)
if err != nil {
acc.AddError(fmt.Errorf("unable to parse address %q: %w", u, err))
continue
}
wg.Add(1)
go func(addr *url.URL) {
defer wg.Done()
acc.AddError(n.gatherURL(addr, acc))
}(addr)
}
wg.Wait()
return nil
}
func (n *NginxSTS) createHTTPClient() (*http.Client, error) {
if n.ResponseTimeout < config.Duration(time.Second) {
n.ResponseTimeout = config.Duration(time.Second * 5)
}
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Timeout: time.Duration(n.ResponseTimeout),
}
return client, nil
}
func (n *NginxSTS) gatherURL(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := n.client.Get(addr.String())
if err != nil {
return fmt.Errorf("error making HTTP request to %q: %w", addr.String(), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status)
}
contentType := strings.Split(resp.Header.Get("Content-Type"), ";")[0]
switch contentType {
case "application/json":
return gatherStatusURL(bufio.NewReader(resp.Body), getTags(addr), acc)
default:
return fmt.Errorf("%s returned unexpected content type %s", addr.String(), contentType)
}
}
type nginxSTSResponse struct {
Connections struct {
Active uint64 `json:"active"`
Reading uint64 `json:"reading"`
Writing uint64 `json:"writing"`
Waiting uint64 `json:"waiting"`
Accepted uint64 `json:"accepted"`
Handled uint64 `json:"handled"`
Requests uint64 `json:"requests"`
} `json:"connections"`
Hostname string `json:"hostName"`
StreamFilterZones map[string]map[string]server `json:"streamFilterZones"`
StreamServerZones map[string]server `json:"streamServerZones"`
StreamUpstreamZones map[string][]upstream `json:"streamUpstreamZones"`
}
type server struct {
ConnectCounter uint64 `json:"connectCounter"`
InBytes uint64 `json:"inBytes"`
OutBytes uint64 `json:"outBytes"`
SessionMsecCounter uint64 `json:"sessionMsecCounter"`
SessionMsec uint64 `json:"sessionMsec"`
Responses struct {
OneXx uint64 `json:"1xx"`
TwoXx uint64 `json:"2xx"`
ThreeXx uint64 `json:"3xx"`
FourXx uint64 `json:"4xx"`
FiveXx uint64 `json:"5xx"`
} `json:"responses"`
}
type upstream struct {
Server string `json:"server"`
ConnectCounter uint64 `json:"connectCounter"`
InBytes uint64 `json:"inBytes"`
OutBytes uint64 `json:"outBytes"`
Responses struct {
OneXx uint64 `json:"1xx"`
TwoXx uint64 `json:"2xx"`
ThreeXx uint64 `json:"3xx"`
FourXx uint64 `json:"4xx"`
FiveXx uint64 `json:"5xx"`
} `json:"responses"`
SessionMsecCounter uint64 `json:"sessionMsecCounter"`
SessionMsec uint64 `json:"sessionMsec"`
USessionMsecCounter uint64 `json:"uSessionMsecCounter"`
USessionMsec uint64 `json:"uSessionMsec"`
UConnectMsecCounter uint64 `json:"uConnectMsecCounter"`
UConnectMsec uint64 `json:"uConnectMsec"`
UFirstByteMsecCounter uint64 `json:"uFirstByteMsecCounter"`
UFirstByteMsec uint64 `json:"uFirstByteMsec"`
Weight uint64 `json:"weight"`
MaxFails uint64 `json:"maxFails"`
FailTimeout uint64 `json:"failTimeout"`
Backup bool `json:"backup"`
Down bool `json:"down"`
}
func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error {
dec := json.NewDecoder(r)
status := &nginxSTSResponse{}
if err := dec.Decode(status); err != nil {
return errors.New("error while decoding JSON response")
}
acc.AddFields("nginx_sts_connections", map[string]interface{}{
"active": status.Connections.Active,
"reading": status.Connections.Reading,
"writing": status.Connections.Writing,
"waiting": status.Connections.Waiting,
"accepted": status.Connections.Accepted,
"handled": status.Connections.Handled,
"requests": status.Connections.Requests,
}, tags)
for zoneName, zone := range status.StreamServerZones {
zoneTags := make(map[string]string, len(tags)+1)
for k, v := range tags {
zoneTags[k] = v
}
zoneTags["zone"] = zoneName
acc.AddFields("nginx_sts_server", map[string]interface{}{
"connects": zone.ConnectCounter,
"in_bytes": zone.InBytes,
"out_bytes": zone.OutBytes,
"session_msec_counter": zone.SessionMsecCounter,
"session_msec": zone.SessionMsec,
"response_1xx_count": zone.Responses.OneXx,
"response_2xx_count": zone.Responses.TwoXx,
"response_3xx_count": zone.Responses.ThreeXx,
"response_4xx_count": zone.Responses.FourXx,
"response_5xx_count": zone.Responses.FiveXx,
}, zoneTags)
}
for filterName, filters := range status.StreamFilterZones {
for filterKey, upstream := range filters {
filterTags := make(map[string]string, len(tags)+2)
for k, v := range tags {
filterTags[k] = v
}
filterTags["filter_key"] = filterKey
filterTags["filter_name"] = filterName
acc.AddFields("nginx_sts_filter", map[string]interface{}{
"connects": upstream.ConnectCounter,
"in_bytes": upstream.InBytes,
"out_bytes": upstream.OutBytes,
"session_msec_counter": upstream.SessionMsecCounter,
"session_msec": upstream.SessionMsec,
"response_1xx_count": upstream.Responses.OneXx,
"response_2xx_count": upstream.Responses.TwoXx,
"response_3xx_count": upstream.Responses.ThreeXx,
"response_4xx_count": upstream.Responses.FourXx,
"response_5xx_count": upstream.Responses.FiveXx,
}, filterTags)
}
}
for upstreamName, upstreams := range status.StreamUpstreamZones {
for _, upstream := range upstreams {
upstreamServerTags := make(map[string]string, len(tags)+2)
for k, v := range tags {
upstreamServerTags[k] = v
}
upstreamServerTags["upstream"] = upstreamName
upstreamServerTags["upstream_address"] = upstream.Server
acc.AddFields("nginx_sts_upstream", map[string]interface{}{
"connects": upstream.ConnectCounter,
"session_msec": upstream.SessionMsec,
"session_msec_counter": upstream.SessionMsecCounter,
"upstream_session_msec": upstream.USessionMsec,
"upstream_session_msec_counter": upstream.USessionMsecCounter,
"upstream_connect_msec": upstream.UConnectMsec,
"upstream_connect_msec_counter": upstream.UConnectMsecCounter,
"upstream_firstbyte_msec": upstream.UFirstByteMsec,
"upstream_firstbyte_msec_counter": upstream.UFirstByteMsecCounter,
"in_bytes": upstream.InBytes,
"out_bytes": upstream.OutBytes,
"response_1xx_count": upstream.Responses.OneXx,
"response_2xx_count": upstream.Responses.TwoXx,
"response_3xx_count": upstream.Responses.ThreeXx,
"response_4xx_count": upstream.Responses.FourXx,
"response_5xx_count": upstream.Responses.FiveXx,
"weight": upstream.Weight,
"max_fails": upstream.MaxFails,
"fail_timeout": upstream.FailTimeout,
"backup": upstream.Backup,
"down": upstream.Down,
}, upstreamServerTags)
}
}
return nil
}
// Get tag(s) for the nginx plugin
func getTags(addr *url.URL) map[string]string {
h := addr.Host
host, port, err := net.SplitHostPort(h)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}
return map[string]string{"source": host, "port": port}
}
func init() {
inputs.Add("nginx_sts", func() telegraf.Input {
return &NginxSTS{}
})
}