/* * Copyright (c) 2006 Claus Assmann * * By using this file, you agree to the terms and conditions set * forth in the license/LICENSE.3C file which can be found at the * top level of this source code distribution. */ /* ** (performance) Test program to add entries to AQ and remove them from AQ. ** This test isn't very realistic as adding and deleting run alternating; ** each task adds/removes a given number of entries (unless a "fill rate" is ** exceeded) and then wakes up the other task. ** Should the code randomly invoke the "add" task more often? ** For example, use a loop with a random (small) size? ** ** Moreover, the code is slightly "outdated": it uses aq_ta_find() ** whereas the code in qm_fr_sc_react() accesses aq_ta directly via dadb ** (which isn't used in this test program at all). */ #include "sm/generic.h" SM_IDSTR(id, "@(#)$Id: t-aq-perf-1.c,v 1.6 2007/01/01 02:10:25 ca Exp $") #include "sm/debug.h" #include "sm/heap.h" #include "sm/sysexits.h" #include "sm/test.h" #include "sm/io.h" #include "sm/rfc2821.h" #include "sm/actdb-int.h" #include "sm/aqrdq.h" #define QMGR_DEBUG_DEFINE 1 #include "sm/qmgr-int.h" #include "sm/qmgrdbg.h" #define QMGR_LOG_DEFINES 1 #include "log.h" static int Verbose = 0; static int Rand = 0; static bool Done = false; static aq_ctx_P Aq_ctx; static uint Once_TAs; static uint Total_TAs; static uint Fill = 90; static uint Rate = 0; static uint Aq_size = 90; static sm_evthr_ctx_P Ev_ctx; static sm_evthr_task_P Tsk_add, Tsk_del; #if SM_HEAP_CHECK # include "sm/io.h" extern SM_DEBUG_T SmHeapCheck; # define HEAP_CHECK (SmHeapCheck > 0) #else # define HEAP_CHECK 0 #endif #define SM_AQ_RCPTS 256 #define RCPT_MAX_LEN 256 sm_cstr_P Tst_CDB_id; static sm_log_ctx_P Lctx = NULL; static sm_str_P Defaultdomain = NULL; /* ** AQ_T_ADD -- add entries to AQ and rdq */ static uint32_t Total_added; static sm_ret_T aq_t_add(sm_evthr_task_P tsk) { sm_ret_T ret; uint32_t added; bool wakeup; timeval_T tv_now, delay; time_T now; ret = evthr_timeval(Ev_ctx, &tv_now); SM_TEST_ERR(ret); now = tv_now.tv_sec; wakeup = false; if (Verbose > 1) sm_io_fprintf(smioerr, "func=aq_t_add, called=%ld.%06ld\n", tv_now.tv_sec, tv_now.tv_usec); do { ret = qm_test_fill_aq(Lctx, Aq_ctx, now, Total_TAs, Once_TAs, Fill, Rate, Defaultdomain, &added, THR_LOCK_UNLOCK); Total_added += added; if (Verbose > 3) sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, total=%u, ret=%d\n", added, Total_added, ret); SM_TEST_ERR(ret); if (added > 0) { if (Verbose > 1) sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, total=%u, ret=%d\n", added, Total_added, ret); wakeup = true; } else if (0 == added ) { if (Verbose > 1) sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, ret=%d\n", added, ret); if (SM_NOTDONE == ret) Done = true; } } while (Rand > 0 && added > 0 && !Done && rand() < Rand); if (wakeup) { ret = evthr_wakeup_task(Tsk_del); SM_TEST_E(0 == ret); } if (Verbose > 3) sm_io_fprintf(smioerr, "func=aq_t_add, added=%u, total=%u, Done=%d\n", added, Total_added, Done); delay.tv_usec = 1; delay.tv_sec = Done ? 9 : 0; timeradd(&tv_now, &delay, &tsk->evthr_t_sleep); return EVTHR_SLPQ; error: sm_io_fprintf(smioerr, "func=aq_t_add, error=%d\n", ret); return EVTHR_TERM; } /* ** AQ_T_DEL -- delete entries from AQ */ static uint32_t Total_removed; static sm_ret_T aq_t_del(sm_evthr_task_P tsk) { sm_ret_T ret; int i, status; uint todo_entries, entries_removed; bool done, aq_locked; aq_rcpt_P aq_rcpt; aq_ta_P aq_ta; aqrdq_ctx_P aqrdq; timeval_T tv_now, delay; entries_removed = 0; done = false; aq_locked = false; ret = SM_FAILURE; SM_ASSERT(!aq_locked); status = pthread_mutex_lock(&Aq_ctx->aq_mutex); SM_TEST_E(0 == status); aq_locked = true; ret = evthr_timeval(Ev_ctx, &tv_now); if (Verbose > 1) sm_io_fprintf(smioerr, "func=aq_t_del, called=%ld.%06ld\n", tv_now.tv_sec, tv_now.tv_usec); aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs); if (NULL == aqrdq) { /* no entries have been added? */ if (Verbose > 0) sm_io_fprintf(smioerr, "func=aq_t_del, aq_rdqs=empty\n"); goto unlock; } todo_entries = aqrdq->aqrdq_entries; if (0 == todo_entries) { if (Verbose > 0) sm_io_fprintf(smioerr, "func=aq_t_del, todo=0\n"); goto unlock; } status = pthread_mutex_unlock(&Aq_ctx->aq_mutex); SM_TEST_E(0 == status); aq_locked = false; /* don't remove more entries than added in a single invocation */ if (todo_entries > Once_TAs && !Done) { todo_entries = Once_TAs; } #if HAVE_MONCONTROL moncontrol(1); #endif for (i = 0; i < todo_entries; i++, entries_removed++) { status = pthread_mutex_lock(&Aq_ctx->aq_mutex); SM_TEST_E(0 == status); aq_locked = true; if (AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts)) break; aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts); SM_TEST_E(aq_rcpt != NULL); SM_IS_AQ_RCPT(aq_rcpt); status = pthread_mutex_unlock(&Aq_ctx->aq_mutex); SM_TEST_E(0 == status); aq_locked = false; ret = aq_ta_find(Aq_ctx, aq_rcpt->aqr_ss_ta_id, true, &aq_ta); SM_TEST_ERR(ret); ret = aq_rdq_rm(Aq_ctx, aq_rcpt, THR_LOCK_UNLOCK, Lctx); SM_TEST_ERR(ret); ret = aq_rcpt_rm(Aq_ctx, aq_rcpt, AQR_RM_LOCK|AQR_RM_N_RDQ|AQR_RM_I_RDQ); SM_TEST_ERR(ret); ret = aq_ta_rm(Aq_ctx, aq_ta, true); SM_TEST_ERR(ret); } #if HAVE_MONCONTROL moncontrol(1); #endif unlock: if (aq_locked) { status = pthread_mutex_unlock(&Aq_ctx->aq_mutex); SM_TEST_E(0 == status); aq_locked = false; } if (!Done) { ret = evthr_wakeup_task(Tsk_add); SM_TEST_E(0 == ret); } Total_removed += entries_removed; if (Verbose > 1) sm_io_fprintf(smioerr, "func=aq_t_del, removed=%u, total=%u, AQ=%u, return=%d, Done=%d\n" , entries_removed, Total_removed, Total_added - Total_removed, ret, Done); delay.tv_usec = 0; delay.tv_sec = 1; timeradd(&tv_now, &delay, &tsk->evthr_t_sleep); return Done ? EVTHR_TERM : EVTHR_SLPQ; error: if (aq_locked) { status = pthread_mutex_unlock(&Aq_ctx->aq_mutex); SM_TEST_E(0 == status); aq_locked = false; } sm_io_fprintf(smioerr, "func=aq_t_del, error=%d\n", ret); return EVTHR_TERM; } static sm_ret_T aq_t_init(void) { sm_ret_T ret; sm_logconfig_P lcfg; sessta_id_T ta_id; ret = SM_SUCCESS; Aq_ctx = NULL; ret = aq_open(NULL /*qmgr_ctx*/, &Aq_ctx, Aq_size, 0); SM_TEST_ERR(ret); sm_snprintf(ta_id, SMTP_STID_SIZE, SMTPS_STID_FORMAT, (id_count_T)0, 0); Tst_CDB_id = sm_cstr_scpyn((uchar *)ta_id, sizeof(ta_id)); if (NULL == Tst_CDB_id) return sm_error_temp(SM_EM_Q, ENOMEM); Defaultdomain = sm_str_scpy(NULL, "example.com", 64); SM_TEST_E(Defaultdomain != NULL); ret = sm_log_create(NULL, &Lctx, &lcfg); SM_TEST_ERR(ret); ret = sm_log_setfp_fd(Lctx, smioerr, SMIOERR_FILENO); SM_TEST_ERR(ret); ret = sm_log_setdebuglevel(Lctx, 10); SM_TEST_ERR(ret); srand(time(0)); /* initialize event threads system */ ret = evthr_init(&Ev_ctx, 2, 6, 2); if (sm_is_err(ret)) { sm_io_fprintf(smioerr, "sev=ERROR, func=aq_t_init1, evthr_init=%m\n", ret); SM_TEST_ERR(ret); } #if EVTHR_DEBUG evthr_set_dbglvl(Ev_ctx, 0); #endif return ret; error: return ret; } static sm_ret_T aq_t_start(void) { sm_ret_T ret; timeval_T tv_now, sleept, delay; ret = evthr_timeval(Ev_ctx, &tv_now); SM_TEST_ERR(ret); delay.tv_usec = 1; delay.tv_sec = 0; timeradd(&tv_now, &delay, &sleept); ret = evthr_task_new(Ev_ctx, &Tsk_add, EVTHR_EV_SL, INVALID_FD, &sleept, aq_t_add, &Aq_ctx); SM_TEST_ERR(ret); delay.tv_usec = 0; delay.tv_sec = 1; timeradd(&tv_now, &delay, &sleept); ret = evthr_task_new(Ev_ctx, &Tsk_del, EVTHR_EV_SL, INVALID_FD, &sleept, aq_t_del, &Aq_ctx); SM_TEST_ERR(ret); return ret; error: return ret; } /* ** AQ_T_PERF -- add/remove entries to/from AQ */ static sm_ret_T aq_t_perf(void) { sm_ret_T ret; ret = aq_t_init(); SM_TEST_ERR(ret); ret = aq_t_start(); SM_TEST_ERR(ret); ret = evthr_loop(Ev_ctx); SM_TEST_ERR(ret); ret = aq_close(Aq_ctx); return ret; error: return ret; } static void usage(const char *prg) { sm_io_fprintf(smioerr, "usage: %s [options]\n", prg); sm_io_fprintf(smioerr, "Test AQ insertion, lookups, and removal (domains are random numbers)\n"); sm_io_fprintf(smioerr, "options:\n"); sm_io_fprintf(smioerr, "-e n specify number of recipients to add each iteration\n"); sm_io_fprintf(smioerr, "-f n specify allowed fill of AQ (per cent)\n"); sm_io_fprintf(smioerr, "-n n specify total number of recipients to use\n"); sm_io_fprintf(smioerr, "-R n repead addition if rand() is less than n\n"); sm_io_fprintf(smioerr, "-r n fill rate\n"); sm_io_fprintf(smioerr, "-s n specify size of AQ\n"); sm_io_fprintf(smioerr, "-V increase verbosity\n"); exit(EX_USAGE); } int main(int argc, char **argv) { int c; sm_ret_T r; char *prg; #if HAVE_MONCONTROL moncontrol(1); #endif prg = argv[0]; Total_TAs = SM_AQ_RCPTS; Aq_size = SM_AQ_RCPTS; Once_TAs = Total_TAs / 10; #if SM_HEAP_CHECK SmHeapCheck = 0; #endif while ((c = getopt(argc, argv, "Ce:f:H:l:n:R:r:s:V")) != -1) { switch (c) { case 'e': Once_TAs = (uint) strtoul(optarg, NULL, 0); break; case 'f': Fill = (uint) strtoul(optarg, NULL, 0); break; #if SM_HEAP_CHECK case 'H': SmHeapCheck = atoi(optarg); break; #endif case 'n': Total_TAs = (uint) strtoul(optarg, NULL, 0); break; case 'R': Rand = (int) strtoul(optarg, NULL, 0); break; case 'r': Rate = (uint) strtoul(optarg, NULL, 0); break; case 's': Aq_size = (uint) strtoul(optarg, NULL, 0); break; case 'V': ++Verbose; break; default: usage(prg); return EX_USAGE; } } sm_test_begin(argc, argv, "test AQ perf 1"); r = aq_t_perf(); #if SM_HEAP_CHECK if (HEAP_CHECK) { sm_io_fprintf(smioout, "heap should be empty except for makebuf:\n"); sm_heap_report(smioout, 3); } #endif sm_io_flush(smioout); return sm_test_end(); }