database engine started

This commit is contained in:
Wolfgang Hottgenroth
2017-11-16 15:25:45 +01:00
parent e0c99ac46d
commit bdb387b232
14 changed files with 116 additions and 8 deletions

View File

@ -0,0 +1,80 @@
package de.hottis.MeasurementCollector;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DatabaseEngine extends Thread implements ITriggerable {
final protected Logger logger = LogManager.getRootLogger();
private Properties config;
private DataObjectQueue queue;
private boolean stop;
private boolean triggerFlag;
private Timer timer;
public DatabaseEngine(Properties config, DataObjectQueue queue) {
super("MeasurementCollector.DatabaseEngine");
this.config = config;
this.queue = queue;
this.stop = false;
this.triggerFlag = false;
}
public void requestShutdown() {
logger.info("Shutdown of database engine requested");
this.stop = true;
try {
this.join();
logger.info("Database engine is down");
} catch (InterruptedException e) {
logger.error("Waiting for shutdown of database engine interrupted");
}
}
public synchronized void trigger() {
logger.debug("DatabaseEngine triggered");
triggerFlag = true;
notify();
}
public void init() {
timer = new Timer("DatabaseEngineTrigger");
timer.schedule(new TriggerTimer(this), 0, 60 * 1000);
}
@Override
public synchronized void run() {
while (! stop) {
logger.debug("DatabaseEngine is about to wait");
while (! triggerFlag) {
try {
wait();
} catch (InterruptedException e) {
}
}
triggerFlag = false;
logger.debug("DatabaseEngine has received trigger");
try {
while (true) {
List<ADataObject> adol = queue.peek();
if (adol == null) {
break;
}
for (ADataObject ado : adol) {
logger.debug(ado);
}
queue.remove(adol);
}
} catch (Exception e) {
logger.error("Exception in outer database engine loop", e);
}
}
}
}

View File

@ -0,0 +1,5 @@
package de.hottis.MeasurementCollector;
public interface ITriggerable {
public void trigger();
}

View File

@ -81,12 +81,12 @@ public class MBusParser extends AMessageParser {
if (dataParsers.containsKey(name)) {
List<ADataObject> measurementItems = dataParsers.get(name).parse(timestamp, dataRecords);
for (ADataObject ado : measurementItems) {
logger.debug(ado);
}
//for (ADataObject ado : measurementItems) {
// logger.debug(ado);
//}
queue.add(measurementItems);
logger.debug("Queue size: " + queue.size());
//logger.debug("Queue size: " + queue.size());
} else {
logger.warn("unknown name: " + name);
}

View File

@ -16,17 +16,24 @@ public class MeasurementCollector {
final Properties config = new Properties();
config.load(MeasurementCollector.class.getClassLoader().getResourceAsStream(PROPS_FILENAME));
logger.debug("Configuration loaded");
MqttReceiver mqttReceiver = new MqttReceiver(config);
mqttReceiver.connect();
logger.debug("MqttReceiver started");
DataObjectQueue queue = new DataObjectQueue();
logger.debug("Queue instantiated");
MBusParser mbusParser = new MBusParser(config, queue);
mbusParser.registerConfiguredDataParsers();
mqttReceiver.registerParser(mbusParser);
logger.debug("MBusParser started");
DatabaseEngine databaseEngine = new DatabaseEngine(config, queue);
databaseEngine.init();
databaseEngine.start();
logger.debug("DatabaseEngine started");
}
}

View File

@ -0,0 +1,15 @@
package de.hottis.MeasurementCollector;
import java.util.TimerTask;
public class TriggerTimer extends TimerTask {
private ITriggerable triggerable;
public TriggerTimer(ITriggerable triggerable) {
this.triggerable = triggerable;
}
public void run() {
triggerable.trigger();
}
}

View File

@ -2,7 +2,7 @@
<Configuration debug="false">
<Appenders>
<Console name="ConsoleAppender" target="SYSTEM_OUT">
<PatternLayout pattern="%d [%t] %-5level %logger{36} - %msg%n%throwable"/>
<PatternLayout pattern="%d [%t, %F, %L] %-5level - %msg%n%throwable"/>
</Console>
</Appenders>
<Loggers>