database integration
This commit is contained in:
@ -60,6 +60,7 @@ typedef struct pg_worker_handle_s {
|
|||||||
int counter;
|
int counter;
|
||||||
pg_container_handle_t *pch;
|
pg_container_handle_t *pch;
|
||||||
char **param_array; /* pre-allocated space for separated params */
|
char **param_array; /* pre-allocated space for separated params */
|
||||||
|
PGconn *dbconn;
|
||||||
} pg_worker_handle_t;
|
} pg_worker_handle_t;
|
||||||
|
|
||||||
|
|
||||||
@ -159,12 +160,23 @@ int pg_worker_work_setup(void *handle, void **work_handle) {
|
|||||||
*work_handle = pwh;
|
*work_handle = pwh;
|
||||||
pwh->param_array = (char **) htmalloc(sizeof(char*) * pwh->pch->param_cnt);
|
pwh->param_array = (char **) htmalloc(sizeof(char*) * pwh->pch->param_cnt);
|
||||||
syslog(LOG_DEBUG, "pgworker c=%p w=%p", pwh->pch, pwh);
|
syslog(LOG_DEBUG, "pgworker c=%p w=%p", pwh->pch, pwh);
|
||||||
|
|
||||||
|
pwh->dbconn = PQsetdbLogin("confdb.hottis.de",
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
"config",
|
||||||
|
"lurker",
|
||||||
|
"lurker123");
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) pg_worker_work_setup: dbconn=%p", pwh, pwh->dbconn);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int pg_worker_work_destroy(void *handle, void *work_handle) {
|
int pg_worker_work_destroy(void *handle, void *work_handle) {
|
||||||
pg_worker_handle_t *pwh = (pg_worker_handle_t*) work_handle;
|
pg_worker_handle_t *pwh = (pg_worker_handle_t*) work_handle;
|
||||||
syslog(LOG_DEBUG, "pgworker (%p) pg_worker_destroy: freeing the worker handle", pwh);
|
syslog(LOG_DEBUG, "pgworker (%p) pg_worker_destroy: freeing the worker handle", pwh);
|
||||||
|
PQfinish(pwh->dbconn);
|
||||||
free(pwh->param_array);
|
free(pwh->param_array);
|
||||||
free(pwh);
|
free(pwh);
|
||||||
}
|
}
|
||||||
@ -227,15 +239,110 @@ int break_input(pg_worker_handle_t *pwh, char *input) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#define MAX_RESULT_DIGIT_NUM 2
|
||||||
|
int populate_output_format(pg_worker_handle_t *pwh, PGresult *pgr, char *output, int tuple_num) {
|
||||||
|
const char *formatPtr = pwh->pch->output_format_string;
|
||||||
|
const char *tmpPtr = formatPtr;
|
||||||
|
char numBuf[MAX_PARAM_DIGIT_NUM + 1];
|
||||||
|
char *s;
|
||||||
|
int i, x, cnt=0, done, res=0;
|
||||||
|
|
||||||
|
output[0] = '\0';
|
||||||
|
|
||||||
|
while ((formatPtr = strchr(formatPtr, '$')) && (res == 0)) {
|
||||||
|
strncat(output, tmpPtr, formatPtr - tmpPtr);
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) populate_output_format OUTPUT: %s", pwh, output);
|
||||||
|
formatPtr++;
|
||||||
|
for (i = 0, done = 0; (i <= MAX_RESULT_DIGIT_NUM) && !done; i++) {
|
||||||
|
if (isdigit(*formatPtr)) {
|
||||||
|
numBuf[i] = *formatPtr;
|
||||||
|
formatPtr++;
|
||||||
|
tmpPtr = formatPtr;
|
||||||
|
} else {
|
||||||
|
numBuf[i] = '\0';
|
||||||
|
x = atoi(numBuf) - 1;
|
||||||
|
if (x > PQnfields(pgr)) {
|
||||||
|
syslog(LOG_ERR, "pgworker (%p) populate_output_format param num too high");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
s = PQgetvalue(pgr, tuple_num, x);
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) populate_output_format OUTPUT PARAM: %s", pwh, s);
|
||||||
|
strcat(output, s);
|
||||||
|
done = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!done) {
|
||||||
|
syslog(LOG_ERR, "pgworker (%p) populate_outputformat, not done, failure", pwh);
|
||||||
|
output[0] = '\0';
|
||||||
|
res = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (done == 1) {
|
||||||
|
/* reminds */
|
||||||
|
strncat(output, tmpPtr, formatPtr - tmpPtr);
|
||||||
|
syslog(LOG_DEBUG, "OUTPUT: %s", output);
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int process(pg_worker_handle_t *pwh, char *input, char *output) {
|
int process(pg_worker_handle_t *pwh, char *input, char *output) {
|
||||||
int i, res;
|
int i, j, t, f, res;
|
||||||
|
char *s;
|
||||||
|
PGresult *pgr;
|
||||||
|
|
||||||
|
|
||||||
if (0 == (res = break_input(pwh, input))) {
|
if (0 == (res = break_input(pwh, input))) {
|
||||||
for (i = 0; i < pwh->pch->param_cnt; i++) {
|
for (i = 0; i < pwh->pch->param_cnt; i++) {
|
||||||
syslog(LOG_DEBUG, "pgworker (%p) process: param %d --> %s", pwh, i+1, pwh->param_array[i]);
|
syslog(LOG_DEBUG, "pgworker (%p) process: param %d --> %s", pwh, i+1, pwh->param_array[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pgr = PQexecParams(pwh->dbconn,
|
||||||
|
pwh->pch->sql_statement,
|
||||||
|
pwh->pch->param_cnt,
|
||||||
|
NULL,
|
||||||
|
pwh->param_array,
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
0);
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) process pgr=%p", pwh, pgr);
|
||||||
|
|
||||||
|
t = PQntuples(pgr);
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) process PQntuples=%d", pwh, t);
|
||||||
|
|
||||||
|
f = PQnfields(pgr);
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) process PQnfields=%d", pwh, f);
|
||||||
|
|
||||||
|
if (t > 0) {
|
||||||
|
for (i = 0; i < t; i++) {
|
||||||
|
for (j = 0; j < f; j++) {
|
||||||
|
s = PQgetvalue(pgr, i, j);
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) process result (%d, %d) = {%s}", pwh, i, j, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pwh->pch->output_format_string) {
|
||||||
|
if (f > 1)
|
||||||
|
syslog(LOG_WARNING, "pgworker (%p) process: more than one result fields (%d) but no format string", pwh, f);
|
||||||
|
strcpy(output, PQgetvalue(pgr, 0, 0));
|
||||||
|
res = 0;
|
||||||
|
} else {
|
||||||
|
if (0 == (res = populate_output_format(pwh, pgr, output, 0))) {
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) process: output %s", pwh, output);
|
||||||
|
} else {
|
||||||
|
syslog(LOG_ERR, "pgworker (%p) process failure in population of output", pwh);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
syslog(LOG_DEBUG, "pgworker (%p) process: no output", pwh);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PQclear(pgr);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user