mqttreceiver added

This commit is contained in:
2020-06-17 13:53:54 +02:00
parent 0e9b7c8e22
commit 034ad78ceb
8 changed files with 295 additions and 7 deletions

110
mqttreceiver.c Normal file
View File

@ -0,0 +1,110 @@
#include <libconfig.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <MQTTClient.h>
#include <pthread.h>
#include <stdio.h>
#include "ringbuffer.h"
const char MQTT_BROKER_KEY[] = "mqttBroker";
const char DEFAULT_MQTT_BROKER[] = "127.0.0.1:1883";
const char MQTT_CLIENTID_KEY[] = "mqttClientId";
const char *generatedMqttClientId = NULL;
typedef struct {
t_ringbuffer *ringbuffer;
char *mqttBroker;
char *mqttClientId;
} t_mqttThreadHandle;
t_mqttThreadHandle mqttThreadHandle;
pthread_t mqttThread;
int on_message(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
t_mqttThreadHandle *handle = (t_mqttThreadHandle*)context;
char* payload = message->payload;
printf("Received operation %s\n", payload);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void *mqttreceiverRun(void *ptr) {
fprintf(stderr, "mqttreceiverRun entered\n");
t_mqttThreadHandle *handle = (t_mqttThreadHandle*)ptr;
MQTTClient client;
MQTTClient_create(&client, handle->mqttBroker, handle->mqttClientId, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
//conn_opts.username = "<<tenant_ID>>/<<username>>";
//conn_opts.password = "<<password>>";
MQTTClient_setCallbacks(client, (void*)handle, NULL, on_message, NULL);
int rc;
if ((rc = MQTTClient_connect(client, &conn_opts)) == MQTTCLIENT_SUCCESS) {
fprintf(stderr, "Connected to MQTT broker\n");
MQTTClient_subscribe(client, "s/ds", 0);
for (;;) {
MQTTClient_yield();
sleep(3);
}
}
fprintf(stderr, "Failed to connect to MQTT broker, return code %d\n", rc);
MQTTClient_disconnect(client, 1000);
MQTTClient_destroy(&client);
return rc;
}
int mqttreceiverInit(config_t *pCfg, t_ringbuffer *pRingbuffer) {
mqttThreadHandle.ringbuffer = pRingbuffer;
if (! config_lookup_string(pCfg, MQTT_BROKER_KEY, &(mqttThreadHandle.mqttBroker))) {
mqttThreadHandle.mqttBroker = DEFAULT_MQTT_BROKER;
}
if (! config_lookup_string(pCfg, MQTT_CLIENTID_KEY, &(mqttThreadHandle.mqttClientId))) {
const uint32_t sizeOfGeneratedMqttClientId = 32;
generatedMqttClientId = (char*) malloc(sizeOfGeneratedMqttClientId);
memset(generatedMqttClientId, 0, sizeOfGeneratedMqttClientId);
char myHostname[8];
gethostname(myHostname, sizeof(myHostname));
pid_t myPid = getpid();
snprintf(generatedMqttClientId, sizeOfGeneratedMqttClientId-1, "%s-%u", myHostname, myPid);
mqttThreadHandle.mqttClientId = generatedMqttClientId;
}
fprintf(stderr, "Create mqtt receiver thread\n");
int r = pthread_create(&mqttThread, NULL, mqttreceiverRun, (void*) &mqttThreadHandle);
fprintf(stderr, "pthread_create returns: %d\n", r);
/*
pthread_join(mqttThread, NULL);
if (generatedMqttClientId != NULL) {
free(generatedMqttClientId);
}
*/
return 0;
}