connect after all subscriptions

This commit is contained in:
Wolfgang Hottgenroth
2017-11-28 17:53:12 +01:00
parent 56009a3748
commit 70d0a054cb
2 changed files with 6 additions and 10 deletions

View File

@@ -24,7 +24,6 @@ public class MeasurementCollector {
logger.debug("Configuration loaded"); logger.debug("Configuration loaded");
MqttReceiver mqttReceiver = new MqttReceiver(config); MqttReceiver mqttReceiver = new MqttReceiver(config);
mqttReceiver.connect();
logger.debug("MqttReceiver started"); logger.debug("MqttReceiver started");
JmsTopic<ADataObject> queue = new JmsTopic<>(config, JmsTopic.Mode.PRODUCER); JmsTopic<ADataObject> queue = new JmsTopic<>(config, JmsTopic.Mode.PRODUCER);
@@ -40,6 +39,9 @@ public class MeasurementCollector {
espThermometerParser.init(); espThermometerParser.init();
mqttReceiver.registerParser(espThermometerParser); mqttReceiver.registerParser(espThermometerParser);
logger.debug("EspThermometerParser started"); logger.debug("EspThermometerParser started");
mqttReceiver.connect();
logger.debug("MqttReceiver connected");
} }
} }

View File

@@ -68,7 +68,7 @@ public class MqttReceiver {
try { try {
client = new MqttClient(broker, clientId); client = new MqttClient(broker, clientId);
client.setCallback(callback); client.setCallback(callback);
client.connect(connOpts); reconnect();
logger.info("Connected"); logger.info("Connected");
} catch (MqttException e) { } catch (MqttException e) {
throw new MeasurementCollectorException("MqttReceiver.connect", e); throw new MeasurementCollectorException("MqttReceiver.connect", e);
@@ -101,12 +101,6 @@ public class MqttReceiver {
} }
public void registerParser(AMessageParser parser) throws MeasurementCollectorException { public void registerParser(AMessageParser parser) throws MeasurementCollectorException {
try {
parsers.put(parser.getTopic(), parser); parsers.put(parser.getTopic(), parser);
client.subscribe(parser.getTopic());
logger.info("Subscribed: " + parser.getTopic());
} catch (MqttException e) {
throw new MeasurementCollectorException("MqttReceiver.registerParser", e);
}
} }
} }