//go:generate ../../../tools/readme_config_includer/generator
package postgresql

import (
	"context"
	_ "embed"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/coocood/freecache"
	"github.com/jackc/pgconn"
	"github.com/jackc/pgtype"
	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/logger"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
	"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)
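
// dbh is the common subset of pgxpool.Pool and pgx.Tx used by the write path,
// so the same write code can run against either a connection pool or an open
// transaction.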
type dbh interface {
	Begin(ctx context.Context) (pgx.Tx, error)
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
	Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
}

//go:embed sample.conf
var sampleConfig string

type Postgresql struct {
	Connection                 config.Secret           `toml:"connection"`
	Schema                     string                  `toml:"schema"`
	TagsAsForeignKeys          bool                    `toml:"tags_as_foreign_keys"`
	TagTableSuffix             string                  `toml:"tag_table_suffix"`
	ForeignTagConstraint       bool                    `toml:"foreign_tag_constraint"`
	TagsAsJsonb                bool                    `toml:"tags_as_jsonb"`
	FieldsAsJsonb              bool                    `toml:"fields_as_jsonb"`
	TimestampColumnName        string                  `toml:"timestamp_column_name"`
	TimestampColumnType        string                  `toml:"timestamp_column_type"`
	CreateTemplates            []*sqltemplate.Template `toml:"create_templates"`
	AddColumnTemplates         []*sqltemplate.Template `toml:"add_column_templates"`
	TagTableCreateTemplates    []*sqltemplate.Template `toml:"tag_table_create_templates"`
	TagTableAddColumnTemplates []*sqltemplate.Template `toml:"tag_table_add_column_templates"`
	Uint64Type                 string                  `toml:"uint64_type"`
	RetryMaxBackoff            config.Duration         `toml:"retry_max_backoff"`
	TagCacheSize               int                     `toml:"tag_cache_size"`
	ColumnNameLenLimit         int                     `toml:"column_name_length_limit"`
	LogLevel                   string                  `toml:"log_level"`
	Logger                     telegraf.Logger         `toml:"-"`

	dbContext       context.Context
	dbContextCancel func()
	dbConfig        *pgxpool.Config
	db              *pgxpool.Pool
	tableManager    *TableManager
	tagsCache       *freecache.Cache

	pguint8 *pgtype.DataType

	writeChan      chan *TableSource
	writeWaitGroup *utils.WaitGroup

	// Column types
	timeColumn       utils.Column
	tagIDColumn      utils.Column
	fieldsJSONColumn utils.Column
	tagsJSONColumn   utils.Column
}

func (*Postgresql) SampleConfig() string {
	return sampleConfig
}
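
// Init validates the configuration and prepares the connection settings; it
// does not open any connections (that happens in Connect). The connection
// string accepts anything pgxpool.ParseConfig does, e.g. a DSN such as
//
//	host=localhost user=postgres sslmode=verify-full
//
// or a URL such as
//
//	postgres://user:password@localhost/telegraf
//
// Pool options such as pool_max_conns may also be given in the connection
// string.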
func (p *Postgresql) Init() error {
	if p.TagCacheSize < 0 {
		return errors.New("invalid tag_cache_size")
	}

	// Set the time-column name
	if p.TimestampColumnName == "" {
		p.TimestampColumnName = "time"
	}

	switch p.TimestampColumnType {
	case "":
		p.TimestampColumnType = PgTimestampWithoutTimeZone
	case PgTimestampWithoutTimeZone, PgTimestampWithTimeZone:
		// do nothing for the valid choices
	default:
		return fmt.Errorf("unknown timestamp column type %q", p.TimestampColumnType)
	}

	// Initialize the column prototypes
	p.timeColumn = utils.Column{
		Name: p.TimestampColumnName,
		Type: p.TimestampColumnType,
		Role: utils.TimeColType,
	}
	p.tagIDColumn = utils.Column{Name: "tag_id", Type: PgBigInt, Role: utils.TagsIDColType}
	p.fieldsJSONColumn = utils.Column{Name: "fields", Type: PgJSONb, Role: utils.FieldColType}
	p.tagsJSONColumn = utils.Column{Name: "tags", Type: PgJSONb, Role: utils.TagColType}

	connectionSecret, err := p.Connection.Get()
	if err != nil {
		return fmt.Errorf("getting address failed: %w", err)
	}
	connection := connectionSecret.String()
	defer connectionSecret.Destroy()

	if p.dbConfig, err = pgxpool.ParseConfig(connection); err != nil {
		return err
	}
	parsedConfig, err := pgx.ParseConfig(connection)
	if err != nil {
		return err
	}
	if _, ok := parsedConfig.Config.RuntimeParams["pool_max_conns"]; !ok {
		// The pgx default for pool_max_conns is 4. However we want to default to 1.
		p.dbConfig.MaxConns = 1
	}

	if _, ok := p.dbConfig.ConnConfig.RuntimeParams["application_name"]; !ok {
		p.dbConfig.ConnConfig.RuntimeParams["application_name"] = "telegraf"
	}

	if p.LogLevel != "" {
		p.dbConfig.ConnConfig.Logger = utils.PGXLogger{Logger: p.Logger}
		p.dbConfig.ConnConfig.LogLevel, err = pgx.LogLevelFromString(p.LogLevel)
		if err != nil {
			return errors.New("invalid log level")
		}
	}

	switch p.Uint64Type {
	case PgNumeric:
	case PgUint8:
		p.dbConfig.AfterConnect = p.registerUint8
	default:
		return errors.New("invalid uint64_type")
	}

	return nil
}

// Connect establishes a connection to the target database and prepares the cache
func (p *Postgresql) Connect() error {
	// Yes, we're not supposed to store the context. However since we don't receive a context, we have to.
	p.dbContext, p.dbContextCancel = context.WithCancel(context.Background())
	var err error
	p.db, err = pgxpool.ConnectConfig(p.dbContext, p.dbConfig)
	if err != nil {
		p.dbContextCancel()
		return &internal.StartupError{
			Err:   err,
			Retry: true,
		}
	}
	p.tableManager = NewTableManager(p)

	if p.TagsAsForeignKeys {
		p.tagsCache = freecache.NewCache(p.TagCacheSize * 34) // from testing, each entry consumes approx 34 bytes
	}

	maxConns := int(p.db.Stat().MaxConns())
	if maxConns > 1 {
		p.writeChan = make(chan *TableSource)
		p.writeWaitGroup = utils.NewWaitGroup()
		for i := 0; i < maxConns; i++ {
			p.writeWaitGroup.Add(1)
			go p.writeWorker(p.dbContext)
		}
	}

	return nil
}
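
// registerUint8 is installed as the pool's AfterConnect hook when
// uint64_type = "uint8", so each new connection can encode/decode a uint8
// data type (e.g. one provided by the pguint extension). The OID lookup is
// done once and cached on the plugin.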
func (p *Postgresql) registerUint8(_ context.Context, conn *pgx.Conn) error {
	if p.pguint8 == nil {
		dt := pgtype.DataType{
			// Use 'numeric' type for encoding/decoding across the wire.
			// It might be more efficient to create a native pgtype.Type, but that would involve a lot of code.
			// So this is probably good enough.
			Value: &Uint8{},
			Name:  "uint8",
		}
		row := conn.QueryRow(p.dbContext, "SELECT oid FROM pg_type WHERE typname=$1", dt.Name)
		if err := row.Scan(&dt.OID); err != nil {
			return fmt.Errorf("retrieving OID for uint8 data type: %w", err)
		}
		p.pguint8 = &dt
	}

	conn.ConnInfo().RegisterDataType(*p.pguint8)
	return nil
}

// Close closes the connection(s) to the database.
func (p *Postgresql) Close() error {
	if p.writeChan != nil {
		// We're using async mode. Gracefully close with timeout.
		close(p.writeChan)
		select {
		case <-p.writeWaitGroup.C():
		case <-time.NewTimer(time.Second * 5).C:
			p.Logger.Warnf("Shutdown timeout expired while waiting for metrics to flush. Some metrics may not be written to the database.")
		}
	}

	// Die!
	p.dbContextCancel()
	if p.db != nil {
		p.db.Close()
	}
	p.tableManager = nil
	return nil
}
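
// Write implements telegraf.Output. It groups the metrics into per-measurement
// sub-batches and writes them either sequentially (single connection) or
// concurrently via the worker pool started in Connect.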
func (p *Postgresql) Write(metrics []telegraf.Metric) error {
	if p.tagsCache != nil {
		// gather at the start of write so there's less chance of any async operations ongoing
		p.Logger.Debugf("cache: size=%d hit=%d miss=%d full=%d\n",
			p.tagsCache.EntryCount(),
			p.tagsCache.HitCount(),
			p.tagsCache.MissCount(),
			p.tagsCache.EvacuateCount(),
		)
		p.tagsCache.ResetStatistics()
	}

	tableSources := NewTableSources(p, metrics)

	var err error
	if p.db.Stat().MaxConns() > 1 {
		p.writeConcurrent(tableSources)
	} else {
		err = p.writeSequential(tableSources)
	}
	if err != nil {
		var pgErr *pgconn.PgError
		if errors.As(err, &pgErr) {
			// PgError doesn't include .Detail in Error(), so we concat it onto .Message.
			if pgErr.Detail != "" {
				pgErr.Message += "; " + pgErr.Detail
			}
		}
	}

	return err
}
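
// writeSequential writes all sub-batches within a single transaction. When
// there is more than one sub-batch, each is wrapped in a savepoint so that a
// permanent error drops only that sub-batch while the rest still commit.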
func (p *Postgresql) writeSequential(tableSources map[string]*TableSource) error {
	tx, err := p.db.Begin(p.dbContext)
	if err != nil {
		return fmt.Errorf("starting transaction: %w", err)
	}
	defer tx.Rollback(p.dbContext) //nolint:errcheck // In case of failure during commit, "err" from commit will be returned

	for _, tableSource := range tableSources {
		sp := tx
		if len(tableSources) > 1 {
			// Wrap each sub-batch in a savepoint so that if a permanent error is received,
			// we can drop just that one sub-batch and still insert everything else.
			sp, err = tx.Begin(p.dbContext)
			if err != nil {
				return fmt.Errorf("starting savepoint: %w", err)
			}
		}

		err := p.writeMetricsFromMeasure(p.dbContext, sp, tableSource)
		if err != nil {
			if isTempError(err) {
				// return so that telegraf will retry the whole batch
				return err
			}
			p.Logger.Errorf("write error (permanent, dropping sub-batch): %v", err)
			if len(tableSources) == 1 {
				return nil
			}
			// drop this one sub-batch and continue trying the rest
			if err := sp.Rollback(p.dbContext); err != nil {
				return err
			}
		}
		// savepoints do not need to be committed (released), so save the round trip and skip it
	}

	if err := tx.Commit(p.dbContext); err != nil {
		return fmt.Errorf("committing transaction: %w", err)
	}
	return nil
}
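
// writeConcurrent hands each sub-batch to the worker pool, blocking when all
// workers are busy. Errors are handled (retried or logged and dropped) by the
// workers themselves.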
func (p *Postgresql) writeConcurrent(tableSources map[string]*TableSource) {
	for _, tableSource := range tableSources {
		select {
		case p.writeChan <- tableSource:
		case <-p.dbContext.Done():
			return
		}
	}
}
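
// writeWorker consumes sub-batches from writeChan until the channel is closed
// or the plugin shuts down.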
func (p *Postgresql) writeWorker(ctx context.Context) {
	defer p.writeWaitGroup.Done()
	for {
		select {
		case tableSource, ok := <-p.writeChan:
			if !ok {
				return
			}
			if err := p.writeRetry(ctx, tableSource); err != nil {
				p.Logger.Errorf("write error (permanent, dropping sub-batch): %v", err)
			}
		case <-p.dbContext.Done():
			return
		}
	}
}

// isTempError reports whether the error received during a metric write operation is temporary or permanent.
// A temporary error is one where the write might succeed if retried at a later time.
// Note however that this applies to the transaction as a whole, not the individual operation. For example, a write
// might require a new table, but another worker creates that table between when we check for it and when we try to
// create it. The CREATE TABLE error itself is permanent, as retrying the statement would fail again. But if we retry
// the transaction from scratch, the table check will see that the table exists, so we consider the error temporary.
func isTempError(err error) bool {
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) {
		// https://www.postgresql.org/docs/12/errcodes-appendix.html
		errClass := pgErr.Code[:2]
		switch errClass {
		case "23": // Integrity Constraint Violation
			// 23505 - unique_violation
			if pgErr.Code == "23505" && strings.Contains(err.Error(), "pg_type_typname_nsp_index") {
				// Happens when you try to create 2 tables simultaneously.
				return true
			}
		case "25": // Invalid Transaction State
			// If we're here, this is a bug, but recoverable
			return true
		case "40": // Transaction Rollback
			if pgErr.Code == "40P01" { // deadlock_detected
				return true
			}
		case "42": // Syntax Error or Access Rule Violation
			switch pgErr.Code {
			case "42701": // duplicate_column
				return true
			case "42P07": // duplicate_table
				return true
			}
		case "53": // Insufficient Resources
			return true
		case "57": // Operator Intervention
			switch pgErr.Code {
			case "57014": // query_canceled
				// This one is a bit of a mess. This code comes back when PGX cancels the query, such as when PGX
				// can't convert a value to the column's type. So even though the error was originally generated by
				// PGX, we get the error from Postgres.
				return false
			case "57P04": // database_dropped
				return false
			}
			return true
		}
		// Assume that any other error that comes from postgres is a permanent error
		return false
	}

	var tempErr interface{ Temporary() bool }
	if errors.As(err, &tempErr) {
		return tempErr.Temporary()
	}

	// Assume that any other error is permanent.
	// This may mean that we incorrectly discard data that could have been retried, but the alternative is that we
	// get stuck retrying data that will never succeed, causing good data to be dropped because the buffer fills up.
	return false
}
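
// Illustrative only: how isTempError classifies two representative errors.
// The PgError values are hand-constructed examples; the codes come from the
// PostgreSQL errcodes appendix linked above.
//
//	isTempError(&pgconn.PgError{Code: "53300"}) // too_many_connections => true (retry)
//	isTempError(&pgconn.PgError{Code: "22P02"}) // invalid_text_representation => false (drop)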

// writeRetry repeatedly attempts to write a sub-batch until the write succeeds
// or a permanent error is returned. The sleep between attempts starts at 0,
// then 250ms, doubling on each retry up to the retry_max_backoff cap.
func (p *Postgresql) writeRetry(ctx context.Context, tableSource *TableSource) error {
	backoff := time.Duration(0)
	for {
		err := p.writeMetricsFromMeasure(ctx, p.db, tableSource)
		if err == nil {
			return nil
		}

		if !isTempError(err) {
			return err
		}
		p.Logger.Errorf("write error (retry in %s): %v", backoff, err)
		tableSource.Reset()
		time.Sleep(backoff)

		if backoff == 0 {
			backoff = time.Millisecond * 250
		} else {
			backoff *= 2
			if backoff > time.Duration(p.RetryMaxBackoff) {
				backoff = time.Duration(p.RetryMaxBackoff)
			}
		}
	}
}

// writeMetricsFromMeasure writes a single sub-batch. All the provided metrics must belong to the same measurement.
func (p *Postgresql) writeMetricsFromMeasure(ctx context.Context, db dbh, tableSource *TableSource) error {
	err := p.tableManager.MatchSource(ctx, db, tableSource)
	if err != nil {
		return err
	}

	if p.TagsAsForeignKeys {
		if err = writeTagTable(ctx, db, tableSource); err != nil {
			if p.ForeignTagConstraint {
				return fmt.Errorf("writing to tag table %q: %w", tableSource.Name()+p.TagTableSuffix, err)
			}
			// Log and continue. The admin can correct the issue, and since tags don't change over time, the tags
			// can be added from future metrics once the issue is corrected.
			p.Logger.Errorf("writing to tag table %q: %s", tableSource.Name()+p.TagTableSuffix, err.Error())
		}
	}

	fullTableName := utils.FullTableName(p.Schema, tableSource.Name())
	if _, err := db.CopyFrom(ctx, fullTableName, tableSource.ColumnNames(), tableSource); err != nil {
		return err
	}

	return nil
}
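
// writeTagTable upserts any new tag sets into the tag table. Rows are staged
// in a session-local temp table via COPY, then inserted into the real tag
// table in a single statement that skips tag IDs which already exist.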
func writeTagTable(ctx context.Context, db dbh, tableSource *TableSource) error {
	ttsrc := NewTagTableSource(tableSource)

	// Check whether we have any tags to insert
	if !ttsrc.Next() {
		return nil
	}
	ttsrc.Reset()

	// need a transaction so that if it errors, we don't roll back the parent transaction, just the tags
	tx, err := db.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) //nolint:errcheck // In case of failure during commit, "err" from commit will be returned

	ident := pgx.Identifier{ttsrc.postgresql.Schema, ttsrc.Name()}
	identTemp := pgx.Identifier{ttsrc.Name() + "_temp"}
	sql := fmt.Sprintf("CREATE TEMP TABLE %s (LIKE %s) ON COMMIT DROP", identTemp.Sanitize(), ident.Sanitize())
	if _, err := tx.Exec(ctx, sql); err != nil {
		return fmt.Errorf("creating tags temp table: %w", err)
	}

	if _, err := tx.CopyFrom(ctx, identTemp, ttsrc.ColumnNames(), ttsrc); err != nil {
		return fmt.Errorf("copying into tags temp table: %w", err)
	}

	insert := fmt.Sprintf("INSERT INTO %s SELECT * FROM %s ORDER BY tag_id ON CONFLICT (tag_id) DO NOTHING", ident.Sanitize(), identTemp.Sanitize())
	if _, err := tx.Exec(ctx, insert); err != nil {
		return fmt.Errorf("inserting into tags table: %w", err)
	}

	if err := tx.Commit(ctx); err != nil {
		return err
	}

	ttsrc.UpdateCache()
	return nil
}
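
// newPostgresql returns a plugin instance populated with the default
// configuration. The default templates below are hard-coded and known to
// parse, which is why the UnmarshalText errors are deliberately ignored.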
func newPostgresql() *Postgresql {
	p := &Postgresql{
		Schema:                     "public",
		TagTableSuffix:             "_tag",
		TagCacheSize:               100000,
		Uint64Type:                 PgNumeric,
		CreateTemplates:            []*sqltemplate.Template{{}},
		AddColumnTemplates:         []*sqltemplate.Template{{}},
		TagTableCreateTemplates:    []*sqltemplate.Template{{}},
		TagTableAddColumnTemplates: []*sqltemplate.Template{{}},
		RetryMaxBackoff:            config.Duration(time.Second * 15),
		Logger:                     logger.New("outputs", "postgresql", ""),
		LogLevel:                   "warn",
	}

	p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`))
	p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
	p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`))
	p.TagTableAddColumnTemplates[0].UnmarshalText(
		[]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`),
	)

	return p
}
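
// Illustrative only (the exact output depends on the metric and settings):
// with the defaults above, a new measurement "cpu" with a tag "host" and a
// field "usage_user" would cause the create template to render along the
// lines of
//
//	CREATE TABLE "public"."cpu" ("time" timestamp without time zone, "host" text, "usage_user" double precision)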

func init() {
	outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() })
}