1
0
Fork 0
telegraf/plugins/outputs/opensearch/opensearch.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

450 lines
13 KiB
Go

//go:generate ../../../tools/readme_config_includer/generator
package opensearch
import (
"bytes"
"context"
"crypto/sha256"
_ "embed"
"encoding/json"
"errors"
"fmt"
"math"
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
)
// sampleConfig holds the contents of sample.conf, embedded at build time; it
// is what SampleConfig returns.
//
//go:embed sample.conf
var sampleConfig string
// Opensearch is the output plugin state: the TOML-configurable settings plus
// the templates, bulk-indexer callbacks and client prepared by Init/Connect.
type Opensearch struct {
// Username and Password are passed to the OpenSearch client configuration.
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
// AuthBearerToken, when not empty, is sent as an "Authorization: Bearer ..." header.
AuthBearerToken config.Secret `toml:"auth_bearer_token"`
// EnableGzip adds gzip Content-Encoding/Accept-Encoding request headers.
EnableGzip bool `toml:"enable_gzip"`
// EnableSniffer is not referenced anywhere else in this file.
EnableSniffer bool `toml:"enable_sniffer"`
// FloatHandling selects how NaN/Inf float fields are written: "none"
// (default, keep as is), "drop" (omit the field) or "replace".
FloatHandling string `toml:"float_handling"`
// FloatReplacement is the value written for NaN and +Inf when FloatHandling
// is "replace"; -Inf is written as its negation.
FloatReplacement float64 `toml:"float_replacement_value"`
// ForceDocumentID sets the document ID of each item to getPointID(metric).
ForceDocumentID bool `toml:"force_document_id"`
// IndexName is parsed as a text/template and executed per metric.
IndexName string `toml:"index_name"`
// TemplateName is the name of the index template; Init requires it to be set.
TemplateName string `toml:"template_name"`
// ManageTemplate makes Connect call manageTemplate.
ManageTemplate bool `toml:"manage_template"`
OverwriteTemplate bool `toml:"overwrite_template"`
// DefaultPipeline is used when the UsePipeline template evaluates to an empty string.
DefaultPipeline string `toml:"default_pipeline"`
// UsePipeline is a pipeline name or a text/template evaluated per metric.
UsePipeline string `toml:"use_pipeline"`
// Timeout bounds the context used by Connect and by each Write call.
Timeout config.Duration `toml:"timeout"`
// HealthCheckInterval and HealthCheckTimeout only receive defaults in
// init(); they are not otherwise referenced in this file.
HealthCheckInterval config.Duration `toml:"health_check_interval"`
HealthCheckTimeout config.Duration `toml:"health_check_timeout"`
// URLs are the server addresses handed to the OpenSearch client.
URLs []string `toml:"urls"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig
// indexTmpl and pipelineTmpl are the parsed IndexName and UsePipeline templates.
indexTmpl *template.Template
pipelineTmpl *template.Template
// onSucc and onFail are the per-item bulk indexer callbacks; both only log.
onSucc func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem)
onFail func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem, error)
// osClient is created by newClient and reset to nil by Close.
osClient *opensearch.Client
}
// indexTemplate holds the contents of template.json, embedded at build time.
// manageTemplate parses it as a text/template and executes it with a
// templatePart value.
//
//go:embed template.json
var indexTemplate string
// templatePart is the data passed to the embedded index template when it is
// rendered by manageTemplate.
type templatePart struct {
// TemplatePattern is the index pattern: the static prefix of the configured
// index name followed by "*".
TemplatePattern string
}
// SampleConfig returns the embedded sample configuration (sample.conf).
func (*Opensearch) SampleConfig() string {
return sampleConfig
}
// Init validates the configuration and prepares everything that does not
// need a server connection: the float-handling mode, the parsed index-name
// and pipeline-name templates, and the bulk-indexer result callbacks.
//
// It returns an error when urls or index_name are missing, float_handling is
// not one of the accepted values, either template fails to parse, or
// template_name is empty.
func (o *Opensearch) Init() error {
	if len(o.URLs) == 0 || o.IndexName == "" {
		return errors.New("opensearch urls or index_name is not defined")
	}

	// Determine if we should process NaN and inf values
	valOptions := []string{"", "none", "drop", "replace"}
	if err := choice.Check(o.FloatHandling, valOptions); err != nil {
		return fmt.Errorf("config float_handling type: %w", err)
	}
	if o.FloatHandling == "" {
		o.FloatHandling = "none"
	}

	indexTmpl, err := template.New("index").Parse(o.IndexName)
	if err != nil {
		return fmt.Errorf("error parsing index_name template: %w", err)
	}
	o.indexTmpl = indexTmpl

	// Give the pipeline template its own name so that parse/execute errors
	// produced by text/template are not attributed to the index template.
	pipelineTmpl, err := template.New("pipeline").Parse(o.UsePipeline)
	if err != nil {
		return fmt.Errorf("error parsing use_pipeline template: %w", err)
	}
	o.pipelineTmpl = pipelineTmpl

	// Per-item callbacks handed to every bulk indexer item; they only log.
	o.onSucc = func(_ context.Context, _ opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) {
		o.Log.Debugf("Indexed to OpenSearch with status- [%d] Result- %s DocumentID- %s ", res.Status, res.Result, res.DocumentID)
	}
	o.onFail = func(_ context.Context, _ opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem, err error) {
		if err != nil {
			o.Log.Errorf("error while OpenSearch bulkIndexing: %v", err)
		} else {
			o.Log.Errorf("error while OpenSearch bulkIndexing: %s: %s", res.Error.Type, res.Error.Reason)
		}
	}

	if o.TemplateName == "" {
		return errors.New("template_name configuration not defined")
	}

	return nil
}
func init() {
outputs.Add("opensearch", func() telegraf.Output {
return &Opensearch{
Timeout: config.Duration(time.Second * 5),
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
}
})
}
// Connect creates the OpenSearch client, optionally installs the index
// template (when manage_template is set) and pings the server. All requests
// share one context bounded by the configured timeout.
//
// It returns an error when the client cannot be created, template management
// fails, or the ping request fails.
func (o *Opensearch) Connect() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout))
	defer cancel()

	// Without a client every following step would dereference a nil
	// pointer, so a creation failure has to abort the connect attempt.
	if err := o.newClient(); err != nil {
		return fmt.Errorf("error creating OpenSearch client: %w", err)
	}

	if o.ManageTemplate {
		if err := o.manageTemplate(ctx); err != nil {
			return err
		}
	}

	resp, err := o.osClient.Ping(o.osClient.Ping.WithContext(ctx))
	if err != nil {
		return fmt.Errorf("unable to ping OpenSearch server: %w", err)
	}
	// Release the response body so the underlying connection can be reused;
	// a close error carries no useful information for the caller here.
	if resp != nil && resp.Body != nil {
		_ = resp.Body.Close()
	}

	return nil
}
// newClient resolves the configured secrets, builds the TLS configuration and
// request headers, and stores a new OpenSearch client in o.osClient. The
// result of opensearch.NewClient is stored even when it reports an error,
// and that error is returned.
func (o *Opensearch) newClient() error {
	user, err := o.Username.Get()
	if err != nil {
		return fmt.Errorf("getting username failed: %w", err)
	}
	defer user.Destroy()

	pass, err := o.Password.Get()
	if err != nil {
		return fmt.Errorf("getting password failed: %w", err)
	}
	defer pass.Destroy()

	tlsCfg, err := o.ClientConfig.TLSConfig()
	if err != nil {
		return fmt.Errorf("creating TLS config failed: %w", err)
	}

	// Extra headers sent with every request.
	headers := make(http.Header)
	if o.EnableGzip {
		headers.Add("Content-Encoding", "gzip")
		headers.Add("Content-Type", "application/json")
		headers.Add("Accept-Encoding", "gzip")
	}
	if !o.AuthBearerToken.Empty() {
		bearer, err := o.AuthBearerToken.Get()
		if err != nil {
			return fmt.Errorf("getting token failed: %w", err)
		}
		defer bearer.Destroy()
		headers.Add("Authorization", "Bearer "+bearer.String())
	}

	o.osClient, err = opensearch.NewClient(opensearch.Config{
		Addresses: o.URLs,
		Username:  user.String(),
		Password:  pass.String(),
		Transport: &http.Transport{TLSClientConfig: tlsCfg},
		Header:    headers,
	})
	return err
}
// getPointID derives a document ID for a metric point: the hex-encoded
// SHA256 digest of the concatenation of the timestamp in nanoseconds, the
// measurement name and the series hash.
func getPointID(m telegraf.Metric) string {
	var key strings.Builder
	key.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
	key.WriteString(m.Name())
	key.WriteString(strconv.FormatUint(m.HashID(), 10))

	digest := sha256.Sum256([]byte(key.String()))
	return fmt.Sprintf("%x", digest)
}
// Write sends the metrics to OpenSearch as bulk "index" actions.
//
// One bulk indexer is obtained per distinct pipeline (plus the "default"
// one), every metric is queued on the matching indexer, and finally all
// indexers are closed. Every indexer is closed on every path, including
// after a failure, so that none is left open.
//
// The first error encountered is returned: a metric that cannot be converted,
// a missing default indexer, a failing Close, or an indexer reporting failed
// documents. Later errors are logged.
func (o *Opensearch) Write(metrics []telegraf.Metric) error {
	// get indexers based on unique pipeline values
	indexers := getTargetIndexers(metrics, o)
	if len(indexers) == 0 {
		return errors.New("failed to instantiate OpenSearch bulkindexer")
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout))
	defer cancel()

	var firstErr error
	for _, metric := range metrics {
		if err := o.addMetric(ctx, metric, indexers); err != nil {
			firstErr = err
			break
		}
	}

	// Close every indexer even when an error already occurred; returning
	// early here would leave the remaining indexers open.
	for _, bulkIndxr := range indexers {
		var err error
		if closeErr := bulkIndxr.Close(ctx); closeErr != nil {
			err = fmt.Errorf("error sending bulk request to OpenSearch: %w", closeErr)
		} else if stats := bulkIndxr.Stats(); stats.NumFailed > 0 {
			// Report the indexer statistics
			err = fmt.Errorf("failed to index [%d] documents", stats.NumFailed)
		} else {
			o.Log.Debugf("Successfully indexed [%d] documents", stats.NumAdded)
		}

		switch {
		case err == nil:
		case firstErr == nil:
			firstErr = err
		default:
			o.Log.Errorf("additional error while writing to OpenSearch: %v", err)
		}
	}

	return firstErr
}

// addMetric converts one metric into a bulk item and queues it on the indexer
// selected by its pipeline name, falling back to the "default" indexer.
// Failures to queue the item are logged, not returned.
func (o *Opensearch) addMetric(ctx context.Context, metric telegraf.Metric, indexers map[string]opensearchutil.BulkIndexer) error {
	name := metric.Name()

	// index name has to be re-evaluated each time for telegraf
	// to send the metric to the correct time-based index
	indexName, err := o.GetIndexName(metric)
	if err != nil {
		return fmt.Errorf("generating indexname failed: %w", err)
	}

	m := make(map[string]interface{}, 4)
	m["@timestamp"] = metric.Time()
	m["measurement_name"] = name
	m["tag"] = metric.Tags()
	// Assigned last on purpose: a measurement named like one of the keys
	// above overrides it, exactly as before.
	m[name] = o.sanitizeFields(metric.Fields())

	body, err := json.Marshal(m)
	if err != nil {
		return fmt.Errorf("failed to marshal body: %w", err)
	}

	bulkIndxrItem := opensearchutil.BulkIndexerItem{
		Action:    "index",
		Index:     indexName,
		Body:      bytes.NewReader(body),
		OnSuccess: o.onSucc,
		OnFailure: o.onFail,
	}
	if o.ForceDocumentID {
		bulkIndxrItem.DocumentID = getPointID(metric)
	}

	if o.UsePipeline != "" {
		pipelineName, err := o.getPipelineName(metric)
		if err != nil {
			return fmt.Errorf("failed to evaluate pipeline name: %w", err)
		}
		if pipelineName != "" {
			if indexer := indexers[pipelineName]; indexer != nil {
				if err := indexer.Add(ctx, bulkIndxrItem); err != nil {
					o.Log.Errorf("error adding metric entry to OpenSearch bulkIndexer: %v for pipeline %s", err, pipelineName)
				}
				return nil
			}
		}
	}

	// The default indexer is missing when its creation failed while a
	// pipeline indexer could still be created; calling through the nil
	// interface would panic.
	indexer := indexers["default"]
	if indexer == nil {
		return errors.New("default OpenSearch bulkindexer is not available")
	}
	if err := indexer.Add(ctx, bulkIndxrItem); err != nil {
		o.Log.Errorf("error adding metric entry to OpenSearch default bulkIndexer: %v", err)
	}
	return nil
}

// sanitizeFields returns a copy of fields with NaN and infinite float64
// values treated according to FloatHandling: kept for "none", omitted for
// "drop", otherwise replaced by FloatReplacement (negated for -Inf).
func (o *Opensearch) sanitizeFields(fields map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(fields))
	for k, value := range fields {
		v, isFloat := value.(float64)
		if !isFloat || o.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) {
			out[k] = value
			continue
		}
		if o.FloatHandling == "drop" {
			continue
		}
		if math.IsInf(v, -1) {
			out[k] = -o.FloatReplacement
		} else {
			out[k] = o.FloatReplacement
		}
	}
	return out
}
// getTargetIndexers builds the bulk indexers needed for a batch. BulkIndexer
// supports a pipeline only at config level, not per metric, so one indexer is
// created for each distinct non-empty pipeline name found in metrics, plus
// one without a pipeline stored under the key "default". Errors are logged
// and the affected indexer is left out of the returned map.
//
// NOTE(review): the no-pipeline indexer is stored last under "default", so a
// pipeline literally named "default" would share that key.
func getTargetIndexers(metrics []telegraf.Metric, osInst *Opensearch) map[string]opensearchutil.BulkIndexer {
	targets := make(map[string]opensearchutil.BulkIndexer)

	if osInst.UsePipeline != "" {
		for _, m := range metrics {
			name, err := osInst.getPipelineName(m)
			if err != nil {
				osInst.Log.Errorf("error while evaluating pipeline name: %v for pipeline %s", err, name)
			}
			if name == "" {
				continue
			}
			// An indexer for this pipeline already exists.
			if _, seen := targets[name]; seen {
				continue
			}

			indexer, err := createBulkIndexer(osInst, name)
			if err != nil {
				osInst.Log.Errorf("error while instantiating OpenSearch NewBulkIndexer: %v for pipeline: %s", err, name)
				continue
			}
			targets[name] = indexer
		}
	}

	if indexer, err := createBulkIndexer(osInst, ""); err != nil {
		osInst.Log.Errorf("error while instantiating OpenSearch NewBulkIndexer: %v for default pipeline", err)
	} else {
		targets["default"] = indexer
	}

	return targets
}
// createBulkIndexer returns a new bulk indexer bound to the plugin's client.
// pipelineName becomes the indexer's pipeline; an empty name leaves the
// pipeline unset.
func createBulkIndexer(osInst *Opensearch, pipelineName string) (opensearchutil.BulkIndexer, error) {
	return opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
		Client:     osInst.osClient,
		NumWorkers: 4,    // The number of worker goroutines (default: number of CPUs)
		FlushBytes: 5e+6, // The flush threshold in bytes (default: 5M)
		Pipeline:   pipelineName,
	})
}
// GetIndexName renders the index-name template for the given metric. It
// returns an error when the template cannot be executed or when the rendered
// name still contains "{{".
func (o *Opensearch) GetIndexName(metric telegraf.Metric) (string, error) {
	var rendered strings.Builder
	if err := o.indexTmpl.Execute(&rendered, metric); err != nil {
		return "", fmt.Errorf("creating index name failed: %w", err)
	}

	name := rendered.String()
	if !strings.Contains(name, "{{") {
		return name, nil
	}
	return "", fmt.Errorf("failed to evaluate valid indexname: %s", name)
}
// getPipelineName resolves the pipeline name for a metric. A use_pipeline
// value without "{{" (including the empty string) is returned unchanged.
// Otherwise the pipeline template is rendered for the metric; an empty result
// falls back to default_pipeline. An error is returned when the template
// cannot be executed or the rendered name still contains "{{".
func (o *Opensearch) getPipelineName(metric telegraf.Metric) (string, error) {
	// Static (or unset) pipeline: nothing to evaluate.
	if !strings.Contains(o.UsePipeline, "{{") {
		return o.UsePipeline, nil
	}

	var rendered strings.Builder
	if err := o.pipelineTmpl.Execute(&rendered, metric); err != nil {
		return "", fmt.Errorf("creating pipeline name failed: %w", err)
	}

	name := rendered.String()
	if strings.Contains(name, "{{") {
		return "", fmt.Errorf("failed to evaluate valid pipelineName: %s", name)
	}
	o.Log.Debugf("PipelineTemplate- %s", name)

	if name != "" {
		return name, nil
	}
	return o.DefaultPipeline, nil
}
// manageTemplate installs the index template named template_name.
//
// The index pattern is the static prefix of index_name (everything before the
// first "{{") followed by "*"; an index name without such a prefix is
// rejected. The template is written when it does not exist yet or when
// overwrite_template is set; otherwise the existing template is kept.
//
// It returns an error when the existence check or the put request fails, the
// put request answers with a status other than 200, or the embedded template
// cannot be parsed or rendered.
func (o *Opensearch) manageTemplate(ctx context.Context) error {
	tempReq := opensearchapi.CatTemplatesRequest{
		Name: o.TemplateName,
	}
	resp, err := tempReq.Do(ctx, o.osClient.Transport)
	if err != nil {
		return fmt.Errorf("template check failed, template name: %s, error: %w", o.TemplateName, err)
	}
	// A non-empty successful answer means the template is already there. Any
	// other answer is treated as "not known to exist" so that creation is
	// still attempted.
	templateExists := resp.StatusCode >= http.StatusOK &&
		resp.StatusCode < http.StatusMultipleChoices &&
		resp.Body != http.NoBody
	if resp.Body != nil {
		_ = resp.Body.Close()
	}

	templatePattern := o.IndexName
	if i := strings.Index(templatePattern, "{{"); i >= 0 {
		templatePattern = templatePattern[:i]
	}
	if templatePattern == "" {
		return errors.New("template cannot be created for dynamic index names without an index prefix")
	}

	if templateExists && !o.OverwriteTemplate {
		o.Log.Debug("Found existing OpenSearch template. Skipping template management")
		return nil
	}

	t, err := template.New("template").Parse(indexTemplate)
	if err != nil {
		return fmt.Errorf("parsing index template failed: %w", err)
	}
	tp := templatePart{
		TemplatePattern: templatePattern + "*",
	}
	var tmpl bytes.Buffer
	if err := t.Execute(&tmpl, tp); err != nil {
		return err
	}

	indexTempReq := opensearchapi.IndicesPutTemplateRequest{
		Name: o.TemplateName,
		Body: strings.NewReader(tmpl.String()),
	}
	indexTempResp, err := indexTempReq.Do(ctx, o.osClient.Transport)
	if err != nil {
		return fmt.Errorf("creating index template %q failed: %w", o.TemplateName, err)
	}
	if indexTempResp.Body != nil {
		defer indexTempResp.Body.Close()
	}
	// err is nil here, so report the status instead of wrapping a nil error.
	if indexTempResp.StatusCode != http.StatusOK {
		return fmt.Errorf("creating index template %q failed: unexpected status code %d", o.TemplateName, indexTempResp.StatusCode)
	}

	o.Log.Debugf("Template %s created or updated", o.TemplateName)
	return nil
}
// Close drops the reference to the OpenSearch client. It always returns nil.
func (o *Opensearch) Close() error {
o.osClient = nil
return nil
}