//go:generate ../../../tools/readme_config_includer/generator package mysql import ( "database/sql" _ "embed" "errors" "fmt" "reflect" "regexp" "strconv" "strings" "sync" "time" "github.com/go-sql-driver/mysql" "github.com/gofrs/uuid/v5" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/mysql/v1" "github.com/influxdata/telegraf/plugins/inputs/mysql/v2" ) //go:embed sample.conf var sampleConfig string var tlsRe = regexp.MustCompile(`([\?&])(?:tls=custom)($|&)`) const ( defaultPerfEventsStatementsDigestTextLimit = 120 defaultPerfEventsStatementsLimit = 250 defaultPerfEventsStatementsTimeLimit = 86400 defaultGatherGlobalVars = true localhost = "" ) type Mysql struct { Servers []*config.Secret `toml:"servers"` PerfEventsStatementsDigestTextLimit int64 `toml:"perf_events_statements_digest_text_limit"` PerfEventsStatementsLimit int64 `toml:"perf_events_statements_limit"` PerfEventsStatementsTimeLimit int64 `toml:"perf_events_statements_time_limit"` TableSchemaDatabases []string `toml:"table_schema_databases"` GatherProcessList bool `toml:"gather_process_list"` GatherUserStatistics bool `toml:"gather_user_statistics"` GatherInfoSchemaAutoInc bool `toml:"gather_info_schema_auto_inc"` GatherInnoDBMetrics bool `toml:"gather_innodb_metrics"` GatherSlaveStatus bool `toml:"gather_slave_status"` GatherReplicaStatus bool `toml:"gather_replica_status"` GatherAllSlaveChannels bool `toml:"gather_all_slave_channels"` MariadbDialect bool `toml:"mariadb_dialect"` GatherBinaryLogs bool `toml:"gather_binary_logs"` GatherTableIOWaits bool `toml:"gather_table_io_waits"` GatherTableLockWaits bool `toml:"gather_table_lock_waits"` GatherIndexIOWaits bool `toml:"gather_index_io_waits"` GatherEventWaits bool `toml:"gather_event_waits"` GatherTableSchema bool `toml:"gather_table_schema"` GatherFileEventsStats bool `toml:"gather_file_events_stats"` GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"` GatherGlobalVars bool `toml:"gather_global_variables"` GatherPerfSummaryPerAccountPerEvent bool `toml:"gather_perf_sum_per_acc_per_event"` PerfSummaryEvents []string `toml:"perf_summary_events"` IntervalSlow config.Duration `toml:"interval_slow"` MetricVersion int `toml:"metric_version"` Log telegraf.Logger `toml:"-"` tls.ClientConfig lastT time.Time getStatusQuery string loggedConvertFields map[string]bool } func (*Mysql) SampleConfig() string { return sampleConfig } func (m *Mysql) Init() error { switch { case m.MariadbDialect && m.GatherReplicaStatus: m.getStatusQuery = replicaStatusQueryMariadb case m.MariadbDialect: m.getStatusQuery = slaveStatusQueryMariadb case m.GatherReplicaStatus: m.getStatusQuery = replicaStatusQuery default: m.getStatusQuery = slaveStatusQuery } // Default to localhost if nothing specified. if len(m.Servers) == 0 { s := config.NewSecret([]byte(localhost)) m.Servers = append(m.Servers, &s) } m.loggedConvertFields = make(map[string]bool) // Register the TLS configuration. Due to the registry being a global // one for the mysql package, we need to define unique IDs to avoid // side effects and races between different plugin instances. Therefore, // we decorate the "custom" naming of the "tls" parameter with an UUID. tlsuuid, err := uuid.NewV7() if err != nil { return fmt.Errorf("cannot create UUID: %w", err) } tlsid := "custom-" + tlsuuid.String() tlsConfig, err := m.ClientConfig.TLSConfig() if err != nil { return fmt.Errorf("registering TLS config: %w", err) } if tlsConfig != nil { if err := mysql.RegisterTLSConfig(tlsid, tlsConfig); err != nil { return err } } // Adapt the DSN string for i, server := range m.Servers { dsnSecret, err := server.Get() if err != nil { return fmt.Errorf("getting server %d failed: %w", i, err) } dsn := dsnSecret.String() dsnSecret.Destroy() // Reference the custom TLS config of _THIS_ plugin instance if tlsRe.MatchString(dsn) { dsn = tlsRe.ReplaceAllString(dsn, "${1}tls="+tlsid+"${2}") } conf, err := mysql.ParseDSN(dsn) if err != nil { return fmt.Errorf("parsing %q failed: %w", dsn, err) } // Set the default timeout if none specified if conf.Timeout == 0 { conf.Timeout = time.Second * 5 } if err := server.Set([]byte(conf.FormatDSN())); err != nil { return fmt.Errorf("replacing server %q failed: %w", dsn, err) } m.Servers[i] = server } return nil } func (m *Mysql) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup // Loop through each server and collect metrics for _, server := range m.Servers { wg.Add(1) go func(s *config.Secret) { defer wg.Done() acc.AddError(m.gatherServer(s, acc)) }(server) } wg.Wait() return nil } // These are const but can't be declared as such because golang doesn't allow const maps var ( // status counter generalThreadStates = map[string]uint32{ "after create": uint32(0), "altering table": uint32(0), "analyzing": uint32(0), "checking permissions": uint32(0), "checking table": uint32(0), "cleaning up": uint32(0), "closing tables": uint32(0), "converting heap to myisam": uint32(0), "copying to tmp table": uint32(0), "creating sort index": uint32(0), "creating table": uint32(0), "creating tmp table": uint32(0), "deleting": uint32(0), "executing": uint32(0), "execution of init_command": uint32(0), "end": uint32(0), "freeing items": uint32(0), "flushing tables": uint32(0), "fulltext initialization": uint32(0), "idle": uint32(0), "init": uint32(0), "killed": uint32(0), "waiting for lock": uint32(0), "logging slow query": uint32(0), "login": uint32(0), "manage keys": uint32(0), "opening tables": uint32(0), "optimizing": uint32(0), "preparing": uint32(0), "reading from net": uint32(0), "removing duplicates": uint32(0), "removing tmp table": uint32(0), "reopen tables": uint32(0), "repair by sorting": uint32(0), "repair done": uint32(0), "repair with keycache": uint32(0), "replication master": uint32(0), "rolling back": uint32(0), "searching rows for update": uint32(0), "sending data": uint32(0), "sorting for group": uint32(0), "sorting for order": uint32(0), "sorting index": uint32(0), "sorting result": uint32(0), "statistics": uint32(0), "updating": uint32(0), "waiting for tables": uint32(0), "waiting for table flush": uint32(0), "waiting on cond": uint32(0), "writing to net": uint32(0), "other": uint32(0), } // plaintext statuses stateStatusMappings = map[string]string{ "user sleep": "idle", "creating index": "altering table", "committing alter table to storage engine": "altering table", "discard or import tablespace": "altering table", "rename": "altering table", "setup": "altering table", "renaming result table": "altering table", "preparing for alter table": "altering table", "copying to group table": "copying to tmp table", "copy to tmp table": "copying to tmp table", "query end": "end", "update": "updating", "updating main table": "updating", "updating reference tables": "updating", "system lock": "waiting for lock", "user lock": "waiting for lock", "table lock": "waiting for lock", "deleting from main table": "deleting", "deleting from reference tables": "deleting", } ) // Math constants const ( picoSeconds = 1e12 ) // metric queries const ( globalStatusQuery = `SHOW GLOBAL STATUS` globalVariablesQuery = `SHOW GLOBAL VARIABLES` slaveStatusQuery = `SHOW SLAVE STATUS` replicaStatusQuery = `SHOW REPLICA STATUS` slaveStatusQueryMariadb = `SHOW ALL SLAVES STATUS` replicaStatusQueryMariadb = `SHOW ALL REPLICAS STATUS` binaryLogsQuery = `SHOW BINARY LOGS` infoSchemaProcessListQuery = ` SELECT COALESCE(command,''),COALESCE(state,''),count(*) FROM information_schema.processlist WHERE ID != connection_id() GROUP BY command,state ORDER BY null` infoSchemaUserStatisticsQuery = ` SELECT * FROM information_schema.user_statistics` infoSchemaAutoIncQuery = ` SELECT table_schema, table_name, column_name, auto_increment, CAST(pow(2, case data_type when 'tinyint' then 7 when 'smallint' then 15 when 'mediumint' then 23 when 'int' then 31 when 'bigint' then 63 end+(column_type like '% unsigned'))-1 as decimal(19)) as max_int FROM information_schema.tables t JOIN information_schema.columns c USING (table_schema,table_name) WHERE c.extra = 'auto_increment' AND t.auto_increment IS NOT NULL ` innoDBMetricsQuery = ` SELECT NAME, COUNT FROM information_schema.INNODB_METRICS WHERE status='enabled' ` innoDBMetricsQueryMariadb = ` EXECUTE IMMEDIATE CONCAT(" SELECT NAME, COUNT FROM information_schema.INNODB_METRICS WHERE ", IF(version() REGEXP '10\.[1-4]\\..*',"status='enabled'", "ENABLED=1"), " "); ` perfTableIOWaitsQuery = ` SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE, COUNT_DELETE, SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE, SUM_TIMER_DELETE FROM performance_schema.table_io_waits_summary_by_table WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema') ` perfIndexIOWaitsQuery = ` SELECT OBJECT_SCHEMA, OBJECT_NAME, ifnull(INDEX_NAME, 'NONE') as INDEX_NAME, COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE, COUNT_DELETE, SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE, SUM_TIMER_DELETE FROM performance_schema.table_io_waits_summary_by_index_usage WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema') ` perfTableLockWaitsQuery = ` SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_READ_NORMAL, COUNT_READ_WITH_SHARED_LOCKS, COUNT_READ_HIGH_PRIORITY, COUNT_READ_NO_INSERT, COUNT_READ_EXTERNAL, COUNT_WRITE_ALLOW_WRITE, COUNT_WRITE_CONCURRENT_INSERT, COUNT_WRITE_LOW_PRIORITY, COUNT_WRITE_NORMAL, COUNT_WRITE_EXTERNAL, SUM_TIMER_READ_NORMAL, SUM_TIMER_READ_WITH_SHARED_LOCKS, SUM_TIMER_READ_HIGH_PRIORITY, SUM_TIMER_READ_NO_INSERT, SUM_TIMER_READ_EXTERNAL, SUM_TIMER_WRITE_ALLOW_WRITE, SUM_TIMER_WRITE_CONCURRENT_INSERT, SUM_TIMER_WRITE_LOW_PRIORITY, SUM_TIMER_WRITE_NORMAL, SUM_TIMER_WRITE_EXTERNAL FROM performance_schema.table_lock_waits_summary_by_table WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema', 'information_schema') ` perfEventsStatementsQuery = ` SELECT ifnull(SCHEMA_NAME, 'NONE') as SCHEMA_NAME, DIGEST, LEFT(DIGEST_TEXT, %d) as DIGEST_TEXT, COUNT_STAR, SUM_TIMER_WAIT, SUM_ERRORS, SUM_WARNINGS, SUM_ROWS_AFFECTED, SUM_ROWS_SENT, SUM_ROWS_EXAMINED, SUM_CREATED_TMP_DISK_TABLES, SUM_CREATED_TMP_TABLES, SUM_SORT_MERGE_PASSES, SUM_SORT_ROWS, SUM_NO_INDEX_USED FROM performance_schema.events_statements_summary_by_digest WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema') AND last_seen > DATE_SUB(NOW(), INTERVAL %d SECOND) ORDER BY SUM_TIMER_WAIT DESC LIMIT %d ` perfEventWaitsQuery = ` SELECT EVENT_NAME, COUNT_STAR, SUM_TIMER_WAIT FROM performance_schema.events_waits_summary_global_by_event_name ` perfFileEventsQuery = ` SELECT EVENT_NAME, COUNT_READ, SUM_TIMER_READ, SUM_NUMBER_OF_BYTES_READ, COUNT_WRITE, SUM_TIMER_WRITE, SUM_NUMBER_OF_BYTES_WRITE, COUNT_MISC, SUM_TIMER_MISC FROM performance_schema.file_summary_by_event_name ` tableSchemaQuery = ` SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, ifnull(ENGINE, 'NONE') as ENGINE, ifnull(VERSION, '0') as VERSION, ifnull(ROW_FORMAT, 'NONE') as ROW_FORMAT, ifnull(TABLE_ROWS, '0') as TABLE_ROWS, ifnull(DATA_LENGTH, '0') as DATA_LENGTH, ifnull(INDEX_LENGTH, '0') as INDEX_LENGTH, ifnull(DATA_FREE, '0') as DATA_FREE, ifnull(CREATE_OPTIONS, 'NONE') as CREATE_OPTIONS FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' ` dbListQuery = ` SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema') ` perfSchemaTablesQuery = ` SELECT table_name FROM information_schema.tables WHERE table_schema = 'performance_schema' AND table_name = ? ` perfSummaryPerAccountPerEvent = ` SELECT coalesce(user, "unknown"), coalesce(host, "unknown"), coalesce(event_name, "unknown"), count_star, sum_timer_wait, min_timer_wait, avg_timer_wait, max_timer_wait, sum_lock_time, sum_errors, sum_warnings, sum_rows_affected, sum_rows_sent, sum_rows_examined, sum_created_tmp_disk_tables, sum_created_tmp_tables, sum_select_full_join, sum_select_full_range_join, sum_select_range, sum_select_range_check, sum_select_scan, sum_sort_merge_passes, sum_sort_range, sum_sort_rows, sum_sort_scan, sum_no_index_used, sum_no_good_index_used FROM performance_schema.events_statements_summary_by_account_by_event_name ` ) func (m *Mysql) gatherServer(server *config.Secret, acc telegraf.Accumulator) error { dsnSecret, err := server.Get() if err != nil { return err } dsn := dsnSecret.String() dsnSecret.Destroy() servtag := getDSNTag(dsn) db, err := sql.Open("mysql", dsn) if err != nil { return err } defer db.Close() err = m.gatherGlobalStatuses(db, servtag, acc) if err != nil { return err } if m.GatherGlobalVars { // Global Variables may be gathered less often interval := time.Duration(m.IntervalSlow) if interval >= time.Second && time.Since(m.lastT) >= interval { if err := m.gatherGlobalVariables(db, servtag, acc); err != nil { return err } m.lastT = time.Now() } } if m.GatherBinaryLogs { err = gatherBinaryLogs(db, servtag, acc) if err != nil { return err } } if m.GatherProcessList { err = m.gatherProcessListStatuses(db, servtag, acc) if err != nil { return err } } if m.GatherUserStatistics { err = m.gatherUserStatisticsStatuses(db, servtag, acc) if err != nil { return err } } if m.GatherSlaveStatus || m.GatherReplicaStatus { err = m.gatherSlaveStatuses(db, servtag, acc) if err != nil { return err } } if m.GatherInfoSchemaAutoInc { err = m.gatherInfoSchemaAutoIncStatuses(db, servtag, acc) if err != nil { return err } } if m.GatherInnoDBMetrics { err = m.gatherInnoDBMetrics(db, servtag, acc) if err != nil { return err } } if m.GatherPerfSummaryPerAccountPerEvent { err = m.gatherPerfSummaryPerAccountPerEvent(db, servtag, acc) if err != nil { return err } } if m.GatherTableIOWaits { err = gatherPerfTableIOWaits(db, servtag, acc) if err != nil { return err } } if m.GatherIndexIOWaits { err = gatherPerfIndexIOWaits(db, servtag, acc) if err != nil { return err } } if m.GatherTableLockWaits { err = gatherPerfTableLockWaits(db, servtag, acc) if err != nil { return err } } if m.GatherEventWaits { err = gatherPerfEventWaits(db, servtag, acc) if err != nil { return err } } if m.GatherFileEventsStats { err = gatherPerfFileEventsStatuses(db, servtag, acc) if err != nil { return err } } if m.GatherPerfEventsStatements { err = m.gatherPerfEventsStatements(db, servtag, acc) if err != nil { return err } } if m.GatherTableSchema { err = m.gatherTableSchema(db, servtag, acc) if err != nil { return err } } return nil } // gatherGlobalVariables can be used to fetch all global variables from // MySQL environment. func (m *Mysql) gatherGlobalVariables(db *sql.DB, servtag string, acc telegraf.Accumulator) error { // run query rows, err := db.Query(globalVariablesQuery) if err != nil { return err } defer rows.Close() var key string var val sql.RawBytes // parse DSN and save server tag tags := map[string]string{"server": servtag} fields := make(map[string]interface{}) for rows.Next() { if err := rows.Scan(&key, &val); err != nil { return err } key = strings.ToLower(key) // parse mysql version and put into field and tag if strings.Contains(key, "version") { fields[key] = string(val) tags[key] = string(val) } value, err := m.parseGlobalVariables(key, val) if err != nil { errString := fmt.Errorf("error parsing mysql global variable %q=%q: %w", key, string(val), err) if m.MetricVersion < 2 { m.Log.Debug(errString) } else { acc.AddError(errString) } } else { fields[key] = value } // Send 20 fields at a time if len(fields) >= 20 { acc.AddFields("mysql_variables", fields, tags) fields = make(map[string]interface{}) } } // Send any remaining fields if len(fields) > 0 { acc.AddFields("mysql_variables", fields, tags) } return nil } func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) { if m.MetricVersion < 2 { return v1.ParseValue(value) } return v2.ConvertGlobalVariables(key, value) } // gatherSlaveStatuses can be used to get replication analytics // When the server is slave, then it returns only one row. // If the multi-source replication is set, then everything works differently // This code does not work with multi-source replication. func (m *Mysql) gatherSlaveStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error { // run query var rows *sql.Rows var err error rows, err = db.Query(m.getStatusQuery) if err != nil { return err } defer rows.Close() tags := map[string]string{"server": servtag} fields := make(map[string]interface{}) // for each channel record for rows.Next() { // to save the column names as a field key // scanning keys and values separately // get columns names, and create an array with its length cols, err := rows.ColumnTypes() if err != nil { return err } vals := make([]sql.RawBytes, len(cols)) valPtrs := make([]interface{}, len(cols)) // fill the array with sql.Rawbytes for i := range vals { vals[i] = sql.RawBytes{} valPtrs[i] = &vals[i] } if err := rows.Scan(valPtrs...); err != nil { return err } // range over columns, and try to parse values for i, col := range cols { colName := col.Name() if m.MetricVersion >= 2 { colName = strings.ToLower(colName) } colValue := vals[i] if m.GatherAllSlaveChannels && (strings.EqualFold(colName, "channel_name") || strings.EqualFold(colName, "connection_name")) { // Since the default channel name is empty, we need this block channelName := "default" if len(colValue) > 0 { channelName = string(colValue) } tags["channel"] = channelName continue } if len(colValue) == 0 { continue } value, err := m.parseValueByDatabaseTypeName(colValue, col.DatabaseTypeName()) if err != nil { errString := fmt.Errorf("error parsing mysql slave status %q=%q: %w", colName, string(colValue), err) if m.MetricVersion < 2 { m.Log.Debug(errString) } else { acc.AddError(errString) } continue } fields["slave_"+colName] = value } acc.AddFields("mysql", fields, tags) // Only the first row is relevant if not all slave-channels should be gathered, // so break here and skip the remaining rows if !m.GatherAllSlaveChannels { break } } return nil } // gatherBinaryLogs can be used to collect size and count of all binary files // binlogs metric requires the MySQL server to turn it on in configuration func gatherBinaryLogs(db *sql.DB, servtag string, acc telegraf.Accumulator) error { // run query rows, err := db.Query(binaryLogsQuery) if err != nil { return err } defer rows.Close() // parse DSN and save host as a tag tags := map[string]string{"server": servtag} var ( size uint64 count uint64 fileSize uint64 fileName string encrypted string ) columns, err := rows.Columns() if err != nil { return err } numColumns := len(columns) // iterate over rows and count the size and count of files for rows.Next() { if numColumns == 3 { if err := rows.Scan(&fileName, &fileSize, &encrypted); err != nil { return err } } else { if err := rows.Scan(&fileName, &fileSize); err != nil { return err } } size += fileSize count++ } fields := map[string]interface{}{ "binary_size_bytes": size, "binary_files_count": count, } acc.AddFields("mysql", fields, tags) return nil } // gatherGlobalStatuses can be used to get MySQL status metrics // the mappings of actual names and names of each status to be exported // to output is provided on mappings variable func (m *Mysql) gatherGlobalStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error { // run query rows, err := db.Query(globalStatusQuery) if err != nil { return err } defer rows.Close() // parse the DSN and save host name as a tag tags := map[string]string{"server": servtag} fields := make(map[string]interface{}) for rows.Next() { var key string var val sql.RawBytes if err := rows.Scan(&key, &val); err != nil { return err } if m.MetricVersion < 2 { var found bool for _, mapped := range v1.Mappings { if strings.HasPrefix(key, mapped.OnServer) { // convert numeric values to integer var i int v := string(val) switch v { case "ON", "true": i = 1 case "OFF", "false": i = 0 default: if i, err = strconv.Atoi(v); err != nil { // Make the value a value to prevent adding // the field containing nonsense values. i = 0 if !m.loggedConvertFields[key] { m.Log.Warnf("Cannot convert value %q for key %q to integer outputting zero...", v, key) m.loggedConvertFields[key] = true } } } fields[mapped.InExport+key[len(mapped.OnServer):]] = i found = true } } // Send 20 fields at a time if len(fields) >= 20 { acc.AddFields("mysql", fields, tags) fields = make(map[string]interface{}) } if found { continue } // search for specific values switch key { case "Queries": i, err := strconv.ParseInt(string(val), 10, 64) if err != nil { acc.AddError(fmt.Errorf("error parsing mysql %q int value: %w", key, err)) } else { fields["queries"] = i } case "Questions": i, err := strconv.ParseInt(string(val), 10, 64) if err != nil { acc.AddError(fmt.Errorf("error parsing mysql %q int value: %w", key, err)) } else { fields["questions"] = i } case "Slow_queries": i, err := strconv.ParseInt(string(val), 10, 64) if err != nil { acc.AddError(fmt.Errorf("error parsing mysql %q int value: %w", key, err)) } else { fields["slow_queries"] = i } case "Connections": i, err := strconv.ParseInt(string(val), 10, 64) if err != nil { acc.AddError(fmt.Errorf("error parsing mysql %q int value: %w", key, err)) } else { fields["connections"] = i } case "Syncs": i, err := strconv.ParseInt(string(val), 10, 64) if err != nil { acc.AddError(fmt.Errorf("error parsing mysql %q int value: %w", key, err)) } else { fields["syncs"] = i } case "Uptime": i, err := strconv.ParseInt(string(val), 10, 64) if err != nil { acc.AddError(fmt.Errorf("error parsing mysql %q int value: %w", key, err)) } else { fields["uptime"] = i } } } else { key = strings.ToLower(key) value, err := v2.ConvertGlobalStatus(key, val) if err != nil { acc.AddError(fmt.Errorf("error parsing mysql global status %q=%q: %w", key, string(val), err)) } else { fields[key] = value } } // Send 20 fields at a time if len(fields) >= 20 { acc.AddFields("mysql", fields, tags) fields = make(map[string]interface{}) } } // Send any remaining fields if len(fields) > 0 { acc.AddFields("mysql", fields, tags) } return nil } // GatherProcessList can be used to collect metrics on each running command // and its state with its running count func (m *Mysql) gatherProcessListStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error { // run query rows, err := db.Query(infoSchemaProcessListQuery) if err != nil { return err } defer rows.Close() var ( command string state string count uint32 ) fields := make(map[string]interface{}) // mapping of state with its counts stateCounts := make(map[string]uint32, len(generalThreadStates)) // set map with keys and default values for k, v := range generalThreadStates { stateCounts[k] = v } for rows.Next() { err = rows.Scan(&command, &state, &count) if err != nil { return err } // each state has its mapping foundState := findThreadState(command, state) // count each state stateCounts[foundState] += count } tags := map[string]string{"server": servtag} for s, c := range stateCounts { fields[newNamespace("threads", s)] = c } if m.MetricVersion < 2 { acc.AddFields("mysql_info_schema", fields, tags) } else { acc.AddFields("mysql_process_list", fields, tags) } // get count of connections from each user connRows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user") if err != nil { return err } defer connRows.Close() for connRows.Next() { var user string var connections int64 err = connRows.Scan(&user, &connections) if err != nil { return err } tags := map[string]string{"server": servtag, "user": user} fields := make(map[string]interface{}) fields["connections"] = connections acc.AddFields("mysql_users", fields, tags) } return nil } // GatherUserStatisticsStatuses can be used to collect metrics on each running command // and its state with its running count func (m *Mysql) gatherUserStatisticsStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error { // run query rows, err := db.Query(infoSchemaUserStatisticsQuery) if err != nil { // disable collecting if table is not found (mysql specific error) // (suppresses repeat errors) if strings.Contains(err.Error(), "nknown table 'user_statistics'") { m.GatherUserStatistics = false } return err } defer rows.Close() cols, err := columnsToLower(rows.Columns()) if err != nil { return err } read, err := getColSlice(rows) if err != nil { return err } for rows.Next() { err = rows.Scan(read...) if err != nil { return err } tags := map[string]string{"server": servtag, "user": *read[0].(*string)} fields := make(map[string]interface{}, len(cols)) for i := range cols { if i == 0 { continue // skip "user" } switch v := read[i].(type) { case *int64: fields[cols[i]] = *v case *float64: fields[cols[i]] = *v case *string: fields[cols[i]] = *v default: return fmt.Errorf("unknown column type - %T", v) } } acc.AddFields("mysql_user_stats", fields, tags) } return nil } // columnsToLower converts selected column names to lowercase. func columnsToLower(s []string, e error) ([]string, error) { if e != nil { return nil, e } d := make([]string, len(s)) for i := range s { d[i] = strings.ToLower(s[i]) } return d, nil } // getColSlice returns an in interface slice that can be used in the row.Scan(). func getColSlice(rows *sql.Rows) ([]interface{}, error) { columnTypes, err := rows.ColumnTypes() if err != nil { return nil, err } l := len(columnTypes) // list of all possible column names var ( user string totalConnections int64 concurrentConnections int64 connectedTime int64 busyTime int64 cpuTime int64 bytesReceived int64 bytesSent int64 binlogBytesWritten int64 rowsRead int64 rowsSent int64 rowsDeleted int64 rowsInserted int64 rowsUpdated int64 selectCommands int64 updateCommands int64 otherCommands int64 commitTransactions int64 rollbackTransactions int64 deniedConnections int64 lostConnections int64 accessDenied int64 emptyQueries int64 totalSslConnections int64 maxStatementTimeExceeded int64 // maria specific fbusyTime float64 fcpuTime float64 // percona specific rowsFetched int64 tableRowsRead int64 ) switch l { case 23: // maria5 return []interface{}{ &user, &totalConnections, &concurrentConnections, &connectedTime, &fbusyTime, &fcpuTime, &bytesReceived, &bytesSent, &binlogBytesWritten, &rowsRead, &rowsSent, &rowsDeleted, &rowsInserted, &rowsUpdated, &selectCommands, &updateCommands, &otherCommands, &commitTransactions, &rollbackTransactions, &deniedConnections, &lostConnections, &accessDenied, &emptyQueries, }, nil case 25: // maria10 return []interface{}{ &user, &totalConnections, &concurrentConnections, &connectedTime, &fbusyTime, &fcpuTime, &bytesReceived, &bytesSent, &binlogBytesWritten, &rowsRead, &rowsSent, &rowsDeleted, &rowsInserted, &rowsUpdated, &selectCommands, &updateCommands, &otherCommands, &commitTransactions, &rollbackTransactions, &deniedConnections, &lostConnections, &accessDenied, &emptyQueries, &totalSslConnections, &maxStatementTimeExceeded, }, nil case 21: // mysql 5.5 return []interface{}{ &user, &totalConnections, &concurrentConnections, &connectedTime, &busyTime, &cpuTime, &bytesReceived, &bytesSent, &binlogBytesWritten, &rowsFetched, &rowsUpdated, &tableRowsRead, &selectCommands, &updateCommands, &otherCommands, &commitTransactions, &rollbackTransactions, &deniedConnections, &lostConnections, &accessDenied, &emptyQueries, }, nil case 22: // percona cols := make([]interface{}, 0, 22) for i, ct := range columnTypes { // The first column is the user and has to be a string if i == 0 { cols = append(cols, new(string)) continue } // Percona 8 has some special fields that are float instead of ints // see: https://github.com/influxdata/telegraf/issues/7360 switch ct.ScanType().Kind() { case reflect.Float32, reflect.Float64: cols = append(cols, new(float64)) default: // Keep old type for backward compatibility cols = append(cols, new(int64)) } } return cols, nil } return nil, fmt.Errorf("not Supported - %d columns", l) } // gatherPerfTableIOWaits can be used to get total count and time of I/O wait event for each table and process func gatherPerfTableIOWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error { rows, err := db.Query(perfTableIOWaitsQuery) if err != nil { return err } defer rows.Close() var ( objSchema, objName string countFetch, countInsert, countUpdate, countDelete float64 timeFetch, timeInsert, timeUpdate, timeDelete float64 ) for rows.Next() { err = rows.Scan(&objSchema, &objName, &countFetch, &countInsert, &countUpdate, &countDelete, &timeFetch, &timeInsert, &timeUpdate, &timeDelete, ) if err != nil { return err } tags := map[string]string{ "server": servtag, "schema": objSchema, "name": objName, } fields := map[string]interface{}{ "table_io_waits_total_fetch": countFetch, "table_io_waits_total_insert": countInsert, "table_io_waits_total_update": countUpdate, "table_io_waits_total_delete": countDelete, "table_io_waits_seconds_total_fetch": timeFetch / picoSeconds, "table_io_waits_seconds_total_insert": timeInsert / picoSeconds, "table_io_waits_seconds_total_update": timeUpdate / picoSeconds, "table_io_waits_seconds_total_delete": timeDelete / picoSeconds, } acc.AddFields("mysql_perf_schema", fields, tags) } return nil } // gatherPerfIndexIOWaits can be used to get total count and time of I/O wait event for each index and process func gatherPerfIndexIOWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error { rows, err := db.Query(perfIndexIOWaitsQuery) if err != nil { return err } defer rows.Close() var ( objSchema, objName, indexName string countFetch, countInsert, countUpdate, countDelete float64 timeFetch, timeInsert, timeUpdate, timeDelete float64 ) for rows.Next() { err = rows.Scan(&objSchema, &objName, &indexName, &countFetch, &countInsert, &countUpdate, &countDelete, &timeFetch, &timeInsert, &timeUpdate, &timeDelete, ) if err != nil { return err } tags := map[string]string{ "server": servtag, "schema": objSchema, "name": objName, "index": indexName, } fields := map[string]interface{}{ "index_io_waits_total_fetch": countFetch, "index_io_waits_seconds_total_fetch": timeFetch / picoSeconds, } // update write columns only when index is NONE if indexName == "NONE" { fields["index_io_waits_total_insert"] = countInsert fields["index_io_waits_total_update"] = countUpdate fields["index_io_waits_total_delete"] = countDelete fields["index_io_waits_seconds_total_insert"] = timeInsert / picoSeconds fields["index_io_waits_seconds_total_update"] = timeUpdate / picoSeconds fields["index_io_waits_seconds_total_delete"] = timeDelete / picoSeconds } acc.AddFields("mysql_perf_schema", fields, tags) } return nil } // gatherInfoSchemaAutoIncStatuses can be used to get auto incremented values of the column func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error { rows, err := db.Query(infoSchemaAutoIncQuery) if err != nil { return err } defer rows.Close() var ( schema, table, column string incValue, maxInt uint64 ) for rows.Next() { if err := rows.Scan(&schema, &table, &column, &incValue, &maxInt); err != nil { return err } tags := map[string]string{ "server": servtag, "schema": schema, "table": table, "column": column, } fields := make(map[string]interface{}) fields["auto_increment_column"] = incValue fields["auto_increment_column_max"] = maxInt if m.MetricVersion < 2 { acc.AddFields("mysql_info_schema", fields, tags) } else { acc.AddFields("mysql_table_schema", fields, tags) } } return nil } // gatherInnoDBMetrics can be used to fetch enabled metrics from // information_schema.INNODB_METRICS func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, servtag string, acc telegraf.Accumulator) error { var ( query string ) if m.MariadbDialect { query = innoDBMetricsQueryMariadb } else { query = innoDBMetricsQuery } // run query rows, err := db.Query(query) if err != nil { return err } defer rows.Close() // parse DSN and save server tag tags := map[string]string{"server": servtag} fields := make(map[string]interface{}) for rows.Next() { var key string var val sql.RawBytes if err := rows.Scan(&key, &val); err != nil { return err } key = strings.ToLower(key) value, err := m.parseValueByDatabaseTypeName(val, "BIGINT") if err != nil { acc.AddError(fmt.Errorf("error parsing mysql InnoDB metric %q=%q: %w", key, string(val), err)) continue } fields[key] = value // Send 20 fields at a time if len(fields) >= 20 { acc.AddFields("mysql_innodb", fields, tags) fields = make(map[string]interface{}) } } // Send any remaining fields if len(fields) > 0 { acc.AddFields("mysql_innodb", fields, tags) } return nil } // gatherPerfSummaryPerAccountPerEvent can be used to fetch enabled metrics from // performance_schema.events_statements_summary_by_account_by_event_name func (m *Mysql) gatherPerfSummaryPerAccountPerEvent(db *sql.DB, servtag string, acc telegraf.Accumulator) error { sqlQuery := perfSummaryPerAccountPerEvent var rows *sql.Rows var err error var ( srcUser string srcHost string eventName string countStar float64 sumTimerWait float64 minTimerWait float64 avgTimerWait float64 maxTimerWait float64 sumLockTime float64 sumErrors float64 sumWarnings float64 sumRowsAffected float64 sumRowsSent float64 sumRowsExamined float64 sumCreatedTmpDiskTables float64 sumCreatedTmpTables float64 sumSelectFullJoin float64 sumSelectFullRangeJoin float64 sumSelectRange float64 sumSelectRangeCheck float64 sumSelectScan float64 sumSortMergePasses float64 sumSortRange float64 sumSortRows float64 sumSortScan float64 sumNoIndexUsed float64 sumNoGoodIndexUsed float64 ) var events []interface{} // if we have perf_summary_events set - select only listed events (adding filter criteria for rows) if len(m.PerfSummaryEvents) > 0 { sqlQuery += " WHERE EVENT_NAME IN (" for i, eventName := range m.PerfSummaryEvents { if i > 0 { sqlQuery += ", " } sqlQuery += "?" events = append(events, eventName) } sqlQuery += ")" rows, err = db.Query(sqlQuery, events...) } else { // otherwise no filter, hence, select all rows rows, err = db.Query(perfSummaryPerAccountPerEvent) } if err != nil { return err } defer rows.Close() // parse DSN and save server tag tags := map[string]string{"server": servtag} for rows.Next() { if err := rows.Scan( &srcUser, &srcHost, &eventName, &countStar, &sumTimerWait, &minTimerWait, &avgTimerWait, &maxTimerWait, &sumLockTime, &sumErrors, &sumWarnings, &sumRowsAffected, &sumRowsSent, &sumRowsExamined, &sumCreatedTmpDiskTables, &sumCreatedTmpTables, &sumSelectFullJoin, &sumSelectFullRangeJoin, &sumSelectRange, &sumSelectRangeCheck, &sumSelectScan, &sumSortMergePasses, &sumSortRange, &sumSortRows, &sumSortScan, &sumNoIndexUsed, &sumNoGoodIndexUsed, ); err != nil { return err } srcUser = strings.ToLower(srcUser) srcHost = strings.ToLower(srcHost) sqlLWTags := copyTags(tags) sqlLWTags["src_user"] = srcUser sqlLWTags["src_host"] = srcHost sqlLWTags["event"] = eventName sqlLWFields := map[string]interface{}{ "count_star": countStar, "sum_timer_wait": sumTimerWait, "min_timer_wait": minTimerWait, "avg_timer_wait": avgTimerWait, "max_timer_wait": maxTimerWait, "sum_lock_time": sumLockTime, "sum_errors": sumErrors, "sum_warnings": sumWarnings, "sum_rows_affected": sumRowsAffected, "sum_rows_sent": sumRowsSent, "sum_rows_examined": sumRowsExamined, "sum_created_tmp_disk_tables": sumCreatedTmpDiskTables, "sum_created_tmp_tables": sumCreatedTmpTables, "sum_select_full_join": sumSelectFullJoin, "sum_select_full_range_join": sumSelectFullRangeJoin, "sum_select_range": sumSelectRange, "sum_select_range_check": sumSelectRangeCheck, "sum_select_scan": sumSelectScan, "sum_sort_merge_passes": sumSortMergePasses, "sum_sort_range": sumSortRange, "sum_sort_rows": sumSortRows, "sum_sort_scan": sumSortScan, "sum_no_index_used": sumNoIndexUsed, "sum_no_good_index_used": sumNoGoodIndexUsed, } acc.AddFields("mysql_perf_acc_event", sqlLWFields, sqlLWTags) } return nil } // gatherPerfTableLockWaits can be used to get // the total number and time for SQL and external lock wait events // for each table and operation // requires the MySQL server to be enabled to save this metric func gatherPerfTableLockWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error { // check if table exists, // if performance_schema is not enabled, tables do not exist // then there is no need to scan them var tableName string err := db.QueryRow(perfSchemaTablesQuery, "table_lock_waits_summary_by_table").Scan(&tableName) switch { case errors.Is(err, sql.ErrNoRows): return nil case err != nil: return err } rows, err := db.Query(perfTableLockWaitsQuery) if err != nil { return err } defer rows.Close() var ( objectSchema string objectName string countReadNormal float64 countReadWithSharedLocks float64 countReadHighPriority float64 countReadNoInsert float64 countReadExternal float64 countWriteAllowWrite float64 countWriteConcurrentInsert float64 countWriteLowPriority float64 countWriteNormal float64 countWriteExternal float64 timeReadNormal float64 timeReadWithSharedLocks float64 timeReadHighPriority float64 timeReadNoInsert float64 timeReadExternal float64 timeWriteAllowWrite float64 timeWriteConcurrentInsert float64 timeWriteLowPriority float64 timeWriteNormal float64 timeWriteExternal float64 ) for rows.Next() { err = rows.Scan( &objectSchema, &objectName, &countReadNormal, &countReadWithSharedLocks, &countReadHighPriority, &countReadNoInsert, &countReadExternal, &countWriteAllowWrite, &countWriteConcurrentInsert, &countWriteLowPriority, &countWriteNormal, &countWriteExternal, &timeReadNormal, &timeReadWithSharedLocks, &timeReadHighPriority, &timeReadNoInsert, &timeReadExternal, &timeWriteAllowWrite, &timeWriteConcurrentInsert, &timeWriteLowPriority, &timeWriteNormal, &timeWriteExternal, ) if err != nil { return err } tags := map[string]string{ "server": servtag, "schema": objectSchema, "table": objectName, } sqlLWTags := copyTags(tags) sqlLWTags["perf_query"] = "sql_lock_waits_total" sqlLWFields := map[string]interface{}{ "read_normal": countReadNormal, "read_with_shared_locks": countReadWithSharedLocks, "read_high_priority": countReadHighPriority, "read_no_insert": countReadNoInsert, "write_normal": countWriteNormal, "write_allow_write": countWriteAllowWrite, "write_concurrent_insert": countWriteConcurrentInsert, "write_low_priority": countWriteLowPriority, } acc.AddFields("mysql_perf_schema", sqlLWFields, sqlLWTags) externalLWTags := copyTags(tags) externalLWTags["perf_query"] = "external_lock_waits_total" externalLWFields := map[string]interface{}{ "read": countReadExternal, "write": countWriteExternal, } acc.AddFields("mysql_perf_schema", externalLWFields, externalLWTags) sqlLWSecTotalTags := copyTags(tags) sqlLWSecTotalTags["perf_query"] = "sql_lock_waits_seconds_total" sqlLWSecTotalFields := map[string]interface{}{ "read_normal": timeReadNormal / picoSeconds, "read_with_shared_locks": timeReadWithSharedLocks / picoSeconds, "read_high_priority": timeReadHighPriority / picoSeconds, "read_no_insert": timeReadNoInsert / picoSeconds, "write_normal": timeWriteNormal / picoSeconds, "write_allow_write": timeWriteAllowWrite / picoSeconds, "write_concurrent_insert": timeWriteConcurrentInsert / picoSeconds, "write_low_priority": timeWriteLowPriority / picoSeconds, } acc.AddFields("mysql_perf_schema", sqlLWSecTotalFields, sqlLWSecTotalTags) externalLWSecTotalTags := copyTags(tags) externalLWSecTotalTags["perf_query"] = "external_lock_waits_seconds_total" externalLWSecTotalFields := map[string]interface{}{ "read": timeReadExternal / picoSeconds, "write": timeWriteExternal / picoSeconds, } acc.AddFields("mysql_perf_schema", externalLWSecTotalFields, externalLWSecTotalTags) } return nil } // gatherPerfEventWaits can be used to get total time and number of event waits func gatherPerfEventWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error { rows, err := db.Query(perfEventWaitsQuery) if err != nil { return err } defer rows.Close() var ( event string starCount, timeWait float64 ) tags := map[string]string{ "server": servtag, } for rows.Next() { if err := rows.Scan(&event, &starCount, &timeWait); err != nil { return err } tags["event_name"] = event fields := map[string]interface{}{ "events_waits_total": starCount, "events_waits_seconds_total": timeWait / picoSeconds, } acc.AddFields("mysql_perf_schema", fields, tags) } return nil } // gatherPerfFileEvents can be used to get stats on file events func gatherPerfFileEventsStatuses(db *sql.DB, servtag string, acc telegraf.Accumulator) error { rows, err := db.Query(perfFileEventsQuery) if err != nil { return err } defer rows.Close() var ( eventName string countRead, countWrite, countMisc float64 sumTimerRead, sumTimerWrite, sumTimerMisc float64 sumNumBytesRead, sumNumBytesWrite float64 ) tags := map[string]string{ "server": servtag, } for rows.Next() { err = rows.Scan( &eventName, &countRead, &sumTimerRead, &sumNumBytesRead, &countWrite, &sumTimerWrite, &sumNumBytesWrite, &countMisc, &sumTimerMisc, ) if err != nil { return err } tags["event_name"] = eventName fields := make(map[string]interface{}) miscTags := copyTags(tags) miscTags["mode"] = "misc" fields["file_events_total"] = countWrite fields["file_events_seconds_total"] = sumTimerMisc / picoSeconds acc.AddFields("mysql_perf_schema", fields, miscTags) readTags := copyTags(tags) readTags["mode"] = "read" fields["file_events_total"] = countRead fields["file_events_seconds_total"] = sumTimerRead / picoSeconds fields["file_events_bytes_totals"] = sumNumBytesRead acc.AddFields("mysql_perf_schema", fields, readTags) writeTags := copyTags(tags) writeTags["mode"] = "write" fields["file_events_total"] = countWrite fields["file_events_seconds_total"] = sumTimerWrite / picoSeconds fields["file_events_bytes_totals"] = sumNumBytesWrite acc.AddFields("mysql_perf_schema", fields, writeTags) } return nil } // gatherPerfEventsStatements can be used to get attributes of each event func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, servtag string, acc telegraf.Accumulator) error { query := fmt.Sprintf( perfEventsStatementsQuery, m.PerfEventsStatementsDigestTextLimit, m.PerfEventsStatementsTimeLimit, m.PerfEventsStatementsLimit, ) rows, err := db.Query(query) if err != nil { return err } defer rows.Close() var ( schemaName, digest, digestText string count, queryTime, errs, warnings float64 rowsAffected, rowsSent, rowsExamined float64 tmpTables, tmpDiskTables float64 sortMergePasses, sortRows float64 noIndexUsed float64 ) tags := map[string]string{ "server": servtag, } for rows.Next() { err = rows.Scan( &schemaName, &digest, &digestText, &count, &queryTime, &errs, &warnings, &rowsAffected, &rowsSent, &rowsExamined, &tmpTables, &tmpDiskTables, &sortMergePasses, &sortRows, &noIndexUsed, ) if err != nil { return err } tags["schema"] = schemaName tags["digest"] = digest tags["digest_text"] = digestText fields := map[string]interface{}{ "events_statements_total": count, "events_statements_seconds_total": queryTime / picoSeconds, "events_statements_errors_total": errs, "events_statements_warnings_total": warnings, "events_statements_rows_affected_total": rowsAffected, "events_statements_rows_sent_total": rowsSent, "events_statements_rows_examined_total": rowsExamined, "events_statements_tmp_tables_total": tmpTables, "events_statements_tmp_disk_tables_total": tmpDiskTables, "events_statements_sort_merge_passes_total": sortMergePasses, "events_statements_sort_rows_total": sortRows, "events_statements_no_index_used_total": noIndexUsed, } acc.AddFields("mysql_perf_schema", fields, tags) } return nil } // gatherTableSchema can be used to gather stats on each schema func (m *Mysql) gatherTableSchema(db *sql.DB, servtag string, acc telegraf.Accumulator) error { var dbList []string // if the list of databases if empty, then get all databases if len(m.TableSchemaDatabases) == 0 { rows, err := db.Query(dbListQuery) if err != nil { return err } defer rows.Close() var database string for rows.Next() { err = rows.Scan(&database) if err != nil { return err } dbList = append(dbList, database) } } else { dbList = m.TableSchemaDatabases } for _, database := range dbList { err := m.gatherSchemaForDB(db, database, servtag, acc) if err != nil { return err } } return nil } func (m *Mysql) gatherSchemaForDB(db *sql.DB, database, servtag string, acc telegraf.Accumulator) error { rows, err := db.Query(fmt.Sprintf(tableSchemaQuery, database)) if err != nil { return err } defer rows.Close() var ( tableSchema string tableName string tableType string engine string version float64 rowFormat string tableRows float64 dataLength float64 indexLength float64 dataFree float64 createOptions string ) for rows.Next() { err = rows.Scan( &tableSchema, &tableName, &tableType, &engine, &version, &rowFormat, &tableRows, &dataLength, &indexLength, &dataFree, &createOptions, ) if err != nil { return err } tags := map[string]string{"server": servtag} tags["schema"] = tableSchema tags["table"] = tableName if m.MetricVersion < 2 { acc.AddFields(newNamespace("info_schema", "table_rows"), map[string]interface{}{"value": tableRows}, tags) dlTags := copyTags(tags) dlTags["component"] = "data_length" acc.AddFields(newNamespace("info_schema", "table_size", "data_length"), map[string]interface{}{"value": dataLength}, dlTags) ilTags := copyTags(tags) ilTags["component"] = "index_length" acc.AddFields(newNamespace("info_schema", "table_size", "index_length"), map[string]interface{}{"value": indexLength}, ilTags) dfTags := copyTags(tags) dfTags["component"] = "data_free" acc.AddFields(newNamespace("info_schema", "table_size", "data_free"), map[string]interface{}{"value": dataFree}, dfTags) } else { acc.AddFields("mysql_table_schema", map[string]interface{}{"rows": tableRows}, tags) acc.AddFields("mysql_table_schema", map[string]interface{}{"data_length": dataLength}, tags) acc.AddFields("mysql_table_schema", map[string]interface{}{"index_length": indexLength}, tags) acc.AddFields("mysql_table_schema", map[string]interface{}{"data_free": dataFree}, tags) } versionTags := copyTags(tags) versionTags["type"] = tableType versionTags["engine"] = engine versionTags["row_format"] = rowFormat versionTags["create_options"] = createOptions if m.MetricVersion < 2 { acc.AddFields(newNamespace("info_schema", "table_version"), map[string]interface{}{"value": version}, versionTags) } else { acc.AddFields("mysql_table_schema_version", map[string]interface{}{"table_version": version}, versionTags) } } return nil } func (m *Mysql) parseValueByDatabaseTypeName(value sql.RawBytes, databaseTypeName string) (interface{}, error) { if m.MetricVersion < 2 { return v1.ParseValue(value) } switch databaseTypeName { case "INT": return v2.ParseInt(value) case "BIGINT": return v2.ParseUint(value) case "VARCHAR": return v2.ParseString(value) default: m.Log.Debugf("unknown database type name %q in parseValueByDatabaseTypeName", databaseTypeName) return v2.ParseValue(value) } } // findThreadState can be used to find thread state by command and plain state func findThreadState(rawCommand, rawState string) string { var ( // replace '_' symbol with space command = strings.ReplaceAll(strings.ToLower(rawCommand), "_", " ") state = strings.ReplaceAll(strings.ToLower(rawState), "_", " ") ) // if the state is already valid, then return it if _, ok := generalThreadStates[state]; ok { return state } // if state is plain, return the mapping if mappedState, ok := stateStatusMappings[state]; ok { return mappedState } // if the state is any lock, return the special state if strings.Contains(state, "waiting for") && strings.Contains(state, "lock") { return "waiting for lock" } if command == "sleep" && state == "" { return "idle" } if command == "query" { return "executing" } if command == "binlog dump" { return "replication master" } // if no mappings found and state is invalid, then return "other" state return "other" } // newNamespace can be used to make a namespace func newNamespace(words ...string) string { return strings.ReplaceAll(strings.Join(words, "_"), " ", "_") } func copyTags(in map[string]string) map[string]string { out := make(map[string]string) for k, v := range in { out[k] = v } return out } func getDSNTag(dsn string) string { conf, err := mysql.ParseDSN(dsn) if err != nil { return "127.0.0.1:3306" } return conf.Addr } func init() { inputs.Add("mysql", func() telegraf.Input { return &Mysql{ PerfEventsStatementsDigestTextLimit: defaultPerfEventsStatementsDigestTextLimit, PerfEventsStatementsLimit: defaultPerfEventsStatementsLimit, PerfEventsStatementsTimeLimit: defaultPerfEventsStatementsTimeLimit, GatherGlobalVars: defaultGatherGlobalVars, } }) }