diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5887986 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +MeasurementCollectorMqttReceiver-* diff --git a/bin/de/hottis/MeasurementCollector/ADataParser.class b/bin/de/hottis/MeasurementCollector/ADataParser.class index 34b01e9..927f512 100644 Binary files a/bin/de/hottis/MeasurementCollector/ADataParser.class and b/bin/de/hottis/MeasurementCollector/ADataParser.class differ diff --git a/bin/de/hottis/MeasurementCollector/AMessageParser.class b/bin/de/hottis/MeasurementCollector/AMessageParser.class index ff295b0..97125ce 100644 Binary files a/bin/de/hottis/MeasurementCollector/AMessageParser.class and b/bin/de/hottis/MeasurementCollector/AMessageParser.class differ diff --git a/bin/de/hottis/MeasurementCollector/DataObjectQueue.class b/bin/de/hottis/MeasurementCollector/DataObjectQueue.class new file mode 100644 index 0000000..8fd22a8 Binary files /dev/null and b/bin/de/hottis/MeasurementCollector/DataObjectQueue.class differ diff --git a/bin/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.class b/bin/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.class index bdf2fd2..3d761d5 100644 Binary files a/bin/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.class and b/bin/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.class differ diff --git a/bin/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.class b/bin/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.class index 5cf45e6..a2ec190 100644 Binary files a/bin/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.class and b/bin/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.class differ diff --git a/bin/de/hottis/MeasurementCollector/HottisFourChannelThermometer.class b/bin/de/hottis/MeasurementCollector/HottisFourChannelThermometer.class index 9b5bcbf..56a3a2c 100644 Binary files a/bin/de/hottis/MeasurementCollector/HottisFourChannelThermometer.class and b/bin/de/hottis/MeasurementCollector/HottisFourChannelThermometer.class differ diff --git a/bin/de/hottis/MeasurementCollector/MBusParser.class b/bin/de/hottis/MeasurementCollector/MBusParser.class index 53c74d4..1bdb046 100644 Binary files a/bin/de/hottis/MeasurementCollector/MBusParser.class and b/bin/de/hottis/MeasurementCollector/MBusParser.class differ diff --git a/bin/de/hottis/MeasurementCollector/MeasurementCollector.class b/bin/de/hottis/MeasurementCollector/MeasurementCollector.class index 9318cbf..13d4fa5 100644 Binary files a/bin/de/hottis/MeasurementCollector/MeasurementCollector.class and b/bin/de/hottis/MeasurementCollector/MeasurementCollector.class differ diff --git a/bin/de/hottis/MeasurementCollector/MeasurementCollectorException.class b/bin/de/hottis/MeasurementCollector/MeasurementCollectorException.class index 9a2e8c3..7293159 100644 Binary files a/bin/de/hottis/MeasurementCollector/MeasurementCollectorException.class and b/bin/de/hottis/MeasurementCollector/MeasurementCollectorException.class differ diff --git a/bin/de/hottis/MeasurementCollector/TestParser.class b/bin/de/hottis/MeasurementCollector/TestParser.class deleted file mode 100644 index 8acdf3b..0000000 Binary files a/bin/de/hottis/MeasurementCollector/TestParser.class and /dev/null differ diff --git a/bin/measurementCollector.props b/bin/measurementCollector.props index 7cb4554..7a03150 100644 --- a/bin/measurementCollector.props +++ b/bin/measurementCollector.props @@ -1,3 +1,13 @@ -mqtt.broker = tcp://172.16.2.15:1883 -; mqtt.username = tron -; mqtt.password = geheim123 +; mqtt.broker = tcp://172.16.2.15:1883 +mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 +mqtt.username = tron +mqtt.password = geheim123 + +mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.3 = laundry,Laundry,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.4 = dryer,Dryer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.5 = dishwasher,Dishwasher,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.6 = freezer,Freezer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.7 = electricity,Total,de.hottis.MeasurementCollector.FinderThreePhasePowerMeter +mbus.dataparser.8 = thermom.,null,de.hottis.MeasurementCollector.HottisFourChannelThermometer diff --git a/src/de/hottis/MeasurementCollector/ADataParser.java b/src/de/hottis/MeasurementCollector/ADataParser.java index 3db53ab..ea955dd 100644 --- a/src/de/hottis/MeasurementCollector/ADataParser.java +++ b/src/de/hottis/MeasurementCollector/ADataParser.java @@ -5,6 +5,12 @@ import java.util.List; import org.openmuc.jmbus.DataRecord; -public interface ADataParser { - public List parse(LocalDateTime timestamp, String name, List dataRecords); +public abstract class ADataParser { + protected String name; + + public ADataParser(String name) { + this.name = name; + } + + abstract public List parse(LocalDateTime timestamp, List dataRecords); } diff --git a/src/de/hottis/MeasurementCollector/AMessageParser.java b/src/de/hottis/MeasurementCollector/AMessageParser.java index 05313b8..2f54681 100644 --- a/src/de/hottis/MeasurementCollector/AMessageParser.java +++ b/src/de/hottis/MeasurementCollector/AMessageParser.java @@ -1,15 +1,16 @@ package de.hottis.MeasurementCollector; import java.time.LocalDateTime; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Properties; public abstract class AMessageParser { private String topic; - protected ConcurrentLinkedQueue> queue; + protected DataObjectQueue queue; + protected Properties config; - public AMessageParser(String topic, ConcurrentLinkedQueue> queue) { + public AMessageParser(String topic, Properties config, DataObjectQueue queue) { this.topic = topic; + this.config = config; this.queue = queue; } diff --git a/src/de/hottis/MeasurementCollector/DataObjectQueue.java b/src/de/hottis/MeasurementCollector/DataObjectQueue.java new file mode 100644 index 0000000..2d61e8e --- /dev/null +++ b/src/de/hottis/MeasurementCollector/DataObjectQueue.java @@ -0,0 +1,11 @@ +package de.hottis.MeasurementCollector; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +@SuppressWarnings("serial") +public class DataObjectQueue extends ConcurrentLinkedQueue> { + public DataObjectQueue() { + super(); + } +} diff --git a/src/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.java b/src/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.java index 005713a..a2be8c6 100644 --- a/src/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.java +++ b/src/de/hottis/MeasurementCollector/FinderOnePhasePowerMeter.java @@ -6,8 +6,12 @@ import java.util.List; import org.openmuc.jmbus.DataRecord; -public class FinderOnePhasePowerMeter implements ADataParser { - public List parse(LocalDateTime timestamp, String name, List dataRecords) { +public class FinderOnePhasePowerMeter extends ADataParser { + public FinderOnePhasePowerMeter(String name) { + super(name); + } + + public List parse(LocalDateTime timestamp, List dataRecords) { ArrayList list = new ArrayList(); ElectricEnergyDataObject tdo = new ElectricEnergyDataObject(timestamp, name, diff --git a/src/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.java b/src/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.java index ab4de8b..4970271 100644 --- a/src/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.java +++ b/src/de/hottis/MeasurementCollector/FinderThreePhasePowerMeter.java @@ -6,8 +6,12 @@ import java.util.List; import org.openmuc.jmbus.DataRecord; -public class FinderThreePhasePowerMeter implements ADataParser { - public List parse(LocalDateTime timestamp, String name, List dataRecords) { +public class FinderThreePhasePowerMeter extends ADataParser { + public FinderThreePhasePowerMeter(String name) { + super(name); + } + + public List parse(LocalDateTime timestamp, List dataRecords) { ArrayList list = new ArrayList(); ElectricEnergyDataObject tdo = new ElectricEnergyDataObject(timestamp, name, diff --git a/src/de/hottis/MeasurementCollector/HottisFourChannelThermometer.java b/src/de/hottis/MeasurementCollector/HottisFourChannelThermometer.java index 6feb2a2..6c33253 100644 --- a/src/de/hottis/MeasurementCollector/HottisFourChannelThermometer.java +++ b/src/de/hottis/MeasurementCollector/HottisFourChannelThermometer.java @@ -6,8 +6,12 @@ import java.util.List; import org.openmuc.jmbus.DataRecord; -public class HottisFourChannelThermometer implements ADataParser { - public List parse(LocalDateTime timestamp, String name, List dataRecords) { +public class HottisFourChannelThermometer extends ADataParser { + public HottisFourChannelThermometer(String name) { + super(name); + } + + public List parse(LocalDateTime timestamp, List dataRecords) { ArrayList list = new ArrayList(); TemperatureDataObject tdo = new TemperatureDataObject(timestamp, "Hedge", dataRecords.get(5).getScaledDataValue()); diff --git a/src/de/hottis/MeasurementCollector/MBusParser.java b/src/de/hottis/MeasurementCollector/MBusParser.java index 1933ca2..147ff41 100644 --- a/src/de/hottis/MeasurementCollector/MBusParser.java +++ b/src/de/hottis/MeasurementCollector/MBusParser.java @@ -1,9 +1,12 @@ package de.hottis.MeasurementCollector; +import java.lang.reflect.Constructor; import java.time.LocalDateTime; +import java.util.Enumeration; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Properties; import org.openmuc.jmbus.DataRecord; import org.openmuc.jmbus.MBusMessage; @@ -13,71 +16,80 @@ import com.json.parsers.JSONParser; import com.json.parsers.JsonParserFactory; public class MBusParser extends AMessageParser { + static final String DATA_PARSER_PROP = "mbus.dataparser"; static final String TOPIC = "IoT/Measurement/MeterbusHub"; private final JSONParser jsonParser; - private final HottisFourChannelThermometer hottisFourChannelThermometer; - private final FinderOnePhasePowerMeter finderOnePhasePowerMeter; - private final FinderThreePhasePowerMeter finderThreePhasePowerMeter; - - public MBusParser(ConcurrentLinkedQueue> queue) { - super(TOPIC, queue); + + private HashMap dataParsers; + + public MBusParser(Properties config, DataObjectQueue queue) { + super(TOPIC, config, queue); JsonParserFactory jsonParserFactory = JsonParserFactory.getInstance(); jsonParser = jsonParserFactory.newJsonParser(); - hottisFourChannelThermometer = new HottisFourChannelThermometer(); - finderOnePhasePowerMeter = new FinderOnePhasePowerMeter(); - finderThreePhasePowerMeter = new FinderThreePhasePowerMeter(); + dataParsers = new HashMap(); + } + + public void registerConfiguredDataParsers() throws MeasurementCollectorException { + try { + @SuppressWarnings("unchecked") + Enumeration propNames = (Enumeration) config.propertyNames(); + while (propNames.hasMoreElements()) { + String propName = propNames.nextElement(); + if (propName.startsWith(DATA_PARSER_PROP)) { + String[] parserConfigElements = config.get(propName).toString().split(","); + String nameInMsg = parserConfigElements[0]; + String nameInDatabase = parserConfigElements[1]; + String dataParserClassName = parserConfigElements[2]; + + Class klass = Class.forName(dataParserClassName); + Constructor constructor = klass.getConstructor(String.class); + ADataParser dataParser = (ADataParser) constructor.newInstance(nameInDatabase); + + dataParsers.put(nameInMsg, dataParser); + System.out.println(dataParserClassName + " registered for " + nameInMsg + ", " + nameInDatabase); + } + } + } + catch (Exception e) { + throw new MeasurementCollectorException("Exception when registering configured data parsers", e); + } } @Override public void execute(LocalDateTime timestamp, String msgPayload) { try { + @SuppressWarnings("rawtypes") Map payloadMap = jsonParser.parseJson(msgPayload); + @SuppressWarnings("rawtypes") String name = (String)(((Map)(payloadMap.get("metadata"))).get("name")); + @SuppressWarnings("rawtypes") String mbusMsgTxt = (String)(((Map)(payloadMap.get("data"))).get("telegram")); String [] octetsTxt = mbusMsgTxt.split(" "); byte [] octets = new byte[octetsTxt.length]; for (int i = 0; i < octetsTxt.length; i++) { octets[i] = (byte)(Integer.parseInt(octetsTxt[i], 16) & 0xff); } - + MBusMessage mbusMsg = MBusMessage.decode(octets, octets.length); VariableDataStructure variableDataStructure = mbusMsg.getVariableDataResponse(); variableDataStructure.decode(); List dataRecords = variableDataStructure.getDataRecords(); - //for (DataRecord dataRecord : dataRecords) { - // System.out.println(dataRecord.getScaledDataValue() + " " + dataRecord.getUnit().getUnit()); - //} - - List measurementItems; - switch (name) { - case "thermom.": - measurementItems = hottisFourChannelThermometer.parse(timestamp, name, dataRecords); - break; - case "light": - case "computer": - case "freezer": - case "dryer": - case "laundry": - case "dishwasher": - measurementItems = finderOnePhasePowerMeter.parse(timestamp, name.substring(0,1).toUpperCase() + name.substring(1).toLowerCase(), dataRecords); - break; - case "electricity": - measurementItems = finderThreePhasePowerMeter.parse(timestamp, "Total", dataRecords); - break; - default: - System.out.println("unknown name"); - measurementItems = null; + + if (dataParsers.containsKey(name)) { + List measurementItems = dataParsers.get(name).parse(timestamp, dataRecords); + + for (ADataObject ado : measurementItems) { + System.out.println(ado); + } + + queue.add(measurementItems); + System.out.println("Queue size: " + queue.size()); + } else { + System.out.println("unknown name: " + name); } - - for (ADataObject ado : measurementItems) { - System.out.println(ado); - } - - queue.add(measurementItems); - System.out.println("Queue size: " + queue.size()); } catch (Exception e) { System.out.println("Exception when handling mbus message: " + e); } diff --git a/src/de/hottis/MeasurementCollector/MeasurementCollector.java b/src/de/hottis/MeasurementCollector/MeasurementCollector.java index 10ac03a..1f151b6 100644 --- a/src/de/hottis/MeasurementCollector/MeasurementCollector.java +++ b/src/de/hottis/MeasurementCollector/MeasurementCollector.java @@ -1,8 +1,6 @@ package de.hottis.MeasurementCollector; -import java.util.List; import java.util.Properties; -import java.util.concurrent.ConcurrentLinkedQueue; public class MeasurementCollector { static final String PROPS_FILENAME = "measurementCollector.props"; @@ -14,15 +12,14 @@ public class MeasurementCollector { final Properties config = new Properties(); config.load(MeasurementCollector.class.getClassLoader().getResourceAsStream(PROPS_FILENAME)); + MqttReceiver mqttReceiver = new MqttReceiver(config); mqttReceiver.connect(); - // TestParser testParser = new TestParser(); - // mqttReceiver.registerParser(testParser); + DataObjectQueue queue = new DataObjectQueue(); - ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - - MBusParser mbusParser = new MBusParser(queue); + MBusParser mbusParser = new MBusParser(config, queue); + mbusParser.registerConfiguredDataParsers(); mqttReceiver.registerParser(mbusParser); } diff --git a/src/de/hottis/MeasurementCollector/MeasurementCollectorException.java b/src/de/hottis/MeasurementCollector/MeasurementCollectorException.java index 0a4dc20..758e889 100644 --- a/src/de/hottis/MeasurementCollector/MeasurementCollectorException.java +++ b/src/de/hottis/MeasurementCollector/MeasurementCollectorException.java @@ -1,8 +1,7 @@ package de.hottis.MeasurementCollector; +@SuppressWarnings("serial") public class MeasurementCollectorException extends Exception { - private static final long serialVersionUID = -5819010697931904741L; - public MeasurementCollectorException(String msg, Throwable cause) { super(msg, cause); } diff --git a/src/de/hottis/MeasurementCollector/TestParser.java b/src/de/hottis/MeasurementCollector/TestParser.java deleted file mode 100644 index 72c4aef..0000000 --- a/src/de/hottis/MeasurementCollector/TestParser.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.hottis.MeasurementCollector; - -import java.time.LocalDateTime; - -public class TestParser extends AMessageParser { - - public TestParser() { - super("IoT/Watchdog"); - } - - @Override - public void execute(LocalDateTime timestamp, String msgPayload) { - System.out.println(timestamp.toString() + " " + msgPayload); - } - -} diff --git a/src/measurementCollector.props b/src/measurementCollector.props index 7cb4554..7a03150 100644 --- a/src/measurementCollector.props +++ b/src/measurementCollector.props @@ -1,3 +1,13 @@ -mqtt.broker = tcp://172.16.2.15:1883 -; mqtt.username = tron -; mqtt.password = geheim123 +; mqtt.broker = tcp://172.16.2.15:1883 +mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 +mqtt.username = tron +mqtt.password = geheim123 + +mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.3 = laundry,Laundry,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.4 = dryer,Dryer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.5 = dishwasher,Dishwasher,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.6 = freezer,Freezer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter +mbus.dataparser.7 = electricity,Total,de.hottis.MeasurementCollector.FinderThreePhasePowerMeter +mbus.dataparser.8 = thermom.,null,de.hottis.MeasurementCollector.HottisFourChannelThermometer