package kapacitor_test import ( "net/http" "net/http/httptest" "os" "testing" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/inputs/kapacitor" "github.com/influxdata/telegraf/testutil" ) func TestKapacitor(t *testing.T) { kapacitorReturn, err := os.ReadFile("./testdata/kapacitor_return.json") require.NoError(t, err) fakeInfluxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/endpoint" { if _, err := w.Write(kapacitorReturn); err != nil { w.WriteHeader(http.StatusInternalServerError) t.Error(err) return } } else { w.WriteHeader(http.StatusNotFound) } })) defer fakeInfluxServer.Close() plugin := &kapacitor.Kapacitor{ URLs: []string{fakeInfluxServer.URL + "/endpoint"}, } var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) require.Len(t, acc.Metrics, 63) fields := map[string]interface{}{ "alloc_bytes": int64(6950624), "buck_hash_sys_bytes": int64(1446737), "frees": int64(129656), "gc_cpu_fraction": float64(0.006757149597237818), "gc_sys_bytes": int64(575488), "heap_alloc_bytes": int64(6950624), "heap_idle_bytes": int64(499712), "heap_in_use_bytes": int64(9166848), "heap_objects": int64(28070), "heap_released_bytes": int64(0), "heap_sys_bytes": int64(9666560), "last_gc_ns": int64(1478813691405406556), "lookups": int64(40), "mallocs": int64(157726), "mcache_in_use_bytes": int64(9600), "mcache_sys_bytes": int64(16384), "mspan_in_use_bytes": int64(105600), "mspan_sys_bytes": int64(114688), "next_gc_ns": int64(10996691), "num_gc": int64(4), "other_sys_bytes": int64(1985959), "pause_total_ns": int64(767327), "stack_in_use_bytes": int64(819200), "stack_sys_bytes": int64(819200), "sys_bytes": int64(14625016), "total_alloc_bytes": int64(13475176), } tags := map[string]string{ "kap_version": "1.1.0~rc2", "url": fakeInfluxServer.URL + "/endpoint", } acc.AssertContainsTaggedFields(t, "kapacitor_memstats", fields, tags) acc.AssertContainsTaggedFields(t, "kapacitor", map[string]interface{}{ "num_enabled_tasks": 5, "num_subscriptions": 6, "num_tasks": 5, }, tags) } func TestMissingStats(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { if _, err := w.Write([]byte(`{}`)); err != nil { w.WriteHeader(http.StatusInternalServerError) t.Error(err) return } })) defer server.Close() plugin := &kapacitor.Kapacitor{ URLs: []string{server.URL}, } var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) require.False(t, acc.HasField("kapacitor_memstats", "alloc_bytes")) require.True(t, acc.HasField("kapacitor", "num_tasks")) } func TestErrorHandling(t *testing.T) { badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/endpoint" { if _, err := w.Write([]byte("not json")); err != nil { w.WriteHeader(http.StatusInternalServerError) t.Error(err) return } } else { w.WriteHeader(http.StatusNotFound) } })) defer badServer.Close() plugin := &kapacitor.Kapacitor{ URLs: []string{badServer.URL + "/endpoint"}, } var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) acc.WaitError(1) require.Equal(t, uint64(0), acc.NMetrics()) } func TestErrorHandling404(t *testing.T) { badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotFound) })) defer badServer.Close() plugin := &kapacitor.Kapacitor{ URLs: []string{badServer.URL}, } var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) acc.WaitError(1) require.Equal(t, uint64(0), acc.NMetrics()) }