100 lines
3.3 KiB
Java
100 lines
3.3 KiB
Java
package de.hottis.MeasurementCollector;
|
|
|
|
import java.lang.reflect.Constructor;
|
|
import java.time.LocalDateTime;
|
|
import java.util.Enumeration;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Properties;
|
|
|
|
import org.openmuc.jmbus.DataRecord;
|
|
import org.openmuc.jmbus.MBusMessage;
|
|
import org.openmuc.jmbus.VariableDataStructure;
|
|
|
|
import com.json.parsers.JSONParser;
|
|
import com.json.parsers.JsonParserFactory;
|
|
|
|
public class MBusParser extends AMessageParser {
|
|
static final String DATA_PARSER_PROP = "mbus.dataparser";
|
|
|
|
static final String TOPIC = "IoT/Measurement/MeterbusHub";
|
|
|
|
private final JSONParser jsonParser;
|
|
|
|
private HashMap<String, ADataParser> dataParsers;
|
|
|
|
public MBusParser(Properties config, DataObjectQueue queue) {
|
|
super(TOPIC, config, queue);
|
|
JsonParserFactory jsonParserFactory = JsonParserFactory.getInstance();
|
|
jsonParser = jsonParserFactory.newJsonParser();
|
|
dataParsers = new HashMap<String, ADataParser>();
|
|
}
|
|
|
|
public void registerConfiguredDataParsers() throws MeasurementCollectorException {
|
|
try {
|
|
@SuppressWarnings("unchecked")
|
|
Enumeration<String> propNames = (Enumeration<String>) config.propertyNames();
|
|
while (propNames.hasMoreElements()) {
|
|
String propName = propNames.nextElement();
|
|
if (propName.startsWith(DATA_PARSER_PROP)) {
|
|
String[] parserConfigElements = config.get(propName).toString().split(",");
|
|
String nameInMsg = parserConfigElements[0];
|
|
String nameInDatabase = parserConfigElements[1];
|
|
String dataParserClassName = parserConfigElements[2];
|
|
|
|
Class<?> klass = Class.forName(dataParserClassName);
|
|
Constructor<?> constructor = klass.getConstructor(String.class);
|
|
ADataParser dataParser = (ADataParser) constructor.newInstance(nameInDatabase);
|
|
|
|
dataParsers.put(nameInMsg, dataParser);
|
|
logger.info(dataParserClassName + " registered for " + nameInMsg + ", " + nameInDatabase);
|
|
}
|
|
}
|
|
}
|
|
catch (Exception e) {
|
|
throw new MeasurementCollectorException("Exception when registering configured data parsers", e);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void execute(LocalDateTime timestamp, String msgPayload) {
|
|
try {
|
|
@SuppressWarnings("rawtypes")
|
|
Map payloadMap = jsonParser.parseJson(msgPayload);
|
|
@SuppressWarnings("rawtypes")
|
|
String name = (String)(((Map)(payloadMap.get("metadata"))).get("name"));
|
|
@SuppressWarnings("rawtypes")
|
|
String mbusMsgTxt = (String)(((Map)(payloadMap.get("data"))).get("telegram"));
|
|
String [] octetsTxt = mbusMsgTxt.split(" ");
|
|
byte [] octets = new byte[octetsTxt.length];
|
|
for (int i = 0; i < octetsTxt.length; i++) {
|
|
octets[i] = (byte)(Integer.parseInt(octetsTxt[i], 16) & 0xff);
|
|
}
|
|
|
|
MBusMessage mbusMsg = MBusMessage.decode(octets, octets.length);
|
|
VariableDataStructure variableDataStructure = mbusMsg.getVariableDataResponse();
|
|
variableDataStructure.decode();
|
|
List<DataRecord> dataRecords = variableDataStructure.getDataRecords();
|
|
|
|
|
|
if (dataParsers.containsKey(name)) {
|
|
List<ADataObject> measurementItems = dataParsers.get(name).parse(timestamp, dataRecords);
|
|
|
|
for (ADataObject ado : measurementItems) {
|
|
logger.debug(ado);
|
|
}
|
|
|
|
queue.add(measurementItems);
|
|
logger.debug("Queue size: " + queue.size());
|
|
} else {
|
|
logger.warn("unknown name: " + name);
|
|
}
|
|
} catch (Exception e) {
|
|
logger.error("Exception when handling mbus message: " + e);
|
|
}
|
|
|
|
}
|
|
|
|
}
|