Files
smmapdfw/pgworker/pgworker/pg_worker.c
2005-03-07 14:11:29 +00:00

420 lines
12 KiB
C

/*
Copyright (C) 2004, 2005 Wolfgang Hottgenroth
This file is part of smmapdfw-pgworker.
smmapdfw is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
smmapdfw is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
License for more details.
You should have received a copy of the GNU General Public License
along with smmapdfw. If not, write to the Free Software Foundation, 59
Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
#include <libpq-fe.h>
#include <containers_public.h>
#include "htmalloc.h"
#include "htbuffer.h"
#include "smmapd.h"
/*
 * Entry points of the pgworker class. They are declared ahead of their
 * definitions so that the class descriptor below can take their addresses.
 */
int pg_worker_init(cfgl_t *cfg, void **handle);
int pg_worker_destroy(void *handle);
int pg_worker_work_setup(void *handle, void **work_handle);
int pg_worker_work_destroy(void *handle, void *work_handle);
int pg_worker_work(void *handle, void *work_handle, char *input, htbuffer_t *output);
/*
 * Class descriptor of this worker: the class name followed by pointers to
 * the five entry points declared above.
 * NOTE(review): class_descriptor_t is declared outside this file; the member
 * order used here (name, init, destroy, work_setup, work, work_destroy) is
 * presumably dictated by that declaration -- confirm before reordering.
 */
class_descriptor_t pgworker = {
"pgworker",
&pg_worker_init,
&pg_worker_destroy,
&pg_worker_work_setup,
&pg_worker_work,
&pg_worker_work_destroy
};
/*
 * Per-class state created by pg_worker_init and released by
 * pg_worker_destroy. The string members are the pointers returned by
 * findcfgl() for the configuration key of the same name.
 * NOTE(review): those strings are never freed in this file, so they are
 * presumably owned by the configuration list -- confirm.
 */
typedef struct pg_container_handle_s {
int counter; /* set to 0 by pg_worker_init; not read in the code visible here */
cfgl_t *cfg; /* configuration list handed to pg_worker_init */
char *sql_statement; /* mandatory; passed to PQexecParams for every request */
int param_cnt; /* result of count_params(sql_statement); always > 0 after init */
char *input_delimiter; /* may be NULL only if param_cnt is 1 */
char *output_format_string; /* optional; NULL means "return first field as is" */
char *dbhost; /* mandatory connection parameter */
char *dbname; /* mandatory connection parameter */
char *dbuser; /* mandatory connection parameter */
char *dbpass; /* mandatory connection parameter */
} pg_container_handle_t;
/*
 * Per-worker state created by pg_worker_work_setup and released by
 * pg_worker_work_destroy.
 */
typedef struct pg_worker_handle_s {
int counter; /* set to 0 by pg_worker_work_setup; not read in the code visible here */
pg_container_handle_t *pch; /* the class-level handle this worker belongs to */
char **param_array; /* pre-allocated space for separated params */
PGconn *dbconn; /* opened with PQsetdbLogin, closed with PQfinish */
} pg_worker_handle_t;
/* Forward declarations of the file-local helpers defined further down. */
static int process(pg_worker_handle_t *pwh, char *input, htbuffer_t *output);
static int count_params(const char *stmt);
static int break_input(pg_worker_handle_t *pwh, char *input);
/* Debug switch; pg_worker_init overwrites it from the "debug" config entry. */
int debug = 0;

/* A parameter reference is '$' followed by at most this many digits. */
#define MAX_PARAM_DIGIT_NUM 2
/* Highest parameter number that can be tracked. */
#define MAX_PARAM_COUNT 99

/*
 * Count the positional parameters ($1, $2, ...) used in an SQL statement.
 *
 * Every '$' in the statement must be followed by one or two digits forming
 * a number between 1 and MAX_PARAM_COUNT. The numbers used must form the
 * contiguous range 1..n; a number may be referenced more than once and is
 * then counted only once.
 *
 * stmt: NUL-terminated SQL statement (must not be NULL).
 *
 * Returns the number n of distinct parameters (0 if the statement contains
 * no '$' at all), or -1 if a reference is malformed or the numbering has
 * gaps.
 */
static int count_params(const char *stmt) {
  const char *paramPtr = stmt;
  char numBuf[MAX_PARAM_DIGIT_NUM + 1];
  char checkArray[MAX_PARAM_COUNT];   /* checkArray[n-1] != 0: $n was seen */
  int i, x, cnt = 0, res = 0;
  /* Starts out true so that a statement without any '$' passes through the
     final check and yields 0 instead of reading an uninitialised flag. */
  int done = 1;

  memset(checkArray, 0, sizeof(checkArray));

  if (debug) syslog(LOG_DEBUG, "pgworker count_params: start %s", paramPtr);

  while (NULL != (paramPtr = strchr(paramPtr, '$'))) {
    paramPtr++;
    if (debug) syslog(LOG_DEBUG, "pgworker count_params: step %s", paramPtr);

    /* Collect the digits; one extra round is needed to see the terminator.
       A third digit ends the loop with done == 0, i.e. as a failure. */
    for (i = 0, done = 0; (i <= MAX_PARAM_DIGIT_NUM) && !done; i++) {
      /* <ctype.h> functions require a value representable as unsigned char */
      if (isdigit((unsigned char) *paramPtr)) {
        numBuf[i] = *paramPtr;
        if (debug) syslog(LOG_DEBUG, "pgworker count_params: i=%d, nb=%c",
                          i, numBuf[i]);
        paramPtr++;
      } else {
        numBuf[i] = '\0';
        if (debug) syslog(LOG_DEBUG, "pgworker count_params: end, i=%d, nb=%s",
                          i, numBuf);
        x = atoi(numBuf);
        if (x == 0) {
          syslog(LOG_ERR, "pgworker count_params: failure, x=0");
          break;
        }
        if (x > MAX_PARAM_COUNT) {
          syslog(LOG_ERR, "pgworker count_params: failure, x > MAX_PARAM_COUNT");
          break;
        }
        /* Count each parameter number once, however often it is used. */
        if (!checkArray[x - 1]) {
          checkArray[x - 1] = 1;
          cnt++;
        }
        done = 1;
      }
    }

    if (!done) {
      syslog(LOG_ERR, "pgworker count_params: not done, failure");
      res = -1;
      break;
    }
  }

  if (done) {
    /* The first unused slot must coincide with the number of distinct
       parameters, otherwise the numbering has a gap. */
    for (i = 0; (i < MAX_PARAM_COUNT) && checkArray[i]; i++)
      ;
    if (i != cnt) {
      syslog(LOG_ERR,
             "pgworker count_params: none continuous sequence of param numbers, failure");
      res = -1;
    } else {
      if (debug) syslog(LOG_DEBUG, "pgworker count_params: %d params", cnt);
      res = cnt;
    }
  }

  if (debug) syslog(LOG_DEBUG, "pgworker count_params: finish, res=%d", res);
  return res;
}
/*
 * Create the class-level handle from the configuration.
 *
 * Reads "debug" (optional, default "0"), "sql_statement", "input_delimiter",
 * "output_format_string", "dbhost", "dbname", "dbuser" and "dbpass" from
 * cfg. The SQL statement and the four db* entries are mandatory; the input
 * delimiter is mandatory as soon as the statement has more than one
 * parameter.
 *
 * cfg:    configuration list of this worker instance.
 * handle: receives the newly allocated handle on success; left untouched
 *         on failure.
 *
 * Returns 0 on success, -1 on any configuration error. On failure the
 * partially filled handle is released again.
 */
int pg_worker_init(cfgl_t *cfg, void **handle) {
  pg_container_handle_t *pch =
    (pg_container_handle_t*)htmalloc(sizeof(pg_container_handle_t));

  pch->counter = 0;
  pch->cfg = cfg;

  debug = atoi(findcfglx(pch->cfg, "debug", "0"));

  pch->sql_statement = findcfgl(pch->cfg, "sql_statement");
  if (pch->sql_statement == NULL) {
    syslog(LOG_ERR, "pgworker pg_worker_init: missing SQL statement");
    goto fail;
  }

  pch->param_cnt = count_params(pch->sql_statement);
  if (pch->param_cnt <= 0) {
    /* count_params logs malformed references itself, but stays silent for
       a statement without any parameter. */
    syslog(LOG_ERR,
           "pgworker pg_worker_init: SQL statement has no usable parameters");
    goto fail;
  }

  pch->input_delimiter = findcfgl(pch->cfg, "input_delimiter");
  if ((pch->param_cnt > 1) && (pch->input_delimiter == NULL)) {
    syslog(LOG_ERR,
           "pgworker pg_worker_init: more than one parameter but no input_delimiter given");
    goto fail;
  }

  /* optional: without it only the first result field is returned */
  pch->output_format_string = findcfgl(pch->cfg, "output_format_string");

  pch->dbhost = findcfgl(pch->cfg, "dbhost");
  if (pch->dbhost == NULL) {
    syslog(LOG_ERR, "pgworker pg_worker_init: missing dbhost");
    goto fail;
  }

  pch->dbname = findcfgl(pch->cfg, "dbname");
  if (pch->dbname == NULL) {
    syslog(LOG_ERR, "pgworker pg_worker_init: missing dbname");
    goto fail;
  }

  pch->dbuser = findcfgl(pch->cfg, "dbuser");
  if (pch->dbuser == NULL) {
    syslog(LOG_ERR, "pgworker pg_worker_init: missing dbuser");
    goto fail;
  }

  pch->dbpass = findcfgl(pch->cfg, "dbpass");
  if (pch->dbpass == NULL) {
    syslog(LOG_ERR, "pgworker pg_worker_init: missing dbpass");
    goto fail;
  }

  *handle = pch;
  return 0;

fail:
  /* released with free(), exactly as pg_worker_destroy does */
  free(pch);
  return -1;
}
/*
 * Release the class-level handle created by pg_worker_init.
 *
 * Only the handle structure itself is freed; the strings it points to are
 * left alone.
 *
 * Always returns 0.
 */
int pg_worker_destroy(void *handle) {
  free(handle);
  return 0;
}
int pg_worker_work_setup(void *handle, void **work_handle) {
pg_worker_handle_t *pwh =
(pg_worker_handle_t*)htmalloc(sizeof(pg_worker_handle_t));
pwh->counter = 0;
pwh->pch = handle;
*work_handle = pwh;
pwh->param_array = (char **) htmalloc(sizeof(char*) * pwh->pch->param_cnt);
syslog(LOG_DEBUG, "pgworker c=%p w=%p", pwh->pch, pwh);
pwh->dbconn = PQsetdbLogin(pwh->pch->dbhost,
NULL,
NULL,
NULL,
pwh->pch->dbname,
pwh->pch->dbuser,
pwh->pch->dbpass);
syslog(LOG_DEBUG, "pgworker (%p) pg_worker_work_setup: dbconn=%p",
pwh, pwh->dbconn);
return 0;
}
/*
 * Release a per-worker handle: close the database connection and free the
 * parameter pointer array and the handle itself.
 *
 * handle:      class-level handle (unused here).
 * work_handle: worker handle created by pg_worker_work_setup.
 *
 * Always returns 0.
 */
int pg_worker_work_destroy(void *handle, void *work_handle) {
  pg_worker_handle_t *pwh = (pg_worker_handle_t*) work_handle;

  syslog(LOG_DEBUG, "pgworker (%p) pg_worker_destroy: freeing the worker handle",
         pwh);

  PQfinish(pwh->dbconn);
  free(pwh->param_array);
  free(pwh);

  /* The function is declared to return int; falling off the end would leave
     the value seen by the caller undefined. */
  return 0;
}
/*
 * Handle one request: run the configured query for the given input and
 * store the answer in output.
 *
 * handle:      class-level handle (unused here).
 * work_handle: worker handle created by pg_worker_work_setup.
 * input:       request string; it is split in place by the processing step.
 * output:      buffer receiving the answer.
 *
 * Always returns SMM_OK.
 * NOTE(review): the result of process() is deliberately not evaluated here,
 * so failures and "no row found" are indistinguishable from success for
 * the caller -- confirm that this is intended.
 */
int pg_worker_work(void *handle, void *work_handle, char *input, htbuffer_t *output) {
  pg_worker_handle_t *worker = work_handle;

  syslog(LOG_DEBUG, "pgworker (%p) pg_worker_work entered", worker);
  (void) process(worker, input, output);

  return SMM_OK;
}
/*
 * Split the request string into the statement parameters.
 *
 * Without a configured delimiter the whole input is the single parameter.
 * Otherwise the input is cut at every occurrence of the delimiter; the
 * delimiters are overwritten with NUL bytes, so param_array ends up holding
 * pointers into the (modified) input buffer.
 *
 * pwh:   worker handle; its param_array is filled.
 * input: request string, modified in place.
 *
 * Returns 0 if exactly param_cnt parameters were found (or no delimiter is
 * configured), -1 if there are too many or too few.
 */
static int break_input(pg_worker_handle_t *pwh, char *input) {
  const char *delim = pwh->pch->input_delimiter;
  const int expected = pwh->pch->param_cnt;
  size_t delim_len;
  char *piece;
  char *hit;
  int found = 0;

  if (delim == NULL) {
    syslog(LOG_DEBUG, "pgworker (%p) break_input: input is %s", pwh, input);
    pwh->param_array[0] = input;
    return 0;
  }

  delim_len = strlen(delim);

  /* every piece that is followed by a delimiter */
  for (piece = input; (hit = strstr(piece, delim)) != NULL; piece = hit + delim_len) {
    *hit = '\0';
    if (found >= expected) {
      syslog(LOG_ERR, "pgworker (%p) break_input: too much parameters", pwh);
      return -1;
    }
    pwh->param_array[found] = piece;
    found++;
    syslog(LOG_DEBUG, "pgworker (%p) break_input: param found (cnt=%d): {%s}",
           pwh, found, piece);
  }

  /* the remainder behind the last delimiter is the final piece */
  if (found >= expected) {
    syslog(LOG_ERR, "pgworker (%p) break_input: too much parameters", pwh);
    return -1;
  }
  pwh->param_array[found] = piece;
  found++;
  syslog(LOG_DEBUG, "pgworker (%p) break_input: last param (cnt=%d): {%s}",
         pwh, found, piece);

  if (found < expected) {
    syslog(LOG_ERR, "pgworker (%p) break_input: too few parameters", pwh);
    return -1;
  }

  syslog(LOG_DEBUG, "pgworker (%p) break_input: end", pwh);
  return 0;
}
/* A field reference is '$' followed by at most this many digits. */
#define MAX_RESULT_DIGIT_NUM 2

/*
 * Build the answer from the configured output format string.
 *
 * The format string is copied to output, with every "$n" (n = one or two
 * digits, counted from 1) replaced by the value of the n-th field of the
 * given result row.
 *
 * pwh:       worker handle; supplies the format string.
 * pgr:       query result.
 * output:    buffer receiving the formatted answer; emptied first.
 * tuple_num: row of the result to take the field values from.
 *
 * Returns 0 on success. Returns -1 and leaves output empty if a reference
 * is malformed or names a field the result does not have.
 */
static int populate_output_format(pg_worker_handle_t *pwh, PGresult *pgr,
                                  htbuffer_t *output, int tuple_num) {
  const char *scanPtr = pwh->pch->output_format_string;
  const char *literalPtr = scanPtr;   /* start of the not yet copied literal text */
  const char *dollarPtr;
  char numBuf[MAX_RESULT_DIGIT_NUM + 1];
  char *s;
  int i, x, done, res = 0;

  htbuffer_strcpy(output, "");

  while (NULL != (dollarPtr = strchr(scanPtr, '$'))) {
    /* literal text in front of the reference */
    htbuffer_strncat(output, literalPtr, dollarPtr - literalPtr);
    syslog(LOG_DEBUG, "pgworker (%p) populate_output_format OUTPUT: %s",
           pwh, output->buf);

    scanPtr = dollarPtr + 1;

    /* Collect the digits; one extra round is needed to see the terminator.
       A third digit ends the loop with done == 0, i.e. as a failure. */
    for (i = 0, done = 0; (i <= MAX_RESULT_DIGIT_NUM) && !done; i++) {
      /* <ctype.h> functions require a value representable as unsigned char */
      if (isdigit((unsigned char) *scanPtr)) {
        numBuf[i] = *scanPtr;
        scanPtr++;
      } else {
        numBuf[i] = '\0';
        x = atoi(numBuf) - 1;   /* references count from 1, fields from 0 */
        if (x < 0) {
          /* '$' without digits, or "$0" */
          syslog(LOG_ERR,
                 "pgworker (%p) populate_output_format invalid param num", pwh);
          break;
        }
        if (x >= PQnfields(pgr)) {
          syslog(LOG_ERR,
                 "pgworker (%p) populate_output_format param num too high", pwh);
          break;
        }
        s = PQgetvalue(pgr, tuple_num, x);
        syslog(LOG_DEBUG, "pgworker (%p) populate_output_format OUTPUT PARAM: %s",
               pwh, s);
        htbuffer_strcat(output, s);
        literalPtr = scanPtr;   /* literal text resumes behind the digits */
        done = 1;
      }
    }

    if (!done) {
      syslog(LOG_ERR, "pgworker (%p) populate_outputformat, not done, failure",
             pwh);
      htbuffer_strcpy(output, "");
      res = -1;
      break;
    }
  }

  if (res == 0) {
    /* Literal text behind the last reference, or the whole format string
       if it holds no reference at all. */
    htbuffer_strcat(output, literalPtr);
    syslog(LOG_DEBUG, "OUTPUT: %s", output->buf);
  }

  return res;
}
/*
 * Execute the configured statement for one request and produce the answer.
 *
 * The input is split into the statement parameters, the statement is run,
 * and the first result row is turned into the answer: either the first
 * field as is, or the configured output format string filled with the
 * row's fields.
 *
 * pwh:    worker handle (connection, parameter array, configuration).
 * input:  request string; modified in place while being split.
 * output: buffer receiving the answer; left untouched if the query returns
 *         no row.
 *
 * Returns 0 on success (including "no row found"), -1 if the input could
 * not be split, the query failed, or the answer could not be built.
 */
static int process(pg_worker_handle_t *pwh, char *input, htbuffer_t *output) {
  int t, f, res;
  ExecStatusType status;
  PGresult *pgr;

  if (0 != (res = break_input(pwh, input)))
    return res;

  pgr = PQexecParams(pwh->dbconn,
                     pwh->pch->sql_statement,
                     pwh->pch->param_cnt,
                     NULL,
                     (const char * const *) pwh->param_array,
                     NULL,
                     NULL,
                     0);
  syslog(LOG_DEBUG, "pgworker (%p) process pgr=%p", pwh, pgr);

  /* PQresultStatus treats a NULL result as a fatal error, so this also
     covers an out-of-memory or dead-connection NULL return. */
  status = PQresultStatus(pgr);
  if ((status != PGRES_TUPLES_OK) && (status != PGRES_COMMAND_OK)) {
    syslog(LOG_ERR, "pgworker (%p) process: query failed: %s", pwh,
           (pgr != NULL) ? PQresultErrorMessage(pgr)
                         : PQerrorMessage(pwh->dbconn));
    PQclear(pgr);   /* harmless for a NULL result */
    return -1;
  }

  t = PQntuples(pgr);
  syslog(LOG_DEBUG, "pgworker (%p) process PQntuples=%d", pwh, t);
  f = PQnfields(pgr);
  syslog(LOG_DEBUG, "pgworker (%p) process PQnfields=%d", pwh, f);

  if (t <= 0) {
    syslog(LOG_DEBUG, "pgworker (%p) process: no output", pwh);
  } else if (NULL == pwh->pch->output_format_string) {
    if (f < 1) {
      /* a row without columns has no first field to return */
      syslog(LOG_ERR, "pgworker (%p) process: result has no fields", pwh);
      res = -1;
    } else {
      if (f > 1)
        syslog(LOG_WARNING,
               "pgworker (%p) process: more than one result fields (%d) but no format string",
               pwh, f);
      htbuffer_strcpy(output, PQgetvalue(pgr, 0, 0));
      res = 0;
    }
  } else {
    if (0 == (res = populate_output_format(pwh, pgr, output, 0))) {
      syslog(LOG_DEBUG, "pgworker (%p) process: output %s", pwh, output->buf);
    } else {
      syslog(LOG_ERR, "pgworker (%p) process failure in population of output", pwh);
    }
  }

  PQclear(pgr);
  return res;
}
/**
A:
- An instance of the pgworker will get an SQL-statement.
- The SQL-statement may contain multiple parameters.
- A separator token to be used in the input from sendmail will be defined.
- The SQL-statement contains one result parameter.
- A cache file can be defined.
B:
- Multiple instances of the pgworker will get exactly the same SQL-statement.
- The SQL-statement may contain multiple parameters.
- A separator token to be used in the input from sendmail will be defined.
- The SQL-statement contains multiple result parameters, usually as many
as instances are using it.
- A position argument to identify the result parameter for the particular
instance will be defined.
- If a cache file should be used, the same cache file can be shared by
instances using the same SQL-statement.
C:
- As B, but for each instance a list of position arguments can be given
to return multiple output results.
- A separator token for the different result items must be given.
D:
- As C, but instead of a separator token a format string must be given.
E:
- As D, but instead of given output positions in a separate list, the
position numbers will be used directly in the format string.
*/