// Source: telegraf/plugins/outputs/postgresql/table_manager.go

package postgresql
import (
"context"
"fmt"
"strings"
"sync"
"github.com/jackc/pgx/v4"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)
// schemaAdvisoryLockID is the key handed to pg_advisory_xact_lock before any schema modification is attempted.
// This is an arbitrary constant value shared between multiple telegraf processes used for locking schema updates.
const schemaAdvisoryLockID int64 = 5705450890675909945
// tableState is the cached view of a single database table.
type tableState struct {
// name is the table name (without schema); it is used to look the table up in the database and in templates.
name string
// columns caches the table's columns keyed by column name. It is reset to nil when the cache is cleared.
columns map[string]utils.Column
// The embedded RWMutex is taken around reads and writes of columns.
sync.RWMutex
}
// TableManager caches the structure of database tables and creates or alters them so they can hold incoming metrics.
type TableManager struct {
// Embedded plugin instance; configuration values (schema, templates, limits) and the logger are read through it.
*Postgresql
// tables holds the cached per-table state, keyed by table name. Entries are created on demand by table().
tables map[string]*tableState
// tablesMutex guards the tables map itself; each tableState carries its own lock for its columns.
tablesMutex sync.Mutex
// Column names for which a "too long" warning (respectively error) was already logged, so each is reported once.
loggedLongColumnWarn map[string]bool
loggedLongColumnErr map[string]bool
}
// NewTableManager returns a *TableManager bound to the given plugin instance.
// The table cache and the "already logged" column sets start out empty.
func NewTableManager(postgresql *Postgresql) *TableManager {
	tm := new(TableManager)
	tm.Postgresql = postgresql
	tm.tables = map[string]*tableState{}
	tm.loggedLongColumnWarn = map[string]bool{}
	tm.loggedLongColumnErr = map[string]bool{}
	return tm
}
// ClearTableCache drops the cached column set of every known table and, when a
// tags cache is configured, clears that as well.
func (tm *TableManager) ClearTableCache() {
	// Scope the map lock to the table walk only; the tags cache is cleared
	// after it has been released.
	func() {
		tm.tablesMutex.Lock()
		defer tm.tablesMutex.Unlock()

		for _, state := range tm.tables {
			state.Lock()
			state.columns = nil
			state.Unlock()
		}
	}()

	if tm.tagsCache == nil {
		return
	}
	tm.tagsCache.Clear()
}
// table returns the cached state for the named table, registering a fresh
// (empty) state the first time a name is seen.
func (tm *TableManager) table(name string) *tableState {
	tm.tablesMutex.Lock()
	defer tm.tablesMutex.Unlock()

	if cached := tm.tables[name]; cached != nil {
		return cached
	}

	created := &tableState{name: name}
	tm.tables[name] = created
	return created
}
// MatchSource works out which columns the metrics in rowSource need and makes sure the database schema can hold them.
//
// When the schema cannot be brought in line (for example because schema updates are disabled):
//   - a field that is missing from the DB is omitted from the insert;
//   - a tag that is missing from the DB causes the affected metrics to be dropped.
//
// Temporary errors from the schema update are returned to the caller; permanent ones are logged and processing
// continues with whatever columns were reported as missing.
func (tm *TableManager) MatchSource(ctx context.Context, db dbh, rowSource *TableSource) error {
	metricTable := tm.table(rowSource.Name())

	var tagTable *tableState
	if tm.TagsAsForeignKeys {
		tagTable = tm.table(metricTable.name + tm.TagTableSuffix)

		missingTagCols, err := tm.EnsureStructure(
			ctx,
			db,
			tagTable,
			rowSource.TagTableColumns(),
			tm.TagTableCreateTemplates,
			tm.TagTableAddColumnTemplates,
			metricTable,
			tagTable,
		)
		switch {
		case err == nil:
			// schema is fine (or nothing could be done without an error)
		case isTempError(err):
			return err
		default:
			tm.Postgresql.Logger.Errorf("Permanent error updating schema for %s: %v", tagTable.name, err)
		}

		if len(missingTagCols) > 0 {
			defs := make([]string, len(missingTagCols))
			for i := range missingTagCols {
				tagCol := missingTagCols[i]
				if dropErr := rowSource.DropColumn(tagCol); dropErr != nil {
					return fmt.Errorf("metric/table mismatch: Unable to omit field/column from %q: %w", tagTable.name, dropErr)
				}
				defs[i] = tagCol.Name + " " + tagCol.Type
			}
			tm.Logger.Errorf("Table %q is missing tag columns (dropping metrics): %s",
				tagTable.name,
				strings.Join(defs, ", "))
		}
	}

	missingCols, err := tm.EnsureStructure(
		ctx,
		db,
		metricTable,
		rowSource.MetricTableColumns(),
		tm.CreateTemplates,
		tm.AddColumnTemplates,
		metricTable,
		tagTable,
	)
	switch {
	case err == nil:
		// schema is fine (or nothing could be done without an error)
	case isTempError(err):
		return err
	default:
		tm.Postgresql.Logger.Errorf("Permanent error updating schema for %s: %v", metricTable.name, err)
	}

	if len(missingCols) == 0 {
		return nil
	}

	defs := make([]string, len(missingCols))
	for i := range missingCols {
		fieldCol := missingCols[i]
		if dropErr := rowSource.DropColumn(fieldCol); dropErr != nil {
			return fmt.Errorf("metric/table mismatch: Unable to omit field/column from %q: %w", metricTable.name, dropErr)
		}
		defs[i] = fieldCol.Name + " " + fieldCol.Type
	}
	tm.Logger.Errorf("Table %q is missing columns (omitting fields): %s",
		metricTable.name,
		strings.Join(defs, ", "))
	return nil
}
// EnsureStructure ensures that the table described by tbl contains the provided columns, creating the table or adding
// columns when the corresponding templates are configured.
//
// createTemplates and addColumnsTemplates are the templates which are executed in the event of table create or alter
// (respectively).
// metricsTable and tagsTable are passed to the templates; tagsTable may be nil.
//
// The returned column list is the columns which could not be made available in the table (for example because no
// template is configured, or because the column name is too long). Most error paths also return that list alongside
// the error; the paths that fail while reading the current structure, on an over-long tag column name, or on an
// over-long table name return a nil list.
//
//nolint:revive //argument-limit conditionally more arguments allowed
func (tm *TableManager) EnsureStructure(
ctx context.Context,
db dbh,
tbl *tableState,
columns []utils.Column,
createTemplates, addColumnsTemplates []*sqltemplate.Template,
metricsTable, tagsTable *tableState,
) ([]utils.Column, error) {
// Sort so that:
// * When we create/alter the table the columns are in a sane order (telegraf gives us the fields in random order)
// * When we display errors about missing columns, the order is also sane, and consistent
utils.ColumnList(columns).Sort()
// Overall sequence:
// rlock, read, runlock, wlock, read, read_db, wlock_db, read_db, write_db, wunlock_db, wunlock
// rlock
tbl.RLock()
// read
currCols := tbl.columns
// runlock
tbl.RUnlock()
// Fast path: everything requested is already present in the cached structure.
missingCols := diffMissingColumns(currCols, columns)
if len(missingCols) == 0 {
return nil, nil
}
// check that the missing columns are columns that can be added
// addColumns collects the columns with acceptable names, invalidColumns the ones whose names are too long.
// NOTE(review): loggedLongColumnWarn / loggedLongColumnErr are read and written in this loop while no lock is held
// by this function — confirm that concurrent callers cannot reach this point at the same time.
addColumns := make([]utils.Column, 0, len(missingCols))
invalidColumns := make([]utils.Column, 0, len(missingCols))
for i, col := range missingCols {
if tm.ColumnNameLenLimit > 0 && len(col.Name) > tm.ColumnNameLenLimit {
if !tm.loggedLongColumnWarn[col.Name] {
tm.Postgresql.Logger.Warnf("Limiting too long column name: %q", col.Name)
tm.loggedLongColumnWarn[col.Name] = true
}
// The name is cut by bytes.
// NOTE(review): a multi-byte UTF-8 character sitting on the boundary would be split — confirm this is acceptable.
col.Name = col.Name[:tm.ColumnNameLenLimit]
// Store the shortened column back. When the cache was empty, diffMissingColumns returned its input slice, so this
// also rewrites the caller's columns slice.
missingCols[i] = col
}
if validateColumnName(col.Name) {
addColumns = append(addColumns, col)
continue
}
// An over-long tag column name aborts with an error instead of being reported as an invalid column.
if col.Role == utils.TagColType {
return nil, fmt.Errorf("column name too long: %q", col.Name)
}
if !tm.loggedLongColumnErr[col.Name] {
tm.Postgresql.Logger.Errorf("Column name too long: %q", col.Name)
tm.loggedLongColumnErr[col.Name] = true
}
invalidColumns = append(invalidColumns, col)
}
// wlock
// We also need to lock the other table as it may be referenced by a template.
// To prevent deadlock, the metric & tag table must always be locked in the same order: 1) Tag, 2) Metric
// The table being modified is write-locked, the other one is only read-locked.
if tbl == tagsTable {
tagsTable.Lock()
defer tagsTable.Unlock()
metricsTable.RLock()
defer metricsTable.RUnlock()
} else {
if tagsTable != nil {
tagsTable.RLock()
defer tagsTable.RUnlock()
}
metricsTable.Lock()
defer metricsTable.Unlock()
}
// read
// Re-check against the cache now that the write lock is held, in case it changed since the read-locked check above.
currCols = tbl.columns
addColumns = diffMissingColumns(currCols, addColumns)
if len(addColumns) == 0 {
return invalidColumns, nil
}
// read_db
// Refresh the cache from the database (outside of any transaction) and check again.
// NOTE(review): this error path returns a nil column list, unlike the later error paths which return the pending
// columns — confirm this difference is intended.
var err error
if currCols, err = tm.getColumns(ctx, db, tbl.name); err != nil {
return nil, err
}
tbl.columns = currCols
addColumns = diffMissingColumns(currCols, addColumns)
if len(addColumns) == 0 {
// tbl.columns was already set to currCols above; this assignment repeats it.
tbl.columns = currCols
return invalidColumns, nil
}
// No columns were found for the table and there is nothing configured to create it: report every requested column.
if len(currCols) == 0 && len(createTemplates) == 0 {
// can't create
return columns, nil
}
// The table has columns but nothing is configured to add more: report the pending and the invalid columns.
if len(currCols) != 0 && len(addColumnsTemplates) == 0 {
// can't add
return append(addColumns, invalidColumns...), nil
}
// The table name is only validated when the table is about to be created.
if len(currCols) == 0 && !tm.validateTableName(tbl.name) {
return nil, fmt.Errorf("table name too long: %s", tbl.name)
}
// wlock_db
tx, err := db.Begin(ctx)
if err != nil {
return append(addColumns, invalidColumns...), err
}
defer tx.Rollback(ctx) //nolint:errcheck // In case of failure during commit, "err" from commit will be returned
// It's possible to have multiple telegraf processes, in which we can't ensure they all lock tables in the same
// order. So to prevent possible deadlocks, we have to have a single lock for all schema modifications.
if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", schemaAdvisoryLockID); err != nil {
return append(addColumns, invalidColumns...), err
}
// read_db
// Read the structure once more, this time inside the transaction that holds the advisory lock.
if currCols, err = tm.getColumns(ctx, tx, tbl.name); err != nil {
return nil, err
}
tbl.columns = currCols
// getColumns returns a non-nil map whenever it succeeds, so this condition holds on every path that reaches it.
if currCols != nil {
addColumns = diffMissingColumns(currCols, addColumns)
if len(addColumns) == 0 {
return invalidColumns, nil
}
}
// write_db
// No columns found means the table is created; otherwise the missing columns are added to it.
var tmpls []*sqltemplate.Template
if len(currCols) == 0 {
tmpls = createTemplates
} else {
tmpls = addColumnsTemplates
}
if err := tm.update(ctx, tx, tbl, tmpls, addColumns, metricsTable, tagsTable); err != nil {
return append(addColumns, invalidColumns...), err
}
// Read the resulting structure back before committing; the cache is only updated once the commit succeeded.
if currCols, err = tm.getColumns(ctx, tx, tbl.name); err != nil {
return append(addColumns, invalidColumns...), err
}
if err := tx.Commit(ctx); err != nil {
return append(addColumns, invalidColumns...), err
}
tbl.columns = currCols
// wunlock_db (deferred)
// wunlock (deferred)
return invalidColumns, nil
}
// getColumns reads the current columns of the named table (in the configured schema) from information_schema and
// returns them keyed by column name.
//
// Each column's role is derived from its name when it matches one of the well-known columns (time, tag ID, tags JSON,
// fields JSON). For any other column the first word of the column comment decides: "tag" marks a tag column, anything
// else (including no comment) is treated as a field.
func (tm *TableManager) getColumns(ctx context.Context, db dbh, name string) (map[string]utils.Column, error) {
	rows, err := db.Query(ctx, `
SELECT
column_name,
CASE WHEN data_type='USER-DEFINED' THEN udt_name ELSE data_type END,
col_description(format('%I.%I', table_schema, table_name)::regclass::oid, ordinal_position)
FROM information_schema.columns
WHERE table_schema = $1 and table_name = $2`, tm.Schema, name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := make(map[string]utils.Column)
	for rows.Next() {
		var (
			colName string
			colType string
		)
		// Scanned through a pointer-to-pointer so that a NULL comment ends up as a nil pointer.
		comment := new(string)
		if scanErr := rows.Scan(&colName, &colType, &comment); scanErr != nil {
			return nil, scanErr
		}

		role := utils.FieldColType
		switch {
		case colName == tm.timeColumn.Name:
			role = utils.TimeColType
		case colName == tm.tagIDColumn.Name:
			role = utils.TagsIDColType
		case colName == tm.tagsJSONColumn.Name:
			role = utils.TagColType
		case colName == tm.fieldsJSONColumn.Name:
			role = utils.FieldColType
		case comment != nil:
			// We don't want to monopolize the column comment (preventing user from storing other information there),
			// so only the first word is inspected.
			if strings.SplitN(*comment, " ", 2)[0] == "tag" {
				role = utils.TagColType
			}
		}

		result[colName] = utils.Column{
			Name: colName,
			Type: colType,
			Role: role,
		}
	}

	return result, rows.Err()
}
// update renders each template in tmpls for the given table and executes the result inside tx, then marks every
// newly added tag column with a 'tag' column comment.
//
// It returns the first rendering or execution error encountered; nothing is committed or rolled back here.
//
//nolint:revive //argument-limit conditionally more arguments allowed
func (tm *TableManager) update(ctx context.Context,
	tx pgx.Tx,
	state *tableState,
	tmpls []*sqltemplate.Template,
	missingCols []utils.Column,
	metricsTable, tagsTable *tableState,
) error {
	target := sqltemplate.NewTable(tm.Schema, state.name, colMapToSlice(state.columns))
	metricsView := sqltemplate.NewTable(tm.Schema, metricsTable.name, colMapToSlice(metricsTable.columns))

	// Templates always receive a tags table value; without a tag table it is an empty placeholder.
	var tagsView *sqltemplate.Table
	if tagsTable == nil {
		tagsView = sqltemplate.NewTable("", "", nil)
	} else {
		tagsView = sqltemplate.NewTable(tm.Schema, tagsTable.name, colMapToSlice(tagsTable.columns))
	}

	for i := range tmpls {
		rendered, err := tmpls[i].Render(target, missingCols, metricsView, tagsView)
		if err != nil {
			return err
		}
		if _, err = tx.Exec(ctx, string(rendered)); err != nil {
			return fmt.Errorf("executing %q: %w", rendered, err)
		}
	}

	// The role of a column has to be recoverable when the structure is read back (the templates need it). Some roles
	// follow from the column name (time, tag_id, etc), but tags and fields can carry any name and are otherwise
	// indistinguishable. Tag columns therefore get a comment; whatever remains uncommented is a field.
	for i := range missingCols {
		if missingCols[i].Role == utils.TagColType {
			stmt := fmt.Sprintf("COMMENT ON COLUMN %s.%s IS 'tag'",
				target.String(), sqltemplate.QuoteIdentifier(missingCols[i].Name))
			if _, err := tx.Exec(ctx, stmt); err != nil {
				return fmt.Errorf("setting column role comment: %w", err)
			}
		}
	}

	return nil
}
// maxIdentifierLength is the largest length, in bytes, accepted for table and column names.
// NOTE(review): presumably PostgreSQL's default identifier limit (NAMEDATALEN-1) — confirm for custom server builds.
const maxIdentifierLength = 63
// validateTableName reports whether name is short enough to be used as a table name. Lengths are measured in bytes.
//
// When tags are stored as foreign keys, the length of the tag table suffix is added to the length of name before the
// comparison, so that the derived tag table name fits as well.
// NOTE(review): a name that already carries the suffix has the suffix counted twice — verify against callers.
func (tm *TableManager) validateTableName(name string) bool {
	total := len(name)
	if tm.Postgresql.TagsAsForeignKeys {
		total += len(tm.Postgresql.TagTableSuffix)
	}
	return total <= maxIdentifierLength
}
// validateColumnName reports whether name, measured in bytes, fits within maxIdentifierLength.
func validateColumnName(name string) bool {
	return len(name) <= maxIdentifierLength
}
// diffMissingColumns returns the entries of srcColumns whose name does not appear as a key of dbColumns.
//
// When dbColumns is empty, srcColumns itself is returned (not a copy). When nothing is missing the result is nil.
func diffMissingColumns(dbColumns map[string]utils.Column, srcColumns []utils.Column) []utils.Column {
	if len(dbColumns) == 0 {
		return srcColumns
	}

	var absent []utils.Column
	for i := range srcColumns {
		if _, present := dbColumns[srcColumns[i].Name]; present {
			continue
		}
		absent = append(absent, srcColumns[i])
	}
	return absent
}
// colMapToSlice returns the values of colMap as a slice, in map iteration order (i.e. unspecified order).
// A nil map yields a nil slice; an empty non-nil map yields an empty non-nil slice.
func colMapToSlice(colMap map[string]utils.Column) []utils.Column {
	if colMap == nil {
		return nil
	}

	out := make([]utils.Column, len(colMap))
	next := 0
	for _, col := range colMap {
		out[next] = col
		next++
	}
	return out
}