/* */ #include "sm/generic.h" SM_RCSID("@(#)$Id: qm_test_fill_aq.c,v 1.10 2006/12/31 22:38:05 ca Exp $") #include "sm/assert.h" #include "sm/magic.h" #include "sm/error.h" #include "sm/memops.h" #include "sm/qmgr-int.h" #include "sm/actdb-int.h" #include "sm/aqrdq.h" #include "../qmgr/qmgr.h" #include "../qmgr/log.h" /* ** QM_TEST_FILL_AQ -- fill AQ with "test" transactions ** ** Parameters: ** lctx -- log context ** aq_ctx -- AQ context ** time_now -- current time ** tot_tas -- number of total transactions to add ** once_tas -- number of transactions to add in one invocation ** rate -- fill rate (msg/s) ** max_fill_aq -- maximum fill (per cent) of AQ ** defaultdomain -- domain to use if recipient doesn't have one ** ptas_added -- (pointer to) number of transactions added (out) ** locktype -- kind of locking ** ** Returns: ** usual sm_error code ** ** Called by: ** ** Last code review: ** Last code change: */ sm_ret_T qm_test_fill_aq(sm_log_ctx_P lctx, aq_ctx_P aq_ctx, time_T time_now, ulong tot_tas, uint32_t once_tas, uint max_fill_aq, uint rate, sm_str_P defaultdomain, uint32_t *ptas_added, thr_lock_T locktype) { sm_ret_T ret; int i, usage; aq_rcpt_P aq_rcpt; aq_ta_P aq_ta; static ulong added = 0; #ifdef MTA_NS_TIME static uint32_t once_tas_last = 0; static uint64_t t_first = 0; uint64_t t_now; #endif extern sm_cstr_P Tst_CDB_id; SM_IS_AQ(aq_ctx); aq_rcpt = NULL; aq_ta = NULL; if (ptas_added != NULL) *ptas_added = 0; if (added >= tot_tas) return SM_NOTDONE; #ifdef MTA_NS_TIME if (0 == t_first) { MTA_NS_TIME(t_first); once_tas_last = once_tas; } else if (rate > 0) { uint64_t d; MTA_NS_TIME(t_now); d = t_now - t_first; if (d > 0 && aq_usage(aq_ctx, AQ_USAGE_ALL) > 10) { uint rate_cur, ui; int perc; rate_cur = SEC2NSEC(added) / d; /*sm_io_fprintf(smioerr, "first=%lld, now=%lld, d=%lld, added=%ld, rate=%d, rate_cur=%d\n", t_first, t_now, d, added, rate, rate_cur); */ /* sm_io_fprintf(smioerr, ); */ QM_LEV_DPRINTFC(QDC_EDBR, 1, (QM_DEBFP, "sev=INFO, func=qm_test_fill_aq, now=%lld, d=%lld, added=%ld, rate_want=%d, rate_cur=%d, once_tas_last=%d, once_tas=%d\n", t_now / 1000, d / 1000, added, rate, rate_cur, once_tas_last, once_tas)); if (rate_cur > rate) { perc = (rate * 100) / rate_cur; if (once_tas_last > 0) { once_tas_last = (once_tas_last * perc) / 100; once_tas = once_tas_last; } else return sm_error_warn(SM_EM_Q_SCHED, 100 - perc); } else if (rate_cur < rate) { if (once_tas_last < once_tas) { if (0 == once_tas_last) once_tas_last = 1; perc = (rate * 100) / rate_cur; ui = (once_tas_last * perc) / 100; if (once_tas_last < ui) once_tas_last = ui; else if (rate > 110) ++once_tas_last; if (once_tas_last > once_tas) once_tas_last = once_tas; else once_tas = once_tas_last; } } } } #endif QM_LEV_DPRINTFC(QDC_EDBR, 3, (QM_DEBFP, "sev=INFO, func=qm_test_fill_aq, called=%ld, added=%lu, once=%u\n", time_now, added, once_tas)); for (i = 0; i < once_tas; i++) { usage = aq_usage(aq_ctx, AQ_USAGE_ALL); if (usage >= max_fill_aq) { sm_log_write(lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_INFO, 9, "sev=INFO, func=qm_test_fill_aq, aq_usage=%d, status=stop_adding_entries_from_defebd" , usage); if (ptas_added != NULL) *ptas_added = i; break; } ret = aq_ta_add_new(aq_ctx, &aq_ta, 0, 1, locktype); if (sm_is_err(ret)) { sm_log_write(lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_test_fill_aq, aq_ta_add_new=%m" , ret); goto error; } ret = aq_rcpt_add_new(aq_ctx, aq_ta, &aq_rcpt, 0 /*AQR_FL_IQDB*/, locktype); if (sm_is_err(ret)) { sm_log_write(lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_test_fill_aq, aq_rcpt_add_new=%m" , ret); goto error; } AQR_DA_INIT(aq_rcpt); aq_rcpt->aqr_entered = time_now; aq_ta->aqt_st_time = time_now; aq_ta->aqt_rcpts_inaq = 1; aq_ta->aqt_rcpts_tot = 1; aq_ta->aqt_msg_sz_b = 1; aq_ta->aqt_nxt_idx = 1; aq_ta->aqt_rcpts_left = 1; sm_snprintf(aq_ta->aqt_ss_ta_id, SMTP_STID_SIZE, SMTPS_STID_FORMAT, (id_count_T)added, 0); aq_ta->aqt_cdb_id = SM_CSTR_DUP(Tst_CDB_id); #define TST_MAIL "" aq_ta->aqt_mail->aqm_pa = sm_str_scpy0(NULL, TST_MAIL, sizeof(TST_MAIL) + 4); if (aq_ta->aqt_mail->aqm_pa == NULL) goto error; AQR_SS_INIT(aq_rcpt); AQR_DA_INIT(aq_rcpt); SESSTA_COPY(aq_rcpt->aqr_ss_ta_id, aq_ta->aqt_ss_ta_id); #define TST_RCPT "" aq_rcpt->aqr_pa = sm_str_scpy0(NULL, TST_RCPT, sizeof(TST_RCPT) + 4); if (aq_rcpt->aqr_pa == NULL) goto error; aq_rcpt->aqr_status = AQR_ST_NEW; aq_rcpt->aqr_addr_max = 1; aq_rcpt->aqr_addr_cur = 0; aq_rcpt->aqr_addrs = (aq_raddr_P) sm_malloc(sizeof(*(aq_rcpt->aqr_addrs))); if (aq_rcpt->aqr_addrs == NULL) goto error; aq_rcpt->aqr_addrs[0].aqra_ipv4 = htonl(INADDR_LOOPBACK); aq_rcpt->aqr_addrs[0].aqra_expt = time_now + 3600; aq_rcpt->aqr_addrs[0].aqra_pref = 0; aq_rcpt->aqr_entered = time_now; aq_rcpt->aqr_expire = time_now + 3600; aq_rcpt->aqr_flags = AQR_FL_RDY4DLVRY /*|AQR_FL_IQDB */; /* ignore result, a failure can be tolerated */ (void) aq_rcpt_set_domain(aq_rcpt, defaultdomain); /* add rcpt to todo queue */ ret = aq_rdq_add(aq_ctx, aq_rcpt, NULL, locktype); if (sm_is_err(ret)) { /* what to do on error?? stop for now.*/ QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_test_fill_aq, aq_rdq_add=%r\n", ret)); sm_log_write(lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_test_fill_aq, ss_ta=%s, rcpt_pa=%S, stat=%d, aq_rdq_add=%m" , aq_rcpt->aqr_ss_ta_id , aq_rcpt->aqr_pa , aq_rcpt->aqr_status, ret); goto error; } /* Reset pointers to avoid accidental free */ aq_ta = NULL; aq_rcpt = NULL; ++added; } if (ptas_added != NULL) *ptas_added = i; if (aq_rcpt != NULL) { (void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK); aq_rcpt = NULL; } if (aq_ta != NULL) { (void) aq_ta_rm(aq_ctx, aq_ta, true); aq_ta = NULL; /* not really necessary */ } QM_LEV_DPRINTFC(QDC_EDBR, 3, (QM_DEBFP, "sev=INFO, func=qm_test_fill_aq, added=%lu\n", added)); return 0; error: if (aq_rcpt != NULL) { (void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK); aq_rcpt = NULL; /* not really necessary */ } if (aq_ta != NULL) { (void) aq_ta_rm(aq_ctx, aq_ta, true); aq_ta = NULL; /* not really necessary */ } QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_test_fill_aq, ret=%r\n", ret)); return ret; }