450 lines
13 KiB
Go
450 lines
13 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package opensearch
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
_ "embed"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/opensearch-project/opensearch-go/v2"
|
|
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
|
"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/internal/choice"
|
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
type Opensearch struct {
|
|
Username config.Secret `toml:"username"`
|
|
Password config.Secret `toml:"password"`
|
|
AuthBearerToken config.Secret `toml:"auth_bearer_token"`
|
|
EnableGzip bool `toml:"enable_gzip"`
|
|
EnableSniffer bool `toml:"enable_sniffer"`
|
|
FloatHandling string `toml:"float_handling"`
|
|
FloatReplacement float64 `toml:"float_replacement_value"`
|
|
ForceDocumentID bool `toml:"force_document_id"`
|
|
IndexName string `toml:"index_name"`
|
|
TemplateName string `toml:"template_name"`
|
|
ManageTemplate bool `toml:"manage_template"`
|
|
OverwriteTemplate bool `toml:"overwrite_template"`
|
|
DefaultPipeline string `toml:"default_pipeline"`
|
|
UsePipeline string `toml:"use_pipeline"`
|
|
Timeout config.Duration `toml:"timeout"`
|
|
HealthCheckInterval config.Duration `toml:"health_check_interval"`
|
|
HealthCheckTimeout config.Duration `toml:"health_check_timeout"`
|
|
URLs []string `toml:"urls"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
tls.ClientConfig
|
|
|
|
indexTmpl *template.Template
|
|
pipelineTmpl *template.Template
|
|
onSucc func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem)
|
|
onFail func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem, error)
|
|
osClient *opensearch.Client
|
|
}
|
|
|
|
//go:embed template.json
|
|
var indexTemplate string
|
|
|
|
type templatePart struct {
|
|
TemplatePattern string
|
|
}
|
|
|
|
func (*Opensearch) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (o *Opensearch) Init() error {
|
|
if len(o.URLs) == 0 || o.IndexName == "" {
|
|
return errors.New("opensearch urls or index_name is not defined")
|
|
}
|
|
|
|
// Determine if we should process NaN and inf values
|
|
valOptions := []string{"", "none", "drop", "replace"}
|
|
if err := choice.Check(o.FloatHandling, valOptions); err != nil {
|
|
return fmt.Errorf("config float_handling type: %w", err)
|
|
}
|
|
|
|
if o.FloatHandling == "" {
|
|
o.FloatHandling = "none"
|
|
}
|
|
|
|
indexTmpl, err := template.New("index").Parse(o.IndexName)
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing index_name template: %w", err)
|
|
}
|
|
o.indexTmpl = indexTmpl
|
|
|
|
pipelineTmpl, err := template.New("index").Parse(o.UsePipeline)
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing use_pipeline template: %w", err)
|
|
}
|
|
o.pipelineTmpl = pipelineTmpl
|
|
|
|
o.onSucc = func(_ context.Context, _ opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) {
|
|
o.Log.Debugf("Indexed to OpenSearch with status- [%d] Result- %s DocumentID- %s ", res.Status, res.Result, res.DocumentID)
|
|
}
|
|
o.onFail = func(_ context.Context, _ opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem, err error) {
|
|
if err != nil {
|
|
o.Log.Errorf("error while OpenSearch bulkIndexing: %v", err)
|
|
} else {
|
|
o.Log.Errorf("error while OpenSearch bulkIndexing: %s: %s", res.Error.Type, res.Error.Reason)
|
|
}
|
|
}
|
|
|
|
if o.TemplateName == "" {
|
|
return errors.New("template_name configuration not defined")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("opensearch", func() telegraf.Output {
|
|
return &Opensearch{
|
|
Timeout: config.Duration(time.Second * 5),
|
|
HealthCheckInterval: config.Duration(time.Second * 10),
|
|
HealthCheckTimeout: config.Duration(time.Second * 1),
|
|
}
|
|
})
|
|
}
|
|
|
|
func (o *Opensearch) Connect() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout))
|
|
defer cancel()
|
|
|
|
err := o.newClient()
|
|
if err != nil {
|
|
o.Log.Errorf("error creating OpenSearch client: %v", err)
|
|
}
|
|
|
|
if o.ManageTemplate {
|
|
err := o.manageTemplate(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
_, err = o.osClient.Ping()
|
|
if err != nil {
|
|
return fmt.Errorf("unable to ping OpenSearch server: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *Opensearch) newClient() error {
|
|
username, err := o.Username.Get()
|
|
if err != nil {
|
|
return fmt.Errorf("getting username failed: %w", err)
|
|
}
|
|
defer username.Destroy()
|
|
|
|
password, err := o.Password.Get()
|
|
if err != nil {
|
|
return fmt.Errorf("getting password failed: %w", err)
|
|
}
|
|
defer password.Destroy()
|
|
|
|
tlsConfig, err := o.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("creating TLS config failed: %w", err)
|
|
}
|
|
clientConfig := opensearch.Config{
|
|
Addresses: o.URLs,
|
|
Username: username.String(),
|
|
Password: password.String(),
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: tlsConfig,
|
|
},
|
|
}
|
|
|
|
header := http.Header{}
|
|
if o.EnableGzip {
|
|
header.Add("Content-Encoding", "gzip")
|
|
header.Add("Content-Type", "application/json")
|
|
header.Add("Accept-Encoding", "gzip")
|
|
}
|
|
|
|
if !o.AuthBearerToken.Empty() {
|
|
token, err := o.AuthBearerToken.Get()
|
|
if err != nil {
|
|
return fmt.Errorf("getting token failed: %w", err)
|
|
}
|
|
header.Add("Authorization", "Bearer "+token.String())
|
|
defer token.Destroy()
|
|
}
|
|
clientConfig.Header = header
|
|
|
|
client, err := opensearch.NewClient(clientConfig)
|
|
o.osClient = client
|
|
|
|
return err
|
|
}
|
|
|
|
// getPointID generates a unique ID for a Metric Point
|
|
// Timestamp(ns),measurement name and Series Hash for compute the final
|
|
// SHA256 based hash ID
|
|
func getPointID(m telegraf.Metric) string {
|
|
var buffer bytes.Buffer
|
|
buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
|
|
buffer.WriteString(m.Name())
|
|
buffer.WriteString(strconv.FormatUint(m.HashID(), 10))
|
|
|
|
return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes()))
|
|
}
|
|
|
|
func (o *Opensearch) Write(metrics []telegraf.Metric) error {
|
|
// get indexers based on unique pipeline values
|
|
indexers := getTargetIndexers(metrics, o)
|
|
if len(indexers) == 0 {
|
|
return errors.New("failed to instantiate OpenSearch bulkindexer")
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout))
|
|
defer cancel()
|
|
|
|
for _, metric := range metrics {
|
|
var name = metric.Name()
|
|
|
|
// index name has to be re-evaluated each time for telegraf
|
|
// to send the metric to the correct time-based index
|
|
indexName, err := o.GetIndexName(metric)
|
|
if err != nil {
|
|
return fmt.Errorf("generating indexname failed: %w", err)
|
|
}
|
|
|
|
// Handle NaN and inf field-values
|
|
fields := make(map[string]interface{})
|
|
for k, value := range metric.Fields() {
|
|
v, ok := value.(float64)
|
|
if !ok || o.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) {
|
|
fields[k] = value
|
|
continue
|
|
}
|
|
if o.FloatHandling == "drop" {
|
|
continue
|
|
}
|
|
|
|
if math.IsNaN(v) || math.IsInf(v, 1) {
|
|
fields[k] = o.FloatReplacement
|
|
} else {
|
|
fields[k] = -o.FloatReplacement
|
|
}
|
|
}
|
|
|
|
m := make(map[string]interface{})
|
|
|
|
m["@timestamp"] = metric.Time()
|
|
m["measurement_name"] = name
|
|
m["tag"] = metric.Tags()
|
|
m[name] = fields
|
|
|
|
body, err := json.Marshal(m)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal body: %w", err)
|
|
}
|
|
|
|
bulkIndxrItem := opensearchutil.BulkIndexerItem{
|
|
Action: "index",
|
|
Index: indexName,
|
|
Body: strings.NewReader(string(body)),
|
|
OnSuccess: o.onSucc,
|
|
OnFailure: o.onFail,
|
|
}
|
|
if o.ForceDocumentID {
|
|
bulkIndxrItem.DocumentID = getPointID(metric)
|
|
}
|
|
|
|
if o.UsePipeline != "" {
|
|
pipelineName, err := o.getPipelineName(metric)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to evaluate pipeline name: %w", err)
|
|
}
|
|
|
|
if pipelineName != "" {
|
|
if indexers[pipelineName] != nil {
|
|
if err := indexers[pipelineName].Add(ctx, bulkIndxrItem); err != nil {
|
|
o.Log.Errorf("error adding metric entry to OpenSearch bulkIndexer: %v for pipeline %s", err, pipelineName)
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := indexers["default"].Add(ctx, bulkIndxrItem); err != nil {
|
|
o.Log.Errorf("error adding metric entry to OpenSearch default bulkIndexer: %v", err)
|
|
}
|
|
}
|
|
|
|
for _, bulkIndxr := range indexers {
|
|
if err := bulkIndxr.Close(ctx); err != nil {
|
|
return fmt.Errorf("error sending bulk request to OpenSearch: %w", err)
|
|
}
|
|
|
|
// Report the indexer statistics
|
|
stats := bulkIndxr.Stats()
|
|
if stats.NumFailed > 0 {
|
|
return fmt.Errorf("failed to index [%d] documents", stats.NumFailed)
|
|
}
|
|
|
|
o.Log.Debugf("Successfully indexed [%d] documents", stats.NumAdded)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// BulkIndexer supports pipeline at config level so separate indexer instance for each unique pipeline
|
|
func getTargetIndexers(metrics []telegraf.Metric, osInst *Opensearch) map[string]opensearchutil.BulkIndexer {
|
|
var indexers = make(map[string]opensearchutil.BulkIndexer)
|
|
|
|
if osInst.UsePipeline != "" {
|
|
for _, metric := range metrics {
|
|
pipelineName, err := osInst.getPipelineName(metric)
|
|
if err != nil {
|
|
osInst.Log.Errorf("error while evaluating pipeline name: %v for pipeline %s", err, pipelineName)
|
|
}
|
|
|
|
if pipelineName != "" {
|
|
// BulkIndexer supports pipeline at config level not metric level
|
|
if _, ok := indexers[pipelineName]; ok {
|
|
continue
|
|
}
|
|
bulkIndxr, err := createBulkIndexer(osInst, pipelineName)
|
|
if err != nil {
|
|
osInst.Log.Errorf("error while instantiating OpenSearch NewBulkIndexer: %v for pipeline: %s", err, pipelineName)
|
|
} else {
|
|
indexers[pipelineName] = bulkIndxr
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
bulkIndxr, err := createBulkIndexer(osInst, "")
|
|
if err != nil {
|
|
osInst.Log.Errorf("error while instantiating OpenSearch NewBulkIndexer: %v for default pipeline", err)
|
|
} else {
|
|
indexers["default"] = bulkIndxr
|
|
}
|
|
return indexers
|
|
}
|
|
|
|
func createBulkIndexer(osInst *Opensearch, pipelineName string) (opensearchutil.BulkIndexer, error) {
|
|
var bulkIndexerConfig = opensearchutil.BulkIndexerConfig{
|
|
Client: osInst.osClient,
|
|
NumWorkers: 4, // The number of worker goroutines (default: number of CPUs)
|
|
FlushBytes: 5e+6, // The flush threshold in bytes (default: 5M)
|
|
}
|
|
if pipelineName != "" {
|
|
bulkIndexerConfig.Pipeline = pipelineName
|
|
}
|
|
|
|
return opensearchutil.NewBulkIndexer(bulkIndexerConfig)
|
|
}
|
|
|
|
func (o *Opensearch) GetIndexName(metric telegraf.Metric) (string, error) {
|
|
var buf bytes.Buffer
|
|
err := o.indexTmpl.Execute(&buf, metric)
|
|
|
|
if err != nil {
|
|
return "", fmt.Errorf("creating index name failed: %w", err)
|
|
}
|
|
var indexName = buf.String()
|
|
if strings.Contains(indexName, "{{") {
|
|
return "", fmt.Errorf("failed to evaluate valid indexname: %s", indexName)
|
|
}
|
|
return indexName, nil
|
|
}
|
|
|
|
func (o *Opensearch) getPipelineName(metric telegraf.Metric) (string, error) {
|
|
if o.UsePipeline == "" || !strings.Contains(o.UsePipeline, "{{") {
|
|
return o.UsePipeline, nil
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
err := o.pipelineTmpl.Execute(&buf, metric)
|
|
if err != nil {
|
|
return "", fmt.Errorf("creating pipeline name failed: %w", err)
|
|
}
|
|
var pipelineName = buf.String()
|
|
if strings.Contains(pipelineName, "{{") {
|
|
return "", fmt.Errorf("failed to evaluate valid pipelineName: %s", pipelineName)
|
|
}
|
|
o.Log.Debugf("PipelineTemplate- %s", pipelineName)
|
|
|
|
if pipelineName == "" {
|
|
pipelineName = o.DefaultPipeline
|
|
}
|
|
return pipelineName, nil
|
|
}
|
|
|
|
func (o *Opensearch) manageTemplate(ctx context.Context) error {
|
|
tempReq := opensearchapi.CatTemplatesRequest{
|
|
Name: o.TemplateName,
|
|
}
|
|
|
|
resp, err := tempReq.Do(ctx, o.osClient.Transport)
|
|
if err != nil {
|
|
return fmt.Errorf("template check failed, template name: %s, error: %w", o.TemplateName, err)
|
|
}
|
|
|
|
templateExists := resp.Body != http.NoBody
|
|
templatePattern := o.IndexName
|
|
|
|
if strings.Contains(templatePattern, "{{") {
|
|
templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")]
|
|
}
|
|
|
|
if templatePattern == "" {
|
|
return errors.New("template cannot be created for dynamic index names without an index prefix")
|
|
}
|
|
|
|
if o.OverwriteTemplate || !templateExists || templatePattern != "" {
|
|
tp := templatePart{
|
|
TemplatePattern: templatePattern + "*",
|
|
}
|
|
|
|
t := template.Must(template.New("template").Parse(indexTemplate))
|
|
var tmpl bytes.Buffer
|
|
|
|
if err := t.Execute(&tmpl, tp); err != nil {
|
|
return err
|
|
}
|
|
|
|
indexTempReq := opensearchapi.IndicesPutTemplateRequest{
|
|
Name: o.TemplateName,
|
|
Body: strings.NewReader(tmpl.String()),
|
|
}
|
|
indexTempResp, err := indexTempReq.Do(ctx, o.osClient.Transport)
|
|
|
|
if err != nil || indexTempResp.StatusCode != 200 {
|
|
return fmt.Errorf("creating index template %q failed: %w", o.TemplateName, err)
|
|
}
|
|
|
|
o.Log.Debugf("Template %s created or updated", o.TemplateName)
|
|
} else {
|
|
o.Log.Debug("Found existing OpenSearch template. Skipping template management")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Opensearch) Close() error {
|
|
o.osClient = nil
|
|
return nil
|
|
}
|