rework env start script, add sink and add mqtt reconnect

This commit is contained in:
Wolfgang Hottgenroth 2020-06-18 14:28:38 +00:00
parent 44884a58ac
commit dd2639c4be
8 changed files with 132 additions and 9 deletions

View File

@ -11,7 +11,7 @@ VERSION:=$(shell cat VERSION)
.PHONY: all
all: mqttauditing
mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o logging.o ringbuffer.o version.o
mqttauditing: mqttauditing.o mqttreceiver.o mqtttopicmatcher.o sink.o logging.o ringbuffer.o version.o
$(CC) -o $@ $(LDFLAGS) $^
tests: tests.o ringbuffer.o mqtttopicmatcher.o

View File

@ -1,6 +1,7 @@
#ifndef _HANDLETYPE_H_
#define _HANDLETYPE_H_
#include <time.h>
#include <libconfig.h>
#include "ringbuffer.h"
@ -11,4 +12,11 @@ typedef struct {
void* context;
} commonThreadHandle_t;
#endif // _HANDLETYPE_H_
typedef struct {
time_t ts;
char *topic;
char *payload;
} auditItem_t;
#endif // _HANDLETYPE_H_

View File

@ -8,7 +8,8 @@
#include "ringbuffer.h"
#include "logging.h"
#include "mqttreceiver.h"
#include "handleType.h"
#include "commonTypes.h"
#include "sink.h"
@ -51,6 +52,14 @@ int main (void) {
exit(1);
}
pthread_t sinkThread;
err = pthread_create(&sinkThread, NULL, sink, (void*) &commonThreadHandle);
if (err != 0) {
fprintf(stderr, "Unable to create sink receiver thread: %d\n", err);
exit(1);
}
fprintf(stderr, "started.\n");

View File

@ -1,4 +1,5 @@
# mqttauditing file
mqttBroker = "172.16.2.16:1883"
mqttAuditTopic = "Iot/MqttAuditing/#"
mqttBroker = "broker"
mqttAuditTopic = "Iot/MqttAuditing/#"
mqttWatchdogGraceTime = 600

View File

@ -6,7 +6,8 @@
#include <pthread.h>
#include <stdio.h>
#include <stdbool.h>
#include "handleType.h"
#include <time.h>
#include "commonTypes.h"
#include "ringbuffer.h"
#include "mqtttopicmatcher.h"
@ -30,6 +31,7 @@ const char MQTT_WATCHDOG_GRACE_TIME_KEY[] = "mqttWatchdogGraceTime";
const int DEFAULT_MQTT_WATCHDOG_GRACE_TIME = 15;
typedef struct {
MQTTClient *client;
const char *broker;
const char *clientId;
const char *statusTopic;
@ -66,6 +68,11 @@ int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message
if (cmpTopicWithWildcard((char*)context->auditTopic, topicName) == 0) {
printf("Audit message received\n");
auditItem_t *auditItem = (auditItem_t*)malloc(sizeof(auditItem_t));
auditItem->ts = time(NULL);
auditItem->topic = strdup(topicName);
auditItem->payload = strdup(payload);
ringbufferPut(handle->ringbuffer, (void*)auditItem);
}
MQTTClient_freeMessage(&message);
@ -74,6 +81,37 @@ int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message
}
void on_disconnect(void *kontext, char *cause) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext;
mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context;
fprintf(stderr, "disconnected from broker: %s trying to reconnect\n", cause);
MQTTClient_connectOptions options = MQTTClient_connectOptions_initializer;
while (1) {
fprintf(stderr, "trying ...\n");
int res = MQTTClient_connect(*(context->client), &options);
if (res == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Connected to MQTT broker\n");
if (MQTTClient_subscribe(*(context->client), context->watchdogTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->watchdogTopic);
};
if (MQTTClient_subscribe(*(context->client), context->commandTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->commandTopic);
}
if (MQTTClient_subscribe(*(context->client), context->auditTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->auditTopic);
}
break;
}
sleep(1);
}
fprintf(stderr, "Connected\n");
}
@ -129,12 +167,13 @@ void *mqttreceiver(void *ptr) {
}
MQTTClient client;
context->client = &client;
MQTTClient_create(&client, context->broker, context->clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
//conn_opts.username = "<<tenant_ID>>/<<username>>";
//conn_opts.password = "<<password>>";
MQTTClient_setCallbacks(client, (void*)handle, NULL, on_message, NULL);
MQTTClient_setCallbacks(client, (void*)handle, on_disconnect, on_message, NULL);
int rc;
if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) {

27
sink.c Normal file
View File

@ -0,0 +1,27 @@
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include "commonTypes.h"
#include "ringbuffer.h"
void *sink(void *ptr) {
fprintf(stderr, "sink entered\n");
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
while (1) {
auditItem_t *auditItem = (auditItem_t*)ringbufferGet(handle->ringbuffer);
printf("AuditItem: Time: %ld\n", auditItem->ts);
printf("AuditItem: Topic: %s\n", auditItem->topic);
printf("AuditItem: Payload: %s\n", auditItem->payload);
free(auditItem->topic);
auditItem->topic = NULL;
free(auditItem->payload);
auditItem->payload = NULL;
free(auditItem);
auditItem = NULL;
}
}

6
sink.h Normal file
View File

@ -0,0 +1,6 @@
#ifndef _SINK_H_
#define _SINK_H_
void *sink(void *ptr);
#endif // _SINK_H_

View File

@ -1,6 +1,39 @@
#!/bin/bash
IMAGE=registry.hottis.de/dockerized/c-build-env:latest
docker pull $IMAGE
docker run -it --rm -v $PWD:/work $IMAGE bash
OPT_BROKER=""
OPT_ROOT=""
while getopts bphr flag
do
case "${flag}" in
b) echo "Start broker";
docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto;
OPT_BROKER="--link broker";
;;
p) echo "Pull environment image";
docker pull $IMAGE;
;;
r) echo "Run environment as root";
OPT_ROOT="-u 0";
;;
h) echo "Usage:";
echo "-b: Start broker";
echo "-p: Pull new environment image";
echo "-r: Run environment as root";
echo "-h: This help";
exit 0;
;;
esac
done
docker run -it --rm $OPT_BROKER $OPT_ROOT -v $PWD:/work $IMAGE bash
if [ "$OPT_BROKER" != "" ]; then
echo "Stopping broker"
docker stop broker
fi