431 lines
11 KiB
Go
431 lines
11 KiB
Go
//go:generate ../../../tools/readme_config_includer/generator
|
|
package s7comm
|
|
|
|
import (
|
|
_ "embed"
|
|
"errors"
|
|
"fmt"
|
|
"hash/maphash"
|
|
"log" //nolint:depguard // Required for tracing connection issues
|
|
"net"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/robinson/gos7"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/metric"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
//go:embed sample.conf
|
|
var sampleConfig string
|
|
|
|
var (
|
|
regexAddr = regexp.MustCompile(addressRegexp)
|
|
// Area mapping taken from https://github.com/robinson/gos7/blob/master/client.go
|
|
areaMap = map[string]int{
|
|
"PE": 0x81, // process inputs
|
|
"PA": 0x82, // process outputs
|
|
"MK": 0x83, // Merkers
|
|
"DB": 0x84, // DB
|
|
"C": 0x1C, // counters
|
|
"T": 0x1D, // timers
|
|
}
|
|
// Word-length mapping taken from https://github.com/robinson/gos7/blob/master/client.go
|
|
wordLenMap = map[string]int{
|
|
"X": 0x01, // Bit
|
|
"B": 0x02, // Byte (8 bit)
|
|
"C": 0x03, // Char (8 bit)
|
|
"S": 0x03, // String (8 bit)
|
|
"W": 0x04, // Word (16 bit)
|
|
"I": 0x05, // Integer (16 bit)
|
|
"DW": 0x06, // Double Word (32 bit)
|
|
"DI": 0x07, // Double integer (32 bit)
|
|
"R": 0x08, // IEEE 754 real (32 bit)
|
|
// see https://support.industry.siemens.com/cs/document/36479/date_and_time-format-for-s7-?dti=0&lc=en-DE
|
|
"DT": 0x0F, // Date and time (7 byte)
|
|
}
|
|
|
|
connectionTypeMap = map[string]int{
|
|
"PD": 1,
|
|
"OP": 2,
|
|
"basic": 3,
|
|
}
|
|
)
|
|
|
|
const addressRegexp = `^(?P<area>[A-Z]+)(?P<no>[0-9]+)\.(?P<type>[A-Z]+)(?P<start>[0-9]+)(?:\.(?P<extra>.*))?$`
|
|
|
|
type S7comm struct {
|
|
Server string `toml:"server"`
|
|
Rack int `toml:"rack"`
|
|
Slot int `toml:"slot"`
|
|
ConnectionType string `toml:"connection_type"`
|
|
BatchMaxSize int `toml:"pdu_size"`
|
|
Timeout config.Duration `toml:"timeout"`
|
|
DebugConnection bool `toml:"debug_connection" deprecated:"1.35.0;use 'log_level' 'trace' instead"`
|
|
Configs []metricDefinition `toml:"metric"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
handler *gos7.TCPClientHandler
|
|
client gos7.Client
|
|
batches []batch
|
|
}
|
|
|
|
type metricDefinition struct {
|
|
Name string `toml:"name"`
|
|
Fields []metricFieldDefinition `toml:"fields"`
|
|
Tags map[string]string `toml:"tags"`
|
|
}
|
|
|
|
type metricFieldDefinition struct {
|
|
Name string `toml:"name"`
|
|
Address string `toml:"address"`
|
|
}
|
|
|
|
type batch struct {
|
|
items []gos7.S7DataItem
|
|
mappings []fieldMapping
|
|
}
|
|
|
|
type fieldMapping struct {
|
|
measurement string
|
|
field string
|
|
tags map[string]string
|
|
convert converterFunc
|
|
}
|
|
|
|
type converterFunc func([]byte) interface{}
|
|
|
|
func (*S7comm) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (s *S7comm) Init() error {
|
|
// Check settings
|
|
if s.Server == "" {
|
|
return errors.New("'server' has to be specified")
|
|
}
|
|
if s.Rack < 0 {
|
|
return errors.New("'rack' has to be specified")
|
|
}
|
|
if s.Slot < 0 {
|
|
return errors.New("'slot' has to be specified")
|
|
}
|
|
if s.ConnectionType == "" {
|
|
s.ConnectionType = "PD"
|
|
}
|
|
if _, found := connectionTypeMap[s.ConnectionType]; !found {
|
|
return fmt.Errorf("invalid 'connection_type' %q", s.ConnectionType)
|
|
}
|
|
if len(s.Configs) == 0 {
|
|
return errors.New("no metric defined")
|
|
}
|
|
|
|
// Set default port to 102 if none is given
|
|
var nerr *net.AddrError
|
|
if _, _, err := net.SplitHostPort(s.Server); errors.As(err, &nerr) {
|
|
if !strings.Contains(nerr.Err, "missing port") {
|
|
return errors.New("invalid 'server' address")
|
|
}
|
|
s.Server += ":102"
|
|
}
|
|
|
|
// Create handler for the connection
|
|
s.handler = gos7.NewTCPClientHandlerWithConnectType(s.Server, s.Rack, s.Slot, connectionTypeMap[s.ConnectionType])
|
|
s.handler.Timeout = time.Duration(s.Timeout)
|
|
if s.Log.Level().Includes(telegraf.Trace) || s.DebugConnection { // for backward compatibility
|
|
s.handler.Logger = log.New(&tracelogger{log: s.Log}, "", 0)
|
|
}
|
|
|
|
// Create the requests
|
|
return s.createRequests()
|
|
}
|
|
|
|
func (s *S7comm) Start(telegraf.Accumulator) error {
|
|
s.Log.Debugf("Connecting to %q...", s.Server)
|
|
if err := s.handler.Connect(); err != nil {
|
|
return &internal.StartupError{
|
|
Err: fmt.Errorf("connecting to %q failed: %w", s.Server, err),
|
|
Retry: true,
|
|
}
|
|
}
|
|
s.client = gos7.NewClient(s.handler)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *S7comm) Gather(acc telegraf.Accumulator) error {
|
|
timestamp := time.Now()
|
|
grouper := metric.NewSeriesGrouper()
|
|
|
|
for i, b := range s.batches {
|
|
// Read the batch
|
|
s.Log.Debugf("Reading batch %d...", i+1)
|
|
if err := s.client.AGReadMulti(b.items, len(b.items)); err != nil {
|
|
// Try to reconnect and skip this gather cycle to avoid hammering
|
|
// the network if the server is down or under load.
|
|
s.Log.Errorf("reading batch %d failed: %v; reconnecting...", i+1, err)
|
|
s.Stop()
|
|
return s.Start(acc)
|
|
}
|
|
|
|
// Dissect the received data into fields
|
|
for j, m := range b.mappings {
|
|
// Convert the data
|
|
buf := b.items[j].Data
|
|
value := m.convert(buf)
|
|
s.Log.Debugf(" got %v for field %q @ %d --> %v (%T)", buf, m.field, b.items[j].Start, value, value)
|
|
|
|
// Group the data by series
|
|
grouper.Add(m.measurement, m.tags, timestamp, m.field, value)
|
|
}
|
|
}
|
|
|
|
// Add the metrics grouped by series to the accumulator
|
|
for _, x := range grouper.Metrics() {
|
|
acc.AddMetric(x)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *S7comm) Stop() {
|
|
if s.handler != nil {
|
|
s.Log.Debugf("Disconnecting from %q...", s.handler.Address)
|
|
s.handler.Close()
|
|
}
|
|
}
|
|
|
|
func (s *S7comm) createRequests() error {
|
|
seed := maphash.MakeSeed()
|
|
seenFields := make(map[uint64]bool)
|
|
s.batches = make([]batch, 0)
|
|
|
|
current := batch{}
|
|
for i, cfg := range s.Configs {
|
|
// Set the defaults
|
|
if cfg.Name == "" {
|
|
cfg.Name = "s7comm"
|
|
}
|
|
|
|
// Check the metric definitions
|
|
if len(cfg.Fields) == 0 {
|
|
return fmt.Errorf("no fields defined for metric %q", cfg.Name)
|
|
}
|
|
|
|
// Create requests for all fields and add it to the current slot
|
|
for _, f := range cfg.Fields {
|
|
if f.Name == "" {
|
|
return fmt.Errorf("unnamed field in metric %q", cfg.Name)
|
|
}
|
|
|
|
item, cfunc, err := handleFieldAddress(f.Address)
|
|
if err != nil {
|
|
return fmt.Errorf("field %q of metric %q: %w", f.Name, cfg.Name, err)
|
|
}
|
|
m := fieldMapping{
|
|
measurement: cfg.Name,
|
|
field: f.Name,
|
|
tags: s.Configs[i].Tags,
|
|
convert: cfunc,
|
|
}
|
|
current.items = append(current.items, *item)
|
|
current.mappings = append(current.mappings, m)
|
|
|
|
// If the batch is full, start a new one
|
|
if len(current.items) == s.BatchMaxSize {
|
|
s.batches = append(s.batches, current)
|
|
current = batch{}
|
|
}
|
|
|
|
// Check for duplicate field definitions
|
|
id := fieldID(seed, cfg, f)
|
|
if seenFields[id] {
|
|
return fmt.Errorf("duplicate field definition field %q in metric %q", f.Name, cfg.Name)
|
|
}
|
|
seenFields[id] = true
|
|
}
|
|
|
|
// Update the configuration if changed
|
|
s.Configs[i] = cfg
|
|
}
|
|
|
|
// Add the last batch if any
|
|
if len(current.items) > 0 {
|
|
s.batches = append(s.batches, current)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func handleFieldAddress(address string) (*gos7.S7DataItem, converterFunc, error) {
|
|
// Parse the address into the different parts
|
|
if !regexAddr.MatchString(address) {
|
|
return nil, nil, fmt.Errorf("invalid address %q", address)
|
|
}
|
|
names := regexAddr.SubexpNames()[1:]
|
|
parts := regexAddr.FindStringSubmatch(address)[1:]
|
|
if len(names) != len(parts) {
|
|
return nil, nil, fmt.Errorf("names %v do not match parts %v", names, parts)
|
|
}
|
|
groups := make(map[string]string, len(names))
|
|
for i, n := range names {
|
|
groups[n] = parts[i]
|
|
}
|
|
|
|
// Check that we do have the required entries in the address
|
|
if _, found := groups["area"]; !found {
|
|
return nil, nil, errors.New("area is missing from address")
|
|
}
|
|
|
|
if _, found := groups["no"]; !found {
|
|
return nil, nil, errors.New("area index is missing from address")
|
|
}
|
|
if _, found := groups["type"]; !found {
|
|
return nil, nil, errors.New("type is missing from address")
|
|
}
|
|
if _, found := groups["start"]; !found {
|
|
return nil, nil, errors.New("start address is missing from address")
|
|
}
|
|
dtype := groups["type"]
|
|
|
|
// Lookup the item values from names and check the params
|
|
area, found := areaMap[groups["area"]]
|
|
if !found {
|
|
return nil, nil, errors.New("invalid area")
|
|
}
|
|
wordlen, found := wordLenMap[dtype]
|
|
if !found {
|
|
return nil, nil, errors.New("unknown data type")
|
|
}
|
|
areaidx, err := strconv.Atoi(groups["no"])
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("invalid area index: %w", err)
|
|
}
|
|
start, err := strconv.Atoi(groups["start"])
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("invalid start address: %w", err)
|
|
}
|
|
|
|
// Check the amount parameter if any
|
|
var extra, bit int
|
|
switch dtype {
|
|
case "S":
|
|
// We require an extra parameter
|
|
x := groups["extra"]
|
|
if x == "" {
|
|
return nil, nil, errors.New("extra parameter required")
|
|
}
|
|
|
|
extra, err = strconv.Atoi(x)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("invalid extra parameter: %w", err)
|
|
}
|
|
if extra < 1 {
|
|
return nil, nil, fmt.Errorf("invalid extra parameter %d", extra)
|
|
}
|
|
case "X":
|
|
// We require an extra parameter
|
|
x := groups["extra"]
|
|
if x == "" {
|
|
return nil, nil, errors.New("extra parameter required")
|
|
}
|
|
|
|
bit, err = strconv.Atoi(x)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("invalid extra parameter: %w", err)
|
|
}
|
|
if bit < 0 || bit > 7 {
|
|
// Ensure bit address is valid
|
|
return nil, nil, fmt.Errorf("invalid extra parameter: bit address %d out of range", bit)
|
|
}
|
|
default:
|
|
if groups["extra"] != "" {
|
|
return nil, nil, errors.New("extra parameter specified but not used")
|
|
}
|
|
}
|
|
|
|
// Get the required buffer size
|
|
amount := 1
|
|
var buflen int
|
|
switch dtype {
|
|
case "X", "B", "C": // 8-bit types
|
|
buflen = 1
|
|
case "W", "I": // 16-bit types
|
|
buflen = 2
|
|
case "DW", "DI", "R": // 32-bit types
|
|
buflen = 4
|
|
case "DT": // 7-byte
|
|
buflen = 7
|
|
case "S":
|
|
amount = extra
|
|
// Extra bytes as the first byte is the max-length of the string and
|
|
// the second byte is the actual length of the string.
|
|
buflen = extra + 2
|
|
default:
|
|
return nil, nil, errors.New("invalid data type")
|
|
}
|
|
|
|
// Setup the data item
|
|
item := &gos7.S7DataItem{
|
|
Area: area,
|
|
WordLen: wordlen,
|
|
Bit: bit,
|
|
DBNumber: areaidx,
|
|
Start: start,
|
|
Amount: amount,
|
|
Data: make([]byte, buflen),
|
|
}
|
|
|
|
// Determine the type converter function
|
|
f := determineConversion(dtype)
|
|
return item, f, nil
|
|
}
|
|
|
|
func fieldID(seed maphash.Seed, def metricDefinition, field metricFieldDefinition) uint64 {
|
|
var mh maphash.Hash
|
|
mh.SetSeed(seed)
|
|
|
|
mh.WriteString(def.Name)
|
|
mh.WriteByte(0)
|
|
mh.WriteString(field.Name)
|
|
mh.WriteByte(0)
|
|
|
|
// Tags
|
|
for k, v := range def.Tags {
|
|
mh.WriteString(k)
|
|
mh.WriteByte('=')
|
|
mh.WriteString(v)
|
|
mh.WriteByte(':')
|
|
}
|
|
mh.WriteByte(0)
|
|
|
|
return mh.Sum64()
|
|
}
|
|
|
|
// Logger for tracing internal messages
|
|
type tracelogger struct {
|
|
log telegraf.Logger
|
|
}
|
|
|
|
func (l *tracelogger) Write(b []byte) (n int, err error) {
|
|
l.log.Trace(string(b))
|
|
return len(b), nil
|
|
}
|
|
|
|
// Add this plugin to telegraf
|
|
func init() {
|
|
inputs.Add("s7comm", func() telegraf.Input {
|
|
return &S7comm{
|
|
Rack: -1,
|
|
Slot: -1,
|
|
BatchMaxSize: 20,
|
|
Timeout: config.Duration(10 * time.Second),
|
|
}
|
|
})
|
|
}
|