/*
 * Copyright (c) 2006 Claus Assmann
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the license/LICENSE.3C file which can be found at the
 * top level of this source code distribution.
 */

/*
**  (performance) Test program to add entries to AQ and remove them from AQ.
**  Uses "scheduler" that moves entries from rdq to waitq.
**
**  Three event thread tasks cooperate:
**	aq_t_add   -- fills AQ (and rdq), wakes up aq_t_sched
**	aq_t_sched -- moves entries from rdq to waitq, wakes up aq_t_del
**	aq_t_del   -- removes entries from waitq/AQ, wakes up one of the others
**  Progress is tracked in the global Done (DONE_NONE .. DONE_DEL).
*/

#include "sm/generic.h"
SM_IDSTR(id, "@(#)$Id: t-aq-perf-3.c,v 1.1 2007/02/26 02:17:11 ca Exp $")

#include "sm/debug.h"
#include "sm/heap.h"
#include "sm/sysexits.h"
#include "sm/test.h"
#include "sm/io.h"
#include "sm/rfc2821.h"
#include "sm/actdb-int.h"
#include "sm/aqrdq.h"
#define QMGR_DEBUG_DEFINE 1
#include "sm/qmgr-int.h"
#include "sm/qmgrdbg.h"
#define QMGR_LOG_DEFINES 1
#include "log.h"
#include "qmgr.h"

/* progress states stored in Done; they only ever increase */
#define DONE_NONE	0
#define DONE_ADD	1
#define DONE_SCHED	2
#define DONE_DEL	3

static int Verbose = 0;
static int Rand = 0;
static int Done = DONE_NONE;
static aq_ctx_P Aq_ctx = NULL;
static qmgr_ctx_P Qmgr_ctx = NULL;
static uint Once_TAs = 0;
static uint Total_TAs = 0;
static uint Fill = 90;
static uint Rate = 0;
static uint Aq_size = 90;
static sm_evthr_ctx_P Ev_ctx = NULL;
static sm_evthr_task_P Tsk_add, Tsk_sched, Tsk_del;
static int Ev_dbg = 0;
static uint Ndas = 512;
qsc_ctx_P qsc_ctx;

#if SM_HEAP_CHECK
# include "sm/io.h"
extern SM_DEBUG_T SmHeapCheck;
# define HEAP_CHECK (SmHeapCheck > 0)
#else
# define HEAP_CHECK 0
#endif

#define SM_AQ_RCPTS	256
#define RCPT_MAX_LEN	256

sm_cstr_P Tst_CDB_id = NULL;
static sm_log_ctx_P Lctx = NULL;
static sm_str_P Defaultdomain = NULL;

#if 0
/* HACK: referenced by dadb_close() */
sm_ret_T
qda_update_ta_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_ret_T status, uint err_st, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_str_P errmsg, thr_lock_T locktype)
{
	sm_io_fprintf(smioerr, "func=qda_update_ta_stat\n");
	return SM_SUCCESS;
}
#endif

/*
**  AQ_T_ADD -- add entries to AQ and rdq
**
**	Calls qm_test_fill_aq() at least once; repeats while Rand > 0,
**	entries were added, adding is not finished, and rand() < Rand.
**	Wakes up the scheduler task if anything was added.
**
**	Parameters:
**		tsk -- evthr task (its sleep time is updated)
**
**	Returns:
**		EVTHR_SLPQ on success, EVTHR_TERM on error
*/

static uint32_t Total_added = 0;

static sm_ret_T
aq_t_add(sm_evthr_task_P tsk)
{
	sm_ret_T ret;
	uint32_t added;
	bool wakeup;
	timeval_T tv_now, delay;
	time_T now;

	ret = evthr_timeval(Ev_ctx, &tv_now);
	SM_TEST_ERR(ret);
	now = tv_now.tv_sec;
	wakeup = false;
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_add, called=%ld.%06ld\n",
			tv_now.tv_sec, tv_now.tv_usec);

	do
	{
		/*
		**  added is read before ret is checked; make sure it has
		**  a defined value even if the callee fails without
		**  writing it.
		*/

		added = 0;
		ret = qm_test_fill_aq(Lctx, Aq_ctx, now, Total_TAs, Once_TAs,
				Fill, Rate, Defaultdomain, &added,
				THR_LOCK_UNLOCK);
		Total_added += added;
		if (Verbose > 3)
			sm_io_fprintf(smioerr,
				"func=aq_t_add, added=%u, total=%u, ret=%d\n",
				added, Total_added, ret);
		SM_TEST_ERR(ret);
		if (added > 0)
		{
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_add, added=%u, total=%u, ret=%d\n",
					added, Total_added, ret);
			wakeup = true;
		}
		else
		{
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_add, added=%u, ret=%d\n",
					added, ret);

			/* nothing added and SM_NOTDONE: adding is finished */
			if (SM_NOTDONE == ret && Done < DONE_ADD)
				Done = DONE_ADD;
		}
	} while (Rand > 0 && added > 0 && Done < DONE_ADD && rand() < Rand);

	if (wakeup)
	{
		ret = evthr_wakeup_task(Tsk_sched);
		SM_TEST_E(0 == ret);
	}

	if (Verbose > 3)
		sm_io_fprintf(smioerr,
			"func=aq_t_add, added=%u, total=%u, Done=%d\n",
			added, Total_added, Done);

	/* sleep longer once adding is finished */
	delay.tv_usec = 1;
	delay.tv_sec = Done != DONE_NONE ? 9 : 1;
	timeradd(&tv_now, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;

  error:
	sm_io_fprintf(smioerr, "func=aq_t_add, error=%d\n", ret);
	return EVTHR_TERM;
}

/*
**  AQ_T_SCHED -- "schedule" entries from AQ
**
**	While holding the AQ mutex: takes recipients from the first rdq,
**	opens a DA session for each, removes the recipient from the rdq
**	and adds it to the wait queue (AQWQ_DA).
**	Wakes up the delete task if anything was scheduled.
**
**	Parameters:
**		tsk -- evthr task (its sleep time is updated)
**
**	Returns:
**		EVTHR_SLPQ on success, EVTHR_TERM on error
*/

static uint32_t Total_scheduled = 0;

static sm_ret_T
aq_t_sched(sm_evthr_task_P tsk)
{
	sm_ret_T ret;
	int status;
	uint i, todo_entries, entries_scheduled;
	bool aq_locked;
	aq_rcpt_P aq_rcpt;
	aqrdq_ctx_P aqrdq;
	timeval_T tv_now, delay;
	time_T time_now;

	entries_scheduled = 0;
	aq_locked = false;
	ret = SM_FAILURE;

	SM_ASSERT(!aq_locked);
	status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
	SM_TEST_E(0 == status);
	aq_locked = true;

	/* tv_now is used for timeradd() below: do not continue on failure */
	ret = evthr_timeval(Ev_ctx, &tv_now);
	SM_TEST_ERR(ret);
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_sched, called=%ld.%06ld\n",
			tv_now.tv_sec, tv_now.tv_usec);
	time_now = tv_now.tv_sec;

	/* only one... */
	aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
	if (NULL == aqrdq)
	{
		/* no entries have been added? */
		if (Verbose > 0)
			sm_io_fprintf(smioerr, "func=aq_t_sched, aq_rdqs=empty\n");
		goto unlock;
	}

	todo_entries = aqrdq->aqrdq_entries;
	if (0 == todo_entries)
	{
		if (Verbose > 0)
			sm_io_fprintf(smioerr, "func=aq_t_sched, todo=0\n");
		goto unlock;
	}

	/* don't schedule more entries than added in a single invocation */
	if (todo_entries > Once_TAs && Done == DONE_NONE)
	{
		todo_entries = Once_TAs;
	}

#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	for (i = 0; i < todo_entries; i++, entries_scheduled++)
	{
		dadb_entry_P dadb_entry;

		if (AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts))
			break;
		aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
		SM_TEST_E(aq_rcpt != NULL);
		SM_IS_AQ_RCPT(aq_rcpt);

		dadb_entry = NULL;
#if 0
		ret = dadb_se_find_by_ipv4(qsc_ctx->qsc_dadb_ctx,
			aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
			time_now, &dadb_entry);
#endif
		if (dadb_entry == NULL)
		{
			/* open a DA session */
			ret = dadb_sess_open(qsc_ctx, qsc_ctx->qsc_dadb_ctx,
				aq_rcpt->aqr_ss_ta_id,
				aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
				aq_rcpt, &dadb_entry);
			SM_TEST_ERR(ret);
			SM_TEST_E(dadb_entry != NULL);
			SESSTA_COPY(aq_rcpt->aqr_da_ta_id,
				dadb_entry->dadbe_da_ta_id);
			AQR_DA_INIT(aq_rcpt);
			if (Verbose > 3)
				sm_io_fprintf(smioerr,
					"func=aq_t_sched, da_da_id=%s\n",
					aq_rcpt->aqr_da_ta_id);
		}

		/* AQ mutex is already held: THR_NO_LOCK */
		ret = aq_rdq_rm(Aq_ctx, aq_rcpt, THR_NO_LOCK, Lctx);
		SM_TEST_ERR(ret);
		ret = aq_waitq_add(Aq_ctx, aq_rcpt, time_now, AQWQ_DA, false);
		SM_TEST_ERR(ret);
	}
#if HAVE_MONCONTROL
	/* NOTE(review): same argument as before the loop; 0 intended? */
	moncontrol(1);
#endif

  unlock:
	if (aq_locked)
	{
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;
	}

	if (entries_scheduled)
	{
		ret = evthr_wakeup_task(Tsk_del);
		SM_TEST_E(0 == ret);
	}
	Total_scheduled += entries_scheduled;
	if (Total_scheduled >= Total_TAs && Done < DONE_SCHED)
		Done = DONE_SCHED;

	if (Verbose > 1)
		sm_io_fprintf(smioerr,
			"func=aq_t_sched, scheduled=%u, total=%u, return=%d, Done=%d\n"
			, entries_scheduled, Total_scheduled, ret, Done);

	delay.tv_usec = 0;
	delay.tv_sec = 30;
	timeradd(&tv_now, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;

  error:
	if (aq_locked)
	{
		/*
		**  NOTE(review): SM_TEST_E appears to jump to "error" on
		**  failure; if so a failing unlock would re-enter this
		**  handler -- confirm against sm/test.h.
		*/

		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;
	}
	sm_io_fprintf(smioerr, "func=aq_t_sched, error=%d\n", ret);
	return EVTHR_TERM;
}

/*
**  AQ_T_DEL -- delete entries from AQ
**
**	Removes up to Once_TAs entries per invocation from the wait queue
**	(or all of them once scheduling is finished, i.e., while
**	Done == DONE_SCHED): closes the DA session and removes the
**	recipient and its transaction from AQ.
**	Wakes up the add task or the scheduler task depending on Done.
**
**	Parameters:
**		tsk -- evthr task (its sleep time is updated)
**
**	Returns:
**		EVTHR_SLPQ to be called again,
**		EVTHR_TERM when everything has been removed or on error
*/

static uint32_t Total_removed = 0;

static sm_ret_T
aq_t_del(sm_evthr_task_P tsk)
{
	sm_ret_T ret, flags;
	int status;
	uint i, todo_entries, entries_removed;
	bool aq_locked;
	aq_rcpt_P aq_rcpt;
	aq_ta_P aq_ta;
	timeval_T tv_now, delay;
	dadb_entry_P dadb_entry;

	entries_removed = 0;
	aq_locked = false;

	/* tv_now is used for timeradd() below: do not continue on failure */
	ret = evthr_timeval(Ev_ctx, &tv_now);
	SM_TEST_ERR(ret);
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_del, called=%ld.%06ld\n",
			tv_now.tv_sec, tv_now.tv_usec);

	todo_entries = Once_TAs;
#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	for (i = 0; i < todo_entries || Done == DONE_SCHED;
	     i++, entries_removed++)
	{
		status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = true;

		/* leaves the loop with the mutex held; released below */
		if (AQR_WAITQ_EMPTY(Aq_ctx))
			break;
		aq_rcpt = AQR_WAITQ_FIRST(Aq_ctx);
		SM_TEST_E(aq_rcpt != NULL);
		SM_IS_AQ_RCPT(aq_rcpt);

		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;

		/* not necessary here, but required to "emulate" the real algorithm */
		if (Verbose > 3)
			sm_io_fprintf(smioerr, "func=aq_t_del, da_da_id=%s\n",
				aq_rcpt->aqr_da_ta_id);
		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx,
				aq_rcpt->aqr_da_ta_id, &dadb_entry);
		SM_TEST_ERR(ret);
		SM_TEST_E(dadb_entry != NULL);
		SM_TEST_E(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_TEST_E(*dadb_entry->dadbe_ss_ta_id != '\0');

		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL)
		{
			SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
					aq_ta->aqt_ss_ta_id));
		}
		else
		{
			/*
			**  NOTE(review): result is not checked and aq_ta is
			**  overwritten below; kept for "emulation" only?
			*/

			ret = aq_ta_find(Qmgr_ctx->qmgr_aq,
					dadb_entry->dadbe_ss_ta_id, true,
					&aq_ta);
		}

		/* session must be closed */
		DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);

#if 0
		/* Update transaction status */
		ret = qda_update_ta_stat(Qmgr_ctx, dadb_entry->dadbe_da_ta_id,
				0, 0, qsc_ctx->qsc_dadb_ctx, dadb_entry,
				aq_ta, NULL, NULL, THR_LOCK_UNLOCK);
#else
		/*
		**  status is 0 here (result of the successful unlock above).
		**  NOTE(review): return value is overwritten without a check.
		*/

		ret = qda_dadb_close(Qmgr_ctx, dadb_entry->dadbe_da_ta_id,
				qsc_ctx->qsc_dadb_ctx, dadb_entry, status,
				&flags);
#endif

		ret = aq_waitq_rm(Aq_ctx, aq_rcpt, AQWQ_DA, true);
		SM_TEST_ERR(ret);

		/* fetch the transaction before the recipient is removed */
		aq_ta = aq_rcpt->aqr_ss_ta;
		SM_TEST_E(aq_ta != NULL);

		ret = aq_rcpt_rm(Aq_ctx, aq_rcpt,
				AQR_RM_LOCK|AQR_RM_N_RDQ|AQR_RM_I_RDQ);
		SM_TEST_ERR(ret);
		ret = aq_ta_rm(Aq_ctx, aq_ta, true);
		SM_TEST_ERR(ret);
	}
#if HAVE_MONCONTROL
	/* NOTE(review): same argument as before the loop; 0 intended? */
	moncontrol(1);
#endif

	if (aq_locked)
	{
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;
	}

	/* keep the pipeline going: wake up whichever stage is not done */
	if (Done < DONE_ADD)
	{
		ret = evthr_wakeup_task(Tsk_add);
		SM_TEST_E(0 == ret);
	}
	else if (Done < DONE_SCHED)
	{
		ret = evthr_wakeup_task(Tsk_sched);
		SM_TEST_E(0 == ret);
	}

	Total_removed += entries_removed;
	if (Total_removed >= Total_TAs && Done < DONE_DEL)
		Done = DONE_DEL;

	if (Verbose > 1)
		sm_io_fprintf(smioerr,
			"func=aq_t_del, removed=%u, total=%u, AQ=%u, return=%d, Done=%d\n"
			, entries_removed, Total_removed,
			Total_added - Total_removed, ret, Done);

	/* run again immediately once scheduling is finished */
	if (Done >= DONE_SCHED)
	{
		delay.tv_usec = 0;
		delay.tv_sec = 0;
	}
	else
	{
		delay.tv_usec = 0;
		delay.tv_sec = 1;
	}
	timeradd(&tv_now, &delay, &tsk->evthr_t_sleep);
	return (Done == DONE_DEL) ? EVTHR_TERM : EVTHR_SLPQ;

  error:
	if (aq_locked)
	{
		/*
		**  NOTE(review): SM_TEST_E appears to jump to "error" on
		**  failure; if so a failing unlock would re-enter this
		**  handler -- confirm against sm/test.h.
		*/

		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;
	}
	sm_io_fprintf(smioerr, "func=aq_t_del, error=%d\n", ret);
	return EVTHR_TERM;
}

/*
**  AQ_T_INIT -- initialize the test environment
**
**	Creates the QMGR context, AQ, QSC context, DA DB, OCC, the test
**	CDB id, the default domain, the logging context, and the event
**	thread system; results are stored in file-level globals.
**
**	Parameters:
**		none
**
**	Returns:
**		usual sm_error code; ENOMEM if an allocation fails
*/

static sm_ret_T
aq_t_init(void)
{
	sm_ret_T ret;
	sm_logconfig_P lcfg;
	sessta_id_T ta_id;

	ret = SM_SUCCESS;
	Aq_ctx = NULL;
	Qmgr_ctx = (qmgr_ctx_P) sm_zalloc(sizeof(*Qmgr_ctx));

	/* the context is dereferenced right away: fail cleanly instead */
	if (NULL == Qmgr_ctx)
		return sm_error_temp(SM_EM_Q, ENOMEM);
	Qmgr_ctx->qmgr_cnf.q_cnf_tmo_da = AQR_DA_TMOUT;
	Qmgr_ctx->sm_magic = SM_QMGR_CTX_MAGIC;
	Qmgr_ctx->qmgr_cnf.sm_magic = SM_QMGR_CNF_MAGIC;

	ret = aq_open(Qmgr_ctx, &Aq_ctx, Aq_size, 0);
	SM_TEST_ERR(ret);

	ret = qsc_ctx_new(Qmgr_ctx, 1, &qsc_ctx);
	SM_TEST_ERR(ret);
	qsc_ctx->qsc_id = 1;
	ret = qsc_id_init(qsc_ctx, 0);
	SM_TEST_ERR(ret);

	ret = dadb_new(&qsc_ctx->qsc_dadb_ctx, Ndas);
	SM_TEST_ERR(ret);

	ret = occ_open(&Qmgr_ctx->qmgr_occ_ctx, 512);
	SM_TEST_ERR(ret);

	sm_snprintf(ta_id, SMTP_STID_SIZE, SMTPS_STID_FORMAT,
		(id_count_T)0, 0);
	Tst_CDB_id = sm_cstr_scpyn((uchar *)ta_id, sizeof(ta_id));
	if (NULL == Tst_CDB_id)
		return sm_error_temp(SM_EM_Q, ENOMEM);

	Defaultdomain = sm_str_scpy(NULL, "example.com", 64);
	SM_TEST_E(Defaultdomain != NULL);

	ret = sm_log_create(NULL, &Lctx, &lcfg);
	SM_TEST_ERR(ret);
	ret = sm_log_setfp_fd(Lctx, smioerr, SMIOERR_FILENO);
	SM_TEST_ERR(ret);
	ret = sm_log_setdebuglevel(Lctx, 0);
	SM_TEST_ERR(ret);

	/* seed for the rand() call in aq_t_add() */
	srand(time(0));

	/* initialize event threads system */
	ret = evthr_init(&Ev_ctx, 2, 6, 2);
	if (sm_is_err(ret))
	{
		sm_io_fprintf(smioerr,
			"sev=ERROR, func=aq_t_init1, evthr_init=%m\n", ret);
		SM_TEST_ERR(ret);
	}
#if EVTHR_DEBUG
	evthr_set_dbglvl(Ev_ctx, Ev_dbg);
#endif
	Qmgr_ctx->qmgr_ev_ctx = Ev_ctx;

	return ret;

  error:
	return ret;
}

/*
**  AQ_T_START -- create the three evthr tasks
**
**	The add task is due almost immediately; the scheduler and delete
**	tasks get far-away sleep times (80000s/90000s) because they are
**	woken up explicitly via evthr_wakeup_task().
**
**	Parameters:
**		none
**
**	Returns:
**		usual sm_error code
*/

static sm_ret_T
aq_t_start(void)
{
	sm_ret_T ret;
	timeval_T tv_now, sleept, delay;

	ret = evthr_timeval(Ev_ctx, &tv_now);
	SM_TEST_ERR(ret);

	delay.tv_usec = 1;
	delay.tv_sec = 0;
	timeradd(&tv_now, &delay, &sleept);
	ret = evthr_task_new(Ev_ctx, &Tsk_add, EVTHR_EV_SL, INVALID_FD,
			&sleept, aq_t_add, &Aq_ctx);
	SM_TEST_ERR(ret);

	delay.tv_usec = 0;
	delay.tv_sec = 80000;
	timeradd(&tv_now, &delay, &sleept);
	ret = evthr_task_new(Ev_ctx, &Tsk_sched, EVTHR_EV_SL, INVALID_FD,
			&sleept, aq_t_sched, &Aq_ctx);
	SM_TEST_ERR(ret);

	delay.tv_usec = 0;
	delay.tv_sec = 90000;
	timeradd(&tv_now, &delay, &sleept);
	ret = evthr_task_new(Ev_ctx, &Tsk_del, EVTHR_EV_SL, INVALID_FD,
			&sleept, aq_t_del, &Aq_ctx);
	SM_TEST_ERR(ret);

	return ret;

  error:
	return ret;
}

/*
**  AQ_T_PERF -- add/remove entries to/from AQ
**
**	Initializes the environment, creates the tasks, runs the event
**	loop until it returns, and then closes AQ.
**
**	Parameters:
**		none
**
**	Returns:
**		usual sm_error code
*/

static sm_ret_T
aq_t_perf(void)
{
	sm_ret_T ret;

	ret = aq_t_init();
	SM_TEST_ERR(ret);
	ret = aq_t_start();
	SM_TEST_ERR(ret);
	ret = evthr_loop(Ev_ctx);
	SM_TEST_ERR(ret);
	ret = aq_close(Aq_ctx);
	return ret;

  error:
	return ret;
}

/*
**  USAGE -- print usage and exit
**
**	Parameters:
**		prg -- name of program
**
**	Returns:
**		does not return: exit(EX_USAGE)
*/

static void
usage(const char *prg)
{
	sm_io_fprintf(smioerr, "usage: %s [options]\n", prg);
	sm_io_fprintf(smioerr,
		"Test AQ insertion, lookups, and removal (domains are random numbers)\n");
	sm_io_fprintf(smioerr, "options:\n");
	sm_io_fprintf(smioerr,
		"-e n specify number of recipients to add each iteration\n");
	sm_io_fprintf(smioerr, "-f n specify allowed fill of AQ (per cent)\n");
	sm_io_fprintf(smioerr, "-n n specify total number of recipients to use\n");
	sm_io_fprintf(smioerr, "-R n repeat addition if rand() is less than n\n");
	sm_io_fprintf(smioerr, "-r n fill rate\n");
	sm_io_fprintf(smioerr, "-s n specify size of AQ\n");
	sm_io_fprintf(smioerr, "-V increase verbosity\n");
	exit(EX_USAGE);
}

/*
**  MAIN -- parse options and run the AQ performance test
**
**	Defaults: Total_TAs = Aq_size = SM_AQ_RCPTS, Once_TAs = Total_TAs / 10.
**	Note: Once_TAs is not recomputed when -n changes Total_TAs; use -e.
**
**	Parameters:
**		argc -- number of arguments
**		argv -- argument vector
**
**	Returns:
**		result of sm_test_end()
*/

int
main(int argc, char **argv)
{
	int c;
	sm_ret_T r;
	char *prg;

#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	prg = argv[0];
	Total_TAs = SM_AQ_RCPTS;
	Aq_size = SM_AQ_RCPTS;
	Once_TAs = Total_TAs / 10;
#if SM_HEAP_CHECK
	SmHeapCheck = 0;
#endif

	/*
	**  NOTE(review): "l:" (and "H:" without SM_HEAP_CHECK) is accepted
	**  by getopt() but has no case below, so it ends up in usage().
	*/

	while ((c = getopt(argc, argv, "E:e:f:H:l:n:R:r:s:V")) != -1)
	{
		switch (c)
		{
		  case 'E':
			Ev_dbg = (int) strtoul(optarg, NULL, 0);
			break;
		  case 'e':
			Once_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'f':
			Fill = (uint) strtoul(optarg, NULL, 0);
			break;
#if SM_HEAP_CHECK
		  case 'H':
			SmHeapCheck = atoi(optarg);
			break;
#endif
		  case 'n':
			Total_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'R':
			Rand = (int) strtoul(optarg, NULL, 0);
			break;
		  case 'r':
			Rate = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 's':
			Aq_size = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'V':
			++Verbose;
			break;
		  default:
			usage(prg);
			return EX_USAGE;
		}
	}

	sm_test_begin(argc, argv, "test AQ perf 2");

	/*
	**  NOTE(review): r is not inspected; presumably failures are
	**  recorded by the SM_TEST_* macros and reported by sm_test_end().
	*/

	r = aq_t_perf();
#if SM_HEAP_CHECK
	if (HEAP_CHECK)
	{
		sm_io_fprintf(smioout,
			"heap should be empty except for makebuf:\n");
		sm_heap_report(smioout, 3);
	}
#endif
	sm_io_flush(smioout);
	return sm_test_end();
}