Compare commits

...

14 Commits

8 changed files with 676 additions and 57 deletions

View File

@@ -1,11 +1,10 @@
# Makefile - A simple client for MQTT in C # Makefile - A simple client for MQTT in C
# Derived from the PubSubClient from Nick O'Leary
# Wolfgang Hottgenroth <woho@hottis.de> # Wolfgang Hottgenroth <woho@hottis.de>
# https://home.hottis.de/gitlab/wolutator/pubsubc # https://home.hottis.de/gitlab/wolutator/pubsubc
CFLAGS?=-mcpu=cortex-m3 -mthumb -Og -fdata-sections -ffunction-sections -g -gdwarf-2 CFLAGS?=-mcpu=cortex-m3 -mthumb -Og -fdata-sections -ffunction-sections -g -gdwarf-2 -Wall -Werror
CC=arm-none-eabi-gcc CC=arm-none-eabi-gcc
AR=arm-none-eabi-ar AR=arm-none-eabi-ar
@@ -15,7 +14,7 @@ CFLAGS+=-I../ioLibrary_Driver/Ethernet -Isrc
OBJDIR=build OBJDIR=build
VPATH=src VPATH=src
OBJS=$(addprefix $(OBJDIR)/,pubsubc.o client.o) OBJS=$(addprefix $(OBJDIR)/,pubsubc.o platformAdaption.o)
all: $(OBJS) all: $(OBJS)
$(AR) rcs pubsubc.a $^ $(AR) rcs pubsubc.a $^

View File

@@ -1,5 +1,85 @@
# pubsubc
This is a minimal MQTT client library in C.
It comes with an adaption layer for the ioLibrary_Driver for the
WizNet chips which can be found here https://github.com/Wiznet/ioLibrary_Driver.
You find a fork with a Makefile a STMCubeMX generated project here
https://home.hottis.de/gitlab/wolutator/ioLibrary_Driver/-/tree/WolfgangsOwnBranch.
Using this adaption layer you should find it easy to adjust it for
other platforms.
This work is directly derived from the famous PubSubClient library This work is directly derived from the famous PubSubClient library
of Nick O'Leary, which can be found at https://pubsubclient.knolleary.net/ of Nick O'Leary, which can be found at https://pubsubclient.knolleary.net/
It is most or less a plain C rewrite. It is most or less a plain C rewrite.
All honour for this working MQTT client library goes to Nick O'Leary,
all blame for bugs in the C port shall go to me.
## API
The library provides the following functions:
### Initialization
void mqttClientInit(mqttClient_t *mqttClient, client_t *client, callback_t callback);
Initializes the MQTT client, handover a client handle according to the definition
from ``platformAdaption.h`` and a callback function for incoming messages.
Implement this callback function for incoming messages with this footprint:
typedef void (*callback_t)(char*, uint8_t*, uint16_t);
### Connect
bool mqttConnect(mqttClient_t *mqttClient,
uint8_t *address, uint16_t port,
const char *id,
const char *user, const char *pass,
const char *willTopic, uint8_t willQos,
bool willRetain, const char *willMessage,
bool cleanSession);
In the original C++ implementation multiple variants of the ``connect``method are available
with less and lesser arguments. Unfortunately, in C this is no option. If you don't care
about authentication and the whole will stuff you can call it
mqttConnect(&mqttClient, brokerAddress, brokerPort, clientId, NULL, NULL, NULL, 0, false, NULL, false);
### Disconnect
void mqttDisconnect(mqttClient_t *mqttClient);
### Publish
bool publish(mqttClient_t *mqttClient,
const char *topic,
const uint8_t *payload, uint16_t plength,
bool retained);
### Subscribe and Unsubscribe
bool subscribe(mqttClient_t *mqttClient,
const char *topic, uint8_t qos);
bool unsubscribe(mqttClient_t *mqttClient,
const char* topic);
### Loop
bool mqttLoop(mqttClient_t *mqttClient);
Call this function in your idle loop. For my own part, I've implemented a time-triggered system
and call it with a period of 100ms.
### Connection status
bool mqttConnected(mqttClient_t *mqttClient);

View File

@@ -1,12 +0,0 @@
/*
client.c - A simple client for MQTT in C
Derived from the PubSubClient from Nick O'Leary
Wolfgang Hottgenroth <woho@hottis.de>
https://home.hottis.de/gitlab/wolutator/pubsubc
*/
#include <client.h>

View File

@@ -1,22 +0,0 @@
/*
client.h - A simple client for MQTT in C
Derived from the PubSubClient from Nick O'Leary
Wolfgang Hottgenroth <woho@hottis.de>
https://home.hottis.de/gitlab/wolutator/pubsubc
*/
#ifndef _CLIENT_H_
#define _CLIENT_H_
#include <stdint.h>
typedef struct {
uint8_t sockNum;
} client_t;
#endif // _CLIENT_H_

104
src/platformAdaption.c Normal file
View File

@@ -0,0 +1,104 @@
/*
platformAdaption.c - A simple client for MQTT in C
Wolfgang Hottgenroth <woho@hottis.de>
https://home.hottis.de/gitlab/wolutator/pubsubc
*/
#include <platformAdaption.h>
#include <socket.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>
int clientConnect(client_t *client, uint8_t *host, uint16_t port) {
int8_t res = socket(client->sockNum, Sn_MR_TCP, port, SF_IO_NONBLOCK);
if (res != client->sockNum) {
close(client->sockNum);
return PSC_INVALID_RESPONSE;
}
logMsg("clientConnect: socket initialized");
res = connect(client->sockNum, host, port);
if (res != SOCK_BUSY) {
close(client->sockNum);
return PSC_INVALID_RESPONSE;
}
uint32_t startTime = HAL_GetTick();
while (startTime + PSC_TIMEOUT_MS > HAL_GetTick()) {
uint8_t sockState = getSn_SR(client->sockNum);
if (sockState == SOCK_ESTABLISHED) {
logMsg("clientConnect: connection established");
return PSC_SUCCESS;
}
}
return PSC_TIMED_OUT;
}
int clientAvailable(client_t *client) {
return getSn_RX_RSR(client->sockNum);
}
void clientStop(client_t *client) {
int8_t res = disconnect(client->sockNum);
if (res != SOCK_BUSY) {
close(client->sockNum);
logMsg("clientStop: disconnect returns %d, invalid response, ignore it", res);
} else {
bool successfullyClosed = false;
uint32_t startTime = HAL_GetTick();
while (startTime + PSC_TIMEOUT_MS > HAL_GetTick()) {
uint8_t sockState = getSn_SR(client->sockNum);
if (sockState == SOCK_CLOSED) {
logMsg("clientStop: connection closed");
successfullyClosed = true;
break;
}
}
if (successfullyClosed) {
logMsg("clientStop: done");
close(client->sockNum);
} else {
logMsg("clientStop: timeout when closing, ignore");
close(client->sockNum);
}
}
}
int clientRead(client_t *client) {
int res = -1;
if (clientAvailable(client) >= 1) {
uint8_t buf;
res = recv(client->sockNum, &buf, 1);
if (res == 1) {
res = (int) buf;
}
}
return res;
}
size_t clientWrite(client_t *client, const uint8_t *buf, size_t size) {
int32_t res = send(client->sockNum, (uint8_t*) buf, size);
return (res == size) ? size : 0;
}
size_t clientWriteOne(client_t *client, uint8_t b) {
return clientWrite(client, &b, 1);
}
void clientFlush(client_t *client) {
// does nothing
}
bool clientConnected(client_t *client) {
return (getSn_SR(client->sockNum) == SOCK_ESTABLISHED) ? 1 : 0;
}
uint32_t millis() {
return HAL_GetTick();
}

44
src/platformAdaption.h Normal file
View File

@@ -0,0 +1,44 @@
/*
platformAdaption.h - A simple client for MQTT in C
Wolfgang Hottgenroth <woho@hottis.de>
https://home.hottis.de/gitlab/wolutator/pubsubc
*/
#ifndef _PLATFORMADAPTION_H_
#define _PLATFORMADAPTION_H_
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>
#define PSC_SUCCESS 1
#define PSC_TIMED_OUT -1
#define PSC_INVALID_SERVER -2
#define PSC_TRUNCATED -3
#define PSC_INVALID_RESPONSE -4
#define PSC_TIMEOUT_MS 1000
typedef struct {
uint8_t sockNum;
} client_t;
int clientConnect(client_t *client, uint8_t *host, uint16_t port);
int clientAvailable(client_t *client);
void clientStop(client_t *client);
int clientRead(client_t *client);
size_t clientWrite(client_t *client, const uint8_t *buf, size_t size);
size_t clientWriteOne(client_t *client, uint8_t b);
void clientFlush(client_t *client);
bool clientConnected(client_t *client);
uint32_t millis();
uint32_t HAL_GetTick(void);
int logMsg(const char *format, ...);
#endif // _PLATFORMADAPTION_H_

View File

@@ -7,5 +7,430 @@
#include <pubsubc.h> #include <pubsubc.h>
#include <client.h> #include <platformAdaption.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
static bool readByte(mqttClient_t *mqttClient, uint8_t * result) {
uint32_t previousMillis = millis();
while(!clientAvailable(mqttClient->client)) {
uint32_t currentMillis = millis();
if(currentMillis - previousMillis >= ((int32_t) mqttClient->socketTimeout * 1000)){
return false;
}
}
*result = clientRead(mqttClient->client);
return true;
}
// reads a byte into result[*index] and increments index
static bool readByteAt(mqttClient_t *mqttClient, uint8_t * result, uint16_t * index){
uint16_t current_index = *index;
uint8_t * write_address = &(result[current_index]);
if (readByte(mqttClient, write_address)) {
*index = current_index + 1;
return true;
}
return false;
}
static uint16_t writeString(mqttClient_t *mqttClient, const char* string, uint8_t* buf, uint16_t pos) {
const char* idp = string;
uint16_t i = 0;
pos += 2;
while (*idp) {
buf[pos++] = *idp++;
i++;
}
buf[pos-i-2] = (i >> 8);
buf[pos-i-1] = (i & 0xFF);
return pos;
}
static size_t buildHeader(mqttClient_t *mqttClient, uint8_t header, uint8_t* buf, uint16_t length) {
uint8_t lenBuf[4];
uint8_t llen = 0;
uint8_t digit;
uint8_t pos = 0;
uint16_t len = length;
do {
digit = len & 127; //digit = len %128
len >>= 7; //len = len / 128
if (len > 0) {
digit |= 0x80;
}
lenBuf[pos++] = digit;
llen++;
} while(len>0);
buf[4-llen] = header;
for (int i=0;i<llen;i++) {
buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
}
return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
}
static bool write(mqttClient_t *mqttClient, uint8_t header, uint8_t* buf, uint16_t length) {
uint16_t rc;
uint8_t hlen = buildHeader(mqttClient, header, buf, length);
rc = clientWrite(mqttClient->client, buf + (MQTT_MAX_HEADER_SIZE - hlen), length + hlen);
mqttClient->lastOutActivity = millis();
return (rc == hlen + length);
}
static uint32_t readPacket(mqttClient_t *mqttClient, uint8_t* lengthLength) {
uint16_t len = 0;
if (!readByteAt(mqttClient, mqttClient->buffer, &len)) return 0;
bool isPublish = (mqttClient->buffer[0] & 0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
uint32_t length = 0;
uint8_t digit = 0;
uint16_t skip = 0;
uint32_t start = 0;
do {
if (len == 5) {
// Invalid remaining length encoding - kill the connection
mqttClient->state = MQTT_DISCONNECTED;
clientStop(mqttClient->client);
return 0;
}
if (!readByte(mqttClient, &digit)) return 0;
mqttClient->buffer[len++] = digit;
length += (digit & 127) * multiplier;
multiplier <<=7; //multiplier *= 128
} while ((digit & 128) != 0);
*lengthLength = len-1;
if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing
if (!readByteAt(mqttClient, mqttClient->buffer, &len)) return 0;
if (!readByteAt(mqttClient, mqttClient->buffer, &len)) return 0;
skip = (mqttClient->buffer[*lengthLength + 1] << 8) + mqttClient->buffer[*lengthLength + 2];
start = 2;
if (mqttClient->buffer[0]&MQTTQOS1) {
// skip message id
skip += 2;
}
}
uint32_t idx = len;
for (uint32_t i = start; i < length; i++) {
if (!readByte(mqttClient, &digit)) return 0;
if (len < mqttClient->bufferSize) {
mqttClient->buffer[len] = digit;
len++;
}
idx++;
}
if (idx > mqttClient->bufferSize) {
len = 0; // This will cause the packet to be ignored.
}
return len;
}
void mqttClientInit(mqttClient_t *mqttClient, client_t *client, callback_t callback) {
mqttClient->client = client;
mqttClient->callback = callback;
mqttClient->bufferSize = MQTT_MAX_PACKET_SIZE;
mqttClient->keepAlive = MQTT_KEEPALIVE;
mqttClient->state = MQTT_DISCONNECTED;
mqttClient->socketTimeout = MQTT_SOCKET_TIMEOUT;
memset(mqttClient->buffer, 0, mqttClient->bufferSize);
}
bool mqttConnect(mqttClient_t *mqttClient,
uint8_t *address, uint16_t port,
const char *id,
const char *user, const char *pass,
const char *willTopic, uint8_t willQos,
bool willRetain, const char *willMessage,
bool cleanSession) {
if (!mqttConnected(mqttClient)) {
int result = 0;
if(clientConnected(mqttClient->client)) {
result = 1;
} else {
mqttClient->brokerAddress = address;
mqttClient->brokerPort = port;
result = clientConnect(mqttClient->client, address, port);
}
if (result == 1) {
mqttClient->nextMsgId = 1;
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
#if MQTT_VERSION == MQTT_VERSION_3_1
uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 9
#elif MQTT_VERSION == MQTT_VERSION_3_1_1
uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 7
#endif
for (uint16_t j = 0; j<MQTT_HEADER_VERSION_LENGTH; j++) {
mqttClient->buffer[length++] = d[j];
}
uint8_t v;
if (willTopic) {
v = 0x04|(willQos<<3)|(willRetain<<5);
} else {
v = 0x00;
}
if (cleanSession) {
v = v|0x02;
}
if(user != NULL) {
v = v|0x80;
if(pass != NULL) {
v = v|(0x80>>1);
}
}
mqttClient->buffer[length++] = v;
mqttClient->buffer[length++] = ((mqttClient->keepAlive) >> 8);
mqttClient->buffer[length++] = ((mqttClient->keepAlive) & 0xFF);
// CHECK_STRING_LENGTH(length,id)
length = writeString(mqttClient, id, mqttClient->buffer, length);
if (willTopic) {
// CHECK_STRING_LENGTH(length,willTopic)
length = writeString(mqttClient, willTopic, mqttClient->buffer, length);
// CHECK_STRING_LENGTH(length,willMessage)
length = writeString(mqttClient, willMessage, mqttClient->buffer, length);
}
if (user != NULL) {
// CHECK_STRING_LENGTH(length,user)
length = writeString(mqttClient, user, mqttClient->buffer, length);
if (pass != NULL) {
// CHECK_STRING_LENGTH(length,pass)
length = writeString(mqttClient, pass, mqttClient->buffer, length);
}
}
write(mqttClient, MQTTCONNECT, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE);
mqttClient->lastInActivity = mqttClient->lastOutActivity = millis();
while (! clientAvailable(mqttClient->client)) {
uint32_t t = millis();
if (t - mqttClient->lastInActivity >= ((int32_t) mqttClient->socketTimeout*1000UL)) {
mqttClient->state = MQTT_CONNECTION_TIMEOUT;
clientStop(mqttClient->client);
return false;
}
}
uint8_t llen;
uint32_t len = readPacket(mqttClient, &llen);
if (len == 4) {
if (mqttClient->buffer[3] == 0) {
mqttClient->lastInActivity = millis();
mqttClient->pingOutstanding = false;
mqttClient->state = MQTT_CONNECTED;
return true;
} else {
mqttClient->state = mqttClient->buffer[3];
}
}
clientStop(mqttClient->client);
} else {
mqttClient->state = MQTT_CONNECT_FAILED;
}
return false;
}
return true;
}
void mqttDisconnect(mqttClient_t *mqttClient) {
mqttClient->buffer[0] = MQTTDISCONNECT;
mqttClient->buffer[1] = 0;
clientWrite(mqttClient->client, mqttClient->buffer, 2);
mqttClient->state = MQTT_DISCONNECTED;
clientFlush(mqttClient->client);
clientStop(mqttClient->client);
mqttClient->lastInActivity = mqttClient->lastOutActivity = millis();
}
bool mqttLoop(mqttClient_t *mqttClient) {
if (mqttConnected(mqttClient)) {
uint32_t t = millis();
if ((t - mqttClient->lastInActivity > mqttClient->keepAlive*1000UL) ||
(t - mqttClient->lastOutActivity > mqttClient->keepAlive*1000UL)) {
if (mqttClient->pingOutstanding) {
mqttClient->state = MQTT_CONNECTION_TIMEOUT;
clientStop(mqttClient->client);
return false;
} else {
mqttClient->buffer[0] = MQTTPINGREQ;
mqttClient->buffer[1] = 0;
clientWrite(mqttClient->client, mqttClient->buffer,2);
mqttClient->lastOutActivity = t;
mqttClient->lastInActivity = t;
mqttClient->pingOutstanding = true;
}
}
if (clientAvailable(mqttClient->client)) {
uint8_t llen;
uint16_t len = readPacket(mqttClient, &llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
mqttClient->lastInActivity = t;
uint8_t type = mqttClient->buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (mqttClient->callback) {
uint16_t tl = (mqttClient->buffer[llen+1]<<8) + mqttClient->buffer[llen+2]; /* topic length in bytes */
memmove(mqttClient->buffer + llen + 2, mqttClient->buffer + llen + 3, tl); /* move topic inside buffer 1 byte to front */
mqttClient->buffer[llen + 2 + tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) mqttClient->buffer+llen+2;
// msgId only present for QOS>0
if ((mqttClient->buffer[0] & 0x06) == MQTTQOS1) {
msgId = (mqttClient->buffer[llen + 3 + tl]<<8) + mqttClient->buffer[llen + 3 + tl + 1];
payload = mqttClient->buffer + llen + 3 + tl + 2;
mqttClient->callback(topic, payload, len - llen - 3 - tl - 2);
mqttClient->buffer[0] = MQTTPUBACK;
mqttClient->buffer[1] = 2;
mqttClient->buffer[2] = (msgId >> 8);
mqttClient->buffer[3] = (msgId & 0xFF);
clientWrite(mqttClient->client, mqttClient->buffer, 4);
mqttClient->lastOutActivity = t;
} else {
payload = mqttClient->buffer + llen + 3 + tl;
mqttClient->callback(topic, payload, len - llen - 3 - tl);
}
}
} else if (type == MQTTPINGREQ) {
mqttClient->buffer[0] = MQTTPINGRESP;
mqttClient->buffer[1] = 0;
clientWrite(mqttClient->client, mqttClient->buffer,2);
} else if (type == MQTTPINGRESP) {
mqttClient->pingOutstanding = false;
}
} else if (!mqttConnected(mqttClient)) {
// readPacket has closed the connection
return false;
}
}
return true;
}
return false;
}
bool mqttConnected(mqttClient_t *mqttClient) {
bool rc;
if (mqttClient->client == NULL ) {
rc = false;
} else {
rc = clientConnected(mqttClient->client);
if (!rc) {
if (mqttClient->state == MQTT_CONNECTED) {
mqttClient->state = MQTT_CONNECTION_LOST;
clientFlush(mqttClient->client);
clientStop(mqttClient->client);
}
} else {
return mqttClient->state == MQTT_CONNECTED;
}
}
return rc;
}
bool publish(mqttClient_t *mqttClient,
const char *topic,
const uint8_t *payload, uint16_t plength,
bool retained) {
if (mqttConnected(mqttClient)) {
if (mqttClient->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, mqttClient->bufferSize) + plength) {
// Too long
return false;
}
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(mqttClient, topic, mqttClient->buffer, length);
// Add payload
for (uint16_t i = 0; i < plength; i++) {
mqttClient->buffer[length++] = payload[i];
}
// Write the header
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
return write(mqttClient, header, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE);
}
return false;
}
bool subscribe(mqttClient_t *mqttClient,
const char *topic, uint8_t qos) {
size_t topicLength = strnlen(topic, mqttClient->bufferSize);
if (topic == 0) {
return false;
}
if (qos > 1) {
return false;
}
if (mqttClient->bufferSize < 9 + topicLength) {
// Too long
return false;
}
if (mqttConnected(mqttClient)) {
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
mqttClient->nextMsgId++;
if (mqttClient->nextMsgId == 0) {
mqttClient->nextMsgId = 1;
}
mqttClient->buffer[length++] = (mqttClient->nextMsgId >> 8);
mqttClient->buffer[length++] = (mqttClient->nextMsgId & 0xFF);
length = writeString(mqttClient, (char*)topic, mqttClient->buffer, length);
mqttClient->buffer[length++] = qos;
return write(mqttClient, MQTTSUBSCRIBE|MQTTQOS1, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE);
}
return false;
}
bool unsubscribe(mqttClient_t *mqttClient,
const char* topic) {
size_t topicLength = strnlen(topic, mqttClient->bufferSize);
if (topic == 0) {
return false;
}
if (mqttClient->bufferSize < 9 + topicLength) {
// Too long
return false;
}
if (mqttConnected(mqttClient)) {
uint16_t length = MQTT_MAX_HEADER_SIZE;
mqttClient->nextMsgId++;
if (mqttClient->nextMsgId == 0) {
mqttClient->nextMsgId = 1;
}
mqttClient->buffer[length++] = (mqttClient->nextMsgId >> 8);
mqttClient->buffer[length++] = (mqttClient->nextMsgId & 0xFF);
length = writeString(mqttClient, topic, mqttClient->buffer,length);
return write(mqttClient, MQTTUNSUBSCRIBE|MQTTQOS1, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE);
}
return false;
}

View File

@@ -22,7 +22,7 @@
// MQTT_MAX_PACKET_SIZE : Maximum packet size. Override with setBufferSize(). // MQTT_MAX_PACKET_SIZE : Maximum packet size. Override with setBufferSize().
#ifndef MQTT_MAX_PACKET_SIZE #ifndef MQTT_MAX_PACKET_SIZE
#define MQTT_MAX_PACKET_SIZE 256 #define MQTT_MAX_PACKET_SIZE 512
#endif #endif
// MQTT_KEEPALIVE : keepAlive interval in Seconds. Override with setKeepAlive() // MQTT_KEEPALIVE : keepAlive interval in Seconds. Override with setKeepAlive()
@@ -37,10 +37,6 @@
// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client
// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to
// pass the entire MQTT packet in each write call.
//#define MQTT_MAX_TRANSFER_SIZE 80
// Possible values for client.state() // Possible values for client.state()
#define MQTT_CONNECTION_TIMEOUT -4 #define MQTT_CONNECTION_TIMEOUT -4
@@ -81,14 +77,14 @@
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
#include <client.h> #include <platformAdaption.h>
typedef void (*callback_t)(char*, uint8_t*, unsigned int); typedef void (*callback_t)(char*, uint8_t*, uint16_t);
typedef struct { typedef struct {
client_t *client; client_t *client;
uint8_t *buffer; uint8_t buffer[MQTT_MAX_PACKET_SIZE];
uint16_t bufferSize; uint16_t bufferSize;
uint16_t keepAlive; uint16_t keepAlive;
uint16_t socketTimeout; uint16_t socketTimeout;
@@ -96,31 +92,36 @@ typedef struct {
uint32_t lastOutActivity; uint32_t lastOutActivity;
uint32_t lastInActivity; uint32_t lastInActivity;
bool pingOutstanding; bool pingOutstanding;
callback_t *callback; callback_t callback;
uint8_t brokerAddress[4]; uint8_t *brokerAddress;
uint16_t brokerPort; uint16_t brokerPort;
int _state; int state;
} mqttClient_t; } mqttClient_t;
void mqttClientInit(mqttClient_t *mqttClient); void mqttClientInit(mqttClient_t *mqttClient, client_t *client, callback_t callback);
bool connect(mqttClient_t *mqttClient, const char *id, bool mqttConnect(mqttClient_t *mqttClient,
uint8_t *address, uint16_t port,
const char *id,
const char *user, const char *pass, const char *user, const char *pass,
const char *willTopic, uint8_t willQos, const char *willTopic, uint8_t willQos,
bool willRetain, const char *willMessage, bool willRetain, const char *willMessage,
bool cleanSession); bool cleanSession);
void disconnect();
void mqttDisconnect(mqttClient_t *mqttClient);
bool publish(mqttClient_t *mqttClient, bool publish(mqttClient_t *mqttClient,
const char *topic, const char *topic,
const uint8_t *payload, uint16_t pLength, const uint8_t *payload, uint16_t plength,
bool retained); bool retained);
bool subscribe(const char *topic, uint8_t qos); bool subscribe(mqttClient_t *mqttClient,
bool unsubscribe(const char* topic); const char *topic, uint8_t qos);
bool unsubscribe(mqttClient_t *mqttClient,
const char* topic);
bool loop(); bool mqttLoop(mqttClient_t *mqttClient);
bool connected(); bool mqttConnected(mqttClient_t *mqttClient);
#endif // _PUBSUBC_H_ #endif // _PUBSUBC_H_