1s entries
This commit is contained in:
parent
784346b833
commit
e8102e74a9
88
src/influx.c
88
src/influx.c
@ -23,6 +23,8 @@ const char DEFAULT_LOCATION[] = "Essen_DE";
|
||||
const char LOCATION_KEY[] = "location";
|
||||
const char *location;
|
||||
|
||||
const uint8_t ONE_SECOND_DIVIDER = 50;
|
||||
|
||||
|
||||
extern uint32_t skipped;
|
||||
|
||||
@ -30,15 +32,34 @@ extern uint32_t skipped;
|
||||
// #define BUFSIZE 131070
|
||||
#define BUFSIZE 65535
|
||||
// #define BUFSIZE 1024
|
||||
char influxBuffer[BUFSIZE];
|
||||
char *bufferNextEntry;
|
||||
// char influxBuffer[BUFSIZE];
|
||||
// char *bufferNextEntry;
|
||||
|
||||
typedef struct influxBuffer {
|
||||
uint32_t entries;
|
||||
uint32_t totalEntries;
|
||||
char *nextEntry;
|
||||
char buffer[BUFSIZE];
|
||||
} tInfluxBuffer;
|
||||
|
||||
typedef enum {
|
||||
PERIOD_20MS = 0,
|
||||
PERIOD_1S,
|
||||
PERIOD_END
|
||||
} ePeriod;
|
||||
|
||||
tInfluxBuffer influxBuffers[PERIOD_END];
|
||||
|
||||
#define HOSTNAMESIZE 128
|
||||
char hostname[HOSTNAMESIZE];
|
||||
|
||||
static void influxClearBuffer() {
|
||||
memset(influxBuffer, 0, BUFSIZE);
|
||||
bufferNextEntry = influxBuffer;
|
||||
static void influxClearBuffer(tInfluxBuffer *influxBuffer, bool initial) {
|
||||
memset(influxBuffer->buffer, 0, BUFSIZE);
|
||||
influxBuffer->nextEntry = influxBuffer->buffer;
|
||||
influxBuffer->entries = 0;
|
||||
if (initial) {
|
||||
influxBuffer->totalEntries = 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -64,12 +85,13 @@ void influxInit(config_t *pCfg) {
|
||||
influxTag = hostname;
|
||||
}
|
||||
fprintf(stderr, "CONFIG: influxTag=%s\n", influxTag);
|
||||
influxClearBuffer();
|
||||
influxClearBuffer(influxBuffers[PERIOD_20MS], true);
|
||||
influxClearBuffer(influxBuffers[PERIOD_1S], true);
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void influxSendRequest() {
|
||||
static void influxSendRequest(tInfluxBuffer *influxBuffer) {
|
||||
led(E_RED, false);
|
||||
led(E_BLUE, true);
|
||||
CURL *curl = curl_easy_init();
|
||||
@ -80,7 +102,7 @@ static void influxSendRequest() {
|
||||
curl_easy_setopt(curl, CURLOPT_USERNAME, influxUser);
|
||||
curl_easy_setopt(curl, CURLOPT_PASSWORD, influxPass);
|
||||
}
|
||||
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, influxBuffer);
|
||||
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, influxBuffer->buffer);
|
||||
CURLcode res = curl_easy_perform(curl);
|
||||
if(res != CURLE_OK) {
|
||||
logmsg(LOG_ERR, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
|
||||
@ -93,8 +115,8 @@ static void influxSendRequest() {
|
||||
|
||||
void influxAddFrequency(uint32_t period, double fRaw, double fSmoothed, double gradient,
|
||||
int valid) {
|
||||
static uint32_t entries = 0;
|
||||
static uint32_t totalEntries = 0;
|
||||
static uint8_t divider = 0;
|
||||
static double summedUpTime = 0.0;
|
||||
char tmpBuf[256];
|
||||
struct timespec t;
|
||||
|
||||
@ -107,17 +129,45 @@ void influxAddFrequency(uint32_t period, double fRaw, double fSmoothed, double g
|
||||
period, fSmoothed, fSmoothed, gradient, fRaw,
|
||||
tt);
|
||||
|
||||
if ((bufferNextEntry + c + 10) > (influxBuffer + BUFSIZE)) {
|
||||
influxSendRequest();
|
||||
influxClearBuffer();
|
||||
totalEntries += entries;
|
||||
logmsg(LOG_INFO, "%u entries sent to database, in total %u, invalid: %u\n", entries, totalEntries, skipped);
|
||||
entries = 0;
|
||||
if ((influxBuffers[PERIOD_20MS]->nextEntry + c + 10) > (influxBuffers[PERIOD_20MS]->buffer + BUFSIZE)) {
|
||||
influxSendRequest(influxBuffers[PERIOD_20MS]);
|
||||
influxBuffers[PERIOD_20MS]->totalEntries += influxBuffers[PERIOD_20MS]->entries;
|
||||
logmsg(LOG_INFO, "%u 20ms-entries sent to database, in total %u, invalid: %u\n",
|
||||
influxBuffers[PERIOD_20MS]->entries, influxBuffers[PERIOD_20MS]->totalEntries, skipped);
|
||||
influxClearBuffer(influxBuffers[PERIOD_20MS], false);
|
||||
}
|
||||
|
||||
memcpy(bufferNextEntry, tmpBuf, c);
|
||||
bufferNextEntry += c;
|
||||
entries++;
|
||||
memcpy(influxBuffers[PERIOD_20MS]->nextEntry, tmpBuf, c);
|
||||
influxBuffers[PERIOD_20MS]->nextEntry += c;
|
||||
influxBuffers[PERIOD_20MS]->entries += 1;
|
||||
|
||||
summedUpTime += fSmoothed;
|
||||
divider += 1;
|
||||
|
||||
if (divider == ONE_SECOND_DIVIDER) {
|
||||
double freq1S = ((double)ONE_SECOND_DIVIDER) / summedUpTime;
|
||||
int c = sprintf(tmpBuf, "mainsfrequency1S,host=%s,location=%s "
|
||||
"freq=%f "
|
||||
"%llu\n",
|
||||
influxTag, valid, location,
|
||||
freq1S,
|
||||
tt);
|
||||
|
||||
//if ((influxBuffers[PERIOD_1S]->nextEntry + c + 10) > (influxBuffers[PERIOD_1S]->buffer + BUFSIZE)) {
|
||||
if (influxBuffers[PERIOD_1S]->entries == 10) {
|
||||
influxSendRequest(influxBuffers[PERIOD_1S]);
|
||||
influxBuffers[PERIOD_1S]->totalEntries += influxBuffers[PERIOD_1S]->entries;
|
||||
logmsg(LOG_INFO, "%u 1s-entries sent to database, in total %u\n",
|
||||
influxBuffers[PERIOD_1S]->entries, influxBuffers[PERIOD_1S]->totalEntries);
|
||||
influxClearBuffer(influxBuffers[PERIOD_1S], false);
|
||||
}
|
||||
|
||||
memcpy(influxBuffers[PERIOD_1S]->nextEntry, tmpBuf, c);
|
||||
influxBuffers[PERIOD_1S]->nextEntry += c;
|
||||
influxBuffers[PERIOD_1S]->entries += 1;
|
||||
|
||||
divider = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user