145 lines
3 KiB
Go
145 lines
3 KiB
Go
package shim
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/logger"
|
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
|
)
|
|
|
|
type empty struct{}
|
|
|
|
var (
|
|
forever = 100 * 365 * 24 * time.Hour
|
|
envVarEscaper = strings.NewReplacer(
|
|
`"`, `\"`,
|
|
`\`, `\\`,
|
|
)
|
|
)
|
|
|
|
const (
|
|
// PollIntervalDisabled is used to indicate that you want to disable polling,
|
|
// as opposed to duration 0 meaning poll constantly.
|
|
PollIntervalDisabled = time.Duration(0)
|
|
)
|
|
|
|
// Shim allows you to wrap your inputs and run them as if they were part of Telegraf,
|
|
// except built externally.
|
|
type Shim struct {
|
|
Input telegraf.Input
|
|
Processor telegraf.StreamingProcessor
|
|
Output telegraf.Output
|
|
|
|
log telegraf.Logger
|
|
|
|
// streams
|
|
stdin io.Reader
|
|
stdout io.Writer
|
|
stderr io.Writer
|
|
|
|
// outgoing metric channel
|
|
metricCh chan telegraf.Metric
|
|
|
|
// input only
|
|
gatherPromptCh chan empty
|
|
}
|
|
|
|
// New creates a new shim interface
|
|
func New() *Shim {
|
|
return &Shim{
|
|
metricCh: make(chan telegraf.Metric, 1),
|
|
stdin: os.Stdin,
|
|
stdout: os.Stdout,
|
|
stderr: os.Stderr,
|
|
log: logger.New("", "", ""),
|
|
}
|
|
}
|
|
|
|
func (*Shim) watchForShutdown(cancel context.CancelFunc) {
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() {
|
|
<-quit // user-triggered quit
|
|
// cancel, but keep looping until the metric channel closes.
|
|
cancel()
|
|
}()
|
|
}
|
|
|
|
// Run the input plugins..
|
|
func (s *Shim) Run(pollInterval time.Duration) error {
|
|
if s.Input != nil {
|
|
err := s.RunInput(pollInterval)
|
|
if err != nil {
|
|
return fmt.Errorf("running input failed: %w", err)
|
|
}
|
|
} else if s.Processor != nil {
|
|
err := s.RunProcessor()
|
|
if err != nil {
|
|
return fmt.Errorf("running processor failed: %w", err)
|
|
}
|
|
} else if s.Output != nil {
|
|
err := s.RunOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("running output failed: %w", err)
|
|
}
|
|
} else {
|
|
return errors.New("nothing to run")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func hasQuit(ctx context.Context) bool {
|
|
return ctx.Err() != nil
|
|
}
|
|
|
|
func (s *Shim) writeProcessedMetrics() error {
|
|
serializer := &influx.Serializer{}
|
|
if err := serializer.Init(); err != nil {
|
|
return fmt.Errorf("creating serializer failed: %w", err)
|
|
}
|
|
for { //nolint:staticcheck // for-select used on purpose
|
|
select {
|
|
case m, open := <-s.metricCh:
|
|
if !open {
|
|
return nil
|
|
}
|
|
b, err := serializer.Serialize(m)
|
|
if err != nil {
|
|
m.Reject()
|
|
return fmt.Errorf("failed to serialize metric: %w", err)
|
|
}
|
|
// Write this to stdout
|
|
_, err = fmt.Fprint(s.stdout, string(b))
|
|
if err != nil {
|
|
m.Drop()
|
|
return fmt.Errorf("failed to write metric: %w", err)
|
|
}
|
|
m.Accept()
|
|
}
|
|
}
|
|
}
|
|
|
|
// LogName satisfies the MetricMaker interface
|
|
func (*Shim) LogName() string {
|
|
return ""
|
|
}
|
|
|
|
// MakeMetric satisfies the MetricMaker interface
|
|
func (*Shim) MakeMetric(m telegraf.Metric) telegraf.Metric {
|
|
return m // don't need to do anything to it.
|
|
}
|
|
|
|
// Log satisfies the MetricMaker interface
|
|
func (s *Shim) Log() telegraf.Logger {
|
|
return s.log
|
|
}
|