add mariadbsink

This commit is contained in:
Wolfgang Hottgenroth 2020-06-19 07:46:09 +00:00
parent 306d047f23
commit 772cbdb82e
5 changed files with 87 additions and 3 deletions

View File

@ -11,7 +11,7 @@ VERSION:=$(shell cat VERSION)
.PHONY: all
all: mqttauditing
mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o sink.o logging.o ringbuffer.o version.o
mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o sink.o mariadbsink.o logging.o ringbuffer.o version.o
$(CC) -o $@ $(LDFLAGS) $^
tests: tests.o ringbuffer.o mqtttopicmatcher.o

51
mariadbsink.c Normal file
View File

@ -0,0 +1,51 @@
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include "commonTypes.h"
#include "ringbuffer.h"
typedef struct {
bool stopSignal;
} mariadbsinkThreadContext_t;
void mariadbsinkStop(void *ptr) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*)handle->sinkContext;
context->stopSignal = true;
fprintf(stderr, "mariadb sink thread, stop flagged\n");
}
void *mariadbsink(void *ptr) {
fprintf(stderr, "mariadb sink entered\n");
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) malloc(sizeof(mariadbsinkThreadContext_t));
handle->sinkContext = (void*)context;
context->stopSignal = false;
while (! context->stopSignal) {
auditItem_t *auditItem = (auditItem_t*)ringbufferGet(handle->ringbuffer);
if (auditItem != NULL) {
printf("AuditItem: Time: %ld\n", auditItem->ts);
printf("AuditItem: Topic: %s\n", auditItem->topic);
printf("AuditItem: Payload: %s\n", auditItem->payload);
free(auditItem->topic);
auditItem->topic = NULL;
free(auditItem->payload);
auditItem->payload = NULL;
free(auditItem);
auditItem = NULL;
}
}
fprintf(stderr, "mariadb sink thread stopped\n");
return (void*)NULL;
}

8
mariadbsink.h Normal file
View File

@ -0,0 +1,8 @@
#ifndef _MARIADBSINK_H_
#define _MARIADBSINK_H_
void *mariadbsink(void *ptr);
void mariadbsinkStop(void *ptr);
#endif // _MARIADBSINK_H_

View File

@ -1,6 +1,7 @@
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <libconfig.h>
#include <math.h>
@ -9,9 +10,13 @@
#include "logging.h"
#include "mqttreceiver.h"
#include "commonTypes.h"
#include "mariadbsink.h"
#include "sink.h"
const char SINK_KEY[] = "sink";
const char DEFAULT_SINK[] = "dummy";
extern char VERSION[];
extern uint32_t REFCNT;
@ -44,6 +49,24 @@ int main (void) {
commonThreadHandle.config = &cfg;
commonThreadHandle.ringbuffer = &ringbuffer;
const char *sinkType;
if (! config_lookup_string(&cfg, SINK_KEY, &sinkType)) {
sinkType = DEFAULT_SINK;
}
void* (*sinkFunc)(void*);
void (*sinkStopFunc)(void*);
if (0 == strcmp(sinkType, "dummy")) {
sinkFunc = sink;
sinkStopFunc = sinkStop;
} else if (0 == strcmp(sinkType, "mariadb")) {
sinkFunc = mariadbsink;
sinkStopFunc = mariadbsinkStop;
} else {
fprintf(stderr, "Unknown sink type %s\n", sinkType);
exit(1);
}
pthread_t mqttThread;
int err = pthread_create(&mqttThread, NULL, mqttreceiver, (void*) &commonThreadHandle);
@ -53,7 +76,7 @@ int main (void) {
}
pthread_t sinkThread;
err = pthread_create(&sinkThread, NULL, sink, (void*) &commonThreadHandle);
err = pthread_create(&sinkThread, NULL, sinkFunc, (void*) &commonThreadHandle);
if (err != 0) {
fprintf(stderr, "Unable to create sink receiver thread: %d\n", err);
exit(1);
@ -66,7 +89,7 @@ int main (void) {
pthread_join(mqttThread, NULL);
fprintf(stderr, "mqtt receiver thread joined\n");
sinkStop((void*) &commonThreadHandle);
sinkStopFunc((void*) &commonThreadHandle);
ringbufferInterrupt(&ringbuffer);
pthread_join(sinkThread, NULL);
fprintf(stderr, "sink thread joined\n");

View File

@ -3,3 +3,5 @@
mqttBroker = "broker"
mqttAuditTopic = "Iot/MqttAuditing/#"
mqttWatchdogGraceTime = 600
sink = "mariadb"