1
0
Fork 0
telegraf/plugins/inputs/rabbitmq/rabbitmq_test.go

689 lines
23 KiB
Go
Raw Permalink Normal View History

package rabbitmq
import (
"errors"
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
func TestRabbitMQGeneratesMetricsSet1(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var jsonFilePath string
switch r.URL.Path {
case "/api/overview":
jsonFilePath = "testdata/set1/overview.json"
case "/api/nodes":
jsonFilePath = "testdata/set1/nodes.json"
case "/api/queues":
jsonFilePath = "testdata/set1/queues.json"
case "/api/exchanges":
jsonFilePath = "testdata/set1/exchanges.json"
case "/api/federation-links":
jsonFilePath = "testdata/set1/federation-links.json"
case "/api/nodes/rabbit@vagrant-ubuntu-trusty-64/memory":
jsonFilePath = "testdata/set1/memory.json"
default:
w.WriteHeader(http.StatusInternalServerError)
t.Errorf("unknown path %q", r.URL.Path)
return
}
data, err := os.ReadFile(jsonFilePath)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
t.Errorf("Could not read from data file %q: %v", jsonFilePath, err)
return
}
if _, err = w.Write(data); err != nil {
w.WriteHeader(http.StatusInternalServerError)
t.Error(err)
return
}
}))
defer ts.Close()
// Define test cases
expected := []telegraf.Metric{
testutil.MustMetric("rabbitmq_overview",
map[string]string{
"url": ts.URL,
},
map[string]interface{}{
"messages": int64(5),
"messages_ready": int64(32),
"messages_unacked": int64(27),
"messages_acked": int64(5246),
"messages_delivered": int64(5234),
"messages_delivered_get": int64(3333),
"messages_published": int64(5258),
"channels": int64(44),
"connections": int64(44),
"consumers": int64(65),
"exchanges": int64(43),
"queues": int64(62),
"clustering_listeners": int64(2),
"amqp_listeners": int64(2),
"return_unroutable": int64(10),
"return_unroutable_rate": float64(3.3),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_queue",
map[string]string{
"auto_delete": "false",
"durable": "false",
"node": "rabbit@rmqlocal-0.rmqlocal.ankorabbitstatefulset3.svc.cluster.local",
"queue": "reply_a716f0523cd44941ad2ea6ce4a3869c3",
"url": ts.URL,
"vhost": "sorandomsorandom",
},
map[string]interface{}{
"consumers": int64(3),
"consumer_utilisation": float64(1.0),
"head_message_timestamp": int64(1446362534),
"memory": int64(143776),
"message_bytes": int64(3),
"message_bytes_ready": int64(4),
"message_bytes_unacked": int64(5),
"message_bytes_ram": int64(6),
"message_bytes_persist": int64(7),
"messages": int64(44),
"messages_ready": int64(32),
"messages_unack": int64(44),
"messages_ack": int64(3457),
"messages_ack_rate": float64(9.9),
"messages_deliver": int64(22222),
"messages_deliver_rate": float64(333.4),
"messages_deliver_get": int64(3457),
"messages_deliver_get_rate": float64(0.2),
"messages_publish": int64(3457),
"messages_publish_rate": float64(11.2),
"messages_redeliver": int64(33),
"messages_redeliver_rate": float64(2.5),
"idle_since": "2015-11-01 8:22:14",
"slave_nodes": int64(1),
"synchronised_slave_nodes": int64(1),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_node",
map[string]string{
"node": "rabbit@vagrant-ubuntu-trusty-64",
"url": ts.URL,
},
map[string]interface{}{
"disk_free": int64(3776),
"disk_free_limit": int64(50000000),
"disk_free_alarm": int64(0),
"fd_total": int64(1024),
"fd_used": int64(63),
"mem_limit": int64(2503),
"mem_used": int64(159707080),
"mem_alarm": int64(1),
"proc_total": int64(1048576),
"proc_used": int64(783),
"run_queue": int64(0),
"sockets_total": int64(829),
"sockets_used": int64(45),
"uptime": int64(7464827),
"running": int64(1),
"mnesia_disk_tx_count": int64(16),
"mnesia_ram_tx_count": int64(296),
"mnesia_disk_tx_count_rate": float64(1.1),
"mnesia_ram_tx_count_rate": float64(2.2),
"gc_num": int64(57280132),
"gc_bytes_reclaimed": int64(2533),
"gc_num_rate": float64(274.2),
"gc_bytes_reclaimed_rate": float64(16490856.3),
"io_read_avg_time": float64(983.0),
"io_read_avg_time_rate": float64(88.77),
"io_read_bytes": int64(1111),
"io_read_bytes_rate": float64(99.99),
"io_write_avg_time": float64(134.0),
"io_write_avg_time_rate": float64(4.32),
"io_write_bytes": int64(823),
"io_write_bytes_rate": float64(32.8),
"mem_connection_readers": int64(1234),
"mem_connection_writers": int64(5678),
"mem_connection_channels": int64(1133),
"mem_connection_other": int64(2840),
"mem_queue_procs": int64(2840),
"mem_queue_slave_procs": int64(0),
"mem_plugins": int64(1755976),
"mem_other_proc": int64(23056584),
"mem_metrics": int64(196536),
"mem_mgmt_db": int64(491272),
"mem_mnesia": int64(115600),
"mem_other_ets": int64(2121872),
"mem_binary": int64(418848),
"mem_msg_index": int64(42848),
"mem_code": int64(25179322),
"mem_atom": int64(1041593),
"mem_other_system": int64(14741981),
"mem_allocated_unused": int64(38208528),
"mem_reserved_unallocated": int64(0),
"mem_total": int64(83025920),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "true",
"durable": "false",
"exchange": "reply_a716f0523cd44941ad2ea6ce4a3869c3",
"internal": "false",
"type": "direct",
"url": ts.URL,
"vhost": "sorandomsorandom",
},
map[string]interface{}{
"messages_publish_in": int64(3678),
"messages_publish_in_rate": float64(3.2),
"messages_publish_out": int64(3677),
"messages_publish_out_rate": float64(5.1),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_federation",
map[string]string{
"queue": "exampleLocalQueue",
"type": "queue",
"upstream": "ExampleFederationUpstream",
"upstream_queue": "exampleUpstreamQueue",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"acks_uncommitted": int64(1),
"consumers": int64(2),
"messages_unacknowledged": int64(3),
"messages_uncommitted": int64(4),
"messages_unconfirmed": int64(5),
"messages_confirm": int64(67),
"messages_publish": int64(890),
"messages_return_unroutable": int64(1),
},
time.Unix(0, 0),
),
}
// Run the test
plugin := &RabbitMQ{
URL: ts.URL,
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())
acc := &testutil.Accumulator{}
require.NoError(t, plugin.Gather(acc))
acc.Wait(len(expected))
require.Empty(t, acc.Errors)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), testutil.SortMetrics())
}
func TestRabbitMQGeneratesMetricsSet2(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var jsonFilePath string
switch r.URL.Path {
case "/api/overview":
jsonFilePath = "testdata/set2/overview.json"
case "/api/nodes":
jsonFilePath = "testdata/set2/nodes.json"
case "/api/queues":
jsonFilePath = "testdata/set2/queues.json"
case "/api/exchanges":
jsonFilePath = "testdata/set2/exchanges.json"
case "/api/federation-links":
jsonFilePath = "testdata/set2/federation-links.json"
case "/api/nodes/rabbit@rmqserver/memory":
jsonFilePath = "testdata/set2/memory.json"
default:
w.WriteHeader(http.StatusInternalServerError)
t.Errorf("unknown path %q", r.URL.Path)
return
}
data, err := os.ReadFile(jsonFilePath)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
t.Errorf("Could not read from data file %q: %v", jsonFilePath, err)
return
}
if _, err = w.Write(data); err != nil {
w.WriteHeader(http.StatusInternalServerError)
t.Error(err)
return
}
}))
defer ts.Close()
// Define test cases
expected := []telegraf.Metric{
testutil.MustMetric("rabbitmq_overview",
map[string]string{
"url": ts.URL,
},
map[string]interface{}{
"messages": int64(30),
"messages_ready": int64(30),
"messages_unacked": int64(0),
"messages_acked": int64(3736443),
"messages_delivered": int64(3736446),
"messages_delivered_get": int64(3736446),
"messages_published": int64(770025),
"channels": int64(43),
"connections": int64(43),
"consumers": int64(37),
"exchanges": int64(8),
"queues": int64(34),
"clustering_listeners": int64(1),
"amqp_listeners": int64(2),
"return_unroutable": int64(0),
"return_unroutable_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_queue",
map[string]string{
"auto_delete": "false",
"durable": "false",
"node": "rabbit@rmqserver",
"queue": "39fd2caf-63e5-41e3-c15a-ba8fa11434b2",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"consumers": int64(1),
"consumer_utilisation": float64(1.0),
"memory": int64(15840),
"message_bytes": int64(0),
"message_bytes_ready": int64(0),
"message_bytes_unacked": int64(0),
"message_bytes_ram": int64(0),
"message_bytes_persist": int64(0),
"messages": int64(0),
"messages_ready": int64(0),
"messages_unack": int64(0),
"messages_ack": int64(180),
"messages_ack_rate": float64(0.0),
"messages_deliver": int64(180),
"messages_deliver_rate": float64(0.0),
"messages_deliver_get": int64(180),
"messages_deliver_get_rate": float64(0.0),
"messages_publish": int64(180),
"messages_publish_rate": float64(0.0),
"messages_redeliver": int64(0),
"messages_redeliver_rate": float64(0.0),
"idle_since": "2021-06-28 15:54:14",
"slave_nodes": int64(0),
"synchronised_slave_nodes": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_queue",
map[string]string{
"auto_delete": "false",
"durable": "false",
"node": "rabbit@rmqserver",
"queue": "39fd2cb4-aa2d-c08b-457a-62d0893523a1",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"consumers": int64(1),
"consumer_utilisation": float64(1.0),
"memory": int64(15600),
"message_bytes": int64(0),
"message_bytes_ready": int64(0),
"message_bytes_unacked": int64(0),
"message_bytes_ram": int64(0),
"message_bytes_persist": int64(0),
"messages": int64(0),
"messages_ready": int64(0),
"messages_unack": int64(0),
"messages_ack": int64(177),
"messages_ack_rate": float64(0.0),
"messages_deliver": int64(177),
"messages_deliver_rate": float64(0.0),
"messages_deliver_get": int64(177),
"messages_deliver_get_rate": float64(0.0),
"messages_publish": int64(177),
"messages_publish_rate": float64(0.0),
"messages_redeliver": int64(0),
"messages_redeliver_rate": float64(0.0),
"idle_since": "2021-06-28 15:54:14",
"slave_nodes": int64(0),
"synchronised_slave_nodes": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_queue",
map[string]string{
"auto_delete": "false",
"durable": "false",
"node": "rabbit@rmqserver",
"queue": "39fd2cb5-3820-e01b-6e20-ba29d5553fc3",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"consumers": int64(1),
"consumer_utilisation": float64(1.0),
"memory": int64(15584),
"message_bytes": int64(0),
"message_bytes_ready": int64(0),
"message_bytes_unacked": int64(0),
"message_bytes_ram": int64(0),
"message_bytes_persist": int64(0),
"messages": int64(0),
"messages_ready": int64(0),
"messages_unack": int64(0),
"messages_ack": int64(175),
"messages_ack_rate": float64(0.0),
"messages_deliver": int64(175),
"messages_deliver_rate": float64(0.0),
"messages_deliver_get": int64(175),
"messages_deliver_get_rate": float64(0.0),
"messages_publish": int64(175),
"messages_publish_rate": float64(0.0),
"messages_redeliver": int64(0),
"messages_redeliver_rate": float64(0.0),
"idle_since": "2021-06-28 15:54:15",
"slave_nodes": int64(0),
"synchronised_slave_nodes": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_node",
map[string]string{
"node": "rabbit@rmqserver",
"url": ts.URL,
},
map[string]interface{}{
"disk_free": int64(25086496768),
"disk_free_limit": int64(50000000),
"disk_free_alarm": int64(0),
"fd_total": int64(65536),
"fd_used": int64(78),
"mem_limit": int64(1717546188),
"mem_used": int64(387645440),
"mem_alarm": int64(0),
"proc_total": int64(1048576),
"proc_used": int64(1128),
"run_queue": int64(1),
"sockets_total": int64(58893),
"sockets_used": int64(43),
"uptime": int64(4150152129),
"running": int64(1),
"mnesia_disk_tx_count": int64(103),
"mnesia_ram_tx_count": int64(2257),
"mnesia_disk_tx_count_rate": float64(0.0),
"mnesia_ram_tx_count_rate": float64(0.0),
"gc_num": int64(329526389),
"gc_bytes_reclaimed": int64(13660012170840),
"gc_num_rate": float64(125.2),
"gc_bytes_reclaimed_rate": float64(6583379.2),
"io_read_avg_time": float64(0.0),
"io_read_avg_time_rate": float64(0.0),
"io_read_bytes": int64(1),
"io_read_bytes_rate": float64(0.0),
"io_write_avg_time": float64(0.0),
"io_write_avg_time_rate": float64(0.0),
"io_write_bytes": int64(193066),
"io_write_bytes_rate": float64(0.0),
"mem_connection_readers": int64(1246768),
"mem_connection_writers": int64(72108),
"mem_connection_channels": int64(308588),
"mem_connection_other": int64(4883596),
"mem_queue_procs": int64(780996),
"mem_queue_slave_procs": int64(0),
"mem_plugins": int64(11932828),
"mem_other_proc": int64(39203520),
"mem_metrics": int64(626932),
"mem_mgmt_db": int64(3341264),
"mem_mnesia": int64(396016),
"mem_other_ets": int64(3771384),
"mem_binary": int64(209324208),
"mem_msg_index": int64(32648),
"mem_code": int64(32810827),
"mem_atom": int64(1458513),
"mem_other_system": int64(14284124),
"mem_allocated_unused": int64(61026048),
"mem_reserved_unallocated": int64(0),
"mem_total": int64(385548288),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "false",
"durable": "true",
"exchange": "",
"internal": "false",
"type": "direct",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(284725),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(284572),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "false",
"durable": "true",
"exchange": "amq.direct",
"internal": "false",
"type": "direct",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(0),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(0),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "false",
"durable": "true",
"exchange": "amq.fanout",
"internal": "false",
"type": "fanout",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(0),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(0),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "false",
"durable": "true",
"exchange": "amq.headers",
"internal": "false",
"type": "headers",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(0),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(0),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "false",
"durable": "true",
"exchange": "amq.match",
"internal": "false",
"type": "headers",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(0),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(0),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "false",
"durable": "true",
"exchange": "amq.rabbitmq.trace",
"internal": "true",
"type": "topic",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(0),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(0),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "false",
"durable": "true",
"exchange": "amq.topic",
"internal": "false",
"type": "topic",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(0),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(0),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
testutil.MustMetric("rabbitmq_exchange",
map[string]string{
"auto_delete": "true",
"durable": "false",
"exchange": "Exchange",
"internal": "false",
"type": "topic",
"url": ts.URL,
"vhost": "/",
},
map[string]interface{}{
"messages_publish_in": int64(18006),
"messages_publish_in_rate": float64(0.0),
"messages_publish_out": int64(60798),
"messages_publish_out_rate": float64(0.0),
},
time.Unix(0, 0),
),
}
expectedErrors := []error{
errors.New("error response trying to get \"/api/federation-links\": \"Object Not Found\" (reason: \"Not Found\")"),
}
// Run the test
plugin := &RabbitMQ{
URL: ts.URL,
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())
acc := &testutil.Accumulator{}
require.NoError(t, plugin.Gather(acc))
acc.Wait(len(expected))
require.Len(t, acc.Errors, len(expectedErrors))
require.ElementsMatch(t, expectedErrors, acc.Errors)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), testutil.SortMetrics())
}
func TestRabbitMQMetricFilerts(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("unknown path %q", r.URL.Path), http.StatusNotFound)
}))
defer ts.Close()
metricErrors := map[string]error{
"exchange": errors.New("getting \"/api/exchanges\" failed: 404 Not Found"),
"federation": errors.New("getting \"/api/federation-links\" failed: 404 Not Found"),
"node": errors.New("getting \"/api/nodes\" failed: 404 Not Found"),
"overview": errors.New("getting \"/api/overview\" failed: 404 Not Found"),
"queue": errors.New("getting \"/api/queues\" failed: 404 Not Found"),
}
// Include test
for name, expected := range metricErrors {
plugin := &RabbitMQ{
URL: ts.URL,
Log: testutil.Logger{},
MetricInclude: []string{name},
}
require.NoError(t, plugin.Init())
acc := &testutil.Accumulator{}
require.NoError(t, plugin.Gather(acc))
require.Len(t, acc.Errors, 1)
require.ElementsMatch(t, []error{expected}, acc.Errors)
}
// Exclude test
for name := range metricErrors {
// Exclude the current metric error from the list of expected errors
var expected []error
for n, e := range metricErrors {
if n != name {
expected = append(expected, e)
}
}
plugin := &RabbitMQ{
URL: ts.URL,
Log: testutil.Logger{},
MetricExclude: []string{name},
}
require.NoError(t, plugin.Init())
acc := &testutil.Accumulator{}
require.NoError(t, plugin.Gather(acc))
require.Len(t, acc.Errors, len(expected))
require.ElementsMatch(t, expected, acc.Errors)
}
}