diff --git a/DatabaseEngine.py b/DatabaseEngine.py new file mode 100644 index 0000000..24d8c3c --- /dev/null +++ b/DatabaseEngine.py @@ -0,0 +1,39 @@ +import threading +from logger import Logger + +class DatabaseEngine(threading.Thread): + singleton = None + + @classmethod + def create(cls, queue, period): + if cls.singleton is None: + cls.singleton = DatabaseEngine(queue, period) + return cls.singleton + + def __init__(self, queue, period): + threading.Thread.__init__(self) + self.event = threading.Event() + self.period = period + self.queue = queue + self.startTimer() + self.setDaemon(True) + + def triggerStoring(self): + self.event.set() + + def startTimer(self): + self.timer = threading.Timer(self.period, self.event.set) + self.timer.start() + + def run(self): + while True: + self.event.wait() + self.event.clear() + + while not self.queue.empty(): + data = self.queue.get() + Logger.log("DatabaseEngine receives: %s" % (str(data))) + else: + Logger.log("DatabaseEngine: no more data to handle") + + self.startTimer() \ No newline at end of file diff --git a/MBusParser.py b/MBusParser.py index 3ce99fc..a35546b 100644 --- a/MBusParser.py +++ b/MBusParser.py @@ -8,5 +8,5 @@ class MBusParser(AbstractParser, AbstractNextStage): self.topic = "IoT/Measurement/MeterbusHub" def execute(self, data): - Logger.log("MBusParser %s" % (str(data))) + #Logger.log("MBusParser %s" % (str(data))) self.executeNextStage(data) \ No newline at end of file diff --git a/ModbusParser.py b/ModbusParser.py index 7fa5f2e..817d70d 100644 --- a/ModbusParser.py +++ b/ModbusParser.py @@ -8,5 +8,5 @@ class ModbusParser(AbstractParser, AbstractNextStage): self.topic = "IoT/Measurement/ModbusHub" def execute(self, data): - Logger.log("ModbusParser %s" % (str(data))) + #Logger.log("ModbusParser %s" % (str(data))) self.executeNextStage(data) \ No newline at end of file diff --git a/MqttDispatcher.py b/MqttDispatcher.py index 4cd6fc1..01711b7 100644 --- a/MqttDispatcher.py +++ b/MqttDispatcher.py @@ -9,6 +9,7 @@ import time from MBusParser import MBusParser from ModbusParser import ModbusParser from PersistentQueue import PersistentQueue +from DatabaseEngine import DatabaseEngine DEBUG = True @@ -50,12 +51,20 @@ try: mbusParser = MBusParser() mqttReader.registerParser(mbusParser) + Logger.log("MBusParser started ...") modbusParser = ModbusParser() mqttReader.registerParser(modbusParser) + Logger.log("ModbusParser started ...") persistentQueue = PersistentQueue() mbusParser.setNextStage(persistentQueue) + modbusParser.setNextStage(persistentQueue) + Logger.log("PersistentQueue instantiated ...") + + databaseEngine = DatabaseEngine(persistentQueue, 60.0) + databaseEngine.start() + Logger.log("DatabaseEngine started ...") Logger.log("MqttDispatcher running") diff --git a/MqttReceiver.py b/MqttReceiver.py index 2d94662..273043e 100644 --- a/MqttReceiver.py +++ b/MqttReceiver.py @@ -11,7 +11,7 @@ def on_message(client, userdata, msg): try: j = {'payload':msg.payload, 'topic':msg.topic, 'timestamp':datetime.datetime.now()} - Logger.debug("MqttReceiver receives: %s" % j) + # Logger.debug("MqttReceiver receives: %s" % j) global _parsers _parsers[msg.topic].execute(j) diff --git a/PersistentQueue.py b/PersistentQueue.py index dd31321..28ce733 100644 --- a/PersistentQueue.py +++ b/PersistentQueue.py @@ -1,8 +1,17 @@ from logger import Logger +import Queue class PersistentQueue(object): def __init__(self): super(PersistentQueue, self).__init__() + self.queue = Queue.Queue() def execute(self, data): - Logger.log("PersistentQueue %s" % (str(data))) + # Logger.log("PersistentQueue %s" % (str(data))) + self.queue.put_nowait(data) + + def empty(self): + return self.queue.empty() + + def get(self): + return self.queue.get() \ No newline at end of file