/*
* Copyright (c) 2003-2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: aqwait.c,v 1.30 2006/12/11 01:22:06 ca Exp $")
#include "sm/types.h"
#include "sm/assert.h"
#include "sm/magic.h"
#include "sm/actdb-int.h"
#include "sm/qmgr-int.h"
#include "sm/qmgrdbg.h"
/*
**  QM_RCPT_DA_EXPIRE -- calculate "expiration" time for a rcpt
**	that is scheduled for delivery
**
**	The result is startt plus the configured DA timeout
**	(q_cnf_tmo_da). If the recipient has a transaction with a
**	positive message size, the result is extended by
**	aqt_msg_sz_b / ONEKB (assumed minimum transfer rate: 1KB/s).
**	Each addition saturates at TIME_T_MAX instead of overflowing.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_rcpt -- AQ recipient
**		startt -- start time
**		pexpire -- (pointer to) expiration time (output)
**
**	Returns:
**		SM_SUCCESS
**
**	Side Effects: none (besides writing *pexpire)
**
**	Last code review:
**	Last code change:
*/
sm_ret_T
qm_rcpt_da_expire(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt, time_T startt, time_T *pexpire)
{
	time_T expire;
	time_T tmo_da;
	aq_ta_P aq_ta;

	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	SM_REQUIRE(pexpire != NULL);

	/*
	**  Test for overflow BEFORE adding: overflow of a signed time_T
	**  is undefined, so a check made after the addition may be
	**  optimized away. A negative timeout is treated like an
	**  overflow (result: TIME_T_MAX), as before.
	*/
	tmo_da = qmgr_ctx->qmgr_cnf.q_cnf_tmo_da;
	if (tmo_da < 0 || startt > TIME_T_MAX - tmo_da)
		expire = TIME_T_MAX;	/* paranoia */
	else
		expire = startt + tmo_da;

	aq_ta = aq_rcpt->aqr_ss_ta;
	if (aq_ta != NULL && aq_ta->aqt_msg_sz_b > 0) {
		time_T xfer;

		/*
		**  CONF assumed minimum transfer rate: 1KB/s
		**  Use a configuration variable and maybe some
		**  data gathered via occ?
		*/
		xfer = (time_T) (aq_ta->aqt_msg_sz_b / ONEKB);
		if (xfer < 0 || expire > TIME_T_MAX - xfer)
			expire = TIME_T_MAX;
		else
			expire += xfer;
	}

	*pexpire = expire;
	return SM_SUCCESS;
}
/*
**  AQ_WAITQ_FIRST_TMO -- return expiration time of the head of the wait queue
**
**	Parameters:
**		aq_ctx -- AQ context
**		which -- which waitq (not referenced directly in this function)
**		lockit -- needs locking?
**
**	Returns:
**		aqr_expire of the first entry in the wait queue;
**		0 if the queue is empty or if (un)locking fails
**
**	Side Effects: none on error
**
**	Locking: locks aq_ctx if requested
**
**	Last code review: 2005-03-23 00:45:58
**	Last code change:
**
**	Note: alternatively the "first expiration time" could be maintained
**	whenever entries are added to or removed from waitq.
*/
time_T
aq_waitq_first_tmo(aq_ctx_P aq_ctx, uint which, bool lockit)
{
	int r;
	time_T first_exp;
	aq_rcpt_P aq_rcpt;

	SM_IS_AQ(aq_ctx);
	if (lockit) {
		r = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return 0;
	}

	/* defaults describe an empty queue */
	first_exp = 0;
	aq_rcpt = NULL;
	if (!AQR_WAITQ_EMPTY(aq_ctx)) {
		aq_rcpt = AQR_WAITQ_FIRST(aq_ctx);
		SM_ASSERT(AQR_WAITQ_NEXT(aq_rcpt) != NULL);
		SM_IS_AQ_RCPT(aq_rcpt);
		first_exp = aq_rcpt->aqr_expire;
	}

	QM_LEV_DPRINTF(4, (QM_DEBFP,
		"sev=DBG, func=aq_waitq_first_tmo, empty=%d, exp=%7ld, arq_expire=%7ld\n",
		AQR_WAITQ_EMPTY(aq_ctx), (long) first_exp,
		(long) (aq_rcpt == NULL ? -1 : aq_rcpt->aqr_expire)));

	if (lockit) {
		r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
		SM_ASSERT(0 == r);
		if (r != 0)
			first_exp = 0;	/* unlock error is reported as 0 */
	}
	return first_exp;
}
/*
**  AQ_WAITQ_ADD -- add rcpt to wait queue
**
**	The expiration time of the rcpt is computed from startt and the
**	rcpt is inserted such that the wait queue stays ordered by
**	ascending aqr_expire; entries with the same expiration time
**	keep their insertion order.
**
**	Parameters:
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		startt -- start time (if 0: use aqr_entered)
**		which -- which waitq
**		lockit -- needs locking?
**
**	Returns:
**		if compiled with AQ_STATS:
**			>0: number of entries in wait queue
**		otherwise: SM_SUCCESS
**		<0: usual sm_error code; only (un)lock
**
**	Side Effects: none on error
**
**	Locking: locks aq_ctx if requested
**
**	Last code review: 2005-03-23 00:45:04
**	Last code change:
**
**	Note: waitq should be implemented as a tree for efficiency.
**	A linear list with a simple "insert" takes too long (O(n)).
**	However, it should be ok for the basic implementation to
**	check whether the algorithms that use this data structure
**	actually work.
*/
sm_ret_T
aq_waitq_add(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T startt, uint which, bool lockit)
{
	sm_ret_T ret;
	int r;
	aq_rcpt_P aq_rcpt_h;
	aq_rcpt_P aq_rcpt_succ;

	SM_IS_AQ(aq_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	if (lockit) {
		r = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	ret = SM_SUCCESS;
	if (0 == startt)
		startt = aq_rcpt->aqr_entered;

	/* should expire depend on entered or on "now"?? */
	if (AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF|AQR_FL_SCHEDF))
		aq_rcpt->aqr_expire = startt;
	else if (AQWQ_DA == which)
		(void) qm_rcpt_da_expire(aq_ctx->aq_qmgr_ctx, aq_rcpt, startt, &aq_rcpt->aqr_expire);
	else if (AQWQ_AR == which)
		aq_rcpt->aqr_expire = startt + aq_ctx->aq_qmgr_ctx->qmgr_cnf.q_cnf_tmo_ar;
	else
		SM_ASSERT(0);	/* bogus rcpt, doesn't wait for anything? */

	/* time values are cast to long to match %ld; pointer to void * for %p */
	QM_LEV_DPRINTF(4, (QM_DEBFP,
		"sev=DBG, func=aq_waitq_add, which=%u, rcpt=%p, aqr_flags=%x, ss_ta=%s, rcpt_idx=%d, exp=%7ld, startt=%7ld\n",
		which, (void *) aq_rcpt, aq_rcpt->aqr_flags,
		aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx,
		(long) aq_rcpt->aqr_expire, (long) startt));
	SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which)));

	/*
	**  Simple insert... not very efficient; ToDo: better algorithm.
	**  Walk from the tail towards the head, skipping every entry that
	**  expires later than the new one; aq_rcpt_succ is the last entry
	**  skipped, i.e., the one that must directly follow the new rcpt.
	**  Stopping at the first later entry seen from the tail (instead
	**  of the last one) would break the ordering of the queue.
	*/
	aq_rcpt_succ = NULL;
	if (!AQR_WAITQ_EMPTY(aq_ctx)) {
		SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL);
		AQR_WAITQ_FOREACH_REVERSE(aq_ctx, aq_rcpt_h) {
			SM_IS_AQ_RCPT(aq_rcpt_h);
			if (aq_rcpt->aqr_expire >= aq_rcpt_h->aqr_expire)
				break;
			aq_rcpt_succ = aq_rcpt_h;
		}
	}
	if (aq_rcpt_succ != NULL)
		AQR_WAITQ_INSERT_BEFORE(aq_ctx, aq_rcpt_succ, aq_rcpt);
	else
		AQR_WAITQ_INSERT_TAIL(aq_ctx, aq_rcpt);
	SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL);
	SM_IS_AQ_RCPT(AQR_WAITQ_FIRST(aq_ctx));
	AQR_SET_FLAG(aq_rcpt, AQR_FL_WAITQ(which));

#if AQ_STATS
	ret = ++aq_ctx->aq_waitq_entries;
	if (aq_ctx->aq_waitq_entries > aq_ctx->aq_waitq_max)
		aq_ctx->aq_waitq_max = aq_ctx->aq_waitq_entries;
#endif

	/*
	**  Consistency check: validate every entry in both directions.
	**  The entry counter is only maintained if AQ_STATS is set,
	**  hence the count can only be compared in that case.
	*/
	r = 0;
	AQR_WAITQ_FOREACH_REVERSE(aq_ctx, aq_rcpt_h) {
		SM_IS_AQ_RCPT(aq_rcpt_h);
		++r;
	}
#if AQ_STATS
	SM_ASSERT(r == aq_ctx->aq_waitq_entries);
#endif
	r = 0;
	AQR_WAITQ_FOREACH(aq_ctx, aq_rcpt_h) {
		SM_IS_AQ_RCPT(aq_rcpt_h);
		++r;
	}
#if AQ_STATS
	SM_ASSERT(r == aq_ctx->aq_waitq_entries);
#endif

	if (lockit) {
		r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
		SM_ASSERT(0 == r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	return ret;
}
/*
**  AQ_WAITQ_RM -- remove rcpt from wait queue
**	Note: this does not set a new timeout/aq sweep time.
**
**	If which is AQWQ_ANY, the wait queue is selected based on the
**	flags of the rcpt. The rcpt is only removed if the flag for the
**	selected wait queue is set in the rcpt; otherwise the queue is
**	left unchanged and SM_SUCCESS is returned.
**
**	Parameters:
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		which -- which waitq
**		lockit -- needs locking?
**
**	Returns:
**		usual sm_error code; only (un)lock
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks aq_ctx if requested
**
**	Last code review: 2005-03-18 19:02:43
**	Last code change:
*/
sm_ret_T
aq_waitq_rm(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, uint which, bool lockit)
{
	int r;
	aq_rcpt_P aq_rcpt_h;

	SM_IS_AQ(aq_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	if (lockit) {
		r = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}

	if (AQWQ_ANY == which) {
		/* determine the wait queue from the rcpt flags */
		SM_ASSERT(!AQR_IS_FLAGS(aq_rcpt, AQR_FL_WAIT4UPD|AQR_FL_WAITQ_AR));
		if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAIT4UPD|AQR_FL_SCHEDF))
			which = AQWQ_DA;
		else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ_AR|AQR_FL_ARF))
			which = AQWQ_AR;

		/*
		**  alternative (disabled):
		if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which)))
			which = (AQWQ_DA == which) ? AQWQ_AR : AQWQ_DA;
		*/

		QM_LEV_DPRINTF(4, (QM_DEBFP,
			"sev=DBG, func=aq_waitq_rm, which=any, new_which=%u, rcpt=%p, aqr_flags=%x, inwaitq=%d, ss_ta=%s, rcpt_idx=%d\n",
			which, (void *) aq_rcpt, aq_rcpt->aqr_flags,
			AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which)),
			aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx));
	}

	/* which is still AQWQ_ANY if no matching flag was found above */
	if (which != AQWQ_ANY && AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which))) {
		QM_LEV_DPRINTF(4, (QM_DEBFP,
			"sev=DBG, func=aq_waitq_rm, which=%u, rcpt=%p, aqr_flags=%x, ss_ta=%s, rcpt_idx=%d\n",
			which, (void *) aq_rcpt, aq_rcpt->aqr_flags,
			aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx));
		AQR_WAITQ_REMOVE(aq_ctx, aq_rcpt);
		if (!AQR_WAITQ_EMPTY(aq_ctx)) {
			SM_IS_AQ_RCPT(AQR_WAITQ_FIRST(aq_ctx));
			SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL);
		}
		AQR_CLR_FLAG(aq_rcpt, AQR_FL_WAITQ(which));
#if AQ_STATS
		SM_ASSERT(aq_ctx->aq_waitq_entries > 0);
		--aq_ctx->aq_waitq_entries;
#endif
		/* change some other values? */
	}

	/*
	**  Consistency check: validate every remaining entry. A separate
	**  cursor is used so the aq_rcpt parameter is not overwritten.
	**  The entry counter is only maintained if AQ_STATS is set,
	**  hence the count can only be compared in that case.
	*/
	r = 0;
	AQR_WAITQ_FOREACH(aq_ctx, aq_rcpt_h) {
		SM_IS_AQ_RCPT(aq_rcpt_h);
		++r;
	}
#if AQ_STATS
	SM_ASSERT(r == aq_ctx->aq_waitq_entries);
#endif

	if (lockit) {
		r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
		SM_ASSERT(0 == r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	return SM_SUCCESS;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */