This commit is contained in:
whottgen 2009-04-24 11:26:57 +00:00
parent 364410438a
commit e8a014ad18
5 changed files with 124 additions and 69 deletions

View File

@ -2,19 +2,23 @@
#include <string>
#include "rateconn.H"
#include "Mutex.H"
using namespace std;
RateconnCheck rateconnCheck;
RateconnTable *rateconnTable = NULL;
extern "C" void rateconnInitWrapper() throw () {
rateconnCheck = RateconnCheck();
extern "C" void rateconnInitWrapper(pthread_mutex_t *mutex, int numOfBuckets) throw () {
rateconnTable = new RateconnTable(mutex, numOfBuckets);
}
extern "C" int rateconnCheckWrapper(const char* input) throw () {
return rateconnCheck.check(string(input));
extern "C" void rateconnTurnWrapper() throw () {
rateconnTable->turn();
}
extern "C" int rateconnIncAndGetWrapper(const char* input) throw () {
return rateconnTable->incAndGet(string(input));
}
@ -32,35 +36,45 @@ extern "C" int rateconnCheckWrapper(const char* input) throw () {
*/
RateconnCheck::RateconnCheck() {
moveChain();
RateconnTable::RateconnTable(pthread_mutex_t *mutex, int numOfBuckets)
: m_numOfBuckets(numOfBuckets), m_mutex(mutex)
{
turn();
}
void RateconnCheck::moveChain() {
if (m_countersDeque.size() > numOfBuckets)
void RateconnTable::turn() {
Mutex m1(m_mutex); // rmc to lock the whole scope and to unlock it at its end
if (m_countersDeque.size() > m_numOfBuckets)
m_countersDeque.pop_back();
counterMap_t tmpMap = counterMap_t();
m_countersDeque.push_front(tmpMap);
m_countersDeque.push_front(counterMap_t());
}
void RateconnCheck::printAll() {
for (countersDeque_t::const_iterator i = m_countersDeque.begin(); i != m_countersDeque.end(); ++i) {
void RateconnTable::printAll() {
int t = 0;
for (countersDeque_t::const_iterator i = m_countersDeque.begin(); i != m_countersDeque.end(); ++i, ++t) {
for (counterMap_t::const_iterator j = i->begin(); j != i->end(); ++j) {
cout << "key: " << j->first << ", value: " << j->second << endl;
cout << "table: " << t <<", key: " << j->first << ", value: " << j->second << endl;
}
}
}
int RateconnTable::incAndGet(const string& input) {
Mutex m1(m_mutex); // rmc to lock the whole scope and to unlock it at its end
int RateconnCheck::check(const string& input) {
counterMap_t& cm = m_countersDeque.front();
if (cm.find(input) == 0)
cm[input] = 0;
cm[input]++;
int cnt = 0;
for (countersDeque_t::iterator i = m_countersDeque.begin(); i != m_countersDeque.end(); ++i) {
cnt += (*i)[input];
}
printAll();
return 0;
return cnt;
}

View File

@ -5,24 +5,25 @@
#include <deque>
#include <string>
#include <pthread.h>
typedef std::map<std::string, int> counterMap_t;
typedef std::deque<counterMap_t> countersDeque_t;
class RateconnCheck {
class RateconnTable {
public:
RateconnCheck();
void moveChain();
RateconnTable(pthread_mutex_t *mutex, int numOfBuckets);
void turn();
void printAll();
int check(const std::string& input);
~RateconnCheck();
int incAndGet(const std::string& input);
// ~RateconnTable();
private:
const static int numOfBuckets = 10;
int m_numOfBuckets;
countersDeque_t m_countersDeque;
pthread_mutex_t *m_mutex;
};

View File

@ -21,6 +21,7 @@
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <pthread.h>
#include "containers_public.h"
#include "smmapd.h"
@ -44,14 +45,19 @@ void rateconn_setupCounterList() {
#define STAT_CNT_RATECONN_WORKER 0
#define STAT_CNT_RATECONN_ILLEGAL_INPUT 1
#define STAT_CNT_RATECONN_RETURNED_OK 3
#define STAT_CNT_RATECONN_RETURNED_NOK 4
#define STAT_CNT_RATECONN_RETURNED_TNOK 5
#define STAT_CNT_RATECONN_RETURNED_OK 2
#define STAT_CNT_RATECONN_RETURNED_NOK 3
#define STAT_CNT_RATECONN_RETURNED_TNOK 4
struct rateconn_container_handle_s {
cfgl_t *cfg;
statCounter_t *statCounter;
pthread_mutex_t tableMutex;
pthread_t turner;
int numOfBuckets;
int turnPeriod;
int threshold;
};
typedef struct rateconn_container_handle_s rateconn_container_handle_t;
@ -84,17 +90,46 @@ class_descriptor_t rateconn_check = {
};
static void *tableTurner(void *arg) {
if (arg == NULL) {
syslog(LOG_CRIT, "tableTurner: arg is NULL");
return NULL;
}
rateconn_container_handle_t *cch = (rateconn_container_handle_t*) arg;
while (1) {
syslog(LOG_DEBUG, "rateconn_worker: turning");
rateconnTurnWrapper();
sleep(cch->turnPeriod);
}
}
int rateconn_init(cfgl_t *cfg, void **handle) {
rateconn_container_handle_t *cch;
rateconn_container_handle_t *cch;
cch = (rateconn_container_handle_t*) htmalloc(sizeof(rateconn_container_handle_t));
cch->cfg = cfg;
cch = (rateconn_container_handle_t*) htmalloc(sizeof(rateconn_container_handle_t));
cch->cfg = cfg;
rateconn_setupCounterList();
cch->statCounter = initStatCounter("rateconn", rateconn_counterDefs);
cch->numOfBuckets = atoi(findcfglx(cch->cfg, "num_of_buckets", "12"));
cch->turnPeriod = atoi(findcfglx(cch->cfg, "turn_period", "300"));
cch->threshold = atoi(findcfglx(cch->cfg, "threshold", 10));
rateconn_setupCounterList();
cch->statCounter = initStatCounter("rateconn", rateconn_counterDefs);
pthread_mutex_init(&(cch->tableMutex), NULL);
pthread_mutex_unlock(&(cch->tableMutex));
rateconnInitWrapper(&(cch->tableMutex), cch->numOfBuckets);
int err = pthread_create(&(cch->turner), NULL, &tableTurner, cch);
*handle = cch;
return 0;
*handle = cch;
return 0;
}
int rateconn_destroy(void *handle) {
@ -125,30 +160,29 @@ int rateconn_work(void *handle, void *work_handle, char *input, htbuffer_t *outp
incStatCounter(cch->statCounter, STAT_CNT_RATECONN_WORKER);
result = rateconnCheckWrapper(input);
int cnt = rateconnIncAndGetWrapper(input);
syslog(LOG_DEBUG, "rateconn_work: cnt=%d", cnt);
result = (cnt > cch->threshold) ? SMM_LOCAL_NOK : SMM_LOCAL_OK;
switch (result) {
case SMM_LOCAL_NOK:
htbuffer_strcat(output, "<NOK><REJECTED>");
htbuffer_strcpy(output, "<NOK><REJECTED>");
result = SMM_OK;
incStatCounter(cch->statCounter, STAT_CNT_RATECONN_RETURNED_NOK);
break;
case SMM_LOCAL_OK:
htbuffer_strcat(output, "<OK><ACCEPTED>");
htbuffer_strcpy(output, "<OK><ACCEPTED>");
result = SMM_OK;
incStatCounter(cch->statCounter, STAT_CNT_RATECONN_RETURNED_OK);
break;
default:
htbuffer_strcat(output, "<TNOK><UNKNOWN FAILURE>");
htbuffer_strcpy(output, "<TNOK><UNKNOWN FAILURE>");
result = SMM_TEMP_NOK;
incStatCounter(cch->statCounter, STAT_CNT_RATECONN_RETURNED_TNOK);
break;
}
syslog(LOG_DEBUG, "rateconn_work: (%04x) result %d, %s", log_id,
result, output->buf);

View File

@ -122,37 +122,37 @@ int containers_dispatcher(container_handle_t *ch, char *input, htbuffer_t *outpu
syslog(LOG_DEBUG, "dispatcher: class: %s, data: %s", class, data);
for (classes = classes_root.next; classes != NULL; classes = classes->next) {
if (0 == strcmp(class, classes->alias)) {
syslog(LOG_DEBUG, "dispatcher: yes, we support it, it's id=%d", classes->id);
for (wh = ch->worker_handle_root.next, wh_last = &ch->worker_handle_root, wh2 = NULL;
wh != NULL;
wh = wh->next) {
wh_last = wh;
if (wh->id == classes->id) {
syslog(LOG_DEBUG, "dispatcher: we already have a worker handle: %p", wh);
wh2 = wh;
break;
}
}
if (0 == strcmp(class, classes->alias)) {
syslog(LOG_DEBUG, "dispatcher: yes, we support it, it's id=%d", classes->id);
for (wh = ch->worker_handle_root.next, wh_last = &ch->worker_handle_root, wh2 = NULL;
wh != NULL;
wh = wh->next) {
wh_last = wh;
if (wh->id == classes->id) {
syslog(LOG_DEBUG, "dispatcher: we already have a worker handle: %p", wh);
wh2 = wh;
break;
}
}
if ((NULL == wh_last->next) && (NULL == wh2)) {
wh2 = (worker_handle_t*)htmalloc(sizeof(worker_handle_t));
syslog(LOG_DEBUG, "dispatcher: we haven't one, we create one: %p", wh2);
wh2->id = classes->id;
if (NULL != classes->descr->work_setup_function) {
err = (*classes->descr->work_setup_function)(classes->handle, &(wh2->handle));
} else {
wh2->handle = NULL;
}
wh2->next = NULL;
wh_last->next = wh2;
}
if ((NULL == wh_last->next) && (NULL == wh2)) {
wh2 = (worker_handle_t*)htmalloc(sizeof(worker_handle_t));
syslog(LOG_DEBUG, "dispatcher: we haven't one, we create one: %p", wh2);
wh2->id = classes->id;
if (NULL != classes->descr->work_setup_function) {
err = (*classes->descr->work_setup_function)(classes->handle, &(wh2->handle));
} else {
wh2->handle = NULL;
}
wh2->next = NULL;
wh_last->next = wh2;
}
result = (*classes->descr->work_function)(classes->handle, wh2->handle, data, output);
syslog(LOG_DEBUG, "dispatcher: worker output: (%d, %s)", result, output->buf);
break;
}
result = (*classes->descr->work_function)(classes->handle, wh2->handle, data, output);
syslog(LOG_DEBUG, "dispatcher: worker output: (%d, %s)", result, output->buf);
break;
}
}
if (NULL == classes) {

View File

@ -61,3 +61,9 @@ load = base string io
lua_path = .
file = worker.l
entrypoint = f
[rateconn_check]
obj = librateconn_worker.so
expiry_period = 300
num_of_buckets = 12
threshold = 3