some more mqtt stuff
This commit is contained in:
parent
034ad78ceb
commit
e1a8e7d63e
14
handleType.h
Normal file
14
handleType.h
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
#ifndef _HANDLETYPE_H_
|
||||||
|
#define _HANDLETYPE_H_
|
||||||
|
|
||||||
|
#include <libconfig.h>
|
||||||
|
#include "ringbuffer.h"
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
config_t *config;
|
||||||
|
ringbuffer_t *ringbuffer;
|
||||||
|
void* context;
|
||||||
|
} commonThreadHandle_t;
|
||||||
|
|
||||||
|
#endif // _HANDLETYPE_H_
|
@ -8,13 +8,19 @@
|
|||||||
#include "ringbuffer.h"
|
#include "ringbuffer.h"
|
||||||
#include "logging.h"
|
#include "logging.h"
|
||||||
#include "mqttreceiver.h"
|
#include "mqttreceiver.h"
|
||||||
|
#include "handleType.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
extern char VERSION[];
|
extern char VERSION[];
|
||||||
extern uint32_t REFCNT;
|
extern uint32_t REFCNT;
|
||||||
|
|
||||||
|
|
||||||
config_t cfg;
|
config_t cfg;
|
||||||
t_ringbuffer ringbuffer;
|
ringbuffer_t ringbuffer;
|
||||||
|
|
||||||
|
commonThreadHandle_t commonThreadHandle;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void readConfig() {
|
void readConfig() {
|
||||||
@ -32,15 +38,26 @@ int main (void) {
|
|||||||
fprintf(stderr, "VERSION: %s, REFCNT: %u\n", VERSION, REFCNT);
|
fprintf(stderr, "VERSION: %s, REFCNT: %u\n", VERSION, REFCNT);
|
||||||
|
|
||||||
readConfig();
|
readConfig();
|
||||||
|
|
||||||
ringbufferInit(&ringbuffer);
|
ringbufferInit(&ringbuffer);
|
||||||
mqttreceiverInit(&cfg, &ringbuffer);
|
|
||||||
|
commonThreadHandle.config = &cfg;
|
||||||
|
commonThreadHandle.ringbuffer = &ringbuffer;
|
||||||
|
|
||||||
|
|
||||||
|
pthread_t mqttThread;
|
||||||
|
int err = pthread_create(&mqttThread, NULL, mqttreceiver, (void*) &commonThreadHandle);
|
||||||
|
if (err != 0) {
|
||||||
|
fprintf(stderr, "Unable to create mqtt receiver thread: %d\n", err);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
fprintf(stderr, "started.\n");
|
fprintf(stderr, "started.\n");
|
||||||
|
|
||||||
while (1) {
|
pthread_join(mqttThread, NULL);
|
||||||
sleep(1);
|
fprintf(stderr, "mqtt receiver thread joined\n");
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// will never be reached
|
// will never be reached
|
||||||
config_destroy(&cfg);
|
config_destroy(&cfg);
|
||||||
|
191
mqttreceiver.c
191
mqttreceiver.c
@ -5,6 +5,8 @@
|
|||||||
#include <MQTTClient.h>
|
#include <MQTTClient.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include "handleType.h"
|
||||||
#include "ringbuffer.h"
|
#include "ringbuffer.h"
|
||||||
|
|
||||||
|
|
||||||
@ -12,24 +14,57 @@ const char MQTT_BROKER_KEY[] = "mqttBroker";
|
|||||||
const char DEFAULT_MQTT_BROKER[] = "127.0.0.1:1883";
|
const char DEFAULT_MQTT_BROKER[] = "127.0.0.1:1883";
|
||||||
const char MQTT_CLIENTID_KEY[] = "mqttClientId";
|
const char MQTT_CLIENTID_KEY[] = "mqttClientId";
|
||||||
const char *generatedMqttClientId = NULL;
|
const char *generatedMqttClientId = NULL;
|
||||||
|
const char MQTT_STATUS_TOPIC_KEY[] = "mqttStatusTopic";
|
||||||
|
const char DEFAULT_MQTT_STATUS_TOPIC[] = "Iot/MqttAuditing/Status";
|
||||||
|
const char MQTT_HEARTBEAT_TOPIC_KEY[] = "mqttHeartbeatTopic";
|
||||||
|
const char DEFAULT_MQTT_HEARTBEAT_TOPIC[] = "Iot/MqttAuditing/Heartbeat";
|
||||||
|
const char MQTT_COMMAND_TOPIC_KEY[] = "mqttCommandTopic";
|
||||||
|
const char DEFAULT_MQTT_COMMAND_TOPIC[] = "Iot/MqttAuditing/Command";
|
||||||
|
const char MQTT_AUDIT_TOPIC_KEY[] = "mqttAuditTopic";
|
||||||
|
const char DEFAULT_MQTT_AUDIT_TOPIC[] = "Iot/MqttAuditing/Audit";
|
||||||
|
const char MQTT_WATCHDOG_TOPIC_KEY[] = "mqttWatchdogTopic";
|
||||||
|
const char DEFAULT_MQTT_WATCHDOG_TOPIC[] = "Iot/MqttAuditing/Watchdog";
|
||||||
|
const char MQTT_WATCHDOG_GRACE_TIME_KEY[] = "mqttWatchdogGraceTime";
|
||||||
|
const int DEFAULT_MQTT_WATCHDOG_GRACE_TIME = 15;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
t_ringbuffer *ringbuffer;
|
const char *broker;
|
||||||
char *mqttBroker;
|
const char *clientId;
|
||||||
char *mqttClientId;
|
const char *statusTopic;
|
||||||
} t_mqttThreadHandle;
|
const char *watchdogTopic;
|
||||||
|
const char *heartbeatTopic;
|
||||||
|
const char *commandTopic;
|
||||||
|
const char *auditTopic;
|
||||||
|
int watchdogGraceTime;
|
||||||
|
bool stopSignal;
|
||||||
|
uint32_t watchdogCounter;
|
||||||
|
} mqttThreadContext_t;
|
||||||
|
|
||||||
t_mqttThreadHandle mqttThreadHandle;
|
|
||||||
|
|
||||||
pthread_t mqttThread;
|
|
||||||
|
|
||||||
int on_message(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
|
|
||||||
t_mqttThreadHandle *handle = (t_mqttThreadHandle*)context;
|
|
||||||
|
|
||||||
|
int on_message(void *kontext, char *topicName, int topicLen, MQTTClient_message *message) {
|
||||||
|
commonThreadHandle_t *handle = (commonThreadHandle_t*)kontext;
|
||||||
|
mqttThreadContext_t *context = (mqttThreadContext_t*)handle->context;
|
||||||
|
|
||||||
|
printf("Topic: %s, TopicLen: %d\n", topicName, topicLen);
|
||||||
char* payload = message->payload;
|
char* payload = message->payload;
|
||||||
printf("Received operation %s\n", payload);
|
printf("Received operation: %s\n", payload);
|
||||||
|
|
||||||
|
int curTopicLen = (topicLen == 0) ? strlen(topicName) : topicLen;
|
||||||
|
if (strncmp(topicName, context->watchdogTopic, curTopicLen) == 0) {
|
||||||
|
printf("Watchdog signal received\n");
|
||||||
|
context->watchdogCounter += 1;
|
||||||
|
} else if (strncmp(topicName, context->commandTopic, curTopicLen) == 0) {
|
||||||
|
printf("Command received\n");
|
||||||
|
if (strcmp(payload, "stop") == 0) {
|
||||||
|
context->stopSignal = true;
|
||||||
|
printf("Stop command received\n");
|
||||||
|
}
|
||||||
|
} else if (strncmp(topicName, context->auditTopic, curTopicLen) == 0) {
|
||||||
|
printf("Audit message received\n");
|
||||||
|
}
|
||||||
|
|
||||||
MQTTClient_freeMessage(&message);
|
MQTTClient_freeMessage(&message);
|
||||||
MQTTClient_free(topicName);
|
MQTTClient_free(topicName);
|
||||||
return 1;
|
return 1;
|
||||||
@ -39,13 +74,59 @@ int on_message(void *context, char *topicName, int topicLen, MQTTClient_message
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
void *mqttreceiverRun(void *ptr) {
|
void *mqttreceiver(void *ptr) {
|
||||||
fprintf(stderr, "mqttreceiverRun entered\n");
|
|
||||||
|
|
||||||
t_mqttThreadHandle *handle = (t_mqttThreadHandle*)ptr;
|
fprintf(stderr, "mqttreceiver entered\n");
|
||||||
|
|
||||||
|
commonThreadHandle_t *handle = (commonThreadHandle_t*)ptr;
|
||||||
|
mqttThreadContext_t *context = (mqttThreadContext_t*) malloc(sizeof(mqttThreadContext_t));
|
||||||
|
handle->context = (void*)context;
|
||||||
|
context->stopSignal = false;
|
||||||
|
context->watchdogCounter = 0;
|
||||||
|
|
||||||
|
char *generatedMqttClientId;
|
||||||
|
|
||||||
|
if (! config_lookup_string(handle->config, MQTT_BROKER_KEY, &(context->broker))) {
|
||||||
|
context->broker = DEFAULT_MQTT_BROKER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! config_lookup_string(handle->config, MQTT_CLIENTID_KEY, &(context->clientId))) {
|
||||||
|
const uint32_t sizeOfGeneratedMqttClientId = 32;
|
||||||
|
generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId);
|
||||||
|
memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId);
|
||||||
|
char myHostname[8];
|
||||||
|
gethostname(myHostname, sizeof(myHostname));
|
||||||
|
pid_t myPid = getpid();
|
||||||
|
snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid);
|
||||||
|
context->clientId = generatedMqttClientId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! config_lookup_string(handle->config, MQTT_STATUS_TOPIC_KEY, &(context->statusTopic))) {
|
||||||
|
context->statusTopic = DEFAULT_MQTT_STATUS_TOPIC;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! config_lookup_string(handle->config, MQTT_HEARTBEAT_TOPIC_KEY, &(context->heartbeatTopic))) {
|
||||||
|
context->heartbeatTopic = DEFAULT_MQTT_HEARTBEAT_TOPIC;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! config_lookup_string(handle->config, MQTT_COMMAND_TOPIC_KEY, &(context->commandTopic))) {
|
||||||
|
context->commandTopic = DEFAULT_MQTT_COMMAND_TOPIC;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! config_lookup_string(handle->config, MQTT_AUDIT_TOPIC_KEY, &(context->auditTopic))) {
|
||||||
|
context->auditTopic = DEFAULT_MQTT_AUDIT_TOPIC;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! config_lookup_string(handle->config, MQTT_WATCHDOG_TOPIC_KEY, &(context->watchdogTopic))) {
|
||||||
|
context->watchdogTopic = DEFAULT_MQTT_WATCHDOG_TOPIC;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! config_lookup_int(handle->config, MQTT_WATCHDOG_GRACE_TIME_KEY, &(context->watchdogGraceTime))) {
|
||||||
|
context->watchdogGraceTime = DEFAULT_MQTT_WATCHDOG_GRACE_TIME;
|
||||||
|
}
|
||||||
|
|
||||||
MQTTClient client;
|
MQTTClient client;
|
||||||
MQTTClient_create(&client, handle->mqttBroker, handle->mqttClientId, MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
MQTTClient_create(&client, context->broker, context->clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
||||||
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
|
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
|
||||||
//conn_opts.username = "<<tenant_ID>>/<<username>>";
|
//conn_opts.username = "<<tenant_ID>>/<<username>>";
|
||||||
//conn_opts.password = "<<password>>";
|
//conn_opts.password = "<<password>>";
|
||||||
@ -55,56 +136,52 @@ void *mqttreceiverRun(void *ptr) {
|
|||||||
int rc;
|
int rc;
|
||||||
if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) {
|
if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) {
|
||||||
fprintf(stderr, "Connected to MQTT broker\n");
|
fprintf(stderr, "Connected to MQTT broker\n");
|
||||||
MQTTClient_subscribe(client, "s/ds", 0);
|
if (MQTTClient_subscribe(client, context->watchdogTopic, 0) == MQTTCLIENT_SUCCESS) {
|
||||||
|
fprintf(stderr, "Subscribed to %s\n", context->watchdogTopic);
|
||||||
|
};
|
||||||
|
|
||||||
for (;;) {
|
if (MQTTClient_subscribe(client, context->commandTopic, 0) == MQTTCLIENT_SUCCESS) {
|
||||||
MQTTClient_yield();
|
fprintf(stderr, "Subscribed to %s\n", context->commandTopic);
|
||||||
sleep(3);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (MQTTClient_subscribe(client, context->auditTopic, 0) == MQTTCLIENT_SUCCESS) {
|
||||||
|
fprintf(stderr, "Subscribed to %s\n", context->auditTopic);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t loopCounter = 0;
|
||||||
|
while (! context->stopSignal) {
|
||||||
|
loopCounter++;
|
||||||
|
if (loopCounter > context->watchdogGraceTime) {
|
||||||
|
if (context->watchdogCounter == 0) {
|
||||||
|
fprintf(stderr, "missing watchdog signals for too longer, terminating\n");
|
||||||
|
context->stopSignal = true;
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "Watchdog fine: %d\n", context->watchdogCounter);
|
||||||
|
}
|
||||||
|
loopCounter = 0;
|
||||||
|
context->watchdogCounter = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
MQTTClient_yield();
|
||||||
|
sleep(1);
|
||||||
|
}
|
||||||
|
fprintf(stderr, "mqtt receiver thread stopped\n");
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc);
|
|
||||||
|
|
||||||
MQTTClient_disconnect(client, 1000);
|
MQTTClient_disconnect(client, 1000);
|
||||||
MQTTClient_destroy(&client);
|
MQTTClient_destroy(&client);
|
||||||
|
|
||||||
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int mqttreceiverInit(config_t *pCfg, t_ringbuffer *pRingbuffer) {
|
|
||||||
mqttThreadHandle.ringbuffer = pRingbuffer;
|
|
||||||
|
|
||||||
if (! config_lookup_string(pCfg, MQTT_BROKER_KEY, &(mqttThreadHandle.mqttBroker))) {
|
|
||||||
mqttThreadHandle.mqttBroker = DEFAULT_MQTT_BROKER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (! config_lookup_string(pCfg, MQTT_CLIENTID_KEY, &(mqttThreadHandle.mqttClientId))) {
|
|
||||||
const uint32_t sizeOfGeneratedMqttClientId = 32;
|
|
||||||
generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId);
|
|
||||||
memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId);
|
|
||||||
char myHostname[8];
|
|
||||||
gethostname(myHostname, sizeof(myHostname));
|
|
||||||
pid_t myPid = getpid();
|
|
||||||
snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid);
|
|
||||||
mqttThreadHandle.mqttClientId = generatedMqttClientId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
fprintf(stderr, "Create mqtt receiver thread\n");
|
|
||||||
int r = pthread_create(&mqttThread, NULL, mqttreceiverRun, (void*) &mqttThreadHandle);
|
|
||||||
fprintf(stderr, "pthread_create returns: %d\n", r);
|
|
||||||
|
|
||||||
/*
|
|
||||||
pthread_join(mqttThread, NULL);
|
|
||||||
|
|
||||||
|
|
||||||
if (generatedMqttClientId != NULL) {
|
if (generatedMqttClientId != NULL) {
|
||||||
free(generatedMqttClientId);
|
free(generatedMqttClientId);
|
||||||
|
generatedMqttClientId = NULL;
|
||||||
|
}
|
||||||
|
if (context != NULL) {
|
||||||
|
free(context);
|
||||||
|
context = NULL;
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
return 0;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
#ifndef _MQTTRECEIVER_H_
|
#ifndef _MQTTRECEIVER_H_
|
||||||
#define _MQTTRECEIVER_H_
|
#define _MQTTRECEIVER_H_
|
||||||
|
|
||||||
#include <libconfig.h>
|
void *mqttreceiver(void *ptr);
|
||||||
#include "ringbuffer.h"
|
|
||||||
|
|
||||||
int mqttreceiverInit(config_t *pCfg, t_ringbuffer *pRingbuffer);
|
|
||||||
|
|
||||||
#endif // _MQTTRECEIVER_H_
|
#endif // _MQTTRECEIVER_H_
|
@ -5,7 +5,7 @@
|
|||||||
#include "ringbuffer.h"
|
#include "ringbuffer.h"
|
||||||
|
|
||||||
|
|
||||||
void ringbufferInit(t_ringbuffer *handle) {
|
void ringbufferInit(ringbuffer_t *handle) {
|
||||||
handle->bufferReadIdx = 0;
|
handle->bufferReadIdx = 0;
|
||||||
handle->bufferWriteIdx = 0;
|
handle->bufferWriteIdx = 0;
|
||||||
pthread_mutex_init(&(handle->eventMutex), NULL);
|
pthread_mutex_init(&(handle->eventMutex), NULL);
|
||||||
@ -13,7 +13,7 @@ void ringbufferInit(t_ringbuffer *handle) {
|
|||||||
fprintf(stderr, "ringbuffer initialized\n");
|
fprintf(stderr, "ringbuffer initialized\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
void ringbufferPut(t_ringbuffer *handle, void *f) {
|
void ringbufferPut(ringbuffer_t *handle, void *f) {
|
||||||
if (handle->bufferWriteIdx == (BUFFER_SIZE - 1)) {
|
if (handle->bufferWriteIdx == (BUFFER_SIZE - 1)) {
|
||||||
while (handle->bufferReadIdx == BUFFER_SIZE);
|
while (handle->bufferReadIdx == BUFFER_SIZE);
|
||||||
} else {
|
} else {
|
||||||
@ -33,7 +33,7 @@ void ringbufferPut(t_ringbuffer *handle, void *f) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void *ringbufferGet(t_ringbuffer *handle) {
|
void *ringbufferGet(ringbuffer_t *handle) {
|
||||||
if (handle->bufferReadIdx == handle->bufferWriteIdx) {
|
if (handle->bufferReadIdx == handle->bufferWriteIdx) {
|
||||||
pthread_mutex_lock(&(handle->eventMutex));
|
pthread_mutex_lock(&(handle->eventMutex));
|
||||||
pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex));
|
pthread_cond_wait(&(handle->eventSignal), &(handle->eventMutex));
|
||||||
|
@ -13,10 +13,10 @@ typedef struct {
|
|||||||
uint32_t bufferWriteIdx;
|
uint32_t bufferWriteIdx;
|
||||||
pthread_mutex_t eventMutex;
|
pthread_mutex_t eventMutex;
|
||||||
pthread_cond_t eventSignal;
|
pthread_cond_t eventSignal;
|
||||||
} t_ringbuffer;
|
} ringbuffer_t;
|
||||||
|
|
||||||
void ringbufferInit(t_ringbuffer *handle);
|
void ringbufferInit(ringbuffer_t *handle);
|
||||||
void ringbufferPut(t_ringbuffer *handle, void *f);
|
void ringbufferPut(ringbuffer_t *handle, void *f);
|
||||||
void *ringbufferGet(t_ringbuffer *handle);
|
void *ringbufferGet(ringbuffer_t *handle);
|
||||||
|
|
||||||
#endif // _RINGBUFFER_H_
|
#endif // _RINGBUFFER_H_
|
Loading…
x
Reference in New Issue
Block a user