graceful stop of sink

This commit is contained in:
Wolfgang Hottgenroth 2020-06-18 14:52:05 +00:00
parent dd2639c4be
commit 306d047f23
7 changed files with 69 additions and 20 deletions

View File

@ -9,7 +9,8 @@
typedef struct { typedef struct {
config_t *config; config_t *config;
ringbuffer_t *ringbuffer; ringbuffer_t *ringbuffer;
void* context; void* mqttContext;
void* sinkContext;
} commonThreadHandle_t; } commonThreadHandle_t;
typedef struct { typedef struct {

View File

@ -66,6 +66,11 @@ int main (void) {
pthread_join(mqttThread, NULL); pthread_join(mqttThread, NULL);
fprintf(stderr, "mqtt receiver thread joined\n"); fprintf(stderr, "mqtt receiver thread joined\n");
sinkStop((void*) &commonThreadHandle);
ringbufferInterrupt(&ringbuffer);
pthread_join(sinkThread, NULL);
fprintf(stderr, "sink thread joined\n");
// will never be reached // will never be reached

View File

@ -49,7 +49,7 @@ typedef struct {
int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message *message) { int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message *message) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext; commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext;
mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context; mqttThreadContext_t *context = (mqttThreadContext_t*)handle->mqttContext;
char* payload = message->payload; char* payload = message->payload;
printf("Topic: %s, TopicLen: %d, Payload: %s\n", topicName, topicLen, payload); printf("Topic: %s, TopicLen: %d, Payload: %s\n", topicName, topicLen, payload);
@ -83,7 +83,7 @@ int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message
void on_disconnect(void *kontext, char *cause) { void on_disconnect(void *kontext, char *cause) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext; commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext;
mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context; mqttThreadContext_t *context = (mqttThreadContext_t*)handle->mqttContext;
fprintf(stderr, "disconnected from broker: %s trying to reconnect\n", cause); fprintf(stderr, "disconnected from broker: %s trying to reconnect\n", cause);
@ -121,7 +121,7 @@ void *mqttreceiver(void *ptr) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
mqttThreadContext_t *context = (mqttThreadContext_t*) malloc(sizeof(mqttThreadContext_t)); mqttThreadContext_t *context = (mqttThreadContext_t*) malloc(sizeof(mqttThreadContext_t));
handle->context = (void*)context; handle->mqttContext = (void*)context;
context->stopSignal = false; context->stopSignal = false;
context->watchdogCounter = 0; context->watchdogCounter = 0;

View File

@ -8,11 +8,20 @@
void ringbufferInit(ringbuffer_t *handle) { void ringbufferInit(ringbuffer_t *handle) {
handle->bufferReadIdx = 0; handle->bufferReadIdx = 0;
handle->bufferWriteIdx = 0; handle->bufferWriteIdx = 0;
handle->interrupted = false;
pthread_mutex_init(&(handle->eventMutex), NULL); pthread_mutex_init(&(handle->eventMutex), NULL);
pthread_cond_init(&(handle->eventSignal), NULL); pthread_cond_init(&(handle->eventSignal), NULL);
fprintf(stderr, "ringbuffer initialized\n"); fprintf(stderr, "ringbuffer initialized\n");
} }
void ringbufferInterrupt(ringbuffer_t *handle) {
handle->interrupted = true;
pthread_mutex_lock(&(handle->eventMutex));
pthread_cond_signal(&(handle->eventSignal));
pthread_mutex_unlock(&(handle->eventMutex));
}
void ringbufferPut(ringbuffer_t *handle, void *f) { void ringbufferPut(ringbuffer_t *handle, void *f) {
if (handle->bufferWriteIdx == (BUFFER_SIZE - 1)) { if (handle->bufferWriteIdx == (BUFFER_SIZE - 1)) {
while (handle->bufferReadIdx == BUFFER_SIZE); while (handle->bufferReadIdx == BUFFER_SIZE);
@ -34,17 +43,23 @@ void ringbufferPut(ringbuffer_t *handle, void *f) {
void *ringbufferGet(ringbuffer_t *handle) { void *ringbufferGet(ringbuffer_t *handle) {
void *res = NULL;
if (handle->bufferReadIdx == handle->bufferWriteIdx) { if (handle->bufferReadIdx == handle->bufferWriteIdx) {
pthread_mutex_lock(&(handle->eventMutex)); pthread_mutex_lock(&(handle->eventMutex));
pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex)); pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex));
pthread_mutex_unlock(&(handle->eventMutex)); pthread_mutex_unlock(&(handle->eventMutex));
} }
void *res = handle->buffer[handle->bufferReadIdx]; if (handle->interrupted) {
handle->interrupted = false;
} else {
res = handle->buffer[handle->bufferReadIdx];
handle->bufferReadIdx++; handle->bufferReadIdx++;
if (handle->bufferReadIdx > BUFFER_SIZE) { if (handle->bufferReadIdx > BUFFER_SIZE) {
handle->bufferReadIdx = 0; handle->bufferReadIdx = 0;
} }
}
return res; return res;
} }

View File

@ -3,7 +3,7 @@
#include <stdint.h> #include <stdint.h>
#include <pthread.h> #include <pthread.h>
#include <stdbool.h>
#define BUFFER_SIZE 256 #define BUFFER_SIZE 256
@ -11,6 +11,7 @@ typedef struct {
void* buffer[BUFFER_SIZE+5]; void* buffer[BUFFER_SIZE+5];
uint32_t bufferReadIdx; uint32_t bufferReadIdx;
uint32_t bufferWriteIdx; uint32_t bufferWriteIdx;
bool interrupted;
pthread_mutex_t eventMutex; pthread_mutex_t eventMutex;
pthread_cond_t eventSignal; pthread_cond_t eventSignal;
} ringbuffer_t; } ringbuffer_t;
@ -18,5 +19,6 @@ typedef struct {
void ringbufferInit(ringbuffer_t *handle); void ringbufferInit(ringbuffer_t *handle);
void ringbufferPut(ringbuffer_t *handle, void *f); void ringbufferPut(ringbuffer_t *handle, void *f);
void *ringbufferGet(ringbuffer_t *handle); void *ringbufferGet(ringbuffer_t *handle);
void ringbufferInterrupt(ringbuffer_t *handle);
#endif // _RINGBUFFER_H_ #endif // _RINGBUFFER_H_

26
sink.c
View File

@ -1,17 +1,37 @@
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdbool.h>
#include "commonTypes.h" #include "commonTypes.h"
#include "ringbuffer.h" #include "ringbuffer.h"
typedef struct {
bool stopSignal;
} sinkThreadContext_t;
void sinkStop(void *ptr) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
sinkThreadContext_t *context = (sinkThreadContext_t*)handle->sinkContext;
context->stopSignal = true;
fprintf(stderr, "sink thread, stop flagged\n");
}
void *sink(void *ptr) { void *sink(void *ptr) {
fprintf(stderr, "sink entered\n"); fprintf(stderr, "sink entered\n");
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
sinkThreadContext_t *context = (sinkThreadContext_t*) malloc(sizeof(sinkThreadContext_t));
handle->sinkContext = (void*)context;
context->stopSignal = false;
while (1) { while (! context->stopSignal) {
auditItem_t *auditItem = (auditItem_t*)ringbufferGet(handle->ringbuffer); auditItem_t *auditItem = (auditItem_t*)ringbufferGet(handle->ringbuffer);
if (auditItem != NULL) {
printf("AuditItem: Time: %ld\n", auditItem->ts); printf("AuditItem: Time: %ld\n", auditItem->ts);
printf("AuditItem: Topic: %s\n", auditItem->topic); printf("AuditItem: Topic: %s\n", auditItem->topic);
printf("AuditItem: Payload: %s\n", auditItem->payload); printf("AuditItem: Payload: %s\n", auditItem->payload);
@ -25,3 +45,7 @@ void *sink(void *ptr) {
} }
} }
fprintf(stderr, "sink thread stopped\n");
return (void*)NULL;
}

2
sink.h
View File

@ -2,5 +2,7 @@
#define _SINK_H_ #define _SINK_H_
void *sink(void *ptr); void *sink(void *ptr);
void sinkStop(void *ptr);
#endif // _SINK_H_ #endif // _SINK_H_