102 lines
2.2 KiB
C
102 lines
2.2 KiB
C
#include <stdint.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
#include <libconfig.h>
|
|
#include <math.h>
|
|
|
|
#include "ringbuffer.h"
|
|
#include "logging.h"
|
|
#include "mqttreceiver.h"
|
|
#include "commonTypes.h"
|
|
#include "mariadbsink.h"
|
|
#include "sink.h"
|
|
|
|
|
|
/* Name of the config setting that main() looks up to choose the sink implementation. */
const char SINK_KEY[] = "sink";
|
|
/* Sink type main() falls back to when the SINK_KEY lookup in the config fails. */
const char DEFAULT_SINK[] = "dummy";
|
|
|
|
|
|
/* Version string defined outside this file; main() prints it to stderr at startup. */
extern char VERSION[];
|
|
/* Counter defined outside this file; main() prints it next to VERSION at startup. */
extern uint32_t REFCNT;
|
|
|
|
|
|
/* libconfig state: filled by readConfig() and handed to the threads through commonThreadHandle.config. */
config_t cfg;
|
|
/*
 * Ring buffer initialized in main() and handed to both threads through
 * commonThreadHandle.ringbuffer (presumably the receiver-to-sink queue;
 * confirm against ringbuffer.h).
 */
ringbuffer_t ringbuffer;
|
|
|
|
/*
 * Shared context: main() stores the addresses of cfg and ringbuffer in it and
 * passes it to the MQTT receiver thread, the sink thread and the sink stop
 * function.
 */
commonThreadHandle_t commonThreadHandle;
|
|
|
|
|
|
|
|
void readConfig() {
|
|
config_init(&cfg);
|
|
if (! config_read_file(&cfg, "./mqttauditing.cfg")) {
|
|
logmsg(LOG_ERR, "failed to read config file: %s:%d - %s\n",
|
|
config_error_file(&cfg), config_error_line(&cfg),
|
|
config_error_text(&cfg));
|
|
config_destroy(&cfg);
|
|
exit(-1);
|
|
}
|
|
}
|
|
|
|
int main (void) {
|
|
fprintf(stderr, "VERSION: %s, REFCNT: %u\n", VERSION, REFCNT);
|
|
|
|
readConfig();
|
|
ringbufferInit(&ringbuffer);
|
|
|
|
commonThreadHandle.config = &cfg;
|
|
commonThreadHandle.ringbuffer = &ringbuffer;
|
|
|
|
const char *sinkType;
|
|
if (! config_lookup_string(&cfg, SINK_KEY, &sinkType)) {
|
|
sinkType = DEFAULT_SINK;
|
|
}
|
|
void* (*sinkFunc)(void*);
|
|
void (*sinkStopFunc)(void*);
|
|
if (0 == strcmp(sinkType, "dummy")) {
|
|
sinkFunc = sink;
|
|
sinkStopFunc = sinkStop;
|
|
} else if (0 == strcmp(sinkType, "mariadb")) {
|
|
sinkFunc = mariadbsink;
|
|
sinkStopFunc = mariadbsinkStop;
|
|
} else {
|
|
fprintf(stderr, "Unknown sink type %s\n", sinkType);
|
|
exit(1);
|
|
}
|
|
|
|
|
|
|
|
pthread_t mqttThread;
|
|
int err = pthread_create(&mqttThread, NULL, mqttreceiver, (void*) &commonThreadHandle);
|
|
if (err != 0) {
|
|
fprintf(stderr, "Unable to create mqtt receiver thread: %d\n", err);
|
|
exit(1);
|
|
}
|
|
|
|
pthread_t sinkThread;
|
|
err = pthread_create(&sinkThread, NULL, sinkFunc, (void*) &commonThreadHandle);
|
|
if (err != 0) {
|
|
fprintf(stderr, "Unable to create sink receiver thread: %d\n", err);
|
|
exit(1);
|
|
}
|
|
|
|
|
|
|
|
fprintf(stderr, "started.\n");
|
|
|
|
pthread_join(mqttThread, NULL);
|
|
fprintf(stderr, "mqtt receiver thread joined\n");
|
|
|
|
sinkStopFunc((void*) &commonThreadHandle);
|
|
ringbufferInterrupt(&ringbuffer);
|
|
pthread_join(sinkThread, NULL);
|
|
fprintf(stderr, "sink thread joined\n");
|
|
|
|
|
|
|
|
// will never be reached
|
|
config_destroy(&cfg);
|
|
}
|