mqttauditing/mqttreceiver.c

230 lines
8.2 KiB
C

#include <libconfig.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <MQTTClient.h>
#include <pthread.h>
#include <stdio.h>
#include <stdbool.h>
#include <time.h>
#include "commonTypes.h"
#include "ringbuffer.h"
#include "mqtttopicmatcher.h"
const char MQTT_BROKER_KEY[] = "mqttBroker";
const char DEFAULT_MQTT_BROKER[] = "127.0.0.1:1883";
const char MQTT_CLIENTID_KEY[] = "mqttClientId";
const char *generatedMqttClientId = NULL;
const char MQTT_STATUS_TOPIC_KEY[] = "mqttStatusTopic";
const char DEFAULT_MQTT_STATUS_TOPIC[] = "Iot/MqttAuditing/Status";
const char MQTT_HEARTBEAT_TOPIC_KEY[] = "mqttHeartbeatTopic";
const char DEFAULT_MQTT_HEARTBEAT_TOPIC[] = "Iot/MqttAuditing/Heartbeat";
const char MQTT_COMMAND_TOPIC_KEY[] = "mqttCommandTopic";
const char DEFAULT_MQTT_COMMAND_TOPIC[] = "Iot/MqttAuditing/Command";
const char MQTT_AUDIT_TOPIC_KEY[] = "mqttAuditTopic";
const char DEFAULT_MQTT_AUDIT_TOPIC[] = "Iot/MqttAuditing/Audit";
const char MQTT_WATCHDOG_TOPIC_KEY[] = "mqttWatchdogTopic";
const char DEFAULT_MQTT_WATCHDOG_TOPIC[] = "Iot/MqttAuditing/Watchdog";
const char MQTT_WATCHDOG_GRACE_TIME_KEY[] = "mqttWatchdogGraceTime";
const int DEFAULT_MQTT_WATCHDOG_GRACE_TIME = 15;
typedef struct {
MQTTClient *client;
const char *broker;
const char *clientId;
const char *statusTopic;
const char *watchdogTopic;
const char *heartbeatTopic;
const char *commandTopic;
const char *auditTopic;
int watchdogGraceTime;
bool stopSignal;
uint32_t watchdogCounter;
} mqttThreadContext_t;
int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message *message) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext;
mqttThreadContext_t *context = (mqttThreadContext_t*)handle->mqttContext;
char* payload = message->payload;
printf("Topic: %s, TopicLen: %d, Payload: %s\n", topicName, topicLen, payload);
if (strcmp(topicName, context->watchdogTopic) == 0) {
// printf("Watchdog signal received\n");
context->watchdogCounter += 1;
}
if (strcmp(topicName, context->commandTopic) == 0) {
printf("Command received\n");
if (strcmp(payload, "stop") == 0) {
context->stopSignal = true;
printf("Stop command received\n");
}
}
if (cmpTopicWithWildcard((char*)context->auditTopic, topicName) == 0) {
printf("Audit message received\n");
auditItem_t *auditItem = (auditItem_t*)malloc(sizeof(auditItem_t));
auditItem->ts = time(NULL);
auditItem->topic = strdup(topicName);
auditItem->payload = strdup(payload);
ringbufferPut(handle->ringbuffer, (void*)auditItem);
}
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void on_disconnect(void *kontext, char *cause) {
commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext;
mqttThreadContext_t *context = (mqttThreadContext_t*)handle->mqttContext;
fprintf(stderr, "disconnected from broker: %s trying to reconnect\n", cause);
MQTTClient_connectOptions options = MQTTClient_connectOptions_initializer;
while (1) {
fprintf(stderr, "trying ...\n");
int res = MQTTClient_connect(*(context->client), &options);
if (res == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Connected to MQTT broker\n");
if (MQTTClient_subscribe(*(context->client), context->watchdogTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->watchdogTopic);
};
if (MQTTClient_subscribe(*(context->client), context->commandTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->commandTopic);
}
if (MQTTClient_subscribe(*(context->client), context->auditTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->auditTopic);
}
break;
}
sleep(1);
}
fprintf(stderr, "Connected\n");
}
void *mqttreceiver(void *ptr) {
fprintf(stderr, "mqttreceiver entered\n");
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
mqttThreadContext_t *context = (mqttThreadContext_t*) malloc(sizeof(mqttThreadContext_t));
handle->mqttContext = (void*)context;
context->stopSignal = false;
context->watchdogCounter = 0;
char *generatedMqttClientId;
if (! config_lookup_string(handle->config, MQTT_BROKER_KEY, &(context->broker))) {
context->broker = DEFAULT_MQTT_BROKER;
}
if (! config_lookup_string(handle->config, MQTT_CLIENTID_KEY, &(context->clientId))) {
const uint32_t sizeOfGeneratedMqttClientId = 32;
generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId);
memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId);
char myHostname[8];
gethostname(myHostname, sizeof(myHostname));
pid_t myPid = getpid();
snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid);
context->clientId = generatedMqttClientId;
}
if (! config_lookup_string(handle->config, MQTT_STATUS_TOPIC_KEY, &(context->statusTopic))) {
context->statusTopic = DEFAULT_MQTT_STATUS_TOPIC;
}
if (! config_lookup_string(handle->config, MQTT_HEARTBEAT_TOPIC_KEY, &(context->heartbeatTopic))) {
context->heartbeatTopic = DEFAULT_MQTT_HEARTBEAT_TOPIC;
}
if (! config_lookup_string(handle->config, MQTT_COMMAND_TOPIC_KEY, &(context->commandTopic))) {
context->commandTopic = DEFAULT_MQTT_COMMAND_TOPIC;
}
if (! config_lookup_string(handle->config, MQTT_AUDIT_TOPIC_KEY, &(context->auditTopic))) {
context->auditTopic = DEFAULT_MQTT_AUDIT_TOPIC;
}
if (! config_lookup_string(handle->config, MQTT_WATCHDOG_TOPIC_KEY, &(context->watchdogTopic))) {
context->watchdogTopic = DEFAULT_MQTT_WATCHDOG_TOPIC;
}
if (! config_lookup_int(handle->config, MQTT_WATCHDOG_GRACE_TIME_KEY, &(context->watchdogGraceTime))) {
context->watchdogGraceTime = DEFAULT_MQTT_WATCHDOG_GRACE_TIME;
}
MQTTClient client;
context->client = &client;
MQTTClient_create(&client, context->broker, context->clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
//conn_opts.username = "<<tenant_ID>>/<<username>>";
//conn_opts.password = "<<password>>";
MQTTClient_setCallbacks(client, (void*)handle, on_disconnect, on_message, NULL);
int rc;
if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Connected to MQTT broker\n");
if (MQTTClient_subscribe(client, context->watchdogTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->watchdogTopic);
};
if (MQTTClient_subscribe(client, context->commandTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->commandTopic);
}
if (MQTTClient_subscribe(client, context->auditTopic, 0) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Subscribed to %s\n", context->auditTopic);
}
uint32_t loopCounter = 0;
while (! context->stopSignal) {
loopCounter++;
if (loopCounter > context->watchdogGraceTime) {
if (context->watchdogCounter == 0) {
fprintf(stderr, "missing watchdog signals for too longer, terminating\n");
context->stopSignal = true;
} else {
fprintf(stderr, "Watchdog fine: %d\n", context->watchdogCounter);
}
loopCounter = 0;
context->watchdogCounter = 0;
}
MQTTClient_yield();
sleep(1);
}
fprintf(stderr, "mqtt receiver thread stopped\n");
} else {
fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc);
}
MQTTClient_disconnect(client, 1000);
MQTTClient_destroy(&client);
if (generatedMqttClientId != NULL) {
free(generatedMqttClientId);
generatedMqttClientId = NULL;
}
if (context != NULL) {
free(context);
context = NULL;
}
return NULL;
}