/*
* Copyright (c) 2006 Claus Assmann
*
* By using this file, you agree to the terms and conditions set
* forth in the license/LICENSE.3C file which can be found at the
* top level of this source code distribution.
*/
/*
** (performance) Test program to add entries to AQ and remove them from AQ.
** Uses "scheduler" that moves entries from rdq to waitq.
*/
#include "sm/generic.h"
SM_IDSTR(id, "@(#)$Id: t-aq-perf-3.c,v 1.1 2007/02/26 02:17:11 ca Exp $")
#include "sm/debug.h"
#include "sm/heap.h"
#include "sm/sysexits.h"
#include "sm/test.h"
#include "sm/io.h"
#include "sm/rfc2821.h"
#include "sm/actdb-int.h"
#include "sm/aqrdq.h"
#define QMGR_DEBUG_DEFINE 1
#include "sm/qmgr-int.h"
#include "sm/qmgrdbg.h"
#define QMGR_LOG_DEFINES 1
#include "log.h"
#include "qmgr.h"
/*
** Stages of the test run, stored in Done.  The values are compared with
** < and >=, so their numeric order is significant.
*/
#define DONE_NONE 0
#define DONE_ADD 1
#define DONE_SCHED 2
#define DONE_DEL 3
/* verbosity of the trace output; incremented by each -V */
static int Verbose = 0;
/* -R: aq_t_add() repeats its fill step as long as rand() < Rand */
static int Rand = 0;
/* stage (DONE_*) reached so far; advanced by the three tasks */
static int Done = DONE_NONE;
/* AQ context created by aq_open() in aq_t_init() */
static aq_ctx_P Aq_ctx = NULL;
/* minimal QMGR context allocated in aq_t_init() */
static qmgr_ctx_P Qmgr_ctx = NULL;
/* -e: per-invocation batch size (default: Total_TAs / 10, see main()) */
static uint Once_TAs = 0;
/* -n: total number of entries for the whole run (default: SM_AQ_RCPTS) */
static uint Total_TAs = 0;
/* -f: allowed fill of AQ in per cent, handed to qm_test_fill_aq() */
static uint Fill = 90;
/* -r: fill rate, handed to qm_test_fill_aq() */
static uint Rate = 0;
/* -s: size of AQ handed to aq_open(); main() resets it to SM_AQ_RCPTS */
static uint Aq_size = 90;
/* event thread context created by evthr_init() in aq_t_init() */
static sm_evthr_ctx_P Ev_ctx = NULL;
/* the three tasks created in aq_t_start() */
static sm_evthr_task_P Tsk_add, Tsk_sched, Tsk_del;
/* -E: debug level passed to evthr_set_dbglvl() if EVTHR_DEBUG is set */
static int Ev_dbg = 0;
/* size argument for dadb_new() */
static uint Ndas = 512;
/* created by qsc_ctx_new() in aq_t_init(); note: not static */
qsc_ctx_P qsc_ctx;
/* HEAP_CHECK: true iff heap checking is compiled in and SmHeapCheck > 0 */
#if SM_HEAP_CHECK
# include "sm/io.h"
extern SM_DEBUG_T SmHeapCheck;
# define HEAP_CHECK (SmHeapCheck > 0)
#else
# define HEAP_CHECK 0
#endif
/* default for Total_TAs and Aq_size, see main() */
#define SM_AQ_RCPTS 256
/* NOTE(review): not referenced in the code visible here */
#define RCPT_MAX_LEN 256
/*
** Set in aq_t_init(); not static -- presumably referenced by other test
** code that is linked in (TODO confirm).
*/
sm_cstr_P Tst_CDB_id = NULL;
/* log context created in aq_t_init(); its output is directed to smioerr */
static sm_log_ctx_P Lctx = NULL;
/* "example.com" (set in aq_t_init()), handed to qm_test_fill_aq() */
static sm_str_P Defaultdomain = NULL;
#if 0
/*
** HACK: referenced by dadb_close()
**
** Disabled stub: if enabled it only logs its own name to smioerr and
** returns SM_SUCCESS; none of its arguments are used.
*/
sm_ret_T
qda_update_ta_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_ret_T status,
uint err_st, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry,
aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_str_P errmsg, thr_lock_T locktype)
{
sm_io_fprintf(smioerr, "func=qda_update_ta_stat\n");
return SM_SUCCESS;
}
#endif
/*
** AQ_T_ADD -- add entries to AQ and rdq
**
**	Calls qm_test_fill_aq() at least once; the call is repeated while
**	-R is set, entries were added, adding is not finished, and
**	rand() < Rand.  If anything was added the scheduler task is woken up.
**
**	Parameters:
**		tsk -- evthr task (its next wakeup time is set here)
**
**	Returns:
**		EVTHR_SLPQ on success, EVTHR_TERM on error
*/

/* total number of entries added so far */
static uint32_t Total_added = 0;

static sm_ret_T
aq_t_add(sm_evthr_task_P tsk)
{
	sm_ret_T ret;
	uint32_t added;
	bool wakeup;
	timeval_T tv_now, delay;
	time_T now;

	added = 0;
	wakeup = false;
	ret = evthr_timeval(Ev_ctx, &tv_now);
	SM_TEST_ERR(ret);
	now = tv_now.tv_sec;
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_add, called=%ld.%06ld\n",
			(long) tv_now.tv_sec, (long) tv_now.tv_usec);

	do {
		/*
		** "added" is used before ret is checked: do not rely on
		** the callee to set it if it fails.
		*/

		added = 0;
		ret = qm_test_fill_aq(Lctx, Aq_ctx, now, Total_TAs, Once_TAs,
			Fill, Rate, Defaultdomain, &added, THR_LOCK_UNLOCK);
		Total_added += added;
		if (Verbose > 3)
			sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, total=%u, ret=%d\n", added, Total_added, ret);
		SM_TEST_ERR(ret);
		if (added > 0) {
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, total=%u, ret=%d\n", added, Total_added, ret);
			wakeup = true;
		}
		else {
			if (Verbose > 1)
				sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, ret=%d\n", added, ret);

			/* nothing added and SM_NOTDONE: adding is finished */
			if (SM_NOTDONE == ret && Done < DONE_ADD)
				Done = DONE_ADD;
		}
	} while (Rand > 0 && added > 0 && Done < DONE_ADD && rand() < Rand);

	if (wakeup) {
		ret = evthr_wakeup_task(Tsk_sched);
		SM_TEST_E(0 == ret);
	}
	if (Verbose > 3)
		sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, total=%u, Done=%d\n", added, Total_added, Done);

	/* run again in about 1s; in about 9s once a stage has been completed */
	delay.tv_usec = 1;
	delay.tv_sec = (Done != DONE_NONE) ? 9 : 1;
	timeradd(&tv_now, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;

  error:
	sm_io_fprintf(smioerr, "func=aq_t_add, error=%d\n", ret);
	return EVTHR_TERM;
}
/*
** AQ_T_SCHED -- "schedule" entries from AQ
**
**	While holding the AQ mutex: takes entries from the first rdq, opens
**	a DA session for each of them, removes the entry from the rdq and
**	adds it to the wait queue (AQWQ_DA).  If at least one entry was
**	moved the delete task is woken up.
**
**	Parameters:
**		tsk -- evthr task (its next wakeup time is set here)
**
**	Returns:
**		EVTHR_SLPQ on success, EVTHR_TERM on error
*/

/* total number of entries moved to the wait queue so far */
static uint32_t Total_scheduled = 0;

static sm_ret_T
aq_t_sched(sm_evthr_task_P tsk)
{
	sm_ret_T ret;
	int status;
	uint todo_entries, entries_scheduled;
	bool aq_locked;
	aq_rcpt_P aq_rcpt;
	aqrdq_ctx_P aqrdq;
	timeval_T tv_now, delay;
	time_T time_now;

	entries_scheduled = 0;
	aq_locked = false;
	ret = SM_FAILURE;
	SM_ASSERT(!aq_locked);
	status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
	SM_TEST_E(0 == status);
	aq_locked = true;

	/* tv_now is used below, hence the result must be checked */
	ret = evthr_timeval(Ev_ctx, &tv_now);
	SM_TEST_ERR(ret);
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_sched, called=%ld.%06ld\n",
			(long) tv_now.tv_sec, (long) tv_now.tv_usec);
	time_now = tv_now.tv_sec;

	/* only one... */
	aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
	if (NULL == aqrdq) {
		/* no entries have been added? */
		if (Verbose > 0)
			sm_io_fprintf(smioerr, "func=aq_t_sched, aq_rdqs=empty\n");
		goto unlock;
	}
	todo_entries = aqrdq->aqrdq_entries;
	if (0 == todo_entries) {
		if (Verbose > 0)
			sm_io_fprintf(smioerr, "func=aq_t_sched, todo=0\n");
		goto unlock;
	}

	/* don't schedule more entries than added in a single invocation */
	if (todo_entries > Once_TAs && Done == DONE_NONE)
		todo_entries = Once_TAs;

#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	for (; entries_scheduled < todo_entries; entries_scheduled++) {
		dadb_entry_P dadb_entry;

		if (AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts))
			break;
		aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
		SM_TEST_E(aq_rcpt != NULL);
		SM_IS_AQ_RCPT(aq_rcpt);
		dadb_entry = NULL;
#if 0
		ret = dadb_se_find_by_ipv4(qsc_ctx->qsc_dadb_ctx,
			aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
			time_now, &dadb_entry);
#endif
		if (dadb_entry == NULL) {
			/* open a DA session */
			ret = dadb_sess_open(qsc_ctx, qsc_ctx->qsc_dadb_ctx,
				aq_rcpt->aqr_ss_ta_id,
				aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
				aq_rcpt, &dadb_entry);
			SM_TEST_ERR(ret);
			SM_TEST_E(dadb_entry != NULL);
			SESSTA_COPY(aq_rcpt->aqr_da_ta_id, dadb_entry->dadbe_da_ta_id);
			AQR_DA_INIT(aq_rcpt);
			if (Verbose > 3)
				sm_io_fprintf(smioerr, "func=aq_t_sched, da_da_id=%s\n", aq_rcpt->aqr_da_ta_id);
		}

		/* move the entry: rdq -> waitq */
		ret = aq_rdq_rm(Aq_ctx, aq_rcpt, THR_NO_LOCK, Lctx);
		SM_TEST_ERR(ret);
		ret = aq_waitq_add(Aq_ctx, aq_rcpt, time_now, AQWQ_DA, false);
		SM_TEST_ERR(ret);
	}
#if HAVE_MONCONTROL
	/* NOTE(review): moncontrol(1) again; was moncontrol(0) intended? */
	moncontrol(1);
#endif

  unlock:
	if (aq_locked) {
		/* clear the flag first so that error does not unlock again */
		aq_locked = false;
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
	}
	if (entries_scheduled) {
		ret = evthr_wakeup_task(Tsk_del);
		SM_TEST_E(0 == ret);
	}
	Total_scheduled += entries_scheduled;
	if (Total_scheduled >= Total_TAs && Done < DONE_SCHED)
		Done = DONE_SCHED;
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_sched, scheduled=%u, total=%u, return=%d, Done=%d\n"
			, entries_scheduled, Total_scheduled, ret, Done);

	/* long sleep: aq_t_add() and aq_t_del() wake this task up */
	delay.tv_usec = 0;
	delay.tv_sec = 30;
	timeradd(&tv_now, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;

  error:
	if (aq_locked) {
		/*
		** Clear the flag before the test: SM_TEST_E() jumps to
		** error, hence a failing unlock would otherwise be
		** retried forever.
		*/

		aq_locked = false;
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
	}
	sm_io_fprintf(smioerr, "func=aq_t_sched, error=%d\n", ret);
	return EVTHR_TERM;
}
/*
** AQ_T_DEL -- delete entries from AQ
**
**	Repeatedly takes the first entry of the wait queue, looks up and
**	closes its DA session, and removes the recipient and its
**	transaction from AQ.  At most Once_TAs entries are handled per
**	invocation unless Done == DONE_SCHED, in which case the loop runs
**	until the wait queue is empty.
**
**	Parameters:
**		tsk -- evthr task (its next wakeup time is set here)
**
**	Returns:
**		EVTHR_SLPQ while there is work left,
**		EVTHR_TERM when everything has been removed or on error
*/

/* total number of entries removed so far */
static uint32_t Total_removed = 0;

static sm_ret_T
aq_t_del(sm_evthr_task_P tsk)
{
	sm_ret_T ret, flags;
	int status;
	uint todo_entries, entries_removed;
	bool aq_locked;
	aq_rcpt_P aq_rcpt;
	aq_ta_P aq_ta;
	timeval_T tv_now, delay;
	dadb_entry_P dadb_entry;

	entries_removed = 0;
	aq_locked = false;

	/* tv_now is used below, hence the result must be checked */
	ret = evthr_timeval(Ev_ctx, &tv_now);
	SM_TEST_ERR(ret);
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_del, called=%ld.%06ld\n",
			(long) tv_now.tv_sec, (long) tv_now.tv_usec);
	todo_entries = Once_TAs;

#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	for (; entries_removed < todo_entries || Done == DONE_SCHED;
	     entries_removed++)
	{
		/* the mutex is only held while looking at the wait queue */
		status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = true;
		if (AQR_WAITQ_EMPTY(Aq_ctx))
			break;	/* mutex is released after the loop */
		aq_rcpt = AQR_WAITQ_FIRST(Aq_ctx);
		SM_TEST_E(aq_rcpt != NULL);
		SM_IS_AQ_RCPT(aq_rcpt);
		aq_locked = false;
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);

		/* not necessary here, but required to "emulate" the real algorithm */
		if (Verbose > 3)
			sm_io_fprintf(smioerr, "func=aq_t_del, da_da_id=%s\n", aq_rcpt->aqr_da_ta_id);
		dadb_entry = NULL;
		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx, aq_rcpt->aqr_da_ta_id,
				&dadb_entry);
		SM_TEST_ERR(ret);
		SM_TEST_E(dadb_entry != NULL);
		SM_TEST_E(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_TEST_E(*dadb_entry->dadbe_ss_ta_id != '\0');
		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL)
		{
			SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
					aq_ta->aqt_ss_ta_id));
		}
		else {
			/* NOTE(review): result is not checked, aq_ta is reset below */
			ret = aq_ta_find(Qmgr_ctx->qmgr_aq, dadb_entry->dadbe_ss_ta_id, true, &aq_ta);
		}

		/* session must be closed */
		DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);
#if 0
		/* Update transaction status */
		ret = qda_update_ta_stat(Qmgr_ctx, dadb_entry->dadbe_da_ta_id,
			0, 0, qsc_ctx->qsc_dadb_ctx, dadb_entry,
			aq_ta, NULL, NULL, THR_LOCK_UNLOCK);
#else
		/*
		** status is 0 here: the unlock above succeeded.
		** NOTE(review): the result is overwritten without being
		** checked; confirm that this is intended.
		*/

		ret = qda_dadb_close(Qmgr_ctx, dadb_entry->dadbe_da_ta_id,
			qsc_ctx->qsc_dadb_ctx, dadb_entry, status, &flags);
#endif
		ret = aq_waitq_rm(Aq_ctx, aq_rcpt, AQWQ_DA, true);
		SM_TEST_ERR(ret);

		/* get the transaction before the recipient is removed */
		aq_ta = aq_rcpt->aqr_ss_ta;
		SM_TEST_E(aq_ta != NULL);
		ret = aq_rcpt_rm(Aq_ctx, aq_rcpt, AQR_RM_LOCK|AQR_RM_N_RDQ|AQR_RM_I_RDQ);
		SM_TEST_ERR(ret);
		ret = aq_ta_rm(Aq_ctx, aq_ta, true);
		SM_TEST_ERR(ret);
	}
#if HAVE_MONCONTROL
	/* NOTE(review): moncontrol(1) again; was moncontrol(0) intended? */
	moncontrol(1);
#endif

	/* the loop is left with the mutex held if the wait queue is empty */
	if (aq_locked) {
		/* clear the flag first so that error does not unlock again */
		aq_locked = false;
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
	}

	/* there is room in AQ again: trigger the previous stage */
	if (Done < DONE_ADD) {
		ret = evthr_wakeup_task(Tsk_add);
		SM_TEST_E(0 == ret);
	}
	else if (Done < DONE_SCHED) {
		ret = evthr_wakeup_task(Tsk_sched);
		SM_TEST_E(0 == ret);
	}
	Total_removed += entries_removed;
	if (Total_removed >= Total_TAs && Done < DONE_DEL)
		Done = DONE_DEL;
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_del, removed=%u, total=%u, AQ=%u, return=%d, Done=%d\n"
			, entries_removed, Total_removed, Total_added - Total_removed, ret, Done);

	/* run again immediately once scheduling is done, otherwise in 1s */
	delay.tv_usec = 0;
	delay.tv_sec = (Done >= DONE_SCHED) ? 0 : 1;
	timeradd(&tv_now, &delay, &tsk->evthr_t_sleep);
	return (Done == DONE_DEL) ? EVTHR_TERM : EVTHR_SLPQ;

  error:
	if (aq_locked) {
		/*
		** Clear the flag before the test: SM_TEST_E() jumps to
		** error, hence a failing unlock would otherwise be
		** retried forever.
		*/

		aq_locked = false;
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
	}
	sm_io_fprintf(smioerr, "func=aq_t_del, error=%d\n", ret);
	return EVTHR_TERM;
}
/*
** AQ_T_INIT -- create the contexts needed by the test
**
**	Sets up (in this order): QMGR context, AQ, QSC context and its DA
**	database, open connection cache, CDB id, default domain, logging,
**	random number generator, and the event thread system.
**
**	Parameters:
**		none
**
**	Returns:
**		usual sm_error code
**
**	Side Effects:
**		sets Qmgr_ctx, Aq_ctx, qsc_ctx, Tst_CDB_id, Defaultdomain,
**		Lctx, and Ev_ctx; nothing is released on error.
*/

static sm_ret_T
aq_t_init(void)
{
	sm_ret_T ret;
	sm_logconfig_P lcfg;
	sessta_id_T ta_id;

	ret = SM_SUCCESS;
	Aq_ctx = NULL;
	Qmgr_ctx = (qmgr_ctx_P) sm_zalloc(sizeof(*Qmgr_ctx));

	/*
	** SM_TEST_E() jumps to error, which returns ret: make sure ret
	** is an error code if the allocation failed.
	*/

	if (NULL == Qmgr_ctx)
		ret = sm_error_temp(SM_EM_Q, ENOMEM);
	SM_TEST_E(Qmgr_ctx != NULL);
	Qmgr_ctx->qmgr_cnf.q_cnf_tmo_da = AQR_DA_TMOUT;
	Qmgr_ctx->sm_magic = SM_QMGR_CTX_MAGIC;
	Qmgr_ctx->qmgr_cnf.sm_magic = SM_QMGR_CNF_MAGIC;

	ret = aq_open(Qmgr_ctx, &Aq_ctx, Aq_size, 0);
	SM_TEST_ERR(ret);
	ret = qsc_ctx_new(Qmgr_ctx, 1, &qsc_ctx);
	SM_TEST_ERR(ret);
	qsc_ctx->qsc_id = 1;
	ret = qsc_id_init(qsc_ctx, 0);
	SM_TEST_ERR(ret);
	ret = dadb_new(&qsc_ctx->qsc_dadb_ctx, Ndas);
	SM_TEST_ERR(ret);
	ret = occ_open(&Qmgr_ctx->qmgr_occ_ctx, 512);
	SM_TEST_ERR(ret);

	sm_snprintf(ta_id, SMTP_STID_SIZE, SMTPS_STID_FORMAT, (id_count_T)0, 0);
	Tst_CDB_id = sm_cstr_scpyn((uchar *)ta_id, sizeof(ta_id));
	if (NULL == Tst_CDB_id)
		ret = sm_error_temp(SM_EM_Q, ENOMEM);
	SM_TEST_E(Tst_CDB_id != NULL);

	Defaultdomain = sm_str_scpy(NULL, "example.com", 64);
	if (NULL == Defaultdomain)
		ret = sm_error_temp(SM_EM_Q, ENOMEM);
	SM_TEST_E(Defaultdomain != NULL);

	/* logging goes to smioerr */
	ret = sm_log_create(NULL, &Lctx, &lcfg);
	SM_TEST_ERR(ret);
	ret = sm_log_setfp_fd(Lctx, smioerr, SMIOERR_FILENO);
	SM_TEST_ERR(ret);
	ret = sm_log_setdebuglevel(Lctx, 0);
	SM_TEST_ERR(ret);

	/* rand() is used by aq_t_add() (-R) */
	srand(time(0));

	/* initialize event threads system */
	ret = evthr_init(&Ev_ctx, 2, 6, 2);
	if (sm_is_err(ret)) {
		sm_io_fprintf(smioerr,
			"sev=ERROR, func=aq_t_init1, evthr_init=%m\n", ret);
		SM_TEST_ERR(ret);
	}
#if EVTHR_DEBUG
	evthr_set_dbglvl(Ev_ctx, Ev_dbg);
#endif
	Qmgr_ctx->qmgr_ev_ctx = Ev_ctx;

	/* FALLTHROUGH */
  error:
	return ret;
}
/*
** AQ_T_START -- create the three sleep-triggered tasks
**
**	The add task is due almost immediately (1us), the schedule and
**	delete tasks far in the future (80000s and 90000s); those two are
**	activated via evthr_wakeup_task() by the other tasks.
**
**	Parameters:
**		none
**
**	Returns:
**		usual sm_error code
*/

static sm_ret_T
aq_t_start(void)
{
	sm_ret_T ret;
	timeval_T now, offset, wake_at;

	ret = evthr_timeval(Ev_ctx, &now);
	SM_TEST_ERR(ret);

	/* task 1: add entries */
	offset.tv_sec = 0;
	offset.tv_usec = 1;
	timeradd(&now, &offset, &wake_at);
	ret = evthr_task_new(Ev_ctx, &Tsk_add, EVTHR_EV_SL, INVALID_FD,
			&wake_at, aq_t_add, &Aq_ctx);
	SM_TEST_ERR(ret);

	/* task 2: "scheduler" */
	offset.tv_sec = 80000;
	offset.tv_usec = 0;
	timeradd(&now, &offset, &wake_at);
	ret = evthr_task_new(Ev_ctx, &Tsk_sched, EVTHR_EV_SL, INVALID_FD,
			&wake_at, aq_t_sched, &Aq_ctx);
	SM_TEST_ERR(ret);

	/* task 3: delete entries */
	offset.tv_sec = 90000;
	offset.tv_usec = 0;
	timeradd(&now, &offset, &wake_at);
	ret = evthr_task_new(Ev_ctx, &Tsk_del, EVTHR_EV_SL, INVALID_FD,
			&wake_at, aq_t_del, &Aq_ctx);
	SM_TEST_ERR(ret);

	/* FALLTHROUGH */
  error:
	return ret;
}
/*
** AQ_T_PERF -- add/remove entries to/from AQ
**
**	Initializes the test, creates the tasks, runs the event thread
**	loop until it returns, and closes AQ.
**
**	Parameters:
**		none
**
**	Returns:
**		usual sm_error code
*/

static sm_ret_T
aq_t_perf(void)
{
	sm_ret_T ret;

	ret = aq_t_init();
	SM_TEST_ERR(ret);
	ret = aq_t_start();
	SM_TEST_ERR(ret);
	ret = evthr_loop(Ev_ctx);
	SM_TEST_ERR(ret);

	/*
	** Test this result like the others: main() does not look at the
	** return value of this function, hence a failure to close AQ
	** would otherwise go unnoticed.
	*/

	ret = aq_close(Aq_ctx);
	SM_TEST_ERR(ret);

	/* FALLTHROUGH */
  error:
	return ret;
}
/*
** USAGE -- print usage information to smioerr and exit
**
**	Parameters:
**		prg -- name of program
**
**	Returns:
**		does not return: exit(EX_USAGE)
*/

static void
usage(const char *prg)
{
	sm_io_fprintf(smioerr, "usage: %s [options]\n", prg);
	sm_io_fprintf(smioerr, "Test AQ insertion, lookups, and removal (domains are random numbers)\n");
	sm_io_fprintf(smioerr, "options:\n");
	sm_io_fprintf(smioerr, "-E n specify debug level for event thread system\n");
	sm_io_fprintf(smioerr, "-e n specify number of recipients to add each iteration\n");
	sm_io_fprintf(smioerr, "-f n specify allowed fill of AQ (per cent)\n");
#if SM_HEAP_CHECK
	sm_io_fprintf(smioerr, "-H n specify heap check level (report is printed if n > 0)\n");
#endif
	sm_io_fprintf(smioerr, "-n n specify total number of recipients to use\n");
	sm_io_fprintf(smioerr, "-R n repeat addition if rand() is less than n\n");
	sm_io_fprintf(smioerr, "-r n fill rate\n");
	sm_io_fprintf(smioerr, "-s n specify size of AQ\n");
	sm_io_fprintf(smioerr, "-V increase verbosity\n");
	exit(EX_USAGE);
}
/*
** MAIN -- parse options and run the AQ performance test
**
**	Parameters:
**		argc, argv -- the usual; see usage() for the options
**
**	Returns:
**		result of sm_test_end(); EX_USAGE (via usage()) on a bad option
*/

int
main(int argc, char **argv)
{
	int c;
	char *prg;

#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	prg = argv[0];

	/* defaults; note: Once_TAs is not recomputed if -n is given */
	Total_TAs = SM_AQ_RCPTS;
	Aq_size = SM_AQ_RCPTS;
	Once_TAs = Total_TAs / 10;
#if SM_HEAP_CHECK
	SmHeapCheck = 0;
#endif

	/*
	** NOTE(review): "l:" is accepted by getopt() but there is no case
	** for it, hence -l ends up in usage(); the same is true for -H
	** unless SM_HEAP_CHECK is set.
	*/

	while ((c = getopt(argc, argv, "E:e:f:H:l:n:R:r:s:V")) != -1) {
		switch (c) {
		  case 'E':
			Ev_dbg = (int) strtoul(optarg, NULL, 0);
			break;
		  case 'e':
			Once_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'f':
			Fill = (uint) strtoul(optarg, NULL, 0);
			break;
#if SM_HEAP_CHECK
		  case 'H':
			/* strtol() instead of atoi(): defined result on overflow */
			SmHeapCheck = (int) strtol(optarg, NULL, 10);
			break;
#endif
		  case 'n':
			Total_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'R':
			Rand = (int) strtoul(optarg, NULL, 0);
			break;
		  case 'r':
			Rate = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 's':
			Aq_size = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'V':
			++Verbose;
			break;
		  default:
			usage(prg);
			/* NOTREACHED: usage() calls exit() */
			return EX_USAGE;
		}
	}

	/* label matches the name of this test program (t-aq-perf-3) */
	sm_test_begin(argc, argv, "test AQ perf 3");

	/*
	** The return value is deliberately not used: the exit code of the
	** program is determined by sm_test_end().
	*/

	(void) aq_t_perf();
#if SM_HEAP_CHECK
	if (HEAP_CHECK) {
		sm_io_fprintf(smioout, "heap should be empty except for makebuf:\n");
		sm_heap_report(smioout, 3);
	}
#endif
	sm_io_flush(smioout);
	return sm_test_end();
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */