From dd2639c4befc730d750752aa0b60c84892e87a18 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Thu, 18 Jun 2020 14:28:38 +0000 Subject: [PATCH] rework env start script, add sink and add mqtt reconnect --- Makefile | 2 +- handleType.h => commonTypes.h | 10 +++++++- mqttauditing.c | 11 ++++++++- mqttauditing.cfg | 5 ++-- mqttreceiver.c | 43 +++++++++++++++++++++++++++++++++-- sink.c | 27 ++++++++++++++++++++++ sink.h | 6 +++++ startBuildEnv.sh | 37 ++++++++++++++++++++++++++++-- 8 files changed, 132 insertions(+), 9 deletions(-) rename handleType.h => commonTypes.h (61%) create mode 100644 sink.c create mode 100644 sink.h diff --git a/Makefile b/Makefile index 3bd6c38..5fee18e 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ VERSION:=$(shell cat VERSION) .PHONY: all all: mqttauditing -mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o logging.o ringbuffer.o version.o +mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o sink.o logging.o ringbuffer.o version.o $(CC) -o $@ $(LDFLAGS) $^ tests: tests.o ringbuffer.o mqtttopicmatcher.o diff --git a/handleType.h b/commonTypes.h similarity index 61% rename from handleType.h rename to commonTypes.h index 6ed603d..530b881 100644 --- a/handleType.h +++ b/commonTypes.h @@ -1,6 +1,7 @@ #ifndef _HANDLETYPE_H_ #define _HANDLETYPE_H_ +#include #include #include "ringbuffer.h" @@ -11,4 +12,11 @@ typedef struct { void* context; } commonThreadHandle_t; -#endif // _HANDLETYPE_H_ \ No newline at end of file +typedef struct { + time_t ts; + char *topic; + char *payload; +} auditItem_t; + + +#endif // _HANDLETYPE_H_ diff --git a/mqttauditing.c b/mqttauditing.c index 541bf93..b2fceb6 100644 --- a/mqttauditing.c +++ b/mqttauditing.c @@ -8,7 +8,8 @@ #include "ringbuffer.h" #include "logging.h" #include "mqttreceiver.h" -#include "handleType.h" +#include "commonTypes.h" +#include "sink.h" @@ -51,6 +52,14 @@ int main (void) { exit(1); } + pthread_t sinkThread; + err = pthread_create(&sinkThread, NULL, sink, (void*) &commonThreadHandle); + if (err != 0) { + fprintf(stderr, "Unable to create sink receiver thread: %d\n", err); + exit(1); + } + + fprintf(stderr, "started.\n"); diff --git a/mqttauditing.cfg b/mqttauditing.cfg index b37f5e0..337e0d4 100644 --- a/mqttauditing.cfg +++ b/mqttauditing.cfg @@ -1,4 +1,5 @@ # mqttauditing file -mqttBroker = "172.16.2.16:1883" -mqttAuditTopic = "Iot/MqttAuditing/#" \ No newline at end of file +mqttBroker = "broker" +mqttAuditTopic = "Iot/MqttAuditing/#" +mqttWatchdogGraceTime = 600 diff --git a/mqttreceiver.c b/mqttreceiver.c index c355249..9029291 100644 --- a/mqttreceiver.c +++ b/mqttreceiver.c @@ -6,7 +6,8 @@ #include #include #include -#include "handleType.h" +#include +#include "commonTypes.h" #include "ringbuffer.h" #include "mqtttopicmatcher.h" @@ -30,6 +31,7 @@ const char MQTT_WATCHDOG_GRACE_TIME_KEY[] = "mqttWatchdogGraceTime"; const int DEFAULT_MQTT_WATCHDOG_GRACE_TIME = 15; typedef struct { + MQTTClient *client; const char *broker; const char *clientId; const char *statusTopic; @@ -66,6 +68,11 @@ int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message if (cmpTopicWithWildcard((char*)context->auditTopic, topicName) == 0) { printf("Audit message received\n"); + auditItem_t *auditItem = (auditItem_t*)malloc(sizeof(auditItem_t)); + auditItem->ts = time(NULL); + auditItem->topic = strdup(topicName); + auditItem->payload = strdup(payload); + ringbufferPut(handle->ringbuffer, (void*)auditItem); } MQTTClient_freeMessage(&message); @@ -74,6 +81,37 @@ int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message } +void on_disconnect(void *kontext, char *cause) { + commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext; + mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context; + + fprintf(stderr, "disconnected from broker: %s trying to reconnect\n", cause); + + MQTTClient_connectOptions options = MQTTClient_connectOptions_initializer; + while (1) { + fprintf(stderr, "trying ...\n"); + int res = MQTTClient_connect(*(context->client), &options); + if (res == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Connected to MQTT broker\n"); + if (MQTTClient_subscribe(*(context->client), context->watchdogTopic, 0) == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Subscribed to %s\n", context->watchdogTopic); + }; + + if (MQTTClient_subscribe(*(context->client), context->commandTopic, 0) == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Subscribed to %s\n", context->commandTopic); + } + + if (MQTTClient_subscribe(*(context->client), context->auditTopic, 0) == MQTTCLIENT_SUCCESS) { + fprintf(stderr, "Subscribed to %s\n", context->auditTopic); + } + break; + } + sleep(1); + } + fprintf(stderr, "Connected\n"); +} + + @@ -129,12 +167,13 @@ void *mqttreceiver(void *ptr) { } MQTTClient client; + context->client = &client; MQTTClient_create(&client, context->broker, context->clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //conn_opts.username = "<>/<>"; //conn_opts.password = "<>"; - MQTTClient_setCallbacks(client, (void*)handle, NULL, on_message, NULL); + MQTTClient_setCallbacks(client, (void*)handle, on_disconnect, on_message, NULL); int rc; if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) { diff --git a/sink.c b/sink.c new file mode 100644 index 0000000..b141ca8 --- /dev/null +++ b/sink.c @@ -0,0 +1,27 @@ +#include +#include +#include +#include "commonTypes.h" +#include "ringbuffer.h" + +void *sink(void *ptr) { + fprintf(stderr, "sink entered\n"); + + commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; + + while (1) { + auditItem_t *auditItem = (auditItem_t*)ringbufferGet(handle->ringbuffer); + + printf("AuditItem: Time: %ld\n", auditItem->ts); + printf("AuditItem: Topic: %s\n", auditItem->topic); + printf("AuditItem: Payload: %s\n", auditItem->payload); + + free(auditItem->topic); + auditItem->topic = NULL; + free(auditItem->payload); + auditItem->payload = NULL; + free(auditItem); + auditItem = NULL; + } +} + diff --git a/sink.h b/sink.h new file mode 100644 index 0000000..c0724d4 --- /dev/null +++ b/sink.h @@ -0,0 +1,6 @@ +#ifndef _SINK_H_ +#define _SINK_H_ + +void *sink(void *ptr); + +#endif // _SINK_H_ diff --git a/startBuildEnv.sh b/startBuildEnv.sh index d3f6a70..d6e8926 100755 --- a/startBuildEnv.sh +++ b/startBuildEnv.sh @@ -1,6 +1,39 @@ #!/bin/bash + IMAGE=registry.hottis.de/dockerized/c-build-env:latest -docker pull $IMAGE -docker run -it --rm -v $PWD:/work $IMAGE bash +OPT_BROKER="" +OPT_ROOT="" + +while getopts bphr flag +do + case "${flag}" in + b) echo "Start broker"; + docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto; + OPT_BROKER="--link broker"; + ;; + p) echo "Pull environment image"; + docker pull $IMAGE; + ;; + r) echo "Run environment as root"; + OPT_ROOT="-u 0"; + ;; + h) echo "Usage:"; + echo "-b: Start broker"; + echo "-p: Pull new environment image"; + echo "-r: Run environment as root"; + echo "-h: This help"; + exit 0; + ;; + esac +done + + +docker run -it --rm $OPT_BROKER $OPT_ROOT -v $PWD:/work $IMAGE bash + +if [ "$OPT_BROKER" != "" ]; then + echo "Stopping broker" + docker stop broker +fi +