opcua-with-python/opcua2mqtt/OpcUaRequester.py
2022-02-05 11:08:49 +01:00

59 lines
2.1 KiB
Python

import asyncio
from asyncua import Client, ua
import threading
from loguru import logger
from DataObject import DataObject
class OpcUaRequester(threading.Thread):
def __init__(self, config, queue):
super().__init__()
self.config = config
self.queue = queue
self.name = self.config['name']
self.url = self.config['url']
self.nodes = self.config['nodes']
self.delay = self.config['delay']
self.timeout = self.config['timeout']
# consider this flag in the localLoop
self.killBill = False
self.killEvent = threading.Event()
async def opcUaRequesterInnerLoop(self):
while not self.killBill:
try:
async with Client(url=self.url, timeout=self.timeout) as client:
for nodeSpec in self.nodes:
try:
logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}")
node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}")
value = await node.read_value()
displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text
logger.debug(f"Got: {displayName=} = {value=}")
self.queue.put(DataObject(self.name, nodeSpec['ns'], displayName, value))
except ua.UaError as e:
logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}")
await asyncio.sleep(self.delay)
except asyncio.exceptions.TimeoutError as e:
logger.error(f"Timeout in inner OPC-UA loop")
except Exception as e:
logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}")
def run(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.opcUaRequesterInnerLoop())
loop.close()
def stop(self):
self.killBill = True
logger.info("kill flag set")
self.killEvent.set()
logger.info("kill events triggered")