Compare commits

..

5 Commits

5 changed files with 92 additions and 781 deletions

View File

@ -1,7 +1 @@
import machine
# wdt = machine.WDT(timeout=8000)
# wdt.feed()
# print("Watchdog enabled")
# boot.py -- run on boot-up

View File

@ -1,16 +1,18 @@
import secrets
SSID = 'telemetry'
SSID = 'gast1000'
WPA_KEY = secrets.WPA_KEY
MQTT_SERVER = '172.16.10.47'
MQTT_LOGIN = 'thermo-anna'
MQTT_SERVER = 'broker.hottis.de'
MQTT_LOGIN = secrets.MQTT_LOGIN
MQTT_PASSWORD = secrets.MQTT_PASSWORD
LOCATION = secrets.LOCATION
HEARTBEAT_TOPIC = b'IoT/rpipwThermometer/0/Heartbeat'
WATCHDOG_TOPIC = b'IoT/Watchdog'
PAYLOAD_TOPIC = b'IoT/rpipwThermometer/0/Payload'
STARTUP_TOPIC = b'IoT/rpipwThermometer/0/Startup'
MAX_CONNECT_RETRY_CNT = 25
MAIN_PERIOD_MS = 60000
MAIN_PERIOD_S = 60

118
main.py
View File

@ -1,47 +1,87 @@
# main.py -- put your code here!
from mqtt_as import MQTTClient
from mqtt_as import config as mqttConfig
import machine
from umqttsimple import MQTTClient
import time
import ubinascii
from machine import Pin, WDT
from onewire import OneWire
from ds18x20 import DS18X20
import network
import json
import config
import uasyncio as asyncio
def sub_cb(topic, msg, retained):
print(f"Topic = {topic} Msg = {msg} Retained = {retained}")
SENSOR_PIN = 16
async def main(client):
await client.connect()
n = 0
await asyncio.sleep(2) # Give broker time
while True:
print('publish', n)
# If WiFi is down the following will pause for the duration.
await client.publish('rpi/hb', '{} {}'.format(n, client.REPUB_COUNT), qos = 1)
n += 1
await asyncio.sleep(20) # Broker is slow
async def connected_cb(client):
await client.subscribe('rpi/in', 1)
print("Hello")
resetCause = machine.reset_cause()
loop = asyncio.get_event_loop()
mqttConfig['ssid'] = config.SSID
mqttConfig['wifi_pw'] = config.WPA_KEY
mqttConfig['subs_cb'] = sub_cb
mqttConfig['server'] = config.MQTT_SERVER
mqttConfig['port'] = 8883
mqttConfig['user'] = config.MQTT_LOGIN
mqttConfig['password'] = config.MQTT_PASSWORD
mqttConfig['connect_coro'] = connected_cb
# mqttConfig['client_id'] = ubinascii.hexlify(bytearray(os.urandom(16))).decode()
mqttConfig['ssl'] = False
# Set up client
MQTTClient.DEBUG = True # Optional
client = MQTTClient(mqttConfig)
wdt = WDT(timeout=8000)
wdt.feed()
print("Watchdog enabled")
try:
loop.run_until_complete(main(client))
finally:
client.close()
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(config.SSID, config.WPA_KEY)
connectCnt = 0
while (not wlan.isconnected() or (connectCnt > config.MAX_CONNECT_RETRY_CNT)):
print("Not yet connected")
wdt.feed()
connectCnt += 1
time.sleep(1)
if (not wlan.isconnected()):
print("Failed to connect to wlan, restarting")
machine.reset()
print(f"Connected to WLAN: {resetCause} {connectCnt} {wlan.ifconfig()}")
wdt.feed()
except Exception as e:
print(f"Some error when starting: {e.__class__.__name__}, {e}")
print("Restarting")
time.sleep(5)
machine.reset()
while True:
try:
clientId = f"rpipwtherm-{ubinascii.hexlify(machine.unique_id())}"
client = MQTTClient(clientId, config.MQTT_SERVER, keepalive=60,
ssl=True, user=config.MQTT_LOGIN, password=config.MQTT_PASSWORD)
client.connect()
print(f"Connected to MQTT broker {config.MQTT_SERVER} ")
wdt.feed()
# client.publish(config.STARTUP_TOPIC, f"Hello, rpipw thermometer is alive now at {wlan.ifconfig()}")
sensor = DS18X20(OneWire(Pin(SENSOR_PIN, Pin.IN, Pin.PULL_UP)))
devices = sensor.scan()
sensor.convert_temp()
time.sleep(1)
for idx, device in enumerate(devices):
t = sensor.read_temp(device)
msg = {
"sensor" : idx,
"location": config.LOCATION,
"temperature": t
}
print(f"Sensor: {idx}, {json.dumps(msg)}")
client.publish(config.PAYLOAD_TOPIC, json.dumps(msg))
print(f"Payload {idx} sent")
wdt.feed()
client.disconnect()
print("Disconnected from broker")
for w in range(config.MAIN_PERIOD_S):
print(f"Waiting {w}...")
wdt.feed()
time.sleep(1)
except Exception as e:
print(f"Error in main loop: {e.__class__.__name__}, {e}")
print("Restarting")
time.sleep(5)
machine.reset()

View File

@ -1,731 +0,0 @@
# mqtt_as.py Asynchronous version of umqtt.robust
# (C) Copyright Peter Hinch 2017-2022.
# Released under the MIT licence.
# Pyboard D support added also RP2/default
# Various improvements contributed by Kevin Köck.
import gc
import usocket as socket
import ustruct as struct
gc.collect()
from ubinascii import hexlify
import uasyncio as asyncio
gc.collect()
from utime import ticks_ms, ticks_diff
from uerrno import EINPROGRESS, ETIMEDOUT
gc.collect()
from micropython import const
from machine import unique_id
import network
gc.collect()
from sys import platform
VERSION = (0, 6, 6)
# Default short delay for good SynCom throughput (avoid sleep(0) with SynCom).
_DEFAULT_MS = const(20)
_SOCKET_POLL_DELAY = const(5) # 100ms added greatly to publish latency
# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection().
ESP32 = platform == 'esp32' or platform == 'esp32_LoBo'
RP2 = platform == 'rp2'
if ESP32:
# https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors
elif RP2:
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110]
else:
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT]
ESP8266 = platform == 'esp8266'
PYBOARD = platform == 'pyboard'
# Default "do little" coro for optional user replacement
async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program
await asyncio.sleep_ms(_DEFAULT_MS)
config = {
'client_id': hexlify(unique_id()),
'server': None,
'port': 0,
'user': '',
'password': '',
'keepalive': 60,
'ping_interval': 0,
'ssl': False,
'ssl_params': {},
'response_time': 10,
'clean_init': True,
'clean': True,
'max_repubs': 4,
'will': None,
'subs_cb': lambda *_: None,
'wifi_coro': eliza,
'connect_coro': eliza,
'ssid': None,
'wifi_pw': None,
}
class MQTTException(Exception):
pass
def pid_gen():
pid = 0
while True:
pid = pid + 1 if pid < 65535 else 1
yield pid
def qos_check(qos):
if not (qos == 0 or qos == 1):
raise ValueError('Only qos 0 and 1 are supported.')
# MQTT_base class. Handles MQTT protocol on the basis of a good connection.
# Exceptions from connectivity failures are handled by MQTTClient subclass.
class MQTT_base:
REPUB_COUNT = 0 # TEST
DEBUG = False
def __init__(self, config):
# MQTT config
self._client_id = config['client_id']
self._user = config['user']
self._pswd = config['password']
self._keepalive = config['keepalive']
if self._keepalive >= 65536:
raise ValueError('invalid keepalive time')
self._response_time = config['response_time'] * 1000 # Repub if no PUBACK received (ms).
self._max_repubs = config['max_repubs']
self._clean_init = config['clean_init'] # clean_session state on first connection
self._clean = config['clean'] # clean_session state on reconnect
will = config['will']
if will is None:
self._lw_topic = False
else:
self._set_last_will(*will)
# WiFi config
self._ssid = config['ssid'] # Required for ESP32 / Pyboard D. Optional ESP8266
self._wifi_pw = config['wifi_pw']
self._ssl = config['ssl']
self._ssl_params = config['ssl_params']
# Callbacks and coros
self._cb = config['subs_cb']
self._wifi_handler = config['wifi_coro']
self._connect_handler = config['connect_coro']
# Network
self.port = config['port']
if self.port == 0:
self.port = 8883 if self._ssl else 1883
self.server = config['server']
if self.server is None:
raise ValueError('no server specified.')
self._sock = None
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)
self.newpid = pid_gen()
self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response
self.last_rx = ticks_ms() # Time of last communication from broker
self.lock = asyncio.Lock()
def _set_last_will(self, topic, msg, retain=False, qos=0):
qos_check(qos)
if not topic:
raise ValueError('Empty topic.')
self._lw_topic = topic
self._lw_msg = msg
self._lw_qos = qos
self._lw_retain = retain
def dprint(self, msg, *args):
if self.DEBUG:
print(msg % args)
def _timeout(self, t):
return ticks_diff(ticks_ms(), t) > self._response_time
async def _as_read(self, n, sock=None): # OSError caught by superclass
if sock is None:
sock = self._sock
# Declare a byte array of size n. That space is needed anyway, better
# to just 'allocate' it in one go instead of appending to an
# existing object, this prevents reallocation and fragmentation.
data = bytearray(n)
buffer = memoryview(data)
size = 0
t = ticks_ms()
while size < n:
if self._timeout(t) or not self.isconnected():
raise OSError(-1, 'Timeout on socket read')
try:
msg = sock.read(n - size)
except OSError as e: # ESP32 issues weird 119 errors here
msg = None
if e.args[0] not in BUSY_ERRORS:
raise
if msg == b'': # Connection closed by host
raise OSError(-1, 'Connection closed by host')
if msg is not None: # data received
msg_size = len(msg)
buffer[size:size + msg_size] = msg
size += msg_size
t = ticks_ms()
self.last_rx = ticks_ms()
await asyncio.sleep_ms(_SOCKET_POLL_DELAY)
return data
async def _as_write(self, bytes_wr, length=0, sock=None):
if sock is None:
sock = self._sock
# Wrap bytes in memoryview to avoid copying during slicing
bytes_wr = memoryview(bytes_wr)
if length:
bytes_wr = bytes_wr[:length]
t = ticks_ms()
while bytes_wr:
if self._timeout(t) or not self.isconnected():
raise OSError(-1, 'Timeout on socket write')
try:
n = sock.write(bytes_wr)
except OSError as e: # ESP32 issues weird 119 errors here
n = 0
if e.args[0] not in BUSY_ERRORS:
raise
if n:
t = ticks_ms()
bytes_wr = bytes_wr[n:]
await asyncio.sleep_ms(_SOCKET_POLL_DELAY)
async def _send_str(self, s):
await self._as_write(struct.pack("!H", len(s)))
await self._as_write(s)
async def _recv_len(self):
n = 0
sh = 0
while 1:
res = await self._as_read(1)
b = res[0]
n |= (b & 0x7f) << sh
if not b & 0x80:
return n
sh += 7
async def _connect(self, clean):
self._sock = socket.socket()
self._sock.setblocking(False)
try:
self._sock.connect(self._addr)
except OSError as e:
if e.args[0] not in BUSY_ERRORS:
raise
await asyncio.sleep_ms(_DEFAULT_MS)
self.dprint('Connecting to broker.')
if self._ssl:
import ussl
self._sock = ussl.wrap_socket(self._sock, **self._ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\0\0\0") # Protocol 3.1.1
sz = 10 + 2 + len(self._client_id)
msg[6] = clean << 1
if self._user:
sz += 2 + len(self._user) + 2 + len(self._pswd)
msg[6] |= 0xC0
if self._keepalive:
msg[7] |= self._keepalive >> 8
msg[8] |= self._keepalive & 0x00FF
if self._lw_topic:
sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
msg[6] |= self._lw_retain << 5
i = 1
while sz > 0x7f:
premsg[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
premsg[i] = sz
await self._as_write(premsg, i + 2)
await self._as_write(msg)
await self._send_str(self._client_id)
if self._lw_topic:
await self._send_str(self._lw_topic)
await self._send_str(self._lw_msg)
if self._user:
await self._send_str(self._user)
await self._send_str(self._pswd)
# Await CONNACK
# read causes ECONNABORTED if broker is out; triggers a reconnect.
resp = await self._as_read(4)
self.dprint('Connected to broker.') # Got CONNACK
if resp[3] != 0 or resp[0] != 0x20 or resp[1] != 0x02:
raise OSError(-1, 'Bad CONNACK') # Bad CONNACK e.g. authentication fail.
async def _ping(self):
async with self.lock:
await self._as_write(b"\xc0\0")
# Check internet connectivity by sending DNS lookup to Google's 8.8.8.8
async def wan_ok(self,
packet=b'$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01'):
if not self.isconnected(): # WiFi is down
return False
length = 32 # DNS query and response packet size
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.connect(('8.8.8.8', 53))
await asyncio.sleep(1)
try:
await self._as_write(packet, sock=s)
await asyncio.sleep(2)
res = await self._as_read(length, s)
if len(res) == length:
return True # DNS response size OK
except OSError: # Timeout on read: no connectivity.
return False
finally:
s.close()
return False
async def broker_up(self): # Test broker connectivity
if not self.isconnected():
return False
tlast = self.last_rx
if ticks_diff(ticks_ms(), tlast) < 1000:
return True
try:
await self._ping()
except OSError:
return False
t = ticks_ms()
while not self._timeout(t):
await asyncio.sleep_ms(100)
if ticks_diff(self.last_rx, tlast) > 0: # Response received
return True
return False
async def disconnect(self):
if self._sock is not None:
await self._kill_tasks(False) # Keep socket open
try:
async with self.lock:
self._sock.write(b"\xe0\0") # Close broker connection
await asyncio.sleep_ms(100)
except OSError:
pass
self._close()
self._has_connected = False
def _close(self):
if self._sock is not None:
self._sock.close()
def close(self): # API. See https://github.com/peterhinch/micropython-mqtt/issues/60
self._close()
try:
self._sta_if.disconnect() # Disconnect Wi-Fi to avoid errors
except OSError:
self.dprint('Wi-Fi not started, unable to disconnect interface')
self._sta_if.active(False)
async def _await_pid(self, pid):
t = ticks_ms()
while pid in self.rcv_pids: # local copy
if self._timeout(t) or not self.isconnected():
break # Must repub or bail out
await asyncio.sleep_ms(100)
else:
return True # PID received. All done.
return False
# qos == 1: coro blocks until wait_msg gets correct PID.
# If WiFi fails completely subclass re-publishes with new PID.
async def publish(self, topic, msg, retain, qos):
pid = next(self.newpid)
if qos:
self.rcv_pids.add(pid)
async with self.lock:
await self._publish(topic, msg, retain, qos, 0, pid)
if qos == 0:
return
count = 0
while 1: # Await PUBACK, republish on timeout
if await self._await_pid(pid):
return
# No match
if count >= self._max_repubs or not self.isconnected():
raise OSError(-1) # Subclass to re-publish with new PID
async with self.lock:
await self._publish(topic, msg, retain, qos, dup=1, pid=pid) # Add pid
count += 1
self.REPUB_COUNT += 1
async def _publish(self, topic, msg, retain, qos, dup, pid):
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain | dup << 3
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
if sz >= 2097152:
raise MQTTException('Strings too long.')
i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
await self._as_write(pkt, i + 1)
await self._send_str(topic)
if qos > 0:
struct.pack_into("!H", pkt, 0, pid)
await self._as_write(pkt, 2)
await self._as_write(msg)
# Can raise OSError if WiFi fails. Subclass traps.
async def subscribe(self, topic, qos):
pkt = bytearray(b"\x82\0\0\0")
pid = next(self.newpid)
self.rcv_pids.add(pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid)
async with self.lock:
await self._as_write(pkt)
await self._send_str(topic)
await self._as_write(qos.to_bytes(1, "little"))
if not await self._await_pid(pid):
raise OSError(-1)
# Can raise OSError if WiFi fails. Subclass traps.
async def unsubscribe(self, topic):
pkt = bytearray(b"\xa2\0\0\0")
pid = next(self.newpid)
self.rcv_pids.add(pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid)
async with self.lock:
await self._as_write(pkt)
await self._send_str(topic)
if not await self._await_pid(pid):
raise OSError(-1)
# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .setup() method. Other (internal) MQTT
# messages processed internally.
# Immediate return if no data available. Called from ._handle_msg().
async def wait_msg(self):
try:
res = self._sock.read(1) # Throws OSError on WiFi fail
except OSError as e:
if e.args[0] in BUSY_ERRORS: # Needed by RP2
await asyncio.sleep_ms(0)
return
raise
if res is None:
return
if res == b'':
raise OSError(-1, 'Empty response')
if res == b"\xd0": # PINGRESP
await self._as_read(1) # Update .last_rx time
return
op = res[0]
if op == 0x40: # PUBACK: save pid
sz = await self._as_read(1)
if sz != b"\x02":
raise OSError(-1, 'Invalid PUBACK packet')
rcv_pid = await self._as_read(2)
pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
raise OSError(-1, 'Invalid pid in PUBACK packet')
if op == 0x90: # SUBACK
resp = await self._as_read(4)
if resp[3] == 0x80:
raise OSError(-1, 'Invalid SUBACK packet')
pid = resp[2] | (resp[1] << 8)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
raise OSError(-1, 'Invalid pid in SUBACK packet')
if op == 0xB0: # UNSUBACK
resp = await self._as_read(3)
pid = resp[2] | (resp[1] << 8)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
raise OSError(-1)
if op & 0xf0 != 0x30:
return
sz = await self._recv_len()
topic_len = await self._as_read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = await self._as_read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = await self._as_read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = await self._as_read(sz)
retained = op & 0x01
self._cb(topic, msg, bool(retained))
if op & 6 == 2: # qos 1
pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK
struct.pack_into("!H", pkt, 2, pid)
await self._as_write(pkt)
elif op & 6 == 4: # qos 2 not supported
raise OSError(-1, 'QoS 2 not supported')
# MQTTClient class. Handles issues relating to connectivity.
class MQTTClient(MQTT_base):
def __init__(self, config):
super().__init__(config)
self._isconnected = False # Current connection state
keepalive = 1000 * self._keepalive # ms
self._ping_interval = keepalive // 4 if keepalive else 20000
p_i = config['ping_interval'] * 1000 # Can specify shorter e.g. for subscribe-only
if p_i and p_i < self._ping_interval:
self._ping_interval = p_i
self._in_connect = False
self._has_connected = False # Define 'Clean Session' value to use.
self._tasks = []
if ESP8266:
import esp
esp.sleep_type(0) # Improve connection integrity at cost of power consumption.
async def wifi_connect(self, quick=False):
s = self._sta_if
if ESP8266:
if s.isconnected(): # 1st attempt, already connected.
return
s.active(True)
s.connect() # ESP8266 remembers connection.
for _ in range(60):
if s.status() != network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
break
await asyncio.sleep(1)
if s.status() == network.STAT_CONNECTING: # might hang forever awaiting dhcp lease renewal or something else
s.disconnect()
await asyncio.sleep(1)
if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None:
s.connect(self._ssid, self._wifi_pw)
while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
await asyncio.sleep(1)
else:
s.active(True)
if RP2: # Disable auto-sleep.
# https://datasheets.raspberrypi.com/picow/connecting-to-the-internet-with-pico-w.pdf
# para 3.6.3
s.config(pm = 0xa11140)
s.connect(self._ssid, self._wifi_pw)
for _ in range(60): # Break out on fail or success. Check once per sec.
await asyncio.sleep(1)
# Loop while connecting or no IP
if s.isconnected():
break
if ESP32:
if s.status() != network.STAT_CONNECTING: # 1001
break
elif PYBOARD: # No symbolic constants in network
if not 1 <= s.status() <= 2:
break
elif RP2: # 1 is STAT_CONNECTING. 2 reported by user (No IP?)
if not 1 <= s.status() <= 2:
break
else: # Timeout: still in connecting state
s.disconnect()
await asyncio.sleep(1)
if not s.isconnected(): # Timed out
raise OSError('Wi-Fi connect timed out')
if not quick: # Skip on first connection only if power saving
# Ensure connection stays up for a few secs.
self.dprint('Checking WiFi integrity.')
for _ in range(5):
if not s.isconnected():
raise OSError('Connection Unstable') # in 1st 5 secs
await asyncio.sleep(1)
self.dprint('Got reliable connection')
async def connect(self, *, quick=False): # Quick initial connect option for battery apps
if not self._has_connected:
await self.wifi_connect(quick) # On 1st call, caller handles error
# Note this blocks if DNS lookup occurs. Do it once to prevent
# blocking during later internet outage:
self._addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self._in_connect = True # Disable low level ._isconnected check
try:
if not self._has_connected and self._clean_init and not self._clean:
# Power up. Clear previous session data but subsequently save it.
# Issue #40
await self._connect(True) # Connect with clean session
try:
async with self.lock:
self._sock.write(b"\xe0\0") # Force disconnect but keep socket open
except OSError:
pass
self.dprint("Waiting for disconnect")
await asyncio.sleep(2) # Wait for broker to disconnect
self.dprint("About to reconnect with unclean session.")
await self._connect(self._clean)
except Exception:
self._close()
self._in_connect = False # Caller may run .isconnected()
raise
clean = self._clean if self._has_connected else self._clean_init
self.rcv_pids.clear()
# If we get here without error broker/LAN must be up.
self._isconnected = True
self._in_connect = False # Low level code can now check connectivity.
asyncio.create_task(self._wifi_handler(True)) # User handler.
if not self._has_connected:
self._has_connected = True # Use normal clean flag on reconnect.
asyncio.create_task(
self._keep_connected()) # Runs forever unless user issues .disconnect()
asyncio.create_task(self._handle_msg()) # Task quits on connection fail.
self._tasks.append(asyncio.create_task(self._keep_alive()))
if self.DEBUG:
self._tasks.append(asyncio.create_task(self._memory()))
asyncio.create_task(self._connect_handler(self)) # User handler.
# Launched by .connect(). Runs until connectivity fails. Checks for and
# handles incoming messages.
async def _handle_msg(self):
try:
while self.isconnected():
async with self.lock:
await self.wait_msg() # Immediate return if no message
await asyncio.sleep_ms(_DEFAULT_MS) # Let other tasks get lock
except OSError:
pass
self._reconnect() # Broker or WiFi fail.
# Keep broker alive MQTT spec 3.1.2.10 Keep Alive.
# Runs until ping failure or no response in keepalive period.
async def _keep_alive(self):
while self.isconnected():
pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval
if pings_due >= 4:
self.dprint('Reconnect: broker fail.')
break
await asyncio.sleep_ms(self._ping_interval)
try:
await self._ping()
except OSError:
break
self._reconnect() # Broker or WiFi fail.
async def _kill_tasks(self, kill_skt): # Cancel running tasks
for task in self._tasks:
task.cancel()
self._tasks.clear()
await asyncio.sleep_ms(0) # Ensure cancellation complete
if kill_skt: # Close socket
self._close()
# DEBUG: show RAM messages.
async def _memory(self):
while True:
await asyncio.sleep(20)
gc.collect()
self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc())
def isconnected(self):
if self._in_connect: # Disable low-level check during .connect()
return True
if self._isconnected and not self._sta_if.isconnected(): # It's going down.
self._reconnect()
return self._isconnected
def _reconnect(self): # Schedule a reconnection if not underway.
if self._isconnected:
self._isconnected = False
asyncio.create_task(self._kill_tasks(True)) # Shut down tasks and socket
asyncio.create_task(self._wifi_handler(False)) # User handler.
# Await broker connection.
async def _connection(self):
while not self._isconnected:
await asyncio.sleep(1)
# Scheduled on 1st successful connection. Runs forever maintaining wifi and
# broker connection. Must handle conditions at edge of WiFi range.
async def _keep_connected(self):
while self._has_connected:
if self.isconnected(): # Pause for 1 second
await asyncio.sleep(1)
gc.collect()
else: # Link is down, socket is closed, tasks are killed
try:
self._sta_if.disconnect()
except OSError:
self.dprint('Wi-Fi not started, unable to disconnect interface')
await asyncio.sleep(1)
try:
await self.wifi_connect()
except OSError:
continue
if not self._has_connected: # User has issued the terminal .disconnect()
self.dprint('Disconnected, exiting _keep_connected')
break
try:
await self.connect()
# Now has set ._isconnected and scheduled _connect_handler().
self.dprint('Reconnect OK!')
except OSError as e:
self.dprint('Error in reconnect. %s', e)
# Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received.
self._close() # Disconnect and try again.
self._in_connect = False
self._isconnected = False
self.dprint('Disconnected, exited _keep_connected')
async def subscribe(self, topic, qos=0):
qos_check(qos)
while 1:
await self._connection()
try:
return await super().subscribe(topic, qos)
except OSError:
pass
self._reconnect() # Broker or WiFi fail.
async def unsubscribe(self, topic):
while 1:
await self._connection()
try:
return await super().unsubscribe(topic)
except OSError:
pass
self._reconnect() # Broker or WiFi fail.
async def publish(self, topic, msg, retain=False, qos=0):
qos_check(qos)
while 1:
await self._connection()
try:
return await super().publish(topic, msg, retain, qos)
except OSError:
pass
self._reconnect() # Broker or WiFi fail.

6
secrets.py-example Normal file
View File

@ -0,0 +1,6 @@
WPA_KEY = 'geheim'
MQTT_LOGIN = 'testuser'
MQTT_PASSWORD = 'geheim'
LOCATION = "somewhere"