import asyncio from asyncua import Client, ua import threading from loguru import logger from DataObject import DataObject class OpcUaRequester(threading.Thread): def __init__(self, config, queue): super().__init__() self.config = config self.queue = queue self.name = self.config['name'] self.url = self.config['url'] self.nodes = self.config['nodes'] self.delay = self.config['delay'] self.timeout = self.config['timeout'] # consider this flag in the localLoop self.killBill = False self.killEvent = threading.Event() async def opcUaRequesterInnerLoop(self): while not self.killBill: try: async with Client(url=self.url, timeout=self.timeout) as client: for nodeSpec in self.nodes: try: logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}") node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") value = await node.read_value() displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text logger.debug(f"Got: {displayName=} = {value=}") self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value)) except ua.UaError as e: logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}") await asyncio.sleep(self.delay) except asyncio.exceptions.TimeoutError as e: logger.error(f"Timeout in inner OPC-UA loop") except Exception as e: logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}") def run(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self.opcUaRequesterInnerLoop()) loop.close() def stop(self): self.killBill = True logger.info("kill flag set") self.killEvent.set() logger.info("kill events triggered")