Files
smmapdfw/smmapdfw/verify_worker/verify_worker.c
whottgen f0928a82fc fix
2007-08-28 09:32:39 +00:00

945 lines
29 KiB
C

/*
Copyright (C) 2004,2005 Wolfgang Hottgenroth
This file is part of smmapdfw.
smmapdfw is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
smmapdfw is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
License for more details.
You should have received a copy of the GNU General Public License
along with smmapdfw. If not, write to the Free Software Foundation, 59
Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#if HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdlib.h>
#include <syslog.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>
#include <signal.h>
#include "htcache.h"
#include "containers_public.h"
#include "smmapd.h"
#include "htdns.h"
#include "queue.h"
#include "smtp.h"
#include "htmalloc.h"
#include "htbuffer.h"
#include "stats.h"
#include "dvl.h"
#define SMM_LOCAL_PERM_NOK 101
#define SMM_LOCAL_TEMP_NOK 102
#define SMM_LOCAL_OK 103
/* --- statistics counter --- */
#define STAT_CNT_VERIFIER_CACHE 0
#define STAT_CNT_VERIFIER_WORKER_R_THREADS 1
#define STAT_CNT_VERIFIER_WORKER_THREADS 2
#define STAT_CNT_VERIFIER_WORKER_THREADS_FAILED 3
#define STAT_CNT_VERIFIER_WORKER_THREADS_TIMEOUT 4
#define STAT_CNT_VERIFIER_CHECKER_R_THREADS 5
#define STAT_CNT_VERIFIER_CHECKER_THREADS 6
#define STAT_CNT_VERIFIER_CHECKER_THREADS_FAILED 7
#define STAT_CNT_VERIFIER_ANSWERED_FROM_CACHE 8
#define STAT_CNT_VERIFIER_RETURNED_OK 9
#define STAT_CNT_VERIFIER_RETURNED_NOK 10
#define STAT_CNT_VERIFIER_RETURNED_TNOK 11
counterDef_t *verify_counterDefs = NULL;
void verify_setupCounterList() {
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_GAUGE, "Verifier: entries in cache");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_GAUGE, "Verifier: running worker threads");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: worker threads");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: failed to start, worker threads");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: timed out worker threads");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_GAUGE, "Verifier: running checker threads");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: checker threads");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: failed to start, checker threads");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: answered from cache");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: returned OK");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: returned NOK");
verify_counterDefs = addOneCounterDef(verify_counterDefs, SMM_TYPE_COUNTER, "Verifier: returned TNOK");
};
struct verify_container_handle_s {
cfgl_t *cfg;
statCounter_t *statCounter;
int timeout_result;
int timeout_dialog;
htcache_t *cache;
char *sender_address;
char *helo_arg;
int smtp_port;
int max_checker_threads;
};
typedef struct verify_container_handle_s verify_container_handle_t;
struct verify_result_s {
int id;
int result;
char *output;
};
typedef struct verify_result_s verify_result_t;
struct verify_work_handle_s {
int id;
pthread_mutex_t *result_mutex;
pthread_cond_t *result_cond;
verify_result_t *result;
ht_queue_t *terminator_queue;
verify_container_handle_t *vch;
int worker_cnt;
};
typedef struct verify_work_handle_s verify_work_handle_t;
struct checker_thread_s {
pthread_t thread;
int id;
int ip_address;
char *email_address;
char *output;
int result;
ht_queue_t *checker_terminator_queue;
verify_work_handle_t *vwh;
};
typedef struct checker_thread_s checker_thread_t;
struct worker_thread_s {
pthread_t thread;
int id;
char *input;
char *output;
pthread_mutex_t *mutex;
pthread_cond_t *cond;
verify_result_t *result;
ht_queue_t *checker_terminator_queue;
int checker_cnt;
ht_queue_t *terminator_queue;
verify_work_handle_t *vwh;
char* rememberPropagatedOutput;
};
typedef struct worker_thread_s worker_thread_t;
struct mydata_s {
int result;
char output[1];
};
typedef struct mydata_s mydata_t;
int verify_init(cfgl_t *cfg, void **handle);
int verify_destroy(void *handle);
int verify_work_setup(void *handle, void **work_handle);
int verify_work(void *handle, void *work_handle, char *input, htbuffer_t *output);
int verify_work_destroy(void *handle, void *work_handle);
static void *worker_thread(void *arg);
static unsigned int *get_mx_ip_addresses(char *domain);
class_descriptor_t verifier = {
"verifier",
&verify_init,
&verify_destroy,
&verify_work_setup,
&verify_work,
&verify_work_destroy
};
/* verify_init will be called when the class is loaded, directly at start-up */
/* It will be called definitely only once, there is definitely only one */
/* container handle for each class in the application. */
int verify_init(cfgl_t *cfg, void **handle) {
verify_container_handle_t *vch;
int cache_enabled, cache_expiry;
char *cache_file;
vch = (verify_container_handle_t*) htmalloc(sizeof(verify_container_handle_t));
vch->cfg = cfg;
vch->timeout_result = atoi(findcfglx(vch->cfg, "timeout_result", "5"));
vch->timeout_dialog = atoi(findcfglx(vch->cfg, "timeout_dialog", "20"));
vch->sender_address = findcfglx(vch->cfg, "sender_address", "<>");
vch->helo_arg = findcfglx(vch->cfg, "helo_arg", "local");
vch->smtp_port = atoi(findcfglx(vch->cfg, "smtp_port", "25"));
vch->max_checker_threads = atoi(findcfglx(vch->cfg, "max_checker_threads", "25"));
cache_enabled = atoi(findcfglx(vch->cfg, "cache_enabled", "1"));
cache_expiry = atoi(findcfglx(vch->cfg, "cache_expiry", "86400"));
cache_file = findcfglx(vch->cfg, "cache_file", "verifier_cache");
if (1 == cache_enabled) {
vch->cache = htcache_init(cache_file, cache_expiry);
} else {
vch->cache = NULL;
}
verify_setupCounterList();
vch->statCounter = initStatCounter("verifier", verify_counterDefs);
*handle = vch;
return 0;
}
/* currently this will be called never. It would be called when the server */
/* is gracefully shutted down, which is currently not supported */
int verify_destroy(void *handle) {
verify_container_handle_t *vch = (verify_container_handle_t*)handle;
if (NULL != vch->cache) {
htcache_destroy(vch->cache);
}
/* FIXME The statCounter structure should be freed too, however, as this won't ever be called ... */
free(vch);
return 0;
}
int verify_work_setup(void *handle, void **work_handle) {
verify_work_handle_t *vwh;
vwh = (verify_work_handle_t*)htmalloc(sizeof(verify_work_handle_t));
vwh->id = 0;
vwh->result = (verify_result_t*) htmalloc(sizeof(verify_result_t));
vwh->result_mutex = (pthread_mutex_t*) htmalloc(sizeof(pthread_mutex_t));
pthread_mutex_init(vwh->result_mutex, NULL);
pthread_mutex_unlock(vwh->result_mutex);
vwh->result_cond = (pthread_cond_t*) htmalloc(sizeof(pthread_cond_t));
pthread_cond_init(vwh->result_cond, NULL);
vwh->terminator_queue = (ht_queue_t*) htmalloc(sizeof(ht_queue_t));
queue_init(vwh->terminator_queue);
vwh->worker_cnt = 0;
vwh->vch = (verify_container_handle_t*)handle;
*work_handle = vwh;
return 0;
}
int verify_work_destroy(void *handle, void *work_handle) {
verify_container_handle_t *vch = (verify_container_handle_t*)handle;
verify_work_handle_t *vwh = (verify_work_handle_t*)work_handle;
worker_thread_t *wt;
checker_thread_t *ct;
int err;
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy: was %d times used", work_handle, vwh->id);
while (vwh->worker_cnt != 0) {
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy, %d worker thread in queue, waiting for it",
work_handle, vwh->worker_cnt);
wt = (worker_thread_t*) queue_get_wait(vwh->terminator_queue);
while (wt->checker_cnt != 0) {
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy, %d checker thread in queue, waiting for it",
work_handle, wt->checker_cnt);
ct = (checker_thread_t*) queue_get_wait(wt->checker_terminator_queue);
/* clean up the checker stuff */
pthread_join(ct->thread, NULL);
decStatCounter(vwh->vch->statCounter, STAT_CNT_VERIFIER_CHECKER_R_THREADS);
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy, checker_thread (id=%d) joined",
work_handle, ct->id);
free(ct->output);
free(ct);
wt->checker_cnt--;
}
err = pthread_join(wt->thread, NULL);
decStatCounter(vwh->vch->statCounter, STAT_CNT_VERIFIER_WORKER_R_THREADS);
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy, worker_thread (id=%d) joined",
work_handle, wt->id);
free(wt->input);
/* this will always be a pointer to something const or allocated, which
will be freed somewhere else */
/* free(wt->output); */
queue_destroy(wt->checker_terminator_queue);
free(wt->checker_terminator_queue);
if (NULL != wt->rememberPropagatedOutput)
free(wt->rememberPropagatedOutput);
free(wt);
vwh->worker_cnt--;
}
/* Free the members of the work_handle */
pthread_mutex_destroy(vwh->result_mutex);
free(vwh->result_mutex);
pthread_cond_destroy(vwh->result_cond);
free(vwh->result_cond);
free(vwh->result);
queue_destroy(vwh->terminator_queue);
free(vwh->terminator_queue);
free(vwh);
return 0;
}
void cache_insert(verify_work_handle_t *vwh, const char *address, int result, const char *output) {
int mydata_size;
mydata_t *mydata;
if (vwh == NULL) {
syslog(LOG_CRIT, "cache_insert: vwh is NULL");
return;
}
if (address == NULL) {
syslog(LOG_CRIT, "cache_insert: address is NULL");
return;
}
if (output == NULL) {
syslog(LOG_CRIT, "cache_insert: output is NULL");
return;
}
if (NULL != vwh->vch->cache) {
syslog(LOG_DEBUG, "verify (%p) cache_insert: inserting %s -> %d, %s", vwh, address, result, output);
mydata_size = (sizeof(mydata_t) + (sizeof(char) * (strlen(output) + 1)));
mydata = (mydata_t *) htmalloc(mydata_size);
mydata->result = result;
strcpy(mydata->output, output);
htcache_insert(vwh->vch->cache, address, (const char*) mydata, mydata_size);
free(mydata);
}
}
int cache_lookup(verify_work_handle_t *vwh, const char* address, int *result, char **output) {
mydata_t *mydata;
int ret, res = -1;
int mydata_size;
if (vwh == NULL) {
syslog(LOG_CRIT, "cache_lookup: vwh is NULL");
return res;
}
if (address == NULL) {
syslog(LOG_CRIT, "cache_lookup: address is NULL");
return res;
}
if (NULL != vwh->vch->cache) {
syslog(LOG_DEBUG, "verify (%p) cache_lookup: Looking up %s", vwh, address);
ret = htcache_lookup(vwh->vch->cache, address, (char**) &mydata, &mydata_size);
if (0 == ret) {
*result = mydata->result;
*output = (char*) htmalloc(sizeof(char) * (strlen(mydata->output) + 1));
strcpy(*output, mydata->output);
res = 0;
free(mydata);
syslog(LOG_DEBUG, "verify (%p) cache_lookup: found %d %s", vwh, *result, *output);
}
}
return res;
}
/* Each time verify_work is called, it starts a worker thread. This
thread gets the address to check, an id, a worker termination queue
and a pointer the a result structure as argument.
The id is just a number which is increased each time a new worker
thread is started. The id is also part of the result structure.
verify_work waits a certain amount of time for the result of a
worker thread, afterwards it returns with temporary failure.
A worker thread returns its result by putting it into the result
structure, but it does so only if the id in the result structure is
equal to the id it go as argument. If a worker thread finds an id
in the result structure different from its own one, verify_work has
started a new worker thread in between and is not longer interested
in the result of the current thread.
A thread puts its own argument structure into the termination
queue, just before actually terminating.
verify_work_destroy will drain termination queue, joins each thread
it takes out of it an frees the argument structure. It does so
until the thread count (id) goes to zero.
*/
#define PERM_NOK_RETURN(msg) \
syslog(LOG_ERR, "verify (%p) verify_work: %s", vwh, msg); \
htbuffer_strcat(output, "verify_work: "); \
htbuffer_strcat(output, msg); \
return SMM_PERM_NOK;
#define TEMP_NOK_RETURN(msg) \
syslog(LOG_ERR, "verify (%p) verify_work: %s", vwh, msg); \
htbuffer_strcat(output, "verify_work: "); \
htbuffer_strcat(output, msg); \
return SMM_TEMP_NOK;
int verify_work(void *handle, void *work_handle, char *input, htbuffer_t *output) {
int err;
pthread_t tid;
worker_thread_t *wt;
verify_container_handle_t *vch = (verify_container_handle_t*) handle;
verify_work_handle_t *vwh = (verify_work_handle_t*) work_handle;
struct timespec ts;
if (handle == NULL) {
TEMP_NOK_RETURN("handle is NULL");
}
if (work_handle == NULL) {
TEMP_NOK_RETURN("work_handle is NULL");
}
if (input == NULL) {
TEMP_NOK_RETURN("input is NULL");
}
if (output == NULL) {
TEMP_NOK_RETURN("output is NULL");
}
syslog(LOG_DEBUG, "verify (%p) verify_work: going to verify %s\n", vwh, input);
vwh->id += 1;
vwh->result->id = vwh->id;
wt = (worker_thread_t*) htmalloc(sizeof(worker_thread_t));
wt->id = vwh->id;
wt->terminator_queue = vwh->terminator_queue;
wt->input = (char*) htmalloc(sizeof(char) * (strlen(input)+1));
strcpy(wt->input, input);
wt->output = NULL;
wt->mutex = vwh->result_mutex;
wt->cond = vwh->result_cond;
wt->result = vwh->result;
wt->rememberPropagatedOutput = NULL;
wt->checker_terminator_queue = (ht_queue_t*) htmalloc(sizeof(ht_queue_t));
queue_init(wt->checker_terminator_queue);
wt->checker_cnt = 0;
wt->vwh = work_handle;
syslog(LOG_DEBUG, "verify (%p) verify_work: going to start worker thread, id=%d",
vwh, vwh->id);
pthread_mutex_lock(vwh->result_mutex);
err = pthread_create(&tid, NULL, &worker_thread, wt);
if (0 != err) {
incStatCounter(vch->statCounter, STAT_CNT_VERIFIER_WORKER_THREADS_FAILED);
free(wt->input);
free(wt);
wt = NULL;
pthread_mutex_unlock(vwh->result_mutex);
PERM_NOK_RETURN("unable to create worker thread");
} else {
vwh->worker_cnt++;
}
syslog(LOG_DEBUG, "verify (%p) verify_work: waiting for result", vwh);
ts.tv_sec = time(0) + vch->timeout_result;
ts.tv_nsec = 0;
err = pthread_cond_timedwait(vwh->result_cond, vwh->result_mutex, &ts);
pthread_mutex_unlock(vwh->result_mutex);
if (ETIMEDOUT == err) {
incStatCounter(vwh->vch->statCounter, STAT_CNT_VERIFIER_WORKER_THREADS_TIMEOUT);
TEMP_NOK_RETURN("worker thread timed out");
}
// snprintf(output, ANSWER_BUFSIZE, vwh->result->output);
htbuffer_strcpy(output, vwh->result->output);
/* this belongs to the worker thread, it will be freed when the worker thread is cleaned up */
/* free(vwh->result->output); */
/* vwh->result->output = NULL; */
return vwh->result->result;
}
static void inner_resolve(char *domain, unsigned int **addresses, int *num_of_addresses) {
if (domain != NULL) {
syslog(LOG_DEBUG, "inner_resolve: %s", domain);
a_rdata_t **a_rdata = get_a_rrs(domain);
a_rdata_t **a_rdata2;
if (NULL != a_rdata) {
for (a_rdata2 = a_rdata; *a_rdata2 != NULL; a_rdata2++) {
syslog(LOG_DEBUG, "inner_resolve: arranging result");
*addresses = (unsigned int*)htrealloc(*addresses, sizeof(unsigned int)*(*num_of_addresses + 2)); /* 2 since first i==0 and
we always need one more
for the termination */
*(*addresses + *num_of_addresses) = (*a_rdata2)->address;
*num_of_addresses += 1;
}
free_rrs((void**)a_rdata);
}
}
}
static unsigned int *get_mx_ip_addresses(char *domain) {
mx_rdata_t **mx_rdata, **mx_rdata2;
// a_rdata_t **a_rdata, **a_rdata2;
int i = 0;
unsigned int *addresses = NULL;
if (NULL == domain) {
syslog(LOG_CRIT, "get_mx_ip_addresses: domain is NULL");
return NULL;
}
mx_rdata = get_best_mx_rrs(domain);
if (NULL != mx_rdata) {
for (mx_rdata2 = mx_rdata; *mx_rdata2 != NULL; mx_rdata2++) {
inner_resolve((*mx_rdata2)->exchange, &addresses, &i);
}
free_rrs((void**)mx_rdata);
} else {
syslog(LOG_DEBUG, "get_mx_ip_address: no mx at all, trying a-rr");
inner_resolve(domain, &addresses, &i);
}
if (NULL != addresses)
*(addresses+i) = 0; /* termination */
syslog(LOG_DEBUG, "get_mx_ip_addresses: Found %d addresses", i);
int j;
for (j = 0; j < i; j++)
syslog(LOG_DEBUG, "get_mx_ip_addresses: %d.%d.%d.%d",
(htonl(*(addresses+j))&0xff000000)>>24, (htonl(*(addresses+j))&0x00ff0000)>>16,
(htonl(*(addresses+j))&0x0000ff00)>>8, (htonl(*(addresses+j))&0x000000ff));
return addresses;
}
static unsigned int *get_hostname_ip_addresses(char *hostname) {
if (NULL == hostname) {
syslog(LOG_CRIT, "get_hostname_ip_addresses: hostname is NULL");
return NULL;
}
unsigned int *addresses = NULL;
int i;
inner_resolve(hostname, &addresses, &i);
if (NULL != addresses)
*(addresses+i) = 0; /* termination */
syslog(LOG_DEBUG, "get_hostname_ip_addresses: Found %d addresses", i);
int j;
for (j = 0; j < i; j++)
syslog(LOG_DEBUG, "get_hostname_ip_addresses: %d.%d.%d.%d",
(htonl(*(addresses+j))&0xff000000)>>24, (htonl(*(addresses+j))&0x00ff0000)>>16,
(htonl(*(addresses+j))&0x0000ff00)>>8, (htonl(*(addresses+j))&0x000000ff));
return addresses;
}
static void *checker_thread(void *arg) {
if (arg == NULL) {
syslog(LOG_CRIT, "checker_thread: arg is NULL");
return NULL;
}
static const char *UNEXPECTED_ERROR = "unexpected error in smtp dialog";
static const char *TIMEOUT_ERROR = "timeout on smtp dialog";
static const char *ADDRESS_VALID = "address is valid";
checker_thread_t *ct = (checker_thread_t*) arg;
int timeout = ct->vwh->vch->timeout_dialog;
char *sender_address = ct->vwh->vch->sender_address;
char *helo_arg = ct->vwh->vch->helo_arg;
int port = ct->vwh->vch->smtp_port;
smtp_t *smtp;
char *response_text, *tmp_arg;
int err, done = 0;
enum {
CONNECT, EHLO, MAILFROM, RCPTTO, RSET, QUIT, END
} state = CONNECT;
syslog(LOG_DEBUG, "verify (%p) checker_thread (id=%d) started, %08x %s",
ct->vwh, ct->id, ct->ip_address, ct->email_address);
ct->thread = pthread_self();
incStatCounter(ct->vwh->vch->statCounter, STAT_CNT_VERIFIER_CHECKER_THREADS);
incStatCounter(ct->vwh->vch->statCounter, STAT_CNT_VERIFIER_CHECKER_R_THREADS);
/*
Connect to ct->ip_address using ct->email_address,
put the result text in ct->output, TBD: by copy or pointer,
evaluate whether it is SMM_TEMP_NOK, SMM_PERM_NOK or SMM_OK
*/
smtp = smtp_init(ct->ip_address, port, timeout);
while ((END != state) && (0 == done)) {
syslog(LOG_DEBUG, "verify (%p) checker_thread (id=%d), smtp dialog state %d",
ct->vwh, ct->id, state);
switch(state) {
case CONNECT:
err = smtp_connect(smtp);
break;
case EHLO:
err = smtp_helo(smtp, helo_arg);
break;
case MAILFROM:
err = smtp_mailfrom(smtp, sender_address);
break;
case RCPTTO:
tmp_arg = (char*) htmalloc(sizeof(char) * (strlen(ct->email_address)+5));
*tmp_arg = '\0';
strcat(tmp_arg, "<");
strcat(tmp_arg, ct->email_address);
strcat(tmp_arg, ">");
err = smtp_rcptto(smtp, tmp_arg);
free(tmp_arg);
break;
case RSET:
err = smtp_rset(smtp);
break;
case QUIT:
err = smtp_quit(smtp);
break;
}
state++;
switch(err) {
case SMTP_TIMEOUT:
syslog(LOG_DEBUG, "verify (%p) checker_thread (id=%d), timeout in smtp dialog",
ct->vwh, ct->id);
ct->result = SMM_LOCAL_TEMP_NOK;
response_text = (char*)TIMEOUT_ERROR;
done = 1;
break;
case 0:
/* evaluate smtp_response, return or continue */
err = smtp_response(smtp, &response_text);
if (-1 == err) {
syslog(LOG_DEBUG, "verify (%p) checker_thread (id=%d), response could not be parsed",
ct->vwh, ct->id);
ct->result = SMM_LOCAL_TEMP_NOK;
response_text = (char*)UNEXPECTED_ERROR;
done = 1;
break;
}
syslog(LOG_DEBUG, "verify (%p) checker_thread (id=%d), response: %d, %s (%d)",
ct->vwh, ct->id, err, response_text, strlen(response_text));
switch(err/100) {
case 4:
ct->result = SMM_LOCAL_TEMP_NOK;
done = 1;
break;
case 5:
ct->result = SMM_LOCAL_PERM_NOK;
done = 1;
break;
case 2:
if (END == state) {
ct->result = SMM_LOCAL_OK;
response_text = (char*)ADDRESS_VALID;
done = 1;
break;
}
}
break;
default:
syslog(LOG_DEBUG, "verify (%p) checker_thread (id=%d), unexpected error in smtp dialog",
ct->vwh, ct->id);
ct->result = SMM_LOCAL_TEMP_NOK;
response_text = (char*)UNEXPECTED_ERROR;
done = 1;
break;
}
}
smtp_close(smtp);
ct->output = (char*) htmalloc(sizeof(char) * (strlen(response_text)+1));
strcpy(ct->output, response_text);
smtp_destroy(smtp);
syslog(LOG_DEBUG, "verify (%p) checker_thread (id=%d) goes to terminator queue",
ct->vwh, ct->id);
queue_put(ct->checker_terminator_queue, ct);
}
static void *worker_thread(void *arg) {
if (arg == NULL) {
syslog(LOG_CRIT, "worker_thread: arg is NULL");
return NULL;
}
worker_thread_t *wt = (worker_thread_t*) arg;
char *domain_part, *hostname_part;
unsigned int *mx_ip_addresses;
int result = SMM_TEMP_NOK;
int i, err;
pthread_t tid;
checker_thread_t *ct = NULL;
char *cached_output = NULL;
static const char *NOT_AN_ADDRESS = "not an email-address, no @ in it";
static const char *DEFAULT_ANSWER = "default answer";
static const char *NO_MX_AVAILABLE = "no MX available";
static const char *ILLEGAL_MX_ADDRESS = "0.0.0.0 can not be used at all";
static const char *NO_PERM_RESULT = "no checker returned permanent result";
static const char *INTERNAL_ERROR = "verifier internal error";
syslog(LOG_DEBUG, "verify (%p) worker_thread %d started, %s",
wt->vwh, wt->id, wt->input);
wt->thread = pthread_self();
incStatCounter(wt->vwh->vch->statCounter, STAT_CNT_VERIFIER_WORKER_R_THREADS);
incStatCounter(wt->vwh->vch->statCounter, STAT_CNT_VERIFIER_WORKER_THREADS);
/* separate domain part, some sanity checks */
domain_part = strchr(wt->input, '@');
if (NULL == domain_part) {
wt->output = (char*) NOT_AN_ADDRESS;
result = SMM_LOCAL_PERM_NOK;
} else {
if (0 == cache_lookup(wt->vwh, wt->input, &result, &cached_output)) {
syslog(LOG_DEBUG, "verify (%p) worker_thread: got a cached result for %s -> %d, %s",
wt->vwh, wt->input, result, cached_output);
incStatCounter(wt->vwh->vch->statCounter, STAT_CNT_VERIFIER_ANSWERED_FROM_CACHE);
wt->output = cached_output;
} else {
domain_part += 1;
hostname_part = strchr(domain_part, ' ');
if (hostname_part) {
*hostname_part = '\0';
hostname_part += 1;
mx_ip_addresses = get_hostname_ip_addresses(hostname_part);
} else {
syslog(LOG_DEBUG, "verify (%p) worker_thread: looking up %s by MX",
wt->vwh, domain_part);
mx_ip_addresses = get_mx_ip_addresses(domain_part);
}
if (NULL == mx_ip_addresses) {
wt->output = (char*) NO_MX_AVAILABLE;
result = SMM_LOCAL_PERM_NOK;
} else if (0 == *mx_ip_addresses) { /* 0.0.0.0 is strange enough for a special case */
wt->output = (char*) ILLEGAL_MX_ADDRESS;
result = SMM_LOCAL_PERM_NOK;
} else {
for (i = 0; (*(mx_ip_addresses+i) != 0) && (i < wt->vwh->vch->max_checker_threads); i++) {
unsigned int address = *(mx_ip_addresses+i);
ct = (checker_thread_t*) htmalloc(sizeof(checker_thread_t));
ct->id = i+1; /* id of ct should start with 1, same as id of wt */
ct->ip_address = address;
ct->email_address = wt->input;
ct->output = NULL;
ct->result = SMM_TEMP_NOK;
ct->checker_terminator_queue = wt->checker_terminator_queue;
ct->vwh = wt->vwh;
syslog(LOG_DEBUG, "verify (%p) worker_thread: starting checker thread (%p) to %d.%d.%d.%d, for email-address %s",
wt->vwh,
ct,
(htonl(address)&0xff000000)>>24, (htonl(address)&0x00ff0000)>>16,
(htonl(address)&0x0000ff00)>>8, (htonl(address)&0x000000ff),
wt->input);
err = pthread_create(&tid, NULL, &checker_thread, ct);
if (0 != err) {
incStatCounter(ct->vwh->vch->statCounter, STAT_CNT_VERIFIER_CHECKER_THREADS_FAILED);
syslog(LOG_ERR, "verify (%p) worker_thread: unable to create checker thread: %d", wt->vwh, err);
free(ct);
ct = NULL;
wt->output = (char*) INTERNAL_ERROR;
result = SMM_LOCAL_TEMP_NOK;
} else {
wt->checker_cnt += 1;
}
}
free(mx_ip_addresses);
while (wt->checker_cnt > 0) {
ct = (checker_thread_t*) queue_get_wait(wt->checker_terminator_queue);
wt->checker_cnt -= 1;
syslog(LOG_DEBUG, "verify (%p) worker_thread: got checker result for %d.%d.%d.%d: %s -> %d, %s",
wt->vwh,
(htonl(ct->ip_address)&0xff000000)>>24, (htonl(ct->ip_address)&0x00ff0000)>>16,
(htonl(ct->ip_address)&0x0000ff00)>>8, (htonl(ct->ip_address)&0x000000ff),
wt->input, ct->result, ct->output);
pthread_join(ct->thread, NULL);
decStatCounter(ct->vwh->vch->statCounter, STAT_CNT_VERIFIER_CHECKER_R_THREADS);
syslog(LOG_DEBUG, "verify (%p) worker_thread: checker thread joined", wt->vwh);
if ((SMM_LOCAL_TEMP_NOK != ct->result) &&
(SMM_TEMP_NOK != ct->result)) {
syslog(LOG_DEBUG, "verify (%p) worker_thread: this is a permanent result, returning", wt->vwh);
wt->output = ct->output;
result = ct->result;
/* re-concatenate hostname_part */
if (hostname_part) {
hostname_part--;
*hostname_part = ' ';
}
cache_insert(wt->vwh, wt->input, ct->result, ct->output);
/* ct will be freed later, since its output is needed below */
break; /* exit from the ct-collecting while loop, leave the rest of the ct's
for the cleanup */
} else {
syslog(LOG_DEBUG, "verify (%p) worker_thread: this is a temporary result, continue to wait",
wt->vwh);
wt->output = (char*) NO_PERM_RESULT;
result = SMM_LOCAL_TEMP_NOK;
/* we've collected the ct but its output is not longer need, so free it here */
free(ct->output);
free(ct);
ct = NULL;
}
} /* ct-collecting while */
} /* else: cache lookup */
} /* else: no mx available */
} /* else: not an address */
/* -------------------------------------------------------------------- */
syslog(LOG_DEBUG, "verify (%p) worker_thread %d waiting for mutex",
wt->vwh, wt->id);
pthread_mutex_lock(wt->mutex);
if (wt->result->id == wt->id) {
syslog(LOG_DEBUG, "verify (%p) worker_thread %d returned result", wt->vwh, wt->id);
/* we can write the result */
wt->result->output = (char*) htmalloc(sizeof(char) * (strlen(wt->output)+15)); /* enough for the output
plus <><> and prefix */
wt->rememberPropagatedOutput = wt->result->output;
switch (result) {
case SMM_LOCAL_TEMP_NOK:
sprintf(wt->result->output, "<TNOK><%s>", wt->output);
incStatCounter(wt->vwh->vch->statCounter, STAT_CNT_VERIFIER_RETURNED_TNOK);
result = SMM_OK;
break;
case SMM_LOCAL_PERM_NOK:
sprintf(wt->result->output, "<NOK><%s>", wt->output);
incStatCounter(wt->vwh->vch->statCounter, STAT_CNT_VERIFIER_RETURNED_NOK);
result = SMM_OK;
break;
case SMM_LOCAL_OK:
sprintf(wt->result->output, "<OK><%s>", wt->output);
incStatCounter(wt->vwh->vch->statCounter, STAT_CNT_VERIFIER_RETURNED_OK);
result = SMM_OK;
break;
default:
strcpy(wt->result->output, wt->output);
break;
}
wt->result->result = result;
pthread_cond_signal(wt->cond);
} else {
syslog(LOG_DEBUG, "verify (%p) worker_thread %d drops result", wt->vwh, wt->id);
/* result not longer interested */
}
pthread_mutex_unlock(wt->mutex);
/* we've collected the ct, so we need to free it.
free only if not NULL to avoid double-free in case of
no permenant result at all */
if (NULL != ct) {
free(ct->output);
free(ct);
}
/* if cached_output is not NULL, it was allocated for us and
we need to free it */
if (NULL != cached_output) {
free(cached_output);
}
syslog(LOG_DEBUG, "verify (%p) worker_thread %d goes to terminator queue",
wt->vwh, wt->id);
queue_put(wt->terminator_queue, wt);
}