This commit is contained in:
Wolfgang Hottgenroth 2022-02-04 19:46:15 +01:00
parent 0dbc61bafd
commit 41c31249cf
7 changed files with 255 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*/__pycache__/

24
opcua2mqtt/DataObject.py Normal file
View File

@ -0,0 +1,24 @@
import re
class InvalidDataObjectException(Exception):
def __init__(self, message):
super().__init__(message)
class DataObject(object):
invalidChars = re.compile("[#+/\s]")
def __init__(self, serverName, nameSpaceIndex, variableName, value):
self.serverName = serverName
self.nameSpaceIndex = nameSpaceIndex
self.variableName = variableName
self.value = value
def isValid(self):
return (not (DataObject.invalidChars.search(self.serverName) or DataObject.invalidChars.search(self.variableName))) and (type(self.nameSpaceIndex) == int)
def getTopicPost(self):
return self.serverName + '/' + str(self.nameSpaceIndex) + '/' + self.variableName
def getPayload(self):
return self.value

75
opcua2mqtt/MqttBase.py Normal file
View File

@ -0,0 +1,75 @@
import paho.mqtt.client as mqtt
from loguru import logger
import threading
import ssl
def mqttOnConnectCallback(client, userdata, flags, rc):
userdata.onConnect()
def mqttOnMessageCallback(client, userdata, message):
userdata.onMessage(message.topic, message.payload)
def mqttOnDisconnectCallback(client, userdata, rc):
userdata.onDisconnect(rc)
class AbstractMqttPublisher(threading.Thread):
def __init__(self, config):
super().__init__()
self.config = config["mqtt"]
self.client = mqtt.Client(userdata=self)
# consider this flag in the localLoop
self.killBill = False
self.killEvent = threading.Event()
def run(self):
self.client.on_message = mqttOnMessageCallback
self.client.on_connect = mqttOnConnectCallback
self.client.on_disconnect = mqttOnDisconnectCallback
if ("login" in self.config) and ("password" in self.config):
self.client.username_pw_set(self.config["login"], self.config["password"])
if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config):
self.client.tls_set(
ca_certs=self.config["ca"],
certfile=self.config["cert"],
keyfile=self.config["key"],
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None # this does not mean "no cipher" but it means "default ciphers"
)
self.client.connect(self.config["broker"], int(self.config["port"]))
self.client.loop_start()
logger.info("mqtt loop started")
self.localLoop()
def localLoop(self):
raise NotImplementedError()
def stop(self):
self.client.loop_stop()
logger.info("mqtt loop stopped")
self.killBill = True
logger.info("kill flag set")
self.killEvent.set()
logger.info("kill events triggered")
def onConnect(self):
logger.info("mqtt connected")
def onDisconnect(self, rc):
logger.warning("mqtt disconnect, rc: {}".format(rc))
def onMessage(self, topic, payload):
logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload)))

30
opcua2mqtt/MqttPublish.py Normal file
View File

@ -0,0 +1,30 @@
from threading import Event
from loguru import logger
from MqttBase import AbstractMqttPublisher
from queue import Empty
LOOP_SLICE = 0.1 # seconds
class MqttPublish(AbstractMqttPublisher):
def __init__(self, config, queue):
super().__init__(config)
self.queue = queue
self.topicPre = self.config["publishTopicPrefix"]
def localLoop(self):
while not self.killBill:
try:
dataObject = self.queue.get(timeout=LOOP_SLICE)
if not dataObject.isValid():
logger.error("invalid dataObject received: drop it")
else:
topic = self.topicPre + '/' + dataObject.getTopicPost()
payload = dataObject.getPayload()
self.client.publish(topic, payload)
logger.info("mqtt message sent: {} -> {}".format(topic, payload))
except Empty:
if self.killBill:
logger.error("killbill received")
break

View File

@ -0,0 +1,43 @@
import asyncio
from asyncua import Client
import threading
from loguru import logger
from DataObject import DataObject
class OpcUaRequester(threading.Thread):
def __init__(self, config, name, queue):
super().__init__()
self.config = config["opcua"][name]
self.queue = queue
# consider this flag in the localLoop
self.killBill = False
self.killEvent = threading.Event()
async def opcUaRequesterInnerLoop(self):
while True:
async with Client(url=URL, timeout=10.0) as client:
for nodeSpec in NODE_SPECS:
node = client.get_node(f"ns={nodeSpec['ns']};{nodeSpec['n']}")
value = await node.read_value()
displayName = nodeSpec['d'] if ('d' in nodeSpec) else (await node.read_display_name()).Text
print(f"{displayName=} = {value=}")
self.queue.put(DataObject(SERVER, nodeSpec['ns'], displayName, value))
await asyncio.sleep(DELAY)
def run(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.opcUaRequesterInnerLoop())
loop.close()
def stop(self):
self.killBill = True
logger.info("kill flag set")
self.killEvent.set()
logger.info("kill events triggered")

60
opcua2mqtt/bridge.py Normal file
View File

@ -0,0 +1,60 @@
from MqttPublish import MqttPublish
from OpcUaRequester import OpcUaRequester
from loguru import logger
import argparse
import configparser
import threading
import queue
deathBell = threading.Event()
def exceptHook(args):
global deathBell
logger.error("Exception in thread caught: {}".format(args))
deathBell.set()
logger.error("rang the death bell")
logger.info("opcua2mqtt bridge starting")
parser = argparse.ArgumentParser(description="example1")
parser.add_argument('--config', '-f',
help='Config file, default is $pwd/config.json',
required=False,
default='./config.json')
args = parser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
queue = queue.Queue()
publishThread = MqttPublish(config, queue)
publishThread.start()
logger.info("MqttPublish started")
opcuaThread = OpcUaRequester(config, queue)
opcuaThread.start()
logger.info("OpcUaRequest started")
threading.excepthook = exceptHook
logger.info("Threading excepthook set")
logger.info("opcua2mqtt bridge is running")
deathBell.wait()
logger.error("opcua2mqtt bridge is dying")
publishThread.stop()
logger.error("publishThread stopped")
opcuaThread.stop()
logger.error("opcuaThread stopped")
publishThread.join()
logger.error("publishThread joined")
opcuaThread.join()
logger.error("opcuaThread joined")
logger.error("opcua2mqtt bridge is terminated")

22
opcua2mqtt/config.json Normal file
View File

@ -0,0 +1,22 @@
{
"mqtt": {
"broker": "172.16.2.16",
"port": 1883,
"publishTopicPrefix": "opcua"
},
"opcua": [
{
"nodes": [
{ "ns": 0, "n": "i=345", "d": "pv" },
{ "ns": 0, "n": "i=348", "d": "sv" },
{ "ns": 0, "n": "i=351", "d": "tv" },
{ "ns": 0, "n": "i=354", "d": "qv" }
],
"url": "opc.tcp://172.16.3.60:4840",
"name": "apl",
"delay": 1.0
}
]
}