diff --git a/src/OpcUaSubscriber.py b/src/OpcUaSubscriber.py new file mode 100644 index 0000000..fc24244 --- /dev/null +++ b/src/OpcUaSubscriber.py @@ -0,0 +1,114 @@ +import asyncio +from sqlite3 import NotSupportedError +from asyncua import Client, ua +import threading +from loguru import logger +from FlatDataObject import FlatDataObject +from StructuredDataObject import StructuredDataObject + + + +class FlatSubscriptionHandler: + def __init__(self, serverName, nodes, queue): + self.serverName = serverName + self.nodes = nodes + self.queue = queue + self.count = 0 + + def getAndResetCount(self): + tmp = self.count + self.count = 0 + return tmp + + def datachange_notification(self, node, val, data): + logger.info(f"received: {node=}, {val=}, {data=}") + self.count += 1 + match node.nodeid.NodeIdType: + case ua.NodeIdType.Numeric: + prefix = 'i' + case ua.NodeIdType.String: + prefix = 's' + case _: + prefix = 'x' + nodeName = f"{prefix}={node.nodeid.Identifier}" + namespaceIndex = node.nodeid.NamespaceIndex + displayNames = [ x['d'] for x in self.nodes if x['ns'] == namespaceIndex and x['n'] == nodeName ] + name = displayNames[0] if displayNames else nodeName + self.queue.put(FlatDataObject(self.serverName, str(namespaceIndex), name, data.monitored_item.Value)) + + +class _RenewTriggerException(Exception): pass + +class OpcUaSubscriber(threading.Thread): + def __init__(self, config, stats, queue): + super().__init__() + + self.config = config + self.queue = queue + self.stats = stats + + self.name = self.config['name'] + self.url = self.config['url'] + self.nodes = self.config['nodes'] + self.period = self.config['period'] + self.timeout = self.config['timeout'] + self.dataObjectType = self.config['type'] + self.flat = self.dataObjectType == 'flat' + + if not self.flat: + raise NotImplementedError("Only flat approach supported in OpcUaSubscriber") + + # consider this flag in the localLoop + self.killBill = False + self.killEvent = asyncio.Event() + + async def opcUaSubscriberInnerLoop(self): + while not self.killBill: + try: + async with Client(url=self.url, timeout=self.timeout) as client: + subscriptionHandler = FlatSubscriptionHandler(self.name, self.nodes, self.queue) + subscription = await client.create_subscription(self.period * 1000, subscriptionHandler) + nodes = [ client.get_node(f"ns={n['ns']};{n['n']}") for n in self.nodes ] + await subscription.subscribe_data_change(nodes) + logger.info("Subscriptions created, nodes subscribed") + + while True: + try: + await asyncio.wait_for(self.killEvent.wait(), self.period * 10) + logger.info("About to terminate opcUaSubscriber") + break + except asyncio.TimeoutError: + cnt = subscriptionHandler.getAndResetCount() + logger.info(f"receive count: {cnt}") + if cnt == 0: + raise _RenewTriggerException() + + await subscription.delete() + logger.info("Subscriptions deleted, wait a moment") + await asyncio.sleep(1) + logger.info("opcUaSubscriber terminated") + except _RenewTriggerException: + logger.error(f"too few data received, renew connection") + # continues in the loop + except asyncio.exceptions.TimeoutError as e: + self.stats.incOpcUaTimeouts() + logger.error(f"Timeout in inner OPC-UA loop") + except asyncio.exceptions.CancelledError as e: + self.stats.incOpcUaErrors() + logger.error(f"Cancelled in inner OPC-UA loop") + except Exception as e: + self.stats.incOpcUaErrors() + logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}") + + def run(self): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.opcUaSubscriberInnerLoop()) + loop.close() + + def stop(self): + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") diff --git a/src/bridge.py b/src/bridge.py index fd4f5e3..f212ed8 100644 --- a/src/bridge.py +++ b/src/bridge.py @@ -1,5 +1,4 @@ from MqttPublish import MqttPublish -from OpcUaRequester import OpcUaRequester from Statistics import StatisticsCollector from loguru import logger @@ -40,24 +39,43 @@ with open(args.config) as f: config = json.load(f) +match config['opcua']['type']: + case 'requester': + logger.info("Loading OpcUaRequester") + from OpcUaRequester import OpcUaRequester as OpcUa + case 'subscriber': + logger.info("Loading OpcUaSubscriber") + from OpcUaSubscriber import OpcUaSubscriber as OpcUa + case _: + raise Exception("unknown OpcUa type") + + queue = queue.Queue() -statsThread = StatisticsCollector(config, queue) -statsThread.start() -logger.info("StatisticsCollector started") +threads = [] +try: + statsThread = StatisticsCollector(config, queue) + statsThread.start() + threads.append(statsThread) + logger.info("StatisticsCollector started") -publishThread = MqttPublish(config, statsThread, queue) -publishThread.start() -logger.info("MqttPublish started") + publishThread = MqttPublish(config, statsThread, queue) + publishThread.start() + threads.append(publishThread) + logger.info("MqttPublish started") + -opcuaThreads = [] -for o in config['opcua']: - if o['enabled'] != 'true': - continue - ot = OpcUaRequester(o, statsThread, queue) - ot.start() - logger.info(f"OpcUaRequester thread for {o['name']} started") - opcuaThreads.append(ot) + opcuaThreads = [] + for o in config['opcua']["servers"]: + if o['enabled'] != 'true': + continue + ot = OpcUa(o, statsThread, queue) + ot.start() + logger.info(f"OpcUaRequester thread for {o['name']} started") + threads.append(ot) +except Exception as e: + logger.error(f"caught exception {type(e)}, {e} during start-up phase") + deathBell.set() threading.excepthook = exceptHook logger.info("Threading excepthook set") @@ -71,24 +89,12 @@ logger.info("opcua2mqtt bridge is running") deathBell.wait() logger.error("opcua2mqtt bridge is dying") -publishThread.stop() -logger.error("publishThread stopped") +for t in threads: + t.stop() + logger.error(f"thread {t.name} stopped") -statsThread.stop() -logger.error("statsThread stopped") - -for ot in opcuaThreads: - ot.stop() - logger.error(f"opcua thread {ot.name} stopped") - -publishThread.join() -logger.error("publishThread joined") - -statsThread.join() -logger.error("statsThread joined") - -for ot in opcuaThreads: - ot.join() - logger.error(f"opcua thread {ot.name} joined") +for t in threads: + t.join() + logger.error(f"thread {t.name} joined") logger.error("opcua2mqtt bridge is terminated") diff --git a/src/config.json b/src/config.json index e9d5540..ac0743e 100644 --- a/src/config.json +++ b/src/config.json @@ -8,62 +8,65 @@ "topic": "statistics", "period": 60 }, - "opcua": [ - { - "enabled": "true", - "type": "structured", - "url": "opc.tcp://172.16.3.60:4840", - "name": "apl", - "period": 1.0, - "timeout": 1.0, - "nodes": [ - { "ns": 0, "n": "i=345", "d": "pv" }, - { "ns": 0, "n": "i=348", "d": "sv" }, - { "ns": 0, "n": "i=351", "d": "tv" }, - { "ns": 0, "n": "i=354", "d": "qv" } - ] - }, - { - "enabled": "false", - "type": "flat", - "url": "opc.tcp://192.168.254.5:4863", - "name": "sh", - "period": 1.0, - "timeout": 10.0, - "nodes": [ - { "ns": 1, "n": "s=t|SERVER::A201CD124/MOT_01.AV_Out#Value", "d": "A201CD124" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ003/PID_01.PV_Out#Value", "d": "A201CJ003" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ004/PID_01.PV_Out#Value", "d": "A201CJ004" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ011/PID_01.PV_Out#Value", "d": "A201CJ011" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ014/PID_01.PV_Out#Value", "d": "A201CJ014" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ021/MMON_01.PV_Out#Value", "d": "A201CJ021" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ022/MMON_01.PV_Out#Value", "d": "A201CJ022" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ023/MMON_01.PV_Out#Value", "d": "A201CJ023" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ024/PID_01.PV_Out#Value", "d": "A201CJ024" }, - { "ns": 1, "n": "s=t|SERVER::A201CJ025/PID_01.PV_Out#Value", "d": "A201CJ025" }, - { "ns": 1, "n": "s=t|SERVER::A201CD123/MOT_01.AV_Out#Value", "d": "A201CD123" }, - { "ns": 1, "n": "s=t|SERVER::A201CD121/MOT_01.AV_Out#Value", "d": "A201CD121" }, - { "ns": 1, "n": "s=t|SERVER::A212DD110/MOT_01.AV_Out#Value", "d": "A212DD110" }, - { "ns": 1, "n": "s=t|SERVER::A212DD130/MOT_01.AV_Out#Value", "d": "A212DD130" }, - { "ns": 1, "n": "s=t|SERVER::A212DD131/MOT_01.AV_Out#Value", "d": "A212DD131" }, - { "ns": 1, "n": "s=t|SERVER::A212DD111/MOT_01.AV_Out#Value", "d": "A212DD111" }, - { "ns": 1, "n": "s=t|SERVER::A212DD113/MOT_01.AV_Out#Value", "d": "A212DD113" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ004/PID_01.PV_Out#Value", "d": "A212DJ004" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ021/PID_01.PV_Out#Value", "d": "A212DJ021" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ001/PID_01.PV_Out#Value", "d": "A212DJ001" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ011/PID_01.PV_Out#Value", "d": "A212DJ011" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ032/MMON_01.PV_Out#Value", "d": "A212DJ032" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ031/MMON_01.PV_Out#Value", "d": "A212DJ031" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ033/MMON_01.PV_Out#Value", "d": "A212DJ033" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ010/MMON_01.PV_Out#Value", "d": "A212DJ010" }, - { "ns": 1, "n": "s=t|SERVER::A212DJ042/MMON_01.PV_Out#Value", "d": "A212DJ042" }, - { "ns": 1, "n": "s=t|SERVER::A214BJ055/PID_01.PV_Out#Value", "d": "A214BJ055" }, - { "ns": 1, "n": "s=t|SERVER::A214BJ065/PID_01.PV_Out#Value", "d": "A214BJ065" }, - { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_01.PV_Out#Value", "d": "A212BJ010" }, - { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_02.PV_Out#Value", "d": "A212BJ010" } - ] - } - ] + "opcua": { + "type": "subscriber", + "servers": [ + { + "enabled": "true", + "type": "flat", + "url": "opc.tcp://172.16.3.60:4840", + "name": "apl", + "period": 1.0, + "timeout": 1.0, + "nodes": [ + { "ns": 0, "n": "i=345", "d": "pv" }, + { "ns": 0, "n": "i=348", "d": "sv" }, + { "ns": 0, "n": "i=351", "d": "tv" }, + { "ns": 0, "n": "i=354", "d": "qv" } + ] + }, + { + "enabled": "false", + "type": "flat", + "url": "opc.tcp://192.168.254.5:4863", + "name": "sh", + "period": 1.0, + "timeout": 10.0, + "nodes": [ + { "ns": 1, "n": "s=t|SERVER::A201CD124/MOT_01.AV_Out#Value", "d": "A201CD124" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ003/PID_01.PV_Out#Value", "d": "A201CJ003" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ004/PID_01.PV_Out#Value", "d": "A201CJ004" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ011/PID_01.PV_Out#Value", "d": "A201CJ011" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ014/PID_01.PV_Out#Value", "d": "A201CJ014" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ021/MMON_01.PV_Out#Value", "d": "A201CJ021" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ022/MMON_01.PV_Out#Value", "d": "A201CJ022" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ023/MMON_01.PV_Out#Value", "d": "A201CJ023" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ024/PID_01.PV_Out#Value", "d": "A201CJ024" }, + { "ns": 1, "n": "s=t|SERVER::A201CJ025/PID_01.PV_Out#Value", "d": "A201CJ025" }, + { "ns": 1, "n": "s=t|SERVER::A201CD123/MOT_01.AV_Out#Value", "d": "A201CD123" }, + { "ns": 1, "n": "s=t|SERVER::A201CD121/MOT_01.AV_Out#Value", "d": "A201CD121" }, + { "ns": 1, "n": "s=t|SERVER::A212DD110/MOT_01.AV_Out#Value", "d": "A212DD110" }, + { "ns": 1, "n": "s=t|SERVER::A212DD130/MOT_01.AV_Out#Value", "d": "A212DD130" }, + { "ns": 1, "n": "s=t|SERVER::A212DD131/MOT_01.AV_Out#Value", "d": "A212DD131" }, + { "ns": 1, "n": "s=t|SERVER::A212DD111/MOT_01.AV_Out#Value", "d": "A212DD111" }, + { "ns": 1, "n": "s=t|SERVER::A212DD113/MOT_01.AV_Out#Value", "d": "A212DD113" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ004/PID_01.PV_Out#Value", "d": "A212DJ004" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ021/PID_01.PV_Out#Value", "d": "A212DJ021" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ001/PID_01.PV_Out#Value", "d": "A212DJ001" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ011/PID_01.PV_Out#Value", "d": "A212DJ011" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ032/MMON_01.PV_Out#Value", "d": "A212DJ032" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ031/MMON_01.PV_Out#Value", "d": "A212DJ031" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ033/MMON_01.PV_Out#Value", "d": "A212DJ033" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ010/MMON_01.PV_Out#Value", "d": "A212DJ010" }, + { "ns": 1, "n": "s=t|SERVER::A212DJ042/MMON_01.PV_Out#Value", "d": "A212DJ042" }, + { "ns": 1, "n": "s=t|SERVER::A214BJ055/PID_01.PV_Out#Value", "d": "A214BJ055" }, + { "ns": 1, "n": "s=t|SERVER::A214BJ065/PID_01.PV_Out#Value", "d": "A214BJ065" }, + { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_01.PV_Out#Value", "d": "A212BJ010" }, + { "ns": 1, "n": "s=t|SERVER::A212BJ010/MMON_02.PV_Out#Value", "d": "A212BJ010" } + ] + } + ] + } }