258 lines
4.9 KiB
Go
258 lines
4.9 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package aurora
|
|
|
|
import (
|
|
"context"
|
|
_ "embed"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
type roleType int
|
|
|
|
const (
|
|
unknown roleType = iota
|
|
leader
|
|
follower
|
|
)
|
|
|
|
func (r roleType) String() string {
|
|
switch r {
|
|
case leader:
|
|
return "leader"
|
|
case follower:
|
|
return "follower"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
var (
|
|
defaultTimeout = 5 * time.Second
|
|
defaultRoles = []string{"leader", "follower"}
|
|
)
|
|
|
|
type vars map[string]interface{}
|
|
|
|
type Aurora struct {
|
|
Schedulers []string `toml:"schedulers"`
|
|
Roles []string `toml:"roles"`
|
|
Timeout config.Duration `toml:"timeout"`
|
|
Username string `toml:"username"`
|
|
Password string `toml:"password"`
|
|
tls.ClientConfig
|
|
|
|
client *http.Client
|
|
urls []*url.URL
|
|
}
|
|
|
|
func (*Aurora) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (a *Aurora) Gather(acc telegraf.Accumulator) error {
|
|
if a.client == nil {
|
|
err := a.initialize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout))
|
|
defer cancel()
|
|
|
|
var wg sync.WaitGroup
|
|
for _, u := range a.urls {
|
|
wg.Add(1)
|
|
go func(u *url.URL) {
|
|
defer wg.Done()
|
|
role, err := a.gatherRole(ctx, u)
|
|
if err != nil {
|
|
acc.AddError(fmt.Errorf("%s: %w", u, err))
|
|
return
|
|
}
|
|
|
|
if !a.roleEnabled(role) {
|
|
return
|
|
}
|
|
|
|
err = a.gatherScheduler(ctx, u, role, acc)
|
|
if err != nil {
|
|
acc.AddError(fmt.Errorf("%s: %w", u, err))
|
|
}
|
|
}(u)
|
|
}
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *Aurora) initialize() error {
|
|
tlsCfg, err := a.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
TLSClientConfig: tlsCfg,
|
|
},
|
|
}
|
|
|
|
urls := make([]*url.URL, 0, len(a.Schedulers))
|
|
for _, s := range a.Schedulers {
|
|
loc, err := url.Parse(s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
urls = append(urls, loc)
|
|
}
|
|
|
|
if a.Timeout < config.Duration(time.Second) {
|
|
a.Timeout = config.Duration(defaultTimeout)
|
|
}
|
|
|
|
if len(a.Roles) == 0 {
|
|
a.Roles = defaultRoles
|
|
}
|
|
|
|
a.client = client
|
|
a.urls = urls
|
|
return nil
|
|
}
|
|
|
|
func (a *Aurora) roleEnabled(role roleType) bool {
|
|
if len(a.Roles) == 0 {
|
|
return true
|
|
}
|
|
|
|
for _, v := range a.Roles {
|
|
if role.String() == v {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (roleType, error) {
|
|
loc := *origin
|
|
loc.Path = "leaderhealth"
|
|
req, err := http.NewRequest("GET", loc.String(), nil)
|
|
if err != nil {
|
|
return unknown, err
|
|
}
|
|
|
|
if a.Username != "" || a.Password != "" {
|
|
req.SetBasicAuth(a.Username, a.Password)
|
|
}
|
|
req.Header.Add("Accept", "text/plain")
|
|
|
|
resp, err := a.client.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return unknown, err
|
|
}
|
|
if err := resp.Body.Close(); err != nil {
|
|
return unknown, fmt.Errorf("closing body failed: %w", err)
|
|
}
|
|
|
|
switch resp.StatusCode {
|
|
case http.StatusOK:
|
|
return leader, nil
|
|
case http.StatusBadGateway:
|
|
fallthrough
|
|
case http.StatusServiceUnavailable:
|
|
return follower, nil
|
|
default:
|
|
return unknown, fmt.Errorf("%v", resp.Status)
|
|
}
|
|
}
|
|
|
|
func (a *Aurora) gatherScheduler(
|
|
ctx context.Context, origin *url.URL, role roleType, acc telegraf.Accumulator,
|
|
) error {
|
|
loc := *origin
|
|
loc.Path = "vars.json"
|
|
req, err := http.NewRequest("GET", loc.String(), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if a.Username != "" || a.Password != "" {
|
|
req.SetBasicAuth(a.Username, a.Password)
|
|
}
|
|
req.Header.Add("Accept", "application/json")
|
|
|
|
resp, err := a.client.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("%v", resp.Status)
|
|
}
|
|
|
|
var metrics vars
|
|
decoder := json.NewDecoder(resp.Body)
|
|
decoder.UseNumber()
|
|
err = decoder.Decode(&metrics)
|
|
if err != nil {
|
|
return fmt.Errorf("decoding response: %w", err)
|
|
}
|
|
|
|
var fields = make(map[string]interface{}, len(metrics))
|
|
for k, v := range metrics {
|
|
switch v := v.(type) {
|
|
case json.Number:
|
|
// Aurora encodes numbers as you would specify them as a literal,
|
|
// use this to determine if a value is a float or int.
|
|
if strings.ContainsAny(v.String(), ".eE") {
|
|
fv, err := v.Float64()
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
continue
|
|
}
|
|
fields[k] = fv
|
|
} else {
|
|
fi, err := v.Int64()
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
continue
|
|
}
|
|
fields[k] = fi
|
|
}
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
acc.AddFields("aurora",
|
|
fields,
|
|
map[string]string{
|
|
"scheduler": origin.String(),
|
|
"role": role.String(),
|
|
},
|
|
)
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("aurora", func() telegraf.Input {
|
|
return &Aurora{}
|
|
})
|
|
}
|