1
0
Fork 0
telegraf/plugins/inputs/neoom_beaam/neoom_beaam.go
Daniel Baumann 4978089aab
Adding upstream version 1.34.4.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-24 07:26:29 +02:00

310 lines
7.5 KiB
Go

//go:generate ../../../tools/config_includer/generator
//go:generate ../../../tools/readme_config_includer/generator
package neoom_beaam
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
chttp "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
type NeoomBeaam struct {
Address string `toml:"address"`
Token config.Secret `toml:"token"`
RefreshConfig bool `toml:"refresh_configuration"`
Log telegraf.Logger `toml:"-"`
chttp.HTTPClientConfig
source string
config site
client *http.Client
}
func (*NeoomBeaam) SampleConfig() string {
return sampleConfig
}
func (n *NeoomBeaam) Init() error {
if n.Address == "" {
n.Address = "https://10.10.10.10"
}
n.Address = strings.TrimRight(n.Address, "/")
u, err := url.Parse(n.Address)
if err != nil {
return fmt.Errorf("parsing of address %q failed: %w", n.Address, err)
}
n.source = u.Hostname()
return nil
}
func (n *NeoomBeaam) Start(telegraf.Accumulator) error {
// Create the client
ctx := context.Background()
client, err := n.HTTPClientConfig.CreateClient(ctx, n.Log)
if err != nil {
return fmt.Errorf("creating client failed: %w", err)
}
n.client = client
// Initialize configuration
return n.updateConfiguration()
}
func (n *NeoomBeaam) Gather(acc telegraf.Accumulator) error {
// Refresh the config if requested
if n.RefreshConfig {
if err := n.updateConfiguration(); err != nil {
return err
}
}
// Query the energy flow
if err := n.queryEnergyFlow(acc); err != nil {
acc.AddError(fmt.Errorf("querying site state failed: %w", err))
}
// Query all known things
for _, thing := range n.config.Things {
if err := n.queryThing(acc, thing); err != nil {
acc.AddError(fmt.Errorf("querying thing %q (%s) failed: %w", thing.Name, thing.id, err))
}
}
return nil
}
func (n *NeoomBeaam) Stop() {
if n.client != nil {
n.client.CloseIdleConnections()
}
}
func (n *NeoomBeaam) updateConfiguration() error {
endpoint := n.Address + "/api/v1/site/configuration"
request, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return fmt.Errorf("creating configuration request failed: %w", err)
}
if !n.Token.Empty() {
token, err := n.Token.Get()
if err != nil {
return fmt.Errorf("getting token failed: %w", err)
}
bearer := "Bearer " + strings.TrimSpace(token.String())
token.Destroy()
request.Header.Set("Authorization", bearer)
}
request.Header.Set("Accept", "application/json")
// Update the configuration
response, err := n.client.Do(request)
if err != nil {
return &internal.StartupError{
Err: fmt.Errorf("querying configuration failed: %w", err),
Retry: true,
}
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("reading body failed: %w", err)
}
if response.StatusCode != http.StatusOK {
return &internal.StartupError{
Err: fmt.Errorf("configuration query returned %q: %s", response.Status, string(body)),
Retry: 400 <= response.StatusCode && response.StatusCode <= 499,
}
}
if err := json.Unmarshal(body, &n.config); err != nil {
return fmt.Errorf("decoding configuration failed: %w", err)
}
for id, thing := range n.config.Things {
thing.id = id
n.config.Things[id] = thing
}
return nil
}
func (n *NeoomBeaam) queryEnergyFlow(acc telegraf.Accumulator) error {
// Create the request
endpoint := n.Address + "/api/v1/site/state"
request, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return fmt.Errorf("creating request failed: %w", err)
}
if !n.Token.Empty() {
token, err := n.Token.Get()
if err != nil {
return fmt.Errorf("getting token failed: %w", err)
}
bearer := "Bearer " + strings.TrimSpace(token.String())
token.Destroy()
request.Header.Set("Authorization", bearer)
}
request.Header.Set("Accept", "application/json")
// Execute query
response, err := n.client.Do(request)
if err != nil {
return fmt.Errorf("querying failed: %w", err)
}
defer response.Body.Close()
// Handle response
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("reading body failed: %w", err)
}
if response.StatusCode != http.StatusOK {
return fmt.Errorf("query returned %q: %s", response.Status, string(body))
}
// Decode the data and create metric
var data siteState
if err := json.Unmarshal(body, &data); err != nil {
return fmt.Errorf("decoding failed: %w", err)
}
for _, s := range data.EnergyFlow.States {
if s.Value == nil {
n.Log.Debugf("omitting data point %q (%s) due to 'null' value", s.Key, s.DataPointID)
continue
}
dp, ok := n.config.EnergyFlow.DataPoints[s.DataPointID]
if !ok {
n.Log.Errorf("no data point definition for ID %q", s.DataPointID)
continue
}
tags := map[string]string{
"source": n.source,
"datapoint": s.Key,
"unit": dp.Unit,
}
fields := map[string]interface{}{
"value": s.Value,
}
ts := time.Unix(0, int64(s.Timestamp*float64(time.Millisecond)))
acc.AddFields("neoom_beaam_energy_flow", fields, tags, ts)
}
return nil
}
func (n *NeoomBeaam) queryThing(acc telegraf.Accumulator, thing thingDefinition) error {
// Create the request
endpoint := n.Address + "/api/v1/things/" + thing.id + "/states"
request, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return fmt.Errorf("creating request failed: %w", err)
}
if !n.Token.Empty() {
token, err := n.Token.Get()
if err != nil {
return fmt.Errorf("getting token failed: %w", err)
}
bearer := "Bearer " + strings.TrimSpace(token.String())
token.Destroy()
request.Header.Set("Authorization", bearer)
}
request.Header.Set("Accept", "application/json")
// Execute query
response, err := n.client.Do(request)
if err != nil {
return fmt.Errorf("querying failed: %w", err)
}
defer response.Body.Close()
// Handle response
body, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("reading body failed: %w", err)
}
if response.StatusCode != http.StatusOK {
return fmt.Errorf("query returned %q: %s", response.Status, string(body))
}
// Decode the data and create metric
var data thingState
if err := json.Unmarshal(body, &data); err != nil {
return fmt.Errorf("decoding failed: %w", err)
}
for _, s := range data.States {
if s.Value == nil {
n.Log.Debugf("omitting data point %q (%s) due to 'null' value", s.Key, s.DataPointID)
continue
}
dp, ok := thing.DataPoints[s.DataPointID]
if !ok {
n.Log.Errorf("no data point definition for ID %q", s.DataPointID)
continue
}
tags := map[string]string{
"source": n.source,
"thing": thing.Name,
"datapoint": s.Key,
"unit": dp.Unit,
}
var fields map[string]interface{}
if elements, ok := s.Value.([]interface{}); ok {
fields = make(map[string]interface{}, len(elements))
for i, v := range elements {
fields["value_"+strconv.Itoa(i)] = v
}
} else {
fields = map[string]interface{}{
"value": s.Value,
}
}
ts := time.Unix(0, int64(s.Timestamp*float64(time.Millisecond)))
acc.AddFields("neoom_beaam_thing", fields, tags, ts)
}
return nil
}
// Register the plugin
func init() {
inputs.Add("neoom_beaam", func() telegraf.Input {
return &NeoomBeaam{
HTTPClientConfig: chttp.HTTPClientConfig{
Timeout: config.Duration(5 * time.Second),
ResponseHeaderTimeout: config.Duration(5 * time.Second),
},
}
})
}