From 772cbdb82e03e3acf95a241540fe6d03de27b7e5 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Fri, 19 Jun 2020 07:46:09 +0000 Subject: [PATCH] add mariadbsink --- Makefile | 2 +- mariadbsink.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ mariadbsink.h | 8 ++++++++ mqttauditing.c | 27 +++++++++++++++++++++++-- mqttauditing.cfg | 2 ++ 5 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 mariadbsink.c create mode 100644 mariadbsink.h diff --git a/Makefile b/Makefile index 5fee18e..59ceb8a 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ VERSION:=$(shell cat VERSION) .PHONY: all all: mqttauditing -mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o sink.o logging.o ringbuffer.o version.o +mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o sink.o mariadbsink.o logging.o ringbuffer.o version.o $(CC) -o $@ $(LDFLAGS) $^ tests: tests.o ringbuffer.o mqtttopicmatcher.o diff --git a/mariadbsink.c b/mariadbsink.c new file mode 100644 index 0000000..41653bd --- /dev/null +++ b/mariadbsink.c @@ -0,0 +1,51 @@ +#include +#include +#include +#include +#include "commonTypes.h" +#include "ringbuffer.h" + + + +typedef struct { + bool stopSignal; +} mariadbsinkThreadContext_t; + +void mariadbsinkStop(void *ptr) { + commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; + mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*)handle->sinkContext; + + context->stopSignal = true; + fprintf(stderr, "mariadb sink thread, stop flagged\n"); +} + + +void *mariadbsink(void *ptr) { + fprintf(stderr, "mariadb sink entered\n"); + + commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; + mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) malloc(sizeof(mariadbsinkThreadContext_t)); + handle->sinkContext = (void*)context; + context->stopSignal = false; + + while (! context->stopSignal) { + auditItem_t *auditItem = (auditItem_t*)ringbufferGet(handle->ringbuffer); + + if (auditItem != NULL) { + printf("AuditItem: Time: %ld\n", auditItem->ts); + printf("AuditItem: Topic: %s\n", auditItem->topic); + printf("AuditItem: Payload: %s\n", auditItem->payload); + + free(auditItem->topic); + auditItem->topic = NULL; + free(auditItem->payload); + auditItem->payload = NULL; + free(auditItem); + auditItem = NULL; + } + } + + fprintf(stderr, "mariadb sink thread stopped\n"); + return (void*)NULL; +} + diff --git a/mariadbsink.h b/mariadbsink.h new file mode 100644 index 0000000..daeb2e4 --- /dev/null +++ b/mariadbsink.h @@ -0,0 +1,8 @@ +#ifndef _MARIADBSINK_H_ +#define _MARIADBSINK_H_ + +void *mariadbsink(void *ptr); +void mariadbsinkStop(void *ptr); + + +#endif // _MARIADBSINK_H_ diff --git a/mqttauditing.c b/mqttauditing.c index 3d7cbd2..e29cd69 100644 --- a/mqttauditing.c +++ b/mqttauditing.c @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -9,9 +10,13 @@ #include "logging.h" #include "mqttreceiver.h" #include "commonTypes.h" +#include "mariadbsink.h" #include "sink.h" +const char SINK_KEY[] = "sink"; +const char DEFAULT_SINK[] = "dummy"; + extern char VERSION[]; extern uint32_t REFCNT; @@ -44,6 +49,24 @@ int main (void) { commonThreadHandle.config = &cfg; commonThreadHandle.ringbuffer = &ringbuffer; + const char *sinkType; + if (! config_lookup_string(&cfg, SINK_KEY, &sinkType)) { + sinkType = DEFAULT_SINK; + } + void* (*sinkFunc)(void*); + void (*sinkStopFunc)(void*); + if (0 == strcmp(sinkType, "dummy")) { + sinkFunc = sink; + sinkStopFunc = sinkStop; + } else if (0 == strcmp(sinkType, "mariadb")) { + sinkFunc = mariadbsink; + sinkStopFunc = mariadbsinkStop; + } else { + fprintf(stderr, "Unknown sink type %s\n", sinkType); + exit(1); + } + + pthread_t mqttThread; int err = pthread_create(&mqttThread, NULL, mqttreceiver, (void*) &commonThreadHandle); @@ -53,7 +76,7 @@ int main (void) { } pthread_t sinkThread; - err = pthread_create(&sinkThread, NULL, sink, (void*) &commonThreadHandle); + err = pthread_create(&sinkThread, NULL, sinkFunc, (void*) &commonThreadHandle); if (err != 0) { fprintf(stderr, "Unable to create sink receiver thread: %d\n", err); exit(1); @@ -66,7 +89,7 @@ int main (void) { pthread_join(mqttThread, NULL); fprintf(stderr, "mqtt receiver thread joined\n"); - sinkStop((void*) &commonThreadHandle); + sinkStopFunc((void*) &commonThreadHandle); ringbufferInterrupt(&ringbuffer); pthread_join(sinkThread, NULL); fprintf(stderr, "sink thread joined\n"); diff --git a/mqttauditing.cfg b/mqttauditing.cfg index 337e0d4..80e380c 100644 --- a/mqttauditing.cfg +++ b/mqttauditing.cfg @@ -3,3 +3,5 @@ mqttBroker = "broker" mqttAuditTopic = "Iot/MqttAuditing/#" mqttWatchdogGraceTime = 600 + +sink = "mariadb"