diff --git a/opcua2mqtt/AbstractDataObject.py b/opcua2mqtt/AbstractDataObject.py new file mode 100644 index 0000000..a58cc4d --- /dev/null +++ b/opcua2mqtt/AbstractDataObject.py @@ -0,0 +1,20 @@ +import re +import json + +class InvalidDataObjectException(Exception): + def __init__(self, message): + super().__init__(message) + +class AbstractDataObject(object): + invalidChars = re.compile("[#+\s]") + + def __init__(self, topicPart): + self.topicPart = topicPart + + def getTopicPart(self): + if AbstractDataObject.invalidChars.search(self.topicPart): + raise InvalidDataObjectException(f"Topic contains invalid characters: {self.topicPart}") + return self.topicPart + + def getPayload(self): + raise NotImplementedError() \ No newline at end of file diff --git a/opcua2mqtt/DataObject.py b/opcua2mqtt/FlatDataObject.py similarity index 51% rename from opcua2mqtt/DataObject.py rename to opcua2mqtt/FlatDataObject.py index b7628f2..ff5096c 100644 --- a/opcua2mqtt/DataObject.py +++ b/opcua2mqtt/FlatDataObject.py @@ -1,25 +1,17 @@ import re import json +from AbstractDataObject import AbstractDataObject -class InvalidDataObjectException(Exception): - def __init__(self, message): - super().__init__(message) -class DataObject(object): - invalidChars = re.compile("[#+/\s]") +class FlatDataObject(AbstractDataObject): def __init__(self, serverName, nameSpaceIndex, variableName, value): + super().__init__(serverName + '/' + str(nameSpaceIndex) + '/' + variableName) self.serverName = serverName self.nameSpaceIndex = nameSpaceIndex self.variableName = variableName self.value = value - def isValid(self): - return (not (DataObject.invalidChars.search(self.serverName) or DataObject.invalidChars.search(self.variableName))) and (type(self.nameSpaceIndex) == int) - - def getTopicPost(self): - return self.serverName + '/' + str(self.nameSpaceIndex) + '/' + self.variableName - def getPayload(self): payload = { "serverName": self.serverName, diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index 746383e..bdb8857 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -1,42 +1,37 @@ from threading import Event from loguru import logger from MqttBase import AbstractMqttPublisher +from AbstractDataObject import InvalidDataObjectException from queue import Empty import json -import datetime LOOP_SLICE = 0.1 # seconds class MqttPublish(AbstractMqttPublisher): - def __init__(self, config, queue): + def __init__(self, config, stats, queue): super().__init__(config) self.queue = queue + self.stats = stats self.topicPre = self.config["publishTopicPrefix"] self.statusTopic = self.config["statusTopic"] self.statusThreshold = self.config["statusThreshold"] def localLoop(self): - cnt = 0 - startTime = datetime.datetime.now() while not self.killBill: try: dataObject = self.queue.get(timeout=LOOP_SLICE) - if not dataObject.isValid(): - logger.error("invalid dataObject received: drop it") - else: - topic = self.topicPre + '/' + dataObject.getTopicPost() - payload = dataObject.getPayload() - self.client.publish(topic, payload) - cnt += 1 - logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) - if cnt % 100 == 1: - currentTime = datetime.datetime.now() - uptime = int((currentTime - startTime).total_seconds()) - payload = { "count": cnt, "uptime": uptime } - self.client.publish(self.statusTopic, json.dumps(payload), retain=True) + topic = self.topicPre + '/' + dataObject.getTopicPart() + payload = dataObject.getPayload() + self.client.publish(topic, payload) + logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) + self.stats.incMqttRequests() except Empty: # just evaluate the killBill at the top of the loop again pass + except InvalidDataObjectException as e: + self.stats.incMqttErrors() + logger.error(f"InvalidDataObjectException received in MQTT local loop: {e}") except Exception as e: + self.stats.incMqttErrors() logger.error(f"Exception {type(e)} received in MQTT local loop: {e}") diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py index 49dd62f..a22d7d4 100644 --- a/opcua2mqtt/OpcUaRequester.py +++ b/opcua2mqtt/OpcUaRequester.py @@ -2,21 +2,24 @@ import asyncio from asyncua import Client, ua import threading from loguru import logger -from DataObject import DataObject - +from FlatDataObject import FlatDataObject +from StructuredDataObject import StructuredDataObject class OpcUaRequester(threading.Thread): - def __init__(self, config, queue): + def __init__(self, config, stats, queue): super().__init__() self.config = config self.queue = queue + self.stats = stats self.name = self.config['name'] self.url = self.config['url'] self.nodes = self.config['nodes'] self.delay = self.config['delay'] self.timeout = self.config['timeout'] + self.dataObjectType = self.config['type'] + self.flat = self.dataObjectType == 'flat' # consider this flag in the localLoop self.killBill = False @@ -27,6 +30,8 @@ class OpcUaRequester(threading.Thread): while not self.killBill: try: async with Client(url=self.url, timeout=self.timeout) as client: + if not self.flat: + dataObject = StructuredDataObject(self.name) for nodeSpec in self.nodes: try: logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") @@ -34,13 +39,22 @@ class OpcUaRequester(threading.Thread): value = await node.read_value() displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text logger.debug(f"Got: {displayName=} = {value=}") - self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) + if self.flat: + self.queue.put(FlatDataObject(self.name, nodeSpec['ns'], displayName, value)) + else: + dataObject.add(displayName, value) + self.stats.incOpcUaRequests() except ua.UaError as e: + self.stats.incOpcUaErrors() logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}") + if not self.flat: + self.queue.put(dataObject) await asyncio.sleep(self.delay) except asyncio.exceptions.TimeoutError as e: + self.stats.incOpcUaTimeouts() logger.error(f"Timeout in inner OPC-UA loop") except Exception as e: + self.stats.incOpcUaErrors() logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}") def run(self): diff --git a/opcua2mqtt/Statistics.py b/opcua2mqtt/Statistics.py new file mode 100644 index 0000000..8bd6244 --- /dev/null +++ b/opcua2mqtt/Statistics.py @@ -0,0 +1,66 @@ +from loguru import logger +import threading +from AbstractDataObject import AbstractDataObject +import json +import datetime + +class StatisticsDataObject(AbstractDataObject): + def __init__(self, topic, payload): + super().__init__(topic) + self.payload = payload + + def getPayload(self): + return json.dumps(self.payload) + + +class StatisticsCollector(threading.Thread): + def __init__(self, config, queue): + super().__init__() + + self.config = config['stats'] + self.period = self.config['period'] + self.topic = self.config['topic'] + self.queue = queue + + self.killBill = False + self.killEvent = threading.Event() + + self.stats = { + 'opcUaRequests': 0, + 'opcUaErrors' : 0, + 'opcUaTimeouts': 0, + 'mqttRequests': 0, + 'mqttErrors': 0, + 'uptime': 0 + } + + def incOpcUaRequests(self): + self.stats['opcUaRequests'] += 1 + + def incOpcUaErrors(self): + self.stats['opcUaErrors'] += 1 + + def incOpcUaTimeouts(self): + self.stats['opcUaTimeouts'] += 1 + + def incMqttRequests(self): + self.stats['mqttRequests'] += 1 + + def incMqttErrors(self): + self.stats['mqttErrors'] += 1 + + def stop(self): + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") + + def run(self): + startTime = datetime.datetime.now() + while not self.killBill: + currentTime = datetime.datetime.now() + self.stats['uptime'] = int((currentTime - startTime).total_seconds()) + self.queue.put(StatisticsDataObject(self.topic, self.stats)) + self.killEvent.wait(timeout=float(self.period)) + diff --git a/opcua2mqtt/StructuredDataObject.py b/opcua2mqtt/StructuredDataObject.py new file mode 100644 index 0000000..2029e06 --- /dev/null +++ b/opcua2mqtt/StructuredDataObject.py @@ -0,0 +1,16 @@ +import re +import json +from AbstractDataObject import AbstractDataObject + + +class StructuredDataObject(AbstractDataObject): + + def __init__(self, topicPart): + super().__init__(topicPart) + self.keyValuePairs = [] + + def add(self, key, value): + self.keyValuePairs.append({key: value}) + + def getPayload(self): + return json.dumps(self.keyValuePairs) \ No newline at end of file diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py index b8a7449..3990aba 100644 --- a/opcua2mqtt/bridge.py +++ b/opcua2mqtt/bridge.py @@ -1,5 +1,7 @@ from MqttPublish import MqttPublish from OpcUaRequester import OpcUaRequester +from Statistics import StatisticsCollector + from loguru import logger import argparse import json @@ -38,7 +40,11 @@ with open(args.config) as f: queue = queue.Queue() -publishThread = MqttPublish(config, queue) +statsThread = StatisticsCollector(config, queue) +statsThread.start() +logger.info("StatisticsCollector started") + +publishThread = MqttPublish(config, statsThread, queue) publishThread.start() logger.info("MqttPublish started") @@ -46,7 +52,7 @@ opcuaThreads = [] for o in config['opcua']: if o['enabled'] != 'true': continue - ot = OpcUaRequester(o, queue) + ot = OpcUaRequester(o, statsThread, queue) ot.start() logger.info(f"OpcUaRequester thread for {o['name']} started") opcuaThreads.append(ot) @@ -66,6 +72,9 @@ logger.error("opcua2mqtt bridge is dying") publishThread.stop() logger.error("publishThread stopped") +statsThread.stop() +logger.error("statsThread stopped") + for ot in opcuaThreads: ot.stop() logger.error(f"opcua thread {ot.name} stopped") @@ -73,6 +82,9 @@ for ot in opcuaThreads: publishThread.join() logger.error("publishThread joined") +statsThread.join() +logger.error("statsThread joined") + for ot in opcuaThreads: ot.join() logger.error(f"opcua thread {ot.name} joined") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index 23d75d0..2761308 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -6,9 +6,14 @@ "statusTopic": "opcua/bridge/status", "statusThreshold": 100 }, + "stats": { + "topic": "statistics", + "period": 60 + }, "opcua": [ { - "enabled": "false", + "enabled": "true", + "type": "structured", "url": "opc.tcp://172.16.3.60:4840", "name": "apl", "delay": 1.0, @@ -21,7 +26,7 @@ ] }, { - "enabled": "true", + "enabled": "false", "url": "opc.tcp://192.168.254.5:4863", "name": "sh", "delay": 1.0,