rename src dir
This commit is contained in:
72
src/OpcUaRequester.py
Normal file
72
src/OpcUaRequester.py
Normal file
@ -0,0 +1,72 @@
|
||||
import asyncio
|
||||
from asyncua import Client, ua
|
||||
import threading
|
||||
from loguru import logger
|
||||
from FlatDataObject import FlatDataObject
|
||||
from StructuredDataObject import StructuredDataObject
|
||||
|
||||
class OpcUaRequester(threading.Thread):
|
||||
def __init__(self, config, stats, queue):
|
||||
super().__init__()
|
||||
|
||||
self.config = config
|
||||
self.queue = queue
|
||||
self.stats = stats
|
||||
|
||||
self.name = self.config['name']
|
||||
self.url = self.config['url']
|
||||
self.nodes = self.config['nodes']
|
||||
self.delay = self.config['delay']
|
||||
self.timeout = self.config['timeout']
|
||||
self.dataObjectType = self.config['type']
|
||||
self.flat = self.dataObjectType == 'flat'
|
||||
|
||||
# consider this flag in the localLoop
|
||||
self.killBill = False
|
||||
self.killEvent = threading.Event()
|
||||
|
||||
|
||||
async def opcUaRequesterInnerLoop(self):
|
||||
while not self.killBill:
|
||||
try:
|
||||
async with Client(url=self.url, timeout=self.timeout) as client:
|
||||
if not self.flat:
|
||||
dataObject = StructuredDataObject(self.name)
|
||||
for nodeSpec in self.nodes:
|
||||
try:
|
||||
logger.debug(f"Trying {self.name} {self.url} ns={nodeSpec['ns']};{nodeSpec['n']}")
|
||||
node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}")
|
||||
value = await node.read_value()
|
||||
displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text
|
||||
logger.debug(f"Got: {displayName=} = {value=}")
|
||||
if self.flat:
|
||||
self.queue.put(FlatDataObject(self.name, nodeSpec['ns'], displayName, value))
|
||||
else:
|
||||
dataObject.add(displayName, value)
|
||||
self.stats.incOpcUaRequests()
|
||||
except ua.UaError as e:
|
||||
self.stats.incOpcUaErrors()
|
||||
logger.error(f"UaError in inner OPC-UA loop: {type(e)} {e}")
|
||||
if not self.flat:
|
||||
self.queue.put(dataObject)
|
||||
await asyncio.sleep(self.delay)
|
||||
except asyncio.exceptions.TimeoutError as e:
|
||||
self.stats.incOpcUaTimeouts()
|
||||
logger.error(f"Timeout in inner OPC-UA loop")
|
||||
except Exception as e:
|
||||
self.stats.incOpcUaErrors()
|
||||
logger.error(f"Exception in inner OPC-UA loop: {type(e)} {e}")
|
||||
|
||||
def run(self):
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(self.opcUaRequesterInnerLoop())
|
||||
loop.close()
|
||||
|
||||
def stop(self):
|
||||
self.killBill = True
|
||||
logger.info("kill flag set")
|
||||
|
||||
self.killEvent.set()
|
||||
logger.info("kill events triggered")
|
||||
|
Reference in New Issue
Block a user