influxUrl

This commit is contained in:
2021-02-10 15:26:56 +01:00
parent 8c8b8e838e
commit bc39cef704
2 changed files with 24 additions and 8 deletions

3
sink/readme.md Normal file
View File

@ -0,0 +1,3 @@
Make sure libconfig and libcurl are available.
On Linux (Debian) install libconfig-dev and libcurl4-openssl-dev,
on OpenBSD install libconfig and curl.

View File

@ -8,6 +8,7 @@
#include <string.h> #include <string.h>
#include <libconfig.h> #include <libconfig.h>
#include <curl/curl.h>
#include <sinkStruct.h> #include <sinkStruct.h>
#include <logging.h> #include <logging.h>
@ -21,7 +22,7 @@ typedef struct {
int receiveSockFd; int receiveSockFd;
} t_receiverHandle; } t_receiverHandle;
t_receiverHandle receiveHandle; t_receiverHandle receiverHandle;
typedef struct { typedef struct {
const char *influxUser; const char *influxUser;
@ -30,6 +31,7 @@ typedef struct {
uint16_t influxPort; uint16_t influxPort;
const char *influxDatabase; const char *influxDatabase;
const char *influxMeasurement; const char *influxMeasurement;
char influxUrl[1024];
} t_forwarderHandle; } t_forwarderHandle;
t_forwarderHandle forwarderHandle = { t_forwarderHandle forwarderHandle = {
@ -88,10 +90,6 @@ void deinitReceiver(t_receiverHandle *handle) {
close(handle->receiveSockFd); close(handle->receiveSockFd);
} }
void deinitForwarder(t_forwarderHandle *handle) {
}
int receiveAndVerifyMinuteBuffer(t_receiverHandle *handle, t_minuteBuffer *buf) { int receiveAndVerifyMinuteBuffer(t_receiverHandle *handle, t_minuteBuffer *buf) {
struct sockaddr_in cliaddr; struct sockaddr_in cliaddr;
socklen_t cliaddrlen = sizeof(cliaddr); socklen_t cliaddrlen = sizeof(cliaddr);
@ -174,9 +172,22 @@ int initForwarder(config_t *cfg, t_forwarderHandle *handle) {
return -3; return -3;
} }
int res = snprintf(handle->influxUrl, sizeof(handle->influxUrl,
"http://%s:%d/write?db=%s&precision=s",
handle->influxServer, handle->influxPort, handle->influxDatabase);
if (res > sizeof(handle->influxUrl)) {
logmsg(LOG_ERR, "influxUrl has not enough space");
return -4;
}
logmsg(LOG_INFO, "influxUrl is %s", handle->influxUrl);
return 0; return 0;
} }
void deinitForwarder(t_forwarderHandle *handle) {
}
int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) { int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) {
logmsg(LOG_INFO, "DeviceId: %s", buf->s.deviceId); logmsg(LOG_INFO, "DeviceId: %s", buf->s.deviceId);
logmsg(LOG_INFO, "Location: %s", buf->s.location); logmsg(LOG_INFO, "Location: %s", buf->s.location);
@ -186,13 +197,14 @@ int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) {
return 0; return 0;
} }
int main() { int main() {
if (0 != readConfig(&cfg)) { if (0 != readConfig(&cfg)) {
logmsg(LOG_ERR, "error when reading configuration"); logmsg(LOG_ERR, "error when reading configuration");
exit(-1); exit(-1);
} }
if (0 != initReceiver(&cfg, &receiveHandle)) { if (0 != initReceiver(&cfg, &receiverHandle)) {
logmsg(LOG_ERR, "error when initializing receiver"); logmsg(LOG_ERR, "error when initializing receiver");
exit(-2); exit(-2);
} }
@ -202,10 +214,11 @@ int main() {
exit(-2); exit(-2);
} }
while (1) { while (1) {
t_minuteBuffer buf; t_minuteBuffer buf;
if (receiveAndVerifyMinuteBuffer(&receiveHandle, &buf) < 0) { if (receiveAndVerifyMinuteBuffer(&receiverHandle, &buf) < 0) {
logmsg(LOG_ERR, "error in receiveAndVerify"); logmsg(LOG_ERR, "error in receiveAndVerify");
continue; continue;
} }
@ -217,6 +230,6 @@ int main() {
deinitForwarder(&forwarderHandle); deinitForwarder(&forwarderHandle);
deinitReceiver(&receiveHandle); deinitReceiver(&receiverHandle);
config_destroy(&cfg); config_destroy(&cfg);
} }