diff --git a/sink/Makefile b/sink/Makefile index afb71a0..e2044e2 100644 --- a/sink/Makefile +++ b/sink/Makefile @@ -8,13 +8,14 @@ logging.c C_INCLUDES = \ -I. \ --I../cube/User/Inc +-I../cube/User/Inc \ +-I/usr/include/postgresql VERSION := $(shell git rev-parse --short=8 HEAD) CC = gcc CFLAGS = $(C_INCLUDES) -Wall -Werror -std=c99 -DVERSION="\"$(VERSION)\"" -LDFLAGS = -lconfig -lcurl +LDFLAGS = -lconfig -lcurl -lpq TARGET = sink20169 UNAME_S := $(shell uname -s) diff --git a/sink/sink20169.c b/sink/sink20169.c index 89cc72d..6ccfda4 100644 --- a/sink/sink20169.c +++ b/sink/sink20169.c @@ -1,5 +1,13 @@ +/* + * vim:sw=4:ts=4:et + */ + + #define _DEFAULT_SOURCE +#define POSTGRESQL +// #define INFLUXDB + #include #include #include @@ -13,7 +21,14 @@ #include #include + +#ifdef INFLUXDB #include +#endif + +#ifdef POSTGRESQL +#include +#endif #include #include @@ -40,10 +55,17 @@ typedef struct { int receiveSockFd; } t_receiverHandle; +#ifdef POSTGRESQL + #define NUM_OF_STMT_PARAMS 4 +#endif + + typedef struct { t_configHandle *configHandle; - uint32_t lowerBound; - uint32_t upperBound; + int32_t lowerBound; + int32_t upperBound; + +#ifdef INFLUXDB const char *influxUser; const char *influxPass; const char *influxServer; @@ -51,6 +73,12 @@ typedef struct { const char *influxDatabase; const char *influxMeasurement; char influxUrl[1024]; +#endif + +#ifdef POSTGRESQL + const char *postgresqlConnInfo; + PGconn *conn; +#endif } t_forwarderHandle; bool verbose = false; @@ -206,6 +234,7 @@ int receiveAndVerifyMinuteBuffer(t_receiverHandle *handle, t_minuteBuffer *buf) int initForwarder(t_configHandle *configHandle, t_forwarderHandle *handle) { handle->configHandle = configHandle; +#ifdef INFLUXDB handle->influxUser = NULL; handle->influxPass = NULL; handle->influxServer = NULL; @@ -247,6 +276,18 @@ int initForwarder(t_configHandle *configHandle, t_forwarderHandle *handle) { return -4; } logmsg(LOG_INFO, "influxUrl is %s", handle->influxUrl); +#endif // INFLUXDB + +#ifdef POSTGRESQL + handle->postgresqlConnInfo = NULL; + config_lookup_string(&(configHandle->cfg), "postgresqlConnInfo", &(handle->postgresqlConnInfo)); + if (! handle->postgresqlConnInfo) { + logmsg(LOG_ERR, "no postgresql connInfo configured"); + return -1; + } + + handle->conn = NULL; +#endif // POSTGRESQL handle->lowerBound = 45000; config_lookup_int(&(configHandle->cfg), "lowerBound", &(handle->lowerBound)); @@ -261,6 +302,7 @@ void deinitForwarder(t_forwarderHandle *handle) { } +#ifdef INFLUXDB int httpPostRequest(char *url, const char *user, const char *pass, char *payload) { CURL *curl = curl_easy_init(); if (! curl) { @@ -287,6 +329,91 @@ int httpPostRequest(char *url, const char *user, const char *pass, char *payload return 0; } +int sendToDB(t_forwarderHandle *handle, const char *location, const char *deviceId, + uint32_t frequency, uint64_t timestamp) { + int frequency_before_point = frequency / 1000; + int frequency_behind_point = frequency - (frequency_before_point * 1000); + char payload[256]; + int res = snprintf(payload, sizeof(payload), + "%s,valid=1,location=%s,host=%s freq=%d.%03d" +#ifdef OpenBSD + " %llu" +#else + " %lu" +#endif + "", + handle->influxMeasurement, location, deviceId, + frequency_before_point, frequency_behind_point, + timestamp); + if (res > sizeof(payload)) { + logmsg(LOG_ERR, "payload buffer to small"); + return -1; + } + logmsg(LOG_DEBUG, "Payload: %s", payload); + res = httpPostRequest(handle->influxUrl, handle->influxUser, handle->influxPass, payload); + if (res == 0) { + logmsg(LOG_DEBUG, "Successfully sent to InfluxDB"); + } + return res; +} +#endif // INFLUXDB + + +#ifdef POSTGRESQL +int openDatabaseConnection(t_forwarderHandle *handle) { + int res = 0; + + if (! handle->conn) { + logmsg(LOG_DEBUG, "Opening connection to database"); + handle->conn = PQconnectdb(handle->postgresqlConnInfo); + } else if (PQstatus(handle->conn) != CONNECTION_OK) { + logmsg(LOG_DEBUG, "Resetting connection to database"); + PQreset(handle->conn); + } + + if (PQstatus(handle->conn) != CONNECTION_OK) { + logmsg(LOG_ERR, "Connection to database failed: %s", PQerrorMessage(handle->conn)); + res = -1; + } + + return res; +} + +int sendToDB(t_forwarderHandle *handle, const char *location, const char *deviceId, + uint32_t frequency, uint64_t timestamp) { + int retcode = 0; + if (0 == openDatabaseConnection(handle)) { + int frequency_before_point = frequency / 1000; + int frequency_behind_point = frequency - (frequency_before_point * 1000); + char stmt[256]; + int res1 = snprintf(stmt, sizeof(stmt), + "INSERT INTO mainsfrequency (time, host, location, freq) " + "VALUES(to_timestamp(%lu), '%s', '%s', %d.%03d)", + timestamp, deviceId, location, + frequency_before_point, frequency_behind_point); + if (res1 > sizeof(stmt)) { + logmsg(LOG_ERR, "stmt buffer to small"); + retcode = -1; + } else { + logmsg(LOG_DEBUG, "Statement: %s", stmt); + PGresult *res2 = PQexec(handle->conn, stmt); + if (PQresultStatus(res2) != PGRES_COMMAND_OK) { + logmsg(LOG_ERR, "Failed to insert into database (%s), data lost", + PQresultErrorMessage(res2)); + retcode = -2; + } + PQclear(res2); + } + } else { + logmsg(LOG_ERR, "No database connection available, data lost"); + retcode = -1; + } + + return retcode; +} +#endif // POSTGRESQL + + int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) { t_device *device = findDevice(handle->configHandle, buf->s.deviceId); if (device == NULL) { @@ -305,40 +432,17 @@ int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) { if (device->inactive == 0) { if ((buf->s.frequency[j] >= handle->lowerBound) && (buf->s.frequency[j] <= handle->upperBound)) { - int frequency_before_point = buf->s.frequency[j] / 1000; - int frequency_behind_point = buf->s.frequency[j] - (frequency_before_point * 1000); - - char payload[256]; - int res = snprintf(payload, sizeof(payload), - "%s,valid=1,location=%s,host=%s freq=%d.%03d" -#ifdef OpenBSD - " %llu" -#else - " %lu" -#endif - "", - handle->influxMeasurement, location, buf->s.deviceId, - frequency_before_point, frequency_behind_point, - timestamp); - if (res > sizeof(payload)) { - logmsg(LOG_ERR, "payload buffer to small"); - return -1; - } - logmsg(LOG_DEBUG, "Payload: %s", payload); - res = httpPostRequest(handle->influxUrl, handle->influxUser, handle->influxPass, payload); - if (res == 0) { - logmsg(LOG_DEBUG, "Successfully sent to InfluxDB"); - } + sendToDB(handle, location, buf->s.deviceId, buf->s.frequency[j], timestamp); } else { logmsg(LOG_ERR, "%u out of bound, ignored", buf->s.frequency[j]); } } else { - logmsg(LOG_DEBUG, "Inactive device, not sent to InfluxDB"); + logmsg(LOG_DEBUG, "Inactive device, not sent to database"); } } if (device->inactive == 0) { - logmsg(LOG_INFO, "Successfully sent whole minute to InfluxDB"); + logmsg(LOG_INFO, "Successfully sent whole minute to database"); } else { logmsg(LOG_INFO, "Not sent to database, device is marked as inactive"); } diff --git a/sink/sink20169.cfg b/sink/sink20169.cfg index dc33c60..0fd307f 100644 --- a/sink/sink20169.cfg +++ b/sink/sink20169.cfg @@ -1,9 +1,4 @@ -// influxUser = ""; -// influxPass = ""; -influxServer = "172.16.10.16"; -influxPort = 8086; -influxDatabase = "smarthome2"; -influxMeasurement = "mainsfrequency"; +postgresqlConnInfo = "host=172.16.3.32 dbname=mainscnt user=sink password=test123"; lowerBound = 45000; upperBound = 55000; @@ -12,10 +7,10 @@ receivePort = 20169; devices = ( { - inactive = true; - deviceId = "MainsCnt01"; + inactive = false; + deviceId = "testsource"; // sharedSecret must have exactly 31 characters - sharedSecret = "Uj6*uKDp@8Kvfa4g5eRMLUfVsSuqjxW"; + sharedSecret = "0123456789012345678901234567890"; // location must neither contains spaces nor commas or any other URL-special characters location = "Essen_DE"; }