/* * Copyright (c) 2003-2005 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: aqwait.c,v 1.30 2006/12/11 01:22:06 ca Exp $") #include "sm/types.h" #include "sm/assert.h" #include "sm/magic.h" #include "sm/actdb-int.h" #include "sm/qmgr-int.h" #include "sm/qmgrdbg.h" /* ** QM_RCPT_DA_EXPIRE -- calculate "expiration" time for a rcpt ** that is scheduled for delivery ** ** Parameters: ** qmgr_ctx -- QMGR context ** aq_rcpt -- AQ recipient ** startt -- start time ** pexpire -- (pointer to) expiration time (output) ** ** Returns: ** SM_SUCCESS ** ** Side Effects: none ** ** Last code review: ** Last code change: */ sm_ret_T qm_rcpt_da_expire(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt, time_T startt, time_T *pexpire) { time_T expire; aq_ta_P aq_ta; SM_IS_QMGR_CTX(qmgr_ctx); SM_IS_AQ_RCPT(aq_rcpt); SM_REQUIRE(pexpire != NULL); expire = startt + qmgr_ctx->qmgr_cnf.q_cnf_tmo_da; if (expire < startt) expire = TIME_T_MAX; /* paranoia */ aq_ta = aq_rcpt->aqr_ss_ta; if (aq_ta != NULL && aq_ta->aqt_msg_sz_b > 0) { time_T expire2; /* ** CONF assumed minimum transfer rate: 1KB/s ** Use a configuration variable and maybe some ** data gathered via occ? */ expire2 = expire + aq_ta->aqt_msg_sz_b / ONEKB; if (expire2 > expire) expire = expire2; else if (expire2 < expire) expire = TIME_T_MAX; } *pexpire = expire; return SM_SUCCESS; } /* ** AQ_WAITQ_FIRST_TMO -- get expiration time of first entry in wait queue ** ** Parameters: ** aq_ctx -- AQ context ** which -- which waitq ** lockit -- needs locking? ** ** Returns: ** expiration time of first entry in wait queue ** (0 if there is none or on (un)lock error) ** ** Side Effects: none on error ** ** Locking: locks aq_ctx if requested ** ** Last code review: 2005-03-23 00:45:58 ** Last code change: ** ** Note: alternatively the "first expiration time" could be maintained ** whenever entries are added to or removed from waitq. */ time_T aq_waitq_first_tmo(aq_ctx_P aq_ctx, uint which, bool lockit) { time_T exp; int r; aq_rcpt_P aq_rcpt; SM_IS_AQ(aq_ctx); if (lockit) { r = pthread_mutex_lock(&aq_ctx->aq_mutex); SM_LOCK_OK(r); if (r != 0) return 0; } aq_rcpt = NULL; if (AQR_WAITQ_EMPTY(aq_ctx)) exp = 0; else { aq_rcpt = AQR_WAITQ_FIRST(aq_ctx); SM_ASSERT(AQR_WAITQ_NEXT(aq_rcpt) != NULL); SM_IS_AQ_RCPT(aq_rcpt); exp = aq_rcpt->aqr_expire; } QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_first_tmo, empty=%d, exp=%7ld, arq_expire=%7ld\n", AQR_WAITQ_EMPTY(aq_ctx), (long) exp, (long) (aq_rcpt == NULL ? -1 : aq_rcpt->aqr_expire))); if (lockit) { r = pthread_mutex_unlock(&aq_ctx->aq_mutex); SM_ASSERT(0 == r); if (r != 0) return 0; } return exp; } /* ** AQ_WAITQ_ADD -- add rcpt to wait queue ** ** Parameters: ** aq_ctx -- AQ context ** aq_rcpt -- AQ recipient ** startt -- start time (if 0: use aqr_entered) ** which -- which waitq ** lockit -- needs locking? ** ** Returns: ** >0: number of entries in wait queue ** <0: usual sm_error code; only (un)lock ** ** Side Effects: none on error ** ** Locking: locks aq_ctx if requested ** ** Last code review: 2005-03-23 00:45:04 ** Last code change: * * Note: waitq should be implemented as a tree for efficiency. * A linear list with a simple "insert" takes too long (O(n)). * However, it should be ok for the basic implementation to * check whether the algorithms that use this data structure * actually work. * */ sm_ret_T aq_waitq_add(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T startt, uint which, bool lockit) { sm_ret_T ret; int r; bool inserted; aq_rcpt_P aq_rcpt_h; SM_IS_AQ(aq_ctx); SM_IS_AQ_RCPT(aq_rcpt); if (lockit) { r = pthread_mutex_lock(&aq_ctx->aq_mutex); SM_LOCK_OK(r); if (r != 0) return sm_error_perm(SM_EM_AQ, r); } ret = SM_SUCCESS; if (0 == startt) startt = aq_rcpt->aqr_entered; /* should expire depend on entered or on "now"?? */ if (AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF|AQR_FL_SCHEDF)) aq_rcpt->aqr_expire = startt; else if (AQWQ_DA == which) (void) qm_rcpt_da_expire(aq_ctx->aq_qmgr_ctx, aq_rcpt, startt, &aq_rcpt->aqr_expire); else if (AQWQ_AR == which) aq_rcpt->aqr_expire = startt + aq_ctx->aq_qmgr_ctx->qmgr_cnf.q_cnf_tmo_ar; else SM_ASSERT(0); /* bogus rcpt, doesn't wait for anything? */ QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_add, which=%u, rcpt=%p, aqr_flags=%x, ss_ta=%s, rcpt_idx=%d, exp=%7ld, startt=%7ld\n", which, aq_rcpt, aq_rcpt->aqr_flags, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx, aq_rcpt->aqr_expire, startt)); SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which))); /* simple insert... not very efficient; ToDo: better algorithm */ inserted = false; if (!AQR_WAITQ_EMPTY(aq_ctx)) { SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL); AQR_WAITQ_FOREACH_REVERSE(aq_ctx, aq_rcpt_h) { SM_IS_AQ_RCPT(aq_rcpt_h); if (aq_rcpt->aqr_expire < aq_rcpt_h->aqr_expire) { AQR_WAITQ_INSERT_BEFORE(aq_ctx, aq_rcpt_h, aq_rcpt); inserted = true; break; } } } if (!inserted) AQR_WAITQ_INSERT_TAIL(aq_ctx, aq_rcpt); SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL); SM_IS_AQ_RCPT(AQR_WAITQ_FIRST(aq_ctx)); AQR_SET_FLAG(aq_rcpt, AQR_FL_WAITQ(which)); #if AQ_STATS ret = ++aq_ctx->aq_waitq_entries; if (aq_ctx->aq_waitq_entries > aq_ctx->aq_waitq_max) aq_ctx->aq_waitq_max = aq_ctx->aq_waitq_entries; #endif r=0; AQR_WAITQ_FOREACH_REVERSE(aq_ctx, aq_rcpt_h) { SM_IS_AQ_RCPT(aq_rcpt_h); ++r; } SM_ASSERT(r == aq_ctx->aq_waitq_entries); r=0; AQR_WAITQ_FOREACH(aq_ctx, aq_rcpt_h) { SM_IS_AQ_RCPT(aq_rcpt_h); ++r; } SM_ASSERT(r == aq_ctx->aq_waitq_entries); if (lockit) { r = pthread_mutex_unlock(&aq_ctx->aq_mutex); SM_ASSERT(0 == r); if (r != 0) return sm_error_perm(SM_EM_AQ, r); } return ret; #if 0 // errunlock: // if (lockit) // { // r = pthread_mutex_unlock(&aq_ctx->aq_mutex); // SM_ASSERT(0 == r); // if (r != 0) // return sm_error_perm(SM_EM_AQ, r); // } // return ret; #endif /* 0 */ } /* ** AQ_WAITQ_RM -- remove rcpt from wait queue ** Note: this does not set a new timeout/aq sweep time. ** ** Parameters: ** aq_ctx -- AQ context ** aq_rcpt -- AQ recipient ** which -- which waitq ** lockit -- needs locking? ** ** Returns: ** usual sm_error code; only (un)lock ** ** Side Effects: none on error (except if unlock fails) ** ** Locking: locks aq_ctx if requested ** ** Last code review: 2005-03-18 19:02:43 ** Last code change: */ sm_ret_T aq_waitq_rm(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, uint which, bool lockit) { int r; SM_IS_AQ(aq_ctx); SM_IS_AQ_RCPT(aq_rcpt); if (lockit) { r = pthread_mutex_lock(&aq_ctx->aq_mutex); SM_LOCK_OK(r); if (r != 0) return sm_error_perm(SM_EM_AQ, r); } if (AQWQ_ANY == which) { SM_ASSERT(!AQR_IS_FLAGS(aq_rcpt, AQR_FL_WAIT4UPD|AQR_FL_WAITQ_AR)); if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAIT4UPD|AQR_FL_SCHEDF)) which = AQWQ_DA; else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ_AR|AQR_FL_ARF)) which = AQWQ_AR; /* if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which))) which = (AQWQ_DA == which) ? AQWQ_AR : AQWQ_DA; */ QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_rm, which=any, new_which=%u, rcpt=%p, aqr_flags=%x, inwaitq=%d, ss_ta=%s, rcpt_idx=%d\n", which, aq_rcpt, aq_rcpt->aqr_flags, AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which)), aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx)); } if (which != AQWQ_ANY && AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which))) { QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_rm, which=%u, rcpt=%p, aqr_flags=%x, ss_ta=%s, rcpt_idx=%d\n", which, aq_rcpt, aq_rcpt->aqr_flags, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx)); AQR_WAITQ_REMOVE(aq_ctx, aq_rcpt); if (!AQR_WAITQ_EMPTY(aq_ctx)) { SM_IS_AQ_RCPT(AQR_WAITQ_FIRST(aq_ctx)); SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL); } AQR_CLR_FLAG(aq_rcpt, AQR_FL_WAITQ(which)); #if AQ_STATS SM_ASSERT(aq_ctx->aq_waitq_entries > 0); --aq_ctx->aq_waitq_entries; #endif /* change some other values? */ } r=0; AQR_WAITQ_FOREACH(aq_ctx, aq_rcpt) { SM_IS_AQ_RCPT(aq_rcpt); ++r; } SM_ASSERT(r == aq_ctx->aq_waitq_entries); if (lockit) { r = pthread_mutex_unlock(&aq_ctx->aq_mutex); SM_ASSERT(0 == r); if (r != 0) return sm_error_perm(SM_EM_AQ, r); } return SM_SUCCESS; }