import asyncio from sqlite3 import NotSupportedError from asyncua import Client, ua import threading from loguru import logger from FlatDataObject import FlatDataObject from StructuredDataObject import StructuredDataObject class FlatSubscriptionHandler: def __init__(self, serverName, nodes, queue): self.serverName = serverName self.nodes = nodes self.queue = queue self.count = 0 def getAndResetCount(self): tmp = self.count self.count = 0 return tmp def datachange_notification(self, node, val, data): logger.info(f"received: {node=}, {val=}, {data=}") self.count += 1 match node.nodeid.NodeIdType: case ua.NodeIdType.Numeric: prefix = 'i' case ua.NodeIdType.String: prefix = 's' case _: prefix = 'x' nodeName = f"{prefix}={node.nodeid.Identifier}" namespaceIndex = node.nodeid.NamespaceIndex displayNames = [ x['d'] for x in self.nodes if x['ns'] == namespaceIndex and x['n'] == nodeName ] name = displayNames[0] if displayNames else nodeName self.queue.put(FlatDataObject(self.serverName, str(namespaceIndex), name, data.monitored_item.Value)) class _RenewTriggerException(Exception): pass class OpcUaSubscriber(threading.Thread): def __init__(self, config, stats, queue): super().__init__() self.config = config self.queue = queue self.stats = stats self.name = self.config['name'] self.url = self.config['url'] self.nodes = self.config['nodes'] self.period = self.config['period'] self.timeout = self.config['timeout'] self.dataObjectType = self.config['type'] self.flat = self.dataObjectType == 'flat' if not self.flat: raise NotImplementedError("Only flat approach supported in OpcUaSubscriber") # consider this flag in the localLoop self.killBill = False self.killEvent = asyncio.Event() async def opcUaSubscriberInnerLoop(self): while not self.killBill: try: async with Client(url=self.url, timeout=self.timeout) as client: subscriptionHandler = FlatSubscriptionHandler(self.name, self.nodes, self.queue) subscription = await client.create_subscription(self.period * 1000, subscriptionHandler) nodes = [ client.get_node(f"ns={n['ns']};{n['n']}") for n in self.nodes ] await subscription.subscribe_data_change(nodes) logger.info("Subscriptions created, nodes subscribed") while True: try: await asyncio.wait_for(self.killEvent.wait(), self.period * 10) logger.info("About to terminate opcUaSubscriber") break except asyncio.TimeoutError: cnt = subscriptionHandler.getAndResetCount() logger.info(f"receive count: {cnt}") if cnt == 0: raise _RenewTriggerException() await subscription.delete() logger.info("Subscriptions deleted, wait a moment") await asyncio.sleep(1) logger.info("opcUaSubscriber terminated") except _RenewTriggerException: logger.error(f"too few data received, renew connection") # continues in the loop except asyncio.exceptions.TimeoutError as e: self.stats.incOpcUaTimeouts() logger.error(f"Timeout in inner OPC-UA loop") except asyncio.exceptions.CancelledError as e: self.stats.incOpcUaErrors() logger.error(f"Cancelled in inner OPC-UA loop") except Exception as e: self.stats.incOpcUaErrors() logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}") def run(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self.opcUaSubscriberInnerLoop()) loop.close() def stop(self): self.killBill = True logger.info("kill flag set") self.killEvent.set() logger.info("kill events triggered")