/*
 * Copyright (c) 2003-2005 Sendmail, Inc. and its suppliers.
 *      All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: aqwait.c,v 1.30 2006/12/11 01:22:06 ca Exp $")
#include "sm/types.h"
#include "sm/assert.h"
#include "sm/magic.h"
#include "sm/actdb-int.h"
#include "sm/qmgr-int.h"
#include "sm/qmgrdbg.h"

/*
**  QM_RCPT_DA_EXPIRE -- calculate "expiration" time for a rcpt
**		that is scheduled for delivery
**
**	The result is startt plus the configured DA timeout
**	(q_cnf_tmo_da); if the rcpt references a transaction with a
**	known message size, one more second is added per ONEKB of
**	message size (assumed minimum transfer rate, see below).
**	Whenever a sum would exceed TIME_T_MAX the result is clamped
**	to TIME_T_MAX.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_rcpt -- AQ recipient
**		startt -- start time
**		pexpire -- (pointer to) expiration time (output)
**
**	Returns:
**		SM_SUCCESS
**
**	Side Effects: none
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
qm_rcpt_da_expire(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt, time_T startt, time_T *pexpire)
{
	time_T expire;
	time_T tmo;
	aq_ta_P aq_ta;

	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	SM_REQUIRE(pexpire != NULL);

	/*
	**  Check the range _before_ adding: overflow of a signed type
	**  is undefined behavior, hence a test like "sum < startt"
	**  after the addition is not reliable.
	**  A negative timeout is treated like an overflow (paranoia).
	*/

	tmo = qmgr_ctx->qmgr_cnf.q_cnf_tmo_da;
	if (tmo < 0 || startt > TIME_T_MAX - tmo)
		expire = TIME_T_MAX;
	else
		expire = startt + tmo;

	aq_ta = aq_rcpt->aqr_ss_ta;
	if (aq_ta != NULL && aq_ta->aqt_msg_sz_b > 0) {
		time_T xfer;

		/*
		**  CONF assumed minimum transfer rate: 1KB/s
		**  Use a configuration variable and maybe some
		**  data gathered via occ?
		*/

		xfer = (time_T) (aq_ta->aqt_msg_sz_b / ONEKB);
		if (xfer < 0 || expire > TIME_T_MAX - xfer)
			expire = TIME_T_MAX;
		else
			expire += xfer;
	}
	*pexpire = expire;
	return SM_SUCCESS;
}

/*
**  AQ_WAITQ_FIRST_TMO -- return expiration time of the entry at the
**	head of the wait queue
**
**	Parameters:
**		aq_ctx -- AQ context
**		which -- which waitq (not referenced by this function)
**		lockit -- lock/unlock aq_ctx around the access?
**
**	Returns:
**		aqr_expire of the first entry in the wait queue;
**		0 if the queue is empty or if locking/unlocking fails
**
**	Side Effects: none on error
**
**	Locking: locks aq_ctx if requested
**
**	Last code review: 2005-03-23 00:45:58
**	Last code change:
**
**  Note: alternatively the "first expiration time" could be maintained
**	whenever entries are added to or removed from waitq.
*/

time_T
aq_waitq_first_tmo(aq_ctx_P aq_ctx, uint which, bool lockit)
{
	int rc;
	time_T first_exp;
	aq_rcpt_P head;

	SM_IS_AQ(aq_ctx);
	if (lockit) {
		rc = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(rc);
		if (rc != 0)
			return 0;
	}

	/* defaults for an empty queue */
	head = NULL;
	first_exp = 0;
	if (!AQR_WAITQ_EMPTY(aq_ctx)) {
		head = AQR_WAITQ_FIRST(aq_ctx);
		SM_ASSERT(AQR_WAITQ_NEXT(head) != NULL);
		SM_IS_AQ_RCPT(head);
		first_exp = head->aqr_expire;
	}
	QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_first_tmo, empty=%d, exp=%7ld, arq_expire=%7ld\n", AQR_WAITQ_EMPTY(aq_ctx), (long) first_exp, (long) (head == NULL ? -1 : head->aqr_expire)));

	if (!lockit)
		return first_exp;

	rc = pthread_mutex_unlock(&aq_ctx->aq_mutex);
	SM_ASSERT(0 == rc);
	return (0 == rc) ? first_exp : 0;
}

/*
**  AQ_WAITQ_ADD -- add rcpt to wait queue
**
**	Sets aq_rcpt->aqr_expire and inserts the rcpt into the wait
**	queue such that the queue stays ordered by ascending aqr_expire
**	(entries with equal expiration time keep insertion order).
**
**	Parameters:
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		startt -- start time (if 0: use aqr_entered)
**		which -- which waitq (AQWQ_DA or AQWQ_AR)
**		lockit -- needs locking?
**
**	Returns:
**		>0: number of entries in wait queue
**			(only if compiled with AQ_STATS, otherwise
**			SM_SUCCESS)
**		<0: usual sm_error code; only (un)lock
**
**	Side Effects: none if locking fails; if unlocking fails
**		the rcpt has already been added.
**
**	Locking: locks aq_ctx if requested
**
**	Last code review: 2005-03-23 00:45:04
**	Last code change:
**
**  Note: waitq should be implemented as a tree for efficiency.
**	A linear list with a simple "insert" takes too long (O(n)).
**	However, it should be ok for the basic implementation to
**	check whether the algorithms that use this data structure
**	actually work.
*/

sm_ret_T
aq_waitq_add(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T startt, uint which, bool lockit)
{
	sm_ret_T ret;
	int r;
	aq_rcpt_P aq_rcpt_h;
	aq_rcpt_P aq_rcpt_succ;

	SM_IS_AQ(aq_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);

	if (lockit) {
		r = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	ret = SM_SUCCESS;
	if (0 == startt)
		startt = aq_rcpt->aqr_entered;

	/* should expire depend on entered or on "now"?? */
	if (AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF|AQR_FL_SCHEDF)) {
		/* no timeout is added in this case: expires at startt */
		aq_rcpt->aqr_expire = startt;
	}
	else if (AQWQ_DA == which) {
		(void) qm_rcpt_da_expire(aq_ctx->aq_qmgr_ctx, aq_rcpt, startt, &aq_rcpt->aqr_expire);
	}
	else if (AQWQ_AR == which) {
		aq_rcpt->aqr_expire = startt + aq_ctx->aq_qmgr_ctx->qmgr_cnf.q_cnf_tmo_ar;
	}
	else {
		/* bogus rcpt, doesn't wait for anything? */
		SM_ASSERT(0);
	}

	/* time values are cast to match the %ld conversions */
	QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_add, which=%u, rcpt=%p, aqr_flags=%x, ss_ta=%s, rcpt_idx=%d, exp=%7ld, startt=%7ld\n", which, (void *) aq_rcpt, aq_rcpt->aqr_flags, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx, (long) aq_rcpt->aqr_expire, (long) startt));

	SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which)));

	/*
	**  Simple sorted insert... not very efficient; ToDo: better
	**  algorithm.  Walk backwards from the tail over all entries
	**  that expire later than the new one and remember the last
	**  one seen (aq_rcpt_succ): that is the entry the new rcpt
	**  must precede.  Stopping at the first later entry found
	**  from the tail is not sufficient: entries further towards
	**  the head may expire later than the new rcpt too.
	*/

	aq_rcpt_succ = NULL;
	if (!AQR_WAITQ_EMPTY(aq_ctx)) {
		SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL);
		AQR_WAITQ_FOREACH_REVERSE(aq_ctx, aq_rcpt_h) {
			SM_IS_AQ_RCPT(aq_rcpt_h);
			if (aq_rcpt_h->aqr_expire <= aq_rcpt->aqr_expire)
				break;
			aq_rcpt_succ = aq_rcpt_h;
		}
	}
	if (aq_rcpt_succ != NULL)
		AQR_WAITQ_INSERT_BEFORE(aq_ctx, aq_rcpt_succ, aq_rcpt);
	else
		AQR_WAITQ_INSERT_TAIL(aq_ctx, aq_rcpt);
	SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL);
	SM_IS_AQ_RCPT(AQR_WAITQ_FIRST(aq_ctx));

	AQR_SET_FLAG(aq_rcpt, AQR_FL_WAITQ(which));
#if AQ_STATS
	ret = ++aq_ctx->aq_waitq_entries;
	if (aq_ctx->aq_waitq_entries > aq_ctx->aq_waitq_max)
		aq_ctx->aq_waitq_max = aq_ctx->aq_waitq_entries;
#endif

	/*
	**  Consistency checks: walk the queue in both directions and
	**  validate each entry.  The number of entries can only be
	**  compared against the counter if that is maintained, i.e.,
	**  if AQ_STATS is set.
	*/

	r = 0;
	AQR_WAITQ_FOREACH_REVERSE(aq_ctx, aq_rcpt_h) {
		SM_IS_AQ_RCPT(aq_rcpt_h);
		++r;
	}
#if AQ_STATS
	SM_ASSERT(r == aq_ctx->aq_waitq_entries);
#endif
	r = 0;
	AQR_WAITQ_FOREACH(aq_ctx, aq_rcpt_h) {
		SM_IS_AQ_RCPT(aq_rcpt_h);
		++r;
	}
#if AQ_STATS
	SM_ASSERT(r == aq_ctx->aq_waitq_entries);
#endif

	if (lockit) {
		r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
		SM_ASSERT(0 == r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	return ret;
}

/*
**  AQ_WAITQ_RM -- remove rcpt from wait queue
**	Note: this does not set a new timeout/aq sweep time.
**
**	The rcpt is only removed if its AQR_FL_WAITQ(which) flag is
**	set; otherwise the queue is left unchanged and SM_SUCCESS is
**	returned.
**
**	Parameters:
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		which -- which waitq; AQWQ_ANY: select AQWQ_DA or
**			AQWQ_AR based on the flags of the rcpt
**		lockit -- needs locking?
**
**	Returns:
**		usual sm_error code; only (un)lock
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks aq_ctx if requested
**
**	Last code review: 2005-03-18 19:02:43
**	Last code change:
*/

sm_ret_T
aq_waitq_rm(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, uint which, bool lockit)
{
	int r;
	aq_rcpt_P aq_rcpt_h;

	SM_IS_AQ(aq_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);

	if (lockit) {
		r = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	if (AQWQ_ANY == which) {
		/* derive the queue from the rcpt flags; may stay AQWQ_ANY */
		SM_ASSERT(!AQR_IS_FLAGS(aq_rcpt, AQR_FL_WAIT4UPD|AQR_FL_WAITQ_AR));
		if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAIT4UPD|AQR_FL_SCHEDF))
			which = AQWQ_DA;
		else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ_AR|AQR_FL_ARF))
			which = AQWQ_AR;
		QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_rm, which=any, new_which=%u, rcpt=%p, aqr_flags=%x, inwaitq=%d, ss_ta=%s, rcpt_idx=%d\n", which, (void *) aq_rcpt, aq_rcpt->aqr_flags, AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which)), aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx));
	}
	if (which != AQWQ_ANY && AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ(which))) {
		QM_LEV_DPRINTF(4, (QM_DEBFP, "sev=DBG, func=aq_waitq_rm, which=%u, rcpt=%p, aqr_flags=%x, ss_ta=%s, rcpt_idx=%d\n", which, (void *) aq_rcpt, aq_rcpt->aqr_flags, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx));
		AQR_WAITQ_REMOVE(aq_ctx, aq_rcpt);
		if (!AQR_WAITQ_EMPTY(aq_ctx)) {
			SM_IS_AQ_RCPT(AQR_WAITQ_FIRST(aq_ctx));
			SM_ASSERT(AQR_WAITQ_NEXT(AQR_WAITQ_FIRST(aq_ctx)) != NULL);
		}
		AQR_CLR_FLAG(aq_rcpt, AQR_FL_WAITQ(which));
#if AQ_STATS
		SM_ASSERT(aq_ctx->aq_waitq_entries > 0);
		--aq_ctx->aq_waitq_entries;
#endif
		/* change some other values? */
	}

	/*
	**  Consistency check: validate every entry left in the queue.
	**  Use a separate iterator instead of clobbering the aq_rcpt
	**  parameter.  The number of entries can only be compared
	**  against the counter if that is maintained, i.e., if
	**  AQ_STATS is set.
	*/

	r = 0;
	AQR_WAITQ_FOREACH(aq_ctx, aq_rcpt_h) {
		SM_IS_AQ_RCPT(aq_rcpt_h);
		++r;
	}
#if AQ_STATS
	SM_ASSERT(r == aq_ctx->aq_waitq_entries);
#endif

	if (lockit) {
		r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
		SM_ASSERT(0 == r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	return SM_SUCCESS;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */