172 lines
5.8 KiB
C
172 lines
5.8 KiB
C
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <mariadb/mysql.h>

#include "commonTypes.h"
#include "ringbuffer.h"
|
|
|
|
|
|
/* Configuration keys looked up in the handle's config by _mariadbsinkInit. */
const char SINK_DBHOST_KEY[] = "sinkDbHost";
const char SINK_DBPORT_KEY[] = "sinkDbPort";
const char SINK_DBUSER_KEY[] = "sinkDbUser";
const char SINK_DBPASS_KEY[] = "sinkDbPass";
const char SINK_DBNAME_KEY[] = "sinkDbName";

/*
 * Fallback values used when the corresponding key is absent.
 * There is deliberately no default for user and password.
 */
const char DEFAULT_SINK_DBHOST[] = "mariadb";
const int DEFAULT_SINK_DBPORT = 3306;
const char DEFAULT_SINK_DBNAME[] = "auditdb";
|
|
|
|
|
|
typedef struct {
|
|
bool stopSignal;
|
|
ringbuffer_t *ringbuffer;
|
|
const char *dbHost;
|
|
int dbPort;
|
|
const char *dbUser;
|
|
const char *dbPass;
|
|
const char *dbName;
|
|
MYSQL *mysql;
|
|
} mariadbsinkThreadContext_t;
|
|
|
|
|
|
|
|
void mariadbsinkStop(void *ptr) {
|
|
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
|
|
mariadbsinkThreadContext_t *context = (mariadbsinkThreadContext_t*)handle->sinkContext;
|
|
|
|
context->stopSignal = true;
|
|
fprintf(stderr, "mariadb sink thread, stop flagged\n");
|
|
}
|
|
|
|
|
|
/*
 * Create the sink context, read the connection settings and connect to the
 * database server.
 *
 * The context is stored in handle->sinkContext as soon as it is allocated.
 * On every failure path NULL is returned while the context (if it was
 * allocated) stays attached to the handle, so that _mariadbsinkDeInit can
 * release it.
 *
 * Returns the ready-to-use context, or NULL when
 *   - the context cannot be allocated,
 *   - no database user or password is configured,
 *   - the MYSQL object cannot be created, or
 *   - the connection attempt fails.
 */
mariadbsinkThreadContext_t *_mariadbsinkInit(commonThreadHandle_t *handle) {
    /*
     * calloc instead of malloc: _mariadbsinkDeInit tests context->mysql, so
     * the field must hold a defined value even when we bail out before the
     * connection object exists.
     */
    mariadbsinkThreadContext_t *context = calloc(1, sizeof *context);
    handle->sinkContext = (void*)context;
    if (context == NULL) {
        fprintf(stderr, "MariaDB Sink: out of memory while allocating sink context\n");
        return NULL;
    }

    context->stopSignal = false;
    context->mysql = NULL;
    context->ringbuffer = handle->ringbuffer;

    /* Optional settings fall back to compiled-in defaults. */
    if (! config_lookup_string(handle->config, SINK_DBHOST_KEY, &(context->dbHost))) {
        context->dbHost = DEFAULT_SINK_DBHOST;
    }
    if (! config_lookup_string(handle->config, SINK_DBNAME_KEY, &(context->dbName))) {
        context->dbName = DEFAULT_SINK_DBNAME;
    }
    if (! config_lookup_int(handle->config, SINK_DBPORT_KEY, &(context->dbPort))) {
        context->dbPort = DEFAULT_SINK_DBPORT;
    }

    /* Credentials are mandatory. */
    if (! config_lookup_string(handle->config, SINK_DBUSER_KEY, &(context->dbUser))) {
        fprintf(stderr, "MariaDB Sink: no database user configured, failing\n");
        return NULL;
    }
    if (! config_lookup_string(handle->config, SINK_DBPASS_KEY, &(context->dbPass))) {
        fprintf(stderr, "MariaDB Sink: no database password configured, failing\n");
        return NULL;
    }

    context->mysql = mysql_init(NULL);
    if (context->mysql == NULL) {
        fprintf(stderr, "MariaDB Sink: unable to allocate database connection object\n");
        return NULL;
    }

    if (! mysql_real_connect(context->mysql, context->dbHost, context->dbUser, context->dbPass,
                             context->dbName, context->dbPort, NULL, 0)) {
        fprintf(stderr, "MariaDB Sink: Initial connect to database server fails: %d, %s\n",
                mysql_errno(context->mysql), mysql_error(context->mysql));
        /* The MYSQL object stays in the context; _mariadbsinkDeInit closes it. */
        return NULL;
    }
    fprintf(stderr, "MariaDB Sink: Successfully connected to database server\n");

    /* Each INSERT issued by the worker loop is committed on its own. */
    mysql_autocommit(context->mysql, 1);

    return context;
}
|
|
|
|
int _mariadbsinkExec(mariadbsinkThreadContext_t *context) {
|
|
const char statementFormatString[] = "INSERT INTO audittrail (ts, topic, payload) VALUES (FROM_UNIXTIME(%ld), '%s', '%s')";
|
|
|
|
while (! context->stopSignal) {
|
|
auditItem_t *auditItem = (auditItem_t*)ringbufferPeek(context->ringbuffer);
|
|
|
|
if (auditItem != NULL) {
|
|
printf("AuditItem: Time: %ld\n", auditItem->ts);
|
|
printf("AuditItem: Topic: %s\n", auditItem->topic);
|
|
printf("AuditItem: Payload: %s\n", auditItem->payload);
|
|
|
|
char *escapedTopic = (char*)malloc(strlen(auditItem->topic)*2+1);
|
|
mysql_real_escape_string(context->mysql, escapedTopic, auditItem->topic, strlen(auditItem->topic));
|
|
|
|
char *escapedPayload = (char*)malloc(strlen(auditItem->payload)*2+1);
|
|
mysql_real_escape_string(context->mysql, escapedPayload, auditItem->payload, strlen(auditItem->payload));
|
|
|
|
size_t bufferLen = sizeof(statementFormatString) +
|
|
9 + // maximum characters in timestamp number
|
|
strlen(escapedTopic) +
|
|
strlen(escapedPayload) +
|
|
10; // reserve
|
|
char *buffer = (char*) malloc(bufferLen);
|
|
memset(buffer, 0, bufferLen);
|
|
sprintf(buffer, statementFormatString, auditItem->ts, escapedTopic, escapedPayload);
|
|
fprintf(stderr, "STMT: %s\n", buffer);
|
|
|
|
if (mysql_query(context->mysql, (const char*) buffer)) {
|
|
fprintf(stderr, "MariaDB Sink: failed to execute query %s: %d, %s\n", buffer,
|
|
mysql_errno(context->mysql), mysql_error(context->mysql));
|
|
} else {
|
|
fprintf(stderr, "MariaDB Sink: query successfully executed\n");
|
|
ringbufferRemove(context->ringbuffer);
|
|
free(auditItem->topic);
|
|
auditItem->topic = NULL;
|
|
free(auditItem->payload);
|
|
auditItem->payload = NULL;
|
|
free(auditItem);
|
|
auditItem = NULL;
|
|
}
|
|
|
|
|
|
free(escapedTopic);
|
|
escapedTopic = NULL;
|
|
free(escapedPayload);
|
|
escapedPayload = NULL;
|
|
free(buffer);
|
|
buffer = NULL;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/*
 * Tear down the sink context attached to the handle: close the database
 * connection if one exists, free the context and clear the handle's
 * reference to it. Does nothing when no context is attached.
 *
 * Always returns 0.
 */
int _mariadbsinkDeInit(commonThreadHandle_t *handle) {
    mariadbsinkThreadContext_t *ctx = (mariadbsinkThreadContext_t*) handle->sinkContext;

    if (ctx == NULL) {
        return 0;
    }

    if (ctx->mysql != NULL) {
        mysql_close(ctx->mysql);
        ctx->mysql = NULL;
    }

    free(ctx);
    handle->sinkContext = NULL;

    return 0;
}
|
|
|
|
|
|
|
|
|
|
void *mariadbsink(void *ptr) {
|
|
fprintf(stderr, "mariadb sink entered\n");
|
|
|
|
mariadbsinkThreadContext_t *context = _mariadbsinkInit((commonThreadHandle_t*)ptr);
|
|
|
|
if (context != NULL) {
|
|
_mariadbsinkExec(context);
|
|
} else {
|
|
fprintf(stderr, "mariadb sink thread failed\n");
|
|
}
|
|
|
|
_mariadbsinkDeInit((commonThreadHandle_t*)ptr);
|
|
|
|
fprintf(stderr, "mariadb sink thread stopped\n");
|
|
|
|
return (void*)NULL;
|
|
}
|
|
|