From 41c31249cff9d62ac305ea62cf19d1217d13a3f3 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 4 Feb 2022 19:46:15 +0100 Subject: [PATCH] changes --- .gitignore | 1 + opcua2mqtt/DataObject.py | 24 ++++++++++++ opcua2mqtt/MqttBase.py | 75 ++++++++++++++++++++++++++++++++++++ opcua2mqtt/MqttPublish.py | 30 +++++++++++++++ opcua2mqtt/OpcUaRequester.py | 43 +++++++++++++++++++++ opcua2mqtt/bridge.py | 60 +++++++++++++++++++++++++++++ opcua2mqtt/config.json | 22 +++++++++++ 7 files changed, 255 insertions(+) create mode 100644 .gitignore create mode 100644 opcua2mqtt/DataObject.py create mode 100644 opcua2mqtt/MqttBase.py create mode 100644 opcua2mqtt/MqttPublish.py create mode 100644 opcua2mqtt/OpcUaRequester.py create mode 100644 opcua2mqtt/bridge.py create mode 100644 opcua2mqtt/config.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c01bd8c --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*/__pycache__/ diff --git a/opcua2mqtt/DataObject.py b/opcua2mqtt/DataObject.py new file mode 100644 index 0000000..0653afd --- /dev/null +++ b/opcua2mqtt/DataObject.py @@ -0,0 +1,24 @@ +import re + + +class InvalidDataObjectException(Exception): + def __init__(self, message): + super().__init__(message) + +class DataObject(object): + invalidChars = re.compile("[#+/\s]") + + def __init__(self, serverName, nameSpaceIndex, variableName, value): + self.serverName = serverName + self.nameSpaceIndex = nameSpaceIndex + self.variableName = variableName + self.value = value + + def isValid(self): + return (not (DataObject.invalidChars.search(self.serverName) or DataObject.invalidChars.search(self.variableName))) and (type(self.nameSpaceIndex) == int) + + def getTopicPost(self): + return self.serverName + '/' + str(self.nameSpaceIndex) + '/' + self.variableName + + def getPayload(self): + return self.value \ No newline at end of file diff --git a/opcua2mqtt/MqttBase.py b/opcua2mqtt/MqttBase.py new file mode 100644 index 0000000..6033846 --- /dev/null +++ b/opcua2mqtt/MqttBase.py @@ -0,0 +1,75 @@ +import paho.mqtt.client as mqtt +from loguru import logger +import threading +import ssl + + + +def mqttOnConnectCallback(client, userdata, flags, rc): + userdata.onConnect() + +def mqttOnMessageCallback(client, userdata, message): + userdata.onMessage(message.topic, message.payload) + +def mqttOnDisconnectCallback(client, userdata, rc): + userdata.onDisconnect(rc) + + +class AbstractMqttPublisher(threading.Thread): + def __init__(self, config): + super().__init__() + + self.config = config["mqtt"] + + self.client = mqtt.Client(userdata=self) + + # consider this flag in the localLoop + self.killBill = False + self.killEvent = threading.Event() + + def run(self): + self.client.on_message = mqttOnMessageCallback + self.client.on_connect = mqttOnConnectCallback + self.client.on_disconnect = mqttOnDisconnectCallback + + if ("login" in self.config) and ("password" in self.config): + self.client.username_pw_set(self.config["login"], self.config["password"]) + + if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config): + self.client.tls_set( + ca_certs=self.config["ca"], + certfile=self.config["cert"], + keyfile=self.config["key"], + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLSv1_2, + ciphers=None # this does not mean "no cipher" but it means "default ciphers" + ) + + self.client.connect(self.config["broker"], int(self.config["port"])) + self.client.loop_start() + logger.info("mqtt loop started") + + self.localLoop() + + def localLoop(self): + raise NotImplementedError() + + def stop(self): + self.client.loop_stop() + logger.info("mqtt loop stopped") + + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") + + def onConnect(self): + logger.info("mqtt connected") + + def onDisconnect(self, rc): + logger.warning("mqtt disconnect, rc: {}".format(rc)) + + def onMessage(self, topic, payload): + logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload))) + diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py new file mode 100644 index 0000000..cbb2f33 --- /dev/null +++ b/opcua2mqtt/MqttPublish.py @@ -0,0 +1,30 @@ +from threading import Event +from loguru import logger +from MqttBase import AbstractMqttPublisher +from queue import Empty + + +LOOP_SLICE = 0.1 # seconds + +class MqttPublish(AbstractMqttPublisher): + def __init__(self, config, queue): + super().__init__(config) + self.queue = queue + self.topicPre = self.config["publishTopicPrefix"] + + def localLoop(self): + while not self.killBill: + try: + dataObject = self.queue.get(timeout=LOOP_SLICE) + if not dataObject.isValid(): + logger.error("invalid dataObject received: drop it") + else: + topic = self.topicPre + '/' + dataObject.getTopicPost() + payload = dataObject.getPayload() + self.client.publish(topic, payload) + logger.info("mqtt message sent: {} -> {}".format(topic, payload)) + except Empty: + if self.killBill: + logger.error("killbill received") + break + diff --git a/opcua2mqtt/OpcUaRequester.py b/opcua2mqtt/OpcUaRequester.py new file mode 100644 index 0000000..703d50b --- /dev/null +++ b/opcua2mqtt/OpcUaRequester.py @@ -0,0 +1,43 @@ +import asyncio +from asyncua import Client +import threading +from loguru import logger +from DataObject import DataObject + + +class OpcUaRequester(threading.Thread): + def __init__(self, config, name, queue): + super().__init__() + + self.config = config["opcua"][name] + self.queue = queue + + # consider this flag in the localLoop + self.killBill = False + self.killEvent = threading.Event() + + + async def opcUaRequesterInnerLoop(self): + while True: + async with Client(url=URL, timeout=10.0) as client: + for nodeSpec in NODE_SPECS: + node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}") + value = await node.read_value() + displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text + print(f"{displayName=} = {value=}") + self.queue.put(DataObject(SERVER, nodeSpec['ns'], displayName, value)) + await asyncio.sleep(DELAY) + + def run(self): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.opcUaRequesterInnerLoop()) + loop.close() + + def stop(self): + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + logger.info("kill events triggered") + diff --git a/opcua2mqtt/bridge.py b/opcua2mqtt/bridge.py new file mode 100644 index 0000000..87854fc --- /dev/null +++ b/opcua2mqtt/bridge.py @@ -0,0 +1,60 @@ +from MqttPublish import MqttPublish +from OpcUaRequester import OpcUaRequester +from loguru import logger +import argparse +import configparser +import threading +import queue + +deathBell = threading.Event() + +def exceptHook(args): + global deathBell + logger.error("Exception in thread caught: {}".format(args)) + deathBell.set() + logger.error("rang the death bell") + + +logger.info("opcua2mqtt bridge starting") + +parser = argparse.ArgumentParser(description="example1") +parser.add_argument('--config', '-f', + help='Config file, default is $pwd/config.json', + required=False, + default='./config.json') +args = parser.parse_args() + +config = configparser.ConfigParser() +config.read(args.config) + + +queue = queue.Queue() + +publishThread = MqttPublish(config, queue) +publishThread.start() +logger.info("MqttPublish started") + +opcuaThread = OpcUaRequester(config, queue) +opcuaThread.start() +logger.info("OpcUaRequest started") + +threading.excepthook = exceptHook +logger.info("Threading excepthook set") + +logger.info("opcua2mqtt bridge is running") + + +deathBell.wait() +logger.error("opcua2mqtt bridge is dying") + +publishThread.stop() +logger.error("publishThread stopped") +opcuaThread.stop() +logger.error("opcuaThread stopped") + +publishThread.join() +logger.error("publishThread joined") +opcuaThread.join() +logger.error("opcuaThread joined") + +logger.error("opcua2mqtt bridge is terminated") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json new file mode 100644 index 0000000..967d4dd --- /dev/null +++ b/opcua2mqtt/config.json @@ -0,0 +1,22 @@ +{ + "mqtt": { + "broker": "172.16.2.16", + "port": 1883, + "publishTopicPrefix": "opcua" + }, + "opcua": [ + { + "nodes": [ + { "ns": 0, "n": "i=345", "d": "pv" }, + { "ns": 0, "n": "i=348", "d": "sv" }, + { "ns": 0, "n": "i=351", "d": "tv" }, + { "ns": 0, "n": "i=354", "d": "qv" } + ], + "url": "opc.tcp://172.16.3.60:4840", + "name": "apl", + "delay": 1.0 + } + ] +} + +