database stuff, failure handling not working yet

This commit is contained in:
Wolfgang Hottgenroth 2020-06-22 15:26:04 +00:00
parent 19eb7209ee
commit 82803985aa
6 changed files with 137 additions and 11 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
*~
.*~
.bash_history
core
mqttauditing
tests
paho

View File

@ -1,7 +1,7 @@
CC=gcc
CFLAGS=-Wall
LDFLAGS=-lconfig -lpthread -lpaho-mqtt3c
LDFLAGS=-lconfig -lpthread -lpaho-mqtt3c -lmariadb
INST_DIR=/opt/sbin

View File

@ -2,16 +2,36 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <mariadb/mysql.h>
#include "commonTypes.h"
#include "ringbuffer.h"
const char SINK_DBHOST_KEY[] = "sinkDbHost";
const char DEFAULT_SINK_DBHOST[] = "mariadb";
const char SINK_DBPORT_KEY[] = "sinkDbPort";
const int DEFAULT_SINK_DBPORT = 3306;
const char SINK_DBUSER_KEY[] = "sinkDbUser";
const char SINK_DBPASS_KEY[] = "sinkDbPass";
const char SINK_DBNAME_KEY[] = "sinkDbName";
const char DEFAULT_SINK_DBNAME[] = "auditdb";
typedef struct {
bool stopSignal;
ringbuffer_t *ringbuffer;
const char *dbHost;
int dbPort;
const char *dbUser;
const char *dbPass;
const char *dbName;
MYSQL *mysql;
} mariadbsinkThreadContext_t;
void mariadbsinkStop(void *ptr) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*)handle->sinkContext;
@ -20,6 +40,7 @@ void mariadbsinkStop(void *ptr) {
fprintf(stderr, "mariadb sink thread, stop flagged\n");
}
mariadbsinkThreadContext_t *_mariadbsinkInit(commonThreadHandle_t *handle) {
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) malloc(sizeof(mariadbsinkThreadContext_t));
handle->sinkContext = (void*)context;
@ -27,32 +48,103 @@ mariadbsinkThreadContext_t *_mariadbsinkInit(commonThreadHandle_t *handle) {
context->stopSignal = false;
context->ringbuffer = handle->ringbuffer;
if (! config_lookup_string(handle->config, SINK_DBHOST_KEY, &(context->dbHost))) {
context->dbHost = DEFAULT_SINK_DBHOST;
}
if (! config_lookup_string(handle->config, SINK_DBNAME_KEY, &(context->dbName))) {
context->dbName = DEFAULT_SINK_DBNAME;
}
if (! config_lookup_string(handle->config, SINK_DBUSER_KEY, &(context->dbUser))) {
fprintf(stderr, "MariaDB Sink: no database user configured, failing\n");
return NULL;
}
if (! config_lookup_string(handle->config, SINK_DBPASS_KEY, &(context->dbPass))) {
fprintf(stderr, "MariaDB Sink: no database password configured, failing\n");
return NULL;
}
if (! config_lookup_int(handle->config, SINK_DBPORT_KEY, &(context->dbPort))) {
context->dbPort = DEFAULT_SINK_DBPORT;
}
context->mysql = mysql_init(NULL);
if (! mysql_real_connect(context->mysql, context->dbHost, context->dbUser, context->dbPass,
context->dbName, context->dbPort, NULL, 0)) {
fprintf(stderr, "MariaDB Sink: Initial connect to database server fails: %d, %s\n",
mysql_errno(context->mysql), mysql_error(context->mysql));
return NULL;
}
fprintf(stderr, "MariaDB Sink: Successfully connected to database server\n");
mysql_autocommit(context->mysql, 1);
return context;
}
int _mariadbsinkExec(mariadbsinkThreadContext_t *context) {
const char statementFormatString[] = "INSERT INTO audittrail (ts, topic, payload) VALUES (FROM_UNIXTIME(%ld), '%s', '%s')";
while (! context->stopSignal) {
auditItem_t *auditItem = (auditItem_t*)ringbufferGet(context->ringbuffer);
auditItem_t *auditItem = (auditItem_t*)ringbufferPeek(context->ringbuffer);
if (auditItem != NULL) {
printf("AuditItem: Time: %ld\n", auditItem->ts);
printf("AuditItem: Topic: %s\n", auditItem->topic);
printf("AuditItem: Payload: %s\n", auditItem->payload);
free(auditItem->topic);
auditItem->topic = NULL;
free(auditItem->payload);
auditItem->payload = NULL;
free(auditItem);
auditItem = NULL;
char *escapedTopic = (char*)malloc(strlen(auditItem->topic)*2+1);
mysql_real_escape_string(context->mysql, escapedTopic, auditItem->topic, strlen(auditItem->topic));
char *escapedPayload = (char*)malloc(strlen(auditItem->payload)*2+1);
mysql_real_escape_string(context->mysql, escapedPayload, auditItem->payload, strlen(auditItem->payload));
size_t bufferLen = sizeof(statementFormatString) +
9 + // maximum characters in timestamp number
strlen(escapedTopic) +
strlen(escapedPayload) +
10; // reserve
char *buffer = (char*) malloc(bufferLen);
memset(buffer, 0, bufferLen);
sprintf(buffer, statementFormatString, auditItem->ts, escapedTopic, escapedPayload);
fprintf(stderr, "STMT: %s\n", buffer);
if (mysql_query(context->mysql, (const char*) buffer)) {
fprintf(stderr, "MariaDB Sink: failed to execute query %s: %d, %s\n", buffer,
mysql_errno(context->mysql), mysql_error(context->mysql));
} else {
fprintf(stderr, "MariaDB Sink: query successfully executed\n");
ringbufferRemove(context->ringbuffer);
free(auditItem->topic);
auditItem->topic = NULL;
free(auditItem->payload);
auditItem->payload = NULL;
free(auditItem);
auditItem = NULL;
}
free(escapedTopic);
escapedTopic = NULL;
free(escapedPayload);
escapedPayload = NULL;
free(buffer);
buffer = NULL;
}
}
return 0;
}
int _mariadbsinkDeInit(commonThreadHandle_t *handle) {
free((mariadbsinkThreadContext_t*)handle->sinkContext);
handle->sinkContext = NULL;
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) handle->sinkContext;
if (context) {
if (context->mysql) {
mysql_close(context->mysql);
context->mysql = NULL;
}
free(context);
handle->sinkContext = NULL;
}
return 0;
}
@ -64,11 +156,16 @@ void *mariadbsink(void *ptr) {
mariadbsinkThreadContext_t *context = _mariadbsinkInit((commonThreadHandle_t*)ptr);
_mariadbsinkExec(context);
if (context != NULL) {
_mariadbsinkExec(context);
} else {
fprintf(stderr, "mariadb sink thread failed\n");
}
_mariadbsinkDeInit((commonThreadHandle_t*)ptr);
fprintf(stderr, "mariadb sink thread stopped\n");
return (void*)NULL;
}

View File

@ -5,3 +5,6 @@ mqttAuditTopic = "Iot/MqttAuditing/#"
mqttWatchdogGraceTime = 600
sink = "mariadb"
sinkDbUser = "auditadder"
sinkDbPass = "test123"

View File

@ -64,3 +64,26 @@ void *ringbufferGet(ringbuffer_t *handle) {
}
void *ringbufferPeek(ringbuffer_t *handle) {
void *res = NULL;
if (handle->bufferReadIdx == handle->bufferWriteIdx) {
pthread_mutex_lock(&(handle->eventMutex));
pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex));
pthread_mutex_unlock(&(handle->eventMutex));
}
if (handle->interrupted) {
handle->interrupted = false;
} else {
res = handle->buffer[handle->bufferReadIdx];
}
return res;
}
void ringbufferRemove(ringbuffer_t *handle) {
handle->bufferReadIdx++;
if (handle->bufferReadIdx > BUFFER_SIZE) {
handle->bufferReadIdx = 0;
}
}

View File

@ -19,6 +19,8 @@ typedef struct {
void ringbufferInit(ringbuffer_t *handle);
void ringbufferPut(ringbuffer_t *handle, void *f);
void *ringbufferGet(ringbuffer_t *handle);
void *ringbufferPeek(ringbuffer_t *handle); // get without remove
void ringbufferRemove(ringbuffer_t *handle); // remove element peeked before
void ringbufferInterrupt(ringbuffer_t *handle);
#endif // _RINGBUFFER_H_