fix package name, start database handling

This commit is contained in:
Wolfgang Hottgenroth
2017-11-21 17:53:11 +01:00
parent 7b36caf330
commit 5896212ed9
57 changed files with 367 additions and 225 deletions

View File

@ -1,42 +0,0 @@
package de.hottis.MeasurementCollector;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public abstract class AJmsQueue {
final protected Logger logger = LogManager.getRootLogger();
protected Connection connection;
protected Session session;
protected Destination destination;
public AJmsQueue() {
}
public void init() throws MeasurementCollectorException {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
logger.debug("connectionFactory: " + connectionFactory);
connection = connectionFactory.createConnection();
connection.start();
logger.debug("connection: " + connection);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.debug("session: " + session);
destination = session.createQueue("TEST.FOO");
logger.debug("destination: " + destination);
} catch (JMSException e) {
logger.error("JMSException in AJmsQueue.init", e);
throw new MeasurementCollectorException("JMSException in AJmsQueue.init", e);
}
}
}

View File

@ -1,11 +0,0 @@
package de.hottis.MeasurementCollector;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@SuppressWarnings("serial")
public class DataObjectQueue extends ConcurrentLinkedQueue<List<ADataObject>> {
public DataObjectQueue() {
super();
}
}

View File

@ -1,81 +0,0 @@
package de.hottis.MeasurementCollector;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DatabaseEngine extends Thread implements ITriggerable {
final protected Logger logger = LogManager.getRootLogger();
private Properties config;
private DataObjectQueue queue;
private boolean stop;
private boolean triggerFlag;
private Timer timer;
public DatabaseEngine(Properties config, DataObjectQueue queue) {
super("MeasurementCollector.DatabaseEngine");
this.config = config;
this.queue = queue;
this.stop = false;
this.triggerFlag = false;
}
public void requestShutdown() {
logger.info("Shutdown of database engine requested");
this.stop = true;
try {
this.join();
logger.info("Database engine is down");
} catch (InterruptedException e) {
logger.error("Waiting for shutdown of database engine interrupted");
}
}
public synchronized void trigger() {
logger.debug("DatabaseEngine triggered");
triggerFlag = true;
notify();
}
public void init() {
timer = new Timer("DatabaseEngineTrigger");
timer.schedule(new TriggerTimer(this), 0, 60 * 1000);
}
@Override
public synchronized void run() {
while (! stop) {
logger.debug("DatabaseEngine is about to wait");
while (! triggerFlag) {
try {
wait();
} catch (InterruptedException e) {
}
}
triggerFlag = false;
logger.debug("DatabaseEngine has received trigger");
try {
while (true) {
List<ADataObject> adol = queue.peek();
if (adol == null) {
break;
}
for (ADataObject ado : adol) {
logger.debug(ado);
}
queue.remove(adol);
}
} catch (Exception e) {
logger.error("Exception in outer database engine loop", e);
}
}
logger.info("Database engine is terminating");
}
}

View File

@ -1,44 +0,0 @@
package de.hottis.MeasurementCollector;
import java.io.Serializable;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
public class JmsQueueProducer<T extends Serializable> extends AJmsQueue {
private MessageProducer producer;
public JmsQueueProducer() {
super();
}
public void init() throws MeasurementCollectorException {
super.init();
try {
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
logger.debug("producer: " + producer);
} catch (JMSException e) {
logger.error("JMSException in AJmsQueue.init", e);
throw new MeasurementCollectorException("JMSException in JmsQueueProducer.init", e);
}
}
public void enqueue(T item) throws MeasurementCollectorException {
try {
ObjectMessage message = session.createObjectMessage();
message.setObject(item);
producer.send(message);
logger.debug("message enqueued");
} catch (JMSException e) {
logger.error("JMSException in JmsQueueProducer.enqueue", e);
logger.error("Calling init");
init();
throw new MeasurementCollectorException("JMSException in JmsQueueProducer.enqueue", e);
}
}
}

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.io.Serializable;
import java.time.LocalDateTime;

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.List;

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.List;
@ -11,18 +11,16 @@ public abstract class AMessageParser {
final protected Logger logger = LogManager.getRootLogger();
private String topic;
//private DataObjectQueue queue;
private JmsQueueProducer<ADataObject> jmsQueue = new JmsQueueProducer<ADataObject>();
private MyQueue<ADataObject> queue;
protected Properties config;
public AMessageParser(String topic, Properties config, DataObjectQueue queue) {
public AMessageParser(String topic, Properties config, MyQueue<ADataObject> queue) {
this.topic = topic;
this.config = config;
//this.queue = queue;
this.queue = queue;
}
public void init() throws MeasurementCollectorException {
jmsQueue.init();
}
@ -32,12 +30,9 @@ public abstract class AMessageParser {
public void enqueue(List<ADataObject> itemList) throws MeasurementCollectorException {
for (ADataObject ado : itemList) {
jmsQueue.enqueue(ado);
queue.enqueue(ado);
logger.debug("message enqueued");
}
//queue.add(itemList);
//logger.debug("Queue size: " + queue.size());
}
abstract public void execute(LocalDateTime timestamp, String msgPayload);

View File

@ -0,0 +1,122 @@
package de.hottis.measurementCollector;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Timer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DatabaseEngine extends Thread implements ITriggerable {
static final String DATABASE_ENGINE_PERIOD_PROP = "db.period";
static final String DATABASE_URL_PROP = "db.url";
final protected Logger logger = LogManager.getRootLogger();
private Properties config;
private MyQueue<ADataObject> queue;
private boolean stop;
private boolean triggerFlag;
private Timer timer;
private int period;
private String dbUrl;
public DatabaseEngine(Properties config, MyQueue<ADataObject> queue) {
super("MeasurementCollector.DatabaseEngine");
this.config = config;
this.queue = queue;
this.stop = false;
this.triggerFlag = false;
this.period = Integer.parseInt(this.config.getProperty(DATABASE_ENGINE_PERIOD_PROP));
this.dbUrl = this.config.getProperty(DATABASE_URL_PROP);
}
public void requestShutdown() {
logger.info("Shutdown of database engine requested");
this.stop = true;
try {
this.join();
logger.info("Database engine is down");
} catch (InterruptedException e) {
logger.error("Waiting for shutdown of database engine interrupted");
}
}
public synchronized void trigger() {
logger.debug("DatabaseEngine triggered");
triggerFlag = true;
notify();
}
public void init() throws MeasurementCollectorException {
timer = new Timer("DatabaseEngineTrigger");
timer.schedule(new TriggerTimer(this), 0, period * 1000);
}
private String createStatementFromDataObject(ADataObject ado) {
return "";
}
@Override
public synchronized void run() {
while (! stop) {
logger.debug("DatabaseEngine is about to wait for (regularly) " + period + "s");
while (! triggerFlag) {
try {
wait();
} catch (InterruptedException e) {
}
}
triggerFlag = false;
logger.debug("DatabaseEngine has received trigger");
Connection dbCon = null;
try {
int itemCnt = 0;
while (true) {
PreparedStatement pstmt = null;
try {
ADataObject ado = queue.dequeue();
if (ado == null) {
logger.warn("DatabaseEngine found no data");
break;
}
String stmtTxt = createStatementFromDataObject(ado);
pstmt = dbCon.prepareStatement(stmtTxt);
for (Object o : ado.getValues().values()) {
}
itemCnt++;
logger.info("DatabaseEngine received (" + itemCnt + ") " + ado);
} catch (SQLException e) {
logger.error("SQLException in inner database engine loop", e);
} finally {
if (pstmt != null) {
try {
pstmt.close();
pstmt = null;
} catch (SQLException e) {
logger.warn("SQLException when closing statement, nothing will be done");
}
}
}
}
} catch (Exception e) {
logger.error("Exception in outer database engine loop", e);
} finally {
if (dbCon ! null) {
try {
dbCon.close();
dbCon = null;
} catch (SQLException e) {
logger.warn("SQLException when closing connection, nothing will be done");
}
}
}
}
logger.info("Database engine is terminating");
}
}

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.HashMap;

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.ArrayList;

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.ArrayList;

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.ArrayList;

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
public interface ITriggerable {
public void trigger();

View File

@ -0,0 +1,124 @@
package de.hottis.measurementCollector;
import java.io.Serializable;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class JmsTopic<T extends Serializable> implements MyQueue<T> {
static final String JMS_PARSED_DATA_TOPIC = "jms.parseddata.topic";
static final String JMS_CLIENTID_PROP = "jms.clientid";
static final String JMS_BROKER ="jms.broker";
static enum Mode { CONSUMER, PRODUCER, CONSUMER_PRODUCER };
final protected Logger logger = LogManager.getRootLogger();
private String clientId;
private Properties config;
private Connection connection;
private Session session;
private Topic topic;
private MessageProducer producer;
private MessageConsumer consumer;
private String topicTxt;
private String broker;
private Mode mode;
public JmsTopic(Properties config, Mode mode) {
this.config = config;
clientId = this.config.getProperty(JMS_CLIENTID_PROP);
topicTxt = this.config.getProperty(JMS_PARSED_DATA_TOPIC);
broker = this.config.getProperty(JMS_BROKER);
this.mode = mode;
}
public void init() throws MeasurementCollectorException {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker);
connectionFactory.setTrustAllPackages(true);
logger.debug("connectionFactory: " + connectionFactory);
connection = connectionFactory.createConnection();
connection.setClientID(clientId);
connection.start();
logger.debug("connection: " + connection);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.debug("session: " + session);
topic = session.createTopic(topicTxt);
logger.debug("topic: " + topic);
if ((mode == Mode.PRODUCER) || (mode == Mode.CONSUMER_PRODUCER)) {
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
logger.debug("producer: " + producer);
} else {
producer = null;
}
if ((mode == Mode.CONSUMER) || (mode == Mode.CONSUMER_PRODUCER)) {
consumer = session.createDurableSubscriber(topic, "DatabaseEngine");
logger.debug("consumer: " + consumer);
} else {
consumer = null;
}
} catch (JMSException e) {
logger.error("JMSException in AJmsQueue.init", e);
throw new MeasurementCollectorException("JMSException in AJmsTopic.init", e);
}
}
public T dequeue() throws MeasurementCollectorException {
if (consumer == null) {
throw new MeasurementCollectorException("This is no consumer");
}
try {
ObjectMessage message = (ObjectMessage) consumer.receiveNoWait();
T item;
if (message != null) {
@SuppressWarnings("unchecked")
T t = (T) message.getObject();
item = t;
logger.debug("message dequeued");
} else {
item = null;
}
return item;
} catch (JMSException e) {
logger.error("JMSException in JmsTopic.dequeue", e);
logger.error("Calling init");
init();
throw new MeasurementCollectorException("JMSException in JmsTopic.dequeue", e);
}
}
public void enqueue(T item) throws MeasurementCollectorException {
if (producer == null) {
throw new MeasurementCollectorException("This is no producer");
}
try {
ObjectMessage message = session.createObjectMessage();
message.setObject(item);
producer.send(message);
logger.debug("message enqueued");
} catch (JMSException e) {
logger.error("JMSException in JmsTopic.enqueue", e);
logger.error("Calling init");
init();
throw new MeasurementCollectorException("JMSException in JmsTopic.enqueue", e);
}
}
}

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.lang.reflect.Constructor;
import java.time.LocalDateTime;
@ -24,7 +24,7 @@ public class MBusParser extends AMessageParser {
private HashMap<String, ADataParser> dataParsers;
public MBusParser(Properties config, DataObjectQueue queue) {
public MBusParser(Properties config, MyQueue<ADataObject> queue) {
super(TOPIC, config, queue);
JsonParserFactory jsonParserFactory = JsonParserFactory.getInstance();
jsonParser = jsonParserFactory.newJsonParser();

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.util.Properties;
@ -22,19 +22,14 @@ public class MeasurementCollector {
mqttReceiver.connect();
logger.debug("MqttReceiver started");
DataObjectQueue queue = new DataObjectQueue();
logger.debug("Queue instantiated");
JmsTopic<ADataObject> queue = new JmsTopic<ADataObject>(config, JmsTopic.Mode.PRODUCER);
queue.init();
MBusParser mbusParser = new MBusParser(config, queue);
mbusParser.init();
mbusParser.registerConfiguredDataParsers();
mqttReceiver.registerParser(mbusParser);
logger.debug("MBusParser started");
DatabaseEngine databaseEngine = new DatabaseEngine(config, queue);
databaseEngine.init();
databaseEngine.start();
logger.debug("DatabaseEngine started");
}
}

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
@SuppressWarnings("serial")
public class MeasurementCollectorException extends Exception {
@ -6,4 +6,8 @@ public class MeasurementCollectorException extends Exception {
super(msg, cause);
}
public MeasurementCollectorException(String msg) {
super(msg);
}
}

View File

@ -0,0 +1,30 @@
package de.hottis.measurementCollector;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MeasurementDatabaseEngine {
static final String PROPS_FILENAME = "measurementDataEngine.props";
static final Logger logger = LogManager.getRootLogger();
public static void main(String[] args) throws Exception {
logger.info("MeasurementDatabaseEngine starting");
final Properties config = new Properties();
config.load(MeasurementDatabaseEngine.class.getClassLoader().getResourceAsStream(PROPS_FILENAME));
logger.debug("Configuration loaded");
JmsTopic<ADataObject> queue = new JmsTopic<ADataObject>(config, JmsTopic.Mode.CONSUMER);
queue.init();
DatabaseEngine databaseEngine = new DatabaseEngine(config, queue);
databaseEngine.init();
databaseEngine.start();
logger.debug("DatabaseEngine started");
}
}

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.HashMap;

View File

@ -0,0 +1,8 @@
package de.hottis.measurementCollector;
import java.io.Serializable;
public interface MyQueue<T extends Serializable> {
public T dequeue() throws MeasurementCollectorException;
public void enqueue(T item) throws MeasurementCollectorException;
}

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.time.LocalDateTime;
import java.util.HashMap;

View File

@ -1,4 +1,4 @@
package de.hottis.MeasurementCollector;
package de.hottis.measurementCollector;
import java.util.TimerTask;

View File

@ -3,11 +3,15 @@ mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883
mqtt.username = tron
mqtt.password = geheim123
mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.3 = laundry,Laundry,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.4 = dryer,Dryer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.5 = dishwasher,Dishwasher,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.6 = freezer,Freezer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.7 = electricity,Total,de.hottis.MeasurementCollector.FinderThreePhasePowerMeter
mbus.dataparser.8 = thermom.,null,de.hottis.MeasurementCollector.HottisFourChannelThermometer
mbus.dataparser.1 = light,Light,de.hottis.measurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.2 = computer,Computer,de.hottis.measurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.3 = laundry,Laundry,de.hottis.measurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.4 = dryer,Dryer,de.hottis.measurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.5 = dishwasher,Dishwasher,de.hottis.measurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.6 = freezer,Freezer,de.hottis.measurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.7 = electricity,Total,de.hottis.measurementCollector.FinderThreePhasePowerMeter
mbus.dataparser.8 = thermom.,null,de.hottis.measurementCollector.HottisFourChannelThermometer
jms.broker = tcp://localhost:61616
jms.clientid = mcol
jms.parseddata.topic = IoT/Measurement

View File

@ -0,0 +1,5 @@
db.period = 3600
jms.broker = tcp://localhost:61616
jms.clientid = mdb
jms.parseddata.topic = IoT/Measurement