1
0
Fork 0
telegraf/plugins/inputs/p4runtime/p4runtime_test.go

615 lines
15 KiB
Go
Raw Permalink Normal View History

package p4runtime
import (
"errors"
"fmt"
"net"
"testing"
"time"
p4_config "github.com/p4lang/p4runtime/go/p4/config/v1"
p4 "github.com/p4lang/p4runtime/go/p4/v1"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
// CounterSpec available here https://github.com/p4lang/p4runtime/blob/main/proto/p4/config/v1/p4info.proto#L289
func createCounter(
name string,
id uint32,
unit p4_config.CounterSpec_Unit,
) *p4_config.Counter {
return &p4_config.Counter{
Preamble: &p4_config.Preamble{Name: name, Id: id},
Spec: &p4_config.CounterSpec{Unit: unit},
}
}
func createEntityCounterEntry(
counterID uint32,
index int64,
data *p4.CounterData,
) *p4.Entity_CounterEntry {
return &p4.Entity_CounterEntry{
CounterEntry: &p4.CounterEntry{
CounterId: counterID,
Index: &p4.Index{Index: index},
Data: data,
},
}
}
func newTestP4RuntimeClient(
p4RuntimeClient *fakeP4RuntimeClient,
addr string,
t *testing.T,
) *P4runtime {
conn, err := grpc.NewClient(
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
return &P4runtime{
Endpoint: addr,
DeviceID: uint64(1),
Log: testutil.Logger{},
conn: conn,
client: p4RuntimeClient,
}
}
func TestInitDefault(t *testing.T) {
plugin := &P4runtime{Log: testutil.Logger{}}
require.NoError(t, plugin.Init())
require.Equal(t, "127.0.0.1:9559", plugin.Endpoint)
require.Equal(t, uint64(0), plugin.DeviceID)
require.Empty(t, plugin.CounterNamesInclude)
require.False(t, plugin.EnableTLS)
}
func TestErrorGetP4Info(t *testing.T) {
responses := []struct {
getForwardingPipelineConfigResponse *p4.GetForwardingPipelineConfigResponse
getForwardingPipelineConfigResponseError error
}{
{
getForwardingPipelineConfigResponse: nil,
getForwardingPipelineConfigResponseError: errors.New("error when retrieving forwarding pipeline config"),
}, {
getForwardingPipelineConfigResponse: &p4.GetForwardingPipelineConfigResponse{
Config: nil,
},
getForwardingPipelineConfigResponseError: nil,
}, {
getForwardingPipelineConfigResponse: &p4.GetForwardingPipelineConfigResponse{
Config: &p4.ForwardingPipelineConfig{P4Info: nil},
},
getForwardingPipelineConfigResponseError: nil,
},
}
for _, response := range responses {
p4RtClient := &fakeP4RuntimeClient{
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return response.getForwardingPipelineConfigResponse, response.getForwardingPipelineConfigResponseError
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.Error(t, plugin.Gather(&acc))
}
}
func TestOneCounterRead(t *testing.T) {
tests := []struct {
forwardingPipelineConfig *p4.ForwardingPipelineConfig
EntityCounterEntry *p4.Entity_CounterEntry
expected []telegraf.Metric
}{
{
forwardingPipelineConfig: &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: []*p4_config.Counter{
createCounter("foo", 1111, p4_config.CounterSpec_BOTH),
},
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
1111,
5,
&p4.CounterData{ByteCount: 5, PacketCount: 1},
),
expected: []telegraf.Metric{testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "BOTH",
},
map[string]interface{}{
"bytes": int64(5),
"packets": int64(1),
"counter_index": 5},
time.Unix(0, 0)),
},
}, {
forwardingPipelineConfig: &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: []*p4_config.Counter{
createCounter(
"foo",
2222,
p4_config.CounterSpec_BYTES,
),
},
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
2222,
5,
&p4.CounterData{ByteCount: 5},
),
expected: []telegraf.Metric{testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "BYTES",
},
map[string]interface{}{
"bytes": int64(5),
"packets": int64(0),
"counter_index": 5},
time.Unix(0, 0)),
},
}, {
forwardingPipelineConfig: &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: []*p4_config.Counter{
createCounter(
"foo",
3333,
p4_config.CounterSpec_PACKETS,
),
},
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
3333,
5,
&p4.CounterData{PacketCount: 1},
),
expected: []telegraf.Metric{testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "PACKETS",
},
map[string]interface{}{
"bytes": int64(0),
"packets": int64(1),
"counter_index": 5},
time.Unix(0, 0)),
},
}, {
forwardingPipelineConfig: &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: []*p4_config.Counter{
createCounter("foo", 4444, p4_config.CounterSpec_BOTH),
},
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
4444,
5,
&p4.CounterData{},
),
expected: nil,
},
}
for _, tt := range tests {
p4RtReadClient := &fakeP4RuntimeReadClient{
recvFn: func() (*p4.ReadResponse, error) {
return &p4.ReadResponse{
Entities: []*p4.Entity{{Entity: tt.EntityCounterEntry}},
}, nil
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(*p4.ReadRequest) (p4.P4Runtime_ReadClient, error) {
return p4RtReadClient, nil
},
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return &p4.GetForwardingPipelineConfigResponse{
Config: tt.forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
testutil.RequireMetricsEqual(
t,
tt.expected,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
}
func TestMultipleEntitiesSingleCounterRead(t *testing.T) {
totalNumOfEntriesArr := [3]int{2, 10, 100}
for _, totalNumOfEntries := range totalNumOfEntriesArr {
var expected []telegraf.Metric
fmt.Println(
"Running TestMultipleEntitiesSingleCounterRead with ",
totalNumOfEntries,
"totalNumOfCounters",
)
entities := make([]*p4.Entity, 0, totalNumOfEntries)
p4InfoCounters := make([]*p4_config.Counter, 0, totalNumOfEntries)
p4InfoCounters = append(
p4InfoCounters,
createCounter("foo", 0, p4_config.CounterSpec_BOTH),
)
for i := 0; i < totalNumOfEntries; i++ {
counterEntry := &p4.Entity{
Entity: createEntityCounterEntry(
0,
int64(i),
&p4.CounterData{
ByteCount: int64(10),
PacketCount: int64(10),
},
),
}
entities = append(entities, counterEntry)
expected = append(expected, testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "BOTH",
},
map[string]interface{}{
"bytes": int64(10),
"packets": int64(10),
"counter_index": i,
},
time.Unix(0, 0),
))
}
forwardingPipelineConfig := &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: p4InfoCounters,
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
}
p4RtReadClient := &fakeP4RuntimeReadClient{
recvFn: func() (*p4.ReadResponse, error) {
return &p4.ReadResponse{Entities: entities}, nil
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(*p4.ReadRequest) (p4.P4Runtime_ReadClient, error) {
return p4RtReadClient, nil
},
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return &p4.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
acc.Wait(totalNumOfEntries)
testutil.RequireMetricsEqual(
t,
expected,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
}
func TestSingleEntitiesMultipleCounterRead(t *testing.T) {
totalNumOfCountersArr := [3]int{2, 10, 100}
for _, totalNumOfCounters := range totalNumOfCountersArr {
var expected []telegraf.Metric
fmt.Println(
"Running TestSingleEntitiesMultipleCounterRead with ",
totalNumOfCounters,
"totalNumOfCounters",
)
p4InfoCounters := make([]*p4_config.Counter, 0, totalNumOfCounters)
for i := 1; i <= totalNumOfCounters; i++ {
counterName := fmt.Sprintf("foo%v", i)
p4InfoCounters = append(
p4InfoCounters,
createCounter(
counterName,
uint32(i),
p4_config.CounterSpec_BOTH,
),
)
expected = append(expected, testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": counterName,
"counter_type": "BOTH",
},
map[string]interface{}{
"bytes": int64(10),
"packets": int64(10),
"counter_index": 1,
},
time.Unix(0, 0),
))
}
forwardingPipelineConfig := &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: p4InfoCounters,
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(in *p4.ReadRequest) (p4.P4Runtime_ReadClient, error) {
counterID := in.Entities[0].GetCounterEntry().CounterId
return &fakeP4RuntimeReadClient{
recvFn: func() (*p4.ReadResponse, error) {
return &p4.ReadResponse{
Entities: []*p4.Entity{{
Entity: createEntityCounterEntry(
counterID,
1,
&p4.CounterData{
ByteCount: 10,
PacketCount: 10,
},
),
}},
}, nil
},
}, nil
},
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return &p4.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
acc.Wait(totalNumOfCounters)
testutil.RequireMetricsEqual(
t,
expected,
acc.GetTelegrafMetrics(),
testutil.SortMetrics(),
testutil.IgnoreTime(),
)
}
}
func TestNoCountersAvailable(t *testing.T) {
forwardingPipelineConfig := &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{Counters: make([]*p4_config.Counter, 0)},
}
p4RtClient := &fakeP4RuntimeClient{
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return &p4.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
}
func TestFilterCounters(t *testing.T) {
forwardingPipelineConfig := &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: []*p4_config.Counter{
createCounter("foo", 1, p4_config.CounterSpec_BOTH),
},
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
}
p4RtClient := &fakeP4RuntimeClient{
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return &p4.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin.CounterNamesInclude = []string{"oof"}
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
testutil.RequireMetricsEqual(
t,
nil,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
func TestFailReadCounterEntryFromEntry(t *testing.T) {
p4RtReadClient := &fakeP4RuntimeReadClient{
recvFn: func() (*p4.ReadResponse, error) {
return &p4.ReadResponse{
Entities: []*p4.Entity{{
Entity: &p4.Entity_TableEntry{
TableEntry: &p4.TableEntry{},
}}}}, nil
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(*p4.ReadRequest) (p4.P4Runtime_ReadClient, error) {
return p4RtReadClient, nil
},
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return &p4.GetForwardingPipelineConfigResponse{
Config: &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: []*p4_config.Counter{
createCounter(
"foo",
1111,
p4_config.CounterSpec_BOTH,
),
},
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
},
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
require.Equal(
t,
errors.New("reading counter entry from entry table_entry:{} failed"),
acc.Errors[0],
)
testutil.RequireMetricsEqual(
t,
nil,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
func TestFailReadAllEntries(t *testing.T) {
p4RtClient := &fakeP4RuntimeClient{
readFn: func(*p4.ReadRequest) (p4.P4Runtime_ReadClient, error) {
return nil, errors.New("connection error")
},
getForwardingPipelineConfigFn: func() (*p4.GetForwardingPipelineConfigResponse, error) {
return &p4.GetForwardingPipelineConfigResponse{
Config: &p4.ForwardingPipelineConfig{
P4Info: &p4_config.P4Info{
Counters: []*p4_config.Counter{
createCounter(
"foo",
1111,
p4_config.CounterSpec_BOTH,
),
},
PkgInfo: &p4_config.PkgInfo{Name: "P4Program"},
},
},
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
require.Equal(
t,
acc.Errors[0],
fmt.Errorf("reading counter entries with ID=1111 failed with error: %w", errors.New("connection error")),
)
testutil.RequireMetricsEqual(
t,
nil,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
func TestFilterCounterNamesInclude(t *testing.T) {
counters := []*p4_config.Counter{
createCounter("foo", 1, p4_config.CounterSpec_BOTH),
createCounter("bar", 2, p4_config.CounterSpec_BOTH),
nil,
createCounter("", 3, p4_config.CounterSpec_BOTH),
}
counterNamesInclude := []string{"bar"}
filteredCounters := filterCounters(counters, counterNamesInclude)
require.Equal(
t,
[]*p4_config.Counter{
createCounter("bar", 2, p4_config.CounterSpec_BOTH),
}, filteredCounters,
)
}