start change to sinkServer support

This commit is contained in:
2021-03-04 13:55:27 +01:00
parent bfbf686298
commit e357072888
5 changed files with 10 additions and 221 deletions

View File

@ -1,7 +1,7 @@
CC=gcc CC=gcc
CFLAGS=-Wall CFLAGS=-Wall
LDFLAGS=-lwiringPi -lcurl -lconfig LDFLAGS=-lwiringPi -lconfig
INST_DIR=/opt/sbin INST_DIR=/opt/sbin
@ -11,7 +11,7 @@ VERSION:=$(shell cat VERSION)
.PHONY: all .PHONY: all
all: counter all: counter
counter: counter.o LS7366R.o influx.o ringbuffer.o led.o logging.o version.o counter: counter.o LS7366R.o ringbuffer.o led.o logging.o version.o
$(CC) -o $@ $(LDFLAGS) $^ $(CC) -o $@ $(LDFLAGS) $^
version.o: version.c VERSION version.o: version.c VERSION

View File

@ -8,7 +8,6 @@
#include <math.h> #include <math.h>
#include "LS7366R.h" #include "LS7366R.h"
#include "influx.h"
#include "ringbuffer.h" #include "ringbuffer.h"
#include "led.h" #include "led.h"
#include "logging.h" #include "logging.h"
@ -75,40 +74,15 @@ int main (void) {
init(); init();
ledInit(); ledInit();
ls7366rInit(SPI_CHAN); ls7366rInit(SPI_CHAN);
influxInit(&cfg);
start(); start();
double epsilon;
if (! config_lookup_float(&cfg, EPSILON_KEY, &epsilon)) {
epsilon = DEFAULT_EPSILON;
}
fprintf(stderr, "CONFIG: epsilon=%f\n", epsilon);
double lastF = 0;
bool settled = false;
uint8_t ledTick = 0; uint8_t ledTick = 0;
while (1) { while (1) {
uint32_t period = ringbufferGet(); uint32_t period = ringbufferGet();
double fRaw = 1.0 / (((double) period) / 1000000.0); // add averaging and forming the struct here
int valid = settled ? 1 : 0; logmsg(LOG_DEBUG, "Period: %lu", period);
double gradient = fRaw - lastF;
double fSmoothed = fRaw;
if (settled && (fabs(gradient) > epsilon)) {
logmsg(LOG_INFO, "Current f=%f, last f=%f, gradient %f too large, invalid\n", fRaw, lastF, gradient);
skipped++;
fSmoothed = lastF;
valid = 0;
} else {
lastF = fRaw;
}
if (settled) {
influxAddFrequency(period, fRaw, fSmoothed, gradient, valid);
}
ledTick++; ledTick++;
if (ledTick == 50) { if (ledTick == 50) {

View File

@ -1,3 +1,6 @@
influxUser = "secundus" sinkServer = "sink.hottis.de"
influxPass = "geheim" sinkPort = 20169
influxUrl = "http://172.16.3.15:8086/write?db=smarthome2&precision=ms"
deviceId = "mainscnt03"
// sharedSecret has to have exactly 31 octets
sharedSecret = "1234567890123456789012345678901"

View File

@ -1,179 +0,0 @@
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <curl/curl.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <libconfig.h>
#include "led.h"
#include "logging.h"
const char INFLUXURL_KEY[] = "influxUrl";
const char DEFAULT_INFLUXURL[] = "http://172.16.3.15:8086/write?db=smarthome2&precision=ms";
const char *influxUrl;
const char INFLUXUSER_KEY[] = "influxUser";
const char *influxUser;
const char INFLUXPASS_KEY[] = "influxPass";
const char *influxPass;
const char INFLUXTAG_KEY[] = "influxTag";
const char *influxTag;
const char DEFAULT_LOCATION[] = "Essen_DE";
const char LOCATION_KEY[] = "location";
const char *location;
const uint32_t ONE_SECOND_DIVIDER = 50;
extern uint32_t skipped;
// #define BUFSIZE 131070
#define BUFSIZE 65535
// #define BUFSIZE 1024
// char influxBuffer[BUFSIZE];
// char *bufferNextEntry;
typedef struct influxBuffer {
uint32_t entries;
uint32_t totalEntries;
char *nextEntry;
char buffer[BUFSIZE];
} tInfluxBuffer;
typedef enum {
PERIOD_20MS = 0,
PERIOD_1S,
PERIOD_END
} ePeriod;
tInfluxBuffer *influxBuffers[PERIOD_END];
#define HOSTNAMESIZE 128
char hostname[HOSTNAMESIZE];
static void influxClearBuffer(tInfluxBuffer *influxBuffer, bool initial) {
memset(influxBuffer->buffer, 0, BUFSIZE);
influxBuffer->nextEntry = influxBuffer->buffer;
influxBuffer->entries = 0;
if (initial) {
influxBuffer->totalEntries = 0;
}
}
void influxInit(config_t *pCfg) {
if (! config_lookup_string(pCfg, INFLUXURL_KEY, &influxUrl)) {
influxUrl = DEFAULT_INFLUXURL;
}
fprintf(stderr, "CONFIG: influxUrl=%s\n", influxUrl);
if (! config_lookup_string(pCfg, INFLUXUSER_KEY, &influxUser)) {
influxUser = NULL;
}
fprintf(stderr, "CONFIG: influxUser=%s\n", (influxUser == NULL ? "<null>" : influxUser));
if (! config_lookup_string(pCfg, INFLUXPASS_KEY, &influxPass)) {
influxPass = NULL;
}
fprintf(stderr, "CONFIG: influxPass=%s\n", (influxPass == NULL ? "<null>" : influxPass));
if (! config_lookup_string(pCfg, LOCATION_KEY, &location)) {
location = DEFAULT_LOCATION;
}
fprintf(stderr, "CONFIG: location=%s\n", location);
if (! config_lookup_string(pCfg, INFLUXTAG_KEY, &influxTag)) {
gethostname(hostname, HOSTNAMESIZE);
influxTag = hostname;
}
fprintf(stderr, "CONFIG: influxTag=%s\n", influxTag);
influxBuffers[PERIOD_20MS] = (tInfluxBuffer*) malloc(sizeof(tInfluxBuffer));
influxClearBuffer(influxBuffers[PERIOD_20MS], true);
influxBuffers[PERIOD_1S] = (tInfluxBuffer*) malloc(sizeof(tInfluxBuffer));
influxClearBuffer(influxBuffers[PERIOD_1S], true);
}
static void influxSendRequest(tInfluxBuffer *influxBuffer) {
led(E_RED, false);
led(E_BLUE, true);
CURL *curl = curl_easy_init();
if(curl) {
curl_easy_setopt(curl, CURLOPT_URL, influxUrl);
if (influxUser && influxPass) {
curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST);
curl_easy_setopt(curl, CURLOPT_USERNAME, influxUser);
curl_easy_setopt(curl, CURLOPT_PASSWORD, influxPass);
}
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, influxBuffer->buffer);
CURLcode res = curl_easy_perform(curl);
if(res != CURLE_OK) {
logmsg(LOG_ERR, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
led(E_RED, true);
}
curl_easy_cleanup(curl);
}
led(E_BLUE, false);
}
void influxAddFrequency(uint32_t period, double fRaw, double fSmoothed, double gradient,
int valid) {
static uint32_t divider = 0;
static uint32_t summedUpTime = 0;
char tmpBuf[256];
struct timespec t;
clock_gettime(CLOCK_REALTIME, &t);
uint64_t tt = (((uint64_t)t.tv_sec) * 1000) + (((uint64_t)t.tv_nsec) / 1000000);
int c = sprintf(tmpBuf, "mainsfrequency,host=%s,valid=%d,location=%s "
"period=%u,freq=%f,freqSmoothed=%f,gradient=%f,freqRaw=%f "
"%llu\n",
influxTag, valid, location,
period, fSmoothed, fSmoothed, gradient, fRaw,
tt);
if ((influxBuffers[PERIOD_20MS]->nextEntry + c + 10) > (influxBuffers[PERIOD_20MS]->buffer + BUFSIZE)) {
influxSendRequest(influxBuffers[PERIOD_20MS]);
influxBuffers[PERIOD_20MS]->totalEntries += influxBuffers[PERIOD_20MS]->entries;
logmsg(LOG_INFO, "%u 20ms-entries sent to database, in total %u, invalid: %u\n",
influxBuffers[PERIOD_20MS]->entries, influxBuffers[PERIOD_20MS]->totalEntries, skipped);
influxClearBuffer(influxBuffers[PERIOD_20MS], false);
}
memcpy(influxBuffers[PERIOD_20MS]->nextEntry, tmpBuf, c);
influxBuffers[PERIOD_20MS]->nextEntry += c;
influxBuffers[PERIOD_20MS]->entries += 1;
summedUpTime += period;
divider += 1;
if (divider == ONE_SECOND_DIVIDER) {
double freq1S = ((double)ONE_SECOND_DIVIDER) / (((double)summedUpTime) / 1000000.0);
logmsg(LOG_DEBUG, "%llu: %u %f\n", tt, summedUpTime, freq1S);
int c = sprintf(tmpBuf, "mainsfrequency1S,host=%s,location=%s "
"freq=%f "
"%llu\n",
influxTag, location,
freq1S,
tt);
//if ((influxBuffers[PERIOD_1S]->nextEntry + c + 10) > (influxBuffers[PERIOD_1S]->buffer + BUFSIZE)) {
if (influxBuffers[PERIOD_1S]->entries == 30) {
influxSendRequest(influxBuffers[PERIOD_1S]);
influxBuffers[PERIOD_1S]->totalEntries += influxBuffers[PERIOD_1S]->entries;
logmsg(LOG_INFO, "%u 1s-entries sent to database, in total %u\n",
influxBuffers[PERIOD_1S]->entries, influxBuffers[PERIOD_1S]->totalEntries);
influxClearBuffer(influxBuffers[PERIOD_1S], false);
}
memcpy(influxBuffers[PERIOD_1S]->nextEntry, tmpBuf, c);
influxBuffers[PERIOD_1S]->nextEntry += c;
influxBuffers[PERIOD_1S]->entries += 1;
divider = 0;
summedUpTime = 0.0;
}
}

View File

@ -1,9 +0,0 @@
#ifndef _INFLUX_H_
#define _INFLUX_H_
#include <libconfig.h>
void influxAddFrequency(uint32_t period, double fRaw, double fSmoothed, double gradient, int valid);
void influxInit(config_t *pCfg);
#endif // _INFLUX_H_