// telegraf/plugins/outputs/postgresql/postgresql.go

//go:generate ../../../tools/readme_config_includer/generator
package postgresql

import (
	"context"
	_ "embed"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/coocood/freecache"
	"github.com/jackc/pgconn"
	"github.com/jackc/pgtype"
	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/logger"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
	"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)

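// dbh is the subset of database operations used by the write paths. Both
// *pgxpool.Pool and pgx.Tx satisfy it, so the same code can run against the
// pool directly or inside a transaction.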
type dbh interface {
	Begin(ctx context.Context) (pgx.Tx, error)
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
	Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
}

//go:embed sample.conf
var sampleConfig string
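
// Postgresql is the plugin instance. Exported fields are populated from the
// TOML configuration; unexported fields hold runtime state.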
type Postgresql struct {
	Connection                 config.Secret           `toml:"connection"`
	Schema                     string                  `toml:"schema"`
	TagsAsForeignKeys          bool                    `toml:"tags_as_foreign_keys"`
	TagTableSuffix             string                  `toml:"tag_table_suffix"`
	ForeignTagConstraint       bool                    `toml:"foreign_tag_constraint"`
	TagsAsJsonb                bool                    `toml:"tags_as_jsonb"`
	FieldsAsJsonb              bool                    `toml:"fields_as_jsonb"`
	TimestampColumnName        string                  `toml:"timestamp_column_name"`
	TimestampColumnType        string                  `toml:"timestamp_column_type"`
	CreateTemplates            []*sqltemplate.Template `toml:"create_templates"`
	AddColumnTemplates         []*sqltemplate.Template `toml:"add_column_templates"`
	TagTableCreateTemplates    []*sqltemplate.Template `toml:"tag_table_create_templates"`
	TagTableAddColumnTemplates []*sqltemplate.Template `toml:"tag_table_add_column_templates"`
	Uint64Type                 string                  `toml:"uint64_type"`
	RetryMaxBackoff            config.Duration         `toml:"retry_max_backoff"`
	TagCacheSize               int                     `toml:"tag_cache_size"`
	ColumnNameLenLimit         int                     `toml:"column_name_length_limit"`
	LogLevel                   string                  `toml:"log_level"`
	Logger                     telegraf.Logger         `toml:"-"`

	dbContext       context.Context
	dbContextCancel func()
	dbConfig        *pgxpool.Config
	db              *pgxpool.Pool
	tableManager    *TableManager
	tagsCache       *freecache.Cache
	pguint8         *pgtype.DataType
	writeChan       chan *TableSource
	writeWaitGroup  *utils.WaitGroup

	// Column types
	timeColumn       utils.Column
	tagIDColumn      utils.Column
	fieldsJSONColumn utils.Column
	tagsJSONColumn   utils.Column
}

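// For orientation, a minimal configuration sketch (illustrative values only;
// the embedded sample.conf is the authoritative reference):
//
//	[[outputs.postgresql]]
//	  connection = "host=localhost user=postgres database=telegraf"
//	  tags_as_foreign_keys = true
//	  uint64_type = "numeric"
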
func (*Postgresql) SampleConfig() string {
	return sampleConfig
}

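// Init validates the configuration, fills in defaults, initializes the column
// prototypes, and parses the connection string into the pool configuration.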
func (p *Postgresql) Init() error {
	if p.TagCacheSize < 0 {
		return errors.New("invalid tag_cache_size")
	}

	// Set the time-column name
	if p.TimestampColumnName == "" {
		p.TimestampColumnName = "time"
	}

	switch p.TimestampColumnType {
	case "":
		p.TimestampColumnType = PgTimestampWithoutTimeZone
	case PgTimestampWithoutTimeZone, PgTimestampWithTimeZone:
		// do nothing for the valid choices
	default:
		return fmt.Errorf("unknown timestamp column type %q", p.TimestampColumnType)
	}

	// Initialize the column prototypes
	p.timeColumn = utils.Column{
		Name: p.TimestampColumnName,
		Type: p.TimestampColumnType,
		Role: utils.TimeColType,
	}
	p.tagIDColumn = utils.Column{Name: "tag_id", Type: PgBigInt, Role: utils.TagsIDColType}
	p.fieldsJSONColumn = utils.Column{Name: "fields", Type: PgJSONb, Role: utils.FieldColType}
	p.tagsJSONColumn = utils.Column{Name: "tags", Type: PgJSONb, Role: utils.TagColType}

	connectionSecret, err := p.Connection.Get()
	if err != nil {
		return fmt.Errorf("getting connection string failed: %w", err)
	}
	defer connectionSecret.Destroy()
	connection := connectionSecret.String()

	if p.dbConfig, err = pgxpool.ParseConfig(connection); err != nil {
		return err
	}
	parsedConfig, err := pgx.ParseConfig(connection)
	if err != nil {
		return err
	}
	if _, ok := parsedConfig.Config.RuntimeParams["pool_max_conns"]; !ok {
		// The pgx default for pool_max_conns is 4. However we want to default to 1.
		p.dbConfig.MaxConns = 1
	}
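	// For example (illustrative DSN), passing
	//   host=localhost user=postgres dbname=telegraf pool_max_conns=4
	// in the connection string opts back into pgxpool's own pool sizing
	// instead of the default of 1 applied above.
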
	if _, ok := p.dbConfig.ConnConfig.RuntimeParams["application_name"]; !ok {
		p.dbConfig.ConnConfig.RuntimeParams["application_name"] = "telegraf"
	}

	if p.LogLevel != "" {
		p.dbConfig.ConnConfig.Logger = utils.PGXLogger{Logger: p.Logger}
		p.dbConfig.ConnConfig.LogLevel, err = pgx.LogLevelFromString(p.LogLevel)
		if err != nil {
			return errors.New("invalid log level")
		}
	}

	switch p.Uint64Type {
	case PgNumeric:
	case PgUint8:
		p.dbConfig.AfterConnect = p.registerUint8
	default:
		return errors.New("invalid uint64_type")
	}

	return nil
}

// Connect establishes a connection to the target database and prepares the cache
func (p *Postgresql) Connect() error {
	// Yes, we're not supposed to store the context. However since we don't receive a context, we have to.
	p.dbContext, p.dbContextCancel = context.WithCancel(context.Background())
	var err error
	p.db, err = pgxpool.ConnectConfig(p.dbContext, p.dbConfig)
	if err != nil {
		p.dbContextCancel()
		return &internal.StartupError{
			Err:   err,
			Retry: true,
		}
	}
	p.tableManager = NewTableManager(p)

	if p.TagsAsForeignKeys {
		p.tagsCache = freecache.NewCache(p.TagCacheSize * 34) // from testing, each entry consumes approx 34 bytes
	}

	maxConns := int(p.db.Stat().MaxConns())
	if maxConns > 1 {
		p.writeChan = make(chan *TableSource)
		p.writeWaitGroup = utils.NewWaitGroup()
		for i := 0; i < maxConns; i++ {
			p.writeWaitGroup.Add(1)
			go p.writeWorker(p.dbContext)
		}
	}
	return nil
}

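// registerUint8 is an AfterConnect hook: on first use it looks up the OID of
// the user-defined uint8 type, then registers the type with each connection so
// uint64 fields can be encoded and decoded over the wire.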
func (p *Postgresql) registerUint8(_ context.Context, conn *pgx.Conn) error {
	if p.pguint8 == nil {
		dt := pgtype.DataType{
			// Use 'numeric' type for encoding/decoding across the wire.
			// It might be more efficient to create a native pgtype.Type, but that would involve a lot of
			// code. So this is probably good enough.
			Value: &Uint8{},
			Name:  "uint8",
		}
		row := conn.QueryRow(p.dbContext, "SELECT oid FROM pg_type WHERE typname=$1", dt.Name)
		if err := row.Scan(&dt.OID); err != nil {
			return fmt.Errorf("retrieving OID for uint8 data type: %w", err)
		}
		p.pguint8 = &dt
	}
	conn.ConnInfo().RegisterDataType(*p.pguint8)
	return nil
}

// Close closes the connection(s) to the database.
func (p *Postgresql) Close() error {
	if p.writeChan != nil {
		// We're using async mode. Gracefully close with timeout.
		close(p.writeChan)
		select {
		case <-p.writeWaitGroup.C():
		case <-time.NewTimer(time.Second * 5).C:
			p.Logger.Warnf("Shutdown timeout expired while waiting for metrics to flush. Some metrics may not be written to database.")
		}
	}

	// Die!
	p.dbContextCancel()
	if p.db != nil {
		p.db.Close()
	}
	p.tableManager = nil
	return nil
}

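// Write groups the metrics into per-measurement table sources and writes them
// either sequentially (single connection) or through the worker pool.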
func (p *Postgresql) Write(metrics []telegraf.Metric) error {
	if p.tagsCache != nil {
		// gather at the start of write so there's less chance of any async operations ongoing
		p.Logger.Debugf("cache: size=%d hit=%d miss=%d full=%d\n",
			p.tagsCache.EntryCount(),
			p.tagsCache.HitCount(),
			p.tagsCache.MissCount(),
			p.tagsCache.EvacuateCount(),
		)
		p.tagsCache.ResetStatistics()
	}

	tableSources := NewTableSources(p, metrics)

	var err error
	if p.db.Stat().MaxConns() > 1 {
		p.writeConcurrent(tableSources)
	} else {
		err = p.writeSequential(tableSources)
	}
	if err != nil {
		var pgErr *pgconn.PgError
		if errors.As(err, &pgErr) {
			// PgError doesn't include .Detail in Error(), so we concat it onto .Message.
			if pgErr.Detail != "" {
				pgErr.Message += "; " + pgErr.Detail
			}
		}
	}
	return err
}

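// writeSequential writes all table sources inside a single transaction. When
// there is more than one sub-batch, each is wrapped in a savepoint so a
// permanent error drops only that sub-batch while the rest still commit.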
func (p *Postgresql) writeSequential(tableSources map[string]*TableSource) error {
	tx, err := p.db.Begin(p.dbContext)
	if err != nil {
		return fmt.Errorf("starting transaction: %w", err)
	}
	defer tx.Rollback(p.dbContext) //nolint:errcheck // In case of failure during commit, "err" from commit will be returned

	for _, tableSource := range tableSources {
		sp := tx
		if len(tableSources) > 1 {
			// Wrap each sub-batch in a savepoint so that if a permanent error is received, we can drop just
			// that one sub-batch, and insert everything else.
			sp, err = tx.Begin(p.dbContext)
			if err != nil {
				return fmt.Errorf("starting savepoint: %w", err)
			}
		}

		err := p.writeMetricsFromMeasure(p.dbContext, sp, tableSource)
		if err != nil {
			if isTempError(err) {
				// return so that telegraf will retry the whole batch
				return err
			}
			p.Logger.Errorf("write error (permanent, dropping sub-batch): %v", err)
			if len(tableSources) == 1 {
				return nil
			}
			// drop this one sub-batch and continue trying the rest
			if err := sp.Rollback(p.dbContext); err != nil {
				return err
			}
		}
		// savepoints do not need to be committed (released), so save the round trip and skip it
	}

	if err := tx.Commit(p.dbContext); err != nil {
		return fmt.Errorf("committing transaction: %w", err)
	}
	return nil
}

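// writeConcurrent hands each table source to the write workers, bailing out if
// the plugin is shutting down. Errors are handled (and retried) by the workers.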
func (p *Postgresql) writeConcurrent(tableSources map[string]*TableSource) {
	for _, tableSource := range tableSources {
		select {
		case p.writeChan <- tableSource:
		case <-p.dbContext.Done():
			return
		}
	}
}

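// writeWorker consumes table sources from writeChan until the channel is
// closed or the plugin context is cancelled. One worker runs per pool connection.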
func (p *Postgresql) writeWorker(ctx context.Context) {
	defer p.writeWaitGroup.Done()
	for {
		select {
		case tableSource, ok := <-p.writeChan:
			if !ok {
				return
			}
			if err := p.writeRetry(ctx, tableSource); err != nil {
				p.Logger.Errorf("write error (permanent, dropping sub-batch): %v", err)
			}
		case <-p.dbContext.Done():
			return
		}
	}
}

// isTempError reports whether the error received during a metric write operation is temporary or permanent.
// A temporary error is one where, if the write were retried at a later time, it might succeed.
// Note however that this applies to the transaction as a whole, not the individual operation. For example, a
// write might come in that needs a new table created, but another worker creates the table between when we
// check for it and when we try to create it. The operation error is permanent: retrying `CREATE TABLE` will
// still fail. But if we retry the transaction from scratch, the table check will see that the table exists,
// so we consider the error temporary.
func isTempError(err error) bool {
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) {
		// https://www.postgresql.org/docs/12/errcodes-appendix.html
		errClass := pgErr.Code[:2]
		switch errClass {
		case "23": // Integrity Constraint Violation
			// 23505 - unique_violation
			if pgErr.Code == "23505" && strings.Contains(err.Error(), "pg_type_typname_nsp_index") {
				// Happens when you try to create 2 tables simultaneously.
				return true
			}
		case "25": // Invalid Transaction State
			// If we're here, this is a bug, but recoverable
			return true
		case "40": // Transaction Rollback
			if pgErr.Code == "40P01" { // deadlock_detected
				return true
			}
		case "42": // Syntax Error or Access Rule Violation
			switch pgErr.Code {
			case "42701": // duplicate_column
				return true
			case "42P07": // duplicate_table
				return true
			}
		case "53": // Insufficient Resources
			return true
		case "57": // Operator Intervention
			switch pgErr.Code {
			case "57014": // query_cancelled
				// This one is a bit of a mess. This code comes back when pgx cancels the query, such as when
				// pgx can't convert a value to the column's type. So even though the error was originally
				// generated by pgx, we receive it from Postgres.
				return false
			case "57P04": // database_dropped
				return false
			}
			return true
		}
		// Assume that any other error that comes from postgres is a permanent error
		return false
	}

	var tempErr interface{ Temporary() bool }
	if errors.As(err, &tempErr) {
		return tempErr.Temporary()
	}

	// Assume that any other error is permanent.
	// This may mean that we incorrectly discard data that could have been retried, but the alternative is
	// that we get stuck retrying data that will never succeed, causing good data to be dropped because the
	// buffer fills up.
	return false
}

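// writeRetry writes a table source, retrying temporary errors with exponential
// backoff: immediately on the first failure, then 250ms, 500ms, 1s, and so on,
// capped at retry_max_backoff (15s by default). Permanent errors are returned.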
func (p *Postgresql) writeRetry(ctx context.Context, tableSource *TableSource) error {
	backoff := time.Duration(0)
	for {
		err := p.writeMetricsFromMeasure(ctx, p.db, tableSource)
		if err == nil {
			return nil
		}
		if !isTempError(err) {
			return err
		}
		p.Logger.Errorf("write error (retry in %s): %v", backoff, err)
		tableSource.Reset()
		time.Sleep(backoff)
		if backoff == 0 {
			backoff = time.Millisecond * 250
		} else {
			backoff *= 2
			if backoff > time.Duration(p.RetryMaxBackoff) {
				backoff = time.Duration(p.RetryMaxBackoff)
			}
		}
	}
}

// writeMetricsFromMeasure writes the metrics from a single measure. All the provided metrics must belong to
// the same measurement.
func (p *Postgresql) writeMetricsFromMeasure(ctx context.Context, db dbh, tableSource *TableSource) error {
	err := p.tableManager.MatchSource(ctx, db, tableSource)
	if err != nil {
		return err
	}

	if p.TagsAsForeignKeys {
		if err = writeTagTable(ctx, db, tableSource); err != nil {
			if p.ForeignTagConstraint {
				return fmt.Errorf("writing to tag table %q: %w", tableSource.Name()+p.TagTableSuffix, err)
			}
			// Log and continue: the admin can correct the issue, and since tags don't change over time, they
			// can be added from future metrics once the issue is corrected.
			p.Logger.Errorf("writing to tag table %q: %s", tableSource.Name()+p.TagTableSuffix, err.Error())
		}
	}

	fullTableName := utils.FullTableName(p.Schema, tableSource.Name())
	if _, err := db.CopyFrom(ctx, fullTableName, tableSource.ColumnNames(), tableSource); err != nil {
		return err
	}
	return nil
}

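// writeTagTable upserts the tags of a table source into its companion tag table. Rows are staged through a
// TEMP table so duplicates can be discarded with ON CONFLICT DO NOTHING. With a hypothetical "cpu"
// measurement (names for illustration only), the generated statements would look roughly like:
//
//	CREATE TEMP TABLE "cpu_tag_temp" (LIKE "public"."cpu_tag") ON COMMIT DROP
//	INSERT INTO "public"."cpu_tag" SELECT * FROM "cpu_tag_temp" ORDER BY tag_id ON CONFLICT (tag_id) DO NOTHING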
func writeTagTable(ctx context.Context, db dbh, tableSource *TableSource) error {
	ttsrc := NewTagTableSource(tableSource)

	// Check whether we have any tags to insert
	if !ttsrc.Next() {
		return nil
	}
	ttsrc.Reset()

	// need a transaction so that if it errors, we don't roll back the parent transaction, just the tags
	tx, err := db.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) //nolint:errcheck // In case of failure during commit, "err" from commit will be returned

	ident := pgx.Identifier{ttsrc.postgresql.Schema, ttsrc.Name()}
	identTemp := pgx.Identifier{ttsrc.Name() + "_temp"}
	sql := fmt.Sprintf("CREATE TEMP TABLE %s (LIKE %s) ON COMMIT DROP", identTemp.Sanitize(), ident.Sanitize())
	if _, err := tx.Exec(ctx, sql); err != nil {
		return fmt.Errorf("creating tags temp table: %w", err)
	}

	if _, err := tx.CopyFrom(ctx, identTemp, ttsrc.ColumnNames(), ttsrc); err != nil {
		return fmt.Errorf("copying into tags temp table: %w", err)
	}

	insert := fmt.Sprintf("INSERT INTO %s SELECT * FROM %s ORDER BY tag_id ON CONFLICT (tag_id) DO NOTHING", ident.Sanitize(), identTemp.Sanitize())
	if _, err := tx.Exec(ctx, insert); err != nil {
		return fmt.Errorf("inserting into tags table: %w", err)
	}

	if err := tx.Commit(ctx); err != nil {
		return err
	}

	ttsrc.UpdateCache()
	return nil
}

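// newPostgresql returns a plugin instance with the documented defaults,
// including the default DDL templates used for table creation and migration.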
func newPostgresql() *Postgresql {
	p := &Postgresql{
		Schema:                     "public",
		TagTableSuffix:             "_tag",
		TagCacheSize:               100000,
		Uint64Type:                 PgNumeric,
		CreateTemplates:            []*sqltemplate.Template{{}},
		AddColumnTemplates:         []*sqltemplate.Template{{}},
		TagTableCreateTemplates:    []*sqltemplate.Template{{}},
		TagTableAddColumnTemplates: []*sqltemplate.Template{{}},
		RetryMaxBackoff:            config.Duration(time.Second * 15),
		Logger:                     logger.New("outputs", "postgresql", ""),
		LogLevel:                   "warn",
	}
	p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`))
	p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
	p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`))
	p.TagTableAddColumnTemplates[0].UnmarshalText(
		[]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`),
	)
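
	// As an illustration (hypothetical measurement and columns, not taken from
	// this file), the default create template renders to something like:
	//
	//	CREATE TABLE "public"."cpu" ("time" timestamp, "tag_id" bigint, "usage" double precision)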

	return p
}

func init() {
	outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() })
}