/*
 * Copyright (c) 2006 Claus Assmann
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the license/LICENSE.3C file which can be found at the
 * top level of this source code distribution.
 */

#include "sm/generic.h"
SM_IDSTR(id, "@(#)$Id: t-aq-perf-0.c,v 1.10 2006/12/30 23:04:56 ca Exp $")

#include "sm/debug.h"
#include "sm/heap.h"
#include "sm/sysexits.h"
#include "sm/test.h"
#include "sm/io.h"
#include "sm/rfc2821.h"
#include "sm/actdb-int.h"
#include "sm/aqrdq.h"
#define QMGR_DEBUG_DEFINE 1
#include "sm/qmgr-int.h"
#include "sm/qmgrdbg.h"
#define QMGR_LOG_DEFINES 1
#include "log.h"

/*
**  Test program: add entries to AQ (and its ready queues), then remove
**  them again; either alternating in a single thread or (-C) with one
**  adder thread and one deleter thread.
**
**  Synchronization protocol in concurrent mode (everything below is
**  protected by Aq_ctx->aq_mutex):
**	cond_added:   signalled by the adder after it added entries or
**			after it set Done.
**	cond_deleted: signalled by the deleter after it removed a batch.
**	Done:         set by the adder when it will not add anything else.
*/

static int	 Verbose = 0;
static bool	 Concurrent = false;
static bool	 Done = false;
static pthread_cond_t	 cond_added, cond_deleted;
static aq_ctx_P	 Aq_ctx;
static uint	 Once_TAs;
static uint	 Total_TAs;
static uint	 Fill = 90;
static uint	 Rate = 0;
static uint	 Aq_size = 90;

#if SM_HEAP_CHECK
# include "sm/io.h"
extern SM_DEBUG_T SmHeapCheck;
# define HEAP_CHECK (SmHeapCheck > 0)
#else
# define HEAP_CHECK 0
#endif

#define SM_AQ_RCPTS	256
#define RCPT_MAX_LEN	256

sm_cstr_P	 Tst_CDB_id;
sessta_id_T	*ta_ids = NULL;
sm_log_ctx_P	 Lctx = NULL;
sm_str_P	 Defaultdomain = NULL;

/*
**  AQ_T_ADD -- add entries to AQ and rdq
**
**	Calls qm_test_fill_aq() while holding Aq_ctx->aq_mutex.
**	In concurrent mode this loops: after entries have been added the
**	deleter is woken up via cond_added; if nothing could be added the
**	function waits on cond_deleted (with the mutex held, as required
**	by pthread_cond_wait()) and tries again.
**
**	Parameters:
**		none (uses the file-level test configuration)
**
**	Returns:
**		result of the last qm_test_fill_aq() call;
**		SM_NOTDONE means the adder is finished.
**
**	NOTE(review): SM_TEST_E()/SM_TEST_ERR() appear to jump to the
**	"error" label on failure; the error path therefore releases the
**	mutex if it is still held.
*/

static sm_ret_T
aq_t_add(void)
{
	sm_ret_T ret;
	uint32_t added, total;
	int status;
	bool aq_locked;
	time_T time_now;

	total = 0;
	aq_locked = false;
	ret = SM_FAILURE;
	do
	{
		/* do not print/accumulate garbage if the fill call fails */
		added = 0;
		time_now = time(NULL);
		status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = true;
		if (Verbose > 3)
			sm_io_fprintf(smioerr, "func=aq_t_add, locked=1\n");
		ret = qm_test_fill_aq(Lctx, Aq_ctx, time_now, Total_TAs,
				Once_TAs, Fill, Rate, Defaultdomain, &added,
				THR_NO_LOCK);
		if (Verbose > 2)
			sm_io_fprintf(smioerr,
				"func=aq_t_add, added=%u, todal=%u, ret=%d\n",
				added, total, ret);
		if (0 == ret)
			SM_ASSERT(AQ_RDQS_FIRST(Aq_ctx->aq_rdqs) != NULL);
		SM_TEST_ERR(ret);
		total += added;

		/*
		**  Signal/wait while the mutex is still held: the state
		**  that was just inspected cannot change before the wait
		**  starts, hence no wakeup can be lost.
		*/

		if (Concurrent && added > 0)
		{
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_add, added=%u, todal=%u\n",
					added, total);
			status = pthread_cond_signal(&cond_added);
			SM_TEST_E(0 == status);
		}
		if (Concurrent && 0 == added && Verbose > 1)
			sm_io_fprintf(smioerr,
				"func=aq_t_add, added=%u, ret=%d\n",
				added, ret);
		if (Concurrent && SM_NOTDONE == ret)
		{
			/* tell the deleter that nothing else will be added */
			Done = true;
			status = pthread_cond_signal(&cond_added);
			SM_TEST_E(0 == status);
		}
		else if (Concurrent && 0 == added)
		{
			/* nothing added: wait until the deleter made room */
			status = pthread_cond_wait(&cond_deleted,
					&Aq_ctx->aq_mutex);
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_add, deleted\n");
			SM_TEST_E(0 == status);
		}

		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_ASSERT(status == 0);
		aq_locked = false;
		if (Verbose > 3)
			sm_io_fprintf(smioerr, "func=aq_t_add, unlocked=1\n");

		if (SM_NOTDONE == ret)
		{
			if (Concurrent && 0 == added)
				break;
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_add, done=full\n");
			return ret;
		}
	} while (Concurrent);
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_add, return=%d\n", ret);
	return ret;

  error:
	if (aq_locked)
	{
		if (Concurrent)
		{
			/* do not leave the deleter waiting for more entries */
			Done = true;
			(void) pthread_cond_signal(&cond_added);
		}
		(void) pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		aq_locked = false;
	}
	sm_io_fprintf(smioerr, "func=aq_t_add, error=%d\n", ret);
	return ret;
}

#if QM_TEST_SCHED
/*
**  AQ_T_SCHED -- take entries from rdq and move them to waitq
**
**	Unfinished stub (only compiled if QM_TEST_SCHED is set): it
**	currently just removes the first recipient of the first rdq.
**	Called without locking (THR_NO_LOCK); it is only invoked from
**	the sequential test loop.
**
**	Parameters:
**		none
**
**	Returns:
**		SM_NOTDONE if there is nothing to schedule,
**		otherwise the result of aq_rdq_rm().
*/

static sm_ret_T
aq_t_sched(void)
{
	sm_ret_T ret;
	aq_rcpt_P aq_rcpt;
	aqrdq_ctx_P aqrdq;
	uint todo_entries;

	ret = SM_SUCCESS;
	aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
	if (NULL == aqrdq)
		return SM_NOTDONE;
	todo_entries = aqrdq->aqrdq_entries;
	if (0 == todo_entries || AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts))
		return SM_NOTDONE;
	aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
	if (NULL == aq_rcpt)
		return SM_NOTDONE;

	/* ??? move to waitq ??? */
	ret = aq_rdq_rm(Aq_ctx, aq_rcpt, THR_NO_LOCK, NULL);
	return ret;
}
#endif /* QM_TEST_SCHED */

/*
**  AQ_T_DEL -- delete entries from AQ
**
**	Takes the first rdq of AQ and removes up to aqrdq_entries
**	recipients (and their transactions) from it while holding
**	Aq_ctx->aq_mutex.  In concurrent mode this is repeated: if no
**	rdq exists the function waits on cond_added (mutex held) unless
**	the adder already set Done; after each batch the adder is woken
**	up via cond_deleted.
**
**	Parameters:
**		none (uses the file-level test configuration)
**
**	Returns:
**		SM_NOTDONE if the first rdq has no entries,
**		SM_SUCCESS if (concurrent mode) the adder is done and no
**			rdq is left,
**		otherwise the result of the last operation.
*/

static sm_ret_T
aq_t_del(void)
{
	sm_ret_T ret;
	int status;
	uint i, todo_entries, entries_removed;
	bool done, aq_locked;
	aq_rcpt_P aq_rcpt;
	aq_ta_P aq_ta;
	aqrdq_ctx_P aqrdq;

	entries_removed = 0;
	done = false;
	aq_locked = false;
	do
	{
		ret = SM_FAILURE;
		SM_ASSERT(!aq_locked);
		status = pthread_mutex_lock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = true;

		aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
		done = Done;

		/*
		**  Wait for the adder; the mutex must stay locked when
		**  calling pthread_cond_wait(), which releases it while
		**  waiting and re-acquires it before returning.
		*/

		while (Concurrent && NULL == aqrdq && !done)
		{
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_del, wait\n");
			status = pthread_cond_wait(&cond_added,
					&Aq_ctx->aq_mutex);
			SM_TEST_E(0 == status);
			done = Done;
			aqrdq = AQ_RDQS_FIRST(Aq_ctx->aq_rdqs);
			if (NULL == aqrdq && Verbose)
				sm_io_fprintf(smioerr,
					"ar_t_del=signal_but_no_rdq\n");
		}
		if (Concurrent && NULL == aqrdq)
		{
			/* adder is finished and nothing is left to delete */
			ret = SM_SUCCESS;
			break;
		}
		SM_TEST_E(aqrdq != NULL);

		todo_entries = aqrdq->aqrdq_entries;
		if (0 == todo_entries)
		{
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_del, todo=0\n");
			if (aq_locked)
			{
				status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
				SM_TEST_E(0 == status);
				aq_locked = false;
			}
			return SM_NOTDONE;
		}

#if HAVE_MONCONTROL
		moncontrol(1);
#endif
		SM_ASSERT(aq_locked);
		for (i = 0;
		     i < todo_entries && !AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts);
		     i++, entries_removed++)
		{
			aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
			SM_TEST_E(aq_rcpt != NULL);
			SM_IS_AQ_RCPT(aq_rcpt);

			/*
			**  Outdated: new code in qm_fr_sc_react() accesses
			**  aq_ta directly via dadb (which isn't used in this
			**  test program at all).
			*/

			ret = aq_ta_find(Aq_ctx, aq_rcpt->aqr_ss_ta_id, false,
					&aq_ta);
			SM_TEST_ERR(ret);
			ret = aq_rdq_rm(Aq_ctx, aq_rcpt, THR_NO_LOCK, Lctx);
			SM_TEST_ERR(ret);
			ret = aq_rcpt_rm(Aq_ctx, aq_rcpt,
					AQR_RM_N_RDQ|AQR_RM_I_RDQ);
			SM_TEST_ERR(ret);
			ret = aq_ta_rm(Aq_ctx, aq_ta, false);
			SM_TEST_ERR(ret);
		}
#if HAVE_MONCONTROL
		/* NOTE(review): possibly meant to be moncontrol(0) - confirm */
		moncontrol(1);
#endif

		if (Concurrent)
		{
			status = pthread_cond_signal(&cond_deleted);
			SM_TEST_E(0 == status);
			if (Verbose > 1)
				sm_io_fprintf(smioerr,
					"func=aq_t_del, signal=sent, entries_removed=%u, done=%d\n",
					entries_removed, done);
			if (entries_removed >= Total_TAs)
				break;
		}

		if (aq_locked)
		{
			status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
			SM_TEST_E(0 == status);
			aq_locked = false;
		}
	} while (Concurrent && ret != SM_NOTDONE);

	if (aq_locked)
	{
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		SM_TEST_E(0 == status);
		aq_locked = false;
	}
	if (Verbose > 1)
		sm_io_fprintf(smioerr, "func=aq_t_del, return=%d\n", ret);
	return ret;

  error:
	if (aq_locked)
	{
		/*
		**  Clear the flag first and do not use SM_TEST_E() here:
		**  jumping back to this label on a failed unlock would
		**  loop forever.
		*/

		aq_locked = false;
		status = pthread_mutex_unlock(&Aq_ctx->aq_mutex);
		if (status != 0)
			sm_io_fprintf(smioerr,
				"func=aq_t_del, unlock=%d\n", status);
	}
	sm_io_fprintf(smioerr, "func=aq_t_del, error=%d\n", ret);
	return ret;
}

/*
**  AQ_THR_ADD -- thread entry point: run aq_t_add()
**
**	Parameters:
**		arg -- unused
**
**	Returns:
**		NULL (aq_t_add() reports errors itself)
*/

static void *
aq_thr_add(void *arg)
{
	(void) arg;
	(void) aq_t_add();
	return NULL;
}

/*
**  AQ_THR_DEL -- thread entry point: run aq_t_del()
**
**	Parameters:
**		arg -- unused
**
**	Returns:
**		NULL (aq_t_del() reports errors itself)
*/

static void *
aq_thr_del(void *arg)
{
	(void) arg;
	(void) aq_t_del();
	return NULL;
}

/*
**  AQ_T_PERF -- add/remove entries to/from AQ
**
**	Opens AQ, sets up the default domain and the log context, then
**	either starts one adder and one deleter thread (concurrent mode)
**	or alternates aq_t_add()/aq_t_del() until one of them returns
**	SM_NOTDONE.  Finally AQ is closed.
**
**	Parameters:
**		none (uses the file-level test configuration)
**
**	Returns:
**		result of aq_close() on the normal path,
**		otherwise the result of the failed operation.
*/

static sm_ret_T
aq_t_perf(void)
{
	sm_ret_T ret, reta;
	uint i;
	int status;
	sessta_id_T ta_id;
	sm_logconfig_P lcfg;
	pthread_t thr1, thr2;

	Aq_ctx = NULL;
	/* never hand an indeterminate pointer value to sm_log_create() */
	lcfg = NULL;
	ret = aq_open(NULL /*qmgr_ctx*/, &Aq_ctx, Aq_size, 0);
	SM_TEST_ERR(ret);
	sm_snprintf(ta_id, SMTP_STID_SIZE, SMTPS_STID_FORMAT,
		(id_count_T)0, 0);
	Tst_CDB_id = sm_cstr_scpyn((uchar *)ta_id, sizeof(ta_id));
	if (NULL == Tst_CDB_id)
		return sm_error_temp(SM_EM_Q, ENOMEM);

	Defaultdomain = sm_str_scpy(NULL, "example.com", 64);
	SM_TEST_E(Defaultdomain != NULL);

	ret = sm_log_create(NULL, &Lctx, &lcfg);
	SM_TEST_ERR(ret);
	ret = sm_log_setfp_fd(Lctx, smioerr, SMIOERR_FILENO);
	SM_TEST_ERR(ret);
	ret = sm_log_setdebuglevel(Lctx, 10);
	SM_TEST_ERR(ret);
	srand(time(0));

	if (Concurrent)
	{
		void *res;

		status = pthread_cond_init(&cond_added, NULL);
		SM_TEST_E(0 == status);
		status = pthread_cond_init(&cond_deleted, NULL);
		SM_TEST_E(0 == status);
		status = pthread_create(&thr1, NULL, aq_thr_add, NULL);
		SM_TEST_E(0 == status);
		status = pthread_create(&thr2, NULL, aq_thr_del, NULL);
		SM_TEST_E(0 == status);

		status = pthread_join(thr1, &res);
		SM_TEST_E(0 == status);
		if (Verbose)
			sm_io_fprintf(smioout, "aq_thr_add=done\n");
		status = pthread_join(thr2, &res);
		SM_TEST_E(0 == status);
		if (Verbose)
			sm_io_fprintf(smioout, "aq_thr_del=done\n");

		/* both threads are gone: nobody uses the conditions anymore */
		status = pthread_cond_destroy(&cond_added);
		SM_TEST_E(0 == status);
		status = pthread_cond_destroy(&cond_deleted);
		SM_TEST_E(0 == status);
	}
	else
	{
		i = 0;
		do
		{
			/* keep ret in sync: the error path returns ret */
			ret = reta = aq_t_add();
			SM_TEST_ERR(ret);
			if (SM_NOTDONE == reta)
			{
				if (Verbose)
					sm_io_fprintf(smioout,
						"ar_t_add=not_done\n");
				break;
			}

#if QM_TEST_SCHED
			ret = aq_t_sched();
			SM_TEST_ERR(ret);
			if (SM_NOTDONE == ret)
			{
				if (Verbose)
					sm_io_fprintf(smioout,
						"ar_t_sched=not_done\n");
				break;
			}
#endif /* QM_TEST_SCHED */

			ret = aq_t_del();
			SM_TEST_ERR(ret);
			if (SM_NOTDONE == ret)
			{
				if (Verbose)
					sm_io_fprintf(smioout,
						"ar_t_del=not_done\n");
				break;
			}
			++i;
		} while (reta != SM_NOTDONE && ret != SM_NOTDONE);
		if (Verbose)
			sm_io_fprintf(smioout, "i=%u\n", i);
	}

	ret = aq_close(Aq_ctx);
	return ret;

  error:
	return ret;
}

/*
**  USAGE -- print usage message and exit
**
**	Parameters:
**		prg -- name of the program
**
**	Returns:
**		never; calls exit(EX_USAGE)
*/

static void
usage(const char *prg)
{
	sm_io_fprintf(smioerr, "usage: %s [options]\n", prg);
	sm_io_fprintf(smioerr,
		"Test AQ insertion, lookups, and removal (domains are random numbers)\n");
	sm_io_fprintf(smioerr, "options:\n");
	sm_io_fprintf(smioerr,
		"-C try to perform adding/deleting entries concurrently\n");
	sm_io_fprintf(smioerr,
		"-e n specify number of recipients to add each iteration\n");
	sm_io_fprintf(smioerr, "-f n specify allowed fill of AQ\n");
#if SM_HEAP_CHECK
	sm_io_fprintf(smioerr, "-H n set heap check level\n");
#endif
	sm_io_fprintf(smioerr,
		"-n n specify total number of recipients to use\n");
	sm_io_fprintf(smioerr, "-s n specify size of AQ\n");
	sm_io_fprintf(smioerr, "-r n fill rate\n");
	sm_io_fprintf(smioerr, "-V increase verbosity\n");
	exit(EX_USAGE);
}

/*
**  MAIN -- parse options and run the AQ performance test
**
**	Parameters:
**		argc -- number of arguments
**		argv -- argument vector
**
**	Returns:
**		result of sm_test_end(), or EX_USAGE for invalid options.
**
**	NOTE(review): the option string accepts "l:" but there is no
**	case for it, hence -l ends up in usage().
*/

int
main(int argc, char **argv)
{
	int c;
	sm_ret_T r;
	char *prg;

#if HAVE_MONCONTROL
	moncontrol(1);
#endif
	prg = argv[0];
	Total_TAs = SM_AQ_RCPTS;
	Aq_size = SM_AQ_RCPTS;
	Once_TAs = Total_TAs / 10;
#if SM_HEAP_CHECK
	SmHeapCheck = 0;
#endif
	while ((c = getopt(argc, argv, "Ce:f:H:l:n:r:s:V")) != -1)
	{
		switch (c)
		{
		  case 'C':
			Concurrent = true;
			break;
		  case 'e':
			Once_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'f':
			Fill = (uint) strtoul(optarg, NULL, 0);
			break;
#if SM_HEAP_CHECK
		  case 'H':
			SmHeapCheck = atoi(optarg);
			break;
#endif
		  case 'n':
			Total_TAs = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'r':
			Rate = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 's':
			Aq_size = (uint) strtoul(optarg, NULL, 0);
			break;
		  case 'V':
			++Verbose;
			break;
		  default:
			usage(prg);
			return EX_USAGE;
		}
	}
	sm_test_begin(argc, argv, "test AQ perf 0");
	r = aq_t_perf();
	if (Verbose)
		sm_io_fprintf(smioout, "aq_t_perf=%d\n", r);
#if SM_HEAP_CHECK
	if (HEAP_CHECK)
	{
		sm_io_fprintf(smioout,
			"heap should be empty except for makebuf:\n");
		sm_heap_report(smioout, 3);
	}
#endif
	sm_io_flush(smioout);
	return sm_test_end();
}