1
0
Fork 0
telegraf/plugins/common/opcua/client.go

242 lines
6.3 KiB
Go
Raw Permalink Normal View History

package opcua
import (
"context"
"errors"
"fmt"
"log" //nolint:depguard // just for debug
"net/url"
"strconv"
"time"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/debug"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
)
// OpcUAWorkarounds groups configuration switches that relax the client's
// default behavior for servers needing special treatment.
type OpcUAWorkarounds struct {
	// AdditionalValidStatusCodes lists extra status codes, given as numeric
	// strings, that are accepted as valid in addition to ua.StatusOK. Each
	// entry is parsed with strconv.ParseUint using base 0 and a 32-bit size,
	// so prefixed forms such as "0x..." are accepted.
	AdditionalValidStatusCodes []string `toml:"additional_valid_status_codes"`
}
// ConnectionState is the package-local counterpart of opcua.ConnState, so
// users of this package can refer to connection states through this type.
type ConnectionState opcua.ConnState

// Connection states, each one converted from the gopcua constant of the same
// name. The conversion makes every constant a typed ConnectionState.
const (
	Closed       = ConnectionState(opcua.Closed)
	Connected    = ConnectionState(opcua.Connected)
	Connecting   = ConnectionState(opcua.Connecting)
	Disconnected = ConnectionState(opcua.Disconnected)
	Reconnecting = ConnectionState(opcua.Reconnecting)
)
// String returns the textual form of the state by delegating to the String
// method of the underlying opcua.ConnState value.
func (c ConnectionState) String() string {
	underlying := opcua.ConnState(c)
	return underlying.String()
}
// OpcUAClientConfig holds the TOML-configurable settings for an OPC UA client.
// Call Validate (or CreateClient, which validates first) before using it.
type OpcUAClientConfig struct {
	// Endpoint is the server URL. It must be non-empty and parseable; Connect
	// additionally requires the "opc.tcp" scheme.
	Endpoint string `toml:"endpoint"`
	// SecurityPolicy must be one of "None", "Basic128Rsa15", "Basic256",
	// "Basic256Sha256" or "auto".
	SecurityPolicy string `toml:"security_policy"`
	// SecurityMode must be one of "None", "Sign", "SignAndEncrypt" or "auto".
	SecurityMode string `toml:"security_mode"`
	// Certificate and PrivateKey identify the client credentials. When both
	// are empty and either the security policy or mode is not "None",
	// SetupOptions generates a self-signed certificate and stores the values
	// returned by generateCert in these fields.
	// NOTE(review): presumably these are file paths — confirm against generateCert.
	Certificate string `toml:"certificate"`
	PrivateKey string `toml:"private_key"`
	// Username and Password are secrets; their use is not visible in this
	// part of the file (presumably consumed when building client options).
	Username config.Secret `toml:"username"`
	Password config.Secret `toml:"password"`
	// AuthMethod is not interpreted in this part of the file.
	AuthMethod string `toml:"auth_method"`
	// ConnectTimeout bounds both the endpoint discovery in SetupOptions and
	// the connection attempt in Connect.
	ConnectTimeout config.Duration `toml:"connect_timeout"`
	// RequestTimeout is not referenced in this part of the file.
	RequestTimeout config.Duration `toml:"request_timeout"`
	// ClientTrace enables the gopcua debug output and routes it to the
	// telegraf logger handed to CreateClient.
	ClientTrace bool `toml:"client_trace"`
	// OptionalFields selects optional output fields; only "DataType" passes
	// validation.
	OptionalFields []string `toml:"optional_fields"`
	// Workarounds contains settings for servers that need special handling.
	Workarounds OpcUAWorkarounds `toml:"workarounds"`
	// SessionTimeout is not referenced in this part of the file.
	SessionTimeout config.Duration `toml:"session_timeout"`
}
// Validate checks the configuration for consistency. The optional fields are
// verified first, followed by the endpoint and the security settings. The
// first problem found is returned; nil means the configuration is acceptable.
func (o *OpcUAClientConfig) Validate() error {
	fieldsErr := o.validateOptionalFields()
	if fieldsErr != nil {
		return fmt.Errorf("invalid 'optional_fields': %w", fieldsErr)
	}

	return o.validateEndpoint()
}
// validateOptionalFields passes the configured optional fields to
// choice.CheckSlice together with the list of supported names, which
// currently contains only "DataType", and returns its result.
func (o *OpcUAClientConfig) validateOptionalFields() error {
	return choice.CheckSlice(o.OptionalFields, []string{"DataType"})
}
// validateEndpoint verifies that the endpoint URL is present and parseable and
// that the security policy and security mode hold one of the supported values.
// It returns an error describing the first violation, or nil if all checks pass.
func (o *OpcUAClientConfig) validateEndpoint() error {
	endpoint := o.Endpoint
	if endpoint == "" {
		return errors.New("endpoint url is empty")
	}
	if _, parseErr := url.Parse(endpoint); parseErr != nil {
		return errors.New("endpoint url is invalid")
	}

	policyKnown := false
	switch o.SecurityPolicy {
	case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto":
		policyKnown = true
	}
	if !policyKnown {
		return fmt.Errorf("invalid security type %q in %q", o.SecurityPolicy, endpoint)
	}

	modeKnown := false
	switch o.SecurityMode {
	case "None", "Sign", "SignAndEncrypt", "auto":
		modeKnown = true
	}
	if !modeKnown {
		// NOTE(review): the message says "security type" although the security
		// mode is being checked; kept verbatim because callers may match on it.
		return fmt.Errorf("invalid security type %q in %q", o.SecurityMode, endpoint)
	}

	return nil
}
// CreateClient validates the configuration and builds an OpcUAClient that
// logs through telegrafLogger.
//
// When ClientTrace is set, the package-level debug switch and logger of the
// gopcua debug package are configured to write to telegrafLogger. The
// returned error is either the validation error (with a nil client) or the
// result of setting up the status-code workarounds (with the client still
// being returned).
func (o *OpcUAClientConfig) CreateClient(telegrafLogger telegraf.Logger) (*OpcUAClient, error) {
	if err := o.Validate(); err != nil {
		return nil, err
	}

	if o.ClientTrace {
		debug.Enable = true
		debug.Logger = log.New(&DebugLogger{Log: telegrafLogger}, "", 0)
	}

	client := &OpcUAClient{
		Log:    telegrafLogger,
		Config: o,
	}
	client.Log.Debug("Initialising OpcUAClient")

	workaroundErr := client.setupWorkarounds()
	return client, workaroundErr
}
// OpcUAClient bundles a configuration, a logger and the underlying gopcua
// client. Instances are created through OpcUAClientConfig.CreateClient.
type OpcUAClient struct {
	// Config is the configuration this client was created from.
	Config *OpcUAClientConfig
	// Log receives the client's diagnostic messages.
	Log telegraf.Logger

	// Client is the underlying gopcua client. It is nil until Connect creates
	// it and is reset to nil by Disconnect.
	Client *opcua.Client

	// opts holds the connection options produced by SetupOptions and passed
	// to opcua.NewClient in Connect.
	opts []opcua.Option
	// codes lists the status codes StatusCodeOK treats as valid; it is filled
	// by setupWorkarounds.
	codes []ua.StatusCode
}
// SetupOptions reads the endpoints from the configured server and prepares
// the connection options used by Connect.
//
// The endpoint discovery is limited by the configured connect timeout. If
// neither a certificate nor a private key is configured while the security
// policy or the security mode differs from "None", a self-signed certificate
// is generated and stored in the configuration. The resulting options are
// kept in the client; any error along the way is returned.
func (o *OpcUAClient) SetupOptions() error {
	timeout := time.Duration(o.Config.ConnectTimeout)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Get a list of the endpoints for our target server
	endpoints, err := opcua.GetEndpoints(ctx, o.Config.Endpoint)
	if err != nil {
		return err
	}

	cfg := o.Config
	noCredentials := cfg.Certificate == "" && cfg.PrivateKey == ""
	securityRequested := cfg.SecurityPolicy != "None" || cfg.SecurityMode != "None"
	if noCredentials && securityRequested {
		o.Log.Debug("Generating self-signed certificate")
		cert, privateKey, err := generateCert("urn:telegraf:gopcua:client", 2048,
			cfg.Certificate, cfg.PrivateKey, 365*24*time.Hour)
		if err != nil {
			return err
		}
		cfg.Certificate = cert
		cfg.PrivateKey = privateKey
	}

	o.Log.Debug("Configuring OPC UA connection options")
	opts, err := o.generateClientOpts(endpoints)
	o.opts = opts
	return err
}
// setupWorkarounds (re)builds the list of status codes that StatusCodeOK
// accepts. The list always starts with ua.StatusOK, followed by every entry of
// the 'additional_valid_status_codes' workaround setting in configured order.
//
// Entries are parsed with base auto-detection (e.g. "0x80000000" or "192").
// On a malformed entry the returned error names the offending setting and
// value and wraps the underlying strconv error; codes parsed before the
// failure remain in the list.
func (o *OpcUAClient) setupWorkarounds() error {
	additional := o.Config.Workarounds.AdditionalValidStatusCodes

	o.codes = make([]ua.StatusCode, 0, len(additional)+1)
	o.codes = append(o.codes, ua.StatusOK)
	for _, c := range additional {
		// Limiting the parser to 32 bits makes the conversion below lossless.
		val, err := strconv.ParseUint(c, 0, 32)
		if err != nil {
			return fmt.Errorf("parsing additional valid status code %q: %w", c, err)
		}
		o.codes = append(o.codes, ua.StatusCode(val))
	}

	return nil
}
// StatusCodeOK reports whether code is one of the status codes the client
// accepts as valid, i.e. whether it appears in the list prepared by
// setupWorkarounds.
func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool {
	for i := range o.codes {
		if o.codes[i] == code {
			return true
		}
	}
	return false
}
// Connect establishes a connection to the configured OPC UA endpoint.
//
// Only the "opc.tcp" scheme is supported; any other scheme yields an error.
// The connection options are refreshed through SetupOptions first. If a
// client already exists it is closed beforehand, with close failures only
// being logged. The connection attempt is bounded by the configured connect
// timeout and is additionally aborted when ctx is cancelled or its deadline
// expires.
func (o *OpcUAClient) Connect(ctx context.Context) error {
	o.Log.Debug("Connecting OPC UA Client to server")
	u, err := url.Parse(o.Config.Endpoint)
	if err != nil {
		return err
	}

	if u.Scheme != "opc.tcp" {
		return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme)
	}

	if err := o.SetupOptions(); err != nil {
		return err
	}

	if o.Client != nil {
		o.Log.Warnf("Closing connection to %q as already connected", u)
		if err := o.Client.Close(ctx); err != nil {
			// Only log the error but do not bail out here as this prevents
			// reconnections for multiple parties (see e.g. #9523).
			o.Log.Errorf("Closing connection failed: %v", err)
		}
	}

	o.Client, err = opcua.NewClient(o.Config.Endpoint, o.opts...)
	if err != nil {
		return fmt.Errorf("error in new client: %w", err)
	}

	// Derive the timeout from the caller's context so that a cancellation or
	// an earlier deadline of the caller also stops the connection attempt.
	connectCtx, cancel := context.WithTimeout(ctx, time.Duration(o.Config.ConnectTimeout))
	defer cancel()
	if err := o.Client.Connect(connectCtx); err != nil {
		return fmt.Errorf("error in Client Connection: %w", err)
	}
	o.Log.Debug("Connected to OPC UA Server")

	return nil
}
// Disconnect closes the connection to the OPC UA server and drops the
// underlying client.
//
// It returns an error if the endpoint cannot be parsed, if the endpoint uses a
// scheme other than "opc.tcp", or if closing the client fails. The client is
// dropped even when closing fails. Calling Disconnect without an existing
// client (never connected or already disconnected) is a no-op returning nil.
func (o *OpcUAClient) Disconnect(ctx context.Context) error {
	o.Log.Debug("Disconnecting from OPC UA Server")
	u, err := url.Parse(o.Config.Endpoint)
	if err != nil {
		return err
	}

	switch u.Scheme {
	case "opc.tcp":
		// Nothing to close; guard against dereferencing a nil client.
		if o.Client == nil {
			return nil
		}
		// We can't do anything about failing to close a connection
		err := o.Client.Close(ctx)
		o.Client = nil
		return err
	default:
		return errors.New("invalid controller")
	}
}
// State returns the current connection state. Without an underlying client
// the state is Disconnected; otherwise the state reported by the gopcua
// client is returned.
func (o *OpcUAClient) State() ConnectionState {
	if client := o.Client; client != nil {
		return ConnectionState(client.State())
	}
	return Disconnected
}