//go:generate ../../../tools/readme_config_includer/generator package twemproxy import ( _ "embed" "encoding/json" "errors" "io" "net" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string type Twemproxy struct { Addr string `toml:"addr"` Pools []string `toml:"pools"` } func (*Twemproxy) SampleConfig() string { return sampleConfig } func (t *Twemproxy) Gather(acc telegraf.Accumulator) error { conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second) if err != nil { return err } body, err := io.ReadAll(conn) if err != nil { return err } var stats map[string]interface{} if err = json.Unmarshal(body, &stats); err != nil { return errors.New("error decoding JSON response") } tags := make(map[string]string) tags["twemproxy"] = t.Addr t.processStat(acc, tags, stats) return nil } // Process Twemproxy server stats func (t *Twemproxy) processStat(acc telegraf.Accumulator, tags map[string]string, data map[string]interface{}) { if source, ok := data["source"]; ok { if val, ok := source.(string); ok { tags["source"] = val } } fields := make(map[string]interface{}) metrics := []string{"total_connections", "curr_connections", "timestamp"} for _, m := range metrics { if value, ok := data[m]; ok { if val, ok := value.(float64); ok { fields[m] = val } } } acc.AddFields("twemproxy", fields, tags) for _, pool := range t.Pools { if poolStat, ok := data[pool]; ok { if data, ok := poolStat.(map[string]interface{}); ok { poolTags := copyTags(tags) poolTags["pool"] = pool processPool(acc, poolTags, data) } } } } // Process pool data in Twemproxy stats func processPool(acc telegraf.Accumulator, tags map[string]string, data map[string]interface{}) { serverTags := make(map[string]map[string]string) fields := make(map[string]interface{}) for key, value := range data { switch key { case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof": if val, ok := value.(float64); ok { fields[key] = val } default: if data, ok := value.(map[string]interface{}); ok { if _, ok := serverTags[key]; !ok { serverTags[key] = copyTags(tags) serverTags[key]["server"] = key } processServer(acc, serverTags[key], data) } } } acc.AddFields("twemproxy_pool", fields, tags) } // Process backend server(redis/memcached) stats func processServer(acc telegraf.Accumulator, tags map[string]string, data map[string]interface{}) { fields := make(map[string]interface{}) for key, value := range data { if val, ok := value.(float64); ok { fields[key] = val } } acc.AddFields("twemproxy_pool_server", fields, tags) } // Tags is not expected to be mutated after passing to Add. func copyTags(tags map[string]string) map[string]string { newTags := make(map[string]string) for k, v := range tags { newTags[k] = v } return newTags } func init() { inputs.Add("twemproxy", func() telegraf.Input { return &Twemproxy{} }) }