commit 3fcf6fe4862fced14ad8db82d203c6e09eb39f29 Author: Wolfgang Hottgenroth Date: Fri Nov 10 22:03:30 2017 +0100 initial diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/AbstractNextStage.py b/AbstractNextStage.py new file mode 100644 index 0000000..72e1ce8 --- /dev/null +++ b/AbstractNextStage.py @@ -0,0 +1,19 @@ +from logger import Logger + + +class AbstractNextStage(object): + def __init__(self): + self.nextStage = None + + def setNextStage(self, nextStage): + self.nextStage = nextStage + + def executeNextStage(self, data): + if self.nextStage is not None: + self.nextStage.execute(data) + else: + Logger.log("no nextStage set") + + def execute(self, data): + raise Exception("Not yet implemented") + # last step is to execute nextStage \ No newline at end of file diff --git a/AbstractParser.py b/AbstractParser.py new file mode 100644 index 0000000..24eb33b --- /dev/null +++ b/AbstractParser.py @@ -0,0 +1,16 @@ +from logger import Logger + + +class AbstractParser(object): + def __init__(self): + self.topic = None + self.nextStage = None + + def getTopic(self): + return self.topic + + def setNextStage(self, nextStage): + self.nextStage = nextStage + + def execute(self, data): + raise Exception("Not yet implemented") diff --git a/MBusParser.py b/MBusParser.py new file mode 100644 index 0000000..3ce99fc --- /dev/null +++ b/MBusParser.py @@ -0,0 +1,12 @@ +from logger import Logger +from AbstractParser import AbstractParser +from AbstractNextStage import AbstractNextStage + +class MBusParser(AbstractParser, AbstractNextStage): + def __init__(self): + super(MBusParser, self).__init__() + self.topic = "IoT/Measurement/MeterbusHub" + + def execute(self, data): + Logger.log("MBusParser %s" % (str(data))) + self.executeNextStage(data) \ No newline at end of file diff --git a/ModbusParser.py b/ModbusParser.py new file mode 100644 index 0000000..7fa5f2e --- /dev/null +++ b/ModbusParser.py @@ -0,0 +1,12 @@ +from logger import Logger +from AbstractParser import AbstractParser +from AbstractNextStage import AbstractNextStage + +class ModbusParser(AbstractParser, AbstractNextStage): + def __init__(self): + super(ModbusParser, self).__init__() + self.topic = "IoT/Measurement/ModbusHub" + + def execute(self, data): + Logger.log("ModbusParser %s" % (str(data))) + self.executeNextStage(data) \ No newline at end of file diff --git a/MqttDispatcher.py b/MqttDispatcher.py new file mode 100644 index 0000000..4cd6fc1 --- /dev/null +++ b/MqttDispatcher.py @@ -0,0 +1,67 @@ +#!/usr/bin/python + +from MqttReceiver import MqttReceiver +import os +import sys +from logger import Logger +import time + +from MBusParser import MBusParser +from ModbusParser import ModbusParser +from PersistentQueue import PersistentQueue + + +DEBUG = True +DEBUG_TO_STDOUT = True +BACKGROUND = False +PID_FILE = "/tmp/MqttDispatcher.pid" +LOG_FILE = "/tmp/MqttDispatcher.log" +MQTT_BROKER = "172.16.2.15" + + +if BACKGROUND: + pid = os.fork() +else: + pid = 0 + +if pid: + pidFile = file(PID_FILE , mode='w') + pidFile.write("%i\n" % pid) + pidFile.close() + sys.exit(0) + +Logger.openlog(LOG_FILE) + +if DEBUG: + Logger.debugEnable() + +if DEBUG_TO_STDOUT: + Logger.debugToStdoutEnable() + + +Logger.log("MqttDispatcher starting") + + + +try: + mqttReader = MqttReceiver.create(MQTT_BROKER) + mqttReader.start() + Logger.log("MqttReader started ...") + + mbusParser = MBusParser() + mqttReader.registerParser(mbusParser) + + modbusParser = ModbusParser() + mqttReader.registerParser(modbusParser) + + persistentQueue = PersistentQueue() + mbusParser.setNextStage(persistentQueue) + + Logger.log("MqttDispatcher running") + + while (True): + time.sleep(10) +except Exception, e: + Logger.log("MqttDispatcher dying from %s" % (str(e))) +finally: + Logger.log("MqttDispatcher terminating ...") diff --git a/MqttReceiver.py b/MqttReceiver.py new file mode 100644 index 0000000..2d94662 --- /dev/null +++ b/MqttReceiver.py @@ -0,0 +1,45 @@ +import threading +from logger import Logger +import paho.mqtt.client as mqtt +import json +import datetime + +_parsers = {} + + +def on_message(client, userdata, msg): + try: + j = {'payload':msg.payload, 'topic':msg.topic, 'timestamp':datetime.datetime.now()} + + Logger.debug("MqttReceiver receives: %s" % j) + + global _parsers + _parsers[msg.topic].execute(j) + except ValueError, e: + Logger.log("Exception %s in MqttReceiver, on_message" % (str(e))) + + +class MqttReceiver(threading.Thread): + singleton = None + + @classmethod + def create(cls, broker): + if cls.singleton is None: + cls.singleton = MqttReceiver(broker) + return cls.singleton + + def __init__(self, broker): + threading.Thread.__init__(self) + self.client = mqtt.Client() + self.client.on_message = on_message + self.client.connect(broker, 1883, 60) + self.setDaemon(True) + + def registerParser(self, parser): + Logger.log("registering %s for topic %s" % (parser.__class__.__name__, parser.getTopic())) + self.client.subscribe(parser.getTopic()) + global _parsers + _parsers[parser.getTopic()] = parser + + def run(self): + self.client.loop_forever() diff --git a/PersistentQueue.py b/PersistentQueue.py new file mode 100644 index 0000000..dd31321 --- /dev/null +++ b/PersistentQueue.py @@ -0,0 +1,8 @@ +from logger import Logger + +class PersistentQueue(object): + def __init__(self): + super(PersistentQueue, self).__init__() + + def execute(self, data): + Logger.log("PersistentQueue %s" % (str(data))) diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..6d843af --- /dev/null +++ b/logger.py @@ -0,0 +1,36 @@ +from time import gmtime, strftime + +class Logger(object): + debugFlag = False + debugToStdoutFlag = False + + @classmethod + def log(cls, data): + t = strftime("%d %b %Y %H:%M:%S", gmtime()) + with open(cls.logfile, 'a') as f: + f.write("%s %s\n" % (t, data)) + if cls.debugFlag and cls.debugToStdoutFlag: + print data + + @classmethod + def debug(cls, data): + if cls.debugFlag: + cls.log(data) + + @classmethod + def debugEnable(cls): + cls.debugFlag = True + + @classmethod + def debugToStdoutEnable(cls): + cls.debugToStdoutFlag = True + + @classmethod + def debugDisable(cls): + cls.debugFlag = False + + @classmethod + def openlog(cls, logfile): + cls.logfile = logfile + +