diff --git a/.classpath b/.classpath index 72afe50..302939a 100644 --- a/.classpath +++ b/.classpath @@ -9,5 +9,6 @@ + diff --git a/bin/de/hottis/MeasurementCollector/ADataObject.class b/bin/de/hottis/MeasurementCollector/ADataObject.class index e02d929..2ca7cd4 100644 Binary files a/bin/de/hottis/MeasurementCollector/ADataObject.class and b/bin/de/hottis/MeasurementCollector/ADataObject.class differ diff --git a/bin/de/hottis/MeasurementCollector/AJmsQueue.class b/bin/de/hottis/MeasurementCollector/AJmsQueue.class new file mode 100644 index 0000000..28d1e3d Binary files /dev/null and b/bin/de/hottis/MeasurementCollector/AJmsQueue.class differ diff --git a/bin/de/hottis/MeasurementCollector/AMessageParser.class b/bin/de/hottis/MeasurementCollector/AMessageParser.class index 86e71a6..47ff50f 100644 Binary files a/bin/de/hottis/MeasurementCollector/AMessageParser.class and b/bin/de/hottis/MeasurementCollector/AMessageParser.class differ diff --git a/bin/de/hottis/MeasurementCollector/DatabaseEngine.class b/bin/de/hottis/MeasurementCollector/DatabaseEngine.class index 79c084f..4863049 100644 Binary files a/bin/de/hottis/MeasurementCollector/DatabaseEngine.class and b/bin/de/hottis/MeasurementCollector/DatabaseEngine.class differ diff --git a/bin/de/hottis/MeasurementCollector/ElectricEnergyDataObject.class b/bin/de/hottis/MeasurementCollector/ElectricEnergyDataObject.class index 2747ab2..0d04644 100644 Binary files a/bin/de/hottis/MeasurementCollector/ElectricEnergyDataObject.class and b/bin/de/hottis/MeasurementCollector/ElectricEnergyDataObject.class differ diff --git a/bin/de/hottis/MeasurementCollector/JmsQueueProducer.class b/bin/de/hottis/MeasurementCollector/JmsQueueProducer.class new file mode 100644 index 0000000..c07b9a2 Binary files /dev/null and b/bin/de/hottis/MeasurementCollector/JmsQueueProducer.class differ diff --git a/bin/de/hottis/MeasurementCollector/MBusParser.class b/bin/de/hottis/MeasurementCollector/MBusParser.class index 8c52d4f..da0d0b8 100644 Binary files a/bin/de/hottis/MeasurementCollector/MBusParser.class and b/bin/de/hottis/MeasurementCollector/MBusParser.class differ diff --git a/bin/de/hottis/MeasurementCollector/MeasurementCollector.class b/bin/de/hottis/MeasurementCollector/MeasurementCollector.class index 103418a..f18ab02 100644 Binary files a/bin/de/hottis/MeasurementCollector/MeasurementCollector.class and b/bin/de/hottis/MeasurementCollector/MeasurementCollector.class differ diff --git a/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class b/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class index 8a07425..cfca45e 100644 Binary files a/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class and b/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class differ diff --git a/bin/de/hottis/MeasurementCollector/MqttReceiver.class b/bin/de/hottis/MeasurementCollector/MqttReceiver.class index a235172..0924a6b 100644 Binary files a/bin/de/hottis/MeasurementCollector/MqttReceiver.class and b/bin/de/hottis/MeasurementCollector/MqttReceiver.class differ diff --git a/bin/de/hottis/MeasurementCollector/TemperatureDataObject.class b/bin/de/hottis/MeasurementCollector/TemperatureDataObject.class index 386191a..bedc04c 100644 Binary files a/bin/de/hottis/MeasurementCollector/TemperatureDataObject.class and b/bin/de/hottis/MeasurementCollector/TemperatureDataObject.class differ diff --git a/bin/measurementCollector.props b/bin/measurementCollector.props index de15669..c798c52 100644 --- a/bin/measurementCollector.props +++ b/bin/measurementCollector.props @@ -1,7 +1,7 @@ -mqtt.broker = tcp://172.16.2.15:1883 -;mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 -;mqtt.username = tron -;mqtt.password = geheim123 +;mqtt.broker = tcp://172.16.2.15:1883 +mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 +mqtt.username = tron +mqtt.password = geheim123 mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter diff --git a/libraries/geronimo-j2ee-management_1.1_spec-1.0.1.jar b/libraries/geronimo-j2ee-management_1.1_spec-1.0.1.jar new file mode 100644 index 0000000..0701ae0 Binary files /dev/null and b/libraries/geronimo-j2ee-management_1.1_spec-1.0.1.jar differ diff --git a/libraries/geronimo-jms_1.1_spec-1.1.1.jar b/libraries/geronimo-jms_1.1_spec-1.1.1.jar new file mode 100644 index 0000000..4f5e646 Binary files /dev/null and b/libraries/geronimo-jms_1.1_spec-1.1.1.jar differ diff --git a/libraries/hawtbuf-1.11.jar b/libraries/hawtbuf-1.11.jar new file mode 100644 index 0000000..c7042a6 Binary files /dev/null and b/libraries/hawtbuf-1.11.jar differ diff --git a/libraries/slf4j-api-1.7.25.jar b/libraries/slf4j-api-1.7.25.jar new file mode 100644 index 0000000..0143c09 Binary files /dev/null and b/libraries/slf4j-api-1.7.25.jar differ diff --git a/src/de/hottis/MeasurementCollector/ADataObject.java b/src/de/hottis/MeasurementCollector/ADataObject.java index 5e99256..cf36368 100644 --- a/src/de/hottis/MeasurementCollector/ADataObject.java +++ b/src/de/hottis/MeasurementCollector/ADataObject.java @@ -1,12 +1,15 @@ package de.hottis.MeasurementCollector; +import java.io.Serializable; import java.time.LocalDateTime; import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public abstract class ADataObject { +public abstract class ADataObject implements Serializable { + private static final long serialVersionUID = 1L; + final protected Logger logger = LogManager.getRootLogger(); private LocalDateTime timestamp; diff --git a/src/de/hottis/MeasurementCollector/AJmsQueue.java b/src/de/hottis/MeasurementCollector/AJmsQueue.java new file mode 100644 index 0000000..7ed3cac --- /dev/null +++ b/src/de/hottis/MeasurementCollector/AJmsQueue.java @@ -0,0 +1,42 @@ +package de.hottis.MeasurementCollector; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class AJmsQueue { + final protected Logger logger = LogManager.getRootLogger(); + + protected Connection connection; + protected Session session; + protected Destination destination; + + public AJmsQueue() { + + } + + + public void init() throws MeasurementCollectorException { + try { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + logger.debug("connectionFactory: " + connectionFactory); + connection = connectionFactory.createConnection(); + connection.start(); + logger.debug("connection: " + connection); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + logger.debug("session: " + session); + destination = session.createQueue("TEST.FOO"); + logger.debug("destination: " + destination); + } catch (JMSException e) { + logger.error("JMSException in AJmsQueue.init", e); + throw new MeasurementCollectorException("JMSException in AJmsQueue.init", e); + } + } + + +} diff --git a/src/de/hottis/MeasurementCollector/AMessageParser.java b/src/de/hottis/MeasurementCollector/AMessageParser.java index cb2f055..cb62567 100644 --- a/src/de/hottis/MeasurementCollector/AMessageParser.java +++ b/src/de/hottis/MeasurementCollector/AMessageParser.java @@ -1,6 +1,7 @@ package de.hottis.MeasurementCollector; import java.time.LocalDateTime; +import java.util.List; import java.util.Properties; import org.apache.logging.log4j.LogManager; @@ -10,18 +11,34 @@ public abstract class AMessageParser { final protected Logger logger = LogManager.getRootLogger(); private String topic; - protected DataObjectQueue queue; + //private DataObjectQueue queue; + private JmsQueueProducer jmsQueue = new JmsQueueProducer(); protected Properties config; - + public AMessageParser(String topic, Properties config, DataObjectQueue queue) { this.topic = topic; this.config = config; - this.queue = queue; + //this.queue = queue; } - + + public void init() throws MeasurementCollectorException { + jmsQueue.init(); + } + + public String getTopic() { return this.topic; } - + + public void enqueue(List itemList) throws MeasurementCollectorException { + for (ADataObject ado : itemList) { + jmsQueue.enqueue(ado); + logger.debug("message enqueued"); + } + + //queue.add(itemList); + //logger.debug("Queue size: " + queue.size()); + } + abstract public void execute(LocalDateTime timestamp, String msgPayload); } diff --git a/src/de/hottis/MeasurementCollector/ElectricEnergyDataObject.java b/src/de/hottis/MeasurementCollector/ElectricEnergyDataObject.java index 390d9b8..93aeb77 100644 --- a/src/de/hottis/MeasurementCollector/ElectricEnergyDataObject.java +++ b/src/de/hottis/MeasurementCollector/ElectricEnergyDataObject.java @@ -4,6 +4,7 @@ import java.time.LocalDateTime; import java.util.HashMap; public class ElectricEnergyDataObject extends ADataObject { + private static final long serialVersionUID = 1L; static final String ENERGY_KEY = "energy"; static final String POWER_KEY = "power"; static final String TABLE_NAME = "ElectricEnergy"; diff --git a/src/de/hottis/MeasurementCollector/JmsQueueProducer.java b/src/de/hottis/MeasurementCollector/JmsQueueProducer.java new file mode 100644 index 0000000..a8834e9 --- /dev/null +++ b/src/de/hottis/MeasurementCollector/JmsQueueProducer.java @@ -0,0 +1,44 @@ +package de.hottis.MeasurementCollector; + +import java.io.Serializable; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; + +public class JmsQueueProducer extends AJmsQueue { + private MessageProducer producer; + + public JmsQueueProducer() { + super(); + } + + public void init() throws MeasurementCollectorException { + super.init(); + + try { + producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + logger.debug("producer: " + producer); + } catch (JMSException e) { + logger.error("JMSException in AJmsQueue.init", e); + throw new MeasurementCollectorException("JMSException in JmsQueueProducer.init", e); + } + } + + public void enqueue(T item) throws MeasurementCollectorException { + try { + ObjectMessage message = session.createObjectMessage(); + message.setObject(item); + producer.send(message); + logger.debug("message enqueued"); + } catch (JMSException e) { + logger.error("JMSException in JmsQueueProducer.enqueue", e); + logger.error("Calling init"); + init(); + throw new MeasurementCollectorException("JMSException in JmsQueueProducer.enqueue", e); + } + } + +} diff --git a/src/de/hottis/MeasurementCollector/MBusParser.java b/src/de/hottis/MeasurementCollector/MBusParser.java index c08fd77..2e78cd4 100644 --- a/src/de/hottis/MeasurementCollector/MBusParser.java +++ b/src/de/hottis/MeasurementCollector/MBusParser.java @@ -85,13 +85,12 @@ public class MBusParser extends AMessageParser { // logger.debug(ado); //} - queue.add(measurementItems); - //logger.debug("Queue size: " + queue.size()); + enqueue(measurementItems); } else { logger.warn("unknown name: " + name); } } catch (Exception e) { - logger.error("Exception when handling mbus message: " + e); + logger.error("Exception when handling mbus message: ", e); } } diff --git a/src/de/hottis/MeasurementCollector/MeasurementCollector.java b/src/de/hottis/MeasurementCollector/MeasurementCollector.java index 22dcac2..e9de2d5 100644 --- a/src/de/hottis/MeasurementCollector/MeasurementCollector.java +++ b/src/de/hottis/MeasurementCollector/MeasurementCollector.java @@ -26,6 +26,7 @@ public class MeasurementCollector { logger.debug("Queue instantiated"); MBusParser mbusParser = new MBusParser(config, queue); + mbusParser.init(); mbusParser.registerConfiguredDataParsers(); mqttReceiver.registerParser(mbusParser); logger.debug("MBusParser started"); diff --git a/src/de/hottis/MeasurementCollector/TemperatureDataObject.java b/src/de/hottis/MeasurementCollector/TemperatureDataObject.java index ec67dae..ba92bbe 100644 --- a/src/de/hottis/MeasurementCollector/TemperatureDataObject.java +++ b/src/de/hottis/MeasurementCollector/TemperatureDataObject.java @@ -4,6 +4,7 @@ import java.time.LocalDateTime; import java.util.HashMap; public class TemperatureDataObject extends ADataObject { + private static final long serialVersionUID = 1L; static final String TEMPERATURE_KEY = "temperature"; static final String TABLE_NAME = "Temperature";