ongoing changes
This commit is contained in:
39
DatabaseEngine.py
Normal file
39
DatabaseEngine.py
Normal file
@ -0,0 +1,39 @@
|
||||
import threading
|
||||
from logger import Logger
|
||||
|
||||
class DatabaseEngine(threading.Thread):
|
||||
singleton = None
|
||||
|
||||
@classmethod
|
||||
def create(cls, queue, period):
|
||||
if cls.singleton is None:
|
||||
cls.singleton = DatabaseEngine(queue, period)
|
||||
return cls.singleton
|
||||
|
||||
def __init__(self, queue, period):
|
||||
threading.Thread.__init__(self)
|
||||
self.event = threading.Event()
|
||||
self.period = period
|
||||
self.queue = queue
|
||||
self.startTimer()
|
||||
self.setDaemon(True)
|
||||
|
||||
def triggerStoring(self):
|
||||
self.event.set()
|
||||
|
||||
def startTimer(self):
|
||||
self.timer = threading.Timer(self.period, self.event.set)
|
||||
self.timer.start()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.event.wait()
|
||||
self.event.clear()
|
||||
|
||||
while not self.queue.empty():
|
||||
data = self.queue.get()
|
||||
Logger.log("DatabaseEngine receives: %s" % (str(data)))
|
||||
else:
|
||||
Logger.log("DatabaseEngine: no more data to handle")
|
||||
|
||||
self.startTimer()
|
@ -8,5 +8,5 @@ class MBusParser(AbstractParser, AbstractNextStage):
|
||||
self.topic = "IoT/Measurement/MeterbusHub"
|
||||
|
||||
def execute(self, data):
|
||||
Logger.log("MBusParser %s" % (str(data)))
|
||||
#Logger.log("MBusParser %s" % (str(data)))
|
||||
self.executeNextStage(data)
|
@ -8,5 +8,5 @@ class ModbusParser(AbstractParser, AbstractNextStage):
|
||||
self.topic = "IoT/Measurement/ModbusHub"
|
||||
|
||||
def execute(self, data):
|
||||
Logger.log("ModbusParser %s" % (str(data)))
|
||||
#Logger.log("ModbusParser %s" % (str(data)))
|
||||
self.executeNextStage(data)
|
@ -9,6 +9,7 @@ import time
|
||||
from MBusParser import MBusParser
|
||||
from ModbusParser import ModbusParser
|
||||
from PersistentQueue import PersistentQueue
|
||||
from DatabaseEngine import DatabaseEngine
|
||||
|
||||
|
||||
DEBUG = True
|
||||
@ -50,12 +51,20 @@ try:
|
||||
|
||||
mbusParser = MBusParser()
|
||||
mqttReader.registerParser(mbusParser)
|
||||
Logger.log("MBusParser started ...")
|
||||
|
||||
modbusParser = ModbusParser()
|
||||
mqttReader.registerParser(modbusParser)
|
||||
Logger.log("ModbusParser started ...")
|
||||
|
||||
persistentQueue = PersistentQueue()
|
||||
mbusParser.setNextStage(persistentQueue)
|
||||
modbusParser.setNextStage(persistentQueue)
|
||||
Logger.log("PersistentQueue instantiated ...")
|
||||
|
||||
databaseEngine = DatabaseEngine(persistentQueue, 60.0)
|
||||
databaseEngine.start()
|
||||
Logger.log("DatabaseEngine started ...")
|
||||
|
||||
Logger.log("MqttDispatcher running")
|
||||
|
||||
|
@ -11,7 +11,7 @@ def on_message(client, userdata, msg):
|
||||
try:
|
||||
j = {'payload':msg.payload, 'topic':msg.topic, 'timestamp':datetime.datetime.now()}
|
||||
|
||||
Logger.debug("MqttReceiver receives: %s" % j)
|
||||
# Logger.debug("MqttReceiver receives: %s" % j)
|
||||
|
||||
global _parsers
|
||||
_parsers[msg.topic].execute(j)
|
||||
|
@ -1,8 +1,17 @@
|
||||
from logger import Logger
|
||||
import Queue
|
||||
|
||||
class PersistentQueue(object):
|
||||
def __init__(self):
|
||||
super(PersistentQueue, self).__init__()
|
||||
self.queue = Queue.Queue()
|
||||
|
||||
def execute(self, data):
|
||||
Logger.log("PersistentQueue %s" % (str(data)))
|
||||
# Logger.log("PersistentQueue %s" % (str(data)))
|
||||
self.queue.put_nowait(data)
|
||||
|
||||
def empty(self):
|
||||
return self.queue.empty()
|
||||
|
||||
def get(self):
|
||||
return self.queue.get()
|
Reference in New Issue
Block a user