mqtt handling

This commit is contained in:
Wolfgang Hottgenroth 2020-11-14 21:15:33 +01:00
parent e921ab59e3
commit fc5e6ab151
6 changed files with 109 additions and 199 deletions

View File

@ -37,7 +37,7 @@ BUILD_DIR = build
######################################
# C sources
C_SOURCES = \
User/Src/mqttTest.c User/Src/tcpTest.c User/Src/ports.c User/Src/eeprom.c User/Src/frontend.c User/Src/logger.c User/Src/loopCtrl.c User/Src/main2.c User/Src/mbusComm.c User/Src/ringbuffer.c User/Src/show.c User/Src/utils.c User/Src/wizHelper.c hottislib/PontCoopScheduler.c \
User/Src/mqttComm.c User/Src/tcpTest.c User/Src/ports.c User/Src/eeprom.c User/Src/frontend.c User/Src/logger.c User/Src/loopCtrl.c User/Src/main2.c User/Src/mbusComm.c User/Src/ringbuffer.c User/Src/show.c User/Src/utils.c User/Src/wizHelper.c hottislib/PontCoopScheduler.c \
libmbus/mbus/mbus-protocol.c \
Core/Src/main.c \
Core/Src/gpio.c \

View File

@ -1,9 +1,10 @@
#ifndef _MQTT_COMM_H_
#define _MQTT_COMM_H_
#ifndef _MQTTCOMM_H_
#define _MQTTCOMM_H_
// do not call it periodically, just call it with a NULL handle
void mqttCommInit(void *handle);
void mqttCommInit();
#endif // _MQTTCOMM_H_
#endif /* _MQTT_COMM_H_ */

View File

@ -1,10 +0,0 @@
#ifndef _MQTTTEST_H_
#define _MQTTTEST_H_
void mqttTestInit();
#endif // _MQTTTEST_H_

View File

@ -17,8 +17,7 @@
#include <frontend.h>
#include <eeprom.h>
#include <wizHelper.h>
#include <tcpTest.h>
#include <mqttTest.h>
#include <mqttComm.h>
void my_setup_1() {
@ -193,14 +192,14 @@ void my_setup_2() {
wizInit();
mqttTestInit();
mqttCommInit();
// frontendInit();
// frontendSetThreshold(240);
frontendInit();
frontendSetThreshold(240);
// schAdd(scheduleMBusRequest, NULL, 0, 1000);
// schAdd(triggerMBusRequest, NULL, 0, 100);
schAdd(scheduleMBusRequest, NULL, 0, 1000);
schAdd(triggerMBusRequest, NULL, 0, 100);
}
void my_loop() {

View File

@ -1,104 +1,120 @@
#include <mqttComm.h>
#include <logger.h>
#include <main.h>
#include <pubsubc.h>
#include <platformAdaption.h>
#include <PontCoopScheduler.h>
#include <wizHelper.h>
#include <show.h>
#include <mbusComm.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <stdio.h>
#include <mqtt_interface.h>
#include <MQTTClient.h>
#define RX_BUFFER_SIZE 2048
static uint8_t rxBuffer[RX_BUFFER_SIZE];
#define TX_BUFFER_SIZE 128
static uint8_t txBuffer[TX_BUFFER_SIZE];
extern const uint8_t MQTT_SOCK;
client_t client;
mqttClient_t mqttClient;
uint8_t brokerAddress[] = { 172, 16, 2, 16 };
uint16_t brokerPort = 1883;
static uint8_t targetIP[4] = { 172, 16, 2, 16 };
static uint16_t targetPort = 1883;
const static char WatchdogTopic[] = "IoT/Watchdog";
const static char StartupTopic[] = "IoT/MBGW3/Startup";
const static char StatusTopic[] = "IoT/MBGW3/Status";
struct opts_struct
{
char* clientid;
int nodelimiter;
char* delimiter;
enum QoS qos;
char* username;
char* password;
uint8_t *host;
uint16_t port;
int showtopics;
} opts ={ (char*)"stdout-subscriber", 0, (char*)"\n", QOS0, NULL, NULL, NULL, 0, 0 };
// typedef void (*callback_t)(char*, uint8_t*, uint16_t);
static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) {
logMsg("mqcb: %s : %.*s", topic, payloadLength, payload);
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
MQTTClient mqttClient;
static void messageArrived(MessageData* md)
{
unsigned char testbuffer[100];
MQTTMessage* message = md->message;
if (opts.showtopics)
{
memcpy(testbuffer,(char*)message->payload,(int)message->payloadlen);
*(testbuffer + (int)message->payloadlen + 1) = "\n";
logMsg("%s\r\n",testbuffer);
if (0 == strcmp(topic, WatchdogTopic)) {
coloredMsg(LOG_GREEN, "mqcb: watchdog message received");
}
if (opts.nodelimiter)
logMsg("%.*s", (int)message->payloadlen, (char*)message->payload);
else
logMsg("%.*s%s", (int)message->payloadlen, (char*)message->payload, opts.delimiter);
}
static void mqttHandler(void *handle) {
show(DEBUG_2, ON);
MQTTYield(&mqttClient, data.keepAliveInterval);
show(DEBUG_2, OFF);
static void mqttStatusPublisher(void *handle) {
logMsg("mqsp: publishing status");
t_mbusCommStats *mbusCommStats = mbusCommGetStats();
char buf[64];
snprintf(buf, 64, "{\"uptime\":\"%ld\", \"requests\":\"%d\", \"errors\":\"%d\"}",
HAL_GetTick()/1000, mbusCommStats->requestCnt, mbusCommStats->errorCnt);
bool res = publish(&mqttClient, StatusTopic, (const char*)buf, strlen(buf), false);
coloredMsg(LOG_YELLOW, "mqch, publish returned %d", res);
}
void mqttCommInit(void *handle) {
if (! isNetworkAvailable()) {
coloredMsg(LOG_RED, "mqci, can not start mqtt yet, network unavailable, try again in a second");
schAdd(mqttCommInit, NULL, 1000, 0);
void mqttCommHandler(void *handle) {
static uint8_t state = 0;
static uint8_t message[] = "MeterbusGateway3Variant starting";
if (isNetworkAvailable()) {
switch (state) {
case 0:
coloredMsg(LOG_YELLOW, "mqch, initializing mqtt client");
client.sockNum = MQTT_SOCK;
mqttClientInit(&mqttClient, &client, mqttCallback);
coloredMsg(LOG_YELLOW, "mqch: mqtt client initialized");
state = 1;
break;
case 1:
coloredMsg(LOG_YELLOW, "mqch, connecting to broker ");
bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false);
coloredMsg(LOG_YELLOW, "mqch, mqttConnect returns %d", res);
if (res) {
coloredMsg(LOG_YELLOW, "mqch, ok, connected");
state = 2;
} else {
state = 255;
}
break;
case 2:
coloredMsg(LOG_YELLOW, "mqch, publish start-up");
res = publish(&mqttClient, StartupTopic, (const char*)message, strlen(message), false);
coloredMsg(LOG_YELLOW, "mqch, publish returned %d", res);
schAdd(mqttStatusPublisher, NULL, 0, 60000);
coloredMsg(LOG_YELLOW, "mqch, status publisher scheduled");
state = 3;
break;
case 3:
coloredMsg(LOG_YELLOW, "mqch, subscribe watchdog");
res = subscribe(&mqttClient, WatchdogTopic, MQTTQOS0);
coloredMsg(LOG_YELLOW, "mqch, subscribe returned %d", res);
state = 4;
break;
case 4:
coloredMsg(LOG_YELLOW, "mqch, now entering the loop");
state = 5;
break;
case 5:
// coloredMsg(LOG_YELLOW, "mqch, looping");
if (! mqttLoop(&mqttClient)) {
state = 0;
}
break;
case 255:
coloredMsg(LOG_YELLOW, "mqch, error state, will stop here");
schDel(mqttCommHandler, NULL);
coloredMsg(LOG_YELLOW, "mqch, trying again in one minute");
schAdd(mqttCommHandler, NULL, 60000, 100);
break;
}
} else {
coloredMsg(LOG_RED, "mqci, starting mqtt");
Network n;
NewNetwork(&n, MQTT_SOCK);
ConnectNetwork(&n, targetIP, targetPort);
MQTTClientInit(&mqttClient, &n, 1000, txBuffer, TX_BUFFER_SIZE, rxBuffer, RX_BUFFER_SIZE);
data.willFlag = 0;
data.MQTTVersion = 3;
data.clientID.cstring = opts.clientid;
data.username.cstring = opts.username;
data.password.cstring = opts.password;
data.keepAliveInterval = 60;
data.cleansession = 1;
int rc = MQTTConnect(&mqttClient, &data);
logMsg("Connected %d\r\n", rc);
opts.showtopics = 1;
logMsg("Subscribing to %s\r\n", "hello/wiznet");
rc = MQTTSubscribe(&mqttClient, "hello/wiznet", opts.qos, messageArrived);
logMsg("Subscribed %d\r\n", rc);
schAdd(mqttHandler, NULL, 0, 100);
coloredMsg(LOG_YELLOW, "mqch, network not yet ready");
}
}
void mqttCommInit() {
schAdd(mqttCommHandler, NULL, 0, 100);
}

View File

@ -1,96 +0,0 @@
#include <mqttTest.h>
#include <logger.h>
#include <pubsubc.h>
#include <platformAdaption.h>
#include <PontCoopScheduler.h>
#include <wizHelper.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
extern const uint8_t MQTT_SOCK;
client_t client;
mqttClient_t mqttClient;
uint8_t brokerAddress[] = { 172, 16, 2, 16 };
uint16_t brokerPort = 1883;
// typedef void (*callback_t)(char*, uint8_t*, uint16_t);
static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) {
logMsg("mcb: %s : %*s", topic, payloadLength, payload);
}
void mqttTestHandler(void *handle) {
static uint8_t state = 0;
static uint8_t message[] = "Hello world\n\r";
if (isNetworkAvailable()) {
switch (state) {
case 0:
coloredMsg(LOG_YELLOW, "mth, initializing mqtt client");
client.sockNum = MQTT_SOCK;
mqttClientInit(&mqttClient, &client, mqttCallback);
coloredMsg(LOG_YELLOW, "mth: mqtt client initialized");
state = 1;
break;
case 1:
coloredMsg(LOG_YELLOW, "mth, connecting to broker ");
bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false);
coloredMsg(LOG_YELLOW, "mth, mqttConnect returns %d", res);
if (res) {
coloredMsg(LOG_YELLOW, "mth, ok, connected");
state = 2;
} else {
state = 255;
}
break;
case 2:
coloredMsg(LOG_YELLOW, "mth, publish something");
res = publish(&mqttClient, "wiznet/hello", message, strlen(message), false);
coloredMsg(LOG_YELLOW, "mth, publish returned %d", res);
state = 3;
break;
case 3:
coloredMsg(LOG_YELLOW, "mth, subscribe something");
res = subscribe(&mqttClient, "wiznet/helloback", MQTTQOS0);
coloredMsg(LOG_YELLOW, "mth, subscribe returned %d", res);
state = 4;
break;
case 4:
coloredMsg(LOG_YELLOW, "mth, now entering the loop");
state = 5;
break;
case 5:
// coloredMsg(LOG_YELLOW, "mth, looping");
if (! mqttLoop(&mqttClient)) {
state = 0;
}
break;
case 255:
coloredMsg(LOG_YELLOW, "mth, error state, will stop here");
schDel(mqttTestHandler, NULL);
coloredMsg(LOG_YELLOW, "mth, trying again in one minute");
schAdd(mqttTestHandler, NULL, 60000, 100);
break;
}
} else {
coloredMsg(LOG_YELLOW, "mth, network not yet ready");
}
}
void mqttTestInit() {
schAdd(mqttTestHandler, NULL, 0, 100);
}