178 lines
4.7 KiB
Go
178 lines
4.7 KiB
Go
package quix
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/golang-jwt/jwt/v5"
|
|
"github.com/stretchr/testify/require"
|
|
kafkacontainer "github.com/testcontainers/testcontainers-go/modules/kafka"
|
|
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/testutil"
|
|
)
|
|
|
|
func TestMissingTopic(t *testing.T) {
|
|
plugin := &Quix{}
|
|
require.ErrorContains(t, plugin.Init(), "option 'topic' must be set")
|
|
}
|
|
|
|
func TestMissingWorkspace(t *testing.T) {
|
|
plugin := &Quix{Topic: "foo"}
|
|
require.ErrorContains(t, plugin.Init(), "option 'workspace' must be set")
|
|
}
|
|
|
|
func TestMissingToken(t *testing.T) {
|
|
plugin := &Quix{Topic: "foo", Workspace: "bar"}
|
|
require.ErrorContains(t, plugin.Init(), "option 'token' must be set")
|
|
}
|
|
|
|
func TestDefaultURL(t *testing.T) {
|
|
plugin := &Quix{
|
|
Topic: "foo",
|
|
Workspace: "bar",
|
|
Token: config.NewSecret([]byte("secret")),
|
|
}
|
|
require.NoError(t, plugin.Init())
|
|
require.Equal(t, "https://portal-api.platform.quix.io", plugin.APIURL)
|
|
}
|
|
|
|
func TestFetchingConfig(t *testing.T) {
|
|
// Setup HTTP test-server for providing the broker config
|
|
brokerCfg := []byte(`
|
|
{
|
|
"bootstrap.servers":"servers",
|
|
"sasl.mechanism":"mechanism",
|
|
"sasl.username":"user",
|
|
"sasl.password":"password",
|
|
"security.protocol":"protocol",
|
|
"ssl.ca.cert":"Y2VydA=="
|
|
}
|
|
`)
|
|
server := httptest.NewServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path != "/workspaces/bar/broker/librdkafka" {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
if r.Header.Get("Authorization") != "Bearer bXkgc2VjcmV0" {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
return
|
|
}
|
|
if r.Header.Get("Accept") != "application/json" {
|
|
w.WriteHeader(http.StatusUnsupportedMediaType)
|
|
return
|
|
}
|
|
if _, err := w.Write(brokerCfg); err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
t.Error(err)
|
|
}
|
|
}),
|
|
)
|
|
defer server.Close()
|
|
|
|
// Setup the plugin and fetch the config
|
|
plugin := &Quix{
|
|
APIURL: server.URL,
|
|
Topic: "foo",
|
|
Workspace: "bar",
|
|
Token: config.NewSecret([]byte("bXkgc2VjcmV0")),
|
|
}
|
|
require.NoError(t, plugin.Init())
|
|
|
|
// Check the config
|
|
expected := &brokerConfig{
|
|
BootstrapServers: "servers",
|
|
SaslMechanism: "mechanism",
|
|
SaslUsername: "user",
|
|
SaslPassword: "password",
|
|
SecurityProtocol: "protocol",
|
|
SSLCertBase64: "Y2VydA==",
|
|
cert: []byte("cert"),
|
|
}
|
|
cfg, err := plugin.fetchBrokerConfig()
|
|
require.NoError(t, err)
|
|
require.Equal(t, expected, cfg)
|
|
}
|
|
|
|
func TestConnectAndWriteIntegration(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
// Setup common config params
|
|
workspace := "test"
|
|
topic := "telegraf"
|
|
|
|
// Setup a kafka container
|
|
kafkaContainer, err := kafkacontainer.Run(t.Context(), "confluentinc/confluent-local:7.5.0")
|
|
require.NoError(t, err)
|
|
defer kafkaContainer.Terminate(t.Context()) //nolint:errcheck // ignored
|
|
|
|
brokers, err := kafkaContainer.Brokers(t.Context())
|
|
require.NoError(t, err)
|
|
|
|
// Setup broker config distributed via HTTP
|
|
brokerCfg := &brokerConfig{
|
|
BootstrapServers: strings.Join(brokers, ","),
|
|
SecurityProtocol: "PLAINTEXT",
|
|
}
|
|
response, err := json.Marshal(brokerCfg)
|
|
require.NoError(t, err)
|
|
|
|
// Setup authentication
|
|
signingKey := make([]byte, 64)
|
|
_, err = rand.Read(signingKey)
|
|
require.NoError(t, err)
|
|
|
|
tokenRaw := jwt.NewWithClaims(jwt.SigningMethodHS256, &jwt.RegisteredClaims{
|
|
ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Minute)),
|
|
Issuer: "quix test",
|
|
})
|
|
token, err := tokenRaw.SignedString(signingKey)
|
|
require.NoError(t, err)
|
|
|
|
// Setup HTTP test-server for providing the broker config
|
|
path := "/workspaces/" + workspace + "/broker/librdkafka"
|
|
server := httptest.NewServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path != path {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
t.Logf("invalid path %q", r.URL.Path)
|
|
return
|
|
}
|
|
if r.Header.Get("Authorization") != "Bearer "+token {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
return
|
|
}
|
|
if r.Header.Get("Accept") != "application/json" {
|
|
w.WriteHeader(http.StatusUnsupportedMediaType)
|
|
return
|
|
}
|
|
if _, err := w.Write(response); err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
t.Error(err)
|
|
}
|
|
}),
|
|
)
|
|
defer server.Close()
|
|
|
|
// Setup the plugin and establish connection
|
|
plugin := &Quix{
|
|
APIURL: server.URL,
|
|
Workspace: workspace,
|
|
Topic: topic,
|
|
Token: config.NewSecret([]byte(token)),
|
|
}
|
|
require.NoError(t, plugin.Init())
|
|
require.NoError(t, plugin.Connect())
|
|
defer plugin.Close()
|
|
|
|
// Verify that we can successfully write data to the kafka broker
|
|
require.NoError(t, plugin.Write(testutil.MockMetrics()))
|
|
}
|