diff --git a/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class b/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class new file mode 100644 index 0000000..8a07425 Binary files /dev/null and b/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class differ diff --git a/bin/de/hottis/MeasurementCollector/MqttReceiver$Listener.class b/bin/de/hottis/MeasurementCollector/MqttReceiver$Listener.class deleted file mode 100644 index a81de45..0000000 Binary files a/bin/de/hottis/MeasurementCollector/MqttReceiver$Listener.class and /dev/null differ diff --git a/bin/de/hottis/MeasurementCollector/MqttReceiver.class b/bin/de/hottis/MeasurementCollector/MqttReceiver.class index e5f3bd1..a235172 100644 Binary files a/bin/de/hottis/MeasurementCollector/MqttReceiver.class and b/bin/de/hottis/MeasurementCollector/MqttReceiver.class differ diff --git a/bin/log4j2.xml b/bin/log4j2.xml index 09cd5cf..0e87b69 100644 --- a/bin/log4j2.xml +++ b/bin/log4j2.xml @@ -4,10 +4,14 @@ + + + + \ No newline at end of file diff --git a/bin/measurementCollector.props b/bin/measurementCollector.props index 7a03150..c798c52 100644 --- a/bin/measurementCollector.props +++ b/bin/measurementCollector.props @@ -1,4 +1,4 @@ -; mqtt.broker = tcp://172.16.2.15:1883 +;mqtt.broker = tcp://172.16.2.15:1883 mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 mqtt.username = tron mqtt.password = geheim123 diff --git a/src/de/hottis/MeasurementCollector/MqttReceiver.java b/src/de/hottis/MeasurementCollector/MqttReceiver.java index e7b6b10..635eac3 100644 --- a/src/de/hottis/MeasurementCollector/MqttReceiver.java +++ b/src/de/hottis/MeasurementCollector/MqttReceiver.java @@ -4,9 +4,12 @@ import java.time.LocalDateTime; import java.util.HashMap; import java.util.Properties; +import javax.management.RuntimeErrorException; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -23,10 +26,23 @@ public class MqttReceiver { static final String MQTT_PASSWORD_PROP = "mqtt.password"; - class Listener implements IMqttMessageListener { + class Callback implements MqttCallbackExtended { public void messageArrived(String topic, MqttMessage payload) { parsers.get(topic).execute(LocalDateTime.now(), payload.toString()); } + + public void connectComplete(boolean reconnect, java.lang.String serverURI) { + logger.info("Connection established for " + serverURI); + } + + public void connectionLost(java.lang.Throwable cause) { + logger.error("Connection lost, cause: " + cause.toString()); + reconnect(); + } + + public void deliveryComplete(IMqttDeliveryToken token) { + + } } private final String broker; @@ -34,7 +50,7 @@ public class MqttReceiver { private final MqttConnectOptions connOpts; private MqttClient client; private HashMap parsers; - private final Listener listener = new Listener(); + private final Callback callback = new Callback(); public MqttReceiver(Properties config) { broker = config.getProperty(MQTT_BROKER_PROP, "localhost"); @@ -53,6 +69,7 @@ public class MqttReceiver { public void connect() throws MeasurementCollectorException { try { client = new MqttClient(broker, clientId); + client.setCallback(callback); client.connect(connOpts); logger.info("Connected"); } catch (MqttException e) { @@ -60,10 +77,35 @@ public class MqttReceiver { } } + public void reconnect() { + logger.error("reconnect called"); + if (! client.isConnected()) { + while (true) { + try { + client.connect(connOpts); + for (String topic : parsers.keySet()) { + client.subscribe(topic); + logger.info("Re-Subscribed: " + topic); + } + logger.error("reconnecting successfully completed"); + break; + } catch (MqttException e) { + logger.error("Exception during reconnection: " + e.toString()); + try { + Thread.sleep(10*1000); + } catch (InterruptedException e1) { + } + } + } + } else { + logger.error("client is still connected"); + } + } + public void registerParser(AMessageParser parser) throws MeasurementCollectorException { try { parsers.put(parser.getTopic(), parser); - client.subscribe(parser.getTopic(), listener); + client.subscribe(parser.getTopic()); logger.info("Subscribed: " + parser.getTopic()); } catch (MqttException e) { throw new MeasurementCollectorException("MqttReceiver.registerParser", e); diff --git a/src/log4j2.xml b/src/log4j2.xml index 09cd5cf..0e87b69 100644 --- a/src/log4j2.xml +++ b/src/log4j2.xml @@ -4,10 +4,14 @@ + + + + \ No newline at end of file diff --git a/src/measurementCollector.props b/src/measurementCollector.props index 7a03150..c798c52 100644 --- a/src/measurementCollector.props +++ b/src/measurementCollector.props @@ -1,4 +1,4 @@ -; mqtt.broker = tcp://172.16.2.15:1883 +;mqtt.broker = tcp://172.16.2.15:1883 mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 mqtt.username = tron mqtt.password = geheim123