package opcua

import (
	"fmt"
	"testing"
	"time"

	"github.com/docker/go-connections/nat"
	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go/wait"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/common/opcua"
	"github.com/influxdata/telegraf/plugins/common/opcua/input"
	"github.com/influxdata/telegraf/testutil"
)

// servicePort is the container port that every integration test in this file
// exposes, waits on, and uses to look up the mapped host port.
const servicePort = "4840"

// opcTags describes a single node used by the tests: how to address it
// (namespace, identifier type, identifier), the field name it is mapped to and
// the value the test expects to read back from it.
type opcTags struct {
	name           string
	namespace      string
	identifierType string
	identifier     string
	want           interface{}
}

// mapOPCTag converts a test node description into the node settings consumed
// by the read client configuration. The expected value (want) is test-only
// data and is not copied.
func mapOPCTag(tags opcTags) (out input.NodeSettings) {
	out.FieldName = tags.name
	out.Namespace = tags.namespace
	out.IdentifierType = tags.identifierType
	out.Identifier = tags.identifier
	return out
}

// TestGetDataBadNodeContainerIntegration starts an open62541 container and
// configures a node group whose first node addresses identifier 2261 in
// namespace "1" (the other tests in this file read that identifier from
// namespace "0"). It requires that creating and connecting the read client
// still succeeds.
func TestGetDataBadNodeContainerIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	container := testutil.Container{
		Image:        "open62541/open62541",
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			wait.ForLog("TCP network layer listening on opc.tcp://"),
		),
	}
	err := container.Start()
	require.NoError(t, err, "failed to start container")
	defer container.Terminate()

	testopctags := []opcTags{
		{"ProductName", "1", "i", "2261", "open62541 OPC UA Server"},
		{"ProductUri", "0", "i", "2262", "http://open62541.org"},
		{"ManufacturerName", "0", "i", "2263", "open62541"},
	}

	readConfig := readClientConfig{
		InputClientConfig: input.InputClientConfig{
			OpcUAClientConfig: opcua.OpcUAClientConfig{
				Endpoint:       fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
				SecurityPolicy: "None",
				SecurityMode:   "None",
				AuthMethod:     "Anonymous",
				ConnectTimeout: config.Duration(10 * time.Second),
				RequestTimeout: config.Duration(1 * time.Second),
				Workarounds:    opcua.OpcUAWorkarounds{},
			},
			MetricName: "testing",
			RootNodes:  make([]input.NodeSettings, 0),
			Groups:     make([]input.NodeGroupSettings, 0),
		},
	}

	g := input.NodeGroupSettings{
		MetricName: "anodic_current",
		TagsSlice: [][]string{
			{"pot", "2002"},
		},
	}
	for _, tags := range testopctags {
		g.Nodes = append(g.Nodes, mapOPCTag(tags))
	}
	readConfig.Groups = append(readConfig.Groups, g)

	logger := &testutil.CaptureLogger{}
	readClient, err := readConfig.createReadClient(logger)
	require.NoError(t, err)
	err = readClient.connect()
	require.NoError(t, err)
}

// TestReadClientIntegration connects a read client with anonymous
// authentication to an open62541 container and compares the value received
// for every configured root node with the expected value in testopctags.
func TestReadClientIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	container := testutil.Container{
		Image:        "open62541/open62541",
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			wait.ForLog("TCP network layer listening on opc.tcp://"),
		),
	}
	err := container.Start()
	require.NoError(t, err, "failed to start container")
	defer container.Terminate()

	testopctags := []opcTags{
		{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
		{"ProductUri", "0", "i", "2262", "http://open62541.org"},
		{"ManufacturerName", "0", "i", "2263", "open62541"},
		{"badnode", "1", "i", "1337", nil},
		{"goodnode", "1", "s", "the.answer", int32(42)},
		{"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"},
	}

	readConfig := readClientConfig{
		InputClientConfig: input.InputClientConfig{
			OpcUAClientConfig: opcua.OpcUAClientConfig{
				Endpoint:       fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
				SecurityPolicy: "None",
				SecurityMode:   "None",
				AuthMethod:     "Anonymous",
				ConnectTimeout: config.Duration(10 * time.Second),
				RequestTimeout: config.Duration(1 * time.Second),
				Workarounds:    opcua.OpcUAWorkarounds{},
			},
			MetricName: "testing",
			RootNodes:  make([]input.NodeSettings, 0),
			Groups:     make([]input.NodeGroupSettings, 0),
		},
	}

	for _, tags := range testopctags {
		readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags))
	}

	client, err := readConfig.createReadClient(testutil.Logger{})
	require.NoError(t, err)

	err = client.connect()
	require.NoError(t, err)

	// Without this check the loop below asserts nothing when no data was
	// received, and would index past testopctags if more entries came back.
	require.Len(t, client.LastReceivedData, len(testopctags))
	for i, v := range client.LastReceivedData {
		require.Equal(t, testopctags[i].want, v.Value)
	}
}

// TestReadClientIntegrationAdditionalFields enables the optional "DataType"
// field and checks that the metric built for every node carries the expected
// value, "Quality" and "DataType" fields as well as the node "id" tag.
func TestReadClientIntegrationAdditionalFields(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	container := testutil.Container{
		Image:        "open62541/open62541",
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			wait.ForLog("TCP network layer listening on opc.tcp://"),
		),
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	testopctags := []opcTags{
		{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
		{"ProductUri", "0", "i", "2262", "http://open62541.org"},
		{"ManufacturerName", "0", "i", "2263", "open62541"},
		{"badnode", "1", "i", "1337", nil},
		{"goodnode", "1", "s", "the.answer", int32(42)},
		{"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"},
	}
	// Expected data type and quality per node, index-aligned with testopctags.
	testopctypes := []string{
		"String",
		"String",
		"String",
		"Null",
		"Int32",
		"DateTime",
	}
	testopcquality := []string{
		"The operation succeeded. StatusGood (0x0)",
		"The operation succeeded. StatusGood (0x0)",
		"The operation succeeded. StatusGood (0x0)",
		"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
		"The operation succeeded. StatusGood (0x0)",
		"The operation succeeded. StatusGood (0x0)",
	}
	expectedopcmetrics := make([]telegraf.Metric, 0, len(testopctags))
	for i, x := range testopctags {
		// The timestamp is irrelevant; the comparison below ignores time.
		now := time.Now()
		tags := map[string]string{
			"id": fmt.Sprintf("ns=%s;%s=%s", x.namespace, x.identifierType, x.identifier),
		}
		fields := map[string]interface{}{
			x.name:     x.want,
			"Quality":  testopcquality[i],
			"DataType": testopctypes[i],
		}
		expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now))
	}

	readConfig := readClientConfig{
		InputClientConfig: input.InputClientConfig{
			OpcUAClientConfig: opcua.OpcUAClientConfig{
				Endpoint:       fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
				SecurityPolicy: "None",
				SecurityMode:   "None",
				AuthMethod:     "Anonymous",
				ConnectTimeout: config.Duration(10 * time.Second),
				RequestTimeout: config.Duration(1 * time.Second),
				Workarounds:    opcua.OpcUAWorkarounds{},
				OptionalFields: []string{"DataType"},
			},
			MetricName: "testing",
			RootNodes:  make([]input.NodeSettings, 0),
			Groups:     make([]input.NodeGroupSettings, 0),
		},
	}

	for _, tags := range testopctags {
		readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags))
	}

	client, err := readConfig.createReadClient(testutil.Logger{})
	require.NoError(t, err)

	require.NoError(t, client.connect())

	actualopcmetrics := make([]telegraf.Metric, 0, len(client.LastReceivedData))
	for i := range client.LastReceivedData {
		actualopcmetrics = append(actualopcmetrics, client.MetricForNode(i))
	}
	testutil.RequireMetricsEqual(t, expectedopcmetrics, actualopcmetrics, testutil.IgnoreTime())
}

// TestReadClientIntegrationWithPasswordAuth runs the container with the
// access_control_server example as entrypoint and connects using the
// "UserName" auth method, then compares the received values with the expected
// ones.
func TestReadClientIntegrationWithPasswordAuth(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	container := testutil.Container{
		Image:        "open62541/open62541",
		Entrypoint:   []string{"/opt/open62541/build/bin/examples/access_control_server"},
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			wait.ForLog("TCP network layer listening on opc.tcp://"),
		),
	}
	err := container.Start()
	require.NoError(t, err, "failed to start container")
	defer container.Terminate()

	testopctags := []opcTags{
		{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
		{"ProductUri", "0", "i", "2262", "http://open62541.org"},
		{"ManufacturerName", "0", "i", "2263", "open62541"},
	}

	readConfig := readClientConfig{
		InputClientConfig: input.InputClientConfig{
			OpcUAClientConfig: opcua.OpcUAClientConfig{
				Endpoint:       fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
				SecurityPolicy: "None",
				SecurityMode:   "None",
				Username:       config.NewSecret([]byte("peter")),
				Password:       config.NewSecret([]byte("peter123")),
				AuthMethod:     "UserName",
				ConnectTimeout: config.Duration(10 * time.Second),
				RequestTimeout: config.Duration(1 * time.Second),
				Workarounds:    opcua.OpcUAWorkarounds{},
			},
			MetricName: "testing",
			RootNodes:  make([]input.NodeSettings, 0),
			Groups:     make([]input.NodeGroupSettings, 0),
		},
	}

	for _, tags := range testopctags {
		readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags))
	}

	client, err := readConfig.createReadClient(testutil.Logger{})
	require.NoError(t, err)

	err = client.connect()
	require.NoError(t, err)

	// Without this check the loop below asserts nothing when no data was
	// received, and would index past testopctags if more entries came back.
	require.Len(t, client.LastReceivedData, len(testopctags))
	for i, v := range client.LastReceivedData {
		require.Equal(t, testopctags[i].want, v.Value)
	}
}

// TestReadClientConfig loads a TOML configuration with root nodes, node
// groups and both workaround sections, checks that every setting ends up in
// the plugin's read client configuration and, after Init, that the per-node
// metric tags are merged as expected.
func TestReadClientConfig(t *testing.T) {
	// The fixture must keep one key/value pair per line; TOML does not allow
	// several pairs on the same line.
	toml := `
[[inputs.opcua]]
name = "localhost"
endpoint = "opc.tcp://localhost:4840"
connect_timeout = "10s"
request_timeout = "5s"
security_policy = "auto"
security_mode = "auto"
certificate = "/etc/telegraf/cert.pem"
private_key = "/etc/telegraf/key.pem"
auth_method = "Anonymous"
username = ""
password = ""

optional_fields = ["DataType"]

[[inputs.opcua.nodes]]
  name = "name"
  namespace = "1"
  identifier_type = "s"
  identifier="one"
  tags=[["tag0", "val0"]]

[[inputs.opcua.nodes]]
  name="name2"
  namespace="2"
  identifier_type="s"
  identifier="two"
  tags=[["tag0", "val0"], ["tag00", "val00"]]
  default_tags = {tag6 = "val6"}

[[inputs.opcua.group]]
name = "foo"
namespace = "3"
identifier_type = "i"
tags = [["tag1", "val1"], ["tag2", "val2"]]
nodes = [{name="name3", identifier="3000", tags=[["tag3", "val3"]]}]

[[inputs.opcua.group]]
name = "bar"
namespace = "0"
identifier_type = "i"
tags = [["tag1", "val1"], ["tag2", "val2"]]

[[inputs.opcua.group.nodes]]
name = "name4"
identifier = "4000"
tags=[["tag4", "val4"]]
default_tags = { tag1 = "override" }

[[inputs.opcua.group.nodes]]
name = "name5"
identifier = "4001"

[inputs.opcua.workarounds]
additional_valid_status_codes = ["0xC0"]

[inputs.opcua.request_workarounds]
use_unregistered_reads = true
`

	c := config.NewConfig()
	err := c.LoadConfigData([]byte(toml), config.EmptySourcePath)
	require.NoError(t, err)

	require.Len(t, c.Inputs, 1)

	o, ok := c.Inputs[0].Input.(*OpcUA)
	require.True(t, ok)

	require.Equal(t, "localhost", o.readClientConfig.MetricName)
	require.Equal(t, "opc.tcp://localhost:4840", o.readClientConfig.Endpoint)
	require.Equal(t, config.Duration(10*time.Second), o.readClientConfig.ConnectTimeout)
	require.Equal(t, config.Duration(5*time.Second), o.readClientConfig.RequestTimeout)
	require.Equal(t, "auto", o.readClientConfig.SecurityPolicy)
	require.Equal(t, "auto", o.readClientConfig.SecurityMode)
	require.Equal(t, "/etc/telegraf/cert.pem", o.readClientConfig.Certificate)
	require.Equal(t, "/etc/telegraf/key.pem", o.readClientConfig.PrivateKey)
	require.Equal(t, "Anonymous", o.readClientConfig.AuthMethod)
	require.True(t, o.readClientConfig.Username.Empty())
	require.True(t, o.readClientConfig.Password.Empty())
	require.Equal(t, []input.NodeSettings{
		{
			FieldName:      "name",
			Namespace:      "1",
			IdentifierType: "s",
			Identifier:     "one",
			TagsSlice:      [][]string{{"tag0", "val0"}},
		},
		{
			FieldName:      "name2",
			Namespace:      "2",
			IdentifierType: "s",
			Identifier:     "two",
			TagsSlice:      [][]string{{"tag0", "val0"}, {"tag00", "val00"}},
			DefaultTags:    map[string]string{"tag6": "val6"},
		},
	}, o.readClientConfig.RootNodes)
	require.Equal(t, []input.NodeGroupSettings{
		{
			MetricName:     "foo",
			Namespace:      "3",
			IdentifierType: "i",
			TagsSlice:      [][]string{{"tag1", "val1"}, {"tag2", "val2"}},
			Nodes: []input.NodeSettings{{
				FieldName:  "name3",
				Identifier: "3000",
				TagsSlice:  [][]string{{"tag3", "val3"}},
			}},
		},
		{
			MetricName:     "bar",
			Namespace:      "0",
			IdentifierType: "i",
			TagsSlice:      [][]string{{"tag1", "val1"}, {"tag2", "val2"}},
			Nodes: []input.NodeSettings{{
				FieldName:   "name4",
				Identifier:  "4000",
				TagsSlice:   [][]string{{"tag4", "val4"}},
				DefaultTags: map[string]string{"tag1": "override"},
			}, {
				FieldName:  "name5",
				Identifier: "4001",
			}},
		},
	}, o.readClientConfig.Groups)
	require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.readClientConfig.Workarounds)
	require.Equal(t, readClientWorkarounds{UseUnregisteredReads: true}, o.readClientConfig.ReadClientWorkarounds)
	require.Equal(t, []string{"DataType"}, o.readClientConfig.OptionalFields)
	err = o.Init()
	require.NoError(t, err)
	require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes")
	require.EqualValues(t, map[string]string{"tag0": "val0"}, o.client.NodeMetricMapping[0].MetricTags)
	require.EqualValues(t, map[string]string{"tag6": "val6"}, o.client.NodeMetricMapping[1].MetricTags)
	require.EqualValues(t, map[string]string{"tag1": "val1", "tag2": "val2", "tag3": "val3"}, o.client.NodeMetricMapping[2].MetricTags)
	require.EqualValues(t, map[string]string{"tag1": "override", "tag2": "val2"}, o.client.NodeMetricMapping[3].MetricTags)
	require.EqualValues(t, map[string]string{"tag1": "val1", "tag2": "val2"}, o.client.NodeMetricMapping[4].MetricTags)
}

// TestUnregisteredReadsAndSessionRecoveryIntegration enables the
// UseUnregisteredReads workaround, reads the current values once, sets the
// client's forceReconnect flag and requires that the next read succeeds and
// returns the same field values.
func TestUnregisteredReadsAndSessionRecoveryIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	container := testutil.Container{
		Image:        "open62541/open62541",
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			wait.ForLog("TCP network layer listening on opc.tcp://"),
		),
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	testopctags := []opcTags{
		{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
		{"ProductUri", "0", "i", "2262", "http://open62541.org"},
	}

	readConfig := readClientConfig{
		ReadRetries: 1, // Set low to make tests faster
		ReadClientWorkarounds: readClientWorkarounds{
			UseUnregisteredReads: true, // Enable unregistered reads
		},
		InputClientConfig: input.InputClientConfig{
			OpcUAClientConfig: opcua.OpcUAClientConfig{
				Endpoint:       fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
				SecurityPolicy: "None",
				SecurityMode:   "None",
				AuthMethod:     "Anonymous",
				ConnectTimeout: config.Duration(10 * time.Second),
				RequestTimeout: config.Duration(1 * time.Second),
				Workarounds:    opcua.OpcUAWorkarounds{},
			},
			MetricName: "testing",
			RootNodes:  make([]input.NodeSettings, 0),
			Groups:     make([]input.NodeGroupSettings, 0),
		},
	}

	for _, tags := range testopctags {
		readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags))
	}

	// Create logger to capture logs
	logger := &testutil.CaptureLogger{}

	client, err := readConfig.createReadClient(logger)
	require.NoError(t, err)

	// First connection
	require.NoError(t, client.connect())

	// Verify initial data read was successful
	require.Len(t, client.LastReceivedData, 2)
	for i, v := range client.LastReceivedData {
		require.Equal(t, testopctags[i].want, v.Value)
	}

	// Get initial metrics to compare later
	initialMetrics, err := client.currentValues()
	require.NoError(t, err)
	require.Len(t, initialMetrics, 2)

	// Now simulate session invalidation as would happen in the real world
	client.forceReconnect = true

	// Get metrics again - this should force a reconnection
	recoveredMetrics, err := client.currentValues()
	require.NoError(t, err, "Should recover from session invalidation")
	require.Len(t, recoveredMetrics, 2)

	// Verify data consistency after reconnect
	for i := range recoveredMetrics {
		require.Equal(t,
			initialMetrics[i].Fields()[testopctags[i].name],
			recoveredMetrics[i].Fields()[testopctags[i].name],
			"Data should be consistent after session recovery")
	}

	// Verify we're using unregistered reads by checking log messages
	// In a real scenario, the error message would say "unregistered nodes"
	// But since we're simulating, we need to verify the flag is set correctly
	require.True(t, client.Workarounds.UseUnregisteredReads, "UseUnregisteredReads flag should be properly set")
}

// TestConsecutiveSessionErrorRecoveryIntegration drives the plugin through
// Gather and checks the consecutiveErrors counter: it stays at zero on
// success (including after a forced reconnect), increments on each failed
// gather against an invalid endpoint, and is reset once the original endpoint
// is restored.
func TestConsecutiveSessionErrorRecoveryIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	container := testutil.Container{
		Image:        "open62541/open62541",
		ExposedPorts: []string{servicePort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(nat.Port(servicePort)),
			wait.ForLog("TCP network layer listening on opc.tcp://"),
		),
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	// Create a test OpcUA instance
	o := &OpcUA{
		readClientConfig: readClientConfig{
			ReadRetries: 1,
			ReadClientWorkarounds: readClientWorkarounds{
				UseUnregisteredReads: true,
			},
			InputClientConfig: input.InputClientConfig{
				OpcUAClientConfig: opcua.OpcUAClientConfig{
					Endpoint:       fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
					SecurityPolicy: "None",
					SecurityMode:   "None",
					AuthMethod:     "Anonymous",
					ConnectTimeout: config.Duration(10 * time.Second),
					RequestTimeout: config.Duration(1 * time.Second),
				},
				MetricName: "testing",
				RootNodes: []input.NodeSettings{
					mapOPCTag(opcTags{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}),
				},
			},
		},
		Log: testutil.Logger{},
	}

	// Initialize the plugin
	require.NoError(t, o.Init())

	// Create an accumulator
	acc := &testutil.Accumulator{}

	// First gather should succeed
	require.NoError(t, o.Gather(acc))
	require.Len(t, acc.Metrics, 1)
	require.Equal(t, uint64(0), o.consecutiveErrors)

	// Simulate a session error
	o.client.forceReconnect = true

	// The next gather should force a reconnection internally and succeed
	acc.ClearMetrics()
	require.NoError(t, o.Gather(acc))
	require.Len(t, acc.Metrics, 1)
	require.Equal(t, uint64(0), o.consecutiveErrors, "Should reset consecutive errors after successful gather")

	// Simulate multiple consecutive errors with bad endpoint
	originalEndpoint := o.client.Config.OpcUAClientConfig.Endpoint
	o.client.Config.OpcUAClientConfig.Endpoint = "opc.tcp://invalid-endpoint:4840"

	// Next gather should fail
	acc.ClearMetrics()
	require.Error(t, o.Gather(acc))
	require.Equal(t, uint64(1), o.consecutiveErrors)

	// Another failure should increment consecutive errors and trigger session invalidation
	acc.ClearMetrics()
	require.Error(t, o.Gather(acc))
	require.Equal(t, uint64(2), o.consecutiveErrors)
	require.True(t, o.client.forceReconnect, "Should force session invalidation after multiple errors")

	// Restore endpoint to allow recovery
	o.client.Config.OpcUAClientConfig.Endpoint = originalEndpoint

	// Next gather should succeed and reset error counter
	acc.ClearMetrics()
	require.NoError(t, o.Gather(acc))
	require.Len(t, acc.Metrics, 1)
	require.Equal(t, uint64(0), o.consecutiveErrors, "Should reset consecutive errors after recovery")
}