From e1a8e7d63e95736125d7d479076c41011fadd22b Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Wed, 17 Jun 2020 16:03:32 +0200 Subject: [PATCH] some more mqtt stuff --- handleType.h | 14 ++++ mqttauditing.c | 29 ++++++-- mqttreceiver.c | 191 ++++++++++++++++++++++++++++++++++--------------- mqttreceiver.h | 5 +- ringbuffer.c | 6 +- ringbuffer.h | 8 +-- 6 files changed, 179 insertions(+), 74 deletions(-) create mode 100644 handleType.h diff --git a/handleType.h b/handleType.h new file mode 100644 index 0000000..6ed603d --- /dev/null +++ b/handleType.h @@ -0,0 +1,14 @@ +#ifndef _HANDLETYPE_H_ +#define _HANDLETYPE_H_ + +#include +#include "ringbuffer.h" + + +typedef struct { + config_t *config; + ringbuffer_t *ringbuffer; + void* context; +} commonThreadHandle_t; + +#endif // _HANDLETYPE_H_ \ No newline at end of file diff --git a/mqttauditing.c b/mqttauditing.c index e8ac761..541bf93 100644 --- a/mqttauditing.c +++ b/mqttauditing.c @@ -8,13 +8,19 @@ #include "ringbuffer.h" #include "logging.h" #include "mqttreceiver.h" +#include "handleType.h" + + extern char VERSION[]; extern uint32_t REFCNT; config_t cfg; -t_ringbuffer ringbuffer; +ringbuffer_t ringbuffer; + +commonThreadHandle_t commonThreadHandle; + void readConfig() { @@ -32,15 +38,26 @@ int main (void) { fprintf(stderr, "VERSION: %s, REFCNT: %u\n", VERSION, REFCNT); readConfig(); - ringbufferInit(&ringbuffer); - mqttreceiverInit(&cfg, &ringbuffer); + + commonThreadHandle.config = &cfg; + commonThreadHandle.ringbuffer = &ringbuffer; + + + pthread_t mqttThread; + int err = pthread_create(&mqttThread, NULL, mqttreceiver, (void*) &commonThreadHandle); + if (err != 0) { + fprintf(stderr, "Unable to create mqtt receiver thread: %d\n", err); + exit(1); + } + fprintf(stderr, "started.\n"); - while (1) { - sleep(1); - } + pthread_join(mqttThread, NULL); + fprintf(stderr, "mqtt receiver thread joined\n"); + + // will never be reached config_destroy(&cfg); diff --git a/mqttreceiver.c b/mqttreceiver.c index 8a7110c..4c84a70 100644 --- a/mqttreceiver.c +++ b/mqttreceiver.c @@ -5,6 +5,8 @@ #include #include #include +#include +#include "handleType.h" #include "ringbuffer.h" @@ -12,24 +14,57 @@ const char MQTT_BROKER_KEY[] = "mqttBroker"; const char DEFAULT_MQTT_BROKER[] = "127.0.0.1:1883"; const char MQTT_CLIENTID_KEY[] = "mqttClientId"; const char *generatedMqttClientId = NULL; - - +const char MQTT_STATUS_TOPIC_KEY[] = "mqttStatusTopic"; +const char DEFAULT_MQTT_STATUS_TOPIC[] = "Iot/MqttAuditing/Status"; +const char MQTT_HEARTBEAT_TOPIC_KEY[] = "mqttHeartbeatTopic"; +const char DEFAULT_MQTT_HEARTBEAT_TOPIC[] = "Iot/MqttAuditing/Heartbeat"; +const char MQTT_COMMAND_TOPIC_KEY[] = "mqttCommandTopic"; +const char DEFAULT_MQTT_COMMAND_TOPIC[] = "Iot/MqttAuditing/Command"; +const char MQTT_AUDIT_TOPIC_KEY[] = "mqttAuditTopic"; +const char DEFAULT_MQTT_AUDIT_TOPIC[] = "Iot/MqttAuditing/Audit"; +const char MQTT_WATCHDOG_TOPIC_KEY[] = "mqttWatchdogTopic"; +const char DEFAULT_MQTT_WATCHDOG_TOPIC[] = "Iot/MqttAuditing/Watchdog"; +const char MQTT_WATCHDOG_GRACE_TIME_KEY[] = "mqttWatchdogGraceTime"; +const int DEFAULT_MQTT_WATCHDOG_GRACE_TIME = 15; typedef struct { - t_ringbuffer *ringbuffer; - char *mqttBroker; - char *mqttClientId; -} t_mqttThreadHandle; + const char *broker; + const char *clientId; + const char *statusTopic; + const char *watchdogTopic; + const char *heartbeatTopic; + const char *commandTopic; + const char *auditTopic; + int watchdogGraceTime; + bool stopSignal; + uint32_t watchdogCounter; +} mqttThreadContext_t; -t_mqttThreadHandle mqttThreadHandle; -pthread_t mqttThread; -int on_message(void *context, char *topicName, int topicLen, MQTTClient_message *message) { - t_mqttThreadHandle *handle = (t_mqttThreadHandle*)context; +int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message *message) { + commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext; + mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context; + + printf("Topic: %s, TopicLen: %d\n", topicName, topicLen); char* payload = message->payload; - printf("Received operation %s\n", payload); + printf("Received operation: %s\n", payload); + + int curTopicLen = (topicLen == 0) ? strlen(topicName) : topicLen; + if (strncmp(topicName, context->watchdogTopic, curTopicLen) == 0) { + printf("Watchdog signal received\n"); + context->watchdogCounter += 1; + } else if (strncmp(topicName, context->commandTopic, curTopicLen) == 0) { + printf("Command received\n"); + if (strcmp(payload, "stop") == 0) { + context->stopSignal = true; + printf("Stop command received\n"); + } + } else if (strncmp(topicName, context->auditTopic, curTopicLen) == 0) { + printf("Audit message received\n"); + } + MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; @@ -39,13 +74,59 @@ int on_message(void *context, char *topicName, int topicLen, MQTTClient_message -void *mqttreceiverRun(void *ptr) { - fprintf(stderr, "mqttreceiverRun entered\n"); +void *mqttreceiver(void *ptr) { - t_mqttThreadHandle *handle = (t_mqttThreadHandle*)ptr; + fprintf(stderr, "mqttreceiver entered\n"); + + commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; + mqttThreadContext_t *context = (mqttThreadContext_t*) malloc(sizeof(mqttThreadContext_t)); + handle->context = (void*)context; + context->stopSignal = false; + context->watchdogCounter = 0; + + char *generatedMqttClientId; + + if (! config_lookup_string(handle->config, MQTT_BROKER_KEY, &(context->broker))) { + context->broker = DEFAULT_MQTT_BROKER; + } + + if (! config_lookup_string(handle->config, MQTT_CLIENTID_KEY, &(context->clientId))) { + const uint32_t sizeOfGeneratedMqttClientId = 32; + generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId); + memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId); + char myHostname[8]; + gethostname(myHostname, sizeof(myHostname)); + pid_t myPid = getpid(); + snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid); + context->clientId = generatedMqttClientId; + } + + if (! config_lookup_string(handle->config, MQTT_STATUS_TOPIC_KEY, &(context->statusTopic))) { + context->statusTopic = DEFAULT_MQTT_STATUS_TOPIC; + } + + if (! config_lookup_string(handle->config, MQTT_HEARTBEAT_TOPIC_KEY, &(context->heartbeatTopic))) { + context->heartbeatTopic = DEFAULT_MQTT_HEARTBEAT_TOPIC; + } + + if (! config_lookup_string(handle->config, MQTT_COMMAND_TOPIC_KEY, &(context->commandTopic))) { + context->commandTopic = DEFAULT_MQTT_COMMAND_TOPIC; + } + + if (! config_lookup_string(handle->config, MQTT_AUDIT_TOPIC_KEY, &(context->auditTopic))) { + context->auditTopic = DEFAULT_MQTT_AUDIT_TOPIC; + } + + if (! config_lookup_string(handle->config, MQTT_WATCHDOG_TOPIC_KEY, &(context->watchdogTopic))) { + context->watchdogTopic = DEFAULT_MQTT_WATCHDOG_TOPIC; + } + + if (! config_lookup_int(handle->config, MQTT_WATCHDOG_GRACE_TIME_KEY, &(context->watchdogGraceTime))) { + context->watchdogGraceTime = DEFAULT_MQTT_WATCHDOG_GRACE_TIME; + } MQTTClient client; - MQTTClient_create(&client, handle->mqttBroker, handle->mqttClientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); + MQTTClient_create(&client, context->broker, context->clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //conn_opts.username = "<>/<>"; //conn_opts.password = "<>"; @@ -55,56 +136,52 @@ void *mqttreceiverRun(void *ptr) { int rc; if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) { fprintf(stderr, "Connected to MQTT broker\n"); - MQTTClient_subscribe(client, "s/ds", 0); + if (MQTTClient_subscribe(client, context->watchdogTopic, 0) == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Subscribed to %s\n", context->watchdogTopic); + }; - for (;;) { - MQTTClient_yield(); - sleep(3); + if (MQTTClient_subscribe(client, context->commandTopic, 0) == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Subscribed to %s\n", context->commandTopic); } + + if (MQTTClient_subscribe(client, context->auditTopic, 0) == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Subscribed to %s\n", context->auditTopic); + } + + uint32_t loopCounter = 0; + while (! context->stopSignal) { + loopCounter++; + if (loopCounter > context->watchdogGraceTime) { + if (context->watchdogCounter == 0) { + fprintf(stderr, "missing watchdog signals for too longer, terminating\n"); + context->stopSignal = true; + } else { + fprintf(stderr, "Watchdog fine: %d\n", context->watchdogCounter); + } + loopCounter = 0; + context->watchdogCounter = 0; + } + + MQTTClient_yield(); + sleep(1); + } + fprintf(stderr, "mqtt receiver thread stopped\n"); + } else { + fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc); } - - - fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc); MQTTClient_disconnect(client, 1000); MQTTClient_destroy(&client); - - return rc; -} - - -int mqttreceiverInit(config_t *pCfg, t_ringbuffer *pRingbuffer) { - mqttThreadHandle.ringbuffer = pRingbuffer; - - if (! config_lookup_string(pCfg, MQTT_BROKER_KEY, &(mqttThreadHandle.mqttBroker))) { - mqttThreadHandle.mqttBroker = DEFAULT_MQTT_BROKER; - } - - if (! config_lookup_string(pCfg, MQTT_CLIENTID_KEY, &(mqttThreadHandle.mqttClientId))) { - const uint32_t sizeOfGeneratedMqttClientId = 32; - generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId); - memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId); - char myHostname[8]; - gethostname(myHostname, sizeof(myHostname)); - pid_t myPid = getpid(); - snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid); - mqttThreadHandle.mqttClientId = generatedMqttClientId; - } - - - fprintf(stderr, "Create mqtt receiver thread\n"); - int r = pthread_create(&mqttThread, NULL, mqttreceiverRun, (void*) &mqttThreadHandle); - fprintf(stderr, "pthread_create returns: %d\n", r); - -/* - pthread_join(mqttThread, NULL); - - if (generatedMqttClientId != NULL) { free(generatedMqttClientId); + generatedMqttClientId = NULL; + } + if (context != NULL) { + free(context); + context = NULL; } -*/ - return 0; -} \ No newline at end of file + return NULL; +} + diff --git a/mqttreceiver.h b/mqttreceiver.h index 54199e6..ee09c50 100644 --- a/mqttreceiver.h +++ b/mqttreceiver.h @@ -1,9 +1,6 @@ #ifndef _MQTTRECEIVER_H_ #define _MQTTRECEIVER_H_ -#include -#include "ringbuffer.h" - -int mqttreceiverInit(config_t *pCfg, t_ringbuffer *pRingbuffer); +void *mqttreceiver(void *ptr); #endif // _MQTTRECEIVER_H_ \ No newline at end of file diff --git a/ringbuffer.c b/ringbuffer.c index b7d1d36..cf49a38 100644 --- a/ringbuffer.c +++ b/ringbuffer.c @@ -5,7 +5,7 @@ #include "ringbuffer.h" -void ringbufferInit(t_ringbuffer *handle) { +void ringbufferInit(ringbuffer_t *handle) { handle->bufferReadIdx = 0; handle->bufferWriteIdx = 0; pthread_mutex_init(&(handle->eventMutex), NULL); @@ -13,7 +13,7 @@ void ringbufferInit(t_ringbuffer *handle) { fprintf(stderr, "ringbuffer initialized\n"); } -void ringbufferPut(t_ringbuffer *handle, void *f) { +void ringbufferPut(ringbuffer_t *handle, void *f) { if (handle->bufferWriteIdx == (BUFFER_SIZE - 1)) { while (handle->bufferReadIdx == BUFFER_SIZE); } else { @@ -33,7 +33,7 @@ void ringbufferPut(t_ringbuffer *handle, void *f) { } -void *ringbufferGet(t_ringbuffer *handle) { +void *ringbufferGet(ringbuffer_t *handle) { if (handle->bufferReadIdx == handle->bufferWriteIdx) { pthread_mutex_lock(&(handle->eventMutex)); pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex)); diff --git a/ringbuffer.h b/ringbuffer.h index 98e0991..2a56df8 100644 --- a/ringbuffer.h +++ b/ringbuffer.h @@ -13,10 +13,10 @@ typedef struct { uint32_t bufferWriteIdx; pthread_mutex_t eventMutex; pthread_cond_t eventSignal; -} t_ringbuffer; +} ringbuffer_t; -void ringbufferInit(t_ringbuffer *handle); -void ringbufferPut(t_ringbuffer *handle, void *f); -void *ringbufferGet(t_ringbuffer *handle); +void ringbufferInit(ringbuffer_t *handle); +void ringbufferPut(ringbuffer_t *handle, void *f); +void *ringbufferGet(ringbuffer_t *handle); #endif // _RINGBUFFER_H_ \ No newline at end of file