1
0
Fork 0
telegraf/plugins/inputs/elasticsearch_query/elasticsearch_query.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

248 lines
6.4 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package elasticsearch_query
import (
"context"
_ "embed"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
elastic5 "gopkg.in/olivere/elastic.v5"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common_http "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
type ElasticsearchQuery struct {
URLs []string `toml:"urls"`
Username string `toml:"username"`
Password string `toml:"password"`
EnableSniffer bool `toml:"enable_sniffer"`
HealthCheckInterval config.Duration `toml:"health_check_interval"`
Aggregations []esAggregation `toml:"aggregation"`
Log telegraf.Logger `toml:"-"`
httpclient *http.Client
common_http.HTTPClientConfig
esClient *elastic5.Client
}
type esAggregation struct {
Index string `toml:"index"`
MeasurementName string `toml:"measurement_name"`
DateField string `toml:"date_field"`
DateFieldFormat string `toml:"date_field_custom_format"`
QueryPeriod config.Duration `toml:"query_period"`
FilterQuery string `toml:"filter_query"`
MetricFields []string `toml:"metric_fields"`
MetricFunction string `toml:"metric_function"`
Tags []string `toml:"tags"`
IncludeMissingTag bool `toml:"include_missing_tag"`
MissingTagValue string `toml:"missing_tag_value"`
mapMetricFields map[string]string
aggregationQueryList []aggregationQueryData
}
func (*ElasticsearchQuery) SampleConfig() string {
return sampleConfig
}
func (e *ElasticsearchQuery) Init() error {
if e.URLs == nil {
return errors.New("elasticsearch urls is not defined")
}
err := e.connectToES()
if err != nil {
e.Log.Errorf("error connecting to elasticsearch: %s", err)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
for i, agg := range e.Aggregations {
if agg.MeasurementName == "" {
return errors.New("field 'measurement_name' is not set")
}
if agg.DateField == "" {
return errors.New("field 'date_field' is not set")
}
err = e.initAggregation(ctx, agg, i)
if err != nil {
e.Log.Error(err.Error())
return nil
}
}
return nil
}
func (*ElasticsearchQuery) Start(telegraf.Accumulator) error {
return nil
}
// Gather writes the results of the queries from Elasticsearch to the Accumulator.
func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
err := e.connectToES()
if err != nil {
return err
}
for i, agg := range e.Aggregations {
wg.Add(1)
go func(agg esAggregation, i int) {
defer wg.Done()
err := e.esAggregationQuery(acc, agg, i)
if err != nil {
acc.AddError(fmt.Errorf("elasticsearch query aggregation %s: %w", agg.MeasurementName, err))
}
}(agg, i)
}
wg.Wait()
return nil
}
func (e *ElasticsearchQuery) Stop() {
if e.httpclient != nil {
e.httpclient.CloseIdleConnections()
}
}
func (e *ElasticsearchQuery) initAggregation(ctx context.Context, agg esAggregation, i int) (err error) {
// retrieve field mapping and build queries only once
agg.mapMetricFields, err = e.getMetricFields(ctx, agg)
if err != nil {
return fmt.Errorf("not possible to retrieve fields: %w", err)
}
for _, metricField := range agg.MetricFields {
if _, ok := agg.mapMetricFields[metricField]; !ok {
return fmt.Errorf("metric field %q not found on index %q", metricField, agg.Index)
}
}
err = agg.buildAggregationQuery()
if err != nil {
return err
}
e.Aggregations[i] = agg
return nil
}
func (e *ElasticsearchQuery) connectToES() error {
var clientOptions []elastic5.ClientOptionFunc
if e.esClient != nil {
if e.esClient.IsRunning() {
return nil
}
}
if e.httpclient == nil {
httpclient, err := e.createHTTPClient()
if err != nil {
return err
}
e.httpclient = httpclient
}
clientOptions = append(clientOptions,
elastic5.SetHttpClient(e.httpclient),
elastic5.SetSniff(e.EnableSniffer),
elastic5.SetURL(e.URLs...),
elastic5.SetHealthcheckInterval(time.Duration(e.HealthCheckInterval)),
)
if e.Username != "" {
clientOptions = append(clientOptions, elastic5.SetBasicAuth(e.Username, e.Password))
}
if time.Duration(e.HealthCheckInterval) == 0 {
clientOptions = append(clientOptions, elastic5.SetHealthcheck(false))
}
client, err := elastic5.NewClient(clientOptions...)
if err != nil {
return err
}
// check for ES version on first node
esVersion, err := client.ElasticsearchVersion(e.URLs[0])
if err != nil {
return fmt.Errorf("elasticsearch version check failed: %w", err)
}
esVersionSplit := strings.Split(esVersion, ".")
// quit if ES version is not supported
if len(esVersionSplit) == 0 {
return errors.New("elasticsearch version check failed")
}
i, err := strconv.Atoi(esVersionSplit[0])
if err != nil || i < 5 || i > 6 {
return fmt.Errorf("elasticsearch version %s not supported (currently supported versions are 5.x and 6.x)", esVersion)
}
e.esClient = client
return nil
}
func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) {
ctx := context.Background()
return e.HTTPClientConfig.CreateClient(ctx, e.Log)
}
func (e *ElasticsearchQuery) esAggregationQuery(acc telegraf.Accumulator, aggregation esAggregation, i int) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
// try to init the aggregation query if it is not done already
if aggregation.aggregationQueryList == nil {
err := e.initAggregation(ctx, aggregation, i)
if err != nil {
return err
}
aggregation = e.Aggregations[i]
}
searchResult, err := e.runAggregationQuery(ctx, aggregation)
if err != nil {
return err
}
if searchResult.Aggregations == nil {
parseSimpleResult(acc, aggregation.MeasurementName, searchResult)
return nil
}
return parseAggregationResult(acc, aggregation.aggregationQueryList, searchResult)
}
func init() {
inputs.Add("elasticsearch_query", func() telegraf.Input {
return &ElasticsearchQuery{
HealthCheckInterval: config.Duration(time.Second * 10),
HTTPClientConfig: common_http.HTTPClientConfig{
ResponseHeaderTimeout: config.Duration(5 * time.Second),
Timeout: config.Duration(5 * time.Second),
},
}
})
}