This commit is contained in:
Wolfgang Hottgenroth 2017-11-10 22:03:30 +01:00
commit 3fcf6fe486
9 changed files with 216 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.pyc

19
AbstractNextStage.py Normal file
View File

@ -0,0 +1,19 @@
from logger import Logger
class AbstractNextStage(object):
def __init__(self):
self.nextStage = None
def setNextStage(self, nextStage):
self.nextStage = nextStage
def executeNextStage(self, data):
if self.nextStage is not None:
self.nextStage.execute(data)
else:
Logger.log("no nextStage set")
def execute(self, data):
raise Exception("Not yet implemented")
# last step is to execute nextStage

16
AbstractParser.py Normal file
View File

@ -0,0 +1,16 @@
from logger import Logger
class AbstractParser(object):
def __init__(self):
self.topic = None
self.nextStage = None
def getTopic(self):
return self.topic
def setNextStage(self, nextStage):
self.nextStage = nextStage
def execute(self, data):
raise Exception("Not yet implemented")

12
MBusParser.py Normal file
View File

@ -0,0 +1,12 @@
from logger import Logger
from AbstractParser import AbstractParser
from AbstractNextStage import AbstractNextStage
class MBusParser(AbstractParser, AbstractNextStage):
def __init__(self):
super(MBusParser, self).__init__()
self.topic = "IoT/Measurement/MeterbusHub"
def execute(self, data):
Logger.log("MBusParser %s" % (str(data)))
self.executeNextStage(data)

12
ModbusParser.py Normal file
View File

@ -0,0 +1,12 @@
from logger import Logger
from AbstractParser import AbstractParser
from AbstractNextStage import AbstractNextStage
class ModbusParser(AbstractParser, AbstractNextStage):
def __init__(self):
super(ModbusParser, self).__init__()
self.topic = "IoT/Measurement/ModbusHub"
def execute(self, data):
Logger.log("ModbusParser %s" % (str(data)))
self.executeNextStage(data)

67
MqttDispatcher.py Normal file
View File

@ -0,0 +1,67 @@
#!/usr/bin/python
from MqttReceiver import MqttReceiver
import os
import sys
from logger import Logger
import time
from MBusParser import MBusParser
from ModbusParser import ModbusParser
from PersistentQueue import PersistentQueue
DEBUG = True
DEBUG_TO_STDOUT = True
BACKGROUND = False
PID_FILE = "/tmp/MqttDispatcher.pid"
LOG_FILE = "/tmp/MqttDispatcher.log"
MQTT_BROKER = "172.16.2.15"
if BACKGROUND:
pid = os.fork()
else:
pid = 0
if pid:
pidFile = file(PID_FILE , mode='w')
pidFile.write("%i\n" % pid)
pidFile.close()
sys.exit(0)
Logger.openlog(LOG_FILE)
if DEBUG:
Logger.debugEnable()
if DEBUG_TO_STDOUT:
Logger.debugToStdoutEnable()
Logger.log("MqttDispatcher starting")
try:
mqttReader = MqttReceiver.create(MQTT_BROKER)
mqttReader.start()
Logger.log("MqttReader started ...")
mbusParser = MBusParser()
mqttReader.registerParser(mbusParser)
modbusParser = ModbusParser()
mqttReader.registerParser(modbusParser)
persistentQueue = PersistentQueue()
mbusParser.setNextStage(persistentQueue)
Logger.log("MqttDispatcher running")
while (True):
time.sleep(10)
except Exception, e:
Logger.log("MqttDispatcher dying from %s" % (str(e)))
finally:
Logger.log("MqttDispatcher terminating ...")

45
MqttReceiver.py Normal file
View File

@ -0,0 +1,45 @@
import threading
from logger import Logger
import paho.mqtt.client as mqtt
import json
import datetime
_parsers = {}
def on_message(client, userdata, msg):
try:
j = {'payload':msg.payload, 'topic':msg.topic, 'timestamp':datetime.datetime.now()}
Logger.debug("MqttReceiver receives: %s" % j)
global _parsers
_parsers[msg.topic].execute(j)
except ValueError, e:
Logger.log("Exception %s in MqttReceiver, on_message" % (str(e)))
class MqttReceiver(threading.Thread):
singleton = None
@classmethod
def create(cls, broker):
if cls.singleton is None:
cls.singleton = MqttReceiver(broker)
return cls.singleton
def __init__(self, broker):
threading.Thread.__init__(self)
self.client = mqtt.Client()
self.client.on_message = on_message
self.client.connect(broker, 1883, 60)
self.setDaemon(True)
def registerParser(self, parser):
Logger.log("registering %s for topic %s" % (parser.__class__.__name__, parser.getTopic()))
self.client.subscribe(parser.getTopic())
global _parsers
_parsers[parser.getTopic()] = parser
def run(self):
self.client.loop_forever()

8
PersistentQueue.py Normal file
View File

@ -0,0 +1,8 @@
from logger import Logger
class PersistentQueue(object):
def __init__(self):
super(PersistentQueue, self).__init__()
def execute(self, data):
Logger.log("PersistentQueue %s" % (str(data)))

36
logger.py Normal file
View File

@ -0,0 +1,36 @@
from time import gmtime, strftime
class Logger(object):
debugFlag = False
debugToStdoutFlag = False
@classmethod
def log(cls, data):
t = strftime("%d %b %Y %H:%M:%S", gmtime())
with open(cls.logfile, 'a') as f:
f.write("%s %s\n" % (t, data))
if cls.debugFlag and cls.debugToStdoutFlag:
print data
@classmethod
def debug(cls, data):
if cls.debugFlag:
cls.log(data)
@classmethod
def debugEnable(cls):
cls.debugFlag = True
@classmethod
def debugToStdoutEnable(cls):
cls.debugToStdoutFlag = True
@classmethod
def debugDisable(cls):
cls.debugFlag = False
@classmethod
def openlog(cls, logfile):
cls.logfile = logfile