mqttauditing/mariadbsink.c

172 lines
5.8 KiB
C

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <mariadb/mysql.h>
#include "commonTypes.h"
#include "ringbuffer.h"
const char SINK_DBHOST_KEY[] = "sinkDbHost";
const char DEFAULT_SINK_DBHOST[] = "mariadb";
const char SINK_DBPORT_KEY[] = "sinkDbPort";
const int DEFAULT_SINK_DBPORT = 3306;
const char SINK_DBUSER_KEY[] = "sinkDbUser";
const char SINK_DBPASS_KEY[] = "sinkDbPass";
const char SINK_DBNAME_KEY[] = "sinkDbName";
const char DEFAULT_SINK_DBNAME[] = "auditdb";
typedef struct {
bool stopSignal;
ringbuffer_t *ringbuffer;
const char *dbHost;
int dbPort;
const char *dbUser;
const char *dbPass;
const char *dbName;
MYSQL *mysql;
} mariadbsinkThreadContext_t;
void mariadbsinkStop(void *ptr) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*)handle->sinkContext;
context->stopSignal = true;
fprintf(stderr, "mariadb sink thread, stop flagged\n");
}
mariadbsinkThreadContext_t *_mariadbsinkInit(commonThreadHandle_t *handle) {
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) malloc(sizeof(mariadbsinkThreadContext_t));
handle->sinkContext = (void*)context;
context->stopSignal = false;
context->ringbuffer = handle->ringbuffer;
if (! config_lookup_string(handle->config, SINK_DBHOST_KEY, &(context->dbHost))) {
context->dbHost = DEFAULT_SINK_DBHOST;
}
if (! config_lookup_string(handle->config, SINK_DBNAME_KEY, &(context->dbName))) {
context->dbName = DEFAULT_SINK_DBNAME;
}
if (! config_lookup_string(handle->config, SINK_DBUSER_KEY, &(context->dbUser))) {
fprintf(stderr, "MariaDB Sink: no database user configured, failing\n");
return NULL;
}
if (! config_lookup_string(handle->config, SINK_DBPASS_KEY, &(context->dbPass))) {
fprintf(stderr, "MariaDB Sink: no database password configured, failing\n");
return NULL;
}
if (! config_lookup_int(handle->config, SINK_DBPORT_KEY, &(context->dbPort))) {
context->dbPort = DEFAULT_SINK_DBPORT;
}
context->mysql = mysql_init(NULL);
if (! mysql_real_connect(context->mysql, context->dbHost, context->dbUser, context->dbPass,
context->dbName, context->dbPort, NULL, 0)) {
fprintf(stderr, "MariaDB Sink: Initial connect to database server fails: %d, %s\n",
mysql_errno(context->mysql), mysql_error(context->mysql));
return NULL;
}
fprintf(stderr, "MariaDB Sink: Successfully connected to database server\n");
mysql_autocommit(context->mysql, 1);
return context;
}
int _mariadbsinkExec(mariadbsinkThreadContext_t *context) {
const char statementFormatString[] = "INSERT INTO audittrail (ts, topic, payload) VALUES (FROM_UNIXTIME(%ld), '%s', '%s')";
while (! context->stopSignal) {
auditItem_t *auditItem = (auditItem_t*)ringbufferPeek(context->ringbuffer);
if (auditItem != NULL) {
printf("AuditItem: Time: %ld\n", auditItem->ts);
printf("AuditItem: Topic: %s\n", auditItem->topic);
printf("AuditItem: Payload: %s\n", auditItem->payload);
char *escapedTopic = (char*)malloc(strlen(auditItem->topic)*2+1);
mysql_real_escape_string(context->mysql, escapedTopic, auditItem->topic, strlen(auditItem->topic));
char *escapedPayload = (char*)malloc(strlen(auditItem->payload)*2+1);
mysql_real_escape_string(context->mysql, escapedPayload, auditItem->payload, strlen(auditItem->payload));
size_t bufferLen = sizeof(statementFormatString) +
9 + // maximum characters in timestamp number
strlen(escapedTopic) +
strlen(escapedPayload) +
10; // reserve
char *buffer = (char*) malloc(bufferLen);
memset(buffer, 0, bufferLen);
sprintf(buffer, statementFormatString, auditItem->ts, escapedTopic, escapedPayload);
fprintf(stderr, "STMT: %s\n", buffer);
if (mysql_query(context->mysql, (const char*) buffer)) {
fprintf(stderr, "MariaDB Sink: failed to execute query %s: %d, %s\n", buffer,
mysql_errno(context->mysql), mysql_error(context->mysql));
} else {
fprintf(stderr, "MariaDB Sink: query successfully executed\n");
ringbufferRemove(context->ringbuffer);
free(auditItem->topic);
auditItem->topic = NULL;
free(auditItem->payload);
auditItem->payload = NULL;
free(auditItem);
auditItem = NULL;
}
free(escapedTopic);
escapedTopic = NULL;
free(escapedPayload);
escapedPayload = NULL;
free(buffer);
buffer = NULL;
}
}
return 0;
}
int _mariadbsinkDeInit(commonThreadHandle_t *handle) {
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*) handle->sinkContext;
if (context) {
if (context->mysql) {
mysql_close(context->mysql);
context->mysql = NULL;
}
free(context);
handle->sinkContext = NULL;
}
return 0;
}
void *mariadbsink(void *ptr) {
fprintf(stderr, "mariadb sink entered\n");
mariadbsinkThreadContext_t *context = _mariadbsinkInit((commonThreadHandle_t*)ptr);
if (context != NULL) {
_mariadbsinkExec(context);
} else {
fprintf(stderr, "mariadb sink thread failed\n");
}
_mariadbsinkDeInit((commonThreadHandle_t*)ptr);
fprintf(stderr, "mariadb sink thread stopped\n");
return (void*)NULL;
}