diff --git a/boot.py b/boot.py index b8e7bb0..2e2e5d1 100644 --- a/boot.py +++ b/boot.py @@ -1 +1,7 @@ -# boot.py -- run on boot-up +import machine + +# wdt = machine.WDT(timeout=8000) +# wdt.feed() +# print("Watchdog enabled") + + diff --git a/main.py b/main.py index 27856e0..af9397b 100644 --- a/main.py +++ b/main.py @@ -1,103 +1,47 @@ # main.py -- put your code here! -from umqttsimple import MQTTClient, MQTTException -import time -import ubinascii -from machine import WDT -import network -import os + +from mqtt_as import MQTTClient +from mqtt_as import config as mqttConfig import config - -last_message = 0 -message_interval = 5 -counter = 0 -ringFlag = False -wdt = None -wdtCnt = 0 -client = None -wlan = None +import uasyncio as asyncio -def init(): - global wdt - wdt = WDT(timeout=8000) - wdt.feed() - print("Watchdog enabled") +def sub_cb(topic, msg, retained): + print(f"Topic = {topic} Msg = {msg} Retained = {retained}") - global wlan - wlan = network.WLAN(network.STA_IF) - wlan.active(True) - wlan.connect(config.SSID, config.WPA_KEY) - - connectCnt = 0 - while (not wlan.isconnected() or connectCnt > config.MAX_CONNECT_RETRY_CNT): - print("Not yet connected") - wdt.feed() - connectCnt += 1 - time.sleep(1) - - if (not wlan.isconnected()): - print("Failed to connect to wlan, restarting") - machine.reset() - - print(f"Connected to WLAN: {machine.reset_cause()} {connectCnt} {wlan.ifconfig()}") - - global client - clientId = ubinascii.hexlify(bytearray(os.urandom(16))).decode() - client = MQTTClient(clientId, config.MQTT_SERVER, keepalive=60, - ssl=True, user=config.MQTT_LOGIN, password=config.MQTT_PASSWORD) - client.set_callback(mqtt_callback) - client.connect() - print(f"Connected to MQTT broker {config.MQTT_SERVER} ") - client.subscribe(config.WATCHDOG_TOPIC) - print(f"Subscribed to {config.WATCHDOG_TOPIC}") - client.publish(config.STARTUP_TOPIC, f"Hello, rpipw thermometer is alive now at {wlan.ifconfig()}") - - -def mqtt_callback(topic, msg): - global wdt, wdtCnt - # print(f"Received: {topic}: {msg}") - if topic == config.WATCHDOG_TOPIC: - wdtCnt += 1 - # print(f"feed watchdog {wdtCnt}") - wdt.feed() - -def main(): - global client - print("Hello") - time.sleep(1) - - try: - init() - except Exception as e: - print(f"Some error when starting: {e.__class__.__name__}, {e}") - time.sleep(15) - machine.reset() - - last_time_mqtt_loop = time.ticks_ms() - last_time_1_second = time.ticks_ms() - last_time_main_period = time.ticks_ms() - payload_cnt = 0 +async def main(client): + await client.connect() + n = 0 + await asyncio.sleep(2) # Give broker time while True: - current_time = time.ticks_ms() + print('publish', n) + # If WiFi is down the following will pause for the duration. + await client.publish('rpi/hb', '{} {}'.format(n, client.REPUB_COUNT), qos = 1) + n += 1 + await asyncio.sleep(20) # Broker is slow - if (time.ticks_diff(current_time, last_time_mqtt_loop) > 250): - last_time_mqtt_loop = current_time - client.ping() - client.wait_msg() +async def connected_cb(client): + await client.subscribe('rpi/in', 1) - if (time.ticks_diff(current_time, last_time_1_second) > 1000): - last_time_1_second = current_time - client.publish(config.HEARTBEAT_TOPIC, f"still alive {current_time}") +loop = asyncio.get_event_loop() - if (time.ticks_diff(current_time, last_time_main_period) > config.MAIN_PERIOD_MS): - last_time_main_period = current_time - client.publish(config.PAYLOAD_TOPIC, f"measurement {payload_cnt}") - payload_cnt += 1 - print("Payload sent") +mqttConfig['ssid'] = config.SSID +mqttConfig['wifi_pw'] = config.WPA_KEY +mqttConfig['subs_cb'] = sub_cb +mqttConfig['server'] = config.MQTT_SERVER +mqttConfig['port'] = 8883 +mqttConfig['user'] = config.MQTT_LOGIN +mqttConfig['password'] = config.MQTT_PASSWORD +mqttConfig['connect_coro'] = connected_cb +# mqttConfig['client_id'] = ubinascii.hexlify(bytearray(os.urandom(16))).decode() +mqttConfig['ssl'] = False +# Set up client +MQTTClient.DEBUG = True # Optional +client = MQTTClient(mqttConfig) - client.disconnect() - -if __name__ == "__main__": - main() \ No newline at end of file +try: + loop.run_until_complete(main(client)) +finally: + client.close() \ No newline at end of file diff --git a/mqtt_as.py b/mqtt_as.py new file mode 100644 index 0000000..09a3804 --- /dev/null +++ b/mqtt_as.py @@ -0,0 +1,731 @@ +# mqtt_as.py Asynchronous version of umqtt.robust +# (C) Copyright Peter Hinch 2017-2022. +# Released under the MIT licence. + +# Pyboard D support added also RP2/default +# Various improvements contributed by Kevin Köck. + +import gc +import usocket as socket +import ustruct as struct + +gc.collect() +from ubinascii import hexlify +import uasyncio as asyncio + +gc.collect() +from utime import ticks_ms, ticks_diff +from uerrno import EINPROGRESS, ETIMEDOUT + +gc.collect() +from micropython import const +from machine import unique_id +import network + +gc.collect() +from sys import platform + +VERSION = (0, 6, 6) + +# Default short delay for good SynCom throughput (avoid sleep(0) with SynCom). +_DEFAULT_MS = const(20) +_SOCKET_POLL_DELAY = const(5) # 100ms added greatly to publish latency + +# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection(). +ESP32 = platform == 'esp32' or platform == 'esp32_LoBo' +RP2 = platform == 'rp2' +if ESP32: + # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942 + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors +elif RP2: + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110] +else: + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] + +ESP8266 = platform == 'esp8266' +PYBOARD = platform == 'pyboard' + +# Default "do little" coro for optional user replacement +async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program + await asyncio.sleep_ms(_DEFAULT_MS) + + +config = { + 'client_id': hexlify(unique_id()), + 'server': None, + 'port': 0, + 'user': '', + 'password': '', + 'keepalive': 60, + 'ping_interval': 0, + 'ssl': False, + 'ssl_params': {}, + 'response_time': 10, + 'clean_init': True, + 'clean': True, + 'max_repubs': 4, + 'will': None, + 'subs_cb': lambda *_: None, + 'wifi_coro': eliza, + 'connect_coro': eliza, + 'ssid': None, + 'wifi_pw': None, +} + + +class MQTTException(Exception): + pass + + +def pid_gen(): + pid = 0 + while True: + pid = pid + 1 if pid < 65535 else 1 + yield pid + + +def qos_check(qos): + if not (qos == 0 or qos == 1): + raise ValueError('Only qos 0 and 1 are supported.') + + +# MQTT_base class. Handles MQTT protocol on the basis of a good connection. +# Exceptions from connectivity failures are handled by MQTTClient subclass. +class MQTT_base: + REPUB_COUNT = 0 # TEST + DEBUG = False + + def __init__(self, config): + # MQTT config + self._client_id = config['client_id'] + self._user = config['user'] + self._pswd = config['password'] + self._keepalive = config['keepalive'] + if self._keepalive >= 65536: + raise ValueError('invalid keepalive time') + self._response_time = config['response_time'] * 1000 # Repub if no PUBACK received (ms). + self._max_repubs = config['max_repubs'] + self._clean_init = config['clean_init'] # clean_session state on first connection + self._clean = config['clean'] # clean_session state on reconnect + will = config['will'] + if will is None: + self._lw_topic = False + else: + self._set_last_will(*will) + # WiFi config + self._ssid = config['ssid'] # Required for ESP32 / Pyboard D. Optional ESP8266 + self._wifi_pw = config['wifi_pw'] + self._ssl = config['ssl'] + self._ssl_params = config['ssl_params'] + # Callbacks and coros + self._cb = config['subs_cb'] + self._wifi_handler = config['wifi_coro'] + self._connect_handler = config['connect_coro'] + # Network + self.port = config['port'] + if self.port == 0: + self.port = 8883 if self._ssl else 1883 + self.server = config['server'] + if self.server is None: + raise ValueError('no server specified.') + self._sock = None + self._sta_if = network.WLAN(network.STA_IF) + self._sta_if.active(True) + + self.newpid = pid_gen() + self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response + self.last_rx = ticks_ms() # Time of last communication from broker + self.lock = asyncio.Lock() + + def _set_last_will(self, topic, msg, retain=False, qos=0): + qos_check(qos) + if not topic: + raise ValueError('Empty topic.') + self._lw_topic = topic + self._lw_msg = msg + self._lw_qos = qos + self._lw_retain = retain + + def dprint(self, msg, *args): + if self.DEBUG: + print(msg % args) + + def _timeout(self, t): + return ticks_diff(ticks_ms(), t) > self._response_time + + async def _as_read(self, n, sock=None): # OSError caught by superclass + if sock is None: + sock = self._sock + # Declare a byte array of size n. That space is needed anyway, better + # to just 'allocate' it in one go instead of appending to an + # existing object, this prevents reallocation and fragmentation. + data = bytearray(n) + buffer = memoryview(data) + size = 0 + t = ticks_ms() + while size < n: + if self._timeout(t) or not self.isconnected(): + raise OSError(-1, 'Timeout on socket read') + try: + msg = sock.read(n - size) + except OSError as e: # ESP32 issues weird 119 errors here + msg = None + if e.args[0] not in BUSY_ERRORS: + raise + if msg == b'': # Connection closed by host + raise OSError(-1, 'Connection closed by host') + if msg is not None: # data received + msg_size = len(msg) + buffer[size:size + msg_size] = msg + size += msg_size + t = ticks_ms() + self.last_rx = ticks_ms() + await asyncio.sleep_ms(_SOCKET_POLL_DELAY) + return data + + async def _as_write(self, bytes_wr, length=0, sock=None): + if sock is None: + sock = self._sock + + # Wrap bytes in memoryview to avoid copying during slicing + bytes_wr = memoryview(bytes_wr) + if length: + bytes_wr = bytes_wr[:length] + t = ticks_ms() + while bytes_wr: + if self._timeout(t) or not self.isconnected(): + raise OSError(-1, 'Timeout on socket write') + try: + n = sock.write(bytes_wr) + except OSError as e: # ESP32 issues weird 119 errors here + n = 0 + if e.args[0] not in BUSY_ERRORS: + raise + if n: + t = ticks_ms() + bytes_wr = bytes_wr[n:] + await asyncio.sleep_ms(_SOCKET_POLL_DELAY) + + async def _send_str(self, s): + await self._as_write(struct.pack("!H", len(s))) + await self._as_write(s) + + async def _recv_len(self): + n = 0 + sh = 0 + while 1: + res = await self._as_read(1) + b = res[0] + n |= (b & 0x7f) << sh + if not b & 0x80: + return n + sh += 7 + + async def _connect(self, clean): + self._sock = socket.socket() + self._sock.setblocking(False) + try: + self._sock.connect(self._addr) + except OSError as e: + if e.args[0] not in BUSY_ERRORS: + raise + await asyncio.sleep_ms(_DEFAULT_MS) + self.dprint('Connecting to broker.') + if self._ssl: + import ussl + self._sock = ussl.wrap_socket(self._sock, **self._ssl_params) + premsg = bytearray(b"\x10\0\0\0\0\0") + msg = bytearray(b"\x04MQTT\x04\0\0\0") # Protocol 3.1.1 + + sz = 10 + 2 + len(self._client_id) + msg[6] = clean << 1 + if self._user: + sz += 2 + len(self._user) + 2 + len(self._pswd) + msg[6] |= 0xC0 + if self._keepalive: + msg[7] |= self._keepalive >> 8 + msg[8] |= self._keepalive & 0x00FF + if self._lw_topic: + sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) + msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 + msg[6] |= self._lw_retain << 5 + + i = 1 + while sz > 0x7f: + premsg[i] = (sz & 0x7f) | 0x80 + sz >>= 7 + i += 1 + premsg[i] = sz + await self._as_write(premsg, i + 2) + await self._as_write(msg) + await self._send_str(self._client_id) + if self._lw_topic: + await self._send_str(self._lw_topic) + await self._send_str(self._lw_msg) + if self._user: + await self._send_str(self._user) + await self._send_str(self._pswd) + # Await CONNACK + # read causes ECONNABORTED if broker is out; triggers a reconnect. + resp = await self._as_read(4) + self.dprint('Connected to broker.') # Got CONNACK + if resp[3] != 0 or resp[0] != 0x20 or resp[1] != 0x02: + raise OSError(-1, 'Bad CONNACK') # Bad CONNACK e.g. authentication fail. + + async def _ping(self): + async with self.lock: + await self._as_write(b"\xc0\0") + + # Check internet connectivity by sending DNS lookup to Google's 8.8.8.8 + async def wan_ok(self, + packet=b'$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01'): + if not self.isconnected(): # WiFi is down + return False + length = 32 # DNS query and response packet size + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setblocking(False) + s.connect(('8.8.8.8', 53)) + await asyncio.sleep(1) + try: + await self._as_write(packet, sock=s) + await asyncio.sleep(2) + res = await self._as_read(length, s) + if len(res) == length: + return True # DNS response size OK + except OSError: # Timeout on read: no connectivity. + return False + finally: + s.close() + return False + + async def broker_up(self): # Test broker connectivity + if not self.isconnected(): + return False + tlast = self.last_rx + if ticks_diff(ticks_ms(), tlast) < 1000: + return True + try: + await self._ping() + except OSError: + return False + t = ticks_ms() + while not self._timeout(t): + await asyncio.sleep_ms(100) + if ticks_diff(self.last_rx, tlast) > 0: # Response received + return True + return False + + async def disconnect(self): + if self._sock is not None: + await self._kill_tasks(False) # Keep socket open + try: + async with self.lock: + self._sock.write(b"\xe0\0") # Close broker connection + await asyncio.sleep_ms(100) + except OSError: + pass + self._close() + self._has_connected = False + + def _close(self): + if self._sock is not None: + self._sock.close() + + def close(self): # API. See https://github.com/peterhinch/micropython-mqtt/issues/60 + self._close() + try: + self._sta_if.disconnect() # Disconnect Wi-Fi to avoid errors + except OSError: + self.dprint('Wi-Fi not started, unable to disconnect interface') + self._sta_if.active(False) + + async def _await_pid(self, pid): + t = ticks_ms() + while pid in self.rcv_pids: # local copy + if self._timeout(t) or not self.isconnected(): + break # Must repub or bail out + await asyncio.sleep_ms(100) + else: + return True # PID received. All done. + return False + + # qos == 1: coro blocks until wait_msg gets correct PID. + # If WiFi fails completely subclass re-publishes with new PID. + async def publish(self, topic, msg, retain, qos): + pid = next(self.newpid) + if qos: + self.rcv_pids.add(pid) + async with self.lock: + await self._publish(topic, msg, retain, qos, 0, pid) + if qos == 0: + return + + count = 0 + while 1: # Await PUBACK, republish on timeout + if await self._await_pid(pid): + return + # No match + if count >= self._max_repubs or not self.isconnected(): + raise OSError(-1) # Subclass to re-publish with new PID + async with self.lock: + await self._publish(topic, msg, retain, qos, dup=1, pid=pid) # Add pid + count += 1 + self.REPUB_COUNT += 1 + + async def _publish(self, topic, msg, retain, qos, dup, pid): + pkt = bytearray(b"\x30\0\0\0") + pkt[0] |= qos << 1 | retain | dup << 3 + sz = 2 + len(topic) + len(msg) + if qos > 0: + sz += 2 + if sz >= 2097152: + raise MQTTException('Strings too long.') + i = 1 + while sz > 0x7f: + pkt[i] = (sz & 0x7f) | 0x80 + sz >>= 7 + i += 1 + pkt[i] = sz + await self._as_write(pkt, i + 1) + await self._send_str(topic) + if qos > 0: + struct.pack_into("!H", pkt, 0, pid) + await self._as_write(pkt, 2) + await self._as_write(msg) + + # Can raise OSError if WiFi fails. Subclass traps. + async def subscribe(self, topic, qos): + pkt = bytearray(b"\x82\0\0\0") + pid = next(self.newpid) + self.rcv_pids.add(pid) + struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid) + async with self.lock: + await self._as_write(pkt) + await self._send_str(topic) + await self._as_write(qos.to_bytes(1, "little")) + + if not await self._await_pid(pid): + raise OSError(-1) + + # Can raise OSError if WiFi fails. Subclass traps. + async def unsubscribe(self, topic): + pkt = bytearray(b"\xa2\0\0\0") + pid = next(self.newpid) + self.rcv_pids.add(pid) + struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid) + async with self.lock: + await self._as_write(pkt) + await self._send_str(topic) + + if not await self._await_pid(pid): + raise OSError(-1) + + # Wait for a single incoming MQTT message and process it. + # Subscribed messages are delivered to a callback previously + # set by .setup() method. Other (internal) MQTT + # messages processed internally. + # Immediate return if no data available. Called from ._handle_msg(). + async def wait_msg(self): + try: + res = self._sock.read(1) # Throws OSError on WiFi fail + except OSError as e: + if e.args[0] in BUSY_ERRORS: # Needed by RP2 + await asyncio.sleep_ms(0) + return + raise + if res is None: + return + if res == b'': + raise OSError(-1, 'Empty response') + + if res == b"\xd0": # PINGRESP + await self._as_read(1) # Update .last_rx time + return + op = res[0] + + if op == 0x40: # PUBACK: save pid + sz = await self._as_read(1) + if sz != b"\x02": + raise OSError(-1, 'Invalid PUBACK packet') + rcv_pid = await self._as_read(2) + pid = rcv_pid[0] << 8 | rcv_pid[1] + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1, 'Invalid pid in PUBACK packet') + + if op == 0x90: # SUBACK + resp = await self._as_read(4) + if resp[3] == 0x80: + raise OSError(-1, 'Invalid SUBACK packet') + pid = resp[2] | (resp[1] << 8) + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1, 'Invalid pid in SUBACK packet') + + if op == 0xB0: # UNSUBACK + resp = await self._as_read(3) + pid = resp[2] | (resp[1] << 8) + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1) + + if op & 0xf0 != 0x30: + return + sz = await self._recv_len() + topic_len = await self._as_read(2) + topic_len = (topic_len[0] << 8) | topic_len[1] + topic = await self._as_read(topic_len) + sz -= topic_len + 2 + if op & 6: + pid = await self._as_read(2) + pid = pid[0] << 8 | pid[1] + sz -= 2 + msg = await self._as_read(sz) + retained = op & 0x01 + self._cb(topic, msg, bool(retained)) + if op & 6 == 2: # qos 1 + pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK + struct.pack_into("!H", pkt, 2, pid) + await self._as_write(pkt) + elif op & 6 == 4: # qos 2 not supported + raise OSError(-1, 'QoS 2 not supported') + + +# MQTTClient class. Handles issues relating to connectivity. + +class MQTTClient(MQTT_base): + def __init__(self, config): + super().__init__(config) + self._isconnected = False # Current connection state + keepalive = 1000 * self._keepalive # ms + self._ping_interval = keepalive // 4 if keepalive else 20000 + p_i = config['ping_interval'] * 1000 # Can specify shorter e.g. for subscribe-only + if p_i and p_i < self._ping_interval: + self._ping_interval = p_i + self._in_connect = False + self._has_connected = False # Define 'Clean Session' value to use. + self._tasks = [] + if ESP8266: + import esp + esp.sleep_type(0) # Improve connection integrity at cost of power consumption. + + async def wifi_connect(self, quick=False): + s = self._sta_if + if ESP8266: + if s.isconnected(): # 1st attempt, already connected. + return + s.active(True) + s.connect() # ESP8266 remembers connection. + for _ in range(60): + if s.status() != network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. + break + await asyncio.sleep(1) + if s.status() == network.STAT_CONNECTING: # might hang forever awaiting dhcp lease renewal or something else + s.disconnect() + await asyncio.sleep(1) + if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None: + s.connect(self._ssid, self._wifi_pw) + while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. + await asyncio.sleep(1) + else: + s.active(True) + if RP2: # Disable auto-sleep. + # https://datasheets.raspberrypi.com/picow/connecting-to-the-internet-with-pico-w.pdf + # para 3.6.3 + s.config(pm = 0xa11140) + s.connect(self._ssid, self._wifi_pw) + for _ in range(60): # Break out on fail or success. Check once per sec. + await asyncio.sleep(1) + # Loop while connecting or no IP + if s.isconnected(): + break + if ESP32: + if s.status() != network.STAT_CONNECTING: # 1001 + break + elif PYBOARD: # No symbolic constants in network + if not 1 <= s.status() <= 2: + break + elif RP2: # 1 is STAT_CONNECTING. 2 reported by user (No IP?) + if not 1 <= s.status() <= 2: + break + else: # Timeout: still in connecting state + s.disconnect() + await asyncio.sleep(1) + + if not s.isconnected(): # Timed out + raise OSError('Wi-Fi connect timed out') + if not quick: # Skip on first connection only if power saving + # Ensure connection stays up for a few secs. + self.dprint('Checking WiFi integrity.') + for _ in range(5): + if not s.isconnected(): + raise OSError('Connection Unstable') # in 1st 5 secs + await asyncio.sleep(1) + self.dprint('Got reliable connection') + + async def connect(self, *, quick=False): # Quick initial connect option for battery apps + if not self._has_connected: + await self.wifi_connect(quick) # On 1st call, caller handles error + # Note this blocks if DNS lookup occurs. Do it once to prevent + # blocking during later internet outage: + self._addr = socket.getaddrinfo(self.server, self.port)[0][-1] + self._in_connect = True # Disable low level ._isconnected check + try: + if not self._has_connected and self._clean_init and not self._clean: + # Power up. Clear previous session data but subsequently save it. + # Issue #40 + await self._connect(True) # Connect with clean session + try: + async with self.lock: + self._sock.write(b"\xe0\0") # Force disconnect but keep socket open + except OSError: + pass + self.dprint("Waiting for disconnect") + await asyncio.sleep(2) # Wait for broker to disconnect + self.dprint("About to reconnect with unclean session.") + await self._connect(self._clean) + except Exception: + self._close() + self._in_connect = False # Caller may run .isconnected() + raise + clean = self._clean if self._has_connected else self._clean_init + self.rcv_pids.clear() + # If we get here without error broker/LAN must be up. + self._isconnected = True + self._in_connect = False # Low level code can now check connectivity. + asyncio.create_task(self._wifi_handler(True)) # User handler. + if not self._has_connected: + self._has_connected = True # Use normal clean flag on reconnect. + asyncio.create_task( + self._keep_connected()) # Runs forever unless user issues .disconnect() + + asyncio.create_task(self._handle_msg()) # Task quits on connection fail. + self._tasks.append(asyncio.create_task(self._keep_alive())) + if self.DEBUG: + self._tasks.append(asyncio.create_task(self._memory())) + asyncio.create_task(self._connect_handler(self)) # User handler. + + # Launched by .connect(). Runs until connectivity fails. Checks for and + # handles incoming messages. + async def _handle_msg(self): + try: + while self.isconnected(): + async with self.lock: + await self.wait_msg() # Immediate return if no message + await asyncio.sleep_ms(_DEFAULT_MS) # Let other tasks get lock + + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + # Keep broker alive MQTT spec 3.1.2.10 Keep Alive. + # Runs until ping failure or no response in keepalive period. + async def _keep_alive(self): + while self.isconnected(): + pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval + if pings_due >= 4: + self.dprint('Reconnect: broker fail.') + break + await asyncio.sleep_ms(self._ping_interval) + try: + await self._ping() + except OSError: + break + self._reconnect() # Broker or WiFi fail. + + async def _kill_tasks(self, kill_skt): # Cancel running tasks + for task in self._tasks: + task.cancel() + self._tasks.clear() + await asyncio.sleep_ms(0) # Ensure cancellation complete + if kill_skt: # Close socket + self._close() + + # DEBUG: show RAM messages. + async def _memory(self): + while True: + await asyncio.sleep(20) + gc.collect() + self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc()) + + def isconnected(self): + if self._in_connect: # Disable low-level check during .connect() + return True + if self._isconnected and not self._sta_if.isconnected(): # It's going down. + self._reconnect() + return self._isconnected + + def _reconnect(self): # Schedule a reconnection if not underway. + if self._isconnected: + self._isconnected = False + asyncio.create_task(self._kill_tasks(True)) # Shut down tasks and socket + asyncio.create_task(self._wifi_handler(False)) # User handler. + + # Await broker connection. + async def _connection(self): + while not self._isconnected: + await asyncio.sleep(1) + + # Scheduled on 1st successful connection. Runs forever maintaining wifi and + # broker connection. Must handle conditions at edge of WiFi range. + async def _keep_connected(self): + while self._has_connected: + if self.isconnected(): # Pause for 1 second + await asyncio.sleep(1) + gc.collect() + else: # Link is down, socket is closed, tasks are killed + try: + self._sta_if.disconnect() + except OSError: + self.dprint('Wi-Fi not started, unable to disconnect interface') + await asyncio.sleep(1) + try: + await self.wifi_connect() + except OSError: + continue + if not self._has_connected: # User has issued the terminal .disconnect() + self.dprint('Disconnected, exiting _keep_connected') + break + try: + await self.connect() + # Now has set ._isconnected and scheduled _connect_handler(). + self.dprint('Reconnect OK!') + except OSError as e: + self.dprint('Error in reconnect. %s', e) + # Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received. + self._close() # Disconnect and try again. + self._in_connect = False + self._isconnected = False + self.dprint('Disconnected, exited _keep_connected') + + async def subscribe(self, topic, qos=0): + qos_check(qos) + while 1: + await self._connection() + try: + return await super().subscribe(topic, qos) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + async def unsubscribe(self, topic): + while 1: + await self._connection() + try: + return await super().unsubscribe(topic) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + async def publish(self, topic, msg, retain=False, qos=0): + qos_check(qos) + while 1: + await self._connection() + try: + return await super().publish(topic, msg, retain, qos) + except OSError: + pass + self._reconnect() # Broker or WiFi fail.