1
0
Fork 0
telegraf/plugins/common/parallel/ordered.go

85 lines
1.7 KiB
Go
Raw Permalink Normal View History

package parallel
import (
"sync"
"github.com/influxdata/telegraf"
)
// Ordered applies fn to enqueued metrics on a pool of worker goroutines and
// forwards the results to an accumulator in the order the metrics were
// enqueued, regardless of the order in which the workers finish.
type Ordered struct {
	// fn is the transformation applied to each enqueued metric. A single call
	// may return any number of metrics.
	fn func(telegraf.Metric) []telegraf.Metric

	// workerQueue carries incoming jobs to the workers; whichever worker is
	// free takes the next job.
	workerQueue chan job

	// queue holds one future per enqueued metric, in enqueue order. Reading
	// the futures in this order is what keeps the output ordered.
	queue chan futureMetric

	// wg tracks the worker goroutines and the queue reader so that Stop can
	// wait for all of them to exit.
	wg sync.WaitGroup
}
// NewOrdered creates an Ordered and starts its goroutines: workerCount workers
// that apply fn to enqueued metrics, plus one reader that passes the results
// to acc in enqueue order.
//
// orderedQueueSize is the capacity of the ordered output queue; Enqueue blocks
// while that queue is full. workerCount is both the number of workers and the
// capacity of the job queue feeding them.
//
// Out-of-range sizes are clamped instead of failing later: a workerCount
// below 1 is raised to 1, because with no workers Enqueue would block forever
// (and a negative channel capacity panics), and a negative orderedQueueSize
// is treated as 0.
//
// Call Stop to shut the goroutines down.
func NewOrdered(acc telegraf.Accumulator, fn func(telegraf.Metric) []telegraf.Metric, orderedQueueSize, workerCount int) *Ordered {
	if workerCount < 1 {
		workerCount = 1
	}
	if orderedQueueSize < 0 {
		orderedQueueSize = 0
	}

	p := &Ordered{
		fn:          fn,
		workerQueue: make(chan job, workerCount),
		queue:       make(chan futureMetric, orderedQueueSize),
	}
	p.startWorkers(workerCount)

	// The reader is tracked by the same WaitGroup as the workers so that Stop
	// waits for all pending results to be delivered.
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.readQueue(acc)
	}()
	return p
}
// Enqueue submits metric for processing. Results reach the accumulator in the
// order Enqueue was called, even if the workers finish out of order.
//
// Enqueue blocks while the ordered queue or the worker queue is full. It must
// not be called after Stop, since sending on the closed queues panics.
func (p *Ordered) Enqueue(metric telegraf.Metric) {
	// Capacity 1: a worker sends exactly one result per job, so with a
	// one-slot buffer it can deposit that result and pick up the next job
	// right away. With an unbuffered future, a worker that finishes out of
	// order would sit blocked until the reader had drained every earlier
	// future, stalling the pool behind a single slow job.
	future := make(futureMetric, 1)

	// Reserve this metric's position in the output before handing it to the
	// workers; the position in p.queue is what fixes the delivery order.
	p.queue <- future

	// Order doesn't matter from here on: whichever worker picks the job up,
	// its result is only read when the reader reaches this future in p.queue.
	p.workerQueue <- job{
		future: future,
		metric: metric,
	}
}
// readQueue consumes futures from the ordered queue until that queue is
// closed, adding every metric delivered on each future to acc. Futures are
// read strictly one after another, so the results of a later job are not
// forwarded until all earlier futures have been closed.
func (p *Ordered) readQueue(acc telegraf.Accumulator) {
	for future := range p.queue {
		// Keep receiving until the future is closed, so a future may carry
		// more than one batch of metrics.
		for batch := range future {
			for i := range batch {
				acc.AddMetric(batch[i])
			}
		}
	}
}
// startWorkers launches count worker goroutines and registers them with the
// WaitGroup. Each worker takes jobs from the worker queue until that queue is
// closed; for every job it sends the result of fn on the job's future and then
// closes the future.
func (p *Ordered) startWorkers(count int) {
	p.wg.Add(count)
	for remaining := count; remaining > 0; remaining-- {
		go func() {
			defer p.wg.Done()
			for j := range p.workerQueue {
				j.future <- p.fn(j.metric)
				// Closing tells the reader that this job has no more output.
				close(j.future)
			}
		}()
	}
}
// Stop closes both queues and then blocks until every worker goroutine and
// the queue reader have exited. Jobs that were enqueued before the call are
// still processed and their results delivered before Stop returns.
//
// Stop must be called at most once, and Enqueue must not be called afterwards:
// closing or sending on an already closed channel panics.
func (p *Ordered) Stop() {
// Closing the ordered queue lets readQueue finish once the remaining futures
// have been drained.
close(p.queue)
// Closing the job queue lets each worker exit after the remaining jobs.
close(p.workerQueue)
p.wg.Wait()
}
// futureMetric is the channel on which the result of a single job is
// delivered: a worker sends the metrics returned by fn and then closes it.
type futureMetric chan []telegraf.Metric
type job struct {
future futureMetric
metric telegraf.Metric
}