/*
 * Files
 * smmapdfw/smmapdfw/modules/verify_worker.c
 * 2004-09-27 13:38:53 +00:00
 *
 * 746 lines
 * 22 KiB
 * C
 */

#include <sys/types.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
#include <time.h>
#define DB_DBM_HSEARCH 1
#include <db.h>
#include "containers_public.h"
#include "smmapd.h"
#include "dns.h"
#include "queue.h"
#include "smtp.h"
/* Verdicts produced inside this module by the checker and worker threads.
   Before answering, worker_thread maps each of them to SMM_OK with a
   "<NOK>", "<TNOK>" or "<OK>" prefix on the answer text. */
#define SMM_LOCAL_PERM_NOK (101)  /* address rejected permanently */
#define SMM_LOCAL_TEMP_NOK (102)  /* temporary failure, no definite answer */
#define SMM_LOCAL_OK       (103)  /* address accepted */
/* Per-class state, built once by verify_init from the configuration. */
typedef struct verify_container_handle_s {
    cfgl_t *cfg;                  /* configuration handed to verify_init */
    int timeout_result;           /* seconds verify_work waits for a worker's answer */
    int timeout_dialog;           /* timeout passed to smtp_init for every dialog */
    int cache_enabled;            /* the cache is used only when this is exactly 1 */
    int cache_expiry;             /* age (seconds) after which a cache record is stale */
    pthread_mutex_t *cache_mutex; /* held around cache file updates; NULL if cache is off */
    char *cache_file;             /* dbm name of the verification cache */
    char *sender_address;         /* argument of SMTP MAIL FROM */
    char *ehlo_arg;               /* argument of SMTP EHLO */
    int smtp_port;                /* port passed to smtp_init */
    int max_checker_threads;      /* maximum checker threads started per address */
} verify_container_handle_t;

/* Result slot shared between verify_work and its worker threads. */
typedef struct verify_result_s {
    int id;        /* id of the worker whose answer is still wanted */
    int result;    /* status code verify_work returns */
    char *output;  /* malloc'ed answer text written by the worker */
} verify_result_t;

/* Per-work-handle state created by verify_work_setup. */
typedef struct verify_work_handle_s {
    int id;                          /* number of worker threads started so far */
    pthread_mutex_t *result_mutex;   /* protects *result */
    pthread_cond_t *result_cond;     /* signalled when a worker stored its answer */
    verify_result_t *result;         /* the shared result slot */
    ht_queue_t *terminator_queue;    /* finished worker_thread_t's are put here */
    verify_container_handle_t *vch;  /* owning container handle */
} verify_work_handle_t;

/* Argument block of one checker thread (one SMTP dialog with one MX address). */
typedef struct checker_thread_s {
    pthread_t thread;                     /* set by the thread itself */
    int id;                               /* 1-based index within its worker */
    int ip_address;                       /* address to connect to; logged high byte first */
    char *email_address;                  /* address being verified (owned by the worker) */
    char *output;                         /* malloc'ed response text */
    int result;                           /* SMM_LOCAL_* verdict, or the initial SMM_TEMP_NOK */
    ht_queue_t *checker_terminator_queue; /* where the thread puts itself when done */
    verify_work_handle_t *vwh;            /* work handle (for configuration) */
} checker_thread_t;

/* Argument block of one worker thread (one call of verify_work). */
typedef struct worker_thread_s {
    pthread_t thread;                     /* set by the thread itself */
    int id;                               /* id compared against result->id */
    char *input;                          /* malloc'ed copy of the address to verify */
    char *output;                         /* answer text: static string, cache copy or checker output */
    pthread_mutex_t *mutex;               /* == vwh->result_mutex */
    pthread_cond_t *cond;                 /* == vwh->result_cond */
    verify_result_t *result;              /* == vwh->result */
    ht_queue_t *checker_terminator_queue; /* finished checkers of this worker */
    int checker_cnt;                      /* checkers started but not yet collected */
    ht_queue_t *terminator_queue;         /* where the worker puts itself when done */
    verify_work_handle_t *vwh;            /* owning work handle */
} worker_thread_t;
/* Value stored for each cache entry: the verdict, the time it was written
   (time(NULL) in cache_insert), then the NUL-terminated answer text.
   output[] is declared with one element, but records are allocated larger
   to hold the whole text. Records already in the cache file depend on this
   exact layout, so do not reorder or resize the members. */
typedef struct mydata_s {
    int result;        /* verdict code passed to cache_insert */
    time_t timestamp;  /* when the record was written */
    char output[1];    /* first byte of the answer text */
} mydata_t;
/* Entry points handed to the smmapd core through the descriptor below. */
int verify_init(cfgl_t *cfg, void **handle);
int verify_destroy(void *handle);
int verify_work_setup(void *handle, void **work_handle);
int verify_work(void *handle, void *work_handle, char *input, char *output);
int verify_work_destroy(void *handle, void *work_handle);

/* File-local helpers defined further down. */
static void *worker_thread(void *arg);
static unsigned int *get_mx_ip_addresses(char *domain);

/* Class descriptor of the "verifier" class: name followed by the init,
   destroy, work_setup, work and work_destroy callbacks, in that order. */
class_descriptor_t verifier = {
    "verifier",
    verify_init,
    verify_destroy,
    verify_work_setup,
    verify_work,
    verify_work_destroy
};
/* verify_init will be called when the class is loaded, directly at start-up. */
/* It is called exactly once; there is exactly one container handle for each */
/* class in the application. */
/*
 * Reads the verifier settings from cfg (falling back to the defaults given
 * below) into a newly allocated verify_container_handle_t and stores it in
 * *handle. When cache_enabled is 1, a mutex for the cache file is created.
 * If that mutex cannot be set up, caching is switched off instead of failing.
 *
 * Returns 0 on success, -1 (with *handle set to NULL) if the handle itself
 * cannot be allocated.
 */
int verify_init(cfgl_t *cfg, void **handle) {
    verify_container_handle_t *vch;

    vch = (verify_container_handle_t*) malloc(sizeof(verify_container_handle_t));
    if (NULL == vch) {
        syslog(LOG_ERR, "verify_init: unable to allocate container handle");
        *handle = NULL;
        return -1;
    }

    vch->cfg = cfg;
    vch->timeout_result = atoi(findcfglx(vch->cfg, "timeout_result", "5"));
    vch->timeout_dialog = atoi(findcfglx(vch->cfg, "timeout_dialog", "20"));
    vch->sender_address = findcfglx(vch->cfg, "sender_address", "<>");
    vch->ehlo_arg = findcfglx(vch->cfg, "ehlo_arg", "local");
    vch->smtp_port = atoi(findcfglx(vch->cfg, "smtp_port", "25"));
    vch->max_checker_threads = atoi(findcfglx(vch->cfg, "max_checker_threads", "25"));
    vch->cache_enabled = atoi(findcfglx(vch->cfg, "cache_enabled", "1"));
    vch->cache_expiry = atoi(findcfglx(vch->cfg, "cache_expiry", "86400"));
    vch->cache_file = findcfglx(vch->cfg, "cache_file", "verifier_cache");

    vch->cache_mutex = NULL;
    if (1 == vch->cache_enabled) {
        vch->cache_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
        /* No unlock after init: a freshly initialised mutex is already
           unlocked, and unlocking a default mutex the caller does not hold
           is undefined behaviour. */
        if ((NULL == vch->cache_mutex) ||
            (0 != pthread_mutex_init(vch->cache_mutex, NULL))) {
            syslog(LOG_ERR, "verify_init: unable to set up cache mutex, cache disabled");
            free(vch->cache_mutex);
            vch->cache_mutex = NULL;
            vch->cache_enabled = 0;
        }
    }

    *handle = vch;
    return 0;
}
/*
 * Counterpart of verify_init: releases the container handle and, when the
 * cache is enabled, its cache mutex. Always returns 0.
 * At present this is never called, because a graceful server shutdown is
 * not supported.
 */
int verify_destroy(void *handle) {
    verify_container_handle_t *container = handle;

    if (container->cache_enabled == 1) {
        pthread_mutex_destroy(container->cache_mutex);
        free(container->cache_mutex);
    }
    free(container);

    return 0;
}
/*
 * Creates the work handle used by verify_work and verify_work_destroy. It
 * holds the following:
 *  - a worker id counter starting at 0;
 *  - the shared result slot, starting with id 0 and no output;
 *  - the mutex and condition variable guarding that slot;
 *  - the queue on which finished worker threads are handed back.
 *
 * Returns 0 and stores the handle in *work_handle on success. On allocation
 * or initialisation failure, releases everything acquired so far, stores
 * NULL and returns -1.
 */
int verify_work_setup(void *handle, void **work_handle) {
    verify_work_handle_t *vwh;

    *work_handle = NULL;
    vwh = (verify_work_handle_t*) calloc(1, sizeof(verify_work_handle_t));
    if (NULL == vwh) {
        syslog(LOG_ERR, "verify_work_setup: unable to allocate work handle");
        return -1;
    }
    vwh->id = 0;
    vwh->vch = (verify_container_handle_t*) handle;

    /* calloc: the result slot starts with id 0, result 0 and output NULL
       instead of uninitialised memory. */
    vwh->result = (verify_result_t*) calloc(1, sizeof(verify_result_t));
    vwh->result_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
    vwh->result_cond = (pthread_cond_t*) malloc(sizeof(pthread_cond_t));
    vwh->terminator_queue = (ht_queue_t*) malloc(sizeof(ht_queue_t));
    if ((NULL == vwh->result) || (NULL == vwh->result_mutex) ||
        (NULL == vwh->result_cond) || (NULL == vwh->terminator_queue)) {
        goto fail_free;
    }

    /* No unlock after init: the mutex starts unlocked, and unlocking a
       default mutex the caller does not hold is undefined behaviour. */
    if (0 != pthread_mutex_init(vwh->result_mutex, NULL)) {
        goto fail_free;
    }
    if (0 != pthread_cond_init(vwh->result_cond, NULL)) {
        goto fail_mutex;
    }
    queue_init(vwh->terminator_queue);

    *work_handle = vwh;
    return 0;

fail_mutex:
    pthread_mutex_destroy(vwh->result_mutex);
fail_free:
    syslog(LOG_ERR, "verify_work_setup: unable to set up work handle");
    free(vwh->terminator_queue);
    free(vwh->result_cond);
    free(vwh->result_mutex);
    free(vwh->result);
    free(vwh);
    return -1;
}
/*
 * Tears down a work handle created by verify_work_setup.
 *
 * vwh->id counts the worker threads started on this handle, and each of
 * them eventually puts itself into vwh->terminator_queue. So exactly that
 * many entries are taken from the queue; queue_get_wait is called for each.
 * For every worker, its checker_cnt checkers that were not yet collected are
 * taken from the worker's checker queue, joined and freed. Then the worker
 * is joined and freed. Finally the handle's own members are released.
 * wt->output is not freed here: it points to a static string or to memory
 * owned elsewhere. Always returns 0.
 */
int verify_work_destroy(void *handle, void *work_handle) {
    verify_work_handle_t *vwh = (verify_work_handle_t*)work_handle;
    int workers_left;

    syslog(LOG_DEBUG, "verify_work_destroy: was %d times used", vwh->id);

    for (workers_left = vwh->id; workers_left != 0; workers_left--) {
        worker_thread_t *worker;
        int checkers_left;

        syslog(LOG_DEBUG, "verify_work_destroy, %d thread in queue, waiting for it", workers_left);
        worker = (worker_thread_t*) queue_get_wait(vwh->terminator_queue);

        for (checkers_left = worker->checker_cnt; checkers_left != 0; checkers_left--) {
            checker_thread_t *checker =
                (checker_thread_t*) queue_get_wait(worker->checker_terminator_queue);

            pthread_join(checker->thread, NULL);
            syslog(LOG_DEBUG, "verify_work_destroy, checker_thread (id=%d) joined", checker->id);
            free(checker->output);
            free(checker);
        }

        pthread_join(worker->thread, NULL);
        syslog(LOG_DEBUG, "verify_work_destroy, worker_thread (id=%d) joined", worker->id);
        free(worker->input);
        queue_destroy(worker->checker_terminator_queue);
        free(worker->checker_terminator_queue);
        free(worker);
    }

    pthread_mutex_destroy(vwh->result_mutex);
    free(vwh->result_mutex);
    pthread_cond_destroy(vwh->result_cond);
    free(vwh->result_cond);
    free(vwh->result);
    queue_destroy(vwh->terminator_queue);
    free(vwh->terminator_queue);
    free(vwh);

    return 0;
}
/*
 * Stores (result, current time, output) under the key `address` in the dbm
 * cache file when vch->cache_enabled is 1. The key includes the terminating
 * NUL. DBM_INSERT is used, so an existing record for the same key is left
 * untouched; dbm_store then reports a non-zero value, which is only logged.
 * The dbm file is opened, written and closed while vch->cache_mutex is held.
 * NULL address or output, or an allocation failure, means nothing is cached.
 */
void cache_insert(verify_container_handle_t *vch, const char *address, int result, const char *output) {
    DBM *cache;
    datum data, key;
    int ret;
    mydata_t *mydata;
    size_t text_len, record_size;

    if ((1 != vch->cache_enabled) || (NULL == address) || (NULL == output)) {
        return;
    }
    syslog(LOG_DEBUG, "cache_insert: inserting %s -> %d, %s", address, result, output);

    text_len = strlen(output) + 1;                /* include the terminating \0 */
    record_size = sizeof(mydata_t) + text_len;    /* same record size as before */
    /* calloc: zero the struct padding so no stale heap bytes reach the file */
    mydata = (mydata_t *) calloc(1, record_size);
    if (NULL == mydata) {
        syslog(LOG_ERR, "cache_insert: out of memory, record not cached");
        return;
    }
    mydata->result = result;
    mydata->timestamp = time(NULL);
    /* The text extends past the declared one-element output[] member, so copy
       it into the allocation via a char pointer rather than strcpy'ing into
       the array (writing beyond a declared array bound is undefined). */
    memcpy((char *) mydata + offsetof(mydata_t, output), output, text_len);

    key.dsize = strlen(address) + 1; /* one more for the terminating \0 */
    key.dptr = (char*) address;
    data.dsize = record_size;
    data.dptr = (char*) mydata;

    pthread_mutex_lock(vch->cache_mutex);
    if (NULL != (cache = dbm_open(vch->cache_file, O_RDWR | O_CREAT, 0644))) {
        ret = dbm_store(cache, key, data, DBM_INSERT);
        if (ret != 0) {
            syslog(LOG_DEBUG, "cache_insert: couldn't insert record");
        } else {
            syslog(LOG_DEBUG, "cache_insert: record inserted");
        }
        dbm_close(cache);
    }
    pthread_mutex_unlock(vch->cache_mutex);
    free(mydata);
}
/*
 * Looks up `address` (key includes the terminating NUL) in the dbm cache.
 *
 * If a well-formed record exists and timestamp + vch->cache_expiry is still
 * later than time(NULL), stores the cached verdict in *result and a newly
 * malloc'ed copy of the cached text in *output (the caller frees it), then
 * returns 0. An expired record is deleted.
 *
 * Returns -1 on a miss, an expired or malformed record, an allocation
 * failure, or when caching is disabled; *result and *output are then left
 * unchanged.
 */
int cache_lookup(verify_container_handle_t *vch, const char* address, int *result, char **output) {
    DBM *cache;
    datum data, key;
    mydata_t header;
    const char *cached_text;
    size_t text_space;
    char *copy;
    int ret, res = -1;

    if (1 != vch->cache_enabled) {
        return res;
    }
    syslog(LOG_DEBUG, "cache_lookup: looking up %s, expiry %d",
           address, vch->cache_expiry);

    key.dsize = strlen(address) + 1; /* one more for the terminating \0 */
    key.dptr = (char*) address;

    /* Hold the cache mutex for the whole lookup. cache_insert and the expiry
       deletion below modify the same file, and ndbm gives no guarantee for a
       reader running concurrently with a writer. */
    pthread_mutex_lock(vch->cache_mutex);
    if (NULL == (cache = dbm_open(vch->cache_file, O_RDONLY, 0644))) {
        goto out_unlock;
    }

    data = dbm_fetch(cache, key);
    if (NULL == data.dptr) {
        syslog(LOG_DEBUG, "cache_lookup: nothing found");
        goto out_close;
    }

    /* data.dptr belongs to the dbm handle. It must NOT be freed, and it is
       only valid until the next dbm call or dbm_close, so copy what is needed
       first. It may also be misaligned for mydata_t, hence the memcpy of the
       header instead of a pointer cast. */
    if ((size_t) data.dsize < sizeof(mydata_t)) {
        syslog(LOG_DEBUG, "cache_lookup: ignoring malformed record");
        goto out_close;
    }
    cached_text = (const char*) data.dptr + offsetof(mydata_t, output);
    text_space = (size_t) data.dsize - offsetof(mydata_t, output);
    if (NULL == memchr(cached_text, '\0', text_space)) {
        syslog(LOG_DEBUG, "cache_lookup: ignoring malformed record");
        goto out_close;
    }
    memcpy(&header, data.dptr, offsetof(mydata_t, output));

    syslog(LOG_DEBUG, "cache_lookup: found: %s -> %d, %ld, %s",
           address, header.result, (long) header.timestamp, cached_text);

    if ((header.timestamp + vch->cache_expiry) > time(NULL)) {
        syslog(LOG_DEBUG, "cache_lookup: not yet expired");
        copy = (char*) malloc(strlen(cached_text) + 1);
        if (NULL == copy) {
            syslog(LOG_ERR, "cache_lookup: out of memory");
            goto out_close;
        }
        strcpy(copy, cached_text);
        *output = copy;
        *result = header.result;
        res = 0;
        goto out_close;
    }

    syslog(LOG_DEBUG, "cache_lookup: expired, will be deleted from cache");
    dbm_close(cache);
    if (NULL != (cache = dbm_open(vch->cache_file, O_RDWR, 0644))) {
        ret = dbm_delete(cache, key);
        if (ret != 0) {
            syslog(LOG_DEBUG, "cache_lookup: couldn't delete record");
        } else {
            syslog(LOG_DEBUG, "cache_lookup: record deleted");
        }
        dbm_close(cache);
    }
    goto out_unlock;

out_close:
    dbm_close(cache);
out_unlock:
    pthread_mutex_unlock(vch->cache_mutex);
    return res;
}
/* Each time verify_work is called, it starts a worker thread. This
   thread gets the address to check, an id, a worker termination queue
   and a pointer to a result structure as arguments.
   The id is just a number which is increased each time a new worker
   thread is started. The id is also part of the result structure.
   verify_work waits a certain amount of time for the result of a
   worker thread; afterwards it returns with a temporary failure.
   A worker thread returns its result by putting it into the result
   structure, but it does so only if the id in the result structure is
   equal to the id it got as argument. If a worker thread finds an id
   in the result structure different from its own, verify_work has
   started a new worker thread in the meantime and is no longer interested
   in the result of the current thread.
   A thread puts its own argument structure into the termination
   queue just before actually terminating.
   verify_work_destroy drains the termination queue, joins each thread
   it takes out of it and frees the argument structure. It does so
   until the thread count (id) goes to zero.
*/
/* Log msg at LOG_ERR, copy it with a "verify_work: " prefix into the local
   `output` buffer (written with snprintf bounded by ANSWER_BUFSIZE) and
   return SMM_PERM_NOK / SMM_TEMP_NOK from the enclosing function.
   Wrapped in do { } while (0) so each use is a single statement, safe after
   an unbraced if/else. */
#define PERM_NOK_RETURN(msg) \
    do { \
        syslog(LOG_ERR, "verify_work: %s", (msg)); \
        snprintf(output, ANSWER_BUFSIZE, "verify_work: %s", (msg)); \
        return SMM_PERM_NOK; \
    } while (0)
#define TEMP_NOK_RETURN(msg) \
    do { \
        syslog(LOG_ERR, "verify_work: %s", (msg)); \
        snprintf(output, ANSWER_BUFSIZE, "verify_work: %s", (msg)); \
        return SMM_TEMP_NOK; \
    } while (0)
int verify_work(void *handle, void *work_handle, char *input, char *output) {
int err;
pthread_t tid;
worker_thread_t *wt;
verify_container_handle_t *vch = (verify_container_handle_t*) handle;
verify_work_handle_t *vwh = (verify_work_handle_t*) work_handle;
struct timespec ts;
syslog(LOG_DEBUG, "verify_work: going to verify %s\n", input);
vwh->id += 1;
vwh->result->id = vwh->id;
wt = (worker_thread_t*) malloc(sizeof(worker_thread_t));
wt->id = vwh->id;
wt->terminator_queue = vwh->terminator_queue;
wt->input = (char*) malloc(sizeof(char) * (strlen(input)+1));
strcpy(wt->input, input);
wt->output = NULL;
wt->mutex = vwh->result_mutex;
wt->cond = vwh->result_cond;
wt->result = vwh->result;
wt->checker_terminator_queue = (ht_queue_t*) malloc(sizeof(ht_queue_t));
queue_init(wt->checker_terminator_queue);
wt->checker_cnt = 0;
wt->vwh = work_handle;
syslog(LOG_DEBUG, "verify_work: going to start worker thread, id=%d", vwh->id);
err = pthread_create(&tid, NULL, &worker_thread, wt);
if (-1 == err) {
free(wt->input);
free(wt);
PERM_NOK_RETURN("unable to create worker thread");
}
syslog(LOG_DEBUG, "verify_work: waiting for result");
pthread_mutex_lock(vwh->result_mutex);
ts.tv_sec = time(0) + vch->timeout_result;
ts.tv_nsec = 0;
err = pthread_cond_timedwait(vwh->result_cond, vwh->result_mutex, &ts);
pthread_mutex_unlock(vwh->result_mutex);
if (ETIMEDOUT == err) {
TEMP_NOK_RETURN("worker thread timed out");
}
snprintf(output, ANSWER_BUFSIZE, vwh->result->output);
free(vwh->result->output);
return vwh->result->result;
}
/*
 * Collects the A-record addresses of every exchange returned by
 * get_best_mx_rrs(domain) into a malloc'ed array terminated by a 0 entry.
 * The caller frees it. Returns NULL if there is no MX record or no address
 * was collected. If memory runs out part-way, the addresses gathered so far
 * are returned (still 0-terminated) instead of crashing.
 * NOTE(review): an A record of 0.0.0.0 is indistinguishable from the
 * terminator and would end the list early.
 */
static unsigned int *get_mx_ip_addresses(char *domain) {
    mx_rdata_t **mx_rdata, **mx_rdata2;
    a_rdata_t **a_rdata, **a_rdata2;
    int i = 0;
    int out_of_memory = 0;
    unsigned int *addresses = NULL;
    unsigned int *grown;

    mx_rdata = get_best_mx_rrs(domain);
    if (NULL == mx_rdata) {
        syslog(LOG_DEBUG, "get_mx_ip_address: no mx at all");
        return NULL;
    }

    for (mx_rdata2 = mx_rdata; (*mx_rdata2 != NULL) && !out_of_memory; mx_rdata2++) {
        syslog(LOG_DEBUG, "get_mx_ip_address: %s", (*mx_rdata2)->exchange);
        a_rdata = get_a_rrs((*mx_rdata2)->exchange);
        if (NULL == a_rdata) {
            continue;
        }
        for (a_rdata2 = a_rdata; *a_rdata2 != NULL; a_rdata2++) {
            /* i + 2: room for the new entry plus the 0 terminator. The
               result goes into a temporary so a failed realloc does not lose
               (leak) the existing array, which still has room for the
               terminator at index i. */
            grown = (unsigned int*) realloc(addresses, sizeof(unsigned int) * (i + 2));
            if (NULL == grown) {
                syslog(LOG_ERR, "get_mx_ip_address: out of memory, keeping %d addresses", i);
                out_of_memory = 1;
                break;
            }
            addresses = grown;
            addresses[i] = (*a_rdata2)->address;
            i++;
        }
        free_rrs((void**)a_rdata);
    }

    if (NULL != addresses) {
        addresses[i] = 0; /* termination */
    }
    free_rrs((void**)mx_rdata);
    return addresses;
}
/*
 * Checker thread: runs one SMTP dialog with ct->ip_address to test
 * ct->email_address. The steps are CONNECT, EHLO, MAIL FROM, RCPT TO <addr>,
 * RSET and QUIT.
 *
 * ct->result is set as follows:
 *  - SMM_LOCAL_TEMP_NOK on a timeout, an unparsable response, an unexpected
 *    error, a 4xx reply, or when the dialog cannot be set up;
 *  - SMM_LOCAL_PERM_NOK on a 5xx reply;
 *  - SMM_LOCAL_OK when the dialog runs through and the QUIT reply is 2xx.
 * Any other reply code lets the dialog continue. A reply at the last step
 * that matches none of these leaves ct->result as the caller initialised it.
 *
 * A malloc'ed copy of the final response text goes into ct->output (NULL if
 * that allocation fails). The thread then hands ct to
 * ct->checker_terminator_queue and does not touch it afterwards; the
 * collector joins the thread and frees ct.
 */
static void *checker_thread(void *arg) {
    static const char *UNEXPECTED_ERROR = "unexpected error in smtp dialog";
    static const char *TIMEOUT_ERROR = "timeout on smtp dialog";
    static const char *ADDRESS_VALID = "address is valid";
    checker_thread_t *ct = (checker_thread_t*) arg;
    int timeout = ct->vwh->vch->timeout_dialog;
    char *sender_address = ct->vwh->vch->sender_address;
    char *ehlo_arg = ct->vwh->vch->ehlo_arg;
    int port = ct->vwh->vch->smtp_port;
    smtp_t *smtp = NULL;
    /* Initialised so every exit path has a printable text. */
    char *response_text = (char*) UNEXPECTED_ERROR;
    char *rcpt_arg;
    size_t rcpt_size;
    int err = 0, done = 0;
    enum {
        CONNECT, EHLO, MAILFROM, RCPTTO, RSET, QUIT, END
    } state = CONNECT;

    syslog(LOG_DEBUG, "checker_thread (id=%d) started, %08x %s", ct->id,
           (unsigned int) ct->ip_address, ct->email_address);
    ct->thread = pthread_self();

    /* RCPT TO argument "<address>": two angle brackets plus the terminating \0 */
    rcpt_size = strlen(ct->email_address) + 3;
    rcpt_arg = (char*) malloc(rcpt_size);
    if (NULL != rcpt_arg) {
        snprintf(rcpt_arg, rcpt_size, "<%s>", ct->email_address);
        smtp = smtp_init(ct->ip_address, port, timeout);
    }
    if (NULL == smtp) {
        /* NOTE(review): assumes smtp_init signals failure by returning NULL */
        syslog(LOG_DEBUG, "checker_thread (id=%d), unable to set up smtp dialog", ct->id);
        ct->result = SMM_LOCAL_TEMP_NOK;
        done = 1;
    }

    while ((END != state) && (0 == done)) {
        syslog(LOG_DEBUG, "checker_thread (id=%d), smtp dialog state %d", ct->id, state);
        switch(state) {
        case CONNECT:
            err = smtp_connect(smtp);
            break;
        case EHLO:
            err = smtp_ehlo(smtp, ehlo_arg);
            break;
        case MAILFROM:
            err = smtp_mailfrom(smtp, sender_address);
            break;
        case RCPTTO:
            err = smtp_rcptto(smtp, rcpt_arg);
            break;
        case RSET:
            err = smtp_rset(smtp);
            break;
        case QUIT:
            err = smtp_quit(smtp);
            break;
        case END:
            break; /* not reached: the loop stops at END */
        }
        state++;
        switch(err) {
        case SMTP_TIMEOUT:
            syslog(LOG_DEBUG, "checker_thread (id=%d), timeout in smtp dialog", ct->id);
            ct->result = SMM_LOCAL_TEMP_NOK;
            response_text = (char*)TIMEOUT_ERROR;
            done = 1;
            break;
        case 0:
            /* evaluate smtp_response, return or continue */
            err = smtp_response(smtp, &response_text);
            if (-1 == err) {
                syslog(LOG_DEBUG, "checker_thread (id=%d), response could not be parsed", ct->id);
                ct->result = SMM_LOCAL_TEMP_NOK;
                response_text = (char*)UNEXPECTED_ERROR;
                done = 1;
                break;
            }
            syslog(LOG_DEBUG, "checker_thread (id=%d), response: %d, %s (%d)",
                   ct->id, err, response_text, (int) strlen(response_text));
            switch(err/100) {
            case 4:
                ct->result = SMM_LOCAL_TEMP_NOK;
                done = 1;
                break;
            case 5:
                ct->result = SMM_LOCAL_PERM_NOK;
                done = 1;
                break;
            case 2:
                if (END == state) {
                    ct->result = SMM_LOCAL_OK;
                    response_text = (char*)ADDRESS_VALID;
                    done = 1;
                }
                break;
            }
            break;
        default:
            syslog(LOG_DEBUG, "checker_thread (id=%d), unexpected error in smtp dialog", ct->id);
            ct->result = SMM_LOCAL_TEMP_NOK;
            response_text = (char*)UNEXPECTED_ERROR;
            done = 1;
            break;
        }
    }

    if (NULL != smtp) {
        smtp_close(smtp);
    }
    /* Copy the text before smtp_destroy: it may point into the smtp object. */
    ct->output = (char*) malloc(sizeof(char) * (strlen(response_text)+1));
    if (NULL != ct->output) {
        strcpy(ct->output, response_text);
    }
    if (NULL != smtp) {
        smtp_destroy(smtp);
    }
    free(rcpt_arg);

    syslog(LOG_DEBUG, "checker_thread (id=%d) goes to terminator queue", ct->id);
    queue_put(ct->checker_terminator_queue, ct);
    return NULL;
}
/*
 * Worker thread started by verify_work for the address wt->input.
 *
 *  1. An input without '@' is answered SMM_LOCAL_PERM_NOK.
 *  2. A cached verdict from cache_lookup is used directly.
 *  3. Otherwise the MX addresses of the domain part are resolved. One
 *     checker_thread is started per address, at most max_checker_threads.
 *     Their results are collected until one delivers a non-temporary verdict,
 *     which is also cached.
 *
 * The verdict is written into wt->result and wt->cond is signalled, but only
 * while wt->result->id still equals wt->id; otherwise verify_work has given
 * up and the result is dropped. The SMM_LOCAL_* verdicts become SMM_OK with
 * a "<TNOK><..>", "<NOK><..>" or "<OK><..>" text. Checkers not collected
 * when a permanent verdict arrives stay in wt->checker_terminator_queue, and
 * wt->checker_cnt counts them for verify_work_destroy. Finally the thread
 * hands wt to wt->terminator_queue.
 */
static void *worker_thread(void *arg) {
    worker_thread_t *wt = (worker_thread_t*) arg;
    char *domain_part;
    unsigned int *mx_ip_addresses;
    int result = SMM_TEMP_NOK;
    int i, err;
    size_t answer_size;
    pthread_t tid;
    checker_thread_t *ct = NULL;
    char *cached_output = NULL;
    static const char *NOT_AN_ADDRESS = "not an email-address, no @ in it";
    static const char *DEFAULT_ANSWER = "default answer";
    static const char *NO_MX_AVAILABLE = "no MX available";
    static const char *NO_PERM_RESULT = "no checker returned permanent result";

    syslog(LOG_DEBUG, "worker_thread %d started, %s", wt->id, wt->input);
    wt->thread = pthread_self();
    /* Fallback answer in case no branch below sets one, e.g. when no checker
       thread could be started at all; otherwise strlen(NULL) below. */
    wt->output = (char*) DEFAULT_ANSWER;

    /* separate domain part, some sanity checks */
    domain_part = strchr(wt->input, '@');
    if (NULL == domain_part) {
        wt->output = (char*) NOT_AN_ADDRESS;
        result = SMM_LOCAL_PERM_NOK;
    } else if (0 == cache_lookup(wt->vwh->vch, wt->input, &result, &cached_output)) {
        syslog(LOG_DEBUG, "worker_thread: got a cached result for %s -> %d, %s",
               wt->input, result, cached_output);
        wt->output = cached_output;
    } else {
        domain_part += 1;
        syslog(LOG_DEBUG, "worker_thread: looking up %s", domain_part);
        mx_ip_addresses = get_mx_ip_addresses(domain_part);
        if (NULL == mx_ip_addresses) {
            wt->output = (char*) NO_MX_AVAILABLE;
            result = SMM_LOCAL_PERM_NOK;
        } else {
            for (i = 0; (*(mx_ip_addresses+i) != 0) && (i < wt->vwh->vch->max_checker_threads); i++) {
                unsigned int address = *(mx_ip_addresses+i);
                syslog(LOG_DEBUG, "worker_thread: starting checker thread to %d.%d.%d.%d, for email-address %s",
                       (address&0xff000000)>>24, (address&0x00ff0000)>>16,
                       (address&0x0000ff00)>>8, (address&0x000000ff),
                       wt->input);
                ct = (checker_thread_t*) malloc(sizeof(checker_thread_t));
                if (NULL == ct) {
                    syslog(LOG_ERR, "worker_thread: unable to create checker thread");
                    continue;
                }
                ct->id = i+1; /* id of ct should start with 1, same as id of wt */
                ct->ip_address = address;
                ct->email_address = wt->input;
                ct->output = NULL;
                ct->result = SMM_TEMP_NOK;
                ct->checker_terminator_queue = wt->checker_terminator_queue;
                ct->vwh = wt->vwh;
                err = pthread_create(&tid, NULL, &checker_thread, ct);
                /* pthread_create returns an error number (never -1). Counting a
                   thread that was not created would make the collecting loop
                   below wait forever. */
                if (0 != err) {
                    syslog(LOG_ERR, "worker_thread: unable to create checker thread");
                    free(ct);
                } else {
                    wt->checker_cnt += 1;
                }
            }
            ct = NULL;
            free(mx_ip_addresses);

            while (wt->checker_cnt > 0) {
                ct = (checker_thread_t*) queue_get_wait(wt->checker_terminator_queue);
                wt->checker_cnt -= 1;
                syslog(LOG_DEBUG, "worker_thread: got checker result for %d.%d.%d.%d: %s -> %d, %s",
                       ((ct->ip_address)&0xff000000)>>24, ((ct->ip_address)&0x00ff0000)>>16,
                       ((ct->ip_address)&0x0000ff00)>>8, ((ct->ip_address)&0x000000ff),
                       wt->input, ct->result,
                       (NULL != ct->output) ? ct->output : "(null)");
                pthread_join(ct->thread, NULL);
                syslog(LOG_DEBUG, "worker_thread: checker thread joined");
                /* A checker without output text (its allocation failed) cannot
                   provide an answer and is treated like a temporary result. */
                if ((NULL != ct->output) &&
                    (SMM_LOCAL_TEMP_NOK != ct->result) &&
                    (SMM_TEMP_NOK != ct->result)) {
                    syslog(LOG_DEBUG, "worker_thread: this is a permanent result, returning");
                    wt->output = ct->output;
                    result = ct->result;
                    cache_insert(wt->vwh->vch, wt->input, ct->result, ct->output);
                    /* ct is freed below, since its output is still needed; the
                       remaining checkers are left for verify_work_destroy */
                    break;
                } else {
                    syslog(LOG_DEBUG, "worker_thread: this is a temporary result, continue to wait");
                    wt->output = (char*) NO_PERM_RESULT;
                    result = SMM_LOCAL_TEMP_NOK;
                    /* collected and no longer needed */
                    free(ct->output);
                    free(ct);
                    ct = NULL;
                }
            }
        }
    }

    /* -------------------------------------------------------------------- */
    syslog(LOG_DEBUG, "worker_thread %d waiting for mutex", wt->id);
    pthread_mutex_lock(wt->mutex);
    if (wt->result->id == wt->id) {
        /* room for the text plus the longest prefix "<TNOK><", ">" and \0 */
        answer_size = strlen(wt->output) + 15;
        wt->result->output = (char*) malloc(answer_size);
        if (NULL == wt->result->output) {
            syslog(LOG_ERR, "worker_thread %d: out of memory, result dropped", wt->id);
        } else {
            syslog(LOG_DEBUG, "worker_thread %d returned result", wt->id);
            switch (result) {
            case SMM_LOCAL_TEMP_NOK:
                snprintf(wt->result->output, answer_size, "<TNOK><%s>", wt->output);
                result = SMM_OK;
                break;
            case SMM_LOCAL_PERM_NOK:
                snprintf(wt->result->output, answer_size, "<NOK><%s>", wt->output);
                result = SMM_OK;
                break;
            case SMM_LOCAL_OK:
                snprintf(wt->result->output, answer_size, "<OK><%s>", wt->output);
                result = SMM_OK;
                break;
            default:
                snprintf(wt->result->output, answer_size, "%s", wt->output);
                break;
            }
            wt->result->result = result;
            pthread_cond_signal(wt->cond);
        }
    } else {
        syslog(LOG_DEBUG, "worker_thread %d drops result", wt->id);
        /* verify_work is no longer interested in this result */
    }
    pthread_mutex_unlock(wt->mutex);

    /* ct is non-NULL only when it delivered the permanent result above. */
    if (NULL != ct) {
        free(ct->output);
        free(ct);
    }
    /* allocated for us by cache_lookup on a cache hit; free(NULL) is a no-op */
    free(cached_output);

    syslog(LOG_DEBUG, "worker_thread %d goes to terminator queue", wt->id);
    queue_put(wt->terminator_queue, wt);
    return NULL;
}