Compare commits
11 Commits
SplitProje
...
master
Author | SHA1 | Date | |
---|---|---|---|
d567d08591
|
|||
518b64210b | |||
809653812a | |||
c2b9affbaa | |||
73e40cabe6 | |||
65a7e3ba81 | |||
70d0a054cb | |||
56009a3748 | |||
47164bba83 | |||
117f3c01e5 | |||
04d5338294 |
@ -1,4 +1,4 @@
|
|||||||
mqtt.broker = tcp://172.16.2.15:1883
|
mqtt.broker = tcp://127.0.0.1
|
||||||
;mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883
|
;mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883
|
||||||
;mqtt.username = tron
|
;mqtt.username = tron
|
||||||
;mqtt.password = geheim123
|
;mqtt.password = geheim123
|
||||||
@ -15,3 +15,4 @@ mbus.dataparser.8 = thermom.,null,de.hottis.measurementCollector.HottisFourChann
|
|||||||
jms.broker = tcp://localhost:61616
|
jms.broker = tcp://localhost:61616
|
||||||
jms.clientid = mcol
|
jms.clientid = mcol
|
||||||
jms.parseddata.topic = IoT/Measurement
|
jms.parseddata.topic = IoT/Measurement
|
||||||
|
|
||||||
|
5
pom.xml
5
pom.xml
@ -18,6 +18,11 @@
|
|||||||
<version>3.8.1</version>
|
<version>3.8.1</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>quick-json</groupId>
|
||||||
|
<artifactId>quick-json</artifactId>
|
||||||
|
<version>1.0.2.3</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>de.hottis.common</groupId>
|
<groupId>de.hottis.common</groupId>
|
||||||
<artifactId>HottisLibJava</artifactId>
|
<artifactId>HottisLibJava</artifactId>
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package de.hottis.measurementCollector;
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
package de.hottis.measurementCollector;
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.common.HottisCommonException;
|
||||||
|
import de.hottis.common.MyQueue;
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
@ -10,8 +13,8 @@ import org.apache.logging.log4j.Logger;
|
|||||||
public abstract class AMessageParser {
|
public abstract class AMessageParser {
|
||||||
final protected Logger logger = LogManager.getRootLogger();
|
final protected Logger logger = LogManager.getRootLogger();
|
||||||
|
|
||||||
private String topic;
|
private final String topic;
|
||||||
private MyQueue<ADataObject> queue;
|
private final MyQueue<ADataObject> queue;
|
||||||
protected Properties config;
|
protected Properties config;
|
||||||
|
|
||||||
public AMessageParser(String topic, Properties config, MyQueue<ADataObject> queue) {
|
public AMessageParser(String topic, Properties config, MyQueue<ADataObject> queue) {
|
||||||
@ -28,11 +31,15 @@ public abstract class AMessageParser {
|
|||||||
return this.topic;
|
return this.topic;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void enqueue(List<ADataObject> itemList) throws MeasurementCollectorException {
|
public void enqueue(List<ADataObject> itemList) throws HottisCommonException {
|
||||||
for (ADataObject ado : itemList) {
|
for (ADataObject ado : itemList) {
|
||||||
queue.enqueue(ado);
|
enqueue(ado);
|
||||||
logger.debug("message enqueued");
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void enqueue(ADataObject item) throws HottisCommonException {
|
||||||
|
queue.enqueue(item);
|
||||||
|
logger.debug("message enqueued");
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract public void execute(LocalDateTime timestamp, String msgPayload);
|
abstract public void execute(LocalDateTime timestamp, String msgPayload);
|
||||||
|
@ -0,0 +1,41 @@
|
|||||||
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.common.MyQueue;
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
|
import de.hottis.smarthomelib.TemperatureDataObject;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author wn
|
||||||
|
*/
|
||||||
|
public class EspThermometerParser extends AMessageParser {
|
||||||
|
|
||||||
|
static final String TOPIC = "IoT/espThermometer2/measurement";
|
||||||
|
|
||||||
|
public EspThermometerParser(Properties config, MyQueue<ADataObject> queue) {
|
||||||
|
super(TOPIC, config, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(LocalDateTime timestamp, String msgPayload) {
|
||||||
|
try {
|
||||||
|
// BueroBochum 22.93 2.486 1577
|
||||||
|
String[] payloadParts = msgPayload.split(" ");
|
||||||
|
if (payloadParts.length != 4) {
|
||||||
|
throw new MeasurementCollectorException("invalid number of parts in message: " + msgPayload);
|
||||||
|
}
|
||||||
|
String name = payloadParts[0];
|
||||||
|
double temperature = Double.parseDouble(payloadParts[1]);
|
||||||
|
|
||||||
|
TemperatureDataObject tdo = new TemperatureDataObject(timestamp, name, temperature);
|
||||||
|
|
||||||
|
enqueue(tdo);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Exception when esp thermometer message: ", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,5 +1,7 @@
|
|||||||
package de.hottis.measurementCollector;
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
|
import de.hottis.smarthomelib.ElectricEnergyDataObject;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
package de.hottis.measurementCollector;
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
|
import de.hottis.smarthomelib.ElectricEnergyDataObject;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
package de.hottis.measurementCollector;
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
|
import de.hottis.smarthomelib.TemperatureDataObject;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -0,0 +1,59 @@
|
|||||||
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
|
||||||
|
import com.json.parsers.JSONParser;
|
||||||
|
import com.json.parsers.JsonParserFactory;
|
||||||
|
import de.hottis.common.MyQueue;
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
|
import de.hottis.smarthomelib.TemperatureDataObject;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class HottisFreezerThermometerParser extends AMessageParser {
|
||||||
|
/*
|
||||||
|
{ "metadata": { "device": "ModbusHub", "Slave": "Thermometer", "connectionValid": "1", "requests": 151, "successfulRequests": 150, "failedRequests": 1, "exceptionErrors": 0, "retries": 0}, "data": {"n1": 194825, "u1": 1.86, "r1": 1035.02, "offset1": -0.24, "factor1": 1.00, "tRaw1": 9.09, "t1": 9.07, "alpha1": 1.00, "n2": 198656, "u2": 1.89, "r2": 951.77, "offset2": -0.31, "factor2": 0.99, "tRaw2": -12.52, "t2": -12.52, "alpha2": 1.00, "uptime": 3183}}
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
static final String TOPIC = "IoT/Measurement/ModbusHub";
|
||||||
|
|
||||||
|
private final JSONParser jsonParser;
|
||||||
|
|
||||||
|
public HottisFreezerThermometerParser(Properties config, MyQueue<ADataObject> queue) {
|
||||||
|
super(TOPIC, config, queue);
|
||||||
|
JsonParserFactory jsonParserFactory = JsonParserFactory.getInstance();
|
||||||
|
jsonParser = jsonParserFactory.newJsonParser();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(LocalDateTime timestamp, String msgPayload) {
|
||||||
|
try {
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
Map payloadMap = jsonParser.parseJson(msgPayload);
|
||||||
|
|
||||||
|
ArrayList<ADataObject> temperatureList = new ArrayList<>();
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
double tFreezer = Double.parseDouble((String)(((Map)(payloadMap.get("data"))).get("t2")));
|
||||||
|
TemperatureDataObject tdo = new TemperatureDataObject(timestamp, "Freezer", tFreezer);
|
||||||
|
temperatureList.add(tdo);
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
double tFridge = Double.parseDouble((String)(((Map)(payloadMap.get("data"))).get("t1")));
|
||||||
|
tdo = new TemperatureDataObject(timestamp, "Fridge", tFridge);
|
||||||
|
temperatureList.add(tdo);
|
||||||
|
|
||||||
|
enqueue(temperatureList);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Exception when handling Modbus thermometer message: ", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -14,6 +14,8 @@ import org.openmuc.jmbus.VariableDataStructure;
|
|||||||
|
|
||||||
import com.json.parsers.JSONParser;
|
import com.json.parsers.JSONParser;
|
||||||
import com.json.parsers.JsonParserFactory;
|
import com.json.parsers.JsonParserFactory;
|
||||||
|
import de.hottis.common.MyQueue;
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
|
|
||||||
public class MBusParser extends AMessageParser {
|
public class MBusParser extends AMessageParser {
|
||||||
static final String DATA_PARSER_PROP = "mbus.dataparser";
|
static final String DATA_PARSER_PROP = "mbus.dataparser";
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
package de.hottis.measurementCollector;
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.common.JmsTopic;
|
||||||
|
import de.hottis.common.ConsoleQueue;
|
||||||
|
import de.hottis.smarthomelib.ADataObject;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
@ -15,14 +19,16 @@ public class MeasurementCollector {
|
|||||||
logger.info("MeasurementCollector starting");
|
logger.info("MeasurementCollector starting");
|
||||||
|
|
||||||
final Properties config = new Properties();
|
final Properties config = new Properties();
|
||||||
config.load(MeasurementCollector.class.getClassLoader().getResourceAsStream(PROPS_FILENAME));
|
try (FileInputStream propsFileInputStream = new FileInputStream(PROPS_FILENAME)) {
|
||||||
|
config.load(propsFileInputStream);
|
||||||
|
}
|
||||||
logger.debug("Configuration loaded");
|
logger.debug("Configuration loaded");
|
||||||
|
|
||||||
MqttReceiver mqttReceiver = new MqttReceiver(config);
|
MqttReceiver mqttReceiver = new MqttReceiver(config);
|
||||||
mqttReceiver.connect();
|
|
||||||
logger.debug("MqttReceiver started");
|
logger.debug("MqttReceiver started");
|
||||||
|
|
||||||
JmsTopic<ADataObject> queue = new JmsTopic<ADataObject>(config, JmsTopic.Mode.PRODUCER);
|
// JmsTopic<ADataObject> queue = new JmsTopic<>(config, JmsTopic.Mode.PRODUCER);
|
||||||
|
ConsoleQueue<ADataObject> queue = new ConsoleQueue<>(config);
|
||||||
queue.init();
|
queue.init();
|
||||||
|
|
||||||
MBusParser mbusParser = new MBusParser(config, queue);
|
MBusParser mbusParser = new MBusParser(config, queue);
|
||||||
@ -30,6 +36,19 @@ public class MeasurementCollector {
|
|||||||
mbusParser.registerConfiguredDataParsers();
|
mbusParser.registerConfiguredDataParsers();
|
||||||
mqttReceiver.registerParser(mbusParser);
|
mqttReceiver.registerParser(mbusParser);
|
||||||
logger.debug("MBusParser started");
|
logger.debug("MBusParser started");
|
||||||
}
|
|
||||||
|
EspThermometerParser espThermometerParser = new EspThermometerParser(config, queue);
|
||||||
|
espThermometerParser.init();
|
||||||
|
mqttReceiver.registerParser(espThermometerParser);
|
||||||
|
logger.debug("EspThermometerParser started");
|
||||||
|
|
||||||
|
HottisFreezerThermometerParser hottisFreezerThermometerParser = new HottisFreezerThermometerParser(config, queue);
|
||||||
|
hottisFreezerThermometerParser.init();
|
||||||
|
mqttReceiver.registerParser(hottisFreezerThermometerParser);
|
||||||
|
logger.debug("HottisFreezerThermometerParser started");
|
||||||
|
|
||||||
|
mqttReceiver.connect();
|
||||||
|
logger.debug("MqttReceiver connected");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,16 @@
|
|||||||
|
package de.hottis.measurementCollector;
|
||||||
|
|
||||||
|
import de.hottis.common.HottisCommonException;
|
||||||
|
|
||||||
|
public class MeasurementCollectorException extends HottisCommonException {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public MeasurementCollectorException(String msg, Throwable cause) {
|
||||||
|
super(msg, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MeasurementCollectorException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -68,8 +68,8 @@ public class MqttReceiver {
|
|||||||
try {
|
try {
|
||||||
client = new MqttClient(broker, clientId);
|
client = new MqttClient(broker, clientId);
|
||||||
client.setCallback(callback);
|
client.setCallback(callback);
|
||||||
client.connect(connOpts);
|
reconnect();
|
||||||
logger.info("Connected");
|
logger.info("Connected");
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
throw new MeasurementCollectorException("MqttReceiver.connect", e);
|
throw new MeasurementCollectorException("MqttReceiver.connect", e);
|
||||||
}
|
}
|
||||||
@ -81,10 +81,9 @@ public class MqttReceiver {
|
|||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
client.connect(connOpts);
|
client.connect(connOpts);
|
||||||
for (String topic : parsers.keySet()) {
|
String[] topics = parsers.keySet().toArray(new String[0]);
|
||||||
client.subscribe(topic);
|
client.subscribe(topics);
|
||||||
logger.info("Re-Subscribed: " + topic);
|
logger.info("Re-Subscribed: " + topics.toString());
|
||||||
}
|
|
||||||
logger.error("reconnecting successfully completed");
|
logger.error("reconnecting successfully completed");
|
||||||
break;
|
break;
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
@ -101,12 +100,6 @@ public class MqttReceiver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void registerParser(AMessageParser parser) throws MeasurementCollectorException {
|
public void registerParser(AMessageParser parser) throws MeasurementCollectorException {
|
||||||
try {
|
parsers.put(parser.getTopic(), parser);
|
||||||
parsers.put(parser.getTopic(), parser);
|
|
||||||
client.subscribe(parser.getTopic());
|
|
||||||
logger.info("Subscribed: " + parser.getTopic());
|
|
||||||
} catch (MqttException e) {
|
|
||||||
throw new MeasurementCollectorException("MqttReceiver.registerParser", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user