1
0
Fork 0
telegraf/models/buffer_suite_test.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

960 lines
35 KiB
Go

package models
import (
"os"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
// BufferSuiteTest is a testify suite whose tests run against the Buffer
// implementation selected by bufferType.
type BufferSuiteTest struct {
	suite.Suite

	// bufferType is handed to NewBuffer; SetupTest distinguishes "",
	// "memory" and "disk".
	bufferType string
	// bufferPath is the temporary directory SetupTest creates for the
	// "disk" buffer type and TearDownTest removes again.
	bufferPath string
	// hasMaxCapacity tells whether the buffer type being tested supports a
	// maximum metric capacity.
	hasMaxCapacity bool
}
// SetupTest prepares the suite state for the configured buffer type.
//
// For "" and "memory" it only flags the buffer as capacity-limited. For
// "disk" it creates a fresh temporary directory, stores it in bufferPath and
// flags the buffer as not capacity-limited. Any other type leaves the suite
// untouched.
func (s *BufferSuiteTest) SetupTest() {
	if s.bufferType == "" || s.bufferType == "memory" {
		s.hasMaxCapacity = true
		return
	}
	if s.bufferType != "disk" {
		return
	}

	dir, err := os.MkdirTemp("", "*-buffer-test")
	s.Require().NoError(err)
	s.bufferPath = dir
	s.hasMaxCapacity = false
}
// TearDownTest removes the directory recorded in bufferPath, if any, and
// clears the field so the next test starts without a stale path.
func (s *BufferSuiteTest) TearDownTest() {
	if s.bufferPath == "" {
		return
	}

	err := os.RemoveAll(s.bufferPath)
	s.NoError(err)
	s.bufferPath = ""
}
func TestMemoryBufferSuite(t *testing.T) {
suite.Run(t, &BufferSuiteTest{bufferType: "memory"})
}
func TestDiskBufferSuite(t *testing.T) {
suite.Run(t, &BufferSuiteTest{bufferType: "disk"})
}
// newTestBuffer builds a buffer of the suite's type with the given capacity
// and fails the test immediately if construction returns an error.
//
// The four metric counters are set to zero before the buffer is returned so
// each test can assert on absolute values.
func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer {
	s.T().Helper()

	created, err := NewBuffer("test", "123", "", capacity, s.bufferType, s.bufferPath)
	s.Require().NoError(err)

	stats := created.Stats()
	stats.MetricsAdded.Set(0)
	stats.MetricsWritten.Set(0)
	stats.MetricsRejected.Set(0)
	stats.MetricsDropped.Set(0)
	return created
}
// TestBufferLenEmpty expects a freshly created buffer to report length 0.
func (s *BufferSuiteTest) TestBufferLenEmpty() {
	emptyBuf := s.newTestBuffer(5)
	defer emptyBuf.Close()

	got := emptyBuf.Len()
	s.Equal(0, got)
}
// TestBufferLenOne expects length 1 after a single metric was added.
func (s *BufferSuiteTest) TestBufferLenOne() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	var (
		tags   = map[string]string{}
		fields = map[string]interface{}{"value": 42.0}
	)
	single := metric.New("cpu", tags, fields, time.Unix(0, 0))
	buf.Add(single)

	s.Equal(1, buf.Len())
}
// TestBufferLenFull adds exactly as many metrics as the requested capacity
// in one Add call and expects Len to equal that capacity.
func (s *BufferSuiteTest) TestBufferLenFull() {
	const capacity = 5

	buf := s.newTestBuffer(capacity)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	filler := make([]telegraf.Metric, 0, capacity)
	for i := 0; i < capacity; i++ {
		filler = append(filler, m)
	}
	buf.Add(filler...)

	s.Equal(capacity, buf.Len())
}
// TestBufferLenOverfill adds one metric more than the requested capacity in
// one Add call and expects Len to stay at the capacity. Skipped for buffer
// types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferLenOverfill() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	const capacity = 5

	buf := s.newTestBuffer(capacity)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	overfill := make([]telegraf.Metric, 0, capacity+1)
	for i := 0; i < capacity+1; i++ {
		overfill = append(overfill, m)
	}
	buf.Add(overfill...)

	s.Equal(capacity, buf.Len())
}
// TestBufferBatchLenZero expects an empty batch when a transaction of size
// zero is requested.
func (s *BufferSuiteTest) TestBufferBatchLenZero() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	batch := buf.BeginTransaction(0).Batch
	s.Empty(batch)
}
// TestBufferBatchLenBufferEmpty expects an empty batch when a transaction is
// started on a buffer that never received a metric.
func (s *BufferSuiteTest) TestBufferBatchLenBufferEmpty() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	batch := buf.BeginTransaction(2).Batch
	s.Empty(batch)
}
// TestBufferBatchLenUnderfill requests a batch of two from a buffer holding
// one metric and expects a batch of length one.
func (s *BufferSuiteTest) TestBufferBatchLenUnderfill() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	only := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add(only)

	batch := buf.BeginTransaction(2).Batch
	s.Len(batch, 1)
}
// TestBufferBatchLenFill requests a batch of two from a buffer holding three
// metrics and expects a batch of length two.
func (s *BufferSuiteTest) TestBufferBatchLenFill() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m, m}...)

	batch := buf.BeginTransaction(2).Batch
	s.Len(batch, 2)
}
// TestBufferBatchLenExact requests a batch of two from a buffer holding
// exactly two metrics and expects a batch of length two.
func (s *BufferSuiteTest) TestBufferBatchLenExact() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m}...)

	batch := buf.BeginTransaction(2).Batch
	s.Len(batch, 2)
}
// TestBufferBatchLenLargerThanBuffer requests a batch of six from a buffer
// holding five metrics and expects a batch of length five.
func (s *BufferSuiteTest) TestBufferBatchLenLargerThanBuffer() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m, m, m, m}...)

	batch := buf.BeginTransaction(6).Batch
	s.Len(batch, 5)
}
// TestBufferBatchWrap fills the buffer with five metrics, accepts a batch of
// two, adds two more metrics and then expects a batch of five to be
// available.
func (s *BufferSuiteTest) TestBufferBatchWrap() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	repeat := func(n int) []telegraf.Metric {
		out := make([]telegraf.Metric, n)
		for i := range out {
			out[i] = m
		}
		return out
	}

	// Fill the buffer and take two metrics out again.
	buf.Add(repeat(5)...)
	first := buf.BeginTransaction(2)
	first.AcceptAll()
	buf.EndTransaction(first)

	// Refill with two metrics and ask for a full batch.
	buf.Add(repeat(2)...)
	second := buf.BeginTransaction(5)
	s.Len(second.Batch, 5)
}
// TestBufferBatchLatest adds metrics stamped 1s..3s and expects a batch of
// two to contain the metrics stamped 1s and 2s, in that order.
func (s *BufferSuiteTest) TestBufferBatchLatest() {
	buf := s.newTestBuffer(4)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 3; sec++ {
		buf.Add(cpuAt(sec))
	}

	tx := buf.BeginTransaction(2)
	want := []telegraf.Metric{cpuAt(1), cpuAt(2)}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferBatchLatestWrap adds metrics stamped 1s..5s to a buffer created
// with capacity four and expects a batch of two to contain the metrics
// stamped 2s and 3s. Skipped for buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferBatchLatestWrap() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(4)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 5; sec++ {
		buf.Add(cpuAt(sec))
	}

	tx := buf.BeginTransaction(2)
	want := []telegraf.Metric{cpuAt(2), cpuAt(3)}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferMultipleBatch adds metrics stamped 1s..6s and drains them with
// two accepted transactions of size five: the first must yield 1s..5s, the
// second only 6s.
func (s *BufferSuiteTest) TestBufferMultipleBatch() {
	buf := s.newTestBuffer(10)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 6; sec++ {
		buf.Add(cpuAt(sec))
	}

	// First batch: the five oldest metrics.
	firstTx := buf.BeginTransaction(5)
	firstWant := make([]telegraf.Metric, 0, 5)
	for sec := int64(1); sec <= 5; sec++ {
		firstWant = append(firstWant, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), firstWant, firstTx.Batch)
	firstTx.AcceptAll()
	buf.EndTransaction(firstTx)

	// Second batch: the single remaining metric.
	secondTx := buf.BeginTransaction(5)
	secondWant := []telegraf.Metric{cpuAt(6)}
	testutil.RequireMetricsEqual(s.T(), secondWant, secondTx.Batch)
	secondTx.AcceptAll()
	buf.EndTransaction(secondTx)
}
// TestBufferRejectWithRoom adds 1s..3s, opens a transaction of two, adds
// 4s..5s while it is open and then keeps the whole batch. No metric may be
// counted as dropped and the next batch of five must be 1s..5s in order.
func (s *BufferSuiteTest) TestBufferRejectWithRoom() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	addRange := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			buf.Add(cpuAt(sec))
		}
	}

	addRange(1, 3)
	tx := buf.BeginTransaction(2)
	addRange(4, 5)
	tx.KeepAll()
	buf.EndTransaction(tx)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped)

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(1); sec <= 5; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectNothingNewFull fills the buffer with 1s..5s, keeps a batch
// of two without adding anything in between, and expects zero drops and an
// unchanged 1s..5s batch afterwards.
func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 5; sec++ {
		buf.Add(cpuAt(sec))
	}

	tx := buf.BeginTransaction(2)
	tx.KeepAll()
	buf.EndTransaction(tx)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped)

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(1); sec <= 5; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectNoRoom adds 1s..3s, opens a transaction of two, adds
// 4s..8s while it is open and keeps the batch. It expects three dropped
// metrics and a following batch of 4s..8s. Skipped for buffer types without
// a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectNoRoom() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	addRange := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			buf.Add(cpuAt(sec))
		}
	}

	addRange(1, 3)
	tx := buf.BeginTransaction(2)
	addRange(4, 8)
	tx.KeepAll()
	buf.EndTransaction(tx)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(3), dropped)

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(4); sec <= 8; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectRoomExact adds 1s..2s, opens a transaction of two, adds
// 3s..5s while it is open and keeps the batch. It expects zero drops and a
// following batch of 1s..5s.
func (s *BufferSuiteTest) TestBufferRejectRoomExact() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	addRange := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			buf.Add(cpuAt(sec))
		}
	}

	addRange(1, 2)
	tx := buf.BeginTransaction(2)
	addRange(3, 5)
	tx.KeepAll()
	buf.EndTransaction(tx)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped)

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(1); sec <= 5; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectRoomOverwriteOld adds 1s..3s, opens a transaction of one,
// adds 4s..6s while it is open and keeps the batch. It expects one dropped
// metric and a following batch of 2s..6s. Skipped for buffer types without
// a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	addRange := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			buf.Add(cpuAt(sec))
		}
	}

	addRange(1, 3)
	tx := buf.BeginTransaction(1)
	addRange(4, 6)
	tx.KeepAll()
	buf.EndTransaction(tx)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(1), dropped)

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(2); sec <= 6; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectPartialRoom adds 1s..3s, opens a transaction of two, adds
// 4s..7s while it is open and keeps the batch. It expects two dropped
// metrics and a following batch of 3s..7s. Skipped for buffer types without
// a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectPartialRoom() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	addRange := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			buf.Add(cpuAt(sec))
		}
	}

	addRange(1, 3)
	tx := buf.BeginTransaction(2)
	addRange(4, 7)
	tx.KeepAll()
	buf.EndTransaction(tx)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(2), dropped)

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(3); sec <= 7; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectNewMetricsWrapped keeps a transaction of two open while
// many more metrics than the capacity are added, checking the dropped
// counter after each burst, then keeps the batch and checks the counter and
// the remaining content. Skipped for buffer types without a maximum
// capacity.
func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 3; sec++ {
		buf.Add(cpuAt(sec))
	}
	tx := buf.BeginTransaction(2)

	// Each burst is added one metric at a time while the transaction is
	// open; afterwards the dropped counter must match wantDropped.
	bursts := []struct {
		first, last int64
		wantDropped int64
	}{
		{first: 4, last: 5, wantDropped: 0},   // buffer reaches its capacity of five
		{first: 6, last: 10, wantDropped: 3},  // five more metrics
		{first: 11, last: 15, wantDropped: 8}, // and another five
	}
	for _, burst := range bursts {
		for sec := burst.first; sec <= burst.last; sec++ {
			buf.Add(cpuAt(sec))
		}
		s.Equal(burst.wantDropped, buf.Stats().MetricsDropped.Get())
	}

	// Keeping the two batch metrics raises the dropped counter to ten.
	tx.KeepAll()
	buf.EndTransaction(tx)
	s.Equal(int64(10), buf.Stats().MetricsDropped.Get())

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(11); sec <= 15; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectWrapped adds 1s..8s to a buffer created with capacity
// five, opens a transaction of three, adds 9s..12s while it is open and
// keeps the batch. The following batch of five must be 8s..12s. Skipped for
// buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectWrapped() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	addRange := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			buf.Add(cpuAt(sec))
		}
	}

	addRange(1, 8)
	tx := buf.BeginTransaction(3)
	addRange(9, 12)
	tx.KeepAll()
	buf.EndTransaction(tx)

	tx = buf.BeginTransaction(5)
	want := make([]telegraf.Metric, 0, 5)
	for sec := int64(8); sec <= 12; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferRejectAdjustFirst runs three rounds of "add three metrics, open
// a transaction of three, add three more, keep the batch" with consecutive
// timestamps 1s..18s, adds 19s, and expects the final batch of ten to be
// 10s..19s. Skipped for buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(10)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	addRange := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			buf.Add(cpuAt(sec))
		}
	}

	// Rounds start at 1s, 7s and 13s; each consumes six timestamps.
	next := int64(1)
	for round := 0; round < 3; round++ {
		addRange(next, next+2)
		kept := buf.BeginTransaction(3)
		addRange(next+3, next+5)
		kept.KeepAll()
		buf.EndTransaction(kept)
		next += 6
	}

	// next is 19 here.
	buf.Add(cpuAt(next))

	tx := buf.BeginTransaction(10)
	want := make([]telegraf.Metric, 0, 10)
	for sec := int64(10); sec <= 19; sec++ {
		want = append(want, cpuAt(sec))
	}
	testutil.RequireMetricsEqual(s.T(), want, tx.Batch)
}
// TestBufferAddDropsOverwrittenMetrics adds five metrics twice to a buffer
// created with capacity five and expects five dropped and zero written
// metrics. Skipped for buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddDropsOverwrittenMetrics() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	for round := 0; round < 2; round++ {
		buf.Add(m, m, m, m, m)
	}

	stats := buf.Stats()
	s.Equal(int64(5), stats.MetricsDropped.Get())
	s.Equal(int64(0), stats.MetricsWritten.Get())
}
// TestBufferAcceptRemovesBatch adds three metrics, accepts a batch of two
// and expects one metric to remain.
func (s *BufferSuiteTest) TestBufferAcceptRemovesBatch() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m, m}...)

	accepted := buf.BeginTransaction(2)
	accepted.AcceptAll()
	buf.EndTransaction(accepted)

	remaining := buf.Len()
	s.Equal(1, remaining)
}
// TestBufferRejectLeavesBatch adds three metrics, keeps a batch of two and
// expects all three metrics to remain.
func (s *BufferSuiteTest) TestBufferRejectLeavesBatch() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m, m}...)

	kept := buf.BeginTransaction(2)
	kept.KeepAll()
	buf.EndTransaction(kept)

	remaining := buf.Len()
	s.Equal(3, remaining)
}
// TestBufferAcceptWritesOverwrittenBatch takes a batch of all five metrics,
// adds five more while the transaction is open and then accepts the batch.
// It expects zero dropped and five written metrics.
func (s *BufferSuiteTest) TestBufferAcceptWritesOverwrittenBatch() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	five := func() []telegraf.Metric {
		return []telegraf.Metric{m, m, m, m, m}
	}

	buf.Add(five()...)
	tx := buf.BeginTransaction(5)
	buf.Add(five()...)
	tx.AcceptAll()
	buf.EndTransaction(tx)

	stats := buf.Stats()
	s.Equal(int64(0), stats.MetricsDropped.Get())
	s.Equal(int64(5), stats.MetricsWritten.Get())
}
// TestBufferBatchRejectDropsOverwrittenBatch takes a batch of all five
// metrics, adds five more while the transaction is open and then keeps the
// batch. It expects five dropped and zero written metrics. Skipped for
// buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferBatchRejectDropsOverwrittenBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	five := func() []telegraf.Metric {
		return []telegraf.Metric{m, m, m, m, m}
	}

	buf.Add(five()...)
	tx := buf.BeginTransaction(5)
	buf.Add(five()...)
	tx.KeepAll()
	buf.EndTransaction(tx)

	stats := buf.Stats()
	s.Equal(int64(5), stats.MetricsDropped.Get())
	s.Equal(int64(0), stats.MetricsWritten.Get())
}
// TestBufferMetricsOverwriteBatchAccept adds five metrics, opens a
// transaction of three, adds three more and accepts the batch. It expects
// zero dropped and three written metrics.
func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchAccept() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))

	buf.Add([]telegraf.Metric{m, m, m, m, m}...)
	tx := buf.BeginTransaction(3)
	buf.Add([]telegraf.Metric{m, m, m}...)
	tx.AcceptAll()
	buf.EndTransaction(tx)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped, "dropped")
	written := buf.Stats().MetricsWritten.Get()
	s.Equal(int64(3), written, "written")
}
// TestBufferMetricsOverwriteBatchReject adds five metrics, opens a
// transaction of three, adds three more and keeps the batch. It expects
// three dropped and zero written metrics. Skipped for buffer types without
// a maximum capacity.
func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchReject() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))

	buf.Add([]telegraf.Metric{m, m, m, m, m}...)
	tx := buf.BeginTransaction(3)
	buf.Add([]telegraf.Metric{m, m, m}...)
	tx.KeepAll()
	buf.EndTransaction(tx)

	stats := buf.Stats()
	s.Equal(int64(3), stats.MetricsDropped.Get())
	s.Equal(int64(0), stats.MetricsWritten.Get())
}
// TestBufferMetricsBatchAcceptRemoved adds five metrics, opens a
// transaction of three, adds five more and accepts the batch. It expects
// two dropped and three written metrics. Skipped for buffer types without a
// maximum capacity.
func (s *BufferSuiteTest) TestBufferMetricsBatchAcceptRemoved() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	five := func() []telegraf.Metric {
		return []telegraf.Metric{m, m, m, m, m}
	}

	buf.Add(five()...)
	tx := buf.BeginTransaction(3)
	buf.Add(five()...)
	tx.AcceptAll()
	buf.EndTransaction(tx)

	stats := buf.Stats()
	s.Equal(int64(2), stats.MetricsDropped.Get())
	s.Equal(int64(3), stats.MetricsWritten.Get())
}
// TestBufferWrapWithBatch adds three metrics, opens a transaction of three
// and adds six more while it is open; the dropped counter must then be one.
// The transaction is ended without marking any metric. Skipped for buffer
// types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferWrapWithBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))

	buf.Add([]telegraf.Metric{m, m, m}...)
	open := buf.BeginTransaction(3)
	buf.Add([]telegraf.Metric{m, m, m, m, m, m}...)

	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(1), dropped)
	buf.EndTransaction(open)
}
// TestBufferBatchNotRemoved expects Len to still report five while a
// transaction of two is open on a buffer holding five metrics.
func (s *BufferSuiteTest) TestBufferBatchNotRemoved() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m, m, m, m}...)

	open := buf.BeginTransaction(2)
	lenDuringTx := buf.Len()
	s.Equal(5, lenDuringTx)
	buf.EndTransaction(open)
}
// TestBufferBatchRejectAcceptNoop keeps a batch of two, ends the
// transaction, then marks the same transaction as accepted and ends it a
// second time. The buffer must still hold all five metrics.
func (s *BufferSuiteTest) TestBufferBatchRejectAcceptNoop() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m, m, m, m}...)

	tx := buf.BeginTransaction(2)

	// First completion: keep everything.
	tx.KeepAll()
	buf.EndTransaction(tx)

	// Second completion of the already-ended transaction.
	tx.AcceptAll()
	buf.EndTransaction(tx)

	s.Equal(5, buf.Len())
}
// TestBufferAddCallsMetricRejectWhenNoBatch adds five and then two mock
// metrics to a buffer created with capacity five, with no transaction open,
// and expects the Reject callback to have fired twice. Skipped for buffer
// types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNoBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	rejects := 0
	mm := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: func() { rejects++ },
	}

	buf.Add([]telegraf.Metric{mm, mm, mm, mm, mm}...)
	buf.Add([]telegraf.Metric{mm, mm}...)

	s.Equal(2, rejects)
}
// TestBufferAddCallsMetricRejectWhenNotInBatch fills the buffer with five
// mock metrics, opens a transaction of two and adds four more. It expects
// two Reject callbacks before the transaction ends and four after the batch
// was kept. Skipped for buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNotInBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	rejects := 0
	mm := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: func() { rejects++ },
	}

	buf.Add([]telegraf.Metric{mm, mm, mm, mm, mm}...)
	tx := buf.BeginTransaction(2)
	buf.Add([]telegraf.Metric{mm, mm, mm, mm}...)
	s.Equal(2, rejects)

	tx.KeepAll()
	buf.EndTransaction(tx)
	s.Equal(4, rejects)
}
// TestBufferRejectCallsMetricRejectWithOverwritten takes all five mock
// metrics into a transaction and adds two more. It expects no Reject
// callback while the transaction is open and two once the batch was kept.
// Skipped for buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectCallsMetricRejectWithOverwritten() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	rejects := 0
	mm := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: func() { rejects++ },
	}

	buf.Add([]telegraf.Metric{mm, mm, mm, mm, mm}...)
	tx := buf.BeginTransaction(5)
	buf.Add([]telegraf.Metric{mm, mm}...)
	s.Equal(0, rejects)

	tx.KeepAll()
	buf.EndTransaction(tx)
	s.Equal(2, rejects)
}
// TestBufferAddOverwriteAndReject takes all five mock metrics into a
// transaction and then adds five metrics four times. It expects fifteen
// Reject callbacks while the transaction is open and twenty once the batch
// was kept. Skipped for buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddOverwriteAndReject() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	rejects := 0
	mm := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: func() { rejects++ },
	}

	buf.Add(mm, mm, mm, mm, mm)
	tx := buf.BeginTransaction(5)
	for round := 0; round < 4; round++ {
		buf.Add(mm, mm, mm, mm, mm)
	}
	s.Equal(15, rejects)

	tx.KeepAll()
	buf.EndTransaction(tx)
	s.Equal(20, rejects)
}
// TestBufferAddOverwriteAndRejectOffset adds three and then four mock
// metrics to a buffer created with capacity five (two Reject callbacks
// expected), opens a transaction of five, and adds four metrics four times
// while checking the running Reject count after each Add. After accepting
// the batch the Reject count must be unchanged and Accept must have fired
// five times. Skipped for buffer types without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddOverwriteAndRejectOffset() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	buf := s.newTestBuffer(5)
	defer buf.Close()

	var rejects, accepts int
	mm := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: func() { rejects++ },
		AcceptF: func() { accepts++ },
	}

	buf.Add(mm, mm, mm)
	buf.Add(mm, mm, mm, mm)
	s.Equal(2, rejects)

	tx := buf.BeginTransaction(5)

	// Expected cumulative Reject count after each Add of four metrics.
	for _, wantRejects := range []int{2, 5, 9, 13} {
		buf.Add(mm, mm, mm, mm)
		s.Equal(wantRejects, rejects)
	}

	tx.AcceptAll()
	buf.EndTransaction(tx)
	s.Equal(13, rejects)
	s.Equal(5, accepts)
}
// TestBufferRejectEmptyBatch opens a transaction on an empty buffer, adds a
// metric, keeps the (empty) batch, adds a second metric and then checks
// that the next batch contains no nil entries.
func (s *BufferSuiteTest) TestBufferRejectEmptyBatch() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	emptyTx := buf.BeginTransaction(2)
	buf.Add(cpuAt(1))
	emptyTx.KeepAll()
	buf.EndTransaction(emptyTx)

	buf.Add(cpuAt(2))

	nextTx := buf.BeginTransaction(2)
	for i := range nextTx.Batch {
		s.NotNil(nextTx.Batch[i])
	}
	buf.EndTransaction(nextTx)
}
// TestBufferFlushedPartial adds three metrics, accepts a batch of two and
// expects one metric to remain.
func (s *BufferSuiteTest) TestBufferFlushedPartial() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	for sec := int64(1); sec <= 3; sec++ {
		buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0)))
	}

	flushed := buf.BeginTransaction(2)
	s.Len(flushed.Batch, 2)
	flushed.AcceptAll()
	buf.EndTransaction(flushed)

	remaining := buf.Len()
	s.Equal(1, remaining)
}
// TestBufferFlushedFull adds two metrics, accepts a batch of two and expects
// the buffer to be empty afterwards.
func (s *BufferSuiteTest) TestBufferFlushedFull() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	for sec := int64(1); sec <= 2; sec++ {
		buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0)))
	}

	flushed := buf.BeginTransaction(2)
	s.Len(flushed.Batch, 2)
	flushed.AcceptAll()
	buf.EndTransaction(flushed)

	remaining := buf.Len()
	s.Equal(0, remaining)
}
// TestPartialWriteBackToFront resolves a five-metric buffer in three
// transactions, each time marking only the tail of the batch: first index 4
// is rejected, then index 3 is accepted, then the remaining indices 0..2
// are accepted. It checks Len after every step and the final counters.
func (s *BufferSuiteTest) TestPartialWriteBackToFront() {
	buf := s.newTestBuffer(5)
	defer buf.Close()

	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	buf.Add([]telegraf.Metric{m, m, m, m, m}...)

	// Step 1: take all five metrics and reject only the last one.
	step1 := buf.BeginTransaction(5)
	s.Len(step1.Batch, 5)
	step1.Reject = []int{4}
	buf.EndTransaction(step1)
	s.Equal(4, buf.Len())

	// Step 2: the batch no longer contains the rejected metric; accept
	// only its last entry.
	step2 := buf.BeginTransaction(5)
	s.Len(step2.Batch, 4)
	step2.Accept = []int{3}
	buf.EndTransaction(step2)
	s.Equal(3, buf.Len())

	// Step 3: accept the three remaining metrics, emptying the buffer.
	step3 := buf.BeginTransaction(5)
	s.Len(step3.Batch, 3)
	step3.Accept = []int{0, 1, 2}
	buf.EndTransaction(step3)
	s.Equal(0, buf.Len())

	added := buf.Stats().MetricsAdded.Get()
	s.Equal(int64(5), added, "metrics added")
	written := buf.Stats().MetricsWritten.Get()
	s.Equal(int64(4), written, "metrics written")
	rejected := buf.Stats().MetricsRejected.Get()
	s.Equal(int64(1), rejected, "metrics rejected")
	dropped := buf.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped, "metrics dropped")
}
// mockMetric wraps a telegraf.Metric and replaces its Accept, Reject and
// Drop methods with optional callbacks so tests can count how often each
// one is invoked.
type mockMetric struct {
	telegraf.Metric

	// AcceptF, RejectF and DropF are invoked by the method of the same
	// name (minus the F suffix); a nil callback makes that method a no-op.
	AcceptF func()
	RejectF func()
	DropF   func()
}
// Accept invokes the AcceptF callback when one is set and does nothing
// otherwise.
func (m *mockMetric) Accept() {
	if hook := m.AcceptF; hook != nil {
		hook()
	}
}
// Reject invokes the RejectF callback when one is set and does nothing
// otherwise.
func (m *mockMetric) Reject() {
	if hook := m.RejectF; hook != nil {
		hook()
	}
}
// Drop invokes the DropF callback when one is set and does nothing
// otherwise.
func (m *mockMetric) Drop() {
	if hook := m.DropF; hook != nil {
		hook()
	}
}