//go:generate ../../../tools/readme_config_includer/generator package solr import ( _ "embed" "encoding/json" "fmt" "net/http" "sync" "time" "github.com/coreos/go-semver/semver" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string type Solr struct { Servers []string `toml:"servers"` Username string `toml:"username"` Password string `toml:"password"` HTTPTimeout config.Duration `toml:"timeout"` Cores []string `toml:"cores"` Log telegraf.Logger `toml:"-"` client *http.Client configs map[string]*apiConfig filter filter.Filter } func (*Solr) SampleConfig() string { return sampleConfig } func (s *Solr) Init() error { // Setup client to do the queries s.client = &http.Client{ Transport: &http.Transport{ ResponseHeaderTimeout: time.Duration(s.HTTPTimeout), }, Timeout: time.Duration(s.HTTPTimeout), } // Prepare filter for the cores to query f, err := filter.Compile(s.Cores) if err != nil { return err } s.filter = f // Allocate config cache s.configs = make(map[string]*apiConfig, len(s.Servers)) return nil } func (s *Solr) Start(telegraf.Accumulator) error { for _, server := range s.Servers { // Simply fill the cache for all available servers _ = s.getAPIConfig(server) } return nil } func (s *Solr) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup for _, srv := range s.Servers { wg.Add(1) go func(server string) { defer wg.Done() // Check the server version from cache or query one cfg := s.getAPIConfig(server) s.collect(acc, cfg, server) }(srv) } wg.Wait() return nil } func (*Solr) Stop() {} func (s *Solr) getAPIConfig(server string) *apiConfig { if cfg, found := s.configs[server]; found { return cfg } version, err := s.determineServerAPIVersion(server) if err != nil { s.Log.Errorf("Getting version for %q failed: %v", server, err) // Exit early and do not fill the cache as the server might not be // reachable yet. return newAPIv1Config() } s.Log.Debugf("Found API version %d for server %q...", version, server) switch version { case 0: s.Log.Warn("Unable to determine API version! Using API v1...") s.configs[server] = newAPIv1Config() case 1: s.configs[server] = newAPIv1Config() case 2: s.configs[server] = newAPIv2Config() default: s.Log.Warnf("Unknown API version %q! Using latest known", version) s.configs[server] = newAPIv2Config() } return s.configs[server] } func (s *Solr) collect(acc telegraf.Accumulator, cfg *apiConfig, server string) { now := time.Now() var coreStatus adminCoresStatus if err := s.query(cfg.adminEndpoint(server), &coreStatus); err != nil { acc.AddError(err) return } var wg sync.WaitGroup for core, metrics := range coreStatus.Status { fields := map[string]interface{}{ "deleted_docs": metrics.Index.DeletedDocs, "max_docs": metrics.Index.MaxDoc, "num_docs": metrics.Index.NumDocs, "size_in_bytes": metrics.Index.SizeInBytes, } tags := map[string]string{"core": core} acc.AddFields("solr_admin", fields, tags, now) if s.filter != nil && !s.filter.Match(core) { continue } wg.Add(1) go func(server string, core string) { defer wg.Done() var data mBeansData if err := s.query(cfg.mbeansEndpoint(server, core), &data); err != nil { acc.AddError(err) return } cfg.parseCore(acc, core, &data, now) cfg.parseQueryHandler(acc, core, &data, now) cfg.parseUpdateHandler(acc, core, &data, now) cfg.parseCache(acc, core, &data, now) }(server, core) } wg.Wait() } func (s *Solr) query(endpoint string, v interface{}) error { req, reqErr := http.NewRequest(http.MethodGet, endpoint, nil) if reqErr != nil { return reqErr } if s.Username != "" { req.SetBasicAuth(s.Username, s.Password) } req.Header.Set("User-Agent", internal.ProductToken()) r, err := s.client.Do(req) if err != nil { return err } defer r.Body.Close() if r.StatusCode != http.StatusOK { return fmt.Errorf("solr: API endpoint %q responded with %q", endpoint, r.Status) } return json.NewDecoder(r.Body).Decode(v) } func (s *Solr) determineServerAPIVersion(server string) (int, error) { endpoint := server + "/solr/admin/info/system?wt=json" var info map[string]interface{} if err := s.query(endpoint, &info); err != nil { return 0, err } lraw, found := info["lucene"] if !found { return 0, nil } lucene, ok := lraw.(map[string]interface{}) if !ok { return 0, nil } vraw, ok := lucene["solr-spec-version"] if !ok { return 0, nil } v, ok := vraw.(string) if !ok { return 0, nil } // API version 1 is required until v7.x version := semver.New(v) if version.LessThan(semver.Version{Major: 7}) { return 1, nil } // Starting from 7.0 API version 2 has to be used to get the UPDATE and // QUERY metrics. return 2, nil } func init() { inputs.Add("solr", func() telegraf.Input { return &Solr{ HTTPTimeout: config.Duration(time.Second * 5), } }) }