From 401417d485964f8d50e1f0dec4bc8dc5a5112fb9 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 17 Nov 2017 10:54:24 +0100 Subject: [PATCH] reconnection in mqtt receiver introduced --- .../MqttReceiver$Callback.class | Bin 0 -> 2294 bytes .../MqttReceiver$Listener.class | Bin 1355 -> 0 bytes .../MeasurementCollector/MqttReceiver.class | Bin 3700 -> 4591 bytes bin/log4j2.xml | 4 ++ bin/measurementCollector.props | 2 +- .../MeasurementCollector/MqttReceiver.java | 50 ++++++++++++++++-- src/log4j2.xml | 4 ++ src/measurementCollector.props | 2 +- 8 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class delete mode 100644 bin/de/hottis/MeasurementCollector/MqttReceiver$Listener.class diff --git a/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class b/bin/de/hottis/MeasurementCollector/MqttReceiver$Callback.class new file mode 100644 index 0000000000000000000000000000000000000000..8a0742579b49ac361da5b11fa4ef48b751b83207 GIT binary patch literal 2294 zcmbVNZFAd15Pr^aB3l({l9n`~HE~I3Z0ALVqyg$eNSu}i?6xFspeaz!^2Jsv>x?9) z?ayMSqy=UeKJ%j(_GHD@kPfZ+pqG>O_St>**}cF1{_PI{tFSdB7*^}TY|0?8eX}CC z-|<9CxIsxej<5phnU$Y{U`JTOJ`&zsi91e>TaPs)8O9Fz5jP#~Hq5)VL$cN|!muK} zhAAw^Zu`P)^QJWEojjT?a&)v3`a5xR^JyU5x~MaZ1WntYyUMUs9w43~ZM0_agmIgGZ1@AHP(@H}Ff*x1OG``qpZp6xb@C4-4Fr8 zm3Y=&MHTZgBBD-4bS4$t@m;Dy!`8Yfm zAgK21SU!U??r5km%qRi;M@k0ns&JCFdmtIoo`@J^VEeQtUD8OszeDkiMt>-?vh=AH zrs8o`o1T24#trp#3RK{!7!eNHFJ>*?@x^sw;aern2Mp)>rSEN5kv}x>oeFALc%G!b zn8FVR?x~449Qq*kV(Is`(vTXf3rC#+&-wyb%e_uQ+v@b#iZ)i|W8qQ`p+f1E&WiuB zpF!W19nTWCY&DR_V<`0U=&_p^k-TnFi!UovH>!fpt}*i>o|w=v?A6p8F_d}2t)L3y6_yv zOI%nuM&?h1QT!e24<P`lL+#=MiBREJW}R#aqUAhm@aLjFJ;3fC7w&3&_Xz}^=62rHqkxzh7hGm>so z)I*oojOaEP=y1i|iUF%KibPGGz!CaPT}89YgvCl1`IPzKx(bweMp!O%b+Wii=-CYS zWj_d)9azZO$YRhzKk|fZA$Az7s6f7MHTI<55qrKQ3|B%=_`5=@c%K@3;^YZy-6Zgj zO0Hp`(WYZVYi4<$uu-U-a=UG`3ToxbIlOkzS}yL|xQ=lPV}$uG_;fIVNkUF|9%mG{ z<9U~?w)1Wo<-4y$RIiF=*~Se_TewLWN{Z_I>EIS_Gut(32y>lPw-r?`b`X)8EX=Y@ z&Z%l6RoGjhBdKl7psD^ZtPQDOE?(oh*5hYF}2tN`g3TfE0eq7B0mMkn1 zN*8a40~btY&V=n|5=Kr!B;%8|*v1U*J6ORg!IGb(*D@@*rIeQcB-24o3K6Te*Mv>w z#acZhBF1fs&weOaWDDIWiePVtE$zvdDjtKO&RbzA4nuIZ0=^kGeGx^Hry+MT+?eJG z%kt9;%ID)jsRVSoeJ}-O2t2@A5@-w$ W@rb=Mc+AilJVBYiani)6*MYxRKzC&T diff --git a/bin/de/hottis/MeasurementCollector/MqttReceiver.class b/bin/de/hottis/MeasurementCollector/MqttReceiver.class index e5f3bd1a34a0df2e7d0d73edd7b114f0ac0088b0..a235172ff09c3fbb220ee88034b7cd949cdda47e 100644 GIT binary patch delta 1790 zcmZux>vt1H6#q@qu$yEHHKl+(oDxs1^hpq^V5=as9xS#LTI8h&>2^rB4V!j%1FeEV zQB+V77ZmYX6ocZcJW@(ke4~g0z90Cg|A1fo#0Tr0Gzs(^lQXk(=gz(NcYpWJ{4@FQ z!kp9po_HTXC59w87|QJPuCl)BNe-_X42N5SY6rvh_BmI%`|6t1hHX3klGDPG&CWf} zyeKv>cuj7lw;84v+d)<|DP>b#vdn`px)rpchdc`9%*iFoWZc8xu8HV6S1sP^#b(4L z+{-X!*IiSlE7*cYhAd5A7m;y4!}x|S%WC2(*EVotnmS_UK|3B$a1|c3`=vnELk#71 zFDtaO<(p$}gQrf(c$6V0Bl<;pBLN>*uv3I_al?oh0_H9SPr{~*G&2K@!C73g%7azd zqu?n#%`m2e_cU{h;fmst`u5<4U?6H~;Q(z_d+`jOm9Q_ykUdJH;5j_cAZr#kf>y*} zm@vw?F11e{UPP~iml%AP9Hg}h_Ty#ZMd#fVetgMP_a@ z+G47P*1{zmw0qr?r@qcGu8CJPr}tZ{{2s*dhM=)gtlm=aHr}y^+|?7~IIQ3ZjuLDu z(rg);9uh(Ap{(4PfSFLxFW8dWg61MZBx8V~FvBz*_nT2w<)*nV8V>jP)yVqJFsEwE z7^Fhq(yj7NON;3K)~N99zhoKF1*~m2?4k~(-_#rrKEOv3KBQ6#T4I4@&P&@29&AuR$^2pV5XJ}64*$~ z3g6uW*yNzfkEX3e;}LQdB8i@vjS{wbN!*TWXf6a;2W9Fc<-sJB9RqmG0Rb?9 zC-!0cXsD|b*gb%~3=MHiD<8lMjGAIey*N~sz$-D~Vi56_zSsJ3h<4sgpf8ylojt~| zZ}_Y@;X9rT631*|Ibv{P#Ngeu4S^Vs_m1Oae0WPke0WpR?$e}QA7z+_2n-@-C-z`3 zPNfKn%|{MpvV@Xuq9eD%k7ZP||DtA5Mu>5?#j_N{6WcOvp5C8%7ZvB+yb!r!KcjW34+)&#BGMD??;%Ty$rk5ISrz7V!j{C z|G#zeF@}P4Ae-K%Nmx&9qf>80sBO9~OSm?bFf}lK#xDde-9L^|{gWf(S6m>qKwd;& i(W$P|qxRgfzQW3+Lvafn_$@jAjz8!tG7r%B6Zj8B@y9g) delta 851 zcmY+CSxggA6o&uX7G}y!6)THTB$!t1YNHZa3MPOG8VDMQiit{FhEmc@r7hrwDDHv^ zUZ~=ZD{fJumMDq7`(}JIzUZ@wFFqU3wA92*&OO`Qd%kn-{bl)Gl>6z=`wsw0@j`%( z!O1^oIcK~}bW%7Jiz})UW!S*SWS7y}I3M6#_e0D$uLtxKdVLpqdAs1v?`JU9MO0M@ z#FY*Shj2u|Vg5?6nvSBG!4OhABO;FR@4_yHJZA-}?~&DPlM>+^ZSJ~ilRiAO_f(93KX_y)Kq-lKZY8YUKgq0|S4HnqpfdiFrq6RKBz>V!#gWVJjU>#Jf z=ld4;2NTGGjxMmmF^6!Q;|(^F)XqYEiypExsZSzC%5Cm@hl4si#=^!qbhnuYoU<7G zI0Hu8?Nnk2dh92ADY9uT!(x=97!{POq~c!ks?koFf^Uke5%wljVKeIKYp2pBX@C?yxir~_ctn6z^L^+t mebD0_l5#~F*fgJBWVKjR+qGfq@FcZ9#WT`5wAn-Q9QX_B_ro~= diff --git a/bin/log4j2.xml b/bin/log4j2.xml index 09cd5cf..0e87b69 100644 --- a/bin/log4j2.xml +++ b/bin/log4j2.xml @@ -4,10 +4,14 @@ + + + + \ No newline at end of file diff --git a/bin/measurementCollector.props b/bin/measurementCollector.props index 7a03150..c798c52 100644 --- a/bin/measurementCollector.props +++ b/bin/measurementCollector.props @@ -1,4 +1,4 @@ -; mqtt.broker = tcp://172.16.2.15:1883 +;mqtt.broker = tcp://172.16.2.15:1883 mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 mqtt.username = tron mqtt.password = geheim123 diff --git a/src/de/hottis/MeasurementCollector/MqttReceiver.java b/src/de/hottis/MeasurementCollector/MqttReceiver.java index e7b6b10..635eac3 100644 --- a/src/de/hottis/MeasurementCollector/MqttReceiver.java +++ b/src/de/hottis/MeasurementCollector/MqttReceiver.java @@ -4,9 +4,12 @@ import java.time.LocalDateTime; import java.util.HashMap; import java.util.Properties; +import javax.management.RuntimeErrorException; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -23,10 +26,23 @@ public class MqttReceiver { static final String MQTT_PASSWORD_PROP = "mqtt.password"; - class Listener implements IMqttMessageListener { + class Callback implements MqttCallbackExtended { public void messageArrived(String topic, MqttMessage payload) { parsers.get(topic).execute(LocalDateTime.now(), payload.toString()); } + + public void connectComplete(boolean reconnect, java.lang.String serverURI) { + logger.info("Connection established for " + serverURI); + } + + public void connectionLost(java.lang.Throwable cause) { + logger.error("Connection lost, cause: " + cause.toString()); + reconnect(); + } + + public void deliveryComplete(IMqttDeliveryToken token) { + + } } private final String broker; @@ -34,7 +50,7 @@ public class MqttReceiver { private final MqttConnectOptions connOpts; private MqttClient client; private HashMap parsers; - private final Listener listener = new Listener(); + private final Callback callback = new Callback(); public MqttReceiver(Properties config) { broker = config.getProperty(MQTT_BROKER_PROP, "localhost"); @@ -53,6 +69,7 @@ public class MqttReceiver { public void connect() throws MeasurementCollectorException { try { client = new MqttClient(broker, clientId); + client.setCallback(callback); client.connect(connOpts); logger.info("Connected"); } catch (MqttException e) { @@ -60,10 +77,35 @@ public class MqttReceiver { } } + public void reconnect() { + logger.error("reconnect called"); + if (! client.isConnected()) { + while (true) { + try { + client.connect(connOpts); + for (String topic : parsers.keySet()) { + client.subscribe(topic); + logger.info("Re-Subscribed: " + topic); + } + logger.error("reconnecting successfully completed"); + break; + } catch (MqttException e) { + logger.error("Exception during reconnection: " + e.toString()); + try { + Thread.sleep(10*1000); + } catch (InterruptedException e1) { + } + } + } + } else { + logger.error("client is still connected"); + } + } + public void registerParser(AMessageParser parser) throws MeasurementCollectorException { try { parsers.put(parser.getTopic(), parser); - client.subscribe(parser.getTopic(), listener); + client.subscribe(parser.getTopic()); logger.info("Subscribed: " + parser.getTopic()); } catch (MqttException e) { throw new MeasurementCollectorException("MqttReceiver.registerParser", e); diff --git a/src/log4j2.xml b/src/log4j2.xml index 09cd5cf..0e87b69 100644 --- a/src/log4j2.xml +++ b/src/log4j2.xml @@ -4,10 +4,14 @@ + + + + \ No newline at end of file diff --git a/src/measurementCollector.props b/src/measurementCollector.props index 7a03150..c798c52 100644 --- a/src/measurementCollector.props +++ b/src/measurementCollector.props @@ -1,4 +1,4 @@ -; mqtt.broker = tcp://172.16.2.15:1883 +;mqtt.broker = tcp://172.16.2.15:1883 mqtt.broker = tcp://eupenstrasse20.dynamic.hottis.de:2883 mqtt.username = tron mqtt.password = geheim123