introduce JMS queue

This commit is contained in:
Wolfgang Hottgenroth
2017-11-19 15:45:17 +01:00
parent 294c128291
commit 7b36caf330
25 changed files with 122 additions and 13 deletions

View File

@ -9,5 +9,6 @@
<classpathentry kind="lib" path="/home/wn/workspace-java/MeasurementCollector/libraries/log4j-api-2.9.1.jar"/> <classpathentry kind="lib" path="/home/wn/workspace-java/MeasurementCollector/libraries/log4j-api-2.9.1.jar"/>
<classpathentry kind="lib" path="/home/wn/workspace-java/MeasurementCollector/libraries/mysql-connector-java-5.1.44-bin.jar"/> <classpathentry kind="lib" path="/home/wn/workspace-java/MeasurementCollector/libraries/mysql-connector-java-5.1.44-bin.jar"/>
<classpathentry kind="lib" path="/home/wn/workspace-java/MeasurementCollector/libraries/activemq-client-5.15.2.jar"/> <classpathentry kind="lib" path="/home/wn/workspace-java/MeasurementCollector/libraries/activemq-client-5.15.2.jar"/>
<classpathentry kind="lib" path="/home/wn/workspace-java/MeasurementCollector/libraries/geronimo-jms_1.1_spec-1.1.1.jar"/>
<classpathentry kind="output" path="bin"/> <classpathentry kind="output" path="bin"/>
</classpath> </classpath>

Binary file not shown.

View File

@ -1,7 +1,7 @@
mqtt.broker = tcp://172.16.2.15:1883 ;mqtt.broker = tcp://172.16.2.15:1883
;mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883
;mqtt.username = tron mqtt.username = tron
;mqtt.password = geheim123 mqtt.password = geheim123
mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter

Binary file not shown.

Binary file not shown.

BIN
libraries/hawtbuf-1.11.jar Normal file

Binary file not shown.

Binary file not shown.

View File

@ -1,12 +1,15 @@
package de.hottis.MeasurementCollector; package de.hottis.MeasurementCollector;
import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Map; import java.util.Map;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
public abstract class ADataObject { public abstract class ADataObject implements Serializable {
private static final long serialVersionUID = 1L;
final protected Logger logger = LogManager.getRootLogger(); final protected Logger logger = LogManager.getRootLogger();
private LocalDateTime timestamp; private LocalDateTime timestamp;

View File

@ -0,0 +1,42 @@
package de.hottis.MeasurementCollector;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public abstract class AJmsQueue {
final protected Logger logger = LogManager.getRootLogger();
protected Connection connection;
protected Session session;
protected Destination destination;
public AJmsQueue() {
}
public void init() throws MeasurementCollectorException {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
logger.debug("connectionFactory: " + connectionFactory);
connection = connectionFactory.createConnection();
connection.start();
logger.debug("connection: " + connection);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.debug("session: " + session);
destination = session.createQueue("TEST.FOO");
logger.debug("destination: " + destination);
} catch (JMSException e) {
logger.error("JMSException in AJmsQueue.init", e);
throw new MeasurementCollectorException("JMSException in AJmsQueue.init", e);
}
}
}

View File

@ -1,6 +1,7 @@
package de.hottis.MeasurementCollector; package de.hottis.MeasurementCollector;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -10,18 +11,34 @@ public abstract class AMessageParser {
final protected Logger logger = LogManager.getRootLogger(); final protected Logger logger = LogManager.getRootLogger();
private String topic; private String topic;
protected DataObjectQueue queue; //private DataObjectQueue queue;
private JmsQueueProducer<ADataObject> jmsQueue = new JmsQueueProducer<ADataObject>();
protected Properties config; protected Properties config;
public AMessageParser(String topic, Properties config, DataObjectQueue queue) { public AMessageParser(String topic, Properties config, DataObjectQueue queue) {
this.topic = topic; this.topic = topic;
this.config = config; this.config = config;
this.queue = queue; //this.queue = queue;
} }
public void init() throws MeasurementCollectorException {
jmsQueue.init();
}
public String getTopic() { public String getTopic() {
return this.topic; return this.topic;
} }
public void enqueue(List<ADataObject> itemList) throws MeasurementCollectorException {
for (ADataObject ado : itemList) {
jmsQueue.enqueue(ado);
logger.debug("message enqueued");
}
//queue.add(itemList);
//logger.debug("Queue size: " + queue.size());
}
abstract public void execute(LocalDateTime timestamp, String msgPayload); abstract public void execute(LocalDateTime timestamp, String msgPayload);
} }

View File

@ -4,6 +4,7 @@ import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
public class ElectricEnergyDataObject extends ADataObject { public class ElectricEnergyDataObject extends ADataObject {
private static final long serialVersionUID = 1L;
static final String ENERGY_KEY = "energy"; static final String ENERGY_KEY = "energy";
static final String POWER_KEY = "power"; static final String POWER_KEY = "power";
static final String TABLE_NAME = "ElectricEnergy"; static final String TABLE_NAME = "ElectricEnergy";

View File

@ -0,0 +1,44 @@
package de.hottis.MeasurementCollector;
import java.io.Serializable;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
public class JmsQueueProducer<T extends Serializable> extends AJmsQueue {
private MessageProducer producer;
public JmsQueueProducer() {
super();
}
public void init() throws MeasurementCollectorException {
super.init();
try {
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
logger.debug("producer: " + producer);
} catch (JMSException e) {
logger.error("JMSException in AJmsQueue.init", e);
throw new MeasurementCollectorException("JMSException in JmsQueueProducer.init", e);
}
}
public void enqueue(T item) throws MeasurementCollectorException {
try {
ObjectMessage message = session.createObjectMessage();
message.setObject(item);
producer.send(message);
logger.debug("message enqueued");
} catch (JMSException e) {
logger.error("JMSException in JmsQueueProducer.enqueue", e);
logger.error("Calling init");
init();
throw new MeasurementCollectorException("JMSException in JmsQueueProducer.enqueue", e);
}
}
}

View File

@ -85,13 +85,12 @@ public class MBusParser extends AMessageParser {
// logger.debug(ado); // logger.debug(ado);
//} //}
queue.add(measurementItems); enqueue(measurementItems);
//logger.debug("Queue size: " + queue.size());
} else { } else {
logger.warn("unknown name: " + name); logger.warn("unknown name: " + name);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Exception when handling mbus message: " + e); logger.error("Exception when handling mbus message: ", e);
} }
} }

View File

@ -26,6 +26,7 @@ public class MeasurementCollector {
logger.debug("Queue instantiated"); logger.debug("Queue instantiated");
MBusParser mbusParser = new MBusParser(config, queue); MBusParser mbusParser = new MBusParser(config, queue);
mbusParser.init();
mbusParser.registerConfiguredDataParsers(); mbusParser.registerConfiguredDataParsers();
mqttReceiver.registerParser(mbusParser); mqttReceiver.registerParser(mbusParser);
logger.debug("MBusParser started"); logger.debug("MBusParser started");

View File

@ -4,6 +4,7 @@ import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
public class TemperatureDataObject extends ADataObject { public class TemperatureDataObject extends ADataObject {
private static final long serialVersionUID = 1L;
static final String TEMPERATURE_KEY = "temperature"; static final String TEMPERATURE_KEY = "temperature";
static final String TABLE_NAME = "Temperature"; static final String TABLE_NAME = "Temperature";