641 lines
17 KiB
Go
641 lines
17 KiB
Go
|
//go:generate ../../../tools/readme_config_includer/generator
|
||
|
package clickhouse
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
_ "embed"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/config"
|
||
|
"github.com/influxdata/telegraf/internal"
|
||
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||
|
)
|
||
|
|
||
|
//go:embed sample.conf
|
||
|
var sampleConfig string
|
||
|
|
||
|
// defaultTimeout is the HTTP client timeout used when none is configured.
var defaultTimeout = 5 * time.Second
|
||
|
|
||
|
type ClickHouse struct {
|
||
|
Username string `toml:"username"`
|
||
|
Password string `toml:"password"`
|
||
|
Servers []string `toml:"servers"`
|
||
|
AutoDiscovery bool `toml:"auto_discovery"`
|
||
|
ClusterInclude []string `toml:"cluster_include"`
|
||
|
ClusterExclude []string `toml:"cluster_exclude"`
|
||
|
Timeout config.Duration `toml:"timeout"`
|
||
|
Variant string `toml:"variant"`
|
||
|
|
||
|
HTTPClient http.Client
|
||
|
tls.ClientConfig
|
||
|
}
|
||
|
|
||
|
// connect describes one ClickHouse host to collect from. The exported fields
// are decoded from a system.clusters row during auto-discovery; url is the
// HTTP endpoint queried for that host.
type connect struct {
	Cluster  string `json:"cluster"`
	ShardNum int    `json:"shard_num"`
	Hostname string `json:"host_name"`
	url      *url.URL
}
|
||
|
|
||
|
func (*ClickHouse) SampleConfig() string {
|
||
|
return sampleConfig
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) Init() error {
|
||
|
switch ch.Variant {
|
||
|
case "":
|
||
|
ch.Variant = "self-hosted"
|
||
|
case "self-hosted", "managed":
|
||
|
// valid options
|
||
|
default:
|
||
|
return fmt.Errorf("unknown variant %q", ch.Variant)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Start ClickHouse input service
|
||
|
func (ch *ClickHouse) Start(telegraf.Accumulator) error {
|
||
|
timeout := defaultTimeout
|
||
|
if time.Duration(ch.Timeout) != 0 {
|
||
|
timeout = time.Duration(ch.Timeout)
|
||
|
}
|
||
|
tlsCfg, err := ch.ClientConfig.TLSConfig()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
ch.HTTPClient = http.Client{
|
||
|
Timeout: timeout,
|
||
|
Transport: &http.Transport{
|
||
|
TLSClientConfig: tlsCfg,
|
||
|
Proxy: http.ProxyFromEnvironment,
|
||
|
MaxIdleConnsPerHost: 1,
|
||
|
},
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Gather collect data from ClickHouse server
|
||
|
func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) {
|
||
|
var (
|
||
|
connects []connect
|
||
|
exists = func(host string) bool {
|
||
|
for _, c := range connects {
|
||
|
if c.Hostname == host {
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
)
|
||
|
|
||
|
for _, server := range ch.Servers {
|
||
|
u, err := url.Parse(server)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
switch {
|
||
|
case ch.AutoDiscovery:
|
||
|
var conns []connect
|
||
|
if err := ch.execQuery(u, "SELECT cluster, shard_num, host_name FROM system.clusters "+ch.clusterIncludeExcludeFilter(), &conns); err != nil {
|
||
|
acc.AddError(err)
|
||
|
continue
|
||
|
}
|
||
|
for _, c := range conns {
|
||
|
if !exists(c.Hostname) {
|
||
|
c.url = &url.URL{
|
||
|
Scheme: u.Scheme,
|
||
|
Host: net.JoinHostPort(c.Hostname, u.Port()),
|
||
|
}
|
||
|
connects = append(connects, c)
|
||
|
}
|
||
|
}
|
||
|
default:
|
||
|
connects = append(connects, connect{
|
||
|
Hostname: u.Hostname(),
|
||
|
url: u,
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for i := range connects {
|
||
|
metricsFuncs := []func(acc telegraf.Accumulator, conn *connect) error{
|
||
|
ch.tables,
|
||
|
ch.replicationQueue,
|
||
|
ch.detachedParts,
|
||
|
ch.dictionaries,
|
||
|
ch.mutations,
|
||
|
ch.disks,
|
||
|
ch.processes,
|
||
|
ch.textLog,
|
||
|
}
|
||
|
|
||
|
// Managed instances on Clickhouse Cloud does not give a user
|
||
|
// permissions to the zookeeper table
|
||
|
if ch.Variant != "managed" {
|
||
|
metricsFuncs = append(metricsFuncs, ch.zookeeper)
|
||
|
}
|
||
|
|
||
|
for _, metricFunc := range metricsFuncs {
|
||
|
if err := metricFunc(acc, &connects[i]); err != nil {
|
||
|
acc.AddError(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for metric := range commonMetrics {
|
||
|
if err := ch.commonMetrics(acc, &connects[i], metric); err != nil {
|
||
|
acc.AddError(err)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) Stop() {
|
||
|
ch.HTTPClient.CloseIdleConnections()
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) clusterIncludeExcludeFilter() string {
|
||
|
if len(ch.ClusterInclude) == 0 && len(ch.ClusterExclude) == 0 {
|
||
|
return ""
|
||
|
}
|
||
|
var (
|
||
|
escape = func(in string) string {
|
||
|
return "'" + strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(in) + "'"
|
||
|
}
|
||
|
makeFilter = func(expr string, args []string) string {
|
||
|
in := make([]string, 0, len(args))
|
||
|
for _, v := range args {
|
||
|
in = append(in, escape(v))
|
||
|
}
|
||
|
return fmt.Sprintf("cluster %s (%s)", expr, strings.Join(in, ", "))
|
||
|
}
|
||
|
includeFilter, excludeFilter string
|
||
|
)
|
||
|
|
||
|
if len(ch.ClusterInclude) != 0 {
|
||
|
includeFilter = makeFilter("IN", ch.ClusterInclude)
|
||
|
}
|
||
|
if len(ch.ClusterExclude) != 0 {
|
||
|
excludeFilter = makeFilter("NOT IN", ch.ClusterExclude)
|
||
|
}
|
||
|
if includeFilter != "" && excludeFilter != "" {
|
||
|
return "WHERE " + includeFilter + " OR " + excludeFilter
|
||
|
}
|
||
|
if includeFilter == "" && excludeFilter != "" {
|
||
|
return "WHERE " + excludeFilter
|
||
|
}
|
||
|
return "WHERE " + includeFilter
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error {
|
||
|
var intResult []struct {
|
||
|
Metric string `json:"metric"`
|
||
|
Value chUInt64 `json:"value"`
|
||
|
}
|
||
|
|
||
|
var floatResult []struct {
|
||
|
Metric string `json:"metric"`
|
||
|
Value float64 `json:"value"`
|
||
|
}
|
||
|
|
||
|
tags := makeDefaultTags(conn)
|
||
|
fields := make(map[string]interface{})
|
||
|
|
||
|
if commonMetricsIsFloat[metric] {
|
||
|
if err := ch.execQuery(conn.url, commonMetrics[metric], &floatResult); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
for _, r := range floatResult {
|
||
|
fields[internal.SnakeCase(r.Metric)] = r.Value
|
||
|
}
|
||
|
} else {
|
||
|
if err := ch.execQuery(conn.url, commonMetrics[metric], &intResult); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
for _, r := range intResult {
|
||
|
fields[internal.SnakeCase(r.Metric)] = uint64(r.Value)
|
||
|
}
|
||
|
}
|
||
|
acc.AddFields("clickhouse_"+metric, fields, tags)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) zookeeper(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var zkExists []struct {
|
||
|
ZkExists chUInt64 `json:"zk_exists"`
|
||
|
}
|
||
|
|
||
|
if err := ch.execQuery(conn.url, systemZookeeperExistsSQL, &zkExists); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
tags := makeDefaultTags(conn)
|
||
|
|
||
|
if len(zkExists) > 0 && zkExists[0].ZkExists > 0 {
|
||
|
var zkRootNodes []struct {
|
||
|
ZkRootNodes chUInt64 `json:"zk_root_nodes"`
|
||
|
}
|
||
|
if err := ch.execQuery(conn.url, systemZookeeperRootNodesSQL, &zkRootNodes); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
acc.AddFields("clickhouse_zookeeper",
|
||
|
map[string]interface{}{
|
||
|
"root_nodes": uint64(zkRootNodes[0].ZkRootNodes),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) replicationQueue(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var replicationQueueExists []struct {
|
||
|
ReplicationQueueExists chUInt64 `json:"replication_queue_exists"`
|
||
|
}
|
||
|
|
||
|
if err := ch.execQuery(conn.url, systemReplicationExistsSQL, &replicationQueueExists); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
tags := makeDefaultTags(conn)
|
||
|
|
||
|
if len(replicationQueueExists) > 0 && replicationQueueExists[0].ReplicationQueueExists > 0 {
|
||
|
var replicationTooManyTries []struct {
|
||
|
NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"`
|
||
|
TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"`
|
||
|
}
|
||
|
if err := ch.execQuery(conn.url, systemReplicationNumTriesSQL, &replicationTooManyTries); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
acc.AddFields("clickhouse_replication_queue",
|
||
|
map[string]interface{}{
|
||
|
"too_many_tries_replicas": uint64(replicationTooManyTries[0].TooManyTriesReplicas),
|
||
|
"num_tries_replicas": uint64(replicationTooManyTries[0].NumTriesReplicas),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) detachedParts(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var detachedParts []struct {
|
||
|
DetachedParts chUInt64 `json:"detached_parts"`
|
||
|
}
|
||
|
if err := ch.execQuery(conn.url, systemDetachedPartsSQL, &detachedParts); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if len(detachedParts) > 0 {
|
||
|
tags := makeDefaultTags(conn)
|
||
|
acc.AddFields("clickhouse_detached_parts",
|
||
|
map[string]interface{}{
|
||
|
"detached_parts": uint64(detachedParts[0].DetachedParts),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) dictionaries(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var brokenDictionaries []struct {
|
||
|
Origin string `json:"origin"`
|
||
|
BytesAllocated chUInt64 `json:"bytes_allocated"`
|
||
|
Status string `json:"status"`
|
||
|
}
|
||
|
if err := ch.execQuery(conn.url, systemDictionariesSQL, &brokenDictionaries); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, dict := range brokenDictionaries {
|
||
|
tags := makeDefaultTags(conn)
|
||
|
|
||
|
isLoaded := uint64(1)
|
||
|
if dict.Status != "LOADED" {
|
||
|
isLoaded = 0
|
||
|
}
|
||
|
|
||
|
if dict.Origin != "" {
|
||
|
tags["dict_origin"] = dict.Origin
|
||
|
acc.AddFields("clickhouse_dictionaries",
|
||
|
map[string]interface{}{
|
||
|
"is_loaded": isLoaded,
|
||
|
"bytes_allocated": uint64(dict.BytesAllocated),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) mutations(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var mutationsStatus []struct {
|
||
|
Failed chUInt64 `json:"failed"`
|
||
|
Running chUInt64 `json:"running"`
|
||
|
Completed chUInt64 `json:"completed"`
|
||
|
}
|
||
|
if err := ch.execQuery(conn.url, systemMutationSQL, &mutationsStatus); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if len(mutationsStatus) > 0 {
|
||
|
tags := makeDefaultTags(conn)
|
||
|
|
||
|
acc.AddFields("clickhouse_mutations",
|
||
|
map[string]interface{}{
|
||
|
"failed": uint64(mutationsStatus[0].Failed),
|
||
|
"running": uint64(mutationsStatus[0].Running),
|
||
|
"completed": uint64(mutationsStatus[0].Completed),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) disks(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var disksStatus []struct {
|
||
|
Name string `json:"name"`
|
||
|
Path string `json:"path"`
|
||
|
FreePercent chUInt64 `json:"free_space_percent"`
|
||
|
KeepFreePercent chUInt64 `json:"keep_free_space_percent"`
|
||
|
}
|
||
|
|
||
|
if err := ch.execQuery(conn.url, systemDisksSQL, &disksStatus); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, disk := range disksStatus {
|
||
|
tags := makeDefaultTags(conn)
|
||
|
tags["name"] = disk.Name
|
||
|
tags["path"] = disk.Path
|
||
|
|
||
|
acc.AddFields("clickhouse_disks",
|
||
|
map[string]interface{}{
|
||
|
"free_space_percent": uint64(disk.FreePercent),
|
||
|
"keep_free_space_percent": uint64(disk.KeepFreePercent),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) processes(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var processesStats []struct {
|
||
|
QueryType string `json:"query_type"`
|
||
|
Percentile50 float64 `json:"p50"`
|
||
|
Percentile90 float64 `json:"p90"`
|
||
|
LongestRunning float64 `json:"longest_running"`
|
||
|
}
|
||
|
|
||
|
if err := ch.execQuery(conn.url, systemProcessesSQL, &processesStats); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, process := range processesStats {
|
||
|
tags := makeDefaultTags(conn)
|
||
|
tags["query_type"] = process.QueryType
|
||
|
|
||
|
acc.AddFields("clickhouse_processes",
|
||
|
map[string]interface{}{
|
||
|
"percentile_50": process.Percentile50,
|
||
|
"percentile_90": process.Percentile90,
|
||
|
"longest_running": process.LongestRunning,
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) textLog(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var textLogExists []struct {
|
||
|
TextLogExists chUInt64 `json:"text_log_exists"`
|
||
|
}
|
||
|
|
||
|
if err := ch.execQuery(conn.url, systemTextLogExistsSQL, &textLogExists); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if len(textLogExists) > 0 && textLogExists[0].TextLogExists > 0 {
|
||
|
var textLogLast10MinMessages []struct {
|
||
|
Level string `json:"level"`
|
||
|
MessagesLast10Min chUInt64 `json:"messages_last_10_min"`
|
||
|
}
|
||
|
if err := ch.execQuery(conn.url, systemTextLogSQL, &textLogLast10MinMessages); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, textLogItem := range textLogLast10MinMessages {
|
||
|
tags := makeDefaultTags(conn)
|
||
|
tags["level"] = textLogItem.Level
|
||
|
acc.AddFields("clickhouse_text_log",
|
||
|
map[string]interface{}{
|
||
|
"messages_last_10_min": uint64(textLogItem.MessagesLast10Min),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error {
|
||
|
var parts []struct {
|
||
|
Database string `json:"database"`
|
||
|
Table string `json:"table"`
|
||
|
Bytes chUInt64 `json:"bytes"`
|
||
|
Parts chUInt64 `json:"parts"`
|
||
|
Rows chUInt64 `json:"rows"`
|
||
|
}
|
||
|
|
||
|
if err := ch.execQuery(conn.url, systemPartsSQL, &parts); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
tags := makeDefaultTags(conn)
|
||
|
|
||
|
for _, part := range parts {
|
||
|
tags["table"] = part.Table
|
||
|
tags["database"] = part.Database
|
||
|
acc.AddFields("clickhouse_tables",
|
||
|
map[string]interface{}{
|
||
|
"bytes": uint64(part.Bytes),
|
||
|
"parts": uint64(part.Parts),
|
||
|
"rows": uint64(part.Rows),
|
||
|
},
|
||
|
tags,
|
||
|
)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func makeDefaultTags(conn *connect) map[string]string {
|
||
|
tags := map[string]string{
|
||
|
"source": conn.Hostname,
|
||
|
}
|
||
|
if len(conn.Cluster) != 0 {
|
||
|
tags["cluster"] = conn.Cluster
|
||
|
}
|
||
|
if conn.ShardNum != 0 {
|
||
|
tags["shard_num"] = strconv.Itoa(conn.ShardNum)
|
||
|
}
|
||
|
return tags
|
||
|
}
|
||
|
|
||
|
// clickhouseError is returned by execQuery when the server answers with an
// HTTP status code of 300 or above. body holds the leading bytes of the
// response body.
type clickhouseError struct {
	StatusCode int
	body       []byte
}

// Error formats the status code followed by the captured response body.
func (e *clickhouseError) Error() string {
	code, text := e.StatusCode, e.body
	return fmt.Sprintf("received error code %d: %s", code, text)
}
|
||
|
|
||
|
func (ch *ClickHouse) execQuery(address *url.URL, query string, i interface{}) error {
|
||
|
q := address.Query()
|
||
|
q.Set("query", query+" FORMAT JSON")
|
||
|
address.RawQuery = q.Encode()
|
||
|
req, err := http.NewRequest("GET", address.String(), nil)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if ch.Username != "" {
|
||
|
req.Header.Add("X-ClickHouse-User", ch.Username)
|
||
|
}
|
||
|
if ch.Password != "" {
|
||
|
req.Header.Add("X-ClickHouse-Key", ch.Password)
|
||
|
}
|
||
|
resp, err := ch.HTTPClient.Do(req)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer func() { _ = resp.Body.Close() }()
|
||
|
if resp.StatusCode >= 300 {
|
||
|
//nolint:errcheck // reading body for error reporting
|
||
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 200))
|
||
|
return &clickhouseError{
|
||
|
StatusCode: resp.StatusCode,
|
||
|
body: body,
|
||
|
}
|
||
|
}
|
||
|
var response struct {
|
||
|
Data json.RawMessage
|
||
|
}
|
||
|
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if err := json.Unmarshal(response.Data, i); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// chUInt64 is a uint64 that can be decoded from either a bare JSON number or
// a quoted decimal string.
//
// see https://clickhouse.yandex/docs/en/operations/settings/settings/#session_settings-output_format_json_quote_64bit_integers
type chUInt64 uint64

// UnmarshalJSON is a custom unmarshaler for chUInt64 to handle ClickHouse's JSON format.
//
// One leading and one trailing double quote are stripped, then the remainder
// is parsed as a base-10 unsigned 64-bit integer. A JSON null leaves the
// value unchanged. Any other input that does not parse returns the
// strconv.ParseUint error.
func (i *chUInt64) UnmarshalJSON(b []byte) error {
	// By encoding/json convention, unmarshaling null is a no-op.
	if string(b) == "null" {
		return nil
	}

	b = bytes.TrimPrefix(b, []byte(`"`))
	b = bytes.TrimSuffix(b, []byte(`"`))
	v, err := strconv.ParseUint(string(b), 10, 64)
	if err != nil {
		return err
	}
	*i = chUInt64(v)
	return nil
}
|
||
|
|
||
|
// SQL statements issued by the collectors. execQuery appends " FORMAT JSON"
// to each of them before sending.
const (
	// Queries behind the commonMetrics table.
	systemEventsSQL = "SELECT event AS metric, toUInt64(value) AS value FROM system.events"

	systemMetricsSQL = "SELECT metric, toUInt64(value) AS value FROM system.metrics"

	systemAsyncMetricsSQL = "SELECT metric, toFloat64(value) AS value FROM system.asynchronous_metrics"

	// Per-table totals over active parts.
	systemPartsSQL = `
		SELECT
			database,
			table,
			SUM(bytes) AS bytes,
			COUNT(*) AS parts,
			SUM(rows) AS rows
		FROM system.parts
		WHERE active = 1
		GROUP BY
			database, table
		ORDER BY
			database, table
	`

	// ZooKeeper: existence check of the system table, then the root node count.
	systemZookeeperExistsSQL = "SELECT count() AS zk_exists FROM system.tables WHERE database='system' AND name='zookeeper'"

	systemZookeeperRootNodesSQL = "SELECT count() AS zk_root_nodes FROM system.zookeeper WHERE path='/'"

	// Replication queue: existence check of the system table, then retry counters.
	systemReplicationExistsSQL = "SELECT count() AS replication_queue_exists FROM system.tables WHERE database='system' AND name='replication_queue'"

	systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) " +
		"AS replication_too_many_tries_replicas FROM system.replication_queue SETTINGS empty_result_for_aggregation_by_empty_set=0"

	// Number of detached parts.
	systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts SETTINGS empty_result_for_aggregation_by_empty_set=0"

	// Origin, status and allocated bytes of every dictionary.
	systemDictionariesSQL = "SELECT origin, status, bytes_allocated FROM system.dictionaries"

	// Failed, running and completed mutation counts.
	systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) " +
		"AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) " +
		"AS running, countIf(is_done=1) AS completed FROM system.mutations SETTINGS empty_result_for_aggregation_by_empty_set=0"

	// Free and keep-free space of every disk, as a percentage of total space.
	systemDisksSQL = "SELECT name, path, toUInt64(100*free_space / total_space) " +
		"AS free_space_percent, toUInt64( 100 * keep_free_space / total_space) AS keep_free_space_percent FROM system.disks"

	// Elapsed-time quantiles and maximum of running queries, grouped by query type.
	systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') " +
		"AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running " +
		"FROM system.processes GROUP BY query_type SETTINGS empty_result_for_aggregation_by_empty_set=0"

	// Text log: existence check of the system table, then message counts per
	// level over the last 600 seconds.
	systemTextLogExistsSQL = "SELECT count() AS text_log_exists FROM system.tables WHERE database='system' AND name='text_log'"

	systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log " +
		"WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level SETTINGS empty_result_for_aggregation_by_empty_set=0"
)
|
||
|
|
||
|
var commonMetrics = map[string]string{
|
||
|
"events": systemEventsSQL,
|
||
|
"metrics": systemMetricsSQL,
|
||
|
"asynchronous_metrics": systemAsyncMetricsSQL,
|
||
|
}
|
||
|
|
||
|
// commonMetricsIsFloat tells, per commonMetrics key, whether the query's
// values are decoded as float64 (true) or as unsigned integers (false).
var commonMetricsIsFloat = map[string]bool{
	"events":               false,
	"metrics":              false,
	"asynchronous_metrics": true,
}
|
||
|
|
||
|
// Compile-time check that *ClickHouse implements telegraf.ServiceInput.
var _ telegraf.ServiceInput = (*ClickHouse)(nil)
|
||
|
|
||
|
func init() {
|
||
|
inputs.Add("clickhouse", func() telegraf.Input {
|
||
|
return &ClickHouse{
|
||
|
AutoDiscovery: true,
|
||
|
ClientConfig: tls.ClientConfig{
|
||
|
InsecureSkipVerify: false,
|
||
|
},
|
||
|
Timeout: config.Duration(defaultTimeout),
|
||
|
}
|
||
|
})
|
||
|
}
|