From 034ad78ceb8aeab465e3a0157710419e6bff49e5 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Wed, 17 Jun 2020 13:53:54 +0200 Subject: [PATCH] mqttreceiver added --- .gitignore | 3 +- Makefile | 13 +++- mqttauditing.c | 11 +++- mqttauditing.cfg | 1 + mqttreceiver.c | 110 ++++++++++++++++++++++++++++++++++ mqttreceiver.h | 9 +++ ringbuffer.c | 2 + tests.c | 153 +++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 295 insertions(+), 7 deletions(-) create mode 100644 mqttreceiver.c create mode 100644 mqttreceiver.h create mode 100644 tests.c diff --git a/.gitignore b/.gitignore index ec1e16b..42e68cb 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ .*~ .bash_history mqttauditing - +tests +paho diff --git a/Makefile b/Makefile index fb8213f..9ef0fa0 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ CC=gcc CFLAGS=-Wall -LDFLAGS=-lconfig +LDFLAGS=-lconfig -lpthread -lpaho-mqtt3c INST_DIR=/opt/sbin @@ -11,9 +11,12 @@ VERSION:=$(shell cat VERSION) .PHONY: all all: mqttauditing -mqttauditing: mqttauditing.o logging.o ringbuffer.o version.o +mqttauditing: mqttauditing.o mqttreceiver.o logging.o ringbuffer.o version.o $(CC) -o $@ $(LDFLAGS) $^ +tests: tests.o ringbuffer.o + $(CC) -o $@ $(LDFLAGS) -lcunit $^ + version.o: version.c VERSION $(CC) -DD_REFCNT=$(REFCNT) -DD_VERSION=\"$(VERSION)\" -c $< @@ -22,4 +25,8 @@ version.o: version.c VERSION .PHONY: clean clean: - -rm -f *.o mqttauditing + -rm -f *.o mqttauditing tests + +.PHONY: test +test: tests + ./tests diff --git a/mqttauditing.c b/mqttauditing.c index 60c7a1a..e8ac761 100644 --- a/mqttauditing.c +++ b/mqttauditing.c @@ -7,19 +7,19 @@ #include "ringbuffer.h" #include "logging.h" - +#include "mqttreceiver.h" extern char VERSION[]; extern uint32_t REFCNT; config_t cfg; - +t_ringbuffer ringbuffer; void readConfig() { config_init(&cfg); - if (! config_read_file(&cfg, "/opt/etc/mqttauditing.cfg")) { + if (! config_read_file(&cfg, "./mqttauditing.cfg")) { logmsg(LOG_ERR, "failed to read config file: %s:%d - %s\n", config_error_file(&cfg), config_error_line(&cfg), config_error_text(&cfg)); @@ -33,9 +33,14 @@ int main (void) { readConfig(); + ringbufferInit(&ringbuffer); + mqttreceiverInit(&cfg, &ringbuffer); fprintf(stderr, "started.\n"); + while (1) { + sleep(1); + } // will never be reached config_destroy(&cfg); diff --git a/mqttauditing.cfg b/mqttauditing.cfg index ab8ffdd..e009afc 100644 --- a/mqttauditing.cfg +++ b/mqttauditing.cfg @@ -1,2 +1,3 @@ # mqttauditing file +mqttBroker = "172.16.2.16:1883" \ No newline at end of file diff --git a/mqttreceiver.c b/mqttreceiver.c new file mode 100644 index 0000000..8a7110c --- /dev/null +++ b/mqttreceiver.c @@ -0,0 +1,110 @@ +#include +#include +#include +#include +#include +#include +#include +#include "ringbuffer.h" + + +const char MQTT_BROKER_KEY[] = "mqttBroker"; +const char DEFAULT_MQTT_BROKER[] = "127.0.0.1:1883"; +const char MQTT_CLIENTID_KEY[] = "mqttClientId"; +const char *generatedMqttClientId = NULL; + + + +typedef struct { + t_ringbuffer *ringbuffer; + char *mqttBroker; + char *mqttClientId; +} t_mqttThreadHandle; + +t_mqttThreadHandle mqttThreadHandle; + +pthread_t mqttThread; + +int on_message(void *context, char *topicName, int topicLen, MQTTClient_message *message) { + t_mqttThreadHandle *handle = (t_mqttThreadHandle*)context; + + char* payload = message->payload; + printf("Received operation %s\n", payload); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +} + + + + + +void *mqttreceiverRun(void *ptr) { + fprintf(stderr, "mqttreceiverRun entered\n"); + + t_mqttThreadHandle *handle = (t_mqttThreadHandle*)ptr; + + MQTTClient client; + MQTTClient_create(&client, handle->mqttBroker, handle->mqttClientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + //conn_opts.username = "<>/<>"; + //conn_opts.password = "<>"; + + MQTTClient_setCallbacks(client, (void*)handle, NULL, on_message, NULL); + + int rc; + if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Connected to MQTT broker\n"); + MQTTClient_subscribe(client, "s/ds", 0); + + for (;;) { + MQTTClient_yield(); + sleep(3); + } + } + + + fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc); + + MQTTClient_disconnect(client, 1000); + MQTTClient_destroy(&client); + + + return rc; +} + + +int mqttreceiverInit(config_t *pCfg, t_ringbuffer *pRingbuffer) { + mqttThreadHandle.ringbuffer = pRingbuffer; + + if (! config_lookup_string(pCfg, MQTT_BROKER_KEY, &(mqttThreadHandle.mqttBroker))) { + mqttThreadHandle.mqttBroker = DEFAULT_MQTT_BROKER; + } + + if (! config_lookup_string(pCfg, MQTT_CLIENTID_KEY, &(mqttThreadHandle.mqttClientId))) { + const uint32_t sizeOfGeneratedMqttClientId = 32; + generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId); + memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId); + char myHostname[8]; + gethostname(myHostname, sizeof(myHostname)); + pid_t myPid = getpid(); + snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid); + mqttThreadHandle.mqttClientId = generatedMqttClientId; + } + + + fprintf(stderr, "Create mqtt receiver thread\n"); + int r = pthread_create(&mqttThread, NULL, mqttreceiverRun, (void*) &mqttThreadHandle); + fprintf(stderr, "pthread_create returns: %d\n", r); + +/* + pthread_join(mqttThread, NULL); + + + if (generatedMqttClientId != NULL) { + free(generatedMqttClientId); + } +*/ + + return 0; +} \ No newline at end of file diff --git a/mqttreceiver.h b/mqttreceiver.h new file mode 100644 index 0000000..54199e6 --- /dev/null +++ b/mqttreceiver.h @@ -0,0 +1,9 @@ +#ifndef _MQTTRECEIVER_H_ +#define _MQTTRECEIVER_H_ + +#include +#include "ringbuffer.h" + +int mqttreceiverInit(config_t *pCfg, t_ringbuffer *pRingbuffer); + +#endif // _MQTTRECEIVER_H_ \ No newline at end of file diff --git a/ringbuffer.c b/ringbuffer.c index ac60f1a..b7d1d36 100644 --- a/ringbuffer.c +++ b/ringbuffer.c @@ -1,5 +1,6 @@ #include #include +#include #include "ringbuffer.h" @@ -9,6 +10,7 @@ void ringbufferInit(t_ringbuffer *handle) { handle->bufferWriteIdx = 0; pthread_mutex_init(&(handle->eventMutex), NULL); pthread_cond_init(&(handle->eventSignal), NULL); + fprintf(stderr, "ringbuffer initialized\n"); } void ringbufferPut(t_ringbuffer *handle, void *f) { diff --git a/tests.c b/tests.c new file mode 100644 index 0000000..320e8b8 --- /dev/null +++ b/tests.c @@ -0,0 +1,153 @@ +#include +#include "CUnit/Basic.h" +#include +#include +#include +#include "ringbuffer.h" + + +t_ringbuffer rb; + + +int init_suite_ringbuffer(void) +{ + ringbufferInit(&rb); + return 0; +} + +int clean_suite_ringbuffer(void) +{ + return 0; +} + + + +void testRingbuffer1() { + uint32_t a = 1; + uint32_t b = 2; + uint32_t c = 3; + + ringbufferPut(&rb, &a); + ringbufferPut(&rb, &b); + ringbufferPut(&rb, &c); + + uint32_t *r = ringbufferGet(&rb); + CU_ASSERT(r == &a); + + r = ringbufferGet(&rb); + CU_ASSERT(r == &b); + + r = ringbufferGet(&rb); + CU_ASSERT(r == &c); +} + +void testRingbuffer2() { + uint32_t a = 4; + uint32_t b = 5; + uint32_t c = 6; + + ringbufferPut(&rb, &a); + ringbufferPut(&rb, &b); + ringbufferPut(&rb, &c); + + uint32_t *r = ringbufferGet(&rb); + CU_ASSERT(r == &a); + + r = ringbufferGet(&rb); + CU_ASSERT(r == &b); + + r = ringbufferGet(&rb); + CU_ASSERT(r == &c); +} + +void testRingbuffer3() { + for (uint32_t i = 0; i < BUFFER_SIZE + 25; i++) { + uint32_t *a = (uint32_t*) malloc(sizeof(uint32_t)); + // fprintf(stderr, "&a: %p\n", a); + ringbufferPut(&rb, a); + + uint32_t *r = ringbufferGet(&rb); + CU_ASSERT(r == a); + /* DO NOT free(r), otherwise always the same address is used */ + } +} + +void testRingbuffer4() { + #define TEST4SIZE 1000 + + uint32_t samples[TEST4SIZE]; + + for (uint32_t i = 0; i <= 100; i++) { + uint32_t *a = &(samples[i]); + ringbufferPut(&rb, a); + } + + for (uint32_t i = 0; i <= 25; i++) { + uint32_t *a = &(samples[i]); + uint32_t *b = ringbufferGet(&rb); + CU_ASSERT(a == b); + } + + for (uint32_t i = 101; i <= 125; i++) { + uint32_t *a = &(samples[i]); + ringbufferPut(&rb, a); + } + + for (uint32_t i = 26; i <= 51; i++) { + uint32_t *a = &(samples[i]); + uint32_t *b = ringbufferGet(&rb); + CU_ASSERT(a == b); + } + + for (uint32_t i = 126; i <= 200; i++) { + uint32_t *a = &(samples[i]); + ringbufferPut(&rb, a); + } + + for (uint32_t i = 52; i <= 190; i++) { + uint32_t *a = &(samples[i]); + uint32_t *b = ringbufferGet(&rb); + CU_ASSERT(a == b); + } + + for (uint32_t i = 201; i <= 300; i++) { + uint32_t *a = &(samples[i]); + ringbufferPut(&rb, a); + } + + for (uint32_t i = 191; i <= 300; i++) { + uint32_t *a = &(samples[i]); + uint32_t *b = ringbufferGet(&rb); + CU_ASSERT(a == b); + } +} + + + +int main() +{ + if (CUE_SUCCESS != CU_initialize_registry()) + return CU_get_error(); + + CU_pSuite ringbufferSuite = CU_add_suite("Suite_Ringbuffer", init_suite_ringbuffer, clean_suite_ringbuffer); + if (NULL == ringbufferSuite) { + CU_cleanup_registry(); + return CU_get_error(); + } + + if ((NULL == CU_add_test(ringbufferSuite, "test 1 of ringbuffer", testRingbuffer1)) || + (NULL == CU_add_test(ringbufferSuite, "test 2 of ringbuffer", testRingbuffer2)) || + (NULL == CU_add_test(ringbufferSuite, "test 3 of ringbuffer", testRingbuffer3)) || + (NULL == CU_add_test(ringbufferSuite, "test 4 of ringbuffer", testRingbuffer4)) || + 0 + ) + { + CU_cleanup_registry(); + return CU_get_error(); + } + + CU_basic_set_mode(CU_BRM_VERBOSE); + CU_basic_run_tests(); + CU_cleanup_registry(); + return CU_get_error(); +}