DataParsers loaded from configuration, DataObjectQueue introduced

This commit is contained in:
Wolfgang Hottgenroth 2017-11-16 11:45:50 +01:00
parent 9b8df1b119
commit bc8482b03c
23 changed files with 127 additions and 84 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
MeasurementCollectorMqttReceiver-*

View File

@ -1,3 +1,13 @@
mqtt.broker = tcp://172.16.2.15:1883 ; mqtt.broker = tcp://172.16.2.15:1883
; mqtt.username = tron mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883
; mqtt.password = geheim123 mqtt.username = tron
mqtt.password = geheim123
mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.3 = laundry,Laundry,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.4 = dryer,Dryer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.5 = dishwasher,Dishwasher,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.6 = freezer,Freezer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.7 = electricity,Total,de.hottis.MeasurementCollector.FinderThreePhasePowerMeter
mbus.dataparser.8 = thermom.,null,de.hottis.MeasurementCollector.HottisFourChannelThermometer

View File

@ -5,6 +5,12 @@ import java.util.List;
import org.openmuc.jmbus.DataRecord; import org.openmuc.jmbus.DataRecord;
public interface ADataParser { public abstract class ADataParser {
public List<ADataObject> parse(LocalDateTime timestamp, String name, List<DataRecord> dataRecords); protected String name;
public ADataParser(String name) {
this.name = name;
}
abstract public List<ADataObject> parse(LocalDateTime timestamp, List<DataRecord> dataRecords);
} }

View File

@ -1,15 +1,16 @@
package de.hottis.MeasurementCollector; package de.hottis.MeasurementCollector;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
public abstract class AMessageParser { public abstract class AMessageParser {
private String topic; private String topic;
protected ConcurrentLinkedQueue<List<ADataObject>> queue; protected DataObjectQueue queue;
protected Properties config;
public AMessageParser(String topic, ConcurrentLinkedQueue<List<ADataObject>> queue) { public AMessageParser(String topic, Properties config, DataObjectQueue queue) {
this.topic = topic; this.topic = topic;
this.config = config;
this.queue = queue; this.queue = queue;
} }

View File

@ -0,0 +1,11 @@
package de.hottis.MeasurementCollector;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@SuppressWarnings("serial")
public class DataObjectQueue extends ConcurrentLinkedQueue<List<ADataObject>> {
public DataObjectQueue() {
super();
}
}

View File

@ -6,8 +6,12 @@ import java.util.List;
import org.openmuc.jmbus.DataRecord; import org.openmuc.jmbus.DataRecord;
public class FinderOnePhasePowerMeter implements ADataParser { public class FinderOnePhasePowerMeter extends ADataParser {
public List<ADataObject> parse(LocalDateTime timestamp, String name, List<DataRecord> dataRecords) { public FinderOnePhasePowerMeter(String name) {
super(name);
}
public List<ADataObject> parse(LocalDateTime timestamp, List<DataRecord> dataRecords) {
ArrayList<ADataObject> list = new ArrayList<ADataObject>(); ArrayList<ADataObject> list = new ArrayList<ADataObject>();
ElectricEnergyDataObject tdo = new ElectricEnergyDataObject(timestamp, name, ElectricEnergyDataObject tdo = new ElectricEnergyDataObject(timestamp, name,

View File

@ -6,8 +6,12 @@ import java.util.List;
import org.openmuc.jmbus.DataRecord; import org.openmuc.jmbus.DataRecord;
public class FinderThreePhasePowerMeter implements ADataParser { public class FinderThreePhasePowerMeter extends ADataParser {
public List<ADataObject> parse(LocalDateTime timestamp, String name, List<DataRecord> dataRecords) { public FinderThreePhasePowerMeter(String name) {
super(name);
}
public List<ADataObject> parse(LocalDateTime timestamp, List<DataRecord> dataRecords) {
ArrayList<ADataObject> list = new ArrayList<ADataObject>(); ArrayList<ADataObject> list = new ArrayList<ADataObject>();
ElectricEnergyDataObject tdo = new ElectricEnergyDataObject(timestamp, name, ElectricEnergyDataObject tdo = new ElectricEnergyDataObject(timestamp, name,

View File

@ -6,8 +6,12 @@ import java.util.List;
import org.openmuc.jmbus.DataRecord; import org.openmuc.jmbus.DataRecord;
public class HottisFourChannelThermometer implements ADataParser { public class HottisFourChannelThermometer extends ADataParser {
public List<ADataObject> parse(LocalDateTime timestamp, String name, List<DataRecord> dataRecords) { public HottisFourChannelThermometer(String name) {
super(name);
}
public List<ADataObject> parse(LocalDateTime timestamp, List<DataRecord> dataRecords) {
ArrayList<ADataObject> list = new ArrayList<ADataObject>(); ArrayList<ADataObject> list = new ArrayList<ADataObject>();
TemperatureDataObject tdo = new TemperatureDataObject(timestamp, "Hedge", dataRecords.get(5).getScaledDataValue()); TemperatureDataObject tdo = new TemperatureDataObject(timestamp, "Hedge", dataRecords.get(5).getScaledDataValue());

View File

@ -1,9 +1,12 @@
package de.hottis.MeasurementCollector; package de.hottis.MeasurementCollector;
import java.lang.reflect.Constructor;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.Properties;
import org.openmuc.jmbus.DataRecord; import org.openmuc.jmbus.DataRecord;
import org.openmuc.jmbus.MBusMessage; import org.openmuc.jmbus.MBusMessage;
@ -13,71 +16,80 @@ import com.json.parsers.JSONParser;
import com.json.parsers.JsonParserFactory; import com.json.parsers.JsonParserFactory;
public class MBusParser extends AMessageParser { public class MBusParser extends AMessageParser {
static final String DATA_PARSER_PROP = "mbus.dataparser";
static final String TOPIC = "IoT/Measurement/MeterbusHub"; static final String TOPIC = "IoT/Measurement/MeterbusHub";
private final JSONParser jsonParser; private final JSONParser jsonParser;
private final HottisFourChannelThermometer hottisFourChannelThermometer;
private final FinderOnePhasePowerMeter finderOnePhasePowerMeter; private HashMap<String, ADataParser> dataParsers;
private final FinderThreePhasePowerMeter finderThreePhasePowerMeter;
public MBusParser(Properties config, DataObjectQueue queue) {
public MBusParser(ConcurrentLinkedQueue<List<ADataObject>> queue) { super(TOPIC, config, queue);
super(TOPIC, queue);
JsonParserFactory jsonParserFactory = JsonParserFactory.getInstance(); JsonParserFactory jsonParserFactory = JsonParserFactory.getInstance();
jsonParser = jsonParserFactory.newJsonParser(); jsonParser = jsonParserFactory.newJsonParser();
hottisFourChannelThermometer = new HottisFourChannelThermometer(); dataParsers = new HashMap<String, ADataParser>();
finderOnePhasePowerMeter = new FinderOnePhasePowerMeter(); }
finderThreePhasePowerMeter = new FinderThreePhasePowerMeter();
public void registerConfiguredDataParsers() throws MeasurementCollectorException {
try {
@SuppressWarnings("unchecked")
Enumeration<String> propNames = (Enumeration<String>) config.propertyNames();
while (propNames.hasMoreElements()) {
String propName = propNames.nextElement();
if (propName.startsWith(DATA_PARSER_PROP)) {
String[] parserConfigElements = config.get(propName).toString().split(",");
String nameInMsg = parserConfigElements[0];
String nameInDatabase = parserConfigElements[1];
String dataParserClassName = parserConfigElements[2];
Class<?> klass = Class.forName(dataParserClassName);
Constructor<?> constructor = klass.getConstructor(String.class);
ADataParser dataParser = (ADataParser) constructor.newInstance(nameInDatabase);
dataParsers.put(nameInMsg, dataParser);
System.out.println(dataParserClassName + " registered for " + nameInMsg + ", " + nameInDatabase);
}
}
}
catch (Exception e) {
throw new MeasurementCollectorException("Exception when registering configured data parsers", e);
}
} }
@Override @Override
public void execute(LocalDateTime timestamp, String msgPayload) { public void execute(LocalDateTime timestamp, String msgPayload) {
try { try {
@SuppressWarnings("rawtypes")
Map payloadMap = jsonParser.parseJson(msgPayload); Map payloadMap = jsonParser.parseJson(msgPayload);
@SuppressWarnings("rawtypes")
String name = (String)(((Map)(payloadMap.get("metadata"))).get("name")); String name = (String)(((Map)(payloadMap.get("metadata"))).get("name"));
@SuppressWarnings("rawtypes")
String mbusMsgTxt = (String)(((Map)(payloadMap.get("data"))).get("telegram")); String mbusMsgTxt = (String)(((Map)(payloadMap.get("data"))).get("telegram"));
String [] octetsTxt = mbusMsgTxt.split(" "); String [] octetsTxt = mbusMsgTxt.split(" ");
byte [] octets = new byte[octetsTxt.length]; byte [] octets = new byte[octetsTxt.length];
for (int i = 0; i < octetsTxt.length; i++) { for (int i = 0; i < octetsTxt.length; i++) {
octets[i] = (byte)(Integer.parseInt(octetsTxt[i], 16) & 0xff); octets[i] = (byte)(Integer.parseInt(octetsTxt[i], 16) & 0xff);
} }
MBusMessage mbusMsg = MBusMessage.decode(octets, octets.length); MBusMessage mbusMsg = MBusMessage.decode(octets, octets.length);
VariableDataStructure variableDataStructure = mbusMsg.getVariableDataResponse(); VariableDataStructure variableDataStructure = mbusMsg.getVariableDataResponse();
variableDataStructure.decode(); variableDataStructure.decode();
List<DataRecord> dataRecords = variableDataStructure.getDataRecords(); List<DataRecord> dataRecords = variableDataStructure.getDataRecords();
//for (DataRecord dataRecord : dataRecords) {
// System.out.println(dataRecord.getScaledDataValue() + " " + dataRecord.getUnit().getUnit()); if (dataParsers.containsKey(name)) {
//} List<ADataObject> measurementItems = dataParsers.get(name).parse(timestamp, dataRecords);
List<ADataObject> measurementItems; for (ADataObject ado : measurementItems) {
switch (name) { System.out.println(ado);
case "thermom.": }
measurementItems = hottisFourChannelThermometer.parse(timestamp, name, dataRecords);
break; queue.add(measurementItems);
case "light": System.out.println("Queue size: " + queue.size());
case "computer": } else {
case "freezer": System.out.println("unknown name: " + name);
case "dryer":
case "laundry":
case "dishwasher":
measurementItems = finderOnePhasePowerMeter.parse(timestamp, name.substring(0,1).toUpperCase() + name.substring(1).toLowerCase(), dataRecords);
break;
case "electricity":
measurementItems = finderThreePhasePowerMeter.parse(timestamp, "Total", dataRecords);
break;
default:
System.out.println("unknown name");
measurementItems = null;
} }
for (ADataObject ado : measurementItems) {
System.out.println(ado);
}
queue.add(measurementItems);
System.out.println("Queue size: " + queue.size());
} catch (Exception e) { } catch (Exception e) {
System.out.println("Exception when handling mbus message: " + e); System.out.println("Exception when handling mbus message: " + e);
} }

View File

@ -1,8 +1,6 @@
package de.hottis.MeasurementCollector; package de.hottis.MeasurementCollector;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
public class MeasurementCollector { public class MeasurementCollector {
static final String PROPS_FILENAME = "measurementCollector.props"; static final String PROPS_FILENAME = "measurementCollector.props";
@ -14,15 +12,14 @@ public class MeasurementCollector {
final Properties config = new Properties(); final Properties config = new Properties();
config.load(MeasurementCollector.class.getClassLoader().getResourceAsStream(PROPS_FILENAME)); config.load(MeasurementCollector.class.getClassLoader().getResourceAsStream(PROPS_FILENAME));
MqttReceiver mqttReceiver = new MqttReceiver(config); MqttReceiver mqttReceiver = new MqttReceiver(config);
mqttReceiver.connect(); mqttReceiver.connect();
// TestParser testParser = new TestParser(); DataObjectQueue queue = new DataObjectQueue();
// mqttReceiver.registerParser(testParser);
ConcurrentLinkedQueue<List<ADataObject>> queue = new ConcurrentLinkedQueue<List<ADataObject>>(); MBusParser mbusParser = new MBusParser(config, queue);
mbusParser.registerConfiguredDataParsers();
MBusParser mbusParser = new MBusParser(queue);
mqttReceiver.registerParser(mbusParser); mqttReceiver.registerParser(mbusParser);
} }

View File

@ -1,8 +1,7 @@
package de.hottis.MeasurementCollector; package de.hottis.MeasurementCollector;
@SuppressWarnings("serial")
public class MeasurementCollectorException extends Exception { public class MeasurementCollectorException extends Exception {
private static final long serialVersionUID = -5819010697931904741L;
public MeasurementCollectorException(String msg, Throwable cause) { public MeasurementCollectorException(String msg, Throwable cause) {
super(msg, cause); super(msg, cause);
} }

View File

@ -1,16 +0,0 @@
package de.hottis.MeasurementCollector;
import java.time.LocalDateTime;
public class TestParser extends AMessageParser {
public TestParser() {
super("IoT/Watchdog");
}
@Override
public void execute(LocalDateTime timestamp, String msgPayload) {
System.out.println(timestamp.toString() + " " + msgPayload);
}
}

View File

@ -1,3 +1,13 @@
mqtt.broker = tcp://172.16.2.15:1883 ; mqtt.broker = tcp://172.16.2.15:1883
; mqtt.username = tron mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883
; mqtt.password = geheim123 mqtt.username = tron
mqtt.password = geheim123
mbus.dataparser.1 = light,Light,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.2 = computer,Computer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.3 = laundry,Laundry,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.4 = dryer,Dryer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.5 = dishwasher,Dishwasher,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.6 = freezer,Freezer,de.hottis.MeasurementCollector.FinderOnePhasePowerMeter
mbus.dataparser.7 = electricity,Total,de.hottis.MeasurementCollector.FinderThreePhasePowerMeter
mbus.dataparser.8 = thermom.,null,de.hottis.MeasurementCollector.HottisFourChannelThermometer