structured data output and statistics

This commit is contained in:
Wolfgang Hottgenroth
2022-02-09 18:07:11 +01:00
parent 8bde252189
commit 7543b1857b
8 changed files with 156 additions and 36 deletions

View File

@ -0,0 +1,20 @@
import re
import json
class InvalidDataObjectException(Exception):
def __init__(self, message):
super().__init__(message)
class AbstractDataObject(object):
invalidChars = re.compile("[#+\s]")
def __init__(self, topicPart):
self.topicPart = topicPart
def getTopicPart(self):
if AbstractDataObject.invalidChars.search(self.topicPart):
raise InvalidDataObjectException(f"Topic contains invalid characters: {self.topicPart}")
return self.topicPart
def getPayload(self):
raise NotImplementedError()

View File

@ -1,25 +1,17 @@
import re import re
import json import json
from AbstractDataObject import AbstractDataObject
class InvalidDataObjectException(Exception):
def __init__(self, message):
super().__init__(message)
class DataObject(object): class FlatDataObject(AbstractDataObject):
invalidChars = re.compile("[#+/\s]")
def __init__(self, serverName, nameSpaceIndex, variableName, value): def __init__(self, serverName, nameSpaceIndex, variableName, value):
super().__init__(serverName + '/' + str(nameSpaceIndex) + '/' + variableName)
self.serverName = serverName self.serverName = serverName
self.nameSpaceIndex = nameSpaceIndex self.nameSpaceIndex = nameSpaceIndex
self.variableName = variableName self.variableName = variableName
self.value = value self.value = value
def isValid(self):
return (not (DataObject.invalidChars.search(self.serverName) or DataObject.invalidChars.search(self.variableName))) and (type(self.nameSpaceIndex) == int)
def getTopicPost(self):
return self.serverName + '/' + str(self.nameSpaceIndex) + '/' + self.variableName
def getPayload(self): def getPayload(self):
payload = { payload = {
"serverName": self.serverName, "serverName": self.serverName,

View File

@ -1,42 +1,37 @@
from threading import Event from threading import Event
from loguru import logger from loguru import logger
from MqttBase import AbstractMqttPublisher from MqttBase import AbstractMqttPublisher
from AbstractDataObject import InvalidDataObjectException
from queue import Empty from queue import Empty
import json import json
import datetime
LOOP_SLICE = 0.1 # seconds LOOP_SLICE = 0.1 # seconds
class MqttPublish(AbstractMqttPublisher): class MqttPublish(AbstractMqttPublisher):
def __init__(self, config, queue): def __init__(self, config, stats, queue):
super().__init__(config) super().__init__(config)
self.queue = queue self.queue = queue
self.stats = stats
self.topicPre = self.config["publishTopicPrefix"] self.topicPre = self.config["publishTopicPrefix"]
self.statusTopic = self.config["statusTopic"] self.statusTopic = self.config["statusTopic"]
self.statusThreshold = self.config["statusThreshold"] self.statusThreshold = self.config["statusThreshold"]
def localLoop(self): def localLoop(self):
cnt = 0
startTime = datetime.datetime.now()
while not self.killBill: while not self.killBill:
try: try:
dataObject = self.queue.get(timeout=LOOP_SLICE) dataObject = self.queue.get(timeout=LOOP_SLICE)
if not dataObject.isValid(): topic = self.topicPre + '/' + dataObject.getTopicPart()
logger.error("invalid dataObject received: drop it")
else:
topic = self.topicPre + '/' + dataObject.getTopicPost()
payload = dataObject.getPayload() payload = dataObject.getPayload()
self.client.publish(topic, payload) self.client.publish(topic, payload)
cnt += 1
logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) logger.debug("mqtt message sent: {} -> {}".format(topic, payload))
if cnt % 100 == 1: self.stats.incMqttRequests()
currentTime = datetime.datetime.now()
uptime = int((currentTime - startTime).total_seconds())
payload = { "count": cnt, "uptime": uptime }
self.client.publish(self.statusTopic, json.dumps(payload), retain=True)
except Empty: except Empty:
# just evaluate the killBill at the top of the loop again # just evaluate the killBill at the top of the loop again
pass pass
except InvalidDataObjectException as e:
self.stats.incMqttErrors()
logger.error(f"InvalidDataObjectException received in MQTT local loop: {e}")
except Exception as e: except Exception as e:
self.stats.incMqttErrors()
logger.error(f"Exception {type(e)} received in MQTT local loop: {e}") logger.error(f"Exception {type(e)} received in MQTT local loop: {e}")

View File

@ -2,21 +2,24 @@ import asyncio
from asyncua import Client, ua from asyncua import Client, ua
import threading import threading
from loguru import logger from loguru import logger
from DataObject import DataObject from FlatDataObject import FlatDataObject
from StructuredDataObject import StructuredDataObject
class OpcUaRequester(threading.Thread): class OpcUaRequester(threading.Thread):
def __init__(self, config, queue): def __init__(self, config, stats, queue):
super().__init__() super().__init__()
self.config = config self.config = config
self.queue = queue self.queue = queue
self.stats = stats
self.name = self.config['name'] self.name = self.config['name']
self.url = self.config['url'] self.url = self.config['url']
self.nodes = self.config['nodes'] self.nodes = self.config['nodes']
self.delay = self.config['delay'] self.delay = self.config['delay']
self.timeout = self.config['timeout'] self.timeout = self.config['timeout']
self.dataObjectType = self.config['type']
self.flat = self.dataObjectType == 'flat'
# consider this flag in the localLoop # consider this flag in the localLoop
self.killBill = False self.killBill = False
@ -27,6 +30,8 @@ class OpcUaRequester(threading.Thread):
while not self.killBill: while not self.killBill:
try: try:
async with Client(url=self.url, timeout=self.timeout) as client: async with Client(url=self.url, timeout=self.timeout) as client:
if not self.flat:
dataObject = StructuredDataObject(self.name)
for nodeSpec in self.nodes: for nodeSpec in self.nodes:
try: try:
logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}")
@ -34,13 +39,22 @@ class OpcUaRequester(threading.Thread):
value = await node.read_value() value = await node.read_value()
displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text
logger.debug(f"Got: {displayName=} = {value=}") logger.debug(f"Got: {displayName=} = {value=}")
self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) if self.flat:
self.queue.put(FlatDataObject(self.name, nodeSpec['ns'], displayName, value))
else:
dataObject.add(displayName, value)
self.stats.incOpcUaRequests()
except ua.UaError as e: except ua.UaError as e:
self.stats.incOpcUaErrors()
logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}") logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}")
if not self.flat:
self.queue.put(dataObject)
await asyncio.sleep(self.delay) await asyncio.sleep(self.delay)
except asyncio.exceptions.TimeoutError as e: except asyncio.exceptions.TimeoutError as e:
self.stats.incOpcUaTimeouts()
logger.error(f"Timeout in inner OPC-UA loop") logger.error(f"Timeout in inner OPC-UA loop")
except Exception as e: except Exception as e:
self.stats.incOpcUaErrors()
logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}") logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}")
def run(self): def run(self):

66
opcua2mqtt/Statistics.py Normal file
View File

@ -0,0 +1,66 @@
from loguru import logger
import threading
from AbstractDataObject import AbstractDataObject
import json
import datetime
class StatisticsDataObject(AbstractDataObject):
def __init__(self, topic, payload):
super().__init__(topic)
self.payload = payload
def getPayload(self):
return json.dumps(self.payload)
class StatisticsCollector(threading.Thread):
def __init__(self, config, queue):
super().__init__()
self.config = config['stats']
self.period = self.config['period']
self.topic = self.config['topic']
self.queue = queue
self.killBill = False
self.killEvent = threading.Event()
self.stats = {
'opcUaRequests': 0,
'opcUaErrors' : 0,
'opcUaTimeouts': 0,
'mqttRequests': 0,
'mqttErrors': 0,
'uptime': 0
}
def incOpcUaRequests(self):
self.stats['opcUaRequests'] += 1
def incOpcUaErrors(self):
self.stats['opcUaErrors'] += 1
def incOpcUaTimeouts(self):
self.stats['opcUaTimeouts'] += 1
def incMqttRequests(self):
self.stats['mqttRequests'] += 1
def incMqttErrors(self):
self.stats['mqttErrors'] += 1
def stop(self):
self.killBill = True
logger.info("kill flag set")
self.killEvent.set()
logger.info("kill events triggered")
def run(self):
startTime = datetime.datetime.now()
while not self.killBill:
currentTime = datetime.datetime.now()
self.stats['uptime'] = int((currentTime - startTime).total_seconds())
self.queue.put(StatisticsDataObject(self.topic, self.stats))
self.killEvent.wait(timeout=float(self.period))

View File

@ -0,0 +1,16 @@
import re
import json
from AbstractDataObject import AbstractDataObject
class StructuredDataObject(AbstractDataObject):
def __init__(self, topicPart):
super().__init__(topicPart)
self.keyValuePairs = []
def add(self, key, value):
self.keyValuePairs.append({key: value})
def getPayload(self):
return json.dumps(self.keyValuePairs)

View File

@ -1,5 +1,7 @@
from MqttPublish import MqttPublish from MqttPublish import MqttPublish
from OpcUaRequester import OpcUaRequester from OpcUaRequester import OpcUaRequester
from Statistics import StatisticsCollector
from loguru import logger from loguru import logger
import argparse import argparse
import json import json
@ -38,7 +40,11 @@ with open(args.config) as f:
queue = queue.Queue() queue = queue.Queue()
publishThread = MqttPublish(config, queue) statsThread = StatisticsCollector(config, queue)
statsThread.start()
logger.info("StatisticsCollector started")
publishThread = MqttPublish(config, statsThread, queue)
publishThread.start() publishThread.start()
logger.info("MqttPublish started") logger.info("MqttPublish started")
@ -46,7 +52,7 @@ opcuaThreads = []
for o in config['opcua']: for o in config['opcua']:
if o['enabled'] != 'true': if o['enabled'] != 'true':
continue continue
ot = OpcUaRequester(o, queue) ot = OpcUaRequester(o, statsThread, queue)
ot.start() ot.start()
logger.info(f"OpcUaRequester thread for {o['name']} started") logger.info(f"OpcUaRequester thread for {o['name']} started")
opcuaThreads.append(ot) opcuaThreads.append(ot)
@ -66,6 +72,9 @@ logger.error("opcua2mqtt bridge is dying")
publishThread.stop() publishThread.stop()
logger.error("publishThread stopped") logger.error("publishThread stopped")
statsThread.stop()
logger.error("statsThread stopped")
for ot in opcuaThreads: for ot in opcuaThreads:
ot.stop() ot.stop()
logger.error(f"opcua thread {ot.name} stopped") logger.error(f"opcua thread {ot.name} stopped")
@ -73,6 +82,9 @@ for ot in opcuaThreads:
publishThread.join() publishThread.join()
logger.error("publishThread joined") logger.error("publishThread joined")
statsThread.join()
logger.error("statsThread joined")
for ot in opcuaThreads: for ot in opcuaThreads:
ot.join() ot.join()
logger.error(f"opcua thread {ot.name} joined") logger.error(f"opcua thread {ot.name} joined")

View File

@ -6,9 +6,14 @@
"statusTopic": "opcua/bridge/status", "statusTopic": "opcua/bridge/status",
"statusThreshold": 100 "statusThreshold": 100
}, },
"stats": {
"topic": "statistics",
"period": 60
},
"opcua": [ "opcua": [
{ {
"enabled": "false", "enabled": "true",
"type": "structured",
"url": "opc.tcp://172.16.3.60:4840", "url": "opc.tcp://172.16.3.60:4840",
"name": "apl", "name": "apl",
"delay": 1.0, "delay": 1.0,
@ -21,7 +26,7 @@
] ]
}, },
{ {
"enabled": "true", "enabled": "false",
"url": "opc.tcp://192.168.254.5:4863", "url": "opc.tcp://192.168.254.5:4863",
"name": "sh", "name": "sh",
"delay": 1.0, "delay": 1.0,