reconnection in mqtt receiver introduced
This commit is contained in:
BIN
bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class
Normal file
BIN
bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -4,10 +4,14 @@
|
|||||||
<Console name="ConsoleAppender" target="SYSTEM_OUT">
|
<Console name="ConsoleAppender" target="SYSTEM_OUT">
|
||||||
<PatternLayout pattern="%d [%t, %F, %L] %-5level - %msg%n%throwable"/>
|
<PatternLayout pattern="%d [%t, %F, %L] %-5level - %msg%n%throwable"/>
|
||||||
</Console>
|
</Console>
|
||||||
|
<File name="FileAppender" fileName="/tmp/MeasurementCollector.log">
|
||||||
|
<PatternLayout pattern="%d [%t, %F, %L] %-5level - %msg%n%throwable"/>
|
||||||
|
</File>
|
||||||
</Appenders>
|
</Appenders>
|
||||||
<Loggers>
|
<Loggers>
|
||||||
<Root level="debug">
|
<Root level="debug">
|
||||||
<AppenderRef ref="ConsoleAppender"/>
|
<AppenderRef ref="ConsoleAppender"/>
|
||||||
|
<AppenderRef ref="FileAppender"/>
|
||||||
</Root>
|
</Root>
|
||||||
</Loggers>
|
</Loggers>
|
||||||
</Configuration>
|
</Configuration>
|
@ -4,9 +4,12 @@ import java.time.LocalDateTime;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import javax.management.RuntimeErrorException;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
@ -23,10 +26,23 @@ public class MqttReceiver {
|
|||||||
static final String MQTT_PASSWORD_PROP = "mqtt.password";
|
static final String MQTT_PASSWORD_PROP = "mqtt.password";
|
||||||
|
|
||||||
|
|
||||||
class Listener implements IMqttMessageListener {
|
class Callback implements MqttCallbackExtended {
|
||||||
public void messageArrived(String topic, MqttMessage payload) {
|
public void messageArrived(String topic, MqttMessage payload) {
|
||||||
parsers.get(topic).execute(LocalDateTime.now(), payload.toString());
|
parsers.get(topic).execute(LocalDateTime.now(), payload.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void connectComplete(boolean reconnect, java.lang.String serverURI) {
|
||||||
|
logger.info("Connection established for " + serverURI);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void connectionLost(java.lang.Throwable cause) {
|
||||||
|
logger.error("Connection lost, cause: " + cause.toString());
|
||||||
|
reconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String broker;
|
private final String broker;
|
||||||
@ -34,7 +50,7 @@ public class MqttReceiver {
|
|||||||
private final MqttConnectOptions connOpts;
|
private final MqttConnectOptions connOpts;
|
||||||
private MqttClient client;
|
private MqttClient client;
|
||||||
private HashMap<String, AMessageParser> parsers;
|
private HashMap<String, AMessageParser> parsers;
|
||||||
private final Listener listener = new Listener();
|
private final Callback callback = new Callback();
|
||||||
|
|
||||||
public MqttReceiver(Properties config) {
|
public MqttReceiver(Properties config) {
|
||||||
broker = config.getProperty(MQTT_BROKER_PROP, "localhost");
|
broker = config.getProperty(MQTT_BROKER_PROP, "localhost");
|
||||||
@ -53,6 +69,7 @@ public class MqttReceiver {
|
|||||||
public void connect() throws MeasurementCollectorException {
|
public void connect() throws MeasurementCollectorException {
|
||||||
try {
|
try {
|
||||||
client = new MqttClient(broker, clientId);
|
client = new MqttClient(broker, clientId);
|
||||||
|
client.setCallback(callback);
|
||||||
client.connect(connOpts);
|
client.connect(connOpts);
|
||||||
logger.info("Connected");
|
logger.info("Connected");
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
@ -60,10 +77,35 @@ public class MqttReceiver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void reconnect() {
|
||||||
|
logger.error("reconnect called");
|
||||||
|
if (! client.isConnected()) {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
client.connect(connOpts);
|
||||||
|
for (String topic : parsers.keySet()) {
|
||||||
|
client.subscribe(topic);
|
||||||
|
logger.info("Re-Subscribed: " + topic);
|
||||||
|
}
|
||||||
|
logger.error("reconnecting successfully completed");
|
||||||
|
break;
|
||||||
|
} catch (MqttException e) {
|
||||||
|
logger.error("Exception during reconnection: " + e.toString());
|
||||||
|
try {
|
||||||
|
Thread.sleep(10*1000);
|
||||||
|
} catch (InterruptedException e1) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.error("client is still connected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void registerParser(AMessageParser parser) throws MeasurementCollectorException {
|
public void registerParser(AMessageParser parser) throws MeasurementCollectorException {
|
||||||
try {
|
try {
|
||||||
parsers.put(parser.getTopic(), parser);
|
parsers.put(parser.getTopic(), parser);
|
||||||
client.subscribe(parser.getTopic(), listener);
|
client.subscribe(parser.getTopic());
|
||||||
logger.info("Subscribed: " + parser.getTopic());
|
logger.info("Subscribed: " + parser.getTopic());
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
throw new MeasurementCollectorException("MqttReceiver.registerParser", e);
|
throw new MeasurementCollectorException("MqttReceiver.registerParser", e);
|
||||||
|
@ -4,10 +4,14 @@
|
|||||||
<Console name="ConsoleAppender" target="SYSTEM_OUT">
|
<Console name="ConsoleAppender" target="SYSTEM_OUT">
|
||||||
<PatternLayout pattern="%d [%t, %F, %L] %-5level - %msg%n%throwable"/>
|
<PatternLayout pattern="%d [%t, %F, %L] %-5level - %msg%n%throwable"/>
|
||||||
</Console>
|
</Console>
|
||||||
|
<File name="FileAppender" fileName="/tmp/MeasurementCollector.log">
|
||||||
|
<PatternLayout pattern="%d [%t, %F, %L] %-5level - %msg%n%throwable"/>
|
||||||
|
</File>
|
||||||
</Appenders>
|
</Appenders>
|
||||||
<Loggers>
|
<Loggers>
|
||||||
<Root level="debug">
|
<Root level="debug">
|
||||||
<AppenderRef ref="ConsoleAppender"/>
|
<AppenderRef ref="ConsoleAppender"/>
|
||||||
|
<AppenderRef ref="FileAppender"/>
|
||||||
</Root>
|
</Root>
|
||||||
</Loggers>
|
</Loggers>
|
||||||
</Configuration>
|
</Configuration>
|
Reference in New Issue
Block a user