226 lines
5.1 KiB
Go
226 lines
5.1 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package loki
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
_ "embed"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"golang.org/x/oauth2"
|
|
"golang.org/x/oauth2/clientcredentials"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
// Fallback values applied by Connect when the matching option is left unset.
const (
	// defaultEndpoint is the path appended to the domain when no endpoint
	// is configured.
	defaultEndpoint = "/loki/api/v1/push"
	// defaultClientTimeout is the HTTP client timeout used when the
	// configured timeout is zero.
	defaultClientTimeout = 5 * time.Second
)
|
|
|
|
type Loki struct {
|
|
Domain string `toml:"domain"`
|
|
Endpoint string `toml:"endpoint"`
|
|
Timeout config.Duration `toml:"timeout"`
|
|
Username config.Secret `toml:"username"`
|
|
Password config.Secret `toml:"password"`
|
|
Headers map[string]string `toml:"http_headers"`
|
|
ClientID string `toml:"client_id"`
|
|
ClientSecret string `toml:"client_secret"`
|
|
TokenURL string `toml:"token_url"`
|
|
Scopes []string `toml:"scopes"`
|
|
GZipRequest bool `toml:"gzip_request"`
|
|
MetricNameLabel string `toml:"metric_name_label"`
|
|
SanitizeLabelNames bool `toml:"sanitize_label_names"`
|
|
|
|
url string
|
|
client *http.Client
|
|
tls.ClientConfig
|
|
}
|
|
|
|
func (l *Loki) createClient(ctx context.Context) (*http.Client, error) {
|
|
tlsCfg, err := l.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("tls config fail: %w", err)
|
|
}
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: tlsCfg,
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
Timeout: time.Duration(l.Timeout),
|
|
}
|
|
|
|
if l.ClientID != "" && l.ClientSecret != "" && l.TokenURL != "" {
|
|
oauthConfig := clientcredentials.Config{
|
|
ClientID: l.ClientID,
|
|
ClientSecret: l.ClientSecret,
|
|
TokenURL: l.TokenURL,
|
|
Scopes: l.Scopes,
|
|
}
|
|
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
|
|
client = oauthConfig.Client(ctx)
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
func (*Loki) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (l *Loki) Connect() (err error) {
|
|
if l.Domain == "" {
|
|
return errors.New("domain is required")
|
|
}
|
|
|
|
if l.Endpoint == "" {
|
|
l.Endpoint = defaultEndpoint
|
|
}
|
|
|
|
l.url = fmt.Sprintf("%s%s", l.Domain, l.Endpoint)
|
|
|
|
if l.Timeout == 0 {
|
|
l.Timeout = config.Duration(defaultClientTimeout)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
l.client, err = l.createClient(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("http client fail: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *Loki) Close() error {
|
|
l.client.CloseIdleConnections()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *Loki) Write(metrics []telegraf.Metric) error {
|
|
s := Streams{}
|
|
|
|
sort.SliceStable(metrics, func(i, j int) bool {
|
|
return metrics[i].Time().Before(metrics[j].Time())
|
|
})
|
|
|
|
for _, m := range metrics {
|
|
if l.MetricNameLabel != "" {
|
|
m.AddTag(l.MetricNameLabel, m.Name())
|
|
}
|
|
|
|
tags := m.TagList()
|
|
if l.SanitizeLabelNames {
|
|
for _, t := range tags {
|
|
t.Key = sanitizeLabelName(t.Key)
|
|
}
|
|
}
|
|
|
|
var line string
|
|
for _, f := range m.FieldList() {
|
|
line += fmt.Sprintf("%s=\"%v\" ", f.Key, f.Value)
|
|
}
|
|
|
|
s.insertLog(tags, Log{strconv.FormatInt(m.Time().UnixNano(), 10), line})
|
|
}
|
|
|
|
return l.writeMetrics(s)
|
|
}
|
|
|
|
func (l *Loki) writeMetrics(s Streams) error {
|
|
bs, err := json.Marshal(s)
|
|
if err != nil {
|
|
return fmt.Errorf("json.Marshal: %w", err)
|
|
}
|
|
|
|
var reqBodyBuffer io.Reader = bytes.NewBuffer(bs)
|
|
|
|
if l.GZipRequest {
|
|
rc := internal.CompressWithGzip(reqBodyBuffer)
|
|
defer rc.Close()
|
|
reqBodyBuffer = rc
|
|
}
|
|
|
|
req, err := http.NewRequest(http.MethodPost, l.url, reqBodyBuffer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !l.Username.Empty() {
|
|
username, err := l.Username.Get()
|
|
if err != nil {
|
|
return fmt.Errorf("getting username failed: %w", err)
|
|
}
|
|
password, err := l.Password.Get()
|
|
if err != nil {
|
|
username.Destroy()
|
|
return fmt.Errorf("getting password failed: %w", err)
|
|
}
|
|
req.SetBasicAuth(username.String(), password.String())
|
|
username.Destroy()
|
|
password.Destroy()
|
|
}
|
|
|
|
for k, v := range l.Headers {
|
|
if strings.EqualFold(k, "host") {
|
|
req.Host = v
|
|
}
|
|
req.Header.Set(k, v)
|
|
}
|
|
|
|
req.Header.Set("User-Agent", internal.ProductToken())
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if l.GZipRequest {
|
|
req.Header.Set("Content-Encoding", "gzip")
|
|
}
|
|
|
|
resp, err := l.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
//nolint:errcheck // err can be ignored since it is just for logging
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("when writing to [%s] received status code, %d: %s", l.url, resp.StatusCode, body)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Patterns used by sanitizeLabelName, compiled once at package initialization
// instead of on every call.
var (
	// invalidLabelStart matches a leading character outside [a-zA-Z_:].
	invalidLabelStart = regexp.MustCompile(`^[^a-zA-Z_:]`)
	// invalidLabelChar matches any character outside [a-zA-Z0-9_:].
	invalidLabelChar = regexp.MustCompile(`[^a-zA-Z0-9_:]`)
)

// sanitizeLabelName rewrites name so that a non-empty result matches the
// pattern [a-zA-Z_:][a-zA-Z0-9_:]*.
//
// A leading character that may not start a label name (for example a digit)
// and every other disallowed character are each replaced by an underscore.
// An empty name is returned unchanged.
func sanitizeLabelName(name string) string {
	// Fix the first character separately: digits are valid elsewhere in the
	// name but not in the leading position.
	result := invalidLabelStart.ReplaceAllString(name, "_")

	return invalidLabelChar.ReplaceAllString(result, "_")
}
|
|
|
|
func init() {
|
|
outputs.Add("loki", func() telegraf.Output {
|
|
return &Loki{
|
|
MetricNameLabel: "__name",
|
|
}
|
|
})
|
|
}
|