373 lines
14 KiB
Go
373 lines
14 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/influxdata/telegraf/testutil"
|
|
)
|
|
|
|
func defaultTags() map[string]string {
|
|
return map[string]string{
|
|
"cluster_name": "es-testcluster",
|
|
"node_attribute_master": "true",
|
|
"node_id": "SDFsfSDFsdfFSDSDfSFDSDF",
|
|
"node_name": "test.host.com",
|
|
"node_host": "test",
|
|
"node_roles": "data,ingest,master",
|
|
}
|
|
}
|
|
func defaultServerInfo() serverInfo {
|
|
return serverInfo{nodeID: "", masterID: "SDFsfSDFsdfFSDSDfSFDSDF"}
|
|
}
|
|
|
|
type transportMock struct {
|
|
statusCode int
|
|
body string
|
|
}
|
|
|
|
func newTransportMock(body string) http.RoundTripper {
|
|
return &transportMock{
|
|
statusCode: http.StatusOK,
|
|
body: body,
|
|
}
|
|
}
|
|
|
|
func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
|
|
res := &http.Response{
|
|
Header: make(http.Header),
|
|
Request: r,
|
|
StatusCode: t.statusCode,
|
|
}
|
|
res.Header.Set("Content-Type", "application/json")
|
|
res.Body = io.NopCloser(strings.NewReader(t.body))
|
|
return res, nil
|
|
}
|
|
|
|
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
|
|
tags := defaultTags()
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHTTPExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
|
|
}
|
|
|
|
func TestGather(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.client.Transport = newTransportMock(nodeStatsResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(es.Gather))
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
checkNodeStatsResult(t, &acc)
|
|
}
|
|
|
|
func TestGatherIndividualStats(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.NodeStats = []string{"jvm", "process"}
|
|
es.client.Transport = newTransportMock(nodeStatsResponseJVMProcess)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(es.Gather))
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
|
|
tags := defaultTags()
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_http", nodestatsHTTPExpected, tags)
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
|
|
}
|
|
|
|
func TestGatherEnrichStats(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.EnrichStats = true
|
|
es.client.Transport = newTransportMock(enrichStatsResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, acc.GatherError(es.Gather))
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
|
|
metrics := acc.GetTelegrafMetrics()
|
|
require.Len(t, metrics, 8)
|
|
}
|
|
|
|
func TestGatherNodeStats(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.client.Transport = newTransportMock(nodeStatsResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, es.gatherNodeStats("junk", &acc))
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
checkNodeStatsResult(t, &acc)
|
|
}
|
|
|
|
func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.ClusterHealth = true
|
|
es.ClusterHealthLevel = ""
|
|
es.client.Transport = newTransportMock(clusterHealthResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, es.gatherClusterHealth("junk", &acc))
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
|
|
clusterHealthExpected,
|
|
map[string]string{"name": "elasticsearch_telegraf"})
|
|
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
|
|
v1IndexExpected,
|
|
map[string]string{"index": "v1"})
|
|
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
|
|
v2IndexExpected,
|
|
map[string]string{"index": "v2"})
|
|
}
|
|
|
|
func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.ClusterHealth = true
|
|
es.ClusterHealthLevel = "cluster"
|
|
es.client.Transport = newTransportMock(clusterHealthResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, es.gatherClusterHealth("junk", &acc))
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
|
|
clusterHealthExpected,
|
|
map[string]string{"name": "elasticsearch_telegraf"})
|
|
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
|
|
v1IndexExpected,
|
|
map[string]string{"index": "v1"})
|
|
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
|
|
v2IndexExpected,
|
|
map[string]string{"index": "v2"})
|
|
}
|
|
|
|
func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.ClusterHealth = true
|
|
es.ClusterHealthLevel = "indices"
|
|
es.client.Transport = newTransportMock(clusterHealthResponseWithIndices)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, es.gatherClusterHealth("junk", &acc))
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
|
|
clusterHealthExpected,
|
|
map[string]string{"name": "elasticsearch_telegraf"})
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
|
|
v1IndexExpected,
|
|
map[string]string{"index": "v1", "name": "elasticsearch_telegraf"})
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
|
|
v2IndexExpected,
|
|
map[string]string{"index": "v2", "name": "elasticsearch_telegraf"})
|
|
}
|
|
|
|
func TestGatherClusterStatsMaster(t *testing.T) {
|
|
// This needs multiple steps to replicate the multiple calls internally.
|
|
es := newElasticsearchWithClient()
|
|
es.ClusterStats = true
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
info := serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}
|
|
|
|
// first get catMaster
|
|
es.client.Transport = newTransportMock(IsMasterResult)
|
|
masterID, err := es.getCatMaster("junk")
|
|
require.NoError(t, err)
|
|
info.masterID = masterID
|
|
es.serverInfo["http://example.com:9200"] = info
|
|
|
|
isMasterResultTokens := strings.Split(IsMasterResult, " ")
|
|
require.Equal(t, masterID, isMasterResultTokens[0], "catmaster is incorrect")
|
|
|
|
// now get node status, which determines whether we're master
|
|
var acc testutil.Accumulator
|
|
es.Local = true
|
|
es.client.Transport = newTransportMock(nodeStatsResponse)
|
|
require.NoError(t, es.gatherNodeStats("junk", &acc))
|
|
require.True(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
checkNodeStatsResult(t, &acc)
|
|
|
|
// now test the clusterstats method
|
|
es.client.Transport = newTransportMock(clusterStatsResponse)
|
|
require.NoError(t, es.gatherClusterStats("junk", &acc))
|
|
|
|
tags := map[string]string{
|
|
"cluster_name": "es-testcluster",
|
|
"node_name": "test.host.com",
|
|
"status": "red",
|
|
}
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_nodes", clusterstatsNodesExpected, tags)
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_indices", clusterstatsIndicesExpected, tags)
|
|
}
|
|
|
|
func TestGatherClusterStatsNonMaster(t *testing.T) {
|
|
// This needs multiple steps to replicate the multiple calls internally.
|
|
es := newElasticsearchWithClient()
|
|
es.ClusterStats = true
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}
|
|
|
|
// first get catMaster
|
|
es.client.Transport = newTransportMock(IsNotMasterResult)
|
|
masterID, err := es.getCatMaster("junk")
|
|
require.NoError(t, err)
|
|
|
|
isNotMasterResultTokens := strings.Split(IsNotMasterResult, " ")
|
|
require.Equal(t, masterID, isNotMasterResultTokens[0], "catmaster is incorrect")
|
|
|
|
// now get node status, which determines whether we're master
|
|
var acc testutil.Accumulator
|
|
es.Local = true
|
|
es.client.Transport = newTransportMock(nodeStatsResponse)
|
|
require.NoError(t, es.gatherNodeStats("junk", &acc))
|
|
|
|
// ensure flag is clear so Cluster Stats would not be done
|
|
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
|
|
checkNodeStatsResult(t, &acc)
|
|
}
|
|
|
|
func TestGatherClusterIndicesStats(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.IndicesInclude = []string{"_all"}
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.client.Transport = newTransportMock(clusterIndicesResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, es.gatherIndicesStats("junk", &acc))
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "twitter"})
|
|
}
|
|
|
|
func TestGatherDateStampedIndicesStats(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.IndicesInclude = []string{"twitter*", "influx*", "penguins"}
|
|
es.NumMostRecentIndices = 2
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.client.Transport = newTransportMock(dateStampedIndicesResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
require.NoError(t, es.Init())
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, es.gatherIndicesStats(es.Servers[0]+"/"+strings.Join(es.IndicesInclude, ",")+"/_stats", &acc))
|
|
|
|
// includes 2 most recent indices for "twitter", only expect the most recent two.
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "twitter_2020_08_02"})
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "twitter_2020_08_01"})
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "twitter_2020_07_31"})
|
|
|
|
// includes 2 most recent indices for "influx", only expect the most recent two.
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "influx2021.01.02"})
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "influx2021.01.01"})
|
|
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "influx2020.12.31"})
|
|
|
|
// not configured to sort the 'penguins' index, but ensure it is also included.
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "penguins"})
|
|
}
|
|
|
|
func TestGatherClusterIndiceShardsStats(t *testing.T) {
|
|
es := newElasticsearchWithClient()
|
|
es.IndicesLevel = "shards"
|
|
es.Servers = []string{"http://example.com:9200"}
|
|
es.client.Transport = newTransportMock(clusterIndicesShardsResponse)
|
|
es.serverInfo = make(map[string]serverInfo)
|
|
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
|
|
|
var acc testutil.Accumulator
|
|
require.NoError(t, es.gatherIndicesStats("junk", &acc))
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
|
clusterIndicesExpected,
|
|
map[string]string{"index_name": "twitter"})
|
|
|
|
primaryTags := map[string]string{
|
|
"index_name": "twitter",
|
|
"node_id": "oqvR8I1dTpONvwRM30etww",
|
|
"shard_name": "0",
|
|
"type": "primary",
|
|
}
|
|
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_shards",
|
|
clusterIndicesPrimaryShardsExpected,
|
|
primaryTags)
|
|
|
|
replicaTags := map[string]string{
|
|
"index_name": "twitter",
|
|
"node_id": "oqvR8I1dTpONvwRM30etww",
|
|
"shard_name": "1",
|
|
"type": "replica",
|
|
}
|
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_shards",
|
|
clusterIndicesReplicaShardsExpected,
|
|
replicaTags)
|
|
}
|
|
|
|
func newElasticsearchWithClient() *Elasticsearch {
|
|
es := newElasticsearch()
|
|
es.client = &http.Client{}
|
|
return es
|
|
}
|