1
0
Fork 0
telegraf/plugins/inputs/elasticsearch/elasticsearch_test.go

374 lines
14 KiB
Go
Raw Permalink Normal View History

package elasticsearch
import (
"io"
"net/http"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
)
// defaultTags builds the tag set that the node-stats assertions in this file
// expect on every node-level measurement. A new map is returned on each call,
// so callers are free to modify the result.
func defaultTags() map[string]string {
	tags := make(map[string]string, 6)
	tags["cluster_name"] = "es-testcluster"
	tags["node_attribute_master"] = "true"
	tags["node_id"] = "SDFsfSDFsdfFSDSDfSFDSDF"
	tags["node_name"] = "test.host.com"
	tags["node_host"] = "test"
	tags["node_roles"] = "data,ingest,master"
	return tags
}
// defaultServerInfo returns the server state most tests in this file start
// from: an empty node ID together with a fixed master ID.
func defaultServerInfo() serverInfo {
	var info serverInfo
	info.nodeID = ""
	info.masterID = "SDFsfSDFsdfFSDSDfSFDSDF"
	return info
}
// transportMock is an http.RoundTripper for tests. It does no I/O; every
// request is answered with the configured status code and body.
type transportMock struct {
	statusCode int    // status code placed on every response
	body       string // payload served as the response body
}

// newTransportMock returns a round tripper that answers every request with
// HTTP 200 and the given body.
func newTransportMock(body string) http.RoundTripper {
	mock := new(transportMock)
	mock.statusCode = http.StatusOK
	mock.body = body
	return mock
}

// RoundTrip builds a response for r carrying the mock's status code, a JSON
// Content-Type header and a fresh reader over the mock's body. The returned
// error is always nil.
func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
	header := http.Header{}
	header.Set("Content-Type", "application/json")

	return &http.Response{
		StatusCode: t.statusCode,
		Header:     header,
		Body:       io.NopCloser(strings.NewReader(t.body)),
		Request:    r,
	}, nil
}
// checkNodeStatsResult asserts that acc contains every node-stats measurement
// (indices, os, process, jvm, thread_pool, fs, transport, http and breakers)
// with its expected fields and the tags from defaultTags.
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
	// Mark this function as a test helper so that failures logged through t
	// from this frame are attributed to the calling test.
	t.Helper()

	tags := defaultTags()
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHTTPExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
}
// TestGather runs a complete Gather cycle against a mocked node-stats response
// and checks that the server is not flagged as master and that every
// node-stats measurement is reported.
func TestGather(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(nodeStatsResponse)

	acc := &testutil.Accumulator{}
	require.NoError(t, acc.GatherError(plugin.Gather))

	require.False(t, plugin.serverInfo[plugin.Servers[0]].isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, acc)
}
// TestGatherIndividualStats restricts NodeStats to "jvm" and "process" and
// checks that only those two measurements are reported, while all the other
// node-stats measurements are absent.
func TestGatherIndividualStats(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.Servers = []string{server}
	plugin.NodeStats = []string{"jvm", "process"}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(nodeStatsResponseJVMProcess)

	acc := &testutil.Accumulator{}
	require.NoError(t, acc.GatherError(plugin.Gather))
	require.False(t, plugin.serverInfo[plugin.Servers[0]].isMaster(), "IsMaster set incorrectly")

	// Checks run in slice order; present selects which assertion is applied.
	checks := []struct {
		measurement string
		fields      map[string]interface{}
		present     bool
	}{
		{"elasticsearch_indices", nodestatsIndicesExpected, false},
		{"elasticsearch_os", nodestatsOsExpected, false},
		{"elasticsearch_process", nodestatsProcessExpected, true},
		{"elasticsearch_jvm", nodestatsJvmExpected, true},
		{"elasticsearch_thread_pool", nodestatsThreadPoolExpected, false},
		{"elasticsearch_fs", nodestatsFsExpected, false},
		{"elasticsearch_transport", nodestatsTransportExpected, false},
		{"elasticsearch_http", nodestatsHTTPExpected, false},
		{"elasticsearch_breakers", nodestatsBreakersExpected, false},
	}

	tags := defaultTags()
	for _, c := range checks {
		if c.present {
			acc.AssertContainsTaggedFields(t, c.measurement, c.fields, tags)
			continue
		}
		acc.AssertDoesNotContainsTaggedFields(t, c.measurement, c.fields, tags)
	}
}
// TestGatherEnrichStats enables EnrichStats, runs Gather against a mocked
// enrich-stats response and checks that exactly eight metrics are produced.
func TestGatherEnrichStats(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.EnrichStats = true
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(enrichStatsResponse)

	acc := &testutil.Accumulator{}
	require.NoError(t, acc.GatherError(plugin.Gather))
	require.False(t, plugin.serverInfo[plugin.Servers[0]].isMaster(), "IsMaster set incorrectly")

	require.Len(t, acc.GetTelegrafMetrics(), 8)
}
// TestGatherNodeStats calls gatherNodeStats directly against a mocked
// node-stats response and checks that every node-stats measurement is reported
// and the server is not flagged as master.
func TestGatherNodeStats(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(nodeStatsResponse)

	acc := &testutil.Accumulator{}
	// The URL is irrelevant here: the mocked transport answers every request.
	require.NoError(t, plugin.gatherNodeStats("junk", acc))

	require.False(t, plugin.serverInfo[plugin.Servers[0]].isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, acc)
}
// TestGatherClusterHealthEmptyClusterHealth gathers cluster health with an
// empty ClusterHealthLevel and checks that the cluster-level measurement is
// reported while the per-index measurements for v1 and v2 are not.
func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.ClusterHealth = true
	plugin.ClusterHealthLevel = ""
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(clusterHealthResponse)

	acc := &testutil.Accumulator{}
	require.NoError(t, plugin.gatherClusterHealth("junk", acc))
	require.False(t, plugin.serverInfo[plugin.Servers[0]].isMaster(), "IsMaster set incorrectly")

	clusterTags := map[string]string{"name": "elasticsearch_telegraf"}
	acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, clusterTags)

	absent := []struct {
		index  string
		fields map[string]interface{}
	}{
		{"v1", v1IndexExpected},
		{"v2", v2IndexExpected},
	}
	for _, c := range absent {
		acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
			c.fields, map[string]string{"index": c.index})
	}
}
// TestGatherClusterHealthSpecificClusterHealth gathers cluster health with
// ClusterHealthLevel set to "cluster" and checks that the cluster-level
// measurement is reported while the per-index measurements for v1 and v2 are
// not.
func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.ClusterHealth = true
	plugin.ClusterHealthLevel = "cluster"
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(clusterHealthResponse)

	acc := &testutil.Accumulator{}
	require.NoError(t, plugin.gatherClusterHealth("junk", acc))
	require.False(t, plugin.serverInfo[plugin.Servers[0]].isMaster(), "IsMaster set incorrectly")

	clusterTags := map[string]string{"name": "elasticsearch_telegraf"}
	acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, clusterTags)

	absent := []struct {
		index  string
		fields map[string]interface{}
	}{
		{"v1", v1IndexExpected},
		{"v2", v2IndexExpected},
	}
	for _, c := range absent {
		acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
			c.fields, map[string]string{"index": c.index})
	}
}
// TestGatherClusterHealthAlsoIndicesHealth gathers cluster health with
// ClusterHealthLevel set to "indices" and checks that both the cluster-level
// measurement and the per-index measurements for v1 and v2 are reported.
func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) {
	const (
		server      = "http://example.com:9200"
		clusterName = "elasticsearch_telegraf"
	)

	plugin := newElasticsearchWithClient()
	plugin.ClusterHealth = true
	plugin.ClusterHealthLevel = "indices"
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(clusterHealthResponseWithIndices)

	acc := &testutil.Accumulator{}
	require.NoError(t, plugin.gatherClusterHealth("junk", acc))
	require.False(t, plugin.serverInfo[plugin.Servers[0]].isMaster(), "IsMaster set incorrectly")

	acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
		clusterHealthExpected, map[string]string{"name": clusterName})

	present := []struct {
		index  string
		fields map[string]interface{}
	}{
		{"v1", v1IndexExpected},
		{"v2", v2IndexExpected},
	}
	for _, c := range present {
		acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
			c.fields, map[string]string{"index": c.index, "name": clusterName})
	}
}
// TestGatherClusterStatsMaster walks through the individual calls one at a
// time for a node that is the master: it fetches the master ID with
// getCatMaster, stores it in the server info, gathers node stats, and finally
// gathers cluster stats and checks the resulting cluster-stats measurements.
func TestGatherClusterStatsMaster(t *testing.T) {
	// This needs multiple steps to replicate the multiple calls internally.
	es := newElasticsearchWithClient()
	es.ClusterStats = true
	es.Servers = []string{"http://example.com:9200"}
	es.serverInfo = make(map[string]serverInfo)
	info := serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}

	// first get catMaster
	es.client.Transport = newTransportMock(IsMasterResult)
	masterID, err := es.getCatMaster("junk")
	require.NoError(t, err)
	info.masterID = masterID
	es.serverInfo["http://example.com:9200"] = info

	// The expected master ID is the first space-separated token of the mocked
	// response. require.Equal takes (expected, actual), so the fixture value
	// goes first to keep failure output labelled correctly.
	isMasterResultTokens := strings.Split(IsMasterResult, " ")
	require.Equal(t, isMasterResultTokens[0], masterID, "catmaster is incorrect")

	// now get node status, which determines whether we're master
	var acc testutil.Accumulator
	es.Local = true
	es.client.Transport = newTransportMock(nodeStatsResponse)
	require.NoError(t, es.gatherNodeStats("junk", &acc))
	require.True(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, &acc)

	// now test the clusterstats method
	es.client.Transport = newTransportMock(clusterStatsResponse)
	require.NoError(t, es.gatherClusterStats("junk", &acc))

	tags := map[string]string{
		"cluster_name": "es-testcluster",
		"node_name":    "test.host.com",
		"status":       "red",
	}
	acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_nodes", clusterstatsNodesExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_indices", clusterstatsIndicesExpected, tags)
}
// TestGatherClusterStatsNonMaster walks through the individual calls one at a
// time for a node that is not the master: it fetches the master ID with
// getCatMaster, stores it in the server info, gathers node stats, and checks
// that the server is not flagged as master.
func TestGatherClusterStatsNonMaster(t *testing.T) {
	// This needs multiple steps to replicate the multiple calls internally.
	es := newElasticsearchWithClient()
	es.ClusterStats = true
	es.Servers = []string{"http://example.com:9200"}
	es.serverInfo = make(map[string]serverInfo)
	es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}

	// first get catMaster
	es.client.Transport = newTransportMock(IsNotMasterResult)
	masterID, err := es.getCatMaster("junk")
	require.NoError(t, err)

	// The expected master ID is the first space-separated token of the mocked
	// response. require.Equal takes (expected, actual), so the fixture value
	// goes first to keep failure output labelled correctly.
	isNotMasterResultTokens := strings.Split(IsNotMasterResult, " ")
	require.Equal(t, isNotMasterResultTokens[0], masterID, "catmaster is incorrect")

	// Store the reported master ID so that the isMaster assertion below is
	// evaluated against the mocked response instead of an empty master ID.
	// NOTE(review): assumes isMaster compares nodeID with masterID — confirm.
	info := es.serverInfo["http://example.com:9200"]
	info.masterID = masterID
	es.serverInfo["http://example.com:9200"] = info

	// now get node status, which determines whether we're master
	var acc testutil.Accumulator
	es.Local = true
	es.client.Transport = newTransportMock(nodeStatsResponse)
	require.NoError(t, es.gatherNodeStats("junk", &acc))

	// ensure flag is clear so Cluster Stats would not be done
	require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, &acc)
}
// TestGatherClusterIndicesStats gathers index stats with IndicesInclude set to
// "_all" and checks that the primaries measurement for the "twitter" index is
// reported.
func TestGatherClusterIndicesStats(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.IndicesInclude = []string{"_all"}
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(clusterIndicesResponse)

	acc := &testutil.Accumulator{}
	require.NoError(t, plugin.gatherIndicesStats("junk", acc))

	twitterTags := map[string]string{"index_name": "twitter"}
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries", clusterIndicesExpected, twitterTags)
}
// TestGatherDateStampedIndicesStats gathers index stats with
// NumMostRecentIndices set to 2 and the include patterns "twitter*", "influx*"
// and "penguins". It checks that the two most recent date-stamped indices are
// reported for each wildcard pattern, that the next older one is not, and that
// "penguins" is reported as well.
func TestGatherDateStampedIndicesStats(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.IndicesInclude = []string{"twitter*", "influx*", "penguins"}
	plugin.NumMostRecentIndices = 2
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(dateStampedIndicesResponse)
	require.NoError(t, plugin.Init())

	acc := &testutil.Accumulator{}
	statsURL := plugin.Servers[0] + "/" + strings.Join(plugin.IndicesInclude, ",") + "/_stats"
	require.NoError(t, plugin.gatherIndicesStats(statsURL, acc))

	// Checks run in slice order; present selects which assertion is applied.
	checks := []struct {
		index   string
		present bool
	}{
		// includes 2 most recent indices for "twitter", only expect the most recent two.
		{"twitter_2020_08_02", true},
		{"twitter_2020_08_01", true},
		{"twitter_2020_07_31", false},
		// includes 2 most recent indices for "influx", only expect the most recent two.
		{"influx2021.01.02", true},
		{"influx2021.01.01", true},
		{"influx2020.12.31", false},
		// not configured to sort the 'penguins' index, but ensure it is also included.
		{"penguins", true},
	}

	for _, c := range checks {
		tags := map[string]string{"index_name": c.index}
		if c.present {
			acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries", clusterIndicesExpected, tags)
			continue
		}
		acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries", clusterIndicesExpected, tags)
	}
}
// TestGatherClusterIndiceShardsStats gathers index stats with IndicesLevel set
// to "shards" and checks that the primaries measurement for "twitter" is
// reported together with one primary-shard and one replica-shard measurement.
func TestGatherClusterIndiceShardsStats(t *testing.T) {
	const server = "http://example.com:9200"

	plugin := newElasticsearchWithClient()
	plugin.IndicesLevel = "shards"
	plugin.Servers = []string{server}
	plugin.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	plugin.client.Transport = newTransportMock(clusterIndicesShardsResponse)

	acc := &testutil.Accumulator{}
	require.NoError(t, plugin.gatherIndicesStats("junk", acc))

	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected, map[string]string{"index_name": "twitter"})

	shardChecks := []struct {
		fields map[string]interface{}
		tags   map[string]string
	}{
		{
			fields: clusterIndicesPrimaryShardsExpected,
			tags: map[string]string{
				"index_name": "twitter",
				"node_id":    "oqvR8I1dTpONvwRM30etww",
				"shard_name": "0",
				"type":       "primary",
			},
		},
		{
			fields: clusterIndicesReplicaShardsExpected,
			tags: map[string]string{
				"index_name": "twitter",
				"node_id":    "oqvR8I1dTpONvwRM30etww",
				"shard_name": "1",
				"type":       "replica",
			},
		},
	}
	for _, c := range shardChecks {
		acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_shards", c.fields, c.tags)
	}
}
// newElasticsearchWithClient returns the plugin produced by newElasticsearch
// with its client field set to a zero-value http.Client, so that tests can
// replace the client's Transport with a mock.
func newElasticsearchWithClient() *Elasticsearch {
	plugin := newElasticsearch()
	plugin.client = new(http.Client)
	return plugin
}