may be I found and fixed the memory leak ... may be

This commit is contained in:
whottgen 2006-09-19 11:06:07 +00:00
parent 5a26ac47f4
commit 1aa603aff2
2 changed files with 90 additions and 58 deletions

View File

@ -6,7 +6,7 @@ port = 8887
; plugin_dir = /home/who/Sources/sf/smmapdfw
; plugins = test_worker1 test_worker2 verifier cyruscheck lua_worker
plugins = verifier
enable_stats = 1
enable_stats = 0
enable_snmp = 1
; disables the netstring codec, only for debugging in the development

View File

@ -50,6 +50,7 @@
#include "htbuffer.h"
#include "stats.h"
#include "dvl.h"
#define SMM_LOCAL_PERM_NOK 101
#define SMM_LOCAL_TEMP_NOK 102
@ -124,28 +125,11 @@ struct verify_work_handle_s {
ht_queue_t *terminator_queue;
verify_container_handle_t *vch;
int whereAreWe1;
int whereAreWe2;
int worker_cnt;
};
typedef struct verify_work_handle_s verify_work_handle_t;
/* --- debugging ------------------------------------- */
verify_work_handle_t *lastVerifyWorkHandle = NULL;
void showLastVerifyWorkHandleDetails(int signum) {
printf("showLastVerifyWorkHandleDetails:\n");
printf(" address: %p\n, lastVerifyWorkHandle");
if (lastVerifyWorkHandle != NULL) {
printf(" debug step: %d\n", lastVerifyWorkHandle->whereAreWe1);
printf(" debug sub step: %d\n", lastVerifyWorkHandle->whereAreWe2);
}
}
#define DEBUG_INSTALL do { signal(SIGUSR2, showLastVerifyWorkHandleDetails); } while(0)
#define DEBUG_INIT do { lastVerifyWorkHandle->whereAreWe1 = 0; lastVerifyWorkHandle->whereAreWe2 = 0; } while(0)
#define DEBUG_SUB_STEP_RESET do {lastVerifyWorkHandle->whereAreWe2 = 0; } while(0)
#define DEBUG_STEP do { lastVerifyWorkHandle->whereAreWe1++; } while(0)
#define DEBUG_SUB_STEP do { lastVerifyWorkHandle->whereAreWe2++; } while(0)
/* --------------------------------------------------- */
@ -179,6 +163,7 @@ struct worker_thread_s {
typedef struct worker_thread_s worker_thread_t;
struct mydata_s {
int result;
char output[1];
@ -276,31 +261,31 @@ int verify_work_setup(void *handle, void **work_handle) {
pthread_cond_init(vwh->result_cond, NULL);
vwh->terminator_queue = (ht_queue_t*) htmalloc(sizeof(ht_queue_t));
queue_init(vwh->terminator_queue);
vwh->worker_cnt = 0;
vwh->vch = (verify_container_handle_t*)handle;
*work_handle = vwh;
DEBUG_INSTALL;
return 0;
}
int verify_work_destroy(void *handle, void *work_handle) {
verify_container_handle_t *vch = (verify_container_handle_t*)handle;
verify_work_handle_t *vwh = (verify_work_handle_t*)work_handle;
worker_thread_t *wt;
checker_thread_t *ct;
int cnt, cnt2, err;
int err;
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy: was %d times used", work_handle, vwh->id);
/* The terminator_queue must be drained until id_counter is zero again */
cnt = vwh->id;
while (cnt != 0) {
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy, %d thread in queue, waiting for it",
work_handle, cnt);
while (vwh->worker_cnt != 0) {
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy, %d worker thread in queue, waiting for it",
work_handle, vwh->worker_cnt);
wt = (worker_thread_t*) queue_get_wait(vwh->terminator_queue);
cnt2 = wt->checker_cnt;
while (cnt2 != 0) {
while (wt->checker_cnt != 0) {
syslog(LOG_DEBUG, "verify (%p) verify_work_destroy, %d checker thread in queue, waiting for it",
work_handle, wt->checker_cnt);
ct = (checker_thread_t*) queue_get_wait(wt->checker_terminator_queue);
/* clean up the checker stuff */
@ -312,7 +297,7 @@ int verify_work_destroy(void *handle, void *work_handle) {
free(ct->output);
free(ct);
cnt2--;
wt->checker_cnt--;
}
err = pthread_join(wt->thread, NULL);
@ -330,7 +315,7 @@ int verify_work_destroy(void *handle, void *work_handle) {
free(wt);
cnt--;
vwh->worker_cnt--;
}
/* Free the members of the work_handle */
@ -353,6 +338,19 @@ void cache_insert(verify_work_handle_t *vwh, const char *address, int result, co
int mydata_size;
mydata_t *mydata;
if (vwh == NULL) {
syslog(LOG_CRIT, "cache_insert: vwh is NULL");
return;
}
if (address == NULL) {
syslog(LOG_CRIT, "cache_insert: address is NULL");
return;
}
if (output == NULL) {
syslog(LOG_CRIT, "cache_insert: output is NULL");
return;
}
if (NULL != vwh->vch->cache) {
syslog(LOG_DEBUG, "verify (%p) cache_insert: inserting %s -> %d, %s", vwh, address, result, output);
mydata_size = (sizeof(mydata_t) + (sizeof(char) * (strlen(output) + 1)));
@ -371,6 +369,15 @@ int cache_lookup(verify_work_handle_t *vwh, const char* address, int *result, ch
int ret, res = -1;
int mydata_size;
if (vwh == NULL) {
syslog(LOG_CRIT, "cache_lookup: vwh is NULL");
return res;
}
if (address == NULL) {
syslog(LOG_CRIT, "cache_lookup: address is NULL");
return res;
}
if (NULL != vwh->vch->cache) {
syslog(LOG_DEBUG, "verify (%p) cache_lookup: Looking up %s", vwh, address);
ret = htcache_lookup(vwh->vch->cache, address, (char**) &mydata, &mydata_size);
@ -432,8 +439,18 @@ int verify_work(void *handle, void *work_handle, char *input, htbuffer_t *output
verify_work_handle_t *vwh = (verify_work_handle_t*) work_handle;
struct timespec ts;
lastVerifyWorkHandle = vwh;
DEBUG_INIT;
if (handle == NULL) {
TEMP_NOK_RETURN("handle is NULL");
}
if (work_handle == NULL) {
TEMP_NOK_RETURN("work_handle is NULL");
}
if (input == NULL) {
TEMP_NOK_RETURN("input is NULL");
}
if (output == NULL) {
TEMP_NOK_RETURN("output is NULL");
}
syslog(LOG_DEBUG, "verify (%p) verify_work: going to verify %s\n", vwh, input);
@ -455,47 +472,39 @@ int verify_work(void *handle, void *work_handle, char *input, htbuffer_t *output
wt->checker_cnt = 0;
wt->vwh = work_handle;
DEBUG_STEP; // 1
syslog(LOG_DEBUG, "verify (%p) verify_work: going to start worker thread, id=%d",
vwh, vwh->id);
DEBUG_STEP; // 2
pthread_mutex_lock(vwh->result_mutex);
DEBUG_STEP; // 3
err = pthread_create(&tid, NULL, &worker_thread, wt);
DEBUG_STEP; // 4
if (-1 == err) {
incStatCounter(vwh->vch->statCounter, STAT_CNT_VERIFIER_WORKER_THREADS_FAILED);
if (0 != err) {
incStatCounter(vch->statCounter, STAT_CNT_VERIFIER_WORKER_THREADS_FAILED);
free(wt->input);
free(wt);
wt = NULL;
pthread_mutex_unlock(vwh->result_mutex);
PERM_NOK_RETURN("unable to create worker thread");
} else {
vwh->worker_cnt++;
}
DEBUG_STEP; // 5
syslog(LOG_DEBUG, "verify (%p) verify_work: waiting for result", vwh);
ts.tv_sec = time(0) + vch->timeout_result;
ts.tv_nsec = 0;
DEBUG_STEP; // 6
err = pthread_cond_timedwait(vwh->result_cond, vwh->result_mutex, &ts);
DEBUG_STEP; // 7
pthread_mutex_unlock(vwh->result_mutex);
DEBUG_STEP; // 8
if (ETIMEDOUT == err) {
printf("last worker: %p, %d\n", wt, wt->whereAreWe);
incStatCounter(vwh->vch->statCounter, STAT_CNT_VERIFIER_WORKER_THREADS_TIMEOUT);
TEMP_NOK_RETURN("worker thread timed out");
}
DEBUG_STEP; // 9
// snprintf(output, ANSWER_BUFSIZE, vwh->result->output);
htbuffer_strcpy(output, vwh->result->output);
DEBUG_STEP; // 10
free(vwh->result->output);
DEBUG_STEP; // 11
return vwh->result->result;
}
@ -507,6 +516,11 @@ static unsigned int *get_mx_ip_addresses(char *domain) {
int i = 0;
unsigned int *addresses = NULL;
if (NULL == domain) {
syslog(LOG_CRIT, "get_mx_ip_addresses: domain is NULL");
return NULL;
}
mx_rdata = get_best_mx_rrs(domain);
if (NULL == mx_rdata) {
syslog(LOG_DEBUG, "get_mx_ip_address: no mx at all");
@ -539,6 +553,11 @@ static unsigned int *get_mx_ip_addresses(char *domain) {
static void *checker_thread(void *arg) {
if (arg == NULL) {
syslog(LOG_CRIT, "checker_thread: arg is NULL");
return NULL;
}
static const char *UNEXPECTED_ERROR = "unexpected error in smtp dialog";
static const char *TIMEOUT_ERROR = "timeout on smtp dialog";
static const char *ADDRESS_VALID = "address is valid";
@ -673,6 +692,11 @@ static void *checker_thread(void *arg) {
static void *worker_thread(void *arg) {
if (arg == NULL) {
syslog(LOG_CRIT, "worker_thread: arg is NULL");
return NULL;
}
worker_thread_t *wt = (worker_thread_t*) arg;
char *domain_part;
unsigned int *mx_ip_addresses;
@ -686,6 +710,9 @@ static void *worker_thread(void *arg) {
static const char *DEFAULT_ANSWER = "default answer";
static const char *NO_MX_AVAILABLE = "no MX available";
static const char *NO_PERM_RESULT = "no checker returned permanent result";
static const char *INTERNAL_ERROR = "verifier internal error";
lastThing = wt;
syslog(LOG_DEBUG, "verify (%p) worker_thread %d started, %s",
wt->vwh, wt->id, wt->input);
@ -721,12 +748,6 @@ static void *worker_thread(void *arg) {
} else {
for (i = 0; (*(mx_ip_addresses+i) != 0) && (i < wt->vwh->vch->max_checker_threads); i++) {
unsigned int address = *(mx_ip_addresses+i);
syslog(LOG_DEBUG, "verify (%p) worker_thread: starting checker thread to %d.%d.%d.%d, for email-address %s",
wt->vwh,
(address&0xff000000)>>24, (address&0x00ff0000)>>16,
(address&0x0000ff00)>>8, (address&0x000000ff),
wt->input);
ct = (checker_thread_t*) htmalloc(sizeof(checker_thread_t));
ct->id = i+1; /* id of ct should start with 1, same as id of wt */
ct->ip_address = address;
@ -736,11 +757,22 @@ static void *worker_thread(void *arg) {
ct->checker_terminator_queue = wt->checker_terminator_queue;
ct->vwh = wt->vwh;
syslog(LOG_DEBUG, "verify (%p) worker_thread: starting checker thread (%p) to %d.%d.%d.%d, for email-address %s",
wt->vwh,
ct,
(address&0xff000000)>>24, (address&0x00ff0000)>>16,
(address&0x0000ff00)>>8, (address&0x000000ff),
wt->input);
err = pthread_create(&tid, NULL, &checker_thread, ct);
if (-1 == err) {
if (0 != err) {
incStatCounter(ct->vwh->vch->statCounter, STAT_CNT_VERIFIER_CHECKER_THREADS_FAILED);
syslog(LOG_ERR, "verify (%p) worker_thread: unable to create checker thread", wt->vwh);
syslog(LOG_ERR, "verify (%p) worker_thread: unable to create checker thread: %d", wt->vwh, err);
free(ct);
ct = NULL;
wt->output = (char*) INTERNAL_ERROR;
result = SMM_LOCAL_TEMP_NOK;
} else {
wt->checker_cnt += 1;
}