fixes
This commit is contained in:
parent
943b73001c
commit
b244466563
@ -206,6 +206,7 @@ static void** get_rrs(char *domain, int qtype) {
|
||||
|
||||
#if HAVE_LIBDJBDNS
|
||||
struct dns_transmit dt = {0};
|
||||
char *q = NULL;
|
||||
#endif
|
||||
|
||||
|
||||
@ -215,7 +216,6 @@ static void** get_rrs(char *domain, int qtype) {
|
||||
{
|
||||
char s[64];
|
||||
int flagrecursive = 1;
|
||||
char *q = NULL;
|
||||
char t[2];
|
||||
char localip[4] = "\0\0\0\0";
|
||||
iopause_fd x[1];
|
||||
@ -232,10 +232,13 @@ static void** get_rrs(char *domain, int qtype) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
syslog(LOG_DEBUG, "get_rrs: (1) q = %p", q);
|
||||
dns_domain_fromdot(&q, domain, strlen(domain));
|
||||
syslog(LOG_DEBUG, "get_rrs: (2) q = %p", q);
|
||||
|
||||
pthread_mutex_lock(&dns_transmit_mutex);
|
||||
res = dns_transmit_start(&dt, s, flagrecursive, q, t, localip);
|
||||
syslog(LOG_DEBUG, "get_rrs: (3) q = %p", q);
|
||||
pthread_mutex_unlock(&dns_transmit_mutex);
|
||||
|
||||
if (res == -1) {
|
||||
@ -346,6 +349,10 @@ static void** get_rrs(char *domain, int qtype) {
|
||||
pthread_mutex_lock(&dns_transmit_mutex);
|
||||
dns_transmit_free(&dt);
|
||||
pthread_mutex_unlock(&dns_transmit_mutex);
|
||||
|
||||
syslog(LOG_DEBUG, "get_rrs: (4) q = %p", q);
|
||||
free(q);
|
||||
q = NULL;
|
||||
#endif
|
||||
|
||||
return rdata;
|
||||
|
@ -22,6 +22,7 @@
|
||||
|
||||
#include "queue.h"
|
||||
#include "htmalloc.h"
|
||||
#include <syslog.h>
|
||||
|
||||
#ifdef _TEST_MODE_
|
||||
#include <stdio.h>
|
||||
@ -74,30 +75,46 @@ int queue_put(ht_queue_t *q, void *d) {
|
||||
}
|
||||
|
||||
void *queue_get_wait(ht_queue_t *q) {
|
||||
return queue_get_wait_w_timeout(q, 0);
|
||||
}
|
||||
|
||||
void *queue_get_wait_w_timeout(ht_queue_t *q, int timeout_seconds) {
|
||||
queue_entry_t *entry = NULL;
|
||||
void *d = NULL;
|
||||
int rc = 0;
|
||||
|
||||
pthread_mutex_lock(&q->mutex);
|
||||
|
||||
while ((NULL == q->head) && (NULL == q->tail)) {
|
||||
pthread_cond_wait(&q->cond, &q->mutex);
|
||||
if (timeout_seconds == 0) {
|
||||
pthread_cond_wait(&q->cond, &q->mutex);
|
||||
} else {
|
||||
struct timespec ts;
|
||||
ts.tv_sec = time(0) + timeout_seconds;
|
||||
ts.tv_nsec = 0;
|
||||
rc = pthread_cond_timedwait(&q->cond, &q->mutex, &ts);
|
||||
if (rc != 0) // terminate loop for timeout
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
entry = q->head;
|
||||
if (rc == 0) {
|
||||
entry = q->head;
|
||||
|
||||
if (NULL != q->head) {
|
||||
q->head = q->head->prev;
|
||||
}
|
||||
|
||||
if (NULL == q->head) {
|
||||
q->tail = NULL;
|
||||
} else {
|
||||
q->head->next = NULL;
|
||||
}
|
||||
|
||||
if (NULL != entry) {
|
||||
d = entry->data;
|
||||
free(entry);
|
||||
if (NULL != q->head) {
|
||||
q->head = q->head->prev;
|
||||
}
|
||||
|
||||
if (NULL == q->head) {
|
||||
q->tail = NULL;
|
||||
} else {
|
||||
q->head->next = NULL;
|
||||
}
|
||||
|
||||
if (NULL != entry) {
|
||||
d = entry->data;
|
||||
free(entry);
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&q->mutex);
|
||||
|
@ -44,6 +44,7 @@ void queue_init(ht_queue_t *q);
|
||||
void queue_destroy(ht_queue_t *q);
|
||||
int queue_put(ht_queue_t *q, void *d);
|
||||
void *queue_get_wait(ht_queue_t *q);
|
||||
void *queue_get_wait_w_timeout(ht_queue_t *q, int timeout_seconds);
|
||||
|
||||
|
||||
#endif /* _QUEUE_H_ */
|
||||
|
@ -75,6 +75,20 @@ statCounter_t *initStatCounter(const char *moduleName, counterDef_t *counterDefs
|
||||
return statCounter;
|
||||
}
|
||||
|
||||
void destroyStatCounters() {
|
||||
int i;
|
||||
statCounterEntry_t *s;
|
||||
for (i = 0, s = statCounterList; i < statCounterListLen; i++, s++) {
|
||||
statCounter_t *ss;
|
||||
for (ss = s->statCounters; ss->counter != NULL; ss++) {
|
||||
free(ss->counter);
|
||||
}
|
||||
free(s->counterDefs);
|
||||
free(s->statCounters);
|
||||
}
|
||||
free(statCounterList);
|
||||
}
|
||||
|
||||
unsigned int getStatCounter(statCounter_t *statCounter, int cnt_idx) {
|
||||
statCounter_t *curStatCounter = statCounter + cnt_idx;
|
||||
if (curStatCounter == NULL || curStatCounter->magic != STAT_COUNTER_MAGIC) {
|
||||
|
@ -50,6 +50,7 @@ typedef struct statCounterEntry_s {
|
||||
} statCounterEntry_t;
|
||||
|
||||
counterDef_t *addOneCounterDef(counterDef_t *counterDef, unsigned int type, const char* name);
|
||||
void destroyStatCounter();
|
||||
statCounter_t *initStatCounter(const char *moduleName, counterDef_t *counterDefs);
|
||||
unsigned int getStatCounter(statCounter_t *statCounter, int cnt_idx);
|
||||
void incStatCounter(statCounter_t *statCounter, int cnt_idx);
|
||||
|
@ -163,7 +163,7 @@ int containers_dispatcher(container_handle_t *ch, char *input, htbuffer_t *outpu
|
||||
return result;
|
||||
}
|
||||
|
||||
static int register_class(int id, class_descriptor_t *class_descriptor, char *alias) {
|
||||
static int register_class(int id, void *dl_handle, class_descriptor_t *class_descriptor, char *alias) {
|
||||
int result = 0;
|
||||
cfgl_t *c;
|
||||
classes_t *w;
|
||||
@ -197,6 +197,7 @@ static int register_class(int id, class_descriptor_t *class_descriptor, char *al
|
||||
w->descr = class_descriptor;
|
||||
w->alias = alias;
|
||||
w->id = id;
|
||||
w->dl_handle = dl_handle;
|
||||
w->next = NULL;
|
||||
|
||||
classes_head->next = w;
|
||||
@ -213,7 +214,7 @@ static int unregister_class(classes_t *class) {
|
||||
|
||||
if (NULL != class->descr->destroy_function) {
|
||||
syslog(LOG_DEBUG, "unregister_class: destroying class");
|
||||
result = (*class->descr->destroy_function)(&class->handle);
|
||||
result = (*class->descr->destroy_function)(class->handle);
|
||||
syslog(LOG_ERR, "unregister_class: destroy function returns %d", result);
|
||||
}
|
||||
|
||||
@ -229,6 +230,7 @@ int unregister_all() {
|
||||
classes = classes->next;
|
||||
|
||||
unregister_class(cf);
|
||||
dlclose(cf->dl_handle);
|
||||
free(cf);
|
||||
}
|
||||
}
|
||||
@ -292,7 +294,7 @@ int register_worker(char *worker_name) {
|
||||
return -5;
|
||||
}
|
||||
|
||||
err = register_class(id++, class_descriptor, cfg_plugin);
|
||||
err = register_class(id++, dl_handle, class_descriptor, cfg_plugin);
|
||||
if (0 != err) {
|
||||
syslog(LOG_ERR, "register_all: unable to initialize plugin %s, error %d", cfg_plugin, err);
|
||||
return -6;
|
||||
|
@ -32,6 +32,7 @@ struct classes_s {
|
||||
char *alias;
|
||||
int id;
|
||||
void *handle;
|
||||
void *dl_handle;
|
||||
struct classes_s *next;
|
||||
};
|
||||
|
||||
|
@ -126,6 +126,7 @@ struct networkerThread_s {
|
||||
|
||||
typedef struct networkerThread_s networkerThread_t;
|
||||
|
||||
int terminating = 0;
|
||||
|
||||
ht_queue_t terminated_networker_queue;
|
||||
pthread_t cleanerThread;
|
||||
@ -165,12 +166,19 @@ void * cleaner(void * arg) {
|
||||
networkerThread_t *t;
|
||||
int joinRes;
|
||||
|
||||
pthread_detach(pthread_self());
|
||||
|
||||
while (1) {
|
||||
syslog(LOG_DEBUG, "cleaner: Waiting for terminated networker, running threads: %d",
|
||||
count_get(&thread_counter));
|
||||
t = (networkerThread_t*) queue_get_wait(&terminated_networker_queue);
|
||||
/* syslog(LOG_DEBUG, "cleaner: Waiting for terminated networker, running threads: %d", */
|
||||
/* count_get(&thread_counter)); */
|
||||
|
||||
if ((terminating == 1) && (count_get(&thread_counter) == 0)) {
|
||||
/* syslog(LOG_DEBUG, "cleaner: termination requested"); */
|
||||
break;
|
||||
}
|
||||
|
||||
t = (networkerThread_t*) queue_get_wait_w_timeout(&terminated_networker_queue, 5);
|
||||
if (t == NULL)
|
||||
continue;
|
||||
|
||||
count_dec(&thread_counter);
|
||||
decStatCounter(globalStatCounter, STAT_CNT_NETWORKER_R_THREADS);
|
||||
|
||||
@ -184,6 +192,8 @@ void * cleaner(void * arg) {
|
||||
free(t);
|
||||
t = NULL;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@ -393,7 +403,8 @@ int server() {
|
||||
}
|
||||
#endif /* ENABLE_NETSNMP */
|
||||
|
||||
while (1) {
|
||||
while (getStatCounter(globalStatCounter, STAT_CNT_ACCEPTED) < 3) {
|
||||
// while (1) {
|
||||
syslog(LOG_DEBUG, "server: Waiting for connection");
|
||||
|
||||
clientSock = accept(serverSock, (struct sockaddr *) &clientAddr, &clientAddrLen);
|
||||
@ -433,6 +444,9 @@ int server() {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
close(serverSock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -517,6 +531,23 @@ int main(int argc, char **argv) {
|
||||
syslog(LOG_ERR, "main: server fails: %d", err);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
syslog(LOG_INFO, "main: terminating");
|
||||
|
||||
terminating = 1;
|
||||
int cleanerJoinRes = pthread_join(cleanerThread, NULL);
|
||||
syslog(LOG_DEBUG, "cleaner joined: cleanerJoinRes %d", cleanerJoinRes);
|
||||
|
||||
unregister_all();
|
||||
destroyStatCounters();
|
||||
|
||||
if (cfg_file)
|
||||
free(cfg_file);
|
||||
if (pid_file)
|
||||
free(pid_file);
|
||||
|
||||
freecfg(cfg);
|
||||
|
||||
syslog(LOG_INFO, "main: finished");
|
||||
closelog();
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ port = 8887
|
||||
; plugins = test_worker1 test_worker2 verifier cyruscheck lua_worker
|
||||
plugins = verifier
|
||||
enable_stats = 0
|
||||
enable_snmp = 1
|
||||
enable_snmp = 0
|
||||
|
||||
; disables the netstring codec, only for debugging in the development
|
||||
; phase, sendmail is unable to talk to smmapd if the codec is disabled
|
||||
|
@ -10,5 +10,5 @@ config = ConfigParser.ConfigParser()
|
||||
config.read("test.ini");
|
||||
|
||||
|
||||
#Connector.threadedExecute(config, verifier.verifier, int(config.get('Global', 'Threads')), {})
|
||||
Connector.threadedExecute(config, testworker.test_worker1, int(config.get('Global', 'Threads')), {})
|
||||
Connector.threadedExecute(config, verifier.verifier, int(config.get('Global', 'Threads')), {})
|
||||
#Connector.threadedExecute(config, testworker.test_worker1, int(config.get('Global', 'Threads')), {})
|
||||
|
Loading…
x
Reference in New Issue
Block a user