start with postgres

This commit is contained in:
Wolfgang Hottgenroth 2021-04-21 18:51:12 +02:00
parent 035a288874
commit a3b9c2071e
3 changed files with 139 additions and 39 deletions

View File

@ -8,13 +8,14 @@ logging.c
C_INCLUDES = \ C_INCLUDES = \
-I. \ -I. \
-I../cube/User/Inc -I../cube/User/Inc \
-I/usr/include/postgresql
VERSION := $(shell git rev-parse --short=8 HEAD) VERSION := $(shell git rev-parse --short=8 HEAD)
CC = gcc CC = gcc
CFLAGS = $(C_INCLUDES) -Wall -Werror -std=c99 -DVERSION="\"$(VERSION)\"" CFLAGS = $(C_INCLUDES) -Wall -Werror -std=c99 -DVERSION="\"$(VERSION)\""
LDFLAGS = -lconfig -lcurl LDFLAGS = -lconfig -lcurl -lpq
TARGET = sink20169 TARGET = sink20169
UNAME_S := $(shell uname -s) UNAME_S := $(shell uname -s)

View File

@ -1,5 +1,13 @@
/*
* vim:sw=4:ts=4:et
*/
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#define POSTGRESQL
// #define INFLUXDB
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdbool.h> #include <stdbool.h>
@ -13,7 +21,14 @@
#include <pwd.h> #include <pwd.h>
#include <libconfig.h> #include <libconfig.h>
#ifdef INFLUXDB
#include <curl/curl.h> #include <curl/curl.h>
#endif
#ifdef POSTGRESQL
#include <libpq-fe.h>
#endif
#include <sinkStruct.h> #include <sinkStruct.h>
#include <logging.h> #include <logging.h>
@ -40,10 +55,17 @@ typedef struct {
int receiveSockFd; int receiveSockFd;
} t_receiverHandle; } t_receiverHandle;
#ifdef POSTGRESQL
#define NUM_OF_STMT_PARAMS 4
#endif
typedef struct { typedef struct {
t_configHandle *configHandle; t_configHandle *configHandle;
uint32_t lowerBound; int32_t lowerBound;
uint32_t upperBound; int32_t upperBound;
#ifdef INFLUXDB
const char *influxUser; const char *influxUser;
const char *influxPass; const char *influxPass;
const char *influxServer; const char *influxServer;
@ -51,6 +73,12 @@ typedef struct {
const char *influxDatabase; const char *influxDatabase;
const char *influxMeasurement; const char *influxMeasurement;
char influxUrl[1024]; char influxUrl[1024];
#endif
#ifdef POSTGRESQL
const char *postgresqlConnInfo;
PGconn *conn;
#endif
} t_forwarderHandle; } t_forwarderHandle;
bool verbose = false; bool verbose = false;
@ -206,6 +234,7 @@ int receiveAndVerifyMinuteBuffer(t_receiverHandle *handle, t_minuteBuffer *buf)
int initForwarder(t_configHandle *configHandle, t_forwarderHandle *handle) { int initForwarder(t_configHandle *configHandle, t_forwarderHandle *handle) {
handle->configHandle = configHandle; handle->configHandle = configHandle;
#ifdef INFLUXDB
handle->influxUser = NULL; handle->influxUser = NULL;
handle->influxPass = NULL; handle->influxPass = NULL;
handle->influxServer = NULL; handle->influxServer = NULL;
@ -247,6 +276,18 @@ int initForwarder(t_configHandle *configHandle, t_forwarderHandle *handle) {
return -4; return -4;
} }
logmsg(LOG_INFO, "influxUrl is %s", handle->influxUrl); logmsg(LOG_INFO, "influxUrl is %s", handle->influxUrl);
#endif // INFLUXDB
#ifdef POSTGRESQL
handle->postgresqlConnInfo = NULL;
config_lookup_string(&(configHandle->cfg), "postgresqlConnInfo", &(handle->postgresqlConnInfo));
if (! handle->postgresqlConnInfo) {
logmsg(LOG_ERR, "no postgresql connInfo configured");
return -1;
}
handle->conn = NULL;
#endif // POSTGRESQL
handle->lowerBound = 45000; handle->lowerBound = 45000;
config_lookup_int(&(configHandle->cfg), "lowerBound", &(handle->lowerBound)); config_lookup_int(&(configHandle->cfg), "lowerBound", &(handle->lowerBound));
@ -261,6 +302,7 @@ void deinitForwarder(t_forwarderHandle *handle) {
} }
#ifdef INFLUXDB
int httpPostRequest(char *url, const char *user, const char *pass, char *payload) { int httpPostRequest(char *url, const char *user, const char *pass, char *payload) {
CURL *curl = curl_easy_init(); CURL *curl = curl_easy_init();
if (! curl) { if (! curl) {
@ -287,6 +329,91 @@ int httpPostRequest(char *url, const char *user, const char *pass, char *payload
return 0; return 0;
} }
int sendToDB(t_forwarderHandle *handle, const char *location, const char *deviceId,
uint32_t frequency, uint64_t timestamp) {
int frequency_before_point = frequency / 1000;
int frequency_behind_point = frequency - (frequency_before_point * 1000);
char payload[256];
int res = snprintf(payload, sizeof(payload),
"%s,valid=1,location=%s,host=%s freq=%d.%03d"
#ifdef OpenBSD
" %llu"
#else
" %lu"
#endif
"",
handle->influxMeasurement, location, deviceId,
frequency_before_point, frequency_behind_point,
timestamp);
if (res > sizeof(payload)) {
logmsg(LOG_ERR, "payload buffer to small");
return -1;
}
logmsg(LOG_DEBUG, "Payload: %s", payload);
res = httpPostRequest(handle->influxUrl, handle->influxUser, handle->influxPass, payload);
if (res == 0) {
logmsg(LOG_DEBUG, "Successfully sent to InfluxDB");
}
return res;
}
#endif // INFLUXDB
#ifdef POSTGRESQL
int openDatabaseConnection(t_forwarderHandle *handle) {
int res = 0;
if (! handle->conn) {
logmsg(LOG_DEBUG, "Opening connection to database");
handle->conn = PQconnectdb(handle->postgresqlConnInfo);
} else if (PQstatus(handle->conn) != CONNECTION_OK) {
logmsg(LOG_DEBUG, "Resetting connection to database");
PQreset(handle->conn);
}
if (PQstatus(handle->conn) != CONNECTION_OK) {
logmsg(LOG_ERR, "Connection to database failed: %s", PQerrorMessage(handle->conn));
res = -1;
}
return res;
}
int sendToDB(t_forwarderHandle *handle, const char *location, const char *deviceId,
uint32_t frequency, uint64_t timestamp) {
int retcode = 0;
if (0 == openDatabaseConnection(handle)) {
int frequency_before_point = frequency / 1000;
int frequency_behind_point = frequency - (frequency_before_point * 1000);
char stmt[256];
int res1 = snprintf(stmt, sizeof(stmt),
"INSERT INTO mainsfrequency (time, host, location, freq) "
"VALUES(to_timestamp(%lu), '%s', '%s', %d.%03d)",
timestamp, deviceId, location,
frequency_before_point, frequency_behind_point);
if (res1 > sizeof(stmt)) {
logmsg(LOG_ERR, "stmt buffer to small");
retcode = -1;
} else {
logmsg(LOG_DEBUG, "Statement: %s", stmt);
PGresult *res2 = PQexec(handle->conn, stmt);
if (PQresultStatus(res2) != PGRES_COMMAND_OK) {
logmsg(LOG_ERR, "Failed to insert into database (%s), data lost",
PQresultErrorMessage(res2));
retcode = -2;
}
PQclear(res2);
}
} else {
logmsg(LOG_ERR, "No database connection available, data lost");
retcode = -1;
}
return retcode;
}
#endif // POSTGRESQL
int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) { int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) {
t_device *device = findDevice(handle->configHandle, buf->s.deviceId); t_device *device = findDevice(handle->configHandle, buf->s.deviceId);
if (device == NULL) { if (device == NULL) {
@ -305,40 +432,17 @@ int forwardMinuteBuffer(t_forwarderHandle *handle, t_minuteBuffer *buf) {
if (device->inactive == 0) { if (device->inactive == 0) {
if ((buf->s.frequency[j] >= handle->lowerBound) && (buf->s.frequency[j] <= handle->upperBound)) { if ((buf->s.frequency[j] >= handle->lowerBound) && (buf->s.frequency[j] <= handle->upperBound)) {
int frequency_before_point = buf->s.frequency[j] / 1000; sendToDB(handle, location, buf->s.deviceId, buf->s.frequency[j], timestamp);
int frequency_behind_point = buf->s.frequency[j] - (frequency_before_point * 1000);
char payload[256];
int res = snprintf(payload, sizeof(payload),
"%s,valid=1,location=%s,host=%s freq=%d.%03d"
#ifdef OpenBSD
" %llu"
#else
" %lu"
#endif
"",
handle->influxMeasurement, location, buf->s.deviceId,
frequency_before_point, frequency_behind_point,
timestamp);
if (res > sizeof(payload)) {
logmsg(LOG_ERR, "payload buffer to small");
return -1;
}
logmsg(LOG_DEBUG, "Payload: %s", payload);
res = httpPostRequest(handle->influxUrl, handle->influxUser, handle->influxPass, payload);
if (res == 0) {
logmsg(LOG_DEBUG, "Successfully sent to InfluxDB");
}
} else { } else {
logmsg(LOG_ERR, "%u out of bound, ignored", buf->s.frequency[j]); logmsg(LOG_ERR, "%u out of bound, ignored", buf->s.frequency[j]);
} }
} else { } else {
logmsg(LOG_DEBUG, "Inactive device, not sent to InfluxDB"); logmsg(LOG_DEBUG, "Inactive device, not sent to database");
} }
} }
if (device->inactive == 0) { if (device->inactive == 0) {
logmsg(LOG_INFO, "Successfully sent whole minute to InfluxDB"); logmsg(LOG_INFO, "Successfully sent whole minute to database");
} else { } else {
logmsg(LOG_INFO, "Not sent to database, device is marked as inactive"); logmsg(LOG_INFO, "Not sent to database, device is marked as inactive");
} }

View File

@ -1,9 +1,4 @@
// influxUser = ""; postgresqlConnInfo = "host=172.16.3.32 dbname=mainscnt user=sink password=test123";
// influxPass = "";
influxServer = "172.16.10.16";
influxPort = 8086;
influxDatabase = "smarthome2";
influxMeasurement = "mainsfrequency";
lowerBound = 45000; lowerBound = 45000;
upperBound = 55000; upperBound = 55000;
@ -12,10 +7,10 @@ receivePort = 20169;
devices = ( devices = (
{ {
inactive = true; inactive = false;
deviceId = "MainsCnt01"; deviceId = "testsource";
// sharedSecret must have exactly 31 characters // sharedSecret must have exactly 31 characters
sharedSecret = "Uj6*uKDp@8Kvfa4g5eRMLUfVsSuqjxW"; sharedSecret = "0123456789012345678901234567890";
// location must neither contains spaces nor commas or any other URL-special characters // location must neither contains spaces nor commas or any other URL-special characters
location = "Essen_DE"; location = "Essen_DE";
} }