1
0
Fork 0
telegraf/plugins/common/shim/goshim.go

146 lines
3 KiB
Go
Raw Permalink Normal View History

package shim
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
type empty struct{}
var (
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
`"`, `\"`,
`\`, `\\`,
)
)
const (
// PollIntervalDisabled is used to indicate that you want to disable polling,
// as opposed to duration 0 meaning poll constantly.
PollIntervalDisabled = time.Duration(0)
)
// Shim allows you to wrap your inputs and run them as if they were part of Telegraf,
// except built externally.
type Shim struct {
Input telegraf.Input
Processor telegraf.StreamingProcessor
Output telegraf.Output
log telegraf.Logger
// streams
stdin io.Reader
stdout io.Writer
stderr io.Writer
// outgoing metric channel
metricCh chan telegraf.Metric
// input only
gatherPromptCh chan empty
}
// New creates a new shim interface
func New() *Shim {
return &Shim{
metricCh: make(chan telegraf.Metric, 1),
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
log: logger.New("", "", ""),
}
}
func (*Shim) watchForShutdown(cancel context.CancelFunc) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-quit // user-triggered quit
// cancel, but keep looping until the metric channel closes.
cancel()
}()
}
// Run the input plugins..
func (s *Shim) Run(pollInterval time.Duration) error {
if s.Input != nil {
err := s.RunInput(pollInterval)
if err != nil {
return fmt.Errorf("running input failed: %w", err)
}
} else if s.Processor != nil {
err := s.RunProcessor()
if err != nil {
return fmt.Errorf("running processor failed: %w", err)
}
} else if s.Output != nil {
err := s.RunOutput()
if err != nil {
return fmt.Errorf("running output failed: %w", err)
}
} else {
return errors.New("nothing to run")
}
return nil
}
func hasQuit(ctx context.Context) bool {
return ctx.Err() != nil
}
func (s *Shim) writeProcessedMetrics() error {
serializer := &influx.Serializer{}
if err := serializer.Init(); err != nil {
return fmt.Errorf("creating serializer failed: %w", err)
}
for { //nolint:staticcheck // for-select used on purpose
select {
case m, open := <-s.metricCh:
if !open {
return nil
}
b, err := serializer.Serialize(m)
if err != nil {
m.Reject()
return fmt.Errorf("failed to serialize metric: %w", err)
}
// Write this to stdout
_, err = fmt.Fprint(s.stdout, string(b))
if err != nil {
m.Drop()
return fmt.Errorf("failed to write metric: %w", err)
}
m.Accept()
}
}
}
// LogName satisfies the MetricMaker interface
func (*Shim) LogName() string {
return ""
}
// MakeMetric satisfies the MetricMaker interface
func (*Shim) MakeMetric(m telegraf.Metric) telegraf.Metric {
return m // don't need to do anything to it.
}
// Log satisfies the MetricMaker interface
func (s *Shim) Log() telegraf.Logger {
return s.log
}