/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: sched.c,v 1.263 2007/06/24 02:52:57 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/reccom.h"
#include "sm/da.h"
#include "sm/das.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/misc.h"
#include "qmgr.h"
#include "sm/aqrdq.h"
#include "log.h"

/*
**  Which access methods to recipients does the scheduler need?
**  DA/Host: for session reuse.
**  SMTPS TA-Id: to find recipients for the same TA (that could
**	be delivered in one TA if the DA/Host is the same).
**
**  Which other (recipient) data does the scheduler need?
**  Number of recipients/TAs destined for the same DA/Host.
*/

/*
**  SAME_TRANSACTION -- can both recipients be delivered in the same
**	transaction?
**
**  The recipients are from the same incoming transaction; this macro
**  checks whether they also share the same delivery information
**  (MX piggybacking): the current IPv4 address, the DA index, the port,
**  and the owner index must be identical.
**  Bounces can't be piggybacked: they have content in the rcpt itself.
**  Recipients flagged AQR_FL_HAS_VERP are rejected as well.
**
**  Note: both arguments are expanded several times; do not pass
**  expressions with side effects.
*/

#define SAME_TRANSACTION(rcpt_a, rcpt_b)				\
	(								\
	   (rcpt_a)->aqr_addrs[(rcpt_a)->aqr_addr_cur].aqra_ipv4 ==	\
	   (rcpt_b)->aqr_addrs[(rcpt_b)->aqr_addr_cur].aqra_ipv4	\
	&& (rcpt_a)->aqr_da_idx == (rcpt_b)->aqr_da_idx			\
	&& (rcpt_a)->aqr_port == (rcpt_b)->aqr_port			\
	&& (rcpt_a)->aqr_owner_idx == (rcpt_b)->aqr_owner_idx		\
	&& !AQR_IS_FLAG(rcpt_a, AQR_FL_IS_DSN)				\
	&& !AQR_IS_FLAG(rcpt_b, AQR_FL_IS_DSN)				\
	&& !AQR_IS_FLAG(rcpt_a, AQR_FL_HAS_VERP)			\
	&& !AQR_IS_FLAG(rcpt_b, AQR_FL_HAS_VERP)			\
	)

/*
**  SM_SET_RET -- primitive version of setting an error value if it
**	isn't set already.
**
**	Stores ret in res only if ret is an error and res does not
**	already hold an error, i.e., the first error is kept.
**
**	Note: both arguments are expanded more than once; do not pass
**	expressions with side effects.
*/
#define SM_SET_RET(res, ret) do {				\
		if (sm_is_err(ret) && !sm_is_err(res))	\
			(res) = (ret);			\
	} while (0)

/*
**  QM_AR_ACTSCHED -- Decide whether to activate scheduler after getting
**	result from AR.
**
**	The scheduler is activated by default.  It is not activated if
**	the recipient is not flagged AQR_FL_ARF, has addresses, an OCC
**	entry exists for its current IPv4 address, and for that entry
**	occe_open_se >= occe_cur_conc and occe_open_se == occe_open_ta.
**	In that case the entry is flagged OCCE_FL_SE_WAIT.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_rcpt -- AQ recipient
**		pnotify_sched -- (pointer to) notify scheduler? (output)
**
**	Returns:
**		usual sm_error code; only (un)lock
**		(a failed OCC lookup is not reported as an error)
**
**	Last code review: 2005-04-04 20:39:02
**	Last code change: 2005-04-04 20:37:48
*/

sm_ret_T
qm_ar_actsched(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt, bool *pnotify_sched)
{
#undef SMFCT
#define SMFCT   "qm_ar_actsched"
	int r;
	sm_ret_T ret;
	occ_entry_P occ_entry;

	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	SM_REQUIRE(pnotify_sched != NULL);

	/* unless decided otherwise below: activate scheduler */
	*pnotify_sched = true;

	/* AQR_FL_ARF set or no addresses: nothing to check */
	if (AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF) || NULL == aq_rcpt->aqr_addrs)
		return SM_SUCCESS;

	SM_ASSERT(aq_rcpt->aqr_addr_cur < aq_rcpt->aqr_addr_max);
	occ_entry = NULL;
	ret = occ_entry_find(qmgr_ctx->qmgr_occ_ctx->occx_ht,
		aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
		&occ_entry, &qmgr_ctx->qmgr_occ_ctx->occx_mutex,
		THR_LOCK_UNLERR);

	/*
	**  No entry: keep the default; the lookup result is not passed on.
	**  NOTE(review): occx_mutex is only unlocked below, i.e., if an
	**  entry was found.  This presumably relies on occ_entry_find()
	**  never returning SM_SUCCESS with a NULL entry (and on
	**  THR_LOCK_UNLERR releasing the mutex on error) -- TODO confirm.
	*/

	if (ret != SM_SUCCESS || NULL == occ_entry)
		return SM_SUCCESS;

	if (occ_entry->occe_open_se >= occ_entry->occe_cur_conc
	    && occ_entry->occe_open_se == occ_entry->occe_open_ta)
	{
		*pnotify_sched = false;
		OCCE_SET_FLAG(occ_entry, OCCE_FL_SE_WAIT);
	}

	r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
	SM_ASSERT(r == 0);
	if (r != 0)
		return sm_error_perm(SM_EM_Q_SCHED, r);

	/*
	**  A disabled ("#if 0") and incomplete check used to follow here:
	**  the scheduler might be waiting for a result, depending on
	**  aqt_rcpts_ar, aqt_rcpts_inaq, aq_entries, and aq_t_da.
	*/

	return SM_SUCCESS;
}

/*
**  QM_SC_CONF -- find smtpc configuration data for server
**
**	Only does something if compiled with MTA_USE_TLS, a
**	configuration map exists, and QCNF_FL_SC_LKP_SE_CONF is set:
**	looks up the IPv4 address (as string) with tag "smtpc_sess_conf:"
**	in qmgr_conf_map (flags SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG).
**	The string buffers in dadb_entry are created on first use and
**	reused afterwards.
**
**	Results are stored in dadb_entry:
**		dadbe_se_conf -- RHS of the map entry
**		dadbe_se_maprescnf -- lookup result if a non-empty RHS
**			was found, otherwise sm_err_perm(SM_E_NOTFOUND)
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		ipv4 -- IPv4 address of server
**		dadb_entry -- DA DB entry
**
**	Returns:
**		usual sm_error code; ENOMEM, SM_E_OVFLW_NS,
**		see sm_map_lookup_ip()
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
qm_sc_conf(qmgr_ctx_P qmgr_ctx, ipv4_T ipv4, dadb_entry_P dadb_entry)
{
#if MTA_USE_TLS
#undef SMFCT
#define SMFCT   "qm_sc_conf"
#define SC_SE_TAG "smtpc_sess_conf:"
	sm_ret_T ret;

	/* until a lookup succeeds: nothing found */
	dadb_entry->dadbe_se_maprescnf = sm_err_perm(SM_E_NOTFOUND);

	/* no map or lookups not requested: done */
	if (NULL == qmgr_ctx->qmgr_conf_map ||
	    !QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SC_LKP_SE_CONF))
	{
		return SM_SUCCESS;
	}

	/* create missing buffers; all of them are tried before checking */
	if (NULL == dadb_entry->dadbe_se_conf)
		dadb_entry->dadbe_se_conf = sm_str_new(NULL, 256, 1024); /*check size*/
	if (NULL == dadb_entry->dadbe_lhs)
		dadb_entry->dadbe_lhs = sm_str_new(NULL, 10, 16); /* check size */
	if (NULL == dadb_entry->dadbe_tag)
		dadb_entry->dadbe_tag = sm_str_new(NULL, 16, 32); /* check size */
	if (NULL == dadb_entry->dadbe_se_conf ||
	    NULL == dadb_entry->dadbe_lhs ||
	    NULL == dadb_entry->dadbe_tag)
	{
		return sm_err_temp(ENOMEM);
	}

	/* buffers may contain data from a previous lookup */
	sm_str_clr(dadb_entry->dadbe_se_conf);
	sm_str_clr(dadb_entry->dadbe_lhs);
	sm_str_clr(dadb_entry->dadbe_tag);

	/* build tag and key; failure to do so is reported as overflow */
	if (sm_str_scat(dadb_entry->dadbe_tag, SC_SE_TAG) != SM_SUCCESS)
		return sm_err_temp(SM_E_OVFLW_NS);
	if (sm_inet_ipv4str(ipv4, dadb_entry->dadbe_lhs) != SM_SUCCESS)
		return sm_err_temp(SM_E_OVFLW_NS);

	ret = sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
			dadb_entry->dadbe_lhs, dadb_entry->dadbe_tag,
			SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
			dadb_entry->dadbe_se_conf);
	if (ret != SM_SUCCESS)
		return ret;

	/* an empty RHS is treated like "not found" (but returns success) */
	if (sm_str_getlen(dadb_entry->dadbe_se_conf) > 0) {
		QM_LEV_DPRINTFC(QDC_S2Q, 4, (QM_DEBFP, "func=%s, ipv4=%A, %slookup=%r, rhs=%S\n", SMFCT, ipv4, SC_SE_TAG, ret, dadb_entry->dadbe_se_conf));
		dadb_entry->dadbe_se_maprescnf = ret;
	}
	return ret;
#else /* MTA_USE_TLS */
	return SM_SUCCESS;
#endif /* MTA_USE_TLS */
}

/*
**  QM_TO_SC_TASK -- create one session with one task for SMTPC
**	XXX HACK just one session with one transaction with one (or more)
**		recipients (see qm_to_sc_add_rcpt())
**
**	Writes the session/transaction records and the first recipient
**	into the RCB of rcbe.  The envelope sender is
**	- the null sender for a DSN,
**	- a VERP address based on the owner address if the recipient
**	  is flagged AQR_FL_HAS_VERP and has an owner,
**	- a VERP address based on the mail address if the transaction
**	  is flagged AQ_TA_FL_VERP,
**	- the owner address if the recipient has an owner,
**	- the mail address of the transaction otherwise.
**	For a DSN the DSN message is added too and the transaction is
**	flagged AQ_TA_FL_NO_BODY.
**
**	Parameters:
**		qsc_ctx -- QMGR SMTPC context
**		se_reuse -- reuse an open session?
**		one_ta -- only one transaction? (close session after this TA?)
**		aq_ta -- AQ transaction
**		aq_rcpt -- AQ recipient
**		rcbe -- RCB entry
**		dadb_entry -- DA DB entry
**
**	Returns:
**		usual sm_error code; see sm_rcb_putv()
**
**	Side Effects: does not clean up rcb in case of an error.
**
**	Last code review: 2005-04-04 21:03:45
**	Last code change: 2006-02-21 05:55:16
*/

static sm_ret_T
qm_to_sc_task(qsc_ctx_P qsc_ctx, bool se_reuse, bool one_ta, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe, dadb_entry_P dadb_entry)
{
	extern sm_str_P NullSender;
	sm_rcb_P rcb;
	sm_ret_T ret;
	sm_str_P sender, verp;
	uint32_t cdb_rt, ta_flags, se_flags;
	uint32_t se_id_rt, ta_id_rt, port_rt, da_idx_rt;
	bool dsn, need_verp;

	SM_ASSERT(!aq_rcpt_has_owner(aq_rcpt) ||
		(aq_rcpt->aqr_owner_idx > 0 &&
		 aq_rcpt->aqr_owner_idx <= aq_ta->aqt_owners_n));

	rcb = &rcbe->rcbe_rcb;
	verp = NULL;

	/* treat double bounce differently?? */
	dsn = AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN);

	/* session flags */
	se_flags = 0;
	if (!one_ta)
		se_flags |= DA_FL_SE_KEEP;
	if (se_reuse)
		se_flags |= DA_FL_SE_REUSE_EX;

	/* transaction flags and record type for the CDB id */
	ta_flags = 0;
	cdb_rt = RT_Q2C_CDBID;
	if (dsn) {
		if (QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_HDR) ||
		    (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_NO_BODY) &&
		     QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_BODY_HDR)))
		{
			cdb_rt = RT_Q2C_CDBID_HDR;
			ta_flags |= DA_FL_HDR_ONLY;
		}
		if (QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_DSN_MIME))
			ta_flags |= DA_FL_DSN_MIME;
	}

	/*
	**  Select the envelope sender; if VERP is requested then
	**  "sender" is only the base address which is rewritten below.
	*/

	need_verp = false;
	if (dsn)
		sender = NullSender;
	else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP) && aq_rcpt_has_owner(aq_rcpt))
	{
		sender = aq_ta->aqt_owners_pa[aq_rcpt->aqr_owner_idx - 1];
		need_verp = true;
	}
	else if (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)) {
		sender = aq_ta->aqt_mail->aqm_pa;
		need_verp = true;
	}
	else if (aq_rcpt_has_owner(aq_rcpt))
		sender = aq_ta->aqt_owners_pa[aq_rcpt->aqr_owner_idx - 1];
	else
		sender = aq_ta->aqt_mail->aqm_pa;

	if (need_verp) {
		size_t len;

		len = sm_str_getlen(sender) + sm_str_getlen(aq_rcpt->aqr_pa);
		verp = sm_str_new(NULL, len, len + 4);
		if (NULL == verp) {
			ret = sm_err_temp(ENOMEM);
			goto error;
		}
		ret = sm_verpify(sender, aq_rcpt->aqr_pa, '\0', '\0', verp);
		if (sm_is_err(ret))
			goto error;
		sender = verp;
	}

	/* record types that depend on session/recipient properties */
	if (se_reuse)
		se_id_rt = one_ta ? RT_Q2C_SEID_1TA : RT_Q2C_SEID;
	else
		se_id_rt = one_ta ? RT_Q2C_NSEID_1TA : RT_Q2C_NSEID;

	/* port 0 and the ESMTP DA index are not sent */
	port_rt = (0 == aq_rcpt->aqr_port) ? RT_NOSEND : RT_Q2C_SRVPORT;
	da_idx_rt = (DA_IDX_ESMTP == aq_rcpt->aqr_da_idx)
			? RT_NOSEND : RT_Q2C_DA_IDX;

	if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DLY))
		ta_id_rt = RT_Q2C_NTAIDD;
	else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_BNC))
		ta_id_rt = RT_Q2C_NTAIDB;
	else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DBNC))
		ta_id_rt = RT_Q2C_NTAIDDB;
	else
		ta_id_rt = RT_Q2C_NTAID;

	ret = sm_rcb_putv(rcb, RCB_PUTV_FIRST,
		SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
		SM_RCBV_INT, RT_Q2C_ID, qsc_ctx->qsc_id,
#ifdef RT_Q2C_DCID
		SM_RCBV_INT, RT_Q2C_DCID, 0, /* delivery class, cur. unused */
#endif
		SM_RCBV_INT, RT_Q2C_SE_FLAGS, se_flags,
		SM_RCBV_BUF, se_id_rt,
			dadb_entry->dadbe_da_se_id, SMTP_STID_SIZE,
		SM_RCBV_INT, port_rt, (uint32_t) aq_rcpt->aqr_port,
		SM_RCBV_INT, da_idx_rt, aq_rcpt->aqr_da_idx,

		/* XXX HACK XXX */
		SM_RCBV_INT, RT_Q2C_SRVIPV4,
			aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
		SM_RCBV_BUF, ta_id_rt,
			dadb_entry->dadbe_da_ta_id, SMTP_STID_SIZE,
		SM_RCBV_BUF, RT_Q2C_SSTAID, aq_ta->aqt_ss_ta_id, SMTP_STID_SIZE,
		SM_RCBV_OFF, RT_Q2C_SIZE_B, aq_ta->aqt_msg_sz_b,
		SM_RCBV_STR, RT_Q2C_MAIL, sender,
		SM_RCBV_CSTR, cdb_rt, aq_ta->aqt_cdb_id,
		SM_RCBV_INT, RT_Q2C_TA_FLAGS, ta_flags,
		SM_RCBV_END);

#if MTA_USE_TLS
	/* session configuration (if a lookup was successful) */
	if (sm_is_success(ret) &&
	    sm_is_success(dadb_entry->dadbe_se_maprescnf) &&
	    dadb_entry->dadbe_se_conf != NULL)
	{
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_SRV, dadb_entry->dadbe_se_maprescnf,
			SM_RCBV_STR, RT_Q2C_RHS_CNF_SRV, dadb_entry->dadbe_se_conf,
			SM_RCBV_END);
	}
#endif /* MTA_USE_TLS */

	/* header modifications */
	if (sm_is_success(ret))
		ret = sm_hdrmodl_wr(aq_ta->aqt_hdrmodhd, rcb, RT_Q2C_HM_T_P, RT_Q2C_HM_HDR);

	/* first recipient */
	if (sm_is_success(ret)) {
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_RCPT_IDX, aq_rcpt->aqr_idx,
			SM_RCBV_STR, RT_Q2C_RCPT, aq_rcpt->aqr_pa,
			SM_RCBV_END);
	}

#if MTA_USE_TLS
	/* recipient configuration (if a lookup was successful) */
	if (sm_is_success(ret) &&
	    sm_is_success(aq_rcpt->aqr_maprescnf) &&
	    aq_rcpt->aqr_conf != NULL)
	{
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_RCPT, aq_rcpt->aqr_maprescnf,
			SM_RCBV_STR, RT_Q2C_RHS_CNF_RCPT, aq_rcpt->aqr_conf,
			SM_RCBV_END);
	}
#endif /* MTA_USE_TLS */

	/* note: "ta_flags" in this output are the flags of aq_ta */
	QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "func=qm_to_sc_task, ss_ta=%s, cdb=%s, da_ta=%s, ta_flags=%#x, dsn=%d, ret=%r\n",
		aq_ta->aqt_ss_ta_id, sm_cstr_data(aq_ta->aqt_cdb_id),
		dadb_entry->dadbe_da_ta_id, aq_ta->aqt_flags, dsn, ret));
	if (sm_is_err(ret))
		goto error;

	if (dsn) {
		/* a DSN carries its message in the recipient */
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_STR, RT_Q2C_B_MSG, aq_rcpt->aqr_dsn_msg,
			SM_RCBV_END);
		if (sm_is_err(ret))
			goto error;
		AQ_TA_SET_FLAG(aq_ta, AQ_TA_FL_NO_BODY);
	}
	SM_STR_FREE(verp);
	return ret;

  error:
	SM_STR_FREE(verp);
	QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=ERROR, func=qm_to_sc_task, ret=%r\n", ret));
	return ret;
}

#if QMGR_DEBUG > 1
/*
**  AQ_RCPT_PRINT -- print some fields of an AQ recipient (debugging)
**
**	Parameters:
**		aq_rcpt -- AQ recipient
**
**	Returns:
**		nothing
**
**	Note: only the first address (index 0) is shown, or -1 if the
**	recipient has no addresses.
*/

void
aq_rcpt_print(aq_rcpt_P aq_rcpt)
{
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aq_rcpt=%p\n", aq_rcpt));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "ss_ta=%s\n", aq_rcpt->aqr_ss_ta_id));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpt=%S\n", aq_rcpt->aqr_pa));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_idx=%u\n", aq_rcpt->aqr_idx));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_tries=%u\n", aq_rcpt->aqr_tries));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "srv_ipv4=%A\n", aq_rcpt->aqr_addrs == NULL ? -1 : aq_rcpt->aqr_addrs[0].aqra_ipv4));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_da_idx=%u\n", aq_rcpt->aqr_da_idx));

	/*
	**  The time values are printed with %ld, hence they must be
	**  passed as long (as it is done in aq_ta_print()); the type of
	**  the fields is not necessarily long.
	*/

	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_st_time=%ld\n", (long) aq_rcpt->aqr_st_time));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_last_try=%ld\n", (long) aq_rcpt->aqr_last_try));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_next_try=%ld\n", (long) aq_rcpt->aqr_next_try));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_status=%d\n", aq_rcpt->aqr_status));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_flags=%#x\n", aq_rcpt->aqr_flags));

	/* list links are only shown at a higher debug level */
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_ss_link.sm_rg_succ=%p\n", AQR_SS_SUCC(aq_rcpt)));
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_ss_link.sm_rg_pred=%p\n", AQR_SS_PRED(aq_rcpt)));
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_da_link.sm_rg_succ=%p\n", AQR_DA_SUCC(aq_rcpt)));
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_da_link.sm_rg_pred=%p\n", AQR_DA_PRED(aq_rcpt)));
}

/*
**  AQ_TA_PRINT -- print some fields of an AQ transaction (debugging)
**
**	Parameters:
**		aq_ta -- AQ transaction
**
**	Returns:
**		nothing
**
**	Note: aqt_mail is dereferenced without a check.
*/

void
aq_ta_print(aq_ta_P aq_ta)
{
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aq_ta=%p\n", aq_ta));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "ss_ta=%s\n", aq_ta->aqt_ss_ta_id));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "mail=%S\n", aq_ta->aqt_mail->aqm_pa));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "time=%ld\n", (long) aq_ta->aqt_st_time));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "cdb=%s\n", sm_cstr_data(aq_ta->aqt_cdb_id)));
	/* recipient counters */
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts=%u\n", aq_ta->aqt_rcpts_inaq));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_tot=%u\n", aq_ta->aqt_rcpts_tot));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_left=%u\n", aq_ta->aqt_rcpts_left));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_temp=%u\n", aq_ta->aqt_rcpts_temp));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_perm=%u\n", aq_ta->aqt_rcpts_perm));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "state=%d\n", aq_ta->aqt_state));
	/* NOTE(review): printed with %d while the other counters use %u -- confirm type */
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqt_rcpts_ar=%d\n", aq_ta->aqt_rcpts_ar));
}
#endif /* QMGR_DEBUG > 1 */

/*
**  QM_TO_SC_ADD_RCPT -- add another recipient to a transaction for SMTPC
**
**	Appends the recipient index and address to the RCB of rcbe;
**	with MTA_USE_TLS the recipient configuration is appended too
**	if its lookup was successful and data is available.
**
**	Parameters:
**		qsc_ctx -- QMGR SMTPC context (currently unused)
**		aq_rcpt -- AQ recipient
**		rcbe -- RCB entry
**
**	Returns:
**		usual sm_error code; see sm_rcb_putv()
**
**	Side Effects: does not clean up rcb in case of an error.
**
**	Last code review: 2005-04-04 21:05:04
**	Last code change:
*/

static sm_ret_T
qm_to_sc_add_rcpt(qsc_ctx_P qsc_ctx, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe)
{
	sm_ret_T ret;

	ret = sm_rcb_putv(&rcbe->rcbe_rcb, RCB_PUTV_NONE,
		SM_RCBV_INT, RT_Q2C_RCPT_IDX, aq_rcpt->aqr_idx,
		SM_RCBV_STR, RT_Q2C_RCPT, aq_rcpt->aqr_pa,
		SM_RCBV_END);

#if MTA_USE_TLS
	if (sm_is_success(ret) &&
	    sm_is_success(aq_rcpt->aqr_maprescnf) &&
	    aq_rcpt->aqr_conf != NULL)
	{
		ret = sm_rcb_putv(&rcbe->rcbe_rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_RCPT, aq_rcpt->aqr_maprescnf,
			SM_RCBV_STR, RT_Q2C_RHS_CNF_RCPT, aq_rcpt->aqr_conf,
			SM_RCBV_END);
	}
#endif /* MTA_USE_TLS */

	if (sm_is_err(ret)) {
		QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=ERROR, func=qm_to_sc_add_rcpt, ret=%r\n", ret));
	}
	return ret;
}

/*
**  QMGR_FIND_DA -- Find an available DA
**	NOTE: this requires that all DAs can do all kind of deliveries!
**
**	Goes through the list of SMTPC contexts and selects the first
**	one that is in use, has a task, is running, is not the one
**	passed in via pqsc_ctx, and has free or idle entries according
**	to dadb_entry_avail().
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		pqsc_bit -- (pointer to) QSC bit (output)
**			set to 0 if no DA is found
**		pqsc_ctx -- (pointer to) QSC context (output)
**			if this points to a valid qsc_ctx: don't return the
**			same one.
**		pda_avail -- (pointer to) how many entries are free? (output)
**			may be NULL; only set if a DA is found
**		pda_idle -- (pointer to) how many entries are idle? (output)
**			may be NULL; only set if a DA is found
**
**	Returns:
**		usual sm_error code; SM_E_NO_DA, (un)lock
**
**	Locking: locks qmgr_ctx
**
**	Last code review: 2005-04-04 22:07:13
**	Last code change: 2006-01-29 05:03:32
*/

static sm_ret_T
qmgr_find_da(qmgr_ctx_P qmgr_ctx, uint32_t *pqsc_bit, qsc_ctx_P *pqsc_ctx, uint *pda_avail, uint *pda_idle)
{
	int i, r;
	uint32_t j;
	uint da_avail, da_idle;
	sm_ret_T ret;
	qsc_ctx_P qsc_ctx;

	SM_REQUIRE(pqsc_ctx != NULL);
	SM_REQUIRE(pqsc_bit != NULL);
	*pqsc_bit = 0;
	r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		/*
		**  NOTE(review): the other functions in this file log with
		**  QM_LCAT_SCHED/QM_LMOD_SCHED; the SMTPS category here
		**  looks like a leftover -- confirm before changing it.
		*/

		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_FROM_SMTPS,
			SM_LOG_CRIT, 4,
			"sev=CRIT, func=qmgr_find_da, lock=%d\n", r);
		return sm_error_temp(SM_EM_Q_SCHED, r);
	}

	/* j is the bit that corresponds to index i */
	ret = sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA);	/* XXX */
	for (j = 1, i = 0; i < QM_N_SC_GLI(qmgr_ctx); i++, j *= 2)
	{
		if ((qmgr_ctx->qmgr_sc_li.qm_gli_used & j) != 0
		    && (qsc_ctx = qmgr_li_sc(qmgr_ctx, i)) != NULL
		    && qsc_ctx->qsc_com.rcbcom_tsk != NULL
		    && QSC_IS_RUNNING(qsc_ctx)
		    && qsc_ctx != *pqsc_ctx
		   )
		{
			/*
			**  Check whether the DA has free capacity.
			**  The result of dadb_entry_avail() is ignored,
			**  hence make sure the counters have a defined
			**  value (no capacity) if it fails without
			**  setting them.
			*/

			da_avail = da_idle = 0;
			(void) dadb_entry_avail(qsc_ctx->qsc_dadb_ctx, &da_avail, &da_idle, THR_LOCK_UNLOCK);
			if (da_avail > 0 || da_idle > 0) {
				if (pda_avail != NULL)
					*pda_avail = da_avail;
				if (pda_idle != NULL)
					*pda_idle = da_idle;

				*pqsc_ctx = qsc_ctx;
				*pqsc_bit = j;
				ret = SM_SUCCESS;
				break;
			}
		}
	}
	r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
	SM_ASSERT(r == 0);
	SM_ASSERT(ret != SM_SUCCESS || *pqsc_ctx != NULL);

	/* an unlock failure must not hide an earlier error */
	if (r != 0 && sm_is_success(ret))
		ret = sm_error_perm(SM_EM_Q_SCHED, r);
	return ret;
}

/*
**  SCHED_MV2NXTMX -- Maybe move rcpt to next MX (if same priority)
**
**	The recipient is moved only if it has more addresses, the
**	next address has the same preference as the current one, and
**	the current address has not expired (aqra_expt >= time_now).
**	Moving means: remove rcpt from its RDQ, advance aqr_addr_cur,
**	and add rcpt to the RDQ again.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		time_now -- current time
**
**	Returns:
**		usual sm_error code; ENOMEM, SM_E_UNEXPECTED et.al.
**
**	Locking: aq_ctx must be locked
**
**	Side Effects: may remove aq_rcpt from RDQ without adding it
**		to next one due to resource problem
**
**	Last code review: 2005-04-04 22:19:52
**	Last code change:
*/

static sm_ret_T
sched_mv2nxtmx(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T time_now)
{
	sm_ret_T ret;
	uint cur;

	cur = aq_rcpt->aqr_addr_cur;

	/* nothing to do unless all three conditions are fulfilled */
	if (!AQR_MORE_ADDR(aq_rcpt))
		return SM_SUCCESS;
	if (aq_rcpt->aqr_addrs[cur].aqra_pref !=
	    aq_rcpt->aqr_addrs[cur + 1].aqra_pref)
		return SM_SUCCESS;
	if (aq_rcpt->aqr_addrs[cur].aqra_expt < time_now)
		return SM_SUCCESS;

	/* take rcpt out of the queue for the current address ... */
	ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
	if (sm_is_err(ret)) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched_mv2nxtmx, aq_rdq_rm=%m, flags=%#x",
			ret, aq_rcpt->aqr_flags);
		return ret;
	}

	/* ... and put it into the queue for the next address */
	++aq_rcpt->aqr_addr_cur;
	ret = aq_rdq_add(aq_ctx, aq_rcpt, NULL /* &aqrdq_flags */, THR_NO_LOCK);
	if (sm_is_err(ret)) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched_mv2nxtmx, aq_rdq_add=%m, flags=%#x",
			ret, aq_rcpt->aqr_flags);
	}
	return ret;
}

/*
**  SCHED2LONG -- AQ rcpt takes too long to schedule
**
**	Logs the timeout, counts the recipient in aq_t_da, flags it
**	AQR_FL_SCHEDF, moves it from its RDQ to the DA wait queue, and
**	requests an AQ cleanup.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		time_now -- current time
**
**	Returns:
**		usual sm_error code
**
**	Locking: aq_ctx must be locked
**
**	Side Effects: if one of the steps fails, the previous ones are
**		not undone.
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
sched2long(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T time_now)
{
	sm_ret_T ret;

	sm_log_write(qmgr_ctx->qmgr_lctx,
		QM_LCAT_SCHED, QM_LMOD_SCHED, SM_LOG_ERR, 1,
		"sev=ERROR, func=sched2long, ss_ta=%s, idx=%u, now-entered=%ld, status=timeout_in_scheduler"
		, aq_rcpt->aqr_ss_ta_id
		, aq_rcpt->aqr_idx
		, (long) (time_now - aq_rcpt->aqr_entered));

	/*
	**  XXX AND NOW?
	**  what's the simplest way to remove this from AQ?
	**  move it from rcpt dest queue to wait queue and
	**  "cheat" on the timeout ("immediate")?
	*/

	++aq_ctx->aq_t_da;
	AQR_DA_INIT(aq_rcpt);
	AQR_SET_FLAG(aq_rcpt, AQR_FL_SCHEDF);

	/* Was: rdq from todo to busy? check cleanup code (and how rdq is used) */
	ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
	if (sm_is_err(ret)) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched2long, aq_rdq_rm=%m, flags=%#x",
			ret, aq_rcpt->aqr_flags);
		return ret;
	}

	/* ??? use DA waitq for this? or yet another queue??? */
	ret = aq_waitq_add(aq_ctx, aq_rcpt, time_now, AQWQ_DA, THR_NO_LOCK);
	if (sm_is_err(ret)) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched2long, aq_waitq_add=%m",
			ret);
		return ret;
	}

	/* last step: its result is returned in any case */
	ret = qmgr_set_aq_cleanup(qmgr_ctx->qmgr_cleanup_ctx, time_now, true);
	if (sm_is_err(ret)) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched2long, qmgr_set_aq_cleanup=%m",
			ret);
	}
	return ret;
}

/*
**  SAMEDOMAINOK -- Are all recipients for the same domain ready for scheduling?
**
**	Parameters:
**		aq_rcpt -- AQ recipient
**		defaultdomain -- domain to use if recipient doesn't have one
**
**	Returns:
**		true: all ok
**		false: at least one is not yet ready
**
**	Locking: aq_ctx must be locked
**
**	Last code review: 2005-04-04 23:49:13
**	Last code change: 2006-06-11 04:14:31
*/

static bool
samedomainok(aq_rcpt_P aq_rcpt, sm_str_P defaultdomain)
{
	bool all_ready;
	aq_rcpt_P peer;

	/*
	**  Starting at aq_rcpt, walk the SS list in both directions as
	**  long as the neighbours have the same domain, and check that
	**  each of them is flagged AQR_FL_RDY4DLVRY.
	**  Notes:
	**  1. Looking in both directions is necessary.
	**  This assumes that recipients for the same domain are treated
	**  "similar" by smar; they are not required to have the same
	**  destination.  However, if resolving the addresses takes much
	**  longer for some of them than for others (even with the same
	**  domain, i.e., routing based on other criteria), then this
	**  may add unnecessary delays.
	**  2. The recipients must be sorted by domain.
	*/

	all_ready = true;

	/* successors */
	peer = AQR_SS_SUCC(aq_rcpt);
	while (all_ready && peer != aq_rcpt &&
	       SM_SUCCESS == aq_rcpt_eq_domain(aq_rcpt, peer, defaultdomain))
	{
		all_ready = AQR_IS_FLAG(peer, AQR_FL_RDY4DLVRY);
		peer = AQR_SS_SUCC(peer);
	}

	/* predecessors (skipped if a successor isn't ready) */
	peer = AQR_SS_PRED(aq_rcpt);
	while (all_ready && peer != aq_rcpt &&
	       SM_SUCCESS == aq_rcpt_eq_domain(aq_rcpt, peer, defaultdomain))
	{
		all_ready = AQR_IS_FLAG(peer, AQR_FL_RDY4DLVRY);
		peer = AQR_SS_PRED(peer);
	}
	return all_ready;
}

/*
**  QMGR_RPCTS_TA -- return max number of recipients for this transaction
**
**	The LMTP limit is used if the current address of the recipient
**	is LMTP_IPV4_ADDR, otherwise the SMTP limit.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_rcpt -- AQ recipient
**
**	Returns:
**		max number of recipients for this transaction
*/

static uint
qmgr_rpcts_ta(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt)
{
	bool is_lmtp;

	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);

	is_lmtp = (LMTP_IPV4_ADDR ==
		   aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4);
	return is_lmtp ? qmgr_ctx->qmgr_cnf.q_cnf_lmtp_rcpts_ta
		       : qmgr_ctx->qmgr_cnf.q_cnf_smtp_rcpts_ta;
}

/*
**  QM_SESS_KEEP_OPEN -- is it useful to keep a session open?
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ta -- AQ transaction
**		aq_rcpt -- AQ recipient
**		max_rcpts_ta -- max recipients per transaction
**
**	Returns:
**		true iff is it useful to keep a session open
**		false if session reuse is turned off (QCNF_FL_SE_REUSE)
**		or if there is no other recipient after aq_rcpt in its RDQ.
*/

static bool
qm_sess_keep_open(qmgr_ctx_P qmgr_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, uint max_rcpts_ta)
{
	uint seen;
	aq_rcpt_P nxt;

	if (!QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE))
		return false;

	nxt = AQ_RDQ_NEXT(aq_rcpt);
	if (NULL == nxt || aq_rcpt == nxt)
		return false;

	/* look at up to max_rcpts_ta recipients following aq_rcpt */
	seen = 0;
	while (nxt != NULL && nxt != aq_rcpt && seen < max_rcpts_ta) {
		/*
		**  NOTE(review): this test requires AQR_FL_HAS_VERP to be
		**  set for aq_rcpt, but SAME_TRANSACTION() requires it to
		**  be clear, so as written it can never be true and the
		**  function returns true whenever there is a following
		**  recipient.  Intended condition to be confirmed.
		*/

		if (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)
		    && AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP)
		    && SAME_TRANSACTION(aq_rcpt, nxt))
			return false;
		++seen;
		nxt = AQ_RDQ_NEXT(nxt);
	}
	return true;
}

/*
**  QMGR_SCHED_DLVRY -- simple scheduler...
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ctx -- AQ context
**		pqsc_bits -- (bitmask) which DAs need to be started (output)
**		pdelay_next_try -- (pointer to) delay until next try (output)
**
**	Returns:
**		usual sm_error code
**
**	Locking: aq_ctx must be locked
**
**	Todo: this is not very sophisticated.
**		split this in multiple functions? it's too long...
**
**	Last code review:
**	Last code change:
*/

/*
**  Delay for scheduler after encountering too many open connections.
**  The scheduler should be woken up after a TA close RCB is received
**  from a DA for which a TA is waiting. This data should be recorded
**  in the DA connection cache.
*/

#define DELAY_TOO_MANY	300

/*
**  SET_DELAY_NEXT_TRY -- set *pdelay_next_try to d if d is smaller than
**	the current value or if the current value is 0 (which is treated
**	as "not set"); "where" is only used for the debug output.
**
**	Requires a variable "pdelay_next_try" in the scope in which the
**	macro is used.
**	Note: d is expanded more than once.
*/

#define SET_DELAY_NEXT_TRY(d, where)					\
	do								\
	{								\
		if ((d) < *pdelay_next_try || *pdelay_next_try == 0)	\
		{							\
			*pdelay_next_try = (d);				\
			QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "func=qmgr_sched_dlvry, where=%s, delay=%d\n", (where), (d)));	\
		}							\
	} while (0)

static sm_ret_T
qmgr_sched_dlvry(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, uint32_t *pqsc_bits, int *pdelay_next_try)
{
#undef SMFCT
#define SMFCT   "qmgr_sched_dlvry"
	sm_ret_T ret, res;
	aq_rcpt_P aq_rcpt, aq_rcpt_nxt;
	aq_ta_P aq_ta;
	qsc_ctx_P qsc_ctx;
	sm_rcbe_P rcbe;
	dadb_entry_P dadb_entry;
	aq_rsnd_ctx_P aq_rsnd_ctx;
	uint i, nrcpts, connlimit, conncur, max_rcpts_ta, da_avail, da_idle;
	int r;
	uint32_t qsc_bit;
	bool stopit, se_reuse, se_keep_open, occe_locked;
	time_T time_now;
	sessta_id_T da_ta_id;
	occ_entry_P occ_entry;
	aqrdq_ctx_P aqrdq;
	uint todo_entries, rdq_entries;

	/*
	**  Macro to move rcpt to end of list unless it's there already.
	*/

#define SM_MV_RCPT2END	\
	if (aq_rcpt == AQ_RDQ_LAST(aqrdq->aqrdq_rcpts))	\
		break;						\
	AQ_RDQ_REMOVE(aqrdq->aqrdq_rcpts, aq_rcpt);	\
	AQ_RDQ_INSERT_TAIL(aqrdq->aqrdq_rcpts, aq_rcpt);	\
	continue

#define QM_SCHED_RCBE_FREE			\
	while (rcbe != NULL)			\
	{					\
		aq_rsnd_ctx_free(aq_rsnd_ctx);	\
		aq_rsnd_ctx = NULL;		\
		sm_rcbe_free(rcbe);		\
		rcbe = NULL;			\
	}

	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ(aq_ctx);
	SM_REQUIRE(pqsc_bits != NULL);
	SM_REQUIRE(pdelay_next_try != NULL);
	*pqsc_bits = 0;
	da_ta_id[0] = '\0';
	rcbe = NULL;
	aq_rsnd_ctx = NULL;
	occe_locked = stopit = false;
	max_rcpts_ta = INT_MAX / 2;

	/*
	**  Do we have a DA? Just one...
	**  Note: this may sleep() while holding AQ mutex, but only
	**  if there was never a DA available before (QMGR_SFL_HAD_DA),
	**  i.e., during startup.
	*/

	qsc_ctx = NULL;
	while (!sm_is_success(ret =
		qmgr_find_da(qmgr_ctx, &qsc_bit, &qsc_ctx, NULL, NULL)))
	{
		if (qmgr_ctx->qmgr_st_time + qmgr_ctx->qmgr_cnf.q_cnf_wait4clt
				<= evthr_time(qmgr_ctx->qmgr_ev_ctx)
		    || QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA))
		{
			if (!QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_DA)) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_WARN, 8,
					"sev=WARN, func=qmgr_sched_dlvry, status=no delivery agent available");
				QMGR_SET_SFLAG(qmgr_ctx, QMGR_SFL_DA);
			}
			qmgr_ctx->qmgr_tm_no_da = evthr_time(qmgr_ctx->qmgr_ev_ctx);
			return sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA);
		}
		sleep(1);
	}
	if (!QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA))
		QMGR_SET_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA);
	QMGR_CLR_SFLAG(qmgr_ctx, QMGR_SFL_DA);

	SM_IS_QSC_CTX(qsc_ctx);
	QM_LEV_DPRINTFC(QDC_SCHED, 7, (QM_DEBFP, "func=qmgr_sched_dlvry, qsc_ctx=%p\n", qsc_ctx));
	SM_IS_EVTHR_TSK(qsc_ctx->qsc_com.rcbcom_tsk);

#if 0
//	r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
//	SM_LOCK_OK(r);
//	if (r != 0)
//	{
//		sm_log_write(qmgr_ctx->qmgr_lctx,
//			QM_LCAT_SCHED, QM_LMOD_SCHED,
//			SM_LOG_CRIT, 1,
//			"sev=CRIT, func=qmgr_sched_dlvry, lock_qsc=%d", r);
//	}
//	else
//	{
//		/* XXX might be able to use an existing session */
//		if (qsc_ctx->qsc_curactive >= qsc_ctx->qsc_maxthreads
//		    && !QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_SE_REUSE))
//		{
//QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=1, qsc_ctx=%p, curactive=%d >= max=%d\n",
//qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
//			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
//			SM_ASSERT(r == 0);
//			return sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA_FREE);
//		}
//		r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
//		SM_ASSERT(r == 0);
//	}
#endif /* 0 */

	ret = SM_SUCCESS;
	time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "func=qmgr_sched_dlvry, rdq_entries=%d\n", aq_ctx->aq_rdq_used));

	/*
	**  Go through all rcpt destination queues.
	**
	**  Note: currently this code assumes that all recipients can be
	**  delivered by all delivery agents.
	**  Question: what "breaks" if different delivery agents or different
	**  delivery classes need to be selected?
	**  Is it necessary to have rcpt destination queues per delivery class?
	**  Is it necessary to have delivery superclasses, i.e., sets of
	**  delivery classes which are implemented by the same set of DAs
	**  (such that it doesn't make a difference if any of the delivery
	**  classes inside a superclass is selected because they all share
	**  the same DAs)?
	*/

	for (rdq_entries = aq_ctx->aq_rdq_used; rdq_entries > 0; rdq_entries--)
	{
		/*
		**  Take the head of the list, and "rotate" it to the end.
		**  If there would be a "rotate" operation, then this wouldn't
		**  be necessary unless a problem occurs in which case the
		**  current entry would become the first element of the list
		**  to achieve some kind of fairness.
		*/

		aqrdq = AQ_RDQS_FIRST(aq_ctx->aq_rdqs);
		AQ_RDQS_REMOVE(aq_ctx->aq_rdqs, aqrdq);
		AQ_RDQS_INSERT_TAIL(aq_ctx->aq_rdqs, aqrdq);
		AQRDQ_CLR_FLAG(aqrdq, AQRDQ_FL_OCEXC);

		todo_entries = aqrdq->aqrdq_entries;
		QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "func=qmgr_sched_dlvry, todo_entries=%d\n", todo_entries));

		/*
		**  Note: the loop construct is a problem:
		**  1. aq_rcpt might be removed (most likely)
		**	hence AQ_RDQ_NEXT(aq_rcpt) can't be done inside
		**	the "for" statement.
		**  2. aq_rcpt_nxtl might be removed from the TODO queue too
		**	by the "add more recipients to this transaction" code.
		**  Possible solutions:
		**  1. when removing a recipient from the TODO list, check it
		**  against aq_rcpt_nxtl: if it is the same: go to the next.
		**  2. check whether aq_rcpt_nxtl is still in TODO list,
		**  if not: what then? we can't go to the next entry: this
		**  isn't in the right list...
		**
		**  It seems the loop should be restructured:
		**  all entries in the TODO list are ready for delivery,
		**  hence after each iteration the head of the list should
		**  have been removed from the list, i.e.,
		**  while (!empty) { rcpt = head(todo_list); schedule }
		**
		**  However, even that is incorrect because some recipients
		**  might not be scheduled despite being in the todo list.
		**  See below for two cases.
		**  The hack applied in those cases (move the item to the
		**  end of the list) doesn't work if this happens more than
		**  once in a list. Hence this restriction must be used
		**  unless a better way, e.g., a flag, can be found.
		**
		**  The current solution:
		**  use a for loop restricted by the number of todo entries
		**  always access the first element of the list
		**	rcpts that aren't scheduled must be moved to the end
		**  stop if the list is empty
		**	this can happen if the scheduler adds more rcpts
		**	inside the loop to a transaction
		*/

		/* go through all "todo" rcpts for each rcpt dest queue */
		for (i = 0;
		     i < todo_entries && !stopit
		     && !AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts);
		     i++)
		{
			da_avail = da_idle = 0;
			aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
			SM_IS_AQ_RCPT(aq_rcpt);
#if QMGR_TEST
			if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_DLY_SCHED)
			    && AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DLY))
			{
				SM_ASSERT(false);	/* abort */
			}
#endif

			aq_ta = aq_rcpt->aqr_ss_ta;
			QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aq_rcpt=%p, aqr_pa=%S, ss_ta=%s, idx=%u, aqr_flags=%#x, sched=%d, aq_ta=%p\n", aq_rcpt, aq_rcpt->aqr_pa, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx, aq_rcpt->aqr_flags, AQR_SCHEDULE(aq_rcpt), aq_ta));
#if QMGR_DEBUG > 1
			aq_rcpt_print(aq_rcpt);
#endif /* QMGR_DEBUG > 1 */

			SM_IS_AQ_TA(aq_ta);
#if QMGR_DEBUG > 1
			aq_ta_print(aq_ta);
#endif

			SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ_AR));
			SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_TMOUT|AQR_FL_SCHED|AQR_FL_PERM));
			SM_ASSERT(!AQR_IS_DSNFL(aq_rcpt, AQR_DSNFL_F_HBG));
			SM_ASSERT(AQR_SCHEDULE(aq_rcpt));

			if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN))
				QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aq_rcpt=%p, aq_rcpt_pa=%S, idx=%u, type=bounce, tried=%u, total=%u, now-entered=%ld\n", aq_rcpt, aq_rcpt->aqr_pa, aq_rcpt->aqr_idx, aq_ta->aqt_rcpts_tried, aq_ta->aqt_rcpts_tot, (long) (time_now - aq_rcpt->aqr_entered)));

			/*
			**  Note: the timeout must be significantly larger
			**  than the SMTPC delivery timeout!
			**  Check whether the item itself is too old:
			**  this can cause problems if there are many
			**  destinations that are unavailable (many delivery
			**  attempts)
			**  and check whether the time since the last
			**  scheduling attempt is exceeded too.
			**  Should this check the time since the last
			**  "status update" (e.g., scheduled, result from DA
			**  result from AR, others?)
			*/

			if (aq_rcpt->aqr_entered + qmgr_ctx->qmgr_tmo_sched
			    < time_now &&
			    aq_rcpt->aqr_last_try + qmgr_ctx->qmgr_tmo_sched
			    < time_now &&
			    qmgr_ctx->qmgr_tm_no_da + qmgr_ctx->qmgr_tmo_sched
			    < time_now
			   )
			{
				/*
				**  This is taking too long...
				**  something must be wrong.
				*/

				ret = sched2long(qmgr_ctx, aq_ctx, aq_rcpt, time_now);
				if (sm_is_err(ret)) {
					stopit = true;
					break;
				}
				continue;
			}

			/*
			**  If this is a (double)bounce then skip it if
			**  - there aren't enough tried recipients (XXX?)
			**  or
			**  - the timeout hasn't occurred yet.
			**
			**  Note: this is one place where a recipient can be in
			**  the todo queue but it is skipped by the scheduler.
			**  See also next if statement.
			*/

			if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN) &&
			    !(aq_ta->aqt_rcpts_tot - aq_ta->aqt_rcpts_tried <=
					(aq_ta_has_bounce(aq_ta) ? 1 : 0) +
					(aq_ta_has_dbl_bounce(aq_ta) ? 1 : 0)
			      || aq_rcpt->aqr_entered +
				 qmgr_ctx->qmgr_cnf.q_cnf_t_sched_dsn
				 < time_now)
			   )
			{
				int d;

				d = aq_rcpt->aqr_entered +
				    qmgr_ctx->qmgr_cnf.q_cnf_t_sched_dsn -
				    time_now;
				if (d == 0)
					d = 1;	/* need a minimum delay */
				SET_DELAY_NEXT_TRY(d, "bounce/timeout");

				SM_MV_RCPT2END;
			}

			/*
			**  XXX Hack: need
			**  no more recipients to receive from AR
			**  or
#if 0
			**  (at least half of the recipients resolved and
			**   current number of entries greater than
			**	(entries being delivered / 16))
#else * 0 *
			**  the number of recipients is less than 10
			**  (i.e., for small numbers all recipients must be
			**  resolved before continuing).
#endif * 0 *
			**
			**  This is only a hack anyway, we need a real
			**  scheduler...
			**
			**  Note: aqt_rcpts_ar counts also bounce recipients
			**  that have been sent to AR! Should it do that?
			**
			**  Note: this is another place where a recipient can
			**  be in the todo queue but it is skipped by the
			**  scheduler. See also previous if statement.
			*/

			QM_LEV_DPRINTFC(QDC_SCHED, 6, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aqr_pa=%S, status=%d, aqt_rcpts_ar=%d, aqt_rcpts_inaq=%d, aq_entries=%d, t_da=%d, skip=%d\n", aq_rcpt->aqr_pa, aq_rcpt->aqr_status, aq_ta->aqt_rcpts_ar, aq_ta->aqt_rcpts_inaq, aq_ctx->aq_entries, aq_ctx->aq_t_da, !((aq_ta->aqt_rcpts_ar == 0) || (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1) && aq_ta->aqt_rcpts_inaq > 10))));

			if (!((aq_ta->aqt_rcpts_ar == 0)
			      || (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1)
#if 0
				  && (aq_ctx->aq_entries > (aq_ctx->aq_t_da >> 4))
#else
				  && aq_ta->aqt_rcpts_inaq > 10
#endif
				 )
			     )
			   )
			{
				SM_MV_RCPT2END;
			}

			/*
			**  Check whether all recipients with the same domain
			**  are ready for scheduling.
			*/

			if (!samedomainok(aq_rcpt, qmgr_ctx->qmgr_hostname)) {
				QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aqr_pa=%S, status=skipped_because_other_entry_is_not_ready\n",
aq_rcpt->aqr_pa));
				SM_MV_RCPT2END;
			}

			/* check whether current DA can be used */
			ret = dadb_entry_avail(qsc_ctx->qsc_dadb_ctx, &da_avail, &da_idle, THR_LOCK_UNLOCK);
			if (da_avail <= 0 && da_idle <= 0) {
				/* find another DA */
				ret = qmgr_find_da(qmgr_ctx, &qsc_bit, &qsc_ctx, &da_avail, &da_idle);
				if (sm_is_err(ret)) {
					if (sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA) == ret)
						qmgr_ctx->qmgr_tm_no_da = time_now;
#if 0
					QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=3, qsc_ctx=%p, curactive=%d >= max=%d\n",
qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#else /* 0 */
					QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=3, qsc_ctx=%p, status=no_free_DA\n",
qsc_ctx));
#endif /* 0 */
					stopit = true;
					break;
				}
				QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, switch, qsc_ctx=%p\n", qsc_ctx));
			}

#if 0
//			/* XXX What's this? 2003-11-12 */
//			ret = aq_ta_find(aq_ctx, aq_rcpt->aqr_ss_ta_id, false, &aq_ta);
//			if (sm_is_err(ret)) {
//				/* COMPLAIN */
//				QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=ERROR, func=qmgr_sched_dlvry, can't find ta for ss_ta=%s, ret=%r\n",
//					aq_rcpt->aqr_ss_ta_id, ret));
//				continue;
//			}
#endif /* 0 */

			/* add delivery information to SMTPC RCB entry */
			ret = qm_rcbe_new(qmgr_ctx, &rcbe, -1);
			if (sm_is_err(ret)) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, qm_rcbe_new=%m",
					ret);
				stopit = true;
				break;
			}

			/* create callback context for RCB send */
			ret = aq_rsnd_ctx_new(aq_ctx, aq_rcpt, &aq_rsnd_ctx);
			if (sm_is_err(ret) || NULL == aq_rsnd_ctx) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, aq_rsnd_ctx_new=%m",
					ret);
				sm_rcbe_free(rcbe);
				rcbe = NULL;
				stopit = true;
				break;
			}
			sm_rcbe_setcb(rcbe, aq_rcptsent2da, (void *)aq_rsnd_ctx);

			dadb_entry = NULL;
			occ_entry = NULL;
			connlimit = qmgr_ctx->qmgr_cnf.q_cnf_max_conc_conn;
			conncur = 0;

			/* Need to keep occ_entry locked! */
			res = occ_entry_find(qmgr_ctx->qmgr_occ_ctx->occx_ht,
				aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
				&occ_entry, &qmgr_ctx->qmgr_occ_ctx->occx_mutex,
				THR_LOCK_UNLERR);
			if (SM_SUCCESS == res && occ_entry != NULL) {
				occe_locked = true;
				SM_ASSERT(occ_entry->occe_open_se >= occ_entry->occe_open_ta);
				connlimit = occ_entry->occe_cur_conc;
				conncur = occ_entry->occe_open_se;
				if (conncur >= connlimit
				    && occ_entry->occe_open_se == occ_entry->occe_open_ta)
				{
					AQRDQ_SET_FLAG(aqrdq, AQRDQ_FL_OCEXC);
					OCCE_SET_FLAG(occ_entry, OCCE_FL_SE_WAIT);
					if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE)) {
						/* always? */
						OCCE_SET_FLAG(occ_entry, OCCE_FL_TA_WAIT);
					}

#if 0
//					r = DELAY_TOO_MANY;
//					SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded");
#else /* 0 */
					r = occ_entry->occe_last_upd + occ_entry->occe_timeout - time_now;
					QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, occ=limit_exceeded, last_upd=%ld, timeout=%u, now=%ld, r=%d\n",
(long) occ_entry->occe_last_upd, occ_entry->occe_timeout, (long) time_now, r));
					if (r <= 0)
						r = DELAY_TOO_MANY;
					SET_DELAY_NEXT_TRY(r, "exceeded");
#endif /* 0 */
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_INFO,
						OCCE_IS_FLAG(occ_entry, OCCE_FL_LOGEXC) ? 14 : 11,
						"sev=INFO, func=qmgr_sched_dlvry, ss_ta=%s, idx=%d, ip=%A, open_connections=%d, cur_limit=%u, status=limit_exceeded, timeout=%d",
						aq_rcpt->aqr_ss_ta_id,
						aq_rcpt->aqr_idx,
						aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
						occ_entry->occe_open_se,
						occ_entry->occe_cur_conc, r);
					OCCE_SET_FLAG(occ_entry, OCCE_FL_LOGEXC);
					r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
					SM_ASSERT(r == 0);
					if (r == 0)
						occe_locked = false;

					/* try next MX for this rcpt? */
					ret = sched_mv2nxtmx(qmgr_ctx, aq_ctx, aq_rcpt, time_now);
					QM_SCHED_RCBE_FREE;
					break;
				}
#if 0
//				else if (occ_entry->occe_open_se > occ_entry->occe_open_ta)
//				{
//sm_log_write(qmgr_ctx->qmgr_lctx,
//QM_LCAT_SCHED, QM_LMOD_SCHED,
//SM_LOG_INFO, 11,
//"func=qmgr_sched_dlvry, ip=%A, open_se=%u, open_ta=%u, dadbe=%p, flags=%#x"
//, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4
//, occ_entry->occe_open_se
//, occ_entry->occe_open_ta
//, occ_entry->occe_dadbe
//, occ_entry->occe_dadbe->dadbe_flags
//);
//					/* first entry should be an available session */
//					if (DADBE_IS_CONN(occ_entry->occe_dadbe))
//					{
//						dadb_entry = occ_entry->occe_dadbe;
//						res = occ_entry->occe_open_se;
//					}
//					else
//QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "sev=ERROR, func=qmgr_sched_dlvry, ip=%A, occ_entry=%p, dadb_entry=%p, open_se=%u, open_ta=%u, flags=%#x, status=expected_available_session\n"
//, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4
//, occ_entry, dadb_entry
//, occ_entry->occe_open_se, occ_entry->occe_open_ta
//, occ_entry->occe_dadbe->dadbe_flags
//));
//				}
#endif /* 0 */
				else
				{
					OCCE_CLR_FLAG(occ_entry, OCCE_FL_LOGEXC);
				}
			}
/*
**  move into previous if-block: an open session can only be found if
**  occ_entry_find() found an entry.
*/
			if (NULL == dadb_entry) {
				/*
				**  Find a DA that has a session open
				**  to the required IP addr
				*/

				res = dadb_se_find_by_ipv4(qsc_ctx->qsc_dadb_ctx,
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					time_now, &dadb_entry);
			}

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				SM_LOG_INFO, 15,
				"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, dadb_entry=%p",
				aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].
					aqra_ipv4,
				res, dadb_entry);

			/* restrict number of connections */
			if (dadb_entry == NULL &&
			    ((res > 0 && (uint)res >= connlimit) || conncur >= connlimit))
			{
				r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
				SM_ASSERT(r == 0);
				if (r == 0)
					occe_locked = false;
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_INFO, 13,
					"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, status=limit_exceeded",
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					conncur > 0 ? conncur : res);

					SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded2");
					break;
			}

			se_reuse = res > 0 && dadb_entry != NULL
				&& QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE);
			if (se_reuse) {
				SM_ASSERT(occe_locked);

				/* let's reuse this ... */
				ret = dadb_sess_reuse(qsc_ctx, qsc_ctx->qsc_dadb_ctx,
					aq_rcpt->aqr_ss_ta_id, dadb_entry, occ_entry, aq_rcpt);

				r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
				SM_ASSERT(r == 0);
				if (r == 0)
					occe_locked = false;

				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_INFO, 9,
					"func=qmgr_sched_dlvry, open_connections=%d, ip=%A, dadb_sess_reuse=%m",
					conncur > 0 ? conncur : res,
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					ret);

				if (sm_is_err(ret)) {
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_ERR, 1,
						"sev=ERROR, func=qmgr_sched_dlvry, dadb_sess_reuse=%m",
						ret);

					/*
					**  if session can't be reused: check
					**  whether connection limit is exceeded
					*/

					if ((uint)res >= connlimit || conncur >= connlimit) {
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED,
							QM_LMOD_SCHED,
							SM_LOG_INFO, 11,
							"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, status=session_cannot_be_reused+limit_exceeded",
							aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
							res);
						SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded3");
						break;
					}
				}
			}
			else if (occe_locked)
			{
				r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
				SM_ASSERT(r == 0);
				if (r == 0)
					occe_locked = false;
			}

			/* can't reuse existing connection? try a new one */
			if ((res == 0 || dadb_entry == NULL || sm_is_err(ret))
			    && da_avail > 0)
			{
				se_reuse = false;

				/* open a DA session */
				ret = dadb_sess_open(qsc_ctx, qsc_ctx->qsc_dadb_ctx,
					aq_rcpt->aqr_ss_ta_id,
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					aq_rcpt, &dadb_entry);
				if (sm_is_err(ret)) {
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_ERR, 1,
						"sev=ERROR, func=qmgr_sched_dlvry, qsc_ctx=%p, dadb_sess_open=%m",
						qsc_ctx, ret);

#if QMGR_DEBUG
					{
						dadb_entry_P dadb_entryh;
						dadb_ctx_P dadb_ctx;
						uint l;

						dadb_ctx = qsc_ctx->qsc_dadb_ctx;
#if 0
						QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=4, qsc_ctx=%p, curactive=%d, max=%d\n",
qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#else /* 0 */
						QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=4, qsc_ctx=%p, entries_cur=%d, entries_max=%d\n",
qsc_ctx, dadb_ctx->dadb_entries_cur, dadb_ctx->dadb_entries_max));
#endif /* 0 */
						for (l = 0; l < dadb_ctx->dadb_entries_max; l++)
						{
							dadb_entryh = (dadb_ctx->dadb_entries)[l];
							QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, dadb_entry=%p, flags=%#x\n",
							dadb_entryh, dadb_entryh == NULL ? UINT_MAX : dadb_entryh->dadbe_flags));
						}
					}
#endif /* QMGR_DEBUG */

					/* should this stop the scheduler??? */
					stopit = true;
					break;
				}
				SM_ASSERT(dadb_entry != NULL);
			}

			if (NULL == dadb_entry) {
				/* no free da for this one: try next rdq */
				QM_SCHED_RCBE_FREE;
				break;
			}

			ret = qm_sc_conf(qmgr_ctx,
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					dadb_entry);
			if (SM_IS_TEMP_ERR(ret)) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_INFO, 9,
					"sev=INFO, func=qmgr_sched_dlvry, ip=%A, qm_sc_conf=%r"
					, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4, ret);
				QM_SCHED_RCBE_FREE;
				break;
			}

			/*
			**  Keep the connection open?
			**  Only if it is configured and if there is at least
			**  one other entry in this "ready queue".
			*/

			se_keep_open = qm_sess_keep_open(qmgr_ctx, aq_ta, aq_rcpt, max_rcpts_ta);

			QM_LEV_DPRINTTC(QDC_SCHED, 1, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, session_reuse=%d, keep_open=%d, da_sess=%s, da_ta=%s\n",
se_reuse, se_keep_open, dadb_entry->dadbe_da_se_id, dadb_entry->dadbe_da_ta_id), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
			ret = qm_to_sc_task(qsc_ctx, se_reuse, !se_keep_open,
					aq_ta, aq_rcpt, rcbe, dadb_entry);
			if (sm_is_err(ret)) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, qm_to_sc_task=%m",
					ret);
				stopit = true;
				break;
			}
			++aq_ctx->aq_t_da;
			SESSTA_COPY(aq_rcpt->aqr_da_ta_id, dadb_entry->dadbe_da_ta_id);
			AQR_DA_INIT(aq_rcpt);

			ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
			if (sm_is_err(ret)) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_rm=%m, flags=%#x",
					ret, aq_rcpt->aqr_flags);
				stopit = true;
				break;
			}

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				SM_LOG_INFO, 9,
				"func=qmgr_sched_dlvry, ss_ta=%s, da_sess=%s, da_ta=%s, rcpt=%@S, idx=%d, state=%d, ip=%A, i=%u, reuse=%d, keep_open=%d, entries_cur=%d, entries_max=%d"
				, aq_rcpt->aqr_ss_ta_id
				, dadb_entry->dadbe_da_se_id
				, dadb_entry->dadbe_da_ta_id
				, aq_rcpt->aqr_pa
				, aq_rcpt->aqr_idx, aq_rcpt->aqr_status
				, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4, i
				, se_reuse, se_keep_open
				, qsc_ctx->qsc_dadb_ctx->dadb_entries_cur
				, qsc_ctx->qsc_dadb_ctx->dadb_entries_max
				);

			max_rcpts_ta = qmgr_rpcts_ta(qmgr_ctx, aq_rcpt);

			/* Add more recipients to this DA transaction? */
			for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt), nrcpts = 1;
			     aq_rcpt_nxt != aq_rcpt && nrcpts < max_rcpts_ta
			     && !AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)
			     && !AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP);
			     aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt))
			{
/*
XXX "append" also undeliverable recipients
so they are counted in qda...
*/
				/* check whether DA and host are the same */
				if (AQR_SCHEDULE(aq_rcpt_nxt)
				    && SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt))
				{
					QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, status=more, i=%u, ta=%s, rcpt=%@S, idx=%d, ip=%A, rcpt=%p, rcpt_nxt=%p, nrctps=%u\n", i, aq_rcpt->aqr_ss_ta_id, aq_rcpt_nxt->aqr_pa, aq_rcpt_nxt->aqr_idx, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4, aq_rcpt, aq_rcpt_nxt, nrcpts));

					ret = qm_to_sc_add_rcpt(qsc_ctx, aq_rcpt_nxt, rcbe);
					if (sm_is_err(ret)) {
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED, QM_LMOD_SCHED,
							SM_LOG_ERR, 1,
							"sev=ERROR, func=qmgr_sched_dlvry, qm_to_sc_add_rcpt=%m",
							ret);
						stopit = true;
						break;
					}
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_INFO, 9,
						"func=qmgr_sched_dlvry, sched=more, ss_ta=%s, da_ta=%s, rcpt=%@S, idx=%d, ip=%A",
						aq_rcpt->aqr_ss_ta_id,
						dadb_entry->dadbe_da_ta_id,
						aq_rcpt_nxt->aqr_pa,
						aq_rcpt_nxt->aqr_idx,
						aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4);

					AQR_SET_FLAG(aq_rcpt_nxt, AQR_FL_SCHED);
					SESSTA_COPY(aq_rcpt_nxt->aqr_da_ta_id, dadb_entry->dadbe_da_ta_id);

					/*
					**  Prepend new recipient to original
					**  one to keep the correct order.
					*/

					AQR_DA_PRE(aq_rcpt, aq_rcpt_nxt);
					++aq_ctx->aq_t_da;
					++nrcpts;

					ret = aq_rdq_rm(aq_ctx, aq_rcpt_nxt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
					if (sm_is_err(ret)) {
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED, QM_LMOD_SCHED,
							SM_LOG_ERR, 1,
							"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_rm=%m, flags=%#x, where=more_rcpts2busy",
							ret, aq_rcpt_nxt->aqr_flags);
						stopit = true;
						break;
					}
				}

				/*
				**  Set this here??  That means it is only
				**  set if sent to a DA, not when it is
				**  "looked at" by the scheduler.  Hence if
				**  the address doesn't resolve, last_try
				**  is never set.
				*/

				aq_rcpt_nxt->aqr_last_try = time_now;
			}

			/* Send task to DA */
			ret = sm_rcbcom_endrep(&qsc_ctx->qsc_com, qsc_ctx->qsc_com.rcbcom_tsk, true /* XXX HACK */, &rcbe);
			if (sm_is_err(ret)) {
				aq_rcpt_P aq_rcpt_r;

				if (rcbe != NULL) {
					/*
					**  rcbe has not been added to write
					**  list hence it needs to be cleaned up
					*/

					aq_rsnd_ctx_free(aq_rsnd_ctx);
					aq_rsnd_ctx = NULL;
					sm_rcbe_free(rcbe);
					rcbe = NULL;
				}

				/* undo status change for added recipients */
				for (aq_rcpt_nxt = AQR_DA_PRED(aq_rcpt);
				     aq_rcpt_nxt != aq_rcpt;
				     aq_rcpt_nxt = aq_rcpt_r)
				{
					aq_rcpt_r = AQR_DA_PRED(aq_rcpt);
					AQR_CLR_FLAG(aq_rcpt_nxt, AQR_FL_SCHED);

					res = aq_rdq_add(aq_ctx, aq_rcpt_nxt, NULL /* &aqrdq_flags */, THR_NO_LOCK);
					if (sm_is_err(res)) {
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED, QM_LMOD_SCHED,
							SM_LOG_ERR, 1,
							"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_add=%m, flags=%#x, where=more_rcpts2todo",
							ret, aq_rcpt_nxt->aqr_flags);
					}

					/* XXX also remove entry from list? */
					AQR_DA_DELENTRY(aq_rcpt_nxt);

					/*
					**  XXX Other things to undo?
					**  Make a list of things "done"
					**  so they can be "undone" here.
					*/

				}
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, sm_qsc_endrep=%m",
					ret);
				stopit = true;
				break;
			}
			else
				aq_rsnd_ctx = NULL;

			/*
			**  At this point the information about the session/
			**  transaction is in the RCB list for the DA, but
			**  it isn't sent yet. Hence AQR_FL_WAIT4UPD can't
			**  be set here, it will be set when the RCB
			**  is sent to the DA.
			*/

#if 0
//			if (!se_reuse)
//			{
//				r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
//				SM_LOCK_OK(r);
//				if (r != 0)
//				{
//					sm_log_write(qmgr_ctx->qmgr_lctx,
//						QM_LCAT_SCHED, QM_LMOD_SCHED,
//						SM_LOG_CRIT, 4,
//						"sev=CRIT, func=qmgr_sched_dlvry, lock_qsc=%d", r);
//				}
//				else
//				{
//					qsc_ctx->qsc_curactive++;
//					r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
//					SM_ASSERT(r == 0);
//				}
//			}
//QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, qsc_ctx=%p, curactive=%d, max=%d\n",
//qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#endif /* 0 */
			AQR_SET_FLAG(aq_rcpt, AQR_FL_SCHED);
			aq_rcpt->aqr_last_try = time_now;
			*pqsc_bits |= qsc_bit;
		}

		/*
		**  If stopit is set then the rdq should be rotated so
		**  the next time the scheduler starts at the one that
		**  couldn't be finished this time. XXX
		**  See begin of outer loop, it's currently rotated for
		**  every entry.
		*/

		if (rcbe != NULL) {
			/*
			**  Possible leftover from previous iteration.
			**  Maybe it should be reused?
			*/

			(void) aq_rsnd_ctx_free(aq_rsnd_ctx);
			aq_rsnd_ctx = NULL;
			sm_rcbe_free(rcbe);
			rcbe = NULL;
		}
		SM_ASSERT(NULL == aq_rsnd_ctx);
	}
	res = ret;

	/*
	**  XXX Crude resource control...
	**  This needs to be more sophisticated and take more resources
	**  into account. Moreover, it needs to free memory if required,
	**  and it needs to change the limits on other data structures
	**  (esp. IQDB) too.
	*/

	if (sm_is_err(res) && sm_error_value(res) == ENOMEM) {
#if 0
		/* throttle servers; do this in caller?? */
		(void) qm_control(qmgr_ctx, 1, 100, QMGR_RFL_MEM_I, THR_LOCK_UNLOCK);
#endif
		ret = aq_lower_limit(aq_ctx, 0, THR_NO_LOCK);
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_WARN, 8,
			"sev=WARN, func=qmgr_sched_dlvry, aq_lower_limit=%m",
			ret);
	}
	else
	{
		/*
		**  XXX This should NOT be done always... but as long
		**  as we don't have a way to measure memory usage...
		**  How about a way to gradually increase it, similar
		**  to qss_control()?
		*/

		ret = aq_raise_limit(aq_ctx, UINT_MAX, THR_NO_LOCK);
	}

	if (rcbe != NULL) {
		aq_rsnd_ctx_free(aq_rsnd_ctx);
		aq_rsnd_ctx = NULL;
		sm_rcbe_free(rcbe);
		rcbe = NULL;
	}
	SM_ASSERT(NULL == aq_rsnd_ctx);
	return res;
}

/*
**  QMGR_SCHED -- scheduler
**	this is running as a task (timeout activated)
**
**	One run: call qm_get_edb_entries(), wake up the AR task if that
**	is requested, lock AQ, call qmgr_sched_dlvry() unless AQ is empty
**	or a forced delay (aq_nextrun) is still pending, compute the next
**	wakeup time, unlock AQ, and finally wake up the DA (qsc) tasks
**	selected by qmgr_sched_dlvry() via qsc_bits.
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		EVTHR_SLPQ; tsk->evthr_t_sleep holds the next wakeup time
**		(EVTHR_TERM in one QMGR_TEST && QMGR_STATS case)
**
**	Locking: locks entire aq_ctx during operation, returns unlocked
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
qmgr_sched(sm_evthr_task_P tsk)
{
	int r;
	sm_ret_T ret;
	qmgr_ctx_P qmgr_ctx;
	aq_ctx_P aq_ctx;
	bool err;
	uint32_t qsc_bits;
	timeval_T timeval_now, delay;
	int delay_next_try;
#if QMGR_TEST
	int delay_next_try_tst;
#endif

	SM_IS_EVTHR_TSK(tsk);
	qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);
	err = false;
	qsc_bits = 0;

	/*
	**  Check ret??
	**  NOTE(review): ret is not examined; if evthr_timeval() can fail
	**  then timeval_now may not be set -- confirm and handle.
	*/

	ret = evthr_timeval(tsk->evthr_t_ctx, &timeval_now);
	delay_next_try = 0;

#if QMGR_TEST
	delay_next_try_tst = 0;
	if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_NO_SCHED)) {
		/* scheduler is turned off: just sleep */
		delay.tv_usec = 0;
		delay.tv_sec = 100;
		timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
		return EVTHR_SLPQ;
	}

	if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_INT_SRC)) {
		ret = qm_tst_fill_aq(qmgr_ctx);
		if (sm_is_err(ret))
			delay_next_try_tst = sm_is_warn_err(ret) ?
			                     (0 - sm_error_value(ret)) : 300;
		else if (SM_NOTDONE == ret)
			delay_next_try_tst = 1;
#if QMGR_STATS
		if (qmgr_ctx->qmgr_cnf.q_cnf_tot_tas <= qmgr_ctx->qmgr_tas_sent)
			sm_io_fprintf(smioerr, "q_cnf_tot_tas=%u, tas_sent=%lu, aq_entries=%u\n", qmgr_ctx->qmgr_cnf.q_cnf_tot_tas, qmgr_ctx->qmgr_tas_sent, aq_ctx->aq_entries);
		if (qmgr_ctx->qmgr_cnf.q_cnf_tot_tas <= qmgr_ctx->qmgr_tas_sent
		    && aq_ctx->aq_entries <= 2)
		{
			qm_info(qmgr_ctx, smioout);
			return EVTHR_TERM;
		}
#endif
	}

	if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SCHED_DLY)) {
		static time_T sched_start_delay = 0;

		if (0 == sched_start_delay)
			sched_start_delay = timeval_now.tv_sec + 10;
		if (sched_start_delay > timeval_now.tv_sec) {
			/* set both fields: don't keep a stale tv_usec */
			tsk->evthr_t_sleep.tv_sec = sched_start_delay;
			tsk->evthr_t_sleep.tv_usec = 0;
			return EVTHR_SLPQ;
		}
		SM_CLR_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SCHED_DLY);
	}
#else
/* XXX TESTING XXX */
	if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE)) {
		/* first call determines the earliest time to really run */
		static time_T sched_start_delay = 0;

		if (0 == sched_start_delay)
			sched_start_delay = timeval_now.tv_sec + 10;
		if (sched_start_delay > timeval_now.tv_sec) {
			/* set both fields: don't keep a stale tv_usec */
			tsk->evthr_t_sleep.tv_sec = sched_start_delay;
			tsk->evthr_t_sleep.tv_usec = 0;
			return EVTHR_SLPQ;
		}
	}
#endif /* QMGR_TEST */

	ret = qm_get_edb_entries(qmgr_ctx, &delay_next_try);
	if (delay_next_try < 0)
		delay_next_try = 1;	/* XXX HACK */

	/*
	**  HACK notify qar task
	**  NOTE(review): the result of the notification is stored in ret
	**  but not examined afterwards (no logging, no retry).
	*/

	if (QDA_ACT_SMAR(ret)) {
		sm_evthr_task_P qar_tsk;
		qar_ctx_P qar_ctx;

		qar_ctx = qmgr_ctx->qmgr_ar_ctx;
		SM_IS_QAR_CTX(qar_ctx);
		qar_tsk = qmgr_ctx->qmgr_ar_tsk;
		if (qar_tsk != NULL)
			ret = evthr_en_wr(qar_tsk);
		else
			ret = sm_error_temp(SM_EM_Q_SCHED, SM_E_NO_AR);
	}

	QM_LEV_DPRINTFC(QDC_SCHED, 6, (QM_DEBFP, "sev=DBG, func=qmgr_sched, tsk=%p\n", tsk));
	r = pthread_mutex_lock(&aq_ctx->aq_mutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_CRIT, 4,
			"sev=CRIT, func=qmgr_sched, lock_aq=%d", r);
		goto error;
	}

	/* XXX just wait till someone wakes this up? */

	/* forced delay (set below after an error) still pending? */
	if (aq_ctx->aq_nextrun.tv_sec != 0) {
		if (timercmp(&timeval_now, &aq_ctx->aq_nextrun, <)) {
			tsk->evthr_t_sleep = aq_ctx->aq_nextrun;
			QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP, "sev=DBG, func=qmgr_sched, delay=forced, next_try=%ld\n", (long) aq_ctx->aq_nextrun.tv_sec));
			goto unlock;
		}

		/* forced delay is over: clear it */
		aq_ctx->aq_nextrun.tv_usec = 0;
		aq_ctx->aq_nextrun.tv_sec = 0;
	}

	if (aq_is_empty(aq_ctx)) {
		delay.tv_usec = 0;
		delay.tv_sec = delay_next_try == 0 ? 3000 : delay_next_try;
		QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP, "sev=DBG, func=qmgr_sched, aq=empty, next_try=%d, delay=%ld\n", delay_next_try, (long) delay.tv_sec));
	}
	else {
		/* do something.... */
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_INFO, 14,
			"func=qmgr_sched, aq_entries=%u, aq_t_da=%u",
			aq_ctx->aq_entries, aq_ctx->aq_t_da);
		ret = qmgr_sched_dlvry(qmgr_ctx, aq_ctx, &qsc_bits, &delay_next_try);
		if (sm_is_err(ret)) {
			/* XXX Map error type to logging category and sev? */
			err = sm_is_error(ret);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				err ? SM_LOG_ERR : SM_LOG_WARN,
				err ? 2 :
					QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_DA)
						? 14 : 12,
				"sev=%s, func=qmgr_sched, qmgr_sched_dlvry=%m",
				err ? "ERROR" : "WARN",
				ret);
		}

		/* XXX interval... */
		delay.tv_usec = 0;
#if QMGR_TEST
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_INT_SRC)) {
			r = aq_usage(aq_ctx, AQ_USAGE_ALL);
			if (r < qmgr_ctx->qmgr_cnf.q_cnf_max_fill_aq &&
			    0 == delay_next_try_tst)
			{
				delay.tv_sec = 0;
				delay.tv_usec = r;
			}
			else if (delay_next_try_tst < 0)
			{
				delay.tv_sec = 0;
				delay.tv_usec = (-delay_next_try_tst) * 500;
			}
			else
				delay.tv_sec = delay_next_try_tst == 0 ?
								1 : delay_next_try_tst;

			/* tv_sec/tv_usec are not int: print them as long */
			sm_io_fprintf(smioerr, "usage=%d, delay_next_try_tst=%d, delay_next_try=%d, delay=%ld.%06ld\n", r, delay_next_try_tst, delay_next_try, (long) delay.tv_sec, (long) delay.tv_usec);
		}
		else
#endif
		delay.tv_sec = delay_next_try == 0 ? 600 : delay_next_try;
		QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "sev=DBG, func=qmgr_sched, next_try=%d, delay=%ld\n", delay_next_try, (long) delay.tv_sec));
	}

	QM_LEV_DPRINTFC(QDC_SCHED, 4, (QM_DEBFP, "sev=DBG, func=qmgr_sched, old t_sleep=%ld\n", (long) tsk->evthr_t_sleep.tv_sec));
	timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
	QM_LEV_DPRINTFC(QDC_SCHED, 4, (QM_DEBFP, "sev=DBG, func=qmgr_sched, timeval_now=%ld, delay=%ld, t_sleep=%ld\n", (long) timeval_now.tv_sec, (long) delay.tv_sec, (long) tsk->evthr_t_sleep.tv_sec));

	/* after a real error: don't run again before the computed time */
	if (err)
		aq_ctx->aq_nextrun = tsk->evthr_t_sleep;

  unlock:
	r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=qmgr_sched, unlock_aq=%d", r);
		goto error;
	}

	/*
	**  notify qsc task after aq has been unlocked, otherwise
	**  there might be a deadlock (qsc task locked, waiting for aq,
	**  while this task has the lock for aq and tries to notify
	**  (which requires locking) the qsc task).
	*/

	if (qsc_bits != 0) {
		uint i;
		uint32_t j;
		qsc_ctx_P qsc_ctx;

		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			/* same log category/module as the rest of the function */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qmgr_sched, lock_qmgr=%d",
				r);
			goto error;
		}

		/*
		**  XXX HACK! Must match what qmgr_sched_dlvry does!
		**  j is the bit for DA index i (bit 0 for index 0, etc).
		*/

		for (j = 1, i = 0; i < QM_N_SC_GLI(qmgr_ctx); i++, j *= 2)
		{
			if ((qsc_bits & j) == 0)
				continue;

			qsc_ctx = qmgr_li_sc(qmgr_ctx, i);
			if (qsc_ctx->qsc_com.rcbcom_tsk == NULL)
				continue;
			SM_IS_EVTHR_TSK(qsc_ctx->qsc_com.rcbcom_tsk);

			/* tell someone to send the tasks to the DAs */
			ret = evthr_en_wr(qsc_ctx->qsc_com.rcbcom_tsk);
			if (sm_is_err(ret)) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched, evthr_en_wr=%m",
					ret);
				/* XXX what now my friend? */
			}
		}
		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		SM_ASSERT(r == 0);
	}

	return EVTHR_SLPQ;

  error:
	/* locking problem: try again in one second */
	delay.tv_usec = 0;
	delay.tv_sec = 1;
	timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */