1
0
Fork 0
telegraf/plugins/outputs/postgresql/sqltemplate/template.go

406 lines
13 KiB
Go
Raw Permalink Normal View History

// Package sqltemplate
/*
Templates are used for creation of the SQL used when creating and modifying tables. These templates are specified within
the configuration as the parameters 'create_templates', 'add_column_templates', 'tag_table_create_templates', and
'tag_table_add_column_templates'.
The templating functionality behaves the same in all cases. However, the variables will differ.
# Variables
The following variables are available within all template executions:
- table - A Table object referring to the current table being
created/modified.
- columns - A Columns object of the new columns being added to the
table (all columns in the case of a new table, and new columns in the case
of existing table).
- allColumns - A Columns object of all the columns (both old and new)
of the table. In the case of a new table, this is the same as `columns`.
- metricTable - A Table object referring to the table containing the
fields. In the case of TagsAsForeignKeys and `table` is the tag table, then
`metricTable` is the table using this one for its tags.
- tagTable - A Table object referring to the table containing the
tags. In the case of TagsAsForeignKeys and `table` is the metrics table,
then `tagTable` is the table containing the tags for it.
Each object has helper methods that may be used within the template. See the documentation for the appropriate type.
When the object is interpolated without a helper, it is automatically converted to a string through its String() method.
# Functions
All the functions provided by the Sprig library (http://masterminds.github.io/sprig/) are available within template executions.
In addition, the following functions are also available:
- quoteIdentifier - Quotes the input string as a Postgres identifier.
- quoteLiteral - Quotes the input string as a Postgres literal.
# Examples
The default templates show basic usage. When left unconfigured, it is the equivalent of:
[outputs.postgresql]
create_templates = [
'''CREATE TABLE {{.table}} ({{.columns}})''',
]
add_column_templates = [
'''ALTER TABLE {{.table}} ADD COLUMN IF NOT EXISTS {{.columns|join ", ADD COLUMN IF NOT EXISTS "}}''',
]
tag_table_create_templates = [
'''CREATE TABLE {{.table}} ({{.columns}}, PRIMARY KEY (tag_id))'''
]
tag_table_add_column_templates = [
'''ALTER TABLE {{.table}} ADD COLUMN IF NOT EXISTS {{.columns|join ", ADD COLUMN IF NOT EXISTS "}}''',
]
A simple example for usage with TimescaleDB would be:
[outputs.postgresql]
create_templates = [
'''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
'''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '1d')''',
'''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
'''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '2h')''',
]
...where the defaults for the other templates would be automatically applied.
A very complex example for versions of TimescaleDB which don't support adding columns to compressed hypertables (v<2.1.0),
using views and unions to emulate the functionality, would be:
[outputs.postgresql]
schema = "telegraf"
create_templates = [
'''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
'''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '1d')''',
'''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
'''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '2d')''',
'''CREATE VIEW {{ .table.WithSuffix "_data" }} AS
SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }}''',
'''CREATE VIEW {{ .table.WithSchema "public" }} AS
SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }}
FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt
WHERE t.tag_id = tt.tag_id''',
]
add_column_templates = [
'''ALTER TABLE {{ .table }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash).WithSchema "" }}''',
'''ALTER VIEW {{ .table.WithSuffix "_data" }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash "_data").WithSchema "" }}''',
'''DROP VIEW {{ .table.WithSchema "public" }}''',
'''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
'''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '1d')''',
'''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
'''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '2d')''',
'''CREATE VIEW {{ .table.WithSuffix "_data" }} AS
SELECT {{ .allColumns.Selectors | join "," }}
FROM {{ .table }}
UNION ALL
SELECT {{ (.allColumns.Union .table.Columns).Selectors | join "," }}
FROM {{ .table.WithSuffix "_" .table.Columns.Hash "_data" }}''',
'''CREATE VIEW {{ .table.WithSchema "public" }}
AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }}
FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt
WHERE t.tag_id = tt.tag_id''',
]
*/
package sqltemplate
import (
"bytes"
"encoding/base32"
"fmt"
"hash/fnv"
"strings"
"text/template"
"unsafe"
"github.com/Masterminds/sprig"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)
var templateFuncs = map[string]interface{}{
"quoteIdentifier": QuoteIdentifier,
"quoteLiteral": QuoteLiteral,
}
func asString(obj interface{}) string {
switch obj := obj.(type) {
case string:
return obj
case []byte:
return string(obj)
case fmt.Stringer:
return obj.String()
default:
return fmt.Sprintf("%v", obj)
}
}
// QuoteIdentifier quotes the given string as a Postgres identifier (double-quotes the value).
//
// QuoteIdentifier is accessible within templates as 'quoteIdentifier'.
func QuoteIdentifier(name interface{}) string {
return utils.QuoteIdentifier(asString(name))
}
// QuoteLiteral quotes the given string as a Postgres literal (single-quotes the value).
//
// QuoteLiteral is accessible within templates as 'quoteLiteral'.
func QuoteLiteral(str interface{}) string {
return utils.QuoteLiteral(asString(str))
}
// Table is an object which represents a Postgres table.
type Table struct {
Schema string
Name string
Columns Columns
}
func NewTable(schemaName, tableName string, columns []utils.Column) *Table {
if tableName == "" {
return nil
}
return &Table{
Schema: schemaName,
Name: tableName,
Columns: NewColumns(columns),
}
}
// String returns the table's fully qualified & quoted identifier (schema+table).
func (tbl *Table) String() string {
return tbl.Identifier()
}
// Identifier returns the table's fully qualified & quoted identifier (schema+table).
//
// If schema is empty, it is omitted from the result.
func (tbl *Table) Identifier() string {
if tbl.Schema == "" {
return QuoteIdentifier(tbl.Name)
}
return QuoteIdentifier(tbl.Schema) + "." + QuoteIdentifier(tbl.Name)
}
// WithSchema returns a copy of the Table object, but with the schema replaced by the given value.
func (tbl *Table) WithSchema(name string) *Table {
tblNew := &Table{}
*tblNew = *tbl
tblNew.Schema = name
return tblNew
}
// WithName returns a copy of the Table object, but with the name replaced by the given value.
func (tbl *Table) WithName(name string) *Table {
tblNew := &Table{}
*tblNew = *tbl
tblNew.Name = name
return tblNew
}
// WithSuffix returns a copy of the Table object, but with the name suffixed with the given value.
func (tbl *Table) WithSuffix(suffixes ...string) *Table {
tblNew := &Table{}
*tblNew = *tbl
tblNew.Name += strings.Join(suffixes, "")
return tblNew
}
// A Column is an object which represents a Postgres column.
type Column utils.Column
// String returns the column's definition (as used in a CREATE TABLE statement). E.G:
//
// "my_column" bigint
func (tc Column) String() string {
return tc.Definition()
}
// Definition returns the column's definition (as used in a CREATE TABLE statement). E.G:
//
// "my_column" bigint
func (tc Column) Definition() string {
return tc.Identifier() + " " + tc.Type
}
// Identifier returns the column's quoted identifier.
func (tc Column) Identifier() string {
return QuoteIdentifier(tc.Name)
}
// Selector returns the selector for the column. For most cases this is the same as Identifier.
// However, in some cases, such as a UNION, this may return a statement such as `NULL AS "foo"`.
func (tc Column) Selector() string {
if tc.Type != "" {
return tc.Identifier()
}
return "NULL AS " + tc.Identifier()
}
// IsTag returns true if the column is a tag column. Otherwise, false.
func (tc Column) IsTag() bool {
return tc.Role == utils.TagColType
}
// IsField returns true if the column is a field column. Otherwise, false.
func (tc Column) IsField() bool {
return tc.Role == utils.FieldColType
}
// Columns represents an ordered list of Column objects, with convenience methods for operating on the
// list.
type Columns []Column
func NewColumns(cols []utils.Column) Columns {
tcols := make(Columns, 0, len(cols))
for _, col := range cols {
tcols = append(tcols, Column(col))
}
return tcols
}
// List returns the Columns object as a slice of Column.
func (cols Columns) List() []Column {
return cols
}
// Definitions returns the list of column definitions.
func (cols Columns) Definitions() []string {
defs := make([]string, 0, len(cols))
for _, tc := range cols {
defs = append(defs, tc.Definition())
}
return defs
}
// Identifiers returns the list of quoted column identifiers.
func (cols Columns) Identifiers() []string {
idents := make([]string, 0, len(cols))
for _, tc := range cols {
idents = append(idents, tc.Identifier())
}
return idents
}
// Selectors returns the list of column selectors.
func (cols Columns) Selectors() []string {
selectors := make([]string, 0, len(cols))
for _, tc := range cols {
selectors = append(selectors, tc.Selector())
}
return selectors
}
// String returns the comma delimited list of column identifiers.
func (cols Columns) String() string {
colStrs := make([]string, 0, len(cols))
for _, tc := range cols {
colStrs = append(colStrs, tc.String())
}
return strings.Join(colStrs, ", ")
}
// Keys returns a Columns list of the columns which are not fields (e.g. time, tag_id, & tags).
func (cols Columns) Keys() Columns {
var newCols []Column
for _, tc := range cols {
if tc.Role != utils.FieldColType {
newCols = append(newCols, tc)
}
}
return newCols
}
// Sorted returns a sorted copy of Columns.
//
// Columns are sorted so that they are in order as: [Time, Tags, Fields], with the columns within each group sorted alphabetically.
func (cols Columns) Sorted() Columns {
newCols := make(Columns, 0, len(cols))
newCols = append(newCols, cols...)
(*utils.ColumnList)(unsafe.Pointer(&newCols)).Sort() //nolint:gosec // G103: Valid use of unsafe call to speed up sorting
return newCols
}
// Concat returns a copy of Columns with the given tcsList appended to the end.
func (cols Columns) Concat(tcsList ...Columns) Columns {
tcsNew := make(Columns, 0, len(cols)+len(tcsList))
tcsNew = append(tcsNew, cols...)
for _, tcs := range tcsList {
tcsNew = append(tcsNew, tcs...)
}
return tcsNew
}
// Tags returns a Columns list of the columns which are tags.
func (cols Columns) Tags() Columns {
var newCols []Column
for _, tc := range cols {
if tc.Role == utils.TagColType {
newCols = append(newCols, tc)
}
}
return newCols
}
// Fields returns a Columns list of the columns which are fields.
func (cols Columns) Fields() Columns {
var newCols []Column
for _, tc := range cols {
if tc.Role == utils.FieldColType {
newCols = append(newCols, tc)
}
}
return newCols
}
// Hash returns a hash of the column names. The hash is base-32 encoded string, up to 7 characters long with no padding.
//
// This can be useful as an identifier for supporting table renaming + unions in the case of non-modifiable tables.
func (cols Columns) Hash() string {
hash := fnv.New32a()
for _, tc := range cols.Sorted() {
hash.Write([]byte(tc.Name))
hash.Write([]byte{0})
}
return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash.Sum(nil)))
}
type Template template.Template
func (t *Template) UnmarshalText(text []byte) error {
tmpl := template.New("")
tmpl.Option("missingkey=error")
tmpl.Funcs(templateFuncs)
tmpl.Funcs(sprig.TxtFuncMap())
tt, err := tmpl.Parse(string(text))
if err != nil {
return err
}
*t = Template(*tt)
return nil
}
func (t *Template) Render(table *Table, newColumns []utils.Column, metricTable, tagTable *Table) ([]byte, error) {
tcs := NewColumns(newColumns).Sorted()
data := map[string]interface{}{
"table": table,
"columns": tcs,
"allColumns": tcs.Concat(table.Columns).Sorted(),
"metricTable": metricTable,
"tagTable": tagTable,
}
buf := bytes.NewBuffer(nil)
err := (*template.Template)(t).Execute(buf, data)
return buf.Bytes(), err
}