From 306d047f231185ba749d720f10402ba737d17cef Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Thu, 18 Jun 2020 14:52:05 +0000 Subject: [PATCH] graceful stop of sink --- commonTypes.h | 3 ++- mqttauditing.c | 5 +++++ mqttreceiver.c | 6 +++--- ringbuffer.c | 23 +++++++++++++++++++---- ringbuffer.h | 6 ++++-- sink.c | 44 ++++++++++++++++++++++++++++++++++---------- sink.h | 2 ++ 7 files changed, 69 insertions(+), 20 deletions(-) diff --git a/commonTypes.h b/commonTypes.h index 530b881..24a31f5 100644 --- a/commonTypes.h +++ b/commonTypes.h @@ -9,7 +9,8 @@ typedef struct { config_t *config; ringbuffer_t *ringbuffer; - void* context; + void* mqttContext; + void* sinkContext; } commonThreadHandle_t; typedef struct { diff --git a/mqttauditing.c b/mqttauditing.c index b2fceb6..3d7cbd2 100644 --- a/mqttauditing.c +++ b/mqttauditing.c @@ -66,6 +66,11 @@ int main (void) { pthread_join(mqttThread, NULL); fprintf(stderr, "mqtt receiver thread joined\n"); + sinkStop((void*) &commonThreadHandle); + ringbufferInterrupt(&ringbuffer); + pthread_join(sinkThread, NULL); + fprintf(stderr, "sink thread joined\n"); + // will never be reached diff --git a/mqttreceiver.c b/mqttreceiver.c index 9029291..b2fc45c 100644 --- a/mqttreceiver.c +++ b/mqttreceiver.c @@ -49,7 +49,7 @@ typedef struct { int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message *message) { commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext; - mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context; + mqttThreadContext_t *context = (mqttThreadContext_t*)handle->mqttContext; char* payload = message->payload; printf("Topic: %s, TopicLen: %d, Payload: %s\n", topicName, topicLen, payload); @@ -83,7 +83,7 @@ int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message void on_disconnect(void *kontext, char *cause) { commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext; - mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context; + mqttThreadContext_t *context = (mqttThreadContext_t*)handle->mqttContext; fprintf(stderr, "disconnected from broker: %s trying to reconnect\n", cause); @@ -121,7 +121,7 @@ void *mqttreceiver(void *ptr) { commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; mqttThreadContext_t *context = (mqttThreadContext_t*) malloc(sizeof(mqttThreadContext_t)); - handle->context = (void*)context; + handle->mqttContext = (void*)context; context->stopSignal = false; context->watchdogCounter = 0; diff --git a/ringbuffer.c b/ringbuffer.c index cf49a38..58d323f 100644 --- a/ringbuffer.c +++ b/ringbuffer.c @@ -8,11 +8,20 @@ void ringbufferInit(ringbuffer_t *handle) { handle->bufferReadIdx = 0; handle->bufferWriteIdx = 0; + handle->interrupted = false; pthread_mutex_init(&(handle->eventMutex), NULL); pthread_cond_init(&(handle->eventSignal), NULL); fprintf(stderr, "ringbuffer initialized\n"); } +void ringbufferInterrupt(ringbuffer_t *handle) { + handle->interrupted = true; + + pthread_mutex_lock(&(handle->eventMutex)); + pthread_cond_signal(&(handle->eventSignal)); + pthread_mutex_unlock(&(handle->eventMutex)); +} + void ringbufferPut(ringbuffer_t *handle, void *f) { if (handle->bufferWriteIdx == (BUFFER_SIZE - 1)) { while (handle->bufferReadIdx == BUFFER_SIZE); @@ -34,16 +43,22 @@ void ringbufferPut(ringbuffer_t *handle, void *f) { void *ringbufferGet(ringbuffer_t *handle) { + void *res = NULL; + if (handle->bufferReadIdx == handle->bufferWriteIdx) { pthread_mutex_lock(&(handle->eventMutex)); pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex)); pthread_mutex_unlock(&(handle->eventMutex)); } - void *res = handle->buffer[handle->bufferReadIdx]; - handle->bufferReadIdx++; - if (handle->bufferReadIdx > BUFFER_SIZE) { - handle->bufferReadIdx = 0; + if (handle->interrupted) { + handle->interrupted = false; + } else { + res = handle->buffer[handle->bufferReadIdx]; + handle->bufferReadIdx++; + if (handle->bufferReadIdx > BUFFER_SIZE) { + handle->bufferReadIdx = 0; + } } return res; } diff --git a/ringbuffer.h b/ringbuffer.h index 2a56df8..6f3c1e5 100644 --- a/ringbuffer.h +++ b/ringbuffer.h @@ -3,7 +3,7 @@ #include #include - +#include #define BUFFER_SIZE 256 @@ -11,6 +11,7 @@ typedef struct { void* buffer[BUFFER_SIZE+5]; uint32_t bufferReadIdx; uint32_t bufferWriteIdx; + bool interrupted; pthread_mutex_t eventMutex; pthread_cond_t eventSignal; } ringbuffer_t; @@ -18,5 +19,6 @@ typedef struct { void ringbufferInit(ringbuffer_t *handle); void ringbufferPut(ringbuffer_t *handle, void *f); void *ringbufferGet(ringbuffer_t *handle); +void ringbufferInterrupt(ringbuffer_t *handle); -#endif // _RINGBUFFER_H_ \ No newline at end of file +#endif // _RINGBUFFER_H_ diff --git a/sink.c b/sink.c index b141ca8..240ae04 100644 --- a/sink.c +++ b/sink.c @@ -1,27 +1,51 @@ #include #include #include +#include #include "commonTypes.h" #include "ringbuffer.h" + + +typedef struct { + bool stopSignal; +} sinkThreadContext_t; + +void sinkStop(void *ptr) { + commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; + sinkThreadContext_t *context = (sinkThreadContext_t*)handle->sinkContext; + + context->stopSignal = true; + fprintf(stderr, "sink thread, stop flagged\n"); +} + + void *sink(void *ptr) { fprintf(stderr, "sink entered\n"); commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; + sinkThreadContext_t *context = (sinkThreadContext_t*) malloc(sizeof(sinkThreadContext_t)); + handle->sinkContext = (void*)context; + context->stopSignal = false; - while (1) { + while (! context->stopSignal) { auditItem_t *auditItem = (auditItem_t*)ringbufferGet(handle->ringbuffer); - printf("AuditItem: Time: %ld\n", auditItem->ts); - printf("AuditItem: Topic: %s\n", auditItem->topic); - printf("AuditItem: Payload: %s\n", auditItem->payload); + if (auditItem != NULL) { + printf("AuditItem: Time: %ld\n", auditItem->ts); + printf("AuditItem: Topic: %s\n", auditItem->topic); + printf("AuditItem: Payload: %s\n", auditItem->payload); - free(auditItem->topic); - auditItem->topic = NULL; - free(auditItem->payload); - auditItem->payload = NULL; - free(auditItem); - auditItem = NULL; + free(auditItem->topic); + auditItem->topic = NULL; + free(auditItem->payload); + auditItem->payload = NULL; + free(auditItem); + auditItem = NULL; + } } + + fprintf(stderr, "sink thread stopped\n"); + return (void*)NULL; } diff --git a/sink.h b/sink.h index c0724d4..72755ea 100644 --- a/sink.h +++ b/sink.h @@ -2,5 +2,7 @@ #define _SINK_H_ void *sink(void *ptr); +void sinkStop(void *ptr); + #endif // _SINK_H_