1
0
Fork 0
telegraf/plugins/inputs/kapacitor/kapacitor_test.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

143 lines
3.9 KiB
Go

package kapacitor_test
import (
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/kapacitor"
"github.com/influxdata/telegraf/testutil"
)
func TestKapacitor(t *testing.T) {
kapacitorReturn, err := os.ReadFile("./testdata/kapacitor_return.json")
require.NoError(t, err)
fakeInfluxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
if _, err := w.Write(kapacitorReturn); err != nil {
w.WriteHeader(http.StatusInternalServerError)
t.Error(err)
return
}
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeInfluxServer.Close()
plugin := &kapacitor.Kapacitor{
URLs: []string{fakeInfluxServer.URL + "/endpoint"},
}
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
require.Len(t, acc.Metrics, 63)
fields := map[string]interface{}{
"alloc_bytes": int64(6950624),
"buck_hash_sys_bytes": int64(1446737),
"frees": int64(129656),
"gc_cpu_fraction": float64(0.006757149597237818),
"gc_sys_bytes": int64(575488),
"heap_alloc_bytes": int64(6950624),
"heap_idle_bytes": int64(499712),
"heap_in_use_bytes": int64(9166848),
"heap_objects": int64(28070),
"heap_released_bytes": int64(0),
"heap_sys_bytes": int64(9666560),
"last_gc_ns": int64(1478813691405406556),
"lookups": int64(40),
"mallocs": int64(157726),
"mcache_in_use_bytes": int64(9600),
"mcache_sys_bytes": int64(16384),
"mspan_in_use_bytes": int64(105600),
"mspan_sys_bytes": int64(114688),
"next_gc_ns": int64(10996691),
"num_gc": int64(4),
"other_sys_bytes": int64(1985959),
"pause_total_ns": int64(767327),
"stack_in_use_bytes": int64(819200),
"stack_sys_bytes": int64(819200),
"sys_bytes": int64(14625016),
"total_alloc_bytes": int64(13475176),
}
tags := map[string]string{
"kap_version": "1.1.0~rc2",
"url": fakeInfluxServer.URL + "/endpoint",
}
acc.AssertContainsTaggedFields(t, "kapacitor_memstats", fields, tags)
acc.AssertContainsTaggedFields(t, "kapacitor",
map[string]interface{}{
"num_enabled_tasks": 5,
"num_subscriptions": 6,
"num_tasks": 5,
}, tags)
}
func TestMissingStats(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
if _, err := w.Write([]byte(`{}`)); err != nil {
w.WriteHeader(http.StatusInternalServerError)
t.Error(err)
return
}
}))
defer server.Close()
plugin := &kapacitor.Kapacitor{
URLs: []string{server.URL},
}
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
require.False(t, acc.HasField("kapacitor_memstats", "alloc_bytes"))
require.True(t, acc.HasField("kapacitor", "num_tasks"))
}
func TestErrorHandling(t *testing.T) {
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
if _, err := w.Write([]byte("not json")); err != nil {
w.WriteHeader(http.StatusInternalServerError)
t.Error(err)
return
}
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer badServer.Close()
plugin := &kapacitor.Kapacitor{
URLs: []string{badServer.URL + "/endpoint"},
}
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
acc.WaitError(1)
require.Equal(t, uint64(0), acc.NMetrics())
}
func TestErrorHandling404(t *testing.T) {
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer badServer.Close()
plugin := &kapacitor.Kapacitor{
URLs: []string{badServer.URL},
}
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
acc.WaitError(1)
require.Equal(t, uint64(0), acc.NMetrics())
}