#include #include #include #include #include #include #include #include #include "handleType.h" #include "ringbuffer.h" const char MQTT_BROKER_KEY[] = "mqttBroker"; const char DEFAULT_MQTT_BROKER[] = "127.0.0.1:1883"; const char MQTT_CLIENTID_KEY[] = "mqttClientId"; const char *generatedMqttClientId = NULL; const char MQTT_STATUS_TOPIC_KEY[] = "mqttStatusTopic"; const char DEFAULT_MQTT_STATUS_TOPIC[] = "Iot/MqttAuditing/Status"; const char MQTT_HEARTBEAT_TOPIC_KEY[] = "mqttHeartbeatTopic"; const char DEFAULT_MQTT_HEARTBEAT_TOPIC[] = "Iot/MqttAuditing/Heartbeat"; const char MQTT_COMMAND_TOPIC_KEY[] = "mqttCommandTopic"; const char DEFAULT_MQTT_COMMAND_TOPIC[] = "Iot/MqttAuditing/Command"; const char MQTT_AUDIT_TOPIC_KEY[] = "mqttAuditTopic"; const char DEFAULT_MQTT_AUDIT_TOPIC[] = "Iot/MqttAuditing/Audit"; const char MQTT_WATCHDOG_TOPIC_KEY[] = "mqttWatchdogTopic"; const char DEFAULT_MQTT_WATCHDOG_TOPIC[] = "Iot/MqttAuditing/Watchdog"; const char MQTT_WATCHDOG_GRACE_TIME_KEY[] = "mqttWatchdogGraceTime"; const int DEFAULT_MQTT_WATCHDOG_GRACE_TIME = 15; typedef struct { const char *broker; const char *clientId; const char *statusTopic; const char *watchdogTopic; const char *heartbeatTopic; const char *commandTopic; const char *auditTopic; int watchdogGraceTime; bool stopSignal; uint32_t watchdogCounter; } mqttThreadContext_t; int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message *message) { commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext; mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context; printf("Topic: %s, TopicLen: %d\n", topicName, topicLen); char* payload = message->payload; printf("Received operation: %s\n", payload); int curTopicLen = (topicLen == 0) ? strlen(topicName) : topicLen; if (strncmp(topicName, context->watchdogTopic, curTopicLen) == 0) { printf("Watchdog signal received\n"); context->watchdogCounter += 1; } else if (strncmp(topicName, context->commandTopic, curTopicLen) == 0) { printf("Command received\n"); if (strcmp(payload, "stop") == 0) { context->stopSignal = true; printf("Stop command received\n"); } } else if (strncmp(topicName, context->auditTopic, curTopicLen) == 0) { printf("Audit message received\n"); } MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void *mqttreceiver(void *ptr) { fprintf(stderr, "mqttreceiver entered\n"); commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr; mqttThreadContext_t *context = (mqttThreadContext_t*) malloc(sizeof(mqttThreadContext_t)); handle->context = (void*)context; context->stopSignal = false; context->watchdogCounter = 0; char *generatedMqttClientId; if (! config_lookup_string(handle->config, MQTT_BROKER_KEY, &(context->broker))) { context->broker = DEFAULT_MQTT_BROKER; } if (! config_lookup_string(handle->config, MQTT_CLIENTID_KEY, &(context->clientId))) { const uint32_t sizeOfGeneratedMqttClientId = 32; generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId); memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId); char myHostname[8]; gethostname(myHostname, sizeof(myHostname)); pid_t myPid = getpid(); snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid); context->clientId = generatedMqttClientId; } if (! config_lookup_string(handle->config, MQTT_STATUS_TOPIC_KEY, &(context->statusTopic))) { context->statusTopic = DEFAULT_MQTT_STATUS_TOPIC; } if (! config_lookup_string(handle->config, MQTT_HEARTBEAT_TOPIC_KEY, &(context->heartbeatTopic))) { context->heartbeatTopic = DEFAULT_MQTT_HEARTBEAT_TOPIC; } if (! config_lookup_string(handle->config, MQTT_COMMAND_TOPIC_KEY, &(context->commandTopic))) { context->commandTopic = DEFAULT_MQTT_COMMAND_TOPIC; } if (! config_lookup_string(handle->config, MQTT_AUDIT_TOPIC_KEY, &(context->auditTopic))) { context->auditTopic = DEFAULT_MQTT_AUDIT_TOPIC; } if (! config_lookup_string(handle->config, MQTT_WATCHDOG_TOPIC_KEY, &(context->watchdogTopic))) { context->watchdogTopic = DEFAULT_MQTT_WATCHDOG_TOPIC; } if (! config_lookup_int(handle->config, MQTT_WATCHDOG_GRACE_TIME_KEY, &(context->watchdogGraceTime))) { context->watchdogGraceTime = DEFAULT_MQTT_WATCHDOG_GRACE_TIME; } MQTTClient client; MQTTClient_create(&client, context->broker, context->clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //conn_opts.username = "<>/<>"; //conn_opts.password = "<>"; MQTTClient_setCallbacks(client, (void*)handle, NULL, on_message, NULL); int rc; if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) { fprintf(stderr, "Connected to MQTT broker\n"); if (MQTTClient_subscribe(client, context->watchdogTopic, 0) == MQTTCLIENT_SUCCESS) { fprintf(stderr, "Subscribed to %s\n", context->watchdogTopic); }; if (MQTTClient_subscribe(client, context->commandTopic, 0) == MQTTCLIENT_SUCCESS) { fprintf(stderr, "Subscribed to %s\n", context->commandTopic); } if (MQTTClient_subscribe(client, context->auditTopic, 0) == MQTTCLIENT_SUCCESS) { fprintf(stderr, "Subscribed to %s\n", context->auditTopic); } uint32_t loopCounter = 0; while (! context->stopSignal) { loopCounter++; if (loopCounter > context->watchdogGraceTime) { if (context->watchdogCounter == 0) { fprintf(stderr, "missing watchdog signals for too longer, terminating\n"); context->stopSignal = true; } else { fprintf(stderr, "Watchdog fine: %d\n", context->watchdogCounter); } loopCounter = 0; context->watchdogCounter = 0; } MQTTClient_yield(); sleep(1); } fprintf(stderr, "mqtt receiver thread stopped\n"); } else { fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc); } MQTTClient_disconnect(client, 1000); MQTTClient_destroy(&client); if (generatedMqttClientId != NULL) { free(generatedMqttClientId); generatedMqttClientId = NULL; } if (context != NULL) { free(context); context = NULL; } return NULL; }