diff --git a/Makefile b/Makefile index ad43154..eae802e 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ CFLAGS+=-I../ioLibrary_Driver/Ethernet -Isrc OBJDIR=build VPATH=src -OBJS=$(addprefix $(OBJDIR)/,pubsubc.o client.o) +OBJS=$(addprefix $(OBJDIR)/,pubsubc.o platformAdaption.o) all: $(OBJS) $(AR) rcs pubsubc.a $^ diff --git a/src/client.h b/src/client.h deleted file mode 100644 index 04c54fe..0000000 --- a/src/client.h +++ /dev/null @@ -1,22 +0,0 @@ -/* - client.h - A simple client for MQTT in C - Derived from the PubSubClient from Nick O'Leary - Wolfgang Hottgenroth - https://home.hottis.de/gitlab/wolutator/pubsubc -*/ - - -#ifndef _CLIENT_H_ -#define _CLIENT_H_ - - -#include - -typedef struct { - uint8_t sockNum; -} client_t; - - - - -#endif // _CLIENT_H_ \ No newline at end of file diff --git a/src/client.c b/src/platformAdaption.c similarity index 64% rename from src/client.c rename to src/platformAdaption.c index 147e13b..638b249 100644 --- a/src/client.c +++ b/src/platformAdaption.c @@ -1,12 +1,12 @@ /* - client.c - A simple client for MQTT in C + platformAdaption.c - A simple client for MQTT in C Derived from the PubSubClient from Nick O'Leary Wolfgang Hottgenroth https://home.hottis.de/gitlab/wolutator/pubsubc */ -#include +#include diff --git a/src/platformAdaption.h b/src/platformAdaption.h new file mode 100644 index 0000000..b5d0d21 --- /dev/null +++ b/src/platformAdaption.h @@ -0,0 +1,33 @@ +/* + platformAdaption.h - A simple client for MQTT in C + Derived from the PubSubClient from Nick O'Leary + Wolfgang Hottgenroth + https://home.hottis.de/gitlab/wolutator/pubsubc +*/ + + +#ifndef _PLATFORMADAPTION_H_ +#define _PLATFORMADAPTION_H_ + + +#include +#include +#include + +typedef struct { + uint8_t sockNum; +} client_t; + +int clientConnect(client_t *client, const char *host, uint16_t port); +int clientAvailable(client_t *client); +void clientStop(client_t *client); +int clientRead(client_t *client); +size_t clientWrite(client_t *client, const uint8_t *buf, size_t size); +size_t clientWriteOne(client_t *client, uint8_t b); +void clientFlush(client_t *client); +bool clientConnected(client_t *client); + +uint32_t millis(); + + +#endif // _PLATFORMADAPTION_H_ \ No newline at end of file diff --git a/src/pubsubc.c b/src/pubsubc.c index d534353..7b2c6ee 100644 --- a/src/pubsubc.c +++ b/src/pubsubc.c @@ -7,5 +7,430 @@ #include -#include +#include +#include +#include +#include + + + +static bool readByte(mqttClient_t *mqttClient, uint8_t * result) { + uint32_t previousMillis = millis(); + while(!clientAvailable(mqttClient->client)) { + uint32_t currentMillis = millis(); + if(currentMillis - previousMillis >= ((int32_t) mqttClient->socketTimeout * 1000)){ + return false; + } + } + *result = clientRead(mqttClient->client); + return true; +} + + +// reads a byte into result[*index] and increments index +static bool readByteAt(mqttClient_t *mqttClient, uint8_t * result, uint16_t * index){ + uint16_t current_index = *index; + uint8_t * write_address = &(result[current_index]); + if (readByte(mqttClient, write_address)) { + *index = current_index + 1; + return true; + } + return false; +} + +static uint16_t writeString(mqttClient_t *mqttClient, const char* string, uint8_t* buf, uint16_t pos) { + const char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +static size_t buildHeader(mqttClient_t *mqttClient, uint8_t header, uint8_t* buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint16_t len = length; + do { + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;iclient, buf + (MQTT_MAX_HEADER_SIZE - hlen), length + hlen); + mqttClient->lastOutActivity = millis(); + return (rc == hlen + length); +} + +static uint32_t readPacket(mqttClient_t *mqttClient, uint8_t* lengthLength) { + uint16_t len = 0; + if (!readByteAt(mqttClient, mqttClient->buffer, &len)) return 0; + bool isPublish = (mqttClient->buffer[0] & 0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint32_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint32_t start = 0; + + do { + if (len == 5) { + // Invalid remaining length encoding - kill the connection + mqttClient->state = MQTT_DISCONNECTED; + clientStop(mqttClient->client); + return 0; + } + if (!readByte(mqttClient, &digit)) return 0; + mqttClient->buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier <<=7; //multiplier *= 128 + } while ((digit & 128) != 0); + *lengthLength = len-1; + + if (isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing + if (!readByteAt(mqttClient, mqttClient->buffer, &len)) return 0; + if (!readByteAt(mqttClient, mqttClient->buffer, &len)) return 0; + skip = (mqttClient->buffer[*lengthLength + 1] << 8) + mqttClient->buffer[*lengthLength + 2]; + start = 2; + if (mqttClient->buffer[0]&MQTTQOS1) { + // skip message id + skip += 2; + } + } + uint32_t idx = len; + + for (uint32_t i = start; i < length; i++) { + if (!readByte(mqttClient, &digit)) return 0; + if (len < mqttClient->bufferSize) { + mqttClient->buffer[len] = digit; + len++; + } + idx++; + } + + if (idx > mqttClient->bufferSize) { + len = 0; // This will cause the packet to be ignored. + } + return len; +} + + + +void mqttClientInit(mqttClient_t *mqttClient, client_t *client, callback_t callback) { + mqttClient->client = client; + mqttClient->callback = callback; + mqttClient->bufferSize = MQTT_MAX_PACKET_SIZE; + mqttClient->keepAlive = MQTT_KEEPALIVE; + mqttClient->state = MQTT_DISCONNECTED; + mqttClient->socketTimeout = MQTT_SOCKET_TIMEOUT; + + memset(mqttClient->buffer, 0, mqttClient->bufferSize); +} + +bool connect(mqttClient_t *mqttClient, + uint8_t *address, uint16_t port, + const char *id, + const char *user, const char *pass, + const char *willTopic, uint8_t willQos, + bool willRetain, const char *willMessage, + bool cleanSession) { + if (!connected(mqttClient)) { + int result = 0; + + if(clientConnected(mqttClient->client)) { + result = 1; + } else { + mqttClient->brokerAddress = address; + mqttClient->brokerPort = port; + result = clientConnect(mqttClient->client, address, port); + } + + if (result == 1) { + mqttClient->nextMsgId = 1; + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + +#if MQTT_VERSION == MQTT_VERSION_3_1 + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 9 +#elif MQTT_VERSION == MQTT_VERSION_3_1_1 + uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 7 +#endif + for (uint16_t j = 0; jbuffer[length++] = d[j]; + } + + uint8_t v; + if (willTopic) { + v = 0x04|(willQos<<3)|(willRetain<<5); + } else { + v = 0x00; + } + if (cleanSession) { + v = v|0x02; + } + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + mqttClient->buffer[length++] = v; + + mqttClient->buffer[length++] = ((mqttClient->keepAlive) >> 8); + mqttClient->buffer[length++] = ((mqttClient->keepAlive) & 0xFF); + + // CHECK_STRING_LENGTH(length,id) + length = writeString(mqttClient, id, mqttClient->buffer, length); + if (willTopic) { + // CHECK_STRING_LENGTH(length,willTopic) + length = writeString(mqttClient, willTopic, mqttClient->buffer, length); + // CHECK_STRING_LENGTH(length,willMessage) + length = writeString(mqttClient, willMessage, mqttClient->buffer, length); + } + + if (user != NULL) { + // CHECK_STRING_LENGTH(length,user) + length = writeString(mqttClient, user, mqttClient->buffer, length); + if (pass != NULL) { + // CHECK_STRING_LENGTH(length,pass) + length = writeString(mqttClient, pass, mqttClient->buffer, length); + } + } + + write(mqttClient, MQTTCONNECT, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE); + + mqttClient->lastInActivity = mqttClient->lastOutActivity = millis(); + + while (! clientAvailable(mqttClient->client)) { + uint32_t t = millis(); + if (t - mqttClient->lastInActivity >= ((int32_t) mqttClient->socketTimeout*1000UL)) { + mqttClient->state = MQTT_CONNECTION_TIMEOUT; + clientStop(mqttClient->client); + return false; + } + } + + uint8_t llen; + uint32_t len = readPacket(mqttClient, &llen); + + if (len == 4) { + if (mqttClient->buffer[3] == 0) { + mqttClient->lastInActivity = millis(); + mqttClient->pingOutstanding = false; + mqttClient->state = MQTT_CONNECTED; + return true; + } else { + mqttClient->state = mqttClient->buffer[3]; + } + } + + clientStop(mqttClient->client); + } else { + mqttClient->state = MQTT_CONNECT_FAILED; + } + return false; + } + return true; +} + +void disconnect(mqttClient_t *mqttClient) { + mqttClient->buffer[0] = MQTTDISCONNECT; + mqttClient->buffer[1] = 0; + clientWrite(mqttClient->client, mqttClient->buffer, 2); + mqttClient->state = MQTT_DISCONNECTED; + clientFlush(mqttClient->client); + clientStop(mqttClient->client); + mqttClient->lastInActivity = mqttClient->lastOutActivity = millis(); +} + +bool loop(mqttClient_t *mqttClient) { + if (connected(mqttClient)) { + uint32_t t = millis(); + if ((t - mqttClient->lastInActivity > mqttClient->keepAlive*1000UL) || + (t - mqttClient->lastOutActivity > mqttClient->keepAlive*1000UL)) { + if (mqttClient->pingOutstanding) { + mqttClient->state = MQTT_CONNECTION_TIMEOUT; + clientStop(mqttClient->client); + return false; + } else { + mqttClient->buffer[0] = MQTTPINGREQ; + mqttClient->buffer[1] = 0; + clientWrite(mqttClient->client, mqttClient->buffer,2); + mqttClient->lastOutActivity = t; + mqttClient->lastInActivity = t; + mqttClient->pingOutstanding = true; + } + } + if (clientAvailable(mqttClient->client)) { + uint8_t llen; + uint16_t len = readPacket(mqttClient, &llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + mqttClient->lastInActivity = t; + uint8_t type = mqttClient->buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (mqttClient->callback) { + uint16_t tl = (mqttClient->buffer[llen+1]<<8) + mqttClient->buffer[llen+2]; /* topic length in bytes */ + memmove(mqttClient->buffer + llen + 2, mqttClient->buffer + llen + 3, tl); /* move topic inside buffer 1 byte to front */ + mqttClient->buffer[llen + 2 + tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) mqttClient->buffer+llen+2; + // msgId only present for QOS>0 + if ((mqttClient->buffer[0] & 0x06) == MQTTQOS1) { + msgId = (mqttClient->buffer[llen + 3 + tl]<<8) + mqttClient->buffer[llen + 3 + tl + 1]; + payload = mqttClient->buffer + llen + 3 + tl + 2; + mqttClient->callback(topic, payload, len - llen - 3 - tl - 2); + + mqttClient->buffer[0] = MQTTPUBACK; + mqttClient->buffer[1] = 2; + mqttClient->buffer[2] = (msgId >> 8); + mqttClient->buffer[3] = (msgId & 0xFF); + clientWrite(mqttClient->client, mqttClient->buffer, 4); + mqttClient->lastOutActivity = t; + } else { + payload = mqttClient->buffer + llen + 3 + tl; + mqttClient->callback(topic, payload, len - llen - 3 - tl); + } + } + } else if (type == MQTTPINGREQ) { + mqttClient->buffer[0] = MQTTPINGRESP; + mqttClient->buffer[1] = 0; + clientWrite(mqttClient->client, mqttClient->buffer,2); + } else if (type == MQTTPINGRESP) { + mqttClient->pingOutstanding = false; + } + } else if (!connected(mqttClient)) { + // readPacket has closed the connection + return false; + } + } + return true; + } + return false; +} + +bool connected(mqttClient_t *mqttClient) { + bool rc; + if (mqttClient->client == NULL ) { + rc = false; + } else { + rc = clientConnected(mqttClient->client); + if (!rc) { + if (mqttClient->state == MQTT_CONNECTED) { + mqttClient->state = MQTT_CONNECTION_LOST; + clientFlush(mqttClient->client); + clientStop(mqttClient->client); + } + } else { + return mqttClient->state == MQTT_CONNECTED; + } + } + return rc; +} + +bool publish(mqttClient_t *mqttClient, + const char *topic, + const uint8_t *payload, uint16_t plength, + bool retained) { + if (connected(mqttClient)) { + if (mqttClient->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, mqttClient->bufferSize) + plength) { + // Too long + return false; + } + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(mqttClient, topic, mqttClient->buffer, length); + + // Add payload + for (uint16_t i = 0; i < plength; i++) { + mqttClient->buffer[length++] = payload[i]; + } + + // Write the header + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + return write(mqttClient, header, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE); + } + return false; +} + +bool subscribe(mqttClient_t *mqttClient, + const char *topic, uint8_t qos) { + size_t topicLength = strnlen(topic, mqttClient->bufferSize); + if (topic == 0) { + return false; + } + if (qos > 1) { + return false; + } + if (mqttClient->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected(mqttClient)) { + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + mqttClient->nextMsgId++; + if (mqttClient->nextMsgId == 0) { + mqttClient->nextMsgId = 1; + } + mqttClient->buffer[length++] = (mqttClient->nextMsgId >> 8); + mqttClient->buffer[length++] = (mqttClient->nextMsgId & 0xFF); + length = writeString(mqttClient, (char*)topic, mqttClient->buffer, length); + mqttClient->buffer[length++] = qos; + return write(mqttClient, MQTTSUBSCRIBE|MQTTQOS1, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE); + } + return false; +} + +bool unsubscribe(mqttClient_t *mqttClient, + const char* topic) { + size_t topicLength = strnlen(topic, mqttClient->bufferSize); + if (topic == 0) { + return false; + } + if (mqttClient->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected(mqttClient)) { + uint16_t length = MQTT_MAX_HEADER_SIZE; + mqttClient->nextMsgId++; + if (mqttClient->nextMsgId == 0) { + mqttClient->nextMsgId = 1; + } + mqttClient->buffer[length++] = (mqttClient->nextMsgId >> 8); + mqttClient->buffer[length++] = (mqttClient->nextMsgId & 0xFF); + length = writeString(mqttClient, topic, mqttClient->buffer,length); + return write(mqttClient, MQTTUNSUBSCRIBE|MQTTQOS1, mqttClient->buffer, length - MQTT_MAX_HEADER_SIZE); + } + return false; +} diff --git a/src/pubsubc.h b/src/pubsubc.h index 78f2dfc..31469cf 100644 --- a/src/pubsubc.h +++ b/src/pubsubc.h @@ -81,14 +81,14 @@ #include #include -#include +#include -typedef void (*callback_t)(char*, uint8_t*, unsigned int); +typedef void (*callback_t)(char*, uint8_t*, uint16_t); typedef struct { client_t *client; - uint8_t *buffer; + uint8_t buffer[MQTT_MAX_PACKET_SIZE]; uint16_t bufferSize; uint16_t keepAlive; uint16_t socketTimeout; @@ -96,31 +96,36 @@ typedef struct { uint32_t lastOutActivity; uint32_t lastInActivity; bool pingOutstanding; - callback_t *callback; - uint8_t brokerAddress[4]; + callback_t callback; + uint8_t *brokerAddress; uint16_t brokerPort; - int _state; + int state; } mqttClient_t; -void mqttClientInit(mqttClient_t *mqttClient); -bool connect(mqttClient_t *mqttClient, const char *id, +void mqttClientInit(mqttClient_t *mqttClient, client_t *client, callback_t callback); +bool connect(mqttClient_t *mqttClient, + uint8_t *address, uint16_t port, + const char *id, const char *user, const char *pass, const char *willTopic, uint8_t willQos, bool willRetain, const char *willMessage, bool cleanSession); -void disconnect(); + +void disconnect(mqttClient_t *mqttClient); bool publish(mqttClient_t *mqttClient, const char *topic, - const uint8_t *payload, uint16_t pLength, + const uint8_t *payload, uint16_t plength, bool retained); -bool subscribe(const char *topic, uint8_t qos); -bool unsubscribe(const char* topic); +bool subscribe(mqttClient_t *mqttClient, + const char *topic, uint8_t qos); +bool unsubscribe(mqttClient_t *mqttClient, + const char* topic); -bool loop(); +bool loop(mqttClient_t *mqttClient); -bool connected(); +bool connected(mqttClient_t *mqttClient); #endif // _PUBSUBC_H_