# This Starlark processor is used when loading Sparkplug B protobuf
# messages into InfluxDB. The data source is an Opto 22 Groov EPIC controller.
#
# This processor does the following:
#   - Resolves the metric name using a numeric alias.
#     When the EPIC MQTT client is started it sends a DBIRTH message
#     that lists all metrics configured on the controller and includes
#     a sequential numeric alias to reference each metric by.
#     This processor stores that information in the dict state["aliases"].
#     When subsequent DDATA messages are published, the numeric alias is
#     used to look up the stored metric name in state["aliases"].
#   - Splits the MQTT topic into 5 fields which can be used as tags in InfluxDB.
#   - Splits the metric name into up to 6 fields which are used as tags in InfluxDB.
#   - Deletes the host and topic tags and the type, name and alias fields.
#
# TODO:
# The requirement that a DBIRTH message has to be received before DDATA messages
# can be used creates a significant reliability issue and a debugging mess.
# I have to go into the Groov EPIC controller and restart the MQTT client every time
# I restart the telegraf loader. This has caused many hours of needless frustration.
#
# I see three possible solutions:
#   - Opto 22 changes their software, making it optional to drop the alias
#     and simply include the name in the DDATA messages. In my case it's never more
#     than 15 characters. This is the simplest and most reliable solution.
#   - Make a system call from telegraf, using SSH to remotely restart the MQTT client.
#   - Have telegraf send a message through MQTT requesting a DBIRTH message from the EPIC controller.
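#     (For reference: the Sparkplug B specification defines an NCMD metric named
#     "Node Control/Rebirth" for requesting a new BIRTH sequence, so this option
#     may be workable if the EPIC's MQTT client honors rebirth requests.)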
#
# Example Input:
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 type=9i,value=22.247711,alias=10i 1626475876000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 alias=10i,type=9i,value=22.231323 1626475877000000000
# edge,host=firefly,topic=spBv1.0/SF/DBIRTH/epiclc/Exp501 type=9i,name="Strategy/IO/I_Ch_TC_Right",alias=9i 1626475880000000000
# edge,host=firefly,topic=spBv1.0/SF/DBIRTH/epiclc/Exp501 value=22.200958,name="Strategy/IO/I_Ch_TC_Top_C",type=9i,alias=10i 1626475881000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 alias=10i,type=9i,value=22.177643 1626475884000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 type=9i,value=22.231903,alias=10i 1626475885000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 value=22.165192,alias=10i,type=9i 1626475895000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 alias=10i,type=9i,value=22.127106 1626475896000000000
#
# Example Output:
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DBIRTH,Position=Top,Reactor=SF,Source=Strategy value=22.200958 1626475881000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.177643 1626475884000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.231903 1626475885000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.165192 1626475895000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.127106 1626475896000000000
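#
# Note: the first two DDATA points (timestamps 1626475876000000000 and
# 1626475877000000000) do not appear in the output because they arrived before
# the DBIRTH that defines alias 10, and the DBIRTH for alias 9 (I_Ch_TC_Right)
# produces no output because it carries no value field.
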
#############################################
# The following is the telegraf.conf used when calling this processor.

# [[inputs.mqtt_consumer]]
#   servers = ["tcp://your_server:1883"]
#   qos = 0
#   connection_timeout = "30s"
#   topics = ["spBv1.0/#"]
#   persistent_session = false
#   client_id = ""
#   username = "your username"
#   password = "your password"
#
#   # Sparkplug protobuf configuration
#   data_format = "xpath_protobuf"
#
#   # Fully-qualified name of the sparkplug protobuf message type
#   xpath_protobuf_type = "org.eclipse.tahu.protobuf.Payload"
#
#   # Location of the sparkplug_b.proto file
#   xpath_protobuf_file = "/apps/telegraf/config/sparkplug_b.proto"
#
#   [[inputs.mqtt_consumer.xpath_protobuf]]
#     metric_selection = "metrics[not(template_value)]"
#     metric_name = "concat('edge', substring-after(name, ' '))"
#     timestamp = "timestamp"
#     timestamp_format = "unix_ms"
#     [inputs.mqtt_consumer.xpath_protobuf.tags]
#       name = "substring-after(name, ' ')"
#     [inputs.mqtt_consumer.xpath_protobuf.fields_int]
#       type = "datatype"
#       alias = "alias"
#     [inputs.mqtt_consumer.xpath_protobuf.fields]
#       # A metric value must be numeric
#       value = "number((int_value | long_value | float_value | double_value | boolean_value))"
#       name = "name"
#
# # Starlark processor
# [[processors.starlark]]
#   script = "sparkplug.star"
#
#   # Optionally define constants used in sparkplug.star.
#   # Constants can be defined here or directly in the sparkplug.star script.
#
#   [processors.starlark.constants]
#
#     # NOTE: The remaining fields can be specified either here or in the starlark script.
#
#     # Tags used to identify the message type - 3rd field of the topic
#     BIRTH_TAG = "BIRTH/"
#     DEATH_TAG = "DEATH/"
#     DATA_TAG = "DATA/"
#
#     # Number of messages to hold if an alias cannot be resolved
#     MAX_UNRESOLVED = 3
#
#     # Provide alternate names for the 5 sparkplug topic fields.
#     # The topic contains 5 fields separated by the '/' character.
#     # Define the tag name for each of these fields; "false" drops the field.
#     MSG_FORMAT = "false"      #0
#     GROUP_ID = "Reactor"      #1
#     MSG_TYPE = "MsgType"      #2
#     EDGE_ID = "EdgeID"        #3
#     DEVICE_ID = "Experiment"  #4
#
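
# A quick way to exercise the configuration above is to run telegraf once in
# test mode (a suggested invocation; adjust the --config path to wherever the
# telegraf.conf and sparkplug.star files actually live):
#
#   telegraf --config telegraf.conf --test
#
# In recent Telegraf versions --test gathers metrics once, runs them through
# the processors, and prints the resulting line protocol to stdout instead of
# writing to InfluxDB.
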
BIRTH_TAG = "BIRTH/"
DEATH_TAG = "DEATH/"
DATA_TAG = "DATA/"

# Number of messages to hold if an alias cannot be resolved
MAX_UNRESOLVED = 3

# Provide alternate names for the 5 sparkplug topic fields.
# The topic contains 5 fields separated by the '/' character.
# Define the tag name for each of these fields; "false" drops the field.
MSG_FORMAT = "false"      #0
GROUP_ID = "Reactor"      #1
MSG_TYPE = "MsgType"      #2
EDGE_ID = "EdgeID"        #3
DEVICE_ID = "Experiment"  #4
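
# With the constants above, a topic such as
#   spBv1.0/SF/DDATA/epiclc/Exp501
# (taken from the Example Input) becomes the tags
#   Reactor=SF, MsgType=DDATA, EdgeID=epiclc, Experiment=Exp501
# while the spBv1.0 field is dropped because MSG_FORMAT is "false".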

########### Begin sparkplug.star script

load("logging.star", "log")

state = {
    "aliases": dict(),
    "devices": dict(),
    "unresolved": list()
}
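# state["aliases"]    maps "<edgeid>/<alias>" to the metric name; for the Example
#                     Input above it ends up holding "epiclc/10" -> "Strategy/IO/I_Ch_TC_Top_C".
# state["unresolved"] buffers DDATA metrics whose alias has not yet been seen in a DBIRTH.
# state["devices"]    is not used in the current version of this script.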

def extractTopicTags(metric):
    msg_format = ''
    groupid = ''
    msg_type = ''
    edgeid = ''
    deviceid = ''

    topic = metric.tags.get("topic", "")
    fields = topic.split("/")
    nfields = len(fields)
    if nfields > 0: msg_format = fields[0]
    if nfields > 1: groupid = fields[1]
    if nfields > 2: msg_type = fields[2]
    if nfields > 3: edgeid = fields[3]
    if nfields > 4: deviceid = fields[4]
    return [msg_format, groupid, msg_type, edgeid, deviceid]
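
# For example, extractTopicTags on a metric whose topic tag is
# "spBv1.0/SF/DDATA/epiclc/Exp501" returns
#   ["spBv1.0", "SF", "DDATA", "epiclc", "Exp501"]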

def buildTopicTags(metric, topicFields):
    # Remove the topic and host tags - they are not useful for analysis
    metric.tags.pop("topic")
    metric.tags.pop("host")

    if MSG_FORMAT != "false": metric.tags[MSG_FORMAT] = topicFields[0]
    if GROUP_ID != "false": metric.tags[GROUP_ID] = topicFields[1]
    if MSG_TYPE != "false": metric.tags[MSG_TYPE] = topicFields[2]
    if EDGE_ID != "false": metric.tags[EDGE_ID] = topicFields[3]
    if DEVICE_ID != "false": metric.tags[DEVICE_ID] = topicFields[4]

def buildNameTags(metric, name):
    # Remove type and alias from metric.fields - they are not useful for analysis
    metric.fields.pop("type")
    metric.fields.pop("alias")
    if "name" in metric.fields:
        metric.fields.pop("name")

    # The Groov EPIC metric names are composed of 3 fields separated by a '/':
    # source, datatype, and metric name.
    # Extract these fields and include them as tags.
    fields = name.split('/')
    nfields = len(fields)
    if nfields > 0:
        metric.tags["Source"] = fields[0]
    if nfields > 1:
        metric.tags["Datatype"] = fields[1]
    if nfields > 2:
        metric.tags["Metric"] = fields[2]

    # OPTIONAL
    #
    # By using underscore characters the metric name can be further
    # divided into additional tags.
    # How this is divided is site specific - customize it as you wish.

    # The following demonstrates dividing the metric name into as many as five
    # new values: the measurement name (the units) plus up to four tags.
    # A metric name is expected to have between 3 and 6 underscore-separated fields.

    # If there are only one or two fields, the name is not split: the full name
    # stays in the 'Metric' tag and the measurement name is set to "Name".
    #
    # The last field is the units and is filled before fields 3, 4 and 5.
    # Ex: C, V, Torr, W, psi, RPM, On...
    # The units are used in Influx as the 'measurement' name.
    #
    # Fields 3, 4 and 5 (device, position, composition) are optional:
    #   measurement_component_device_position_composition_units
    #
    # Ex: I_FuelTank1_C (3 fields)
    #   Measurement  I
    #   Component    FuelTank1
    #   Units        C
    #
    # I_FuelTank1_TC_Outlet_C (5 fields)
    #   Measurement  I
    #   Component    FuelTank1
    #   Device       TC
    #   Position     Outlet
    #   Units        C
    #
    # I_FuelTank1_TC_Outlet_Premium_C (6 fields)
    #   Measurement  I
    #   Component    FuelTank1
    #   Device       TC
    #   Position     Outlet
    #   Composition  Premium
    #   Units        C

    # Split the metric name into fields using '_'
    sfields = fields[2].split('_')
    nf = len(sfields)
    # Don't split the name if it has only one or two fields
    if nf <= 2:
        metric.name = "Name"
    if nf > 2:
        metric.name = sfields[nf-1]  # The units are used for the metric (measurement) name
        metric.tags["Component"] = sfields[1]
    if nf > 3:
        metric.tags["Device"] = sfields[2]
    if nf > 4:
        metric.tags["Position"] = sfields[3]
    if nf > 5:
        metric.tags["Composition"] = sfields[4]

def apply(metric):
    output = metric

    log.debug("apply metric: {}".format(metric))

    topic = metric.tags.get("topic", "")
    topicFields = extractTopicTags(metric)
    edgeid = topicFields[3]  # The Sparkplug spec defines the 4th topic field as the edge node id

    # Determine the message type from the topic and, for birth messages,
    # add the alias to the "aliases" lookup table.
    if DEATH_TAG in topic:
        output = None

    elif BIRTH_TAG in topic:
        log.debug("  metric msg_type: {} edgeid: {} topic: {}".format(BIRTH_TAG, edgeid, topic))
        if "alias" in metric.fields and "name" in metric.fields:
            # Create the lookup-table entry using "${edgeid}/${alias}" as the key and "${name}" as the value
            alias = metric.fields.get("alias")
            name = metric.fields.get("name")
            id = "{}/{}".format(edgeid, alias)
            log.debug("    --> setting alias: {} name: {} id: {}".format(alias, name, id))
            state["aliases"][id] = name

            if "value" in metric.fields:
                buildTopicTags(metric, topicFields)
                buildNameTags(metric, name)
            else:
                output = None

            # Try to resolve any unresolved metrics waiting on this alias
            if len(state["unresolved"]) > 0:
                # Filter out the matching metrics and keep the rest as unresolved
                log.debug("    unresolved")
                unresolved = [("{}/{}".format(edgeid, m.fields["alias"]), m) for m in state["unresolved"]]
                matching = [(mid, m) for mid, m in unresolved if mid == id]
                state["unresolved"] = [m for mid, m in unresolved if mid != id]

                log.debug("    found {} matching unresolved metrics".format(len(matching)))
                # Process the matching metrics and add them to the output - TODO: needs debugging
                # for mid, m in matching:
                #     buildTopicTags(m, topicFields)
                #     buildNameTags(m, name)
                # output = [m for _, m in matching] + [metric]

    elif DATA_TAG in topic:
        log.debug("  metric msg_type: {} edgeid: {} topic: {}".format(DATA_TAG, edgeid, topic))
        if "alias" in metric.fields:
            alias = metric.fields.get("alias")

            # Look up the id. If we know it, replace the name of the metric with
            # the stored value, otherwise keep the metric for resolving later.
            # This can happen if the messages arrive out of order for some reason...
            id = "{}/{}".format(edgeid, alias)
            if id in state["aliases"]:
                name = state["aliases"][id]
                log.debug("    found alias: {} name: {}".format(alias, name))
                buildTopicTags(metric, topicFields)
                buildNameTags(metric, name)
            else:
                # Hold the metric until we get the corresponding birth message
                log.debug("    id not found: {}".format(id))
                output = None
                if len(state["unresolved"]) >= MAX_UNRESOLVED:
                    log.warn("    metric overflow, trimming {}".format(len(state["unresolved"]) - MAX_UNRESOLVED + 1))
                    # Release the unresolved metrics as raw and trim the buffer
                    output = state["unresolved"][MAX_UNRESOLVED-1:]
                    state["unresolved"] = state["unresolved"][:MAX_UNRESOLVED-1]
                log.debug("    --> keeping metric")
                state["unresolved"].append(metric)
        else:
            output = None

    return output
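
# Unresolved-buffer behavior, illustrated: if DDATA messages for alias 10 arrive
# before the matching DBIRTH, up to MAX_UNRESOLVED of them are parked in
# state["unresolved"] (and suppressed from the output); once the DBIRTH for
# "epiclc/10" is seen, later DDATA messages resolve normally. The parked metrics
# themselves are only re-emitted by the commented-out block above (or released
# raw on overflow), which is why that block is marked TODO.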