From 82803985aa15813e3c0cd4cd708e8b72d6383422 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 22 Jun 2020 15:26:04 +0000 Subject: [PATCH] database stuff, failure handling not working yet --- .gitignore | 1 + Makefile | 2 +- mariadbsink.c | 117 +++++++++++++++++++++++++++++++++++++++++++---- mqttauditing.cfg | 3 ++ ringbuffer.c | 23 ++++++++++ ringbuffer.h | 2 + 6 files changed, 137 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 40c1711..cb5441b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *~ .*~ .bash_history +core mqttauditing tests paho diff --git a/Makefile b/Makefile index 59ceb8a..b04c5d0 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ CC=gcc CFLAGS=-Wall -LDFLAGS=-lconfig -lpthread -lpaho-mqtt3c +LDFLAGS=-lconfig -lpthread -lpaho-mqtt3c -lmariadb INST_DIR=/opt/sbin diff --git a/mariadbsink.c b/mariadbsink.c index e157564..71d6804 100644 --- a/mariadbsink.c +++ b/mariadbsink.c @@ -2,16 +2,36 @@ #include #include #include +#include +#include +#include #include "commonTypes.h" #include "ringbuffer.h" +const char SINK_DBHOST_KEY[] = "sinkDbHost"; +const char DEFAULT_SINK_DBHOST[] = "mariadb"; +const char SINK_DBPORT_KEY[] = "sinkDbPort"; +const int DEFAULT_SINK_DBPORT = 3306; +const char SINK_DBUSER_KEY[] = "sinkDbUser"; +const char SINK_DBPASS_KEY[] = "sinkDbPass"; +const char SINK_DBNAME_KEY[] = "sinkDbName"; +const char DEFAULT_SINK_DBNAME[] = "auditdb"; + typedef struct { bool stopSignal; ringbuffer_t *ringbuffer; + const char *dbHost; + int dbPort; + const char *dbUser; + const char *dbPass; + const char *dbName; + MYSQL *mysql; } mariadbsinkThreadContext_t; + + void mariadbsinkStop(void *ptr) { commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*)handle->sinkContext; @@ -20,6 +40,7 @@ void mariadbsinkStop(void *ptr) { fprintf(stderr, "mariadb sink thread, stop flagged\n"); } + mariadbsinkThreadContext_t *_mariadbsinkInit(commonThreadHandle_t *handle) { mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) malloc(sizeof(mariadbsinkThreadContext_t)); handle->sinkContext = (void*)context; @@ -27,32 +48,103 @@ mariadbsinkThreadContext_t *_mariadbsinkInit(commonThreadHandle_t *handle) { context->stopSignal = false; context->ringbuffer = handle->ringbuffer; + if (! config_lookup_string(handle->config, SINK_DBHOST_KEY, &(context->dbHost))) { + context->dbHost = DEFAULT_SINK_DBHOST; + } + if (! config_lookup_string(handle->config, SINK_DBNAME_KEY, &(context->dbName))) { + context->dbName = DEFAULT_SINK_DBNAME; + } + if (! config_lookup_string(handle->config, SINK_DBUSER_KEY, &(context->dbUser))) { + fprintf(stderr, "MariaDB Sink: no database user configured, failing\n"); + return NULL; + } + if (! config_lookup_string(handle->config, SINK_DBPASS_KEY, &(context->dbPass))) { + fprintf(stderr, "MariaDB Sink: no database password configured, failing\n"); + return NULL; + } + if (! config_lookup_int(handle->config, SINK_DBPORT_KEY, &(context->dbPort))) { + context->dbPort = DEFAULT_SINK_DBPORT; + } + + context->mysql = mysql_init(NULL); + if (! mysql_real_connect(context->mysql, context->dbHost, context->dbUser, context->dbPass, + context->dbName, context->dbPort, NULL, 0)) { + fprintf(stderr, "MariaDB Sink: Initial connect to database server fails: %d, %s\n", + mysql_errno(context->mysql), mysql_error(context->mysql)); + return NULL; + } + fprintf(stderr, "MariaDB Sink: Successfully connected to database server\n"); + mysql_autocommit(context->mysql, 1); + + return context; } int _mariadbsinkExec(mariadbsinkThreadContext_t *context) { + const char statementFormatString[] = "INSERT INTO audittrail (ts, topic, payload) VALUES (FROM_UNIXTIME(%ld), '%s', '%s')"; + while (! context->stopSignal) { - auditItem_t *auditItem = (auditItem_t*)ringbufferGet(context->ringbuffer); + auditItem_t *auditItem = (auditItem_t*)ringbufferPeek(context->ringbuffer); if (auditItem != NULL) { printf("AuditItem: Time: %ld\n", auditItem->ts); printf("AuditItem: Topic: %s\n", auditItem->topic); printf("AuditItem: Payload: %s\n", auditItem->payload); - free(auditItem->topic); - auditItem->topic = NULL; - free(auditItem->payload); - auditItem->payload = NULL; - free(auditItem); - auditItem = NULL; + char *escapedTopic = (char*)malloc(strlen(auditItem->topic)*2+1); + mysql_real_escape_string(context->mysql, escapedTopic, auditItem->topic, strlen(auditItem->topic)); + + char *escapedPayload = (char*)malloc(strlen(auditItem->payload)*2+1); + mysql_real_escape_string(context->mysql, escapedPayload, auditItem->payload, strlen(auditItem->payload)); + + size_t bufferLen = sizeof(statementFormatString) + + 9 + // maximum characters in timestamp number + strlen(escapedTopic) + + strlen(escapedPayload) + + 10; // reserve + char *buffer = (char*) malloc(bufferLen); + memset(buffer, 0, bufferLen); + sprintf(buffer, statementFormatString, auditItem->ts, escapedTopic, escapedPayload); + fprintf(stderr, "STMT: %s\n", buffer); + + if (mysql_query(context->mysql, (const char*) buffer)) { + fprintf(stderr, "MariaDB Sink: failed to execute query %s: %d, %s\n", buffer, + mysql_errno(context->mysql), mysql_error(context->mysql)); + } else { + fprintf(stderr, "MariaDB Sink: query successfully executed\n"); + ringbufferRemove(context->ringbuffer); + free(auditItem->topic); + auditItem->topic = NULL; + free(auditItem->payload); + auditItem->payload = NULL; + free(auditItem); + auditItem = NULL; + } + + + free(escapedTopic); + escapedTopic = NULL; + free(escapedPayload); + escapedPayload = NULL; + free(buffer); + buffer = NULL; } } return 0; } int _mariadbsinkDeInit(commonThreadHandle_t *handle) { - free((mariadbsinkThreadContext_t*)handle->sinkContext); - handle->sinkContext = NULL; + mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) handle->sinkContext; + + if (context) { + if (context->mysql) { + mysql_close(context->mysql); + context->mysql = NULL; + } + free(context); + handle->sinkContext = NULL; + } + return 0; } @@ -64,11 +156,16 @@ void *mariadbsink(void *ptr) { mariadbsinkThreadContext_t *context = _mariadbsinkInit((commonThreadHandle_t*)ptr); - _mariadbsinkExec(context); + if (context != NULL) { + _mariadbsinkExec(context); + } else { + fprintf(stderr, "mariadb sink thread failed\n"); + } _mariadbsinkDeInit((commonThreadHandle_t*)ptr); fprintf(stderr, "mariadb sink thread stopped\n"); + return (void*)NULL; } diff --git a/mqttauditing.cfg b/mqttauditing.cfg index 80e380c..a430068 100644 --- a/mqttauditing.cfg +++ b/mqttauditing.cfg @@ -5,3 +5,6 @@ mqttAuditTopic = "Iot/MqttAuditing/#" mqttWatchdogGraceTime = 600 sink = "mariadb" + +sinkDbUser = "auditadder" +sinkDbPass = "test123" diff --git a/ringbuffer.c b/ringbuffer.c index 58d323f..1f4a8ff 100644 --- a/ringbuffer.c +++ b/ringbuffer.c @@ -64,3 +64,26 @@ void *ringbufferGet(ringbuffer_t *handle) { } +void *ringbufferPeek(ringbuffer_t *handle) { + void *res = NULL; + + if (handle->bufferReadIdx == handle->bufferWriteIdx) { + pthread_mutex_lock(&(handle->eventMutex)); + pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex)); + pthread_mutex_unlock(&(handle->eventMutex)); + } + + if (handle->interrupted) { + handle->interrupted = false; + } else { + res = handle->buffer[handle->bufferReadIdx]; + } + return res; +} + +void ringbufferRemove(ringbuffer_t *handle) { + handle->bufferReadIdx++; + if (handle->bufferReadIdx > BUFFER_SIZE) { + handle->bufferReadIdx = 0; + } +} diff --git a/ringbuffer.h b/ringbuffer.h index 6f3c1e5..5241c1b 100644 --- a/ringbuffer.h +++ b/ringbuffer.h @@ -19,6 +19,8 @@ typedef struct { void ringbufferInit(ringbuffer_t *handle); void ringbufferPut(ringbuffer_t *handle, void *f); void *ringbufferGet(ringbuffer_t *handle); +void *ringbufferPeek(ringbuffer_t *handle); // get without remove +void ringbufferRemove(ringbuffer_t *handle); // remove element peeked before void ringbufferInterrupt(ringbuffer_t *handle); #endif // _RINGBUFFER_H_