11 Commits

13 changed files with 175 additions and 25 deletions

View File

@ -1,4 +1,4 @@
mqtt.broker = tcp://172.16.2.15:1883
mqtt.broker = tcp://127.0.0.1
;mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883
;mqtt.username = tron
;mqtt.password = geheim123
@ -14,4 +14,5 @@ mbus.dataparser.8 = thermom.,null,de.hottis.measurementCollector.HottisFourChann
jms.broker = tcp://localhost:61616
jms.clientid = mcol
jms.parseddata.topic = IoT/Measurement
jms.parseddata.topic = IoT/Measurement

View File

@ -18,6 +18,11 @@
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>quick-json</groupId>
<artifactId>quick-json</artifactId>
<version>1.0.2.3</version>
</dependency>
<dependency>
<groupId>de.hottis.common</groupId>
<artifactId>HottisLibJava</artifactId>

View File

@ -1,5 +1,6 @@
package de.hottis.measurementCollector;
import de.hottis.smarthomelib.ADataObject;
import java.time.LocalDateTime;
import java.util.List;

View File

@ -1,5 +1,8 @@
package de.hottis.measurementCollector;
import de.hottis.common.HottisCommonException;
import de.hottis.common.MyQueue;
import de.hottis.smarthomelib.ADataObject;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Properties;
@ -10,8 +13,8 @@ import org.apache.logging.log4j.Logger;
public abstract class AMessageParser {
final protected Logger logger = LogManager.getRootLogger();
private String topic;
private MyQueue<ADataObject> queue;
private final String topic;
private final MyQueue<ADataObject> queue;
protected Properties config;
public AMessageParser(String topic, Properties config, MyQueue<ADataObject> queue) {
@ -28,11 +31,15 @@ public abstract class AMessageParser {
return this.topic;
}
public void enqueue(List<ADataObject> itemList) throws MeasurementCollectorException {
public void enqueue(List<ADataObject> itemList) throws HottisCommonException {
for (ADataObject ado : itemList) {
queue.enqueue(ado);
logger.debug("message enqueued");
}
enqueue(ado);
}
}
public void enqueue(ADataObject item) throws HottisCommonException {
queue.enqueue(item);
logger.debug("message enqueued");
}
abstract public void execute(LocalDateTime timestamp, String msgPayload);

View File

@ -0,0 +1,41 @@
package de.hottis.measurementCollector;
import de.hottis.common.MyQueue;
import de.hottis.smarthomelib.ADataObject;
import de.hottis.smarthomelib.TemperatureDataObject;
import java.time.LocalDateTime;
import java.util.Properties;
/**
*
* @author wn
*/
public class EspThermometerParser extends AMessageParser {
static final String TOPIC = "IoT/espThermometer2/measurement";
public EspThermometerParser(Properties config, MyQueue<ADataObject> queue) {
super(TOPIC, config, queue);
}
@Override
public void execute(LocalDateTime timestamp, String msgPayload) {
try {
// BueroBochum 22.93 2.486 1577
String[] payloadParts = msgPayload.split(" ");
if (payloadParts.length != 4) {
throw new MeasurementCollectorException("invalid number of parts in message: " + msgPayload);
}
String name = payloadParts[0];
double temperature = Double.parseDouble(payloadParts[1]);
TemperatureDataObject tdo = new TemperatureDataObject(timestamp, name, temperature);
enqueue(tdo);
} catch (Exception e) {
logger.error("Exception when esp thermometer message: ", e);
}
}
}

View File

@ -1,5 +1,7 @@
package de.hottis.measurementCollector;
import de.hottis.smarthomelib.ADataObject;
import de.hottis.smarthomelib.ElectricEnergyDataObject;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

View File

@ -1,5 +1,7 @@
package de.hottis.measurementCollector;
import de.hottis.smarthomelib.ADataObject;
import de.hottis.smarthomelib.ElectricEnergyDataObject;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

View File

@ -1,5 +1,7 @@
package de.hottis.measurementCollector;
import de.hottis.smarthomelib.ADataObject;
import de.hottis.smarthomelib.TemperatureDataObject;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

View File

@ -0,0 +1,59 @@
package de.hottis.measurementCollector;
import java.lang.reflect.Constructor;
import java.time.LocalDateTime;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.json.parsers.JSONParser;
import com.json.parsers.JsonParserFactory;
import de.hottis.common.MyQueue;
import de.hottis.smarthomelib.ADataObject;
import de.hottis.smarthomelib.TemperatureDataObject;
import java.util.ArrayList;
public class HottisFreezerThermometerParser extends AMessageParser {
/*
{ "metadata": { "device": "ModbusHub", "Slave": "Thermometer", "connectionValid": "1", "requests": 151, "successfulRequests": 150, "failedRequests": 1, "exceptionErrors": 0, "retries": 0}, "data": {"n1": 194825, "u1": 1.86, "r1": 1035.02, "offset1": -0.24, "factor1": 1.00, "tRaw1": 9.09, "t1": 9.07, "alpha1": 1.00, "n2": 198656, "u2": 1.89, "r2": 951.77, "offset2": -0.31, "factor2": 0.99, "tRaw2": -12.52, "t2": -12.52, "alpha2": 1.00, "uptime": 3183}}
*/
static final String TOPIC = "IoT/Measurement/ModbusHub";
private final JSONParser jsonParser;
public HottisFreezerThermometerParser(Properties config, MyQueue<ADataObject> queue) {
super(TOPIC, config, queue);
JsonParserFactory jsonParserFactory = JsonParserFactory.getInstance();
jsonParser = jsonParserFactory.newJsonParser();
}
@Override
public void execute(LocalDateTime timestamp, String msgPayload) {
try {
@SuppressWarnings("rawtypes")
Map payloadMap = jsonParser.parseJson(msgPayload);
ArrayList<ADataObject> temperatureList = new ArrayList<>();
@SuppressWarnings("rawtypes")
double tFreezer = Double.parseDouble((String)(((Map)(payloadMap.get("data"))).get("t2")));
TemperatureDataObject tdo = new TemperatureDataObject(timestamp, "Freezer", tFreezer);
temperatureList.add(tdo);
@SuppressWarnings("rawtypes")
double tFridge = Double.parseDouble((String)(((Map)(payloadMap.get("data"))).get("t1")));
tdo = new TemperatureDataObject(timestamp, "Fridge", tFridge);
temperatureList.add(tdo);
enqueue(temperatureList);
} catch (Exception e) {
logger.error("Exception when handling Modbus thermometer message: ", e);
}
}
}

View File

@ -14,6 +14,8 @@ import org.openmuc.jmbus.VariableDataStructure;
import com.json.parsers.JSONParser;
import com.json.parsers.JsonParserFactory;
import de.hottis.common.MyQueue;
import de.hottis.smarthomelib.ADataObject;
public class MBusParser extends AMessageParser {
static final String DATA_PARSER_PROP = "mbus.dataparser";

View File

@ -1,5 +1,9 @@
package de.hottis.measurementCollector;
import de.hottis.common.JmsTopic;
import de.hottis.common.ConsoleQueue;
import de.hottis.smarthomelib.ADataObject;
import java.io.FileInputStream;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
@ -15,14 +19,16 @@ public class MeasurementCollector {
logger.info("MeasurementCollector starting");
final Properties config = new Properties();
config.load(MeasurementCollector.class.getClassLoader().getResourceAsStream(PROPS_FILENAME));
try (FileInputStream propsFileInputStream = new FileInputStream(PROPS_FILENAME)) {
config.load(propsFileInputStream);
}
logger.debug("Configuration loaded");
MqttReceiver mqttReceiver = new MqttReceiver(config);
mqttReceiver.connect();
logger.debug("MqttReceiver started");
JmsTopic<ADataObject> queue = new JmsTopic<ADataObject>(config, JmsTopic.Mode.PRODUCER);
// JmsTopic<ADataObject> queue = new JmsTopic<>(config, JmsTopic.Mode.PRODUCER);
ConsoleQueue<ADataObject> queue = new ConsoleQueue<>(config);
queue.init();
MBusParser mbusParser = new MBusParser(config, queue);
@ -30,6 +36,19 @@ public class MeasurementCollector {
mbusParser.registerConfiguredDataParsers();
mqttReceiver.registerParser(mbusParser);
logger.debug("MBusParser started");
}
EspThermometerParser espThermometerParser = new EspThermometerParser(config, queue);
espThermometerParser.init();
mqttReceiver.registerParser(espThermometerParser);
logger.debug("EspThermometerParser started");
HottisFreezerThermometerParser hottisFreezerThermometerParser = new HottisFreezerThermometerParser(config, queue);
hottisFreezerThermometerParser.init();
mqttReceiver.registerParser(hottisFreezerThermometerParser);
logger.debug("HottisFreezerThermometerParser started");
mqttReceiver.connect();
logger.debug("MqttReceiver connected");
}
}

View File

@ -0,0 +1,16 @@
package de.hottis.measurementCollector;
import de.hottis.common.HottisCommonException;
public class MeasurementCollectorException extends HottisCommonException {
private static final long serialVersionUID = 1L;
public MeasurementCollectorException(String msg, Throwable cause) {
super(msg, cause);
}
public MeasurementCollectorException(String msg) {
super(msg);
}
}

View File

@ -68,8 +68,8 @@ public class MqttReceiver {
try {
client = new MqttClient(broker, clientId);
client.setCallback(callback);
client.connect(connOpts);
logger.info("Connected");
reconnect();
logger.info("Connected");
} catch (MqttException e) {
throw new MeasurementCollectorException("MqttReceiver.connect", e);
}
@ -81,10 +81,9 @@ public class MqttReceiver {
while (true) {
try {
client.connect(connOpts);
for (String topic : parsers.keySet()) {
client.subscribe(topic);
logger.info("Re-Subscribed: " + topic);
}
String[] topics = parsers.keySet().toArray(new String[0]);
client.subscribe(topics);
logger.info("Re-Subscribed: " + topics.toString());
logger.error("reconnecting successfully completed");
break;
} catch (MqttException e) {
@ -101,12 +100,6 @@ public class MqttReceiver {
}
public void registerParser(AMessageParser parser) throws MeasurementCollectorException {
try {
parsers.put(parser.getTopic(), parser);
client.subscribe(parser.getTopic());
logger.info("Subscribed: " + parser.getTopic());
} catch (MqttException e) {
throw new MeasurementCollectorException("MqttReceiver.registerParser", e);
}
parsers.put(parser.getTopic(), parser);
}
}