/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: sched.c,v 1.263 2007/06/24 02:52:57 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/reccom.h"
#include "sm/da.h"
#include "sm/das.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/misc.h"
#include "qmgr.h"
#include "sm/aqrdq.h"
#include "log.h"
/*
** Which access methods to recipients does the scheduler need?
** DA/Host: for session reuse.
** SMTPS TA-Id: to find recipients for the same TA (that could
** be delivered in one TA if the DA/Host is the same).
**
** Which other (recipient) data does the scheduler need?
** Number of recipients/TAs destined for the same DA/Host.
*/
/*
** Can both recipients be delivered in the same transaction?
** They are from the same incoming transaction, but do they share the same
** delivery information (MX piggybacking)?
** Bounces can't be piggybacked: they have content in the rcpt itself.
*/
#define SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt) \
((aq_rcpt)->aqr_addrs[(aq_rcpt)->aqr_addr_cur].aqra_ipv4 == \
(aq_rcpt_nxt)->aqr_addrs[(aq_rcpt_nxt)->aqr_addr_cur].aqra_ipv4 \
&& (aq_rcpt)->aqr_da_idx == (aq_rcpt_nxt)->aqr_da_idx \
&& (aq_rcpt)->aqr_port == (aq_rcpt_nxt)->aqr_port \
&& (aq_rcpt)->aqr_owner_idx == (aq_rcpt_nxt)->aqr_owner_idx \
&& !AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN) \
&& !AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_IS_DSN) \
&& !AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP) \
&& !AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_HAS_VERP) \
)
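/*
** Illustrative sketch (not compiled): how SAME_TRANSACTION() is meant
** to be used, i.e., walking the other recipients of the same incoming
** transaction and batching those that share delivery information.
** This mirrors the "add more recipients" loop in qmgr_sched_dlvry()
** below; the variable names here are for illustration only.
*/
#if 0
for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt);
	aq_rcpt_nxt != aq_rcpt;
	aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt))
{
	if (AQR_SCHEDULE(aq_rcpt_nxt)
	    && SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt))
	{
		/* add aq_rcpt_nxt to the same DA transaction */
	}
}
#endif /* 0 */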
/* primitive version of setting an error value if it isn't set already */
#define SM_SET_RET(res, ret) do { \
if (sm_is_err(ret) && !sm_is_err(res)) \
(res) = (ret); \
} while (0)
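/*
** Usage sketch (not compiled): accumulate the first error from a
** sequence of operations without losing it to later failures;
** op1() and op2() are hypothetical.
*/
#if 0
res = SM_SUCCESS;
ret = op1();
SM_SET_RET(res, ret);
ret = op2();
SM_SET_RET(res, ret);
return res;	/* first error encountered, if any; otherwise SM_SUCCESS */
#endif /* 0 */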
/*
** QM_AR_ACTSCHED -- Decide whether to activate scheduler after getting
** result from AR.
**
** Parameters:
** qmgr_ctx -- QMGR context
** aq_rcpt -- AQ recipient
** pnotify_sched -- (pointer to) notify scheduler? (output)
**
** Returns:
** usual sm_error code; only (un)lock
**
** Last code review: 2005-04-04 20:39:02
** Last code change: 2005-04-04 20:37:48
*/
sm_ret_T
qm_ar_actsched(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt, bool *pnotify_sched)
{
#undef SMFCT
#define SMFCT "qm_ar_actsched"
sm_ret_T ret;
int r;
occ_entry_P occ_entry;
SM_IS_QMGR_CTX(qmgr_ctx);
SM_IS_AQ_RCPT(aq_rcpt);
SM_REQUIRE(pnotify_sched != NULL);
/* default: activate scheduler */
*pnotify_sched = true;
ret = SM_SUCCESS;
if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF) && aq_rcpt->aqr_addrs != NULL) {
occ_entry = NULL;
SM_ASSERT(aq_rcpt->aqr_addr_cur < aq_rcpt->aqr_addr_max);
ret = occ_entry_find(qmgr_ctx->qmgr_occ_ctx->occx_ht,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
&occ_entry, &qmgr_ctx->qmgr_occ_ctx->occx_mutex,
THR_LOCK_UNLERR);
if (SM_SUCCESS == ret && occ_entry != NULL) {
if (occ_entry->occe_open_se >= occ_entry->occe_cur_conc
&& occ_entry->occe_open_se == occ_entry->occe_open_ta)
{
#if 0
QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP,
"func=qm_ar_actsched, open_se=%d, cur_conc=%d, open_se=%d, open_ta=%d\n"
, occ_entry->occe_open_se
, occ_entry->occe_cur_conc
, occ_entry->occe_open_se
, occ_entry->occe_open_ta));
#endif /* 0 */
*pnotify_sched = false;
OCCE_SET_FLAG(occ_entry, OCCE_FL_SE_WAIT);
}
r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
SM_ASSERT(r == 0);
if (r != 0)
ret = sm_error_perm(SM_EM_Q_SCHED, r);
}
else
ret = SM_SUCCESS;
}
#if 0
/* else: scheduler might be waiting for result, see below: */
if (!((aq_ta->aqt_rcpts_ar == 0)
|| (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1)
&& (aq_ctx->aq_entries < (aq_ctx->aq_t_da >> 4)))
#endif /* 0 */
return ret;
}
/*
** QM_SC_CONF -- find smtpc configuration data for server
**
** Parameters:
** qmgr_ctx -- QMGR context
** ipv4 -- IPv4 address of server
** dadb_entry -- DA DB entry
**
** Returns:
** usual sm_error code
**
** Last code review:
** Last code change:
*/
static sm_ret_T
qm_sc_conf(qmgr_ctx_P qmgr_ctx, ipv4_T ipv4, dadb_entry_P dadb_entry)
{
#if MTA_USE_TLS
#undef SMFCT
#define SMFCT "qm_sc_conf"
sm_ret_T ret;
dadb_entry->dadbe_se_maprescnf = sm_err_perm(SM_E_NOTFOUND);
if (NULL == qmgr_ctx->qmgr_conf_map ||
!QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SC_LKP_SE_CONF))
{
return SM_SUCCESS;
}
if (NULL == dadb_entry->dadbe_se_conf)
dadb_entry->dadbe_se_conf = sm_str_new(NULL, 256, 1024); /*check size*/
if (NULL == dadb_entry->dadbe_lhs)
dadb_entry->dadbe_lhs = sm_str_new(NULL, 10, 16); /* check size */
if (NULL == dadb_entry->dadbe_tag)
dadb_entry->dadbe_tag = sm_str_new(NULL, 16, 32); /* check size */
if (NULL == dadb_entry->dadbe_se_conf ||
NULL == dadb_entry->dadbe_lhs ||
NULL == dadb_entry->dadbe_tag)
{
return sm_err_temp(ENOMEM);
}
sm_str_clr(dadb_entry->dadbe_se_conf);
sm_str_clr(dadb_entry->dadbe_lhs);
sm_str_clr(dadb_entry->dadbe_tag);
#define SC_SE_TAG "smtpc_sess_conf:"
ret = sm_err_temp(SM_E_OVFLW_NS);
if (sm_str_scat(dadb_entry->dadbe_tag, SC_SE_TAG) == SM_SUCCESS
&& sm_inet_ipv4str(ipv4, dadb_entry->dadbe_lhs) == SM_SUCCESS
&& (ret = sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
dadb_entry->dadbe_lhs, dadb_entry->dadbe_tag,
SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
dadb_entry->dadbe_se_conf)) == SM_SUCCESS
&& sm_str_getlen(dadb_entry->dadbe_se_conf) > 0)
{
QM_LEV_DPRINTFC(QDC_S2Q, 4, (QM_DEBFP, "func=%s, ipv4=%A, %slookup=%r, rhs=%S\n", SMFCT, ipv4, SC_SE_TAG, ret, dadb_entry->dadbe_se_conf));
dadb_entry->dadbe_se_maprescnf = ret;
}
return ret;
#else /* MTA_USE_TLS */
return SM_SUCCESS;
#endif /* MTA_USE_TLS */
}
/*
** QM_TO_SC_TASK -- create one session with one task for SMTPC
** XXX HACK just one session with one transaction with one (or more)
** recipients (see qm_to_sc_add_rcpt())
**
** Parameters:
** qsc_ctx -- QMGR SMTPC context
** se_reuse -- reuse an open session?
** one_ta -- only one transaction? (close session after this TA?)
** aq_ta -- AQ transaction
** aq_rcpt -- AQ recipient
** rcbe -- RCB entry
** dadb_entry -- DA DB entry
**
** Returns:
** usual sm_error code; see sm_rcb_putv()
**
** Side Effects: does not clean up rcb in case of an error.
**
** Last code review: 2005-04-04 21:03:45
** Last code change: 2006-02-21 05:55:16
*/
static sm_ret_T
qm_to_sc_task(qsc_ctx_P qsc_ctx, bool se_reuse, bool one_ta, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe, dadb_entry_P dadb_entry)
{
sm_rcb_P rcb;
sm_ret_T ret;
uint32_t cdb_rt, ta_flags, se_flags;
bool dsn;
extern sm_str_P NullSender;
sm_str_P sender, verp;
SM_ASSERT(!aq_rcpt_has_owner(aq_rcpt) ||
(aq_rcpt->aqr_owner_idx > 0 &&
aq_rcpt->aqr_owner_idx <= aq_ta->aqt_owners_n));
verp = NULL;
rcb = &rcbe->rcbe_rcb;
/* treat double bounce differently?? */
dsn = AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN);
se_flags = ta_flags = 0;
cdb_rt = RT_Q2C_CDBID;
if (dsn &&
(QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_HDR) ||
(AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_NO_BODY) &&
QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_BODY_HDR))))
{
cdb_rt = RT_Q2C_CDBID_HDR;
ta_flags |= DA_FL_HDR_ONLY;
}
if (dsn && QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_DSN_MIME))
ta_flags |= DA_FL_DSN_MIME;
if (!one_ta)
se_flags |= DA_FL_SE_KEEP;
if (se_reuse)
se_flags |= DA_FL_SE_REUSE_EX;
if (dsn)
sender = NullSender;
else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP) && aq_rcpt_has_owner(aq_rcpt))
{
size_t len;
sm_str_P owner;
owner = aq_ta->aqt_owners_pa[aq_rcpt->aqr_owner_idx - 1];
len = sm_str_getlen(owner) + sm_str_getlen(aq_rcpt->aqr_pa);
verp = sm_str_new(NULL, len, len + 4);
if (NULL == verp) {
ret = sm_err_temp(ENOMEM);
goto error;
}
ret = sm_verpify(owner, aq_rcpt->aqr_pa, '\0', '\0', verp);
if (sm_is_err(ret)) goto error;
sender = verp;
}
else if (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)) {
size_t len;
len = sm_str_getlen(aq_ta->aqt_mail->aqm_pa) + sm_str_getlen(aq_rcpt->aqr_pa);
verp = sm_str_new(NULL, len, len + 4);
if (NULL == verp) {
ret = sm_err_temp(ENOMEM);
goto error;
}
ret = sm_verpify(aq_ta->aqt_mail->aqm_pa, aq_rcpt->aqr_pa, '\0', '\0', verp);
if (sm_is_err(ret)) goto error;
sender = verp;
}
else if (aq_rcpt_has_owner(aq_rcpt))
sender = aq_ta->aqt_owners_pa[aq_rcpt->aqr_owner_idx - 1];
else
sender = aq_ta->aqt_mail->aqm_pa;
ret = sm_rcb_putv(rcb, RCB_PUTV_FIRST,
SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
SM_RCBV_INT, RT_Q2C_ID, qsc_ctx->qsc_id,
#ifdef RT_Q2C_DCID
SM_RCBV_INT, RT_Q2C_DCID, 0, /* delivery class, cur. unused */
#endif
SM_RCBV_INT, RT_Q2C_SE_FLAGS, se_flags,
SM_RCBV_BUF,
se_reuse ? (one_ta ? RT_Q2C_SEID_1TA : RT_Q2C_SEID)
: (one_ta ? RT_Q2C_NSEID_1TA : RT_Q2C_NSEID),
dadb_entry->dadbe_da_se_id, SMTP_STID_SIZE,
SM_RCBV_INT,
(0 == aq_rcpt->aqr_port) ? RT_NOSEND : RT_Q2C_SRVPORT,
(uint32_t) aq_rcpt->aqr_port,
SM_RCBV_INT,
(DA_IDX_ESMTP == aq_rcpt->aqr_da_idx) ? RT_NOSEND : RT_Q2C_DA_IDX,
aq_rcpt->aqr_da_idx,
/* XXX HACK XXX */
SM_RCBV_INT, RT_Q2C_SRVIPV4,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
SM_RCBV_BUF,
AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DLY)
? RT_Q2C_NTAIDD : AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_BNC)
? RT_Q2C_NTAIDB : AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DBNC)
? RT_Q2C_NTAIDDB : RT_Q2C_NTAID,
dadb_entry->dadbe_da_ta_id, SMTP_STID_SIZE,
SM_RCBV_BUF, RT_Q2C_SSTAID, aq_ta->aqt_ss_ta_id, SMTP_STID_SIZE,
SM_RCBV_OFF, RT_Q2C_SIZE_B, aq_ta->aqt_msg_sz_b,
SM_RCBV_STR, RT_Q2C_MAIL, sender,
SM_RCBV_CSTR, cdb_rt, aq_ta->aqt_cdb_id,
SM_RCBV_INT, RT_Q2C_TA_FLAGS, ta_flags,
SM_RCBV_END);
#if MTA_USE_TLS
if (sm_is_success(ret) &&
sm_is_success(dadb_entry->dadbe_se_maprescnf) &&
dadb_entry->dadbe_se_conf != NULL)
{
ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_SRV, dadb_entry->dadbe_se_maprescnf,
SM_RCBV_STR, RT_Q2C_RHS_CNF_SRV, dadb_entry->dadbe_se_conf,
SM_RCBV_END);
}
#endif /* MTA_USE_TLS */
if (sm_is_success(ret))
ret = sm_hdrmodl_wr(aq_ta->aqt_hdrmodhd, rcb, RT_Q2C_HM_T_P, RT_Q2C_HM_HDR);
if (sm_is_success(ret))
ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
SM_RCBV_INT, RT_Q2C_RCPT_IDX, aq_rcpt->aqr_idx,
SM_RCBV_STR, RT_Q2C_RCPT, aq_rcpt->aqr_pa,
SM_RCBV_END);
#if MTA_USE_TLS
if (sm_is_success(ret) &&
sm_is_success(aq_rcpt->aqr_maprescnf) &&
aq_rcpt->aqr_conf != NULL)
{
ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_RCPT, aq_rcpt->aqr_maprescnf,
SM_RCBV_STR, RT_Q2C_RHS_CNF_RCPT, aq_rcpt->aqr_conf,
SM_RCBV_END);
}
#endif /* MTA_USE_TLS */
QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "func=qm_to_sc_task, ss_ta=%s, cdb=%s, da_ta=%s, ta_flags=%#x, dsn=%d, ret=%r\n",
aq_ta->aqt_ss_ta_id, sm_cstr_data(aq_ta->aqt_cdb_id),
dadb_entry->dadbe_da_ta_id, aq_ta->aqt_flags, dsn, ret));
if (sm_is_err(ret)) goto error;
if (dsn) {
ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
SM_RCBV_STR, RT_Q2C_B_MSG, aq_rcpt->aqr_dsn_msg,
SM_RCBV_END);
if (sm_is_err(ret)) goto error;
AQ_TA_SET_FLAG(aq_ta, AQ_TA_FL_NO_BODY);
}
SM_STR_FREE(verp);
return ret;
error:
SM_STR_FREE(verp);
QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=ERROR, func=qm_to_sc_task, ret=%r\n", ret));
return ret;
}
#if QMGR_DEBUG > 1
void
aq_rcpt_print(aq_rcpt_P aq_rcpt)
{
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aq_rcpt=%p\n", aq_rcpt));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "ss_ta=%s\n", aq_rcpt->aqr_ss_ta_id));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpt=%S\n", aq_rcpt->aqr_pa));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_idx=%u\n", aq_rcpt->aqr_idx));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_tries=%u\n", aq_rcpt->aqr_tries));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "srv_ipv4=%A\n", aq_rcpt->aqr_addrs == NULL ? -1 : aq_rcpt->aqr_addrs[0].aqra_ipv4));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_da_idx=%u\n", aq_rcpt->aqr_da_idx));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_st_time=%ld\n", aq_rcpt->aqr_st_time));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_last_try=%ld\n", aq_rcpt->aqr_last_try));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_next_try=%ld\n", aq_rcpt->aqr_next_try));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_status=%d\n", aq_rcpt->aqr_status));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_flags=%#x\n", aq_rcpt->aqr_flags));
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_ss_link.sm_rg_succ=%p\n", AQR_SS_SUCC(aq_rcpt)));
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_ss_link.sm_rg_pred=%p\n", AQR_SS_PRED(aq_rcpt)));
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_da_link.sm_rg_succ=%p\n", AQR_DA_SUCC(aq_rcpt)));
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_da_link.sm_rg_pred=%p\n", AQR_DA_PRED(aq_rcpt)));
}
void
aq_ta_print(aq_ta_P aq_ta)
{
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aq_ta=%p\n", aq_ta));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "ss_ta=%s\n", aq_ta->aqt_ss_ta_id));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "mail=%S\n", aq_ta->aqt_mail->aqm_pa));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "time=%ld\n", (long) aq_ta->aqt_st_time));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "cdb=%s\n", sm_cstr_data(aq_ta->aqt_cdb_id)));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts=%u\n", aq_ta->aqt_rcpts_inaq));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_tot=%u\n", aq_ta->aqt_rcpts_tot));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_left=%u\n", aq_ta->aqt_rcpts_left));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_temp=%u\n", aq_ta->aqt_rcpts_temp));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_perm=%u\n", aq_ta->aqt_rcpts_perm));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "state=%d\n", aq_ta->aqt_state));
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqt_rcpts_ar=%d\n", aq_ta->aqt_rcpts_ar));
}
#endif /* QMGR_DEBUG > 1 */
/*
** QM_TO_SC_ADD_RCPT -- add another recipient to a transaction for SMTPC
**
** Parameters:
** qsc_ctx -- QMGR SMTPC context (currently unused)
** aq_rcpt -- AQ recipient
** rcbe -- RCB entry
**
** Returns:
** usual sm_error code; see sm_rcb_putv()
**
** Side Effects: does not clean up rcb in case of an error.
**
** Last code review: 2005-04-04 21:05:04
** Last code change:
*/
static sm_ret_T
qm_to_sc_add_rcpt(qsc_ctx_P qsc_ctx, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe)
{
sm_rcb_P rcb;
sm_ret_T ret;
rcb = &rcbe->rcbe_rcb;
ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
SM_RCBV_INT, RT_Q2C_RCPT_IDX, aq_rcpt->aqr_idx,
SM_RCBV_STR, RT_Q2C_RCPT, aq_rcpt->aqr_pa,
SM_RCBV_END);
#if MTA_USE_TLS
if (sm_is_success(ret) &&
sm_is_success(aq_rcpt->aqr_maprescnf) &&
aq_rcpt->aqr_conf != NULL)
{
ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_RCPT, aq_rcpt->aqr_maprescnf,
SM_RCBV_STR, RT_Q2C_RHS_CNF_RCPT, aq_rcpt->aqr_conf,
SM_RCBV_END);
}
#endif /* MTA_USE_TLS */
if (sm_is_err(ret)) goto error;
return ret;
error:
QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=ERROR, func=qm_to_sc_add_rcpt, ret=%r\n", ret));
return ret;
}
/*
** QMGR_FIND_DA -- Find an available DA
** NOTE: this requires that all DAs can do all kinds of deliveries!
**
** Parameters:
** qmgr_ctx -- QMGR context
** pqsc_bit -- (pointer to) QSC bit (output)
** pqsc_ctx -- (pointer to) QSC context (output)
** if this points to a valid qsc_ctx: don't return the
** same one.
** pda_avail -- (pointer to) how many entries are free? (output)
** pda_idle -- (pointer to) how many entries are idle? (output)
**
** Returns:
** usual sm_error code; SM_E_NO_DA, (un)lock
**
** Locking: locks qmgr_ctx
**
** Last code review: 2005-04-04 22:07:13
** Last code change: 2006-01-29 05:03:32
*/
static sm_ret_T
qmgr_find_da(qmgr_ctx_P qmgr_ctx, uint32_t *pqsc_bit, qsc_ctx_P *pqsc_ctx, uint *pda_avail, uint *pda_idle)
{
int i, r;
uint32_t j;
uint da_avail, da_idle;
sm_ret_T ret;
qsc_ctx_P qsc_ctx;
SM_REQUIRE(pqsc_ctx != NULL);
SM_REQUIRE(pqsc_bit != NULL);
*pqsc_bit = 0;
r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SMTPS, QM_LMOD_FROM_SMTPS,
SM_LOG_CRIT, 4,
"sev=CRIT, func=qmgr_find_da, lock=%d\n", r);
return sm_error_temp(SM_EM_Q_SCHED, r);
}
ret = sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA); /* XXX */
for (j = 1, i = 0; i < QM_N_SC_GLI(qmgr_ctx); i++, j *= 2)
{
if ((qmgr_ctx->qmgr_sc_li.qm_gli_used & j) != 0
&& (qsc_ctx = qmgr_li_sc(qmgr_ctx, i)) != NULL
&& qsc_ctx->qsc_com.rcbcom_tsk != NULL
&& QSC_IS_RUNNING(qsc_ctx)
&& qsc_ctx != *pqsc_ctx
)
{
/* check whether the DA has free capacity */
(void) dadb_entry_avail(qsc_ctx->qsc_dadb_ctx, &da_avail, &da_idle, THR_LOCK_UNLOCK);
if (da_avail > 0 || da_idle > 0) {
if (pda_avail != NULL)
*pda_avail = da_avail;
if (pda_idle != NULL)
*pda_idle = da_idle;
*pqsc_ctx = qsc_ctx;
*pqsc_bit = j;
ret = SM_SUCCESS;
break;
}
}
}
r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
SM_ASSERT(r == 0);
SM_ASSERT(ret != SM_SUCCESS || *pqsc_ctx != NULL);
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_Q_SCHED, r);
return ret;
}
/*
** SCHED_MV2NXTMX -- Maybe move rcpt to next MX (if same priority)
**
** Parameters:
** qmgr_ctx -- QMGR context
** aq_ctx -- AQ context
** aq_rcpt -- AQ recipient
** time_now -- current time
**
** Returns:
** usual sm_error code; ENOMEM, SM_E_UNEXPECTED, et al.
**
** Locking: aq_ctx must be locked
**
** Side Effects: may remove aq_rcpt from RDQ without adding it
** to the next one due to a resource problem
**
** Last code review: 2005-04-04 22:19:52
** Last code change:
*/
static sm_ret_T
sched_mv2nxtmx(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T time_now)
{
sm_ret_T ret;
uint addr_cur;
ret = SM_SUCCESS;
addr_cur = aq_rcpt->aqr_addr_cur;
if (AQR_MORE_ADDR(aq_rcpt) &&
aq_rcpt->aqr_addrs[addr_cur].aqra_pref ==
aq_rcpt->aqr_addrs[addr_cur + 1].aqra_pref &&
aq_rcpt->aqr_addrs[addr_cur].aqra_expt >= time_now)
{
ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=sched_mv2nxtmx, aq_rdq_rm=%m, flags=%#x",
ret, aq_rcpt->aqr_flags);
return ret;
}
aq_rcpt->aqr_addr_cur++;
ret = aq_rdq_add(aq_ctx, aq_rcpt, NULL /* &aqrdq_flags */, THR_NO_LOCK);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=sched_mv2nxtmx, aq_rdq_add=%m, flags=%#x",
ret, aq_rcpt->aqr_flags);
return ret;
}
}
return ret;
}
/*
** SCHED2LONG -- AQ rcpt takes too long to schedule
**
** Parameters:
** qmgr_ctx -- QMGR context
** aq_ctx -- AQ context
** aq_rcpt -- AQ recipient
** time_now -- current time
**
** Returns:
** usual sm_error code
**
** Locking: aq_ctx must be locked
**
** Last code review:
** Last code change:
*/
static sm_ret_T
sched2long(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T time_now)
{
sm_ret_T ret;
ret = SM_SUCCESS;
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED, SM_LOG_ERR, 1,
"sev=ERROR, func=sched2long, ss_ta=%s, idx=%u, now-entered=%ld, status=timeout_in_scheduler"
, aq_rcpt->aqr_ss_ta_id
, aq_rcpt->aqr_idx
, (long) (time_now - aq_rcpt->aqr_entered));
/*
** XXX AND NOW?
** what's the simplest way to remove this from AQ?
** move it from rcpt dest queue to wait queue and
** "cheat" on the timeout ("immediate")?
*/
++aq_ctx->aq_t_da;
AQR_DA_INIT(aq_rcpt);
AQR_SET_FLAG(aq_rcpt, AQR_FL_SCHEDF);
/* Was: rdq from todo to busy? check cleanup code (and how rdq is used) */
ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=sched2long, aq_rdq_rm=%m, flags=%#x",
ret, aq_rcpt->aqr_flags);
return ret;
}
/* ??? use DA waitq for this? or yet another queue??? */
ret = aq_waitq_add(aq_ctx, aq_rcpt, time_now, AQWQ_DA, THR_NO_LOCK);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=sched2long, aq_waitq_add=%m",
ret);
return ret;
}
ret = qmgr_set_aq_cleanup(qmgr_ctx->qmgr_cleanup_ctx, time_now, true);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=sched2long, qmgr_set_aq_cleanup=%m",
ret);
return ret;
}
return ret;
}
/*
** SAMEDOMAINOK -- Are all recipients for the same domain ready for scheduling?
**
** Parameters:
** aq_rcpt -- AQ recipient
** defaultdomain -- domain to use if recipient doesn't have one
**
** Returns:
** true: all ok
** false: at least one is not yet ready
**
** Locking: aq_ctx must be locked
**
** Last code review: 2005-04-04 23:49:13
** Last code change: 2006-06-11 04:14:31
*/
static bool
samedomainok(aq_rcpt_P aq_rcpt, sm_str_P defaultdomain)
{
aq_rcpt_P aq_rcpt_nxt;
bool ok;
/*
** Check whether all recipients with the same domain
** are ready for scheduling.
** Notes:
** 1. it is necessary to look in both directions.
** this assumes that recipients for the same domain
** are treated "similarly" by smar; it is not required
** that they actually have the same destination,
** but if it takes much longer for some of the
** recipients to determine the resolved addresses
** than for others (even with the same domain! i.e.,
** routing based on other criteria), then this may
** add unnecessary delays.
** 2. this function requires that the rcpts are sorted by domain.
*/
ok = true;
for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt);
ok && aq_rcpt_nxt != aq_rcpt &&
aq_rcpt_eq_domain(aq_rcpt, aq_rcpt_nxt, defaultdomain)
== SM_SUCCESS;
aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt))
{
ok = AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_RDY4DLVRY);
}
for (aq_rcpt_nxt = AQR_SS_PRED(aq_rcpt);
ok && aq_rcpt_nxt != aq_rcpt &&
aq_rcpt_eq_domain(aq_rcpt, aq_rcpt_nxt, defaultdomain)
== SM_SUCCESS;
aq_rcpt_nxt = AQR_SS_PRED(aq_rcpt_nxt))
{
ok = AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_RDY4DLVRY);
}
return ok;
}
/*
** QMGR_RPCTS_TA -- return max number of recipients for this transaction
**
** Parameters:
** qmgr_ctx -- QMGR context
** aq_rcpt -- AQ recipient
**
** Returns:
** max number of recipients for this transaction
*/
static uint
qmgr_rpcts_ta(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt)
{
SM_IS_QMGR_CTX(qmgr_ctx);
SM_IS_AQ_RCPT(aq_rcpt);
if (aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4
== LMTP_IPV4_ADDR)
return qmgr_ctx->qmgr_cnf.q_cnf_lmtp_rcpts_ta;
return qmgr_ctx->qmgr_cnf.q_cnf_smtp_rcpts_ta;
}
/*
** QM_SESS_KEEP_OPEN -- is it useful to keep a session open?
**
** Parameters:
** qmgr_ctx -- QMGR context
** aq_ta -- AQ transaction
** aq_rcpt -- AQ recipient
** max_rcpts_ta -- max recipients per transaction
**
** Returns:
** true iff it is useful to keep a session open
*/
static bool
qm_sess_keep_open(qmgr_ctx_P qmgr_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, uint max_rcpts_ta)
{
uint nrcpts;
aq_rcpt_P aq_rcpt_nxt;
if (!QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE))
return false;
aq_rcpt_nxt = AQ_RDQ_NEXT(aq_rcpt);
if (aq_rcpt_nxt == NULL || aq_rcpt_nxt == aq_rcpt)
return false;
for (nrcpts = 0;
aq_rcpt_nxt != NULL && aq_rcpt_nxt != aq_rcpt && nrcpts < max_rcpts_ta;
aq_rcpt_nxt = AQ_RDQ_NEXT(aq_rcpt_nxt))
{
if (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)
&& AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP)
&& SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt))
return false;
++nrcpts;
}
return true;
}
/*
** QMGR_SCHED_DLVRY -- simple scheduler...
**
** Parameters:
** qmgr_ctx -- QMGR context
** aq_ctx -- AQ context
** pqsc_bits -- (bitmask) which DAs need to be started (output)
** pdelay_next_try -- (pointer to) delay until next try (output)
**
** Returns:
** usual sm_error code
**
** Locking: aq_ctx must be locked
**
** Todo: this is not very sophisticated.
** split this into multiple functions? it's too long...
**
** Last code review:
** Last code change:
*/
/*
** Delay for scheduler after encountering too many open connections.
** The scheduler should be woken up after a TA close RCB is received
** from a DA for which a TA is waiting. This data should be recorded
** in the DA connection cache.
*/
#define DELAY_TOO_MANY 300
#define SET_DELAY_NEXT_TRY(d, where) \
do \
{ \
if ((d) < *pdelay_next_try || *pdelay_next_try == 0) \
{ \
*pdelay_next_try = (d); \
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "func=qmgr_sched_dlvry, where=%s, delay=%d\n", (where), (d))); \
} \
} while (0)
static sm_ret_T
qmgr_sched_dlvry(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, uint32_t *pqsc_bits, int *pdelay_next_try)
{
#undef SMFCT
#define SMFCT "qmgr_sched_dlvry"
sm_ret_T ret, res;
aq_rcpt_P aq_rcpt, aq_rcpt_nxt;
aq_ta_P aq_ta;
qsc_ctx_P qsc_ctx;
sm_rcbe_P rcbe;
dadb_entry_P dadb_entry;
aq_rsnd_ctx_P aq_rsnd_ctx;
uint i, nrcpts, connlimit, conncur, max_rcpts_ta, da_avail, da_idle;
int r;
uint32_t qsc_bit;
bool stopit, se_reuse, se_keep_open, occe_locked;
time_T time_now;
sessta_id_T da_ta_id;
occ_entry_P occ_entry;
aqrdq_ctx_P aqrdq;
uint todo_entries, rdq_entries;
/*
** Macro to move rcpt to end of list unless it's there already.
*/
#define SM_MV_RCPT2END \
if (aq_rcpt == AQ_RDQ_LAST(aqrdq->aqrdq_rcpts)) \
break; \
AQ_RDQ_REMOVE(aqrdq->aqrdq_rcpts, aq_rcpt); \
AQ_RDQ_INSERT_TAIL(aqrdq->aqrdq_rcpts, aq_rcpt); \
continue
#define QM_SCHED_RCBE_FREE \
while (rcbe != NULL) \
{ \
aq_rsnd_ctx_free(aq_rsnd_ctx); \
aq_rsnd_ctx = NULL; \
sm_rcbe_free(rcbe); \
rcbe = NULL; \
}
SM_IS_QMGR_CTX(qmgr_ctx);
SM_IS_AQ(aq_ctx);
SM_REQUIRE(pqsc_bits != NULL);
SM_REQUIRE(pdelay_next_try != NULL);
*pqsc_bits = 0;
da_ta_id[0] = '\0';
rcbe = NULL;
aq_rsnd_ctx = NULL;
occe_locked = stopit = false;
max_rcpts_ta = INT_MAX / 2;
/*
** Do we have a DA? Just one...
** Note: this may sleep() while holding AQ mutex, but only
** if there was never a DA available before (QMGR_SFL_HAD_DA),
** i.e., during startup.
*/
qsc_ctx = NULL;
while (!sm_is_success(ret =
qmgr_find_da(qmgr_ctx, &qsc_bit, &qsc_ctx, NULL, NULL)))
{
if (qmgr_ctx->qmgr_st_time + qmgr_ctx->qmgr_cnf.q_cnf_wait4clt
<= evthr_time(qmgr_ctx->qmgr_ev_ctx)
|| QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA))
{
if (!QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_DA)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_WARN, 8,
"sev=WARN, func=qmgr_sched_dlvry, status=no delivery agent available");
QMGR_SET_SFLAG(qmgr_ctx, QMGR_SFL_DA);
}
qmgr_ctx->qmgr_tm_no_da = evthr_time(qmgr_ctx->qmgr_ev_ctx);
return sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA);
}
sleep(1);
}
if (!QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA))
QMGR_SET_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA);
QMGR_CLR_SFLAG(qmgr_ctx, QMGR_SFL_DA);
SM_IS_QSC_CTX(qsc_ctx);
QM_LEV_DPRINTFC(QDC_SCHED, 7, (QM_DEBFP, "func=qmgr_sched_dlvry, qsc_ctx=%p\n", qsc_ctx));
SM_IS_EVTHR_TSK(qsc_ctx->qsc_com.rcbcom_tsk);
#if 0
// r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
// SM_LOCK_OK(r);
// if (r != 0)
// {
// sm_log_write(qmgr_ctx->qmgr_lctx,
// QM_LCAT_SCHED, QM_LMOD_SCHED,
// SM_LOG_CRIT, 1,
// "sev=CRIT, func=qmgr_sched_dlvry, lock_qsc=%d", r);
// }
// else
// {
// /* XXX might be able to use an existing session */
// if (qsc_ctx->qsc_curactive >= qsc_ctx->qsc_maxthreads
// && !QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_SE_REUSE))
// {
//QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=1, qsc_ctx=%p, curactive=%d >= max=%d\n",
//qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
// r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
// SM_ASSERT(r == 0);
// return sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA_FREE);
// }
// r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
// SM_ASSERT(r == 0);
// }
#endif /* 0 */
ret = SM_SUCCESS;
time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "func=qmgr_sched_dlvry, rdq_entries=%d\n", aq_ctx->aq_rdq_used));
/*
** Go through all rcpt destination queues.
**
** Note: currently this code assumes that all recipients can be
** delivered by all delivery agents.
** Question: what "breaks" if different delivery agents or different
** delivery classes need to be selected?
** Is it necessary to have rcpt destination queues per delivery class?
** Is it necessary to have delivery superclasses, i.e., sets of
** delivery classes which are implemented by the same set of DAs
** (such that it doesn't make a difference if any of the delivery
** classes inside a superclass is selected because they all share
** the same DAs)?
*/
for (rdq_entries = aq_ctx->aq_rdq_used; rdq_entries > 0; rdq_entries--)
{
/*
** Take the head of the list and "rotate" it to the end.
** If there were a real "rotate" operation, this would only
** be necessary when a problem occurs, in which case the
** current entry would become the first element of the list
** to achieve some kind of fairness.
*/
aqrdq = AQ_RDQS_FIRST(aq_ctx->aq_rdqs);
AQ_RDQS_REMOVE(aq_ctx->aq_rdqs, aqrdq);
AQ_RDQS_INSERT_TAIL(aq_ctx->aq_rdqs, aqrdq);
AQRDQ_CLR_FLAG(aqrdq, AQRDQ_FL_OCEXC);
todo_entries = aqrdq->aqrdq_entries;
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "func=qmgr_sched_dlvry, todo_entries=%d\n", todo_entries));
/*
** Note: the loop construct is a problem:
** 1. aq_rcpt might be removed (most likely)
** hence AQ_RDQ_NEXT(aq_rcpt) can't be done inside
** the "for" statement.
** 2. aq_rcpt_nxtl might be removed from the TODO queue too
** by the "add more recipients to this transaction" code.
** Possible solutions:
** 1. when removing a recipient from the TODO list, check it
** against aq_rcpt_nxtl: if it is the same: go to the next.
** 2. check whether aq_rcpt_nxtl is still in TODO list,
** if not: what then? we can't go to the next entry: this
** isn't in the right list...
**
** It seems the loop should be restructured:
** all entries in the TODO list are ready for delivery,
** hence after each iteration the head of the list should
** have been removed from the list, i.e.,
** while (!empty) { rcpt = head(todo_list); schedule }
**
** However, even that is incorrect because some recipients
** might not be scheduled despite being in the todo list.
** See below for two cases.
** The hack applied in those cases (move the item to the
** end of the list) doesn't work if this happens more than
** once in a list. Hence this restriction must be used
** unless a better way, e.g., a flag, can be found.
**
** The current solution:
** use a for loop restricted by the number of todo entries
** always access the first element of the list
** rcpts that aren't scheduled must be moved to the end
** stop if the list is empty
** this can happen if the scheduler adds more rcpts
** inside the loop to a transaction
*/
/* go through all "todo" rcpts for each rcpt dest queue */
for (i = 0;
i < todo_entries && !stopit
&& !AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts);
i++)
{
da_avail = da_idle = 0;
aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
SM_IS_AQ_RCPT(aq_rcpt);
#if QMGR_TEST
if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_DLY_SCHED)
&& AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DLY))
{
SM_ASSERT(false); /* abort */
}
#endif
aq_ta = aq_rcpt->aqr_ss_ta;
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aq_rcpt=%p, aqr_pa=%S, ss_ta=%s, idx=%u, aqr_flags=%#x, sched=%d, aq_ta=%p\n", aq_rcpt, aq_rcpt->aqr_pa, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx, aq_rcpt->aqr_flags, AQR_SCHEDULE(aq_rcpt), aq_ta));
#if QMGR_DEBUG > 1
aq_rcpt_print(aq_rcpt);
#endif /* QMGR_DEBUG > 1 */
SM_IS_AQ_TA(aq_ta);
#if QMGR_DEBUG > 1
aq_ta_print(aq_ta);
#endif
SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ_AR));
SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_TMOUT|AQR_FL_SCHED|AQR_FL_PERM));
SM_ASSERT(!AQR_IS_DSNFL(aq_rcpt, AQR_DSNFL_F_HBG));
SM_ASSERT(AQR_SCHEDULE(aq_rcpt));
if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN))
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aq_rcpt=%p, aq_rcpt_pa=%S, idx=%u, type=bounce, tried=%u, total=%u, now-entered=%ld\n", aq_rcpt, aq_rcpt->aqr_pa, aq_rcpt->aqr_idx, aq_ta->aqt_rcpts_tried, aq_ta->aqt_rcpts_tot, (long) (time_now - aq_rcpt->aqr_entered)));
/*
** Note: the timeout must be significantly larger
** than the SMTPC delivery timeout!
** Check whether the item itself is too old
** (this can cause problems if there are many
** destinations that are unavailable, i.e., many
** delivery attempts), and check whether the time
** since the last scheduling attempt is exceeded too.
** Should this check the time since the last
** "status update" (e.g., scheduled, result from DA,
** result from AR, others)?
*/
if (aq_rcpt->aqr_entered + qmgr_ctx->qmgr_tmo_sched
< time_now &&
aq_rcpt->aqr_last_try + qmgr_ctx->qmgr_tmo_sched
< time_now &&
qmgr_ctx->qmgr_tm_no_da + qmgr_ctx->qmgr_tmo_sched
< time_now
)
{
/*
** This is taking too long...
** something must be wrong.
*/
ret = sched2long(qmgr_ctx, aq_ctx, aq_rcpt, time_now);
if (sm_is_err(ret)) {
stopit = true;
break;
}
continue;
}
/*
** If this is a (double)bounce then skip it if
** - there aren't enough tried recipients (XXX?)
** or
** - the timeout hasn't occurred yet.
**
** Note: this is one place where a recipient can be in
** the todo queue but it is skipped by the scheduler.
** See also next if statement.
*/
if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN) &&
!(aq_ta->aqt_rcpts_tot - aq_ta->aqt_rcpts_tried <=
(aq_ta_has_bounce(aq_ta) ? 1 : 0) +
(aq_ta_has_dbl_bounce(aq_ta) ? 1 : 0)
|| aq_rcpt->aqr_entered +
qmgr_ctx->qmgr_cnf.q_cnf_t_sched_dsn
< time_now)
)
{
int d;
d = aq_rcpt->aqr_entered +
qmgr_ctx->qmgr_cnf.q_cnf_t_sched_dsn -
time_now;
if (d == 0)
d = 1; /* need a minimum delay */
SET_DELAY_NEXT_TRY(d, "bounce/timeout");
SM_MV_RCPT2END;
}
/*
** XXX Hack: need
** no more recipients to receive from AR
** or
#if 0
** (at least half of the recipients resolved and
** current number of entries greater than
** (entries being delivered / 16))
#else * 0 *
** the number of recipients is less than 10
** (i.e., for small numbers all recipients must be
** resolved before continuing).
#endif * 0 *
**
** This is only a hack anyway, we need a real
** scheduler...
**
** Note: aqt_rcpts_ar also counts bounce recipients
** that have been sent to AR! Should it do that?
**
** Note: this is another place where a recipient can
** be in the todo queue but it is skipped by the
** scheduler. See also previous if statement.
*/
QM_LEV_DPRINTFC(QDC_SCHED, 6, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aqr_pa=%S, status=%d, aqt_rcpts_ar=%d, aqt_rcpts_inaq=%d, aq_entries=%d, t_da=%d, skip=%d\n", aq_rcpt->aqr_pa, aq_rcpt->aqr_status, aq_ta->aqt_rcpts_ar, aq_ta->aqt_rcpts_inaq, aq_ctx->aq_entries, aq_ctx->aq_t_da, !((aq_ta->aqt_rcpts_ar == 0) || (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1) && aq_ta->aqt_rcpts_inaq > 10))));
if (!((aq_ta->aqt_rcpts_ar == 0)
|| (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1)
#if 0
&& (aq_ctx->aq_entries > (aq_ctx->aq_t_da >> 4))
#else
&& aq_ta->aqt_rcpts_inaq > 10
#endif
)
)
)
{
SM_MV_RCPT2END;
}
/*
** Check whether all recipients with the same domain
** are ready for scheduling.
*/
if (!samedomainok(aq_rcpt, qmgr_ctx->qmgr_hostname)) {
QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, aqr_pa=%S, status=skipped_because_other_entry_is_not_ready\n",
aq_rcpt->aqr_pa));
SM_MV_RCPT2END;
}
/* check whether current DA can be used */
ret = dadb_entry_avail(qsc_ctx->qsc_dadb_ctx, &da_avail, &da_idle, THR_LOCK_UNLOCK);
if (da_avail <= 0 && da_idle <= 0) {
/* find another DA */
ret = qmgr_find_da(qmgr_ctx, &qsc_bit, &qsc_ctx, &da_avail, &da_idle);
if (sm_is_err(ret)) {
if (sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA) == ret)
qmgr_ctx->qmgr_tm_no_da = time_now;
#if 0
QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=3, qsc_ctx=%p, curactive=%d >= max=%d\n",
qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#else /* 0 */
QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=3, qsc_ctx=%p, status=no_free_DA\n",
qsc_ctx));
#endif /* 0 */
stopit = true;
break;
}
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, switch, qsc_ctx=%p\n", qsc_ctx));
}
#if 0
// /* XXX What's this? 2003-11-12 */
// ret = aq_ta_find(aq_ctx, aq_rcpt->aqr_ss_ta_id, false, &aq_ta);
// if (sm_is_err(ret)) {
// /* COMPLAIN */
// QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=ERROR, func=qmgr_sched_dlvry, can't find ta for ss_ta=%s, ret=%r\n",
// aq_rcpt->aqr_ss_ta_id, ret));
// continue;
// }
#endif /* 0 */
/* add delivery information to SMTPC RCB entry */
ret = qm_rcbe_new(qmgr_ctx, &rcbe, -1);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, qm_rcbe_new=%m",
ret);
stopit = true;
break;
}
/* create callback context for RCB send */
ret = aq_rsnd_ctx_new(aq_ctx, aq_rcpt, &aq_rsnd_ctx);
if (sm_is_err(ret) || NULL == aq_rsnd_ctx) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, aq_rsnd_ctx_new=%m",
ret);
sm_rcbe_free(rcbe);
rcbe = NULL;
stopit = true;
break;
}
sm_rcbe_setcb(rcbe, aq_rcptsent2da, (void *)aq_rsnd_ctx);
dadb_entry = NULL;
occ_entry = NULL;
connlimit = qmgr_ctx->qmgr_cnf.q_cnf_max_conc_conn;
conncur = 0;
/* Need to keep occ_entry locked! */
res = occ_entry_find(qmgr_ctx->qmgr_occ_ctx->occx_ht,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
&occ_entry, &qmgr_ctx->qmgr_occ_ctx->occx_mutex,
THR_LOCK_UNLERR);
if (SM_SUCCESS == res && occ_entry != NULL) {
occe_locked = true;
SM_ASSERT(occ_entry->occe_open_se >= occ_entry->occe_open_ta);
connlimit = occ_entry->occe_cur_conc;
conncur = occ_entry->occe_open_se;
if (conncur >= connlimit
&& occ_entry->occe_open_se == occ_entry->occe_open_ta)
{
AQRDQ_SET_FLAG(aqrdq, AQRDQ_FL_OCEXC);
OCCE_SET_FLAG(occ_entry, OCCE_FL_SE_WAIT);
if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE)) {
/* always? */
OCCE_SET_FLAG(occ_entry, OCCE_FL_TA_WAIT);
}
#if 0
// r = DELAY_TOO_MANY;
// SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded");
#else /* 0 */
r = occ_entry->occe_last_upd + occ_entry->occe_timeout - time_now;
QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, occ=limit_exceeded, last_upd=%ld, timeout=%u, now=%ld, r=%d\n",
(long) occ_entry->occe_last_upd, occ_entry->occe_timeout, (long) time_now, r));
if (r <= 0)
r = DELAY_TOO_MANY;
SET_DELAY_NEXT_TRY(r, "exceeded");
#endif /* 0 */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO,
OCCE_IS_FLAG(occ_entry, OCCE_FL_LOGEXC) ? 14 : 11,
"sev=INFO, func=qmgr_sched_dlvry, ss_ta=%s, idx=%d, ip=%A, open_connections=%d, cur_limit=%u, status=limit_exceeded, timeout=%d",
aq_rcpt->aqr_ss_ta_id,
aq_rcpt->aqr_idx,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
occ_entry->occe_open_se,
occ_entry->occe_cur_conc, r);
OCCE_SET_FLAG(occ_entry, OCCE_FL_LOGEXC);
r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
SM_ASSERT(r == 0);
if (r == 0)
occe_locked = false;
/* try next MX for this rcpt? */
ret = sched_mv2nxtmx(qmgr_ctx, aq_ctx, aq_rcpt, time_now);
QM_SCHED_RCBE_FREE;
break;
}
#if 0
// else if (occ_entry->occe_open_se > occ_entry->occe_open_ta)
// {
//sm_log_write(qmgr_ctx->qmgr_lctx,
//QM_LCAT_SCHED, QM_LMOD_SCHED,
//SM_LOG_INFO, 11,
//"func=qmgr_sched_dlvry, ip=%A, open_se=%u, open_ta=%u, dadbe=%p, flags=%#x"
//, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4
//, occ_entry->occe_open_se
//, occ_entry->occe_open_ta
//, occ_entry->occe_dadbe
//, occ_entry->occe_dadbe->dadbe_flags
//);
// /* first entry should be an available session */
// if (DADBE_IS_CONN(occ_entry->occe_dadbe))
// {
// dadb_entry = occ_entry->occe_dadbe;
// res = occ_entry->occe_open_se;
// }
// else
//QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "sev=ERROR, func=qmgr_sched_dlvry, ip=%A, occ_entry=%p, dadb_entry=%p, open_se=%u, open_ta=%u, flags=%#x, status=expected_available_session\n"
//, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4
//, occ_entry, dadb_entry
//, occ_entry->occe_open_se, occ_entry->occe_open_ta
//, occ_entry->occe_dadbe->dadbe_flags
//));
// }
#endif /* 0 */
else
{
OCCE_CLR_FLAG(occ_entry, OCCE_FL_LOGEXC);
}
}
/*
** move into previous if-block: an open session can only be found if
** occ_entry_find() found an entry.
*/
if (NULL == dadb_entry) {
/*
** Find a DA that has a session open
** to the required IP addr
*/
res = dadb_se_find_by_ipv4(qsc_ctx->qsc_dadb_ctx,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
time_now, &dadb_entry);
}
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO, 15,
"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, dadb_entry=%p",
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].
aqra_ipv4,
res, dadb_entry);
/* restrict number of connections */
if (dadb_entry == NULL &&
((res > 0 && (uint)res >= connlimit) || conncur >= connlimit))
{
r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
SM_ASSERT(r == 0);
if (r == 0)
occe_locked = false;
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO, 13,
"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, status=limit_exceeded",
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
conncur > 0 ? conncur : res);
SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded2");
break;
}
se_reuse = res > 0 && dadb_entry != NULL
&& QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE);
if (se_reuse) {
SM_ASSERT(occe_locked);
/* let's reuse this ... */
ret = dadb_sess_reuse(qsc_ctx, qsc_ctx->qsc_dadb_ctx,
aq_rcpt->aqr_ss_ta_id, dadb_entry, occ_entry, aq_rcpt);
r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
SM_ASSERT(r == 0);
if (r == 0)
occe_locked = false;
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO, 9,
"func=qmgr_sched_dlvry, open_connections=%d, ip=%A, dadb_sess_reuse=%m",
conncur > 0 ? conncur : res,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
ret);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, dadb_sess_reuse=%m",
ret);
/*
** if session can't be reused: check
** whether connection limit is exceeded
*/
if ((uint)res >= connlimit || conncur >= connlimit) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED,
QM_LMOD_SCHED,
SM_LOG_INFO, 11,
"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, status=session_cannot_be_reused+limit_exceeded",
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
res);
SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded3");
break;
}
}
}
else if (occe_locked)
{
r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
SM_ASSERT(r == 0);
if (r == 0)
occe_locked = false;
}
/* can't reuse existing connection? try a new one */
if ((res == 0 || dadb_entry == NULL || sm_is_err(ret))
&& da_avail > 0)
{
se_reuse = false;
/* open a DA session */
ret = dadb_sess_open(qsc_ctx, qsc_ctx->qsc_dadb_ctx,
aq_rcpt->aqr_ss_ta_id,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
aq_rcpt, &dadb_entry);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, qsc_ctx=%p, dadb_sess_open=%m",
qsc_ctx, ret);
#if QMGR_DEBUG
{
dadb_entry_P dadb_entryh;
dadb_ctx_P dadb_ctx;
uint l;
dadb_ctx = qsc_ctx->qsc_dadb_ctx;
#if 0
QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=4, qsc_ctx=%p, curactive=%d, max=%d\n",
qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#else /* 0 */
QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=4, qsc_ctx=%p, entries_cur=%d, entries_max=%d\n",
qsc_ctx, dadb_ctx->dadb_entries_cur, dadb_ctx->dadb_entries_max));
#endif /* 0 */
for (l = 0; l < dadb_ctx->dadb_entries_max; l++)
{
dadb_entryh = (dadb_ctx->dadb_entries)[l];
QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, dadb_entry=%p, flags=%#x\n",
dadb_entryh, dadb_entryh == NULL ? UINT_MAX : dadb_entryh->dadbe_flags));
}
}
#endif /* QMGR_DEBUG */
/* should this stop the scheduler??? */
stopit = true;
break;
}
SM_ASSERT(dadb_entry != NULL);
}
if (NULL == dadb_entry) {
/* no free da for this one: try next rdq */
QM_SCHED_RCBE_FREE;
break;
}
ret = qm_sc_conf(qmgr_ctx,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
dadb_entry);
if (SM_IS_TEMP_ERR(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO, 9,
"sev=INFO, func=qmgr_sched_dlvry, ip=%A, qm_sc_conf=%r"
, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4, ret);
QM_SCHED_RCBE_FREE;
break;
}
/*
** Keep the connection open?
** Only if it is configured and if there is at least
** one other entry in this "ready queue".
*/
se_keep_open = qm_sess_keep_open(qmgr_ctx, aq_ta, aq_rcpt, max_rcpts_ta);
QM_LEV_DPRINTTC(QDC_SCHED, 1, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, session_reuse=%d, keep_open=%d, da_sess=%s, da_ta=%s\n",
se_reuse, se_keep_open, dadb_entry->dadbe_da_se_id, dadb_entry->dadbe_da_ta_id), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
ret = qm_to_sc_task(qsc_ctx, se_reuse, !se_keep_open,
aq_ta, aq_rcpt, rcbe, dadb_entry);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, qm_to_sc_task=%m",
ret);
stopit = true;
break;
}
++aq_ctx->aq_t_da;
SESSTA_COPY(aq_rcpt->aqr_da_ta_id, dadb_entry->dadbe_da_ta_id);
AQR_DA_INIT(aq_rcpt);
ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_rm=%m, flags=%#x",
ret, aq_rcpt->aqr_flags);
stopit = true;
break;
}
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO, 9,
"func=qmgr_sched_dlvry, ss_ta=%s, da_sess=%s, da_ta=%s, rcpt=%@S, idx=%d, state=%d, ip=%A, i=%u, reuse=%d, keep_open=%d, entries_cur=%d, entries_max=%d"
, aq_rcpt->aqr_ss_ta_id
, dadb_entry->dadbe_da_se_id
, dadb_entry->dadbe_da_ta_id
, aq_rcpt->aqr_pa
, aq_rcpt->aqr_idx, aq_rcpt->aqr_status
, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4, i
, se_reuse, se_keep_open
, qsc_ctx->qsc_dadb_ctx->dadb_entries_cur
, qsc_ctx->qsc_dadb_ctx->dadb_entries_max
);
max_rcpts_ta = qmgr_rpcts_ta(qmgr_ctx, aq_rcpt);
/* Add more recipients to this DA transaction? */
for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt), nrcpts = 1;
aq_rcpt_nxt != aq_rcpt && nrcpts < max_rcpts_ta
&& !AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)
&& !AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP);
aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt))
{
/*
XXX "append" also undeliverable recipients
so they are counted in qda...
*/
/* check whether DA and host are the same */
if (AQR_SCHEDULE(aq_rcpt_nxt)
&& SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt))
{
QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, status=more, i=%u, ta=%s, rcpt=%@S, idx=%d, ip=%A, rcpt=%p, rcpt_nxt=%p, nrcpts=%u\n", i, aq_rcpt->aqr_ss_ta_id, aq_rcpt_nxt->aqr_pa, aq_rcpt_nxt->aqr_idx, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4, aq_rcpt, aq_rcpt_nxt, nrcpts));
ret = qm_to_sc_add_rcpt(qsc_ctx, aq_rcpt_nxt, rcbe);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, qm_to_sc_add_rcpt=%m",
ret);
stopit = true;
break;
}
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO, 9,
"func=qmgr_sched_dlvry, sched=more, ss_ta=%s, da_ta=%s, rcpt=%@S, idx=%d, ip=%A",
aq_rcpt->aqr_ss_ta_id,
dadb_entry->dadbe_da_ta_id,
aq_rcpt_nxt->aqr_pa,
aq_rcpt_nxt->aqr_idx,
aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4);
AQR_SET_FLAG(aq_rcpt_nxt, AQR_FL_SCHED);
SESSTA_COPY(aq_rcpt_nxt->aqr_da_ta_id, dadb_entry->dadbe_da_ta_id);
/*
** Prepend new recipient to original
** one to keep the correct order.
*/
AQR_DA_PRE(aq_rcpt, aq_rcpt_nxt);
++aq_ctx->aq_t_da;
++nrcpts;
ret = aq_rdq_rm(aq_ctx, aq_rcpt_nxt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_rm=%m, flags=%#x, where=more_rcpts2busy",
ret, aq_rcpt_nxt->aqr_flags);
stopit = true;
break;
}
}
/*
** Set this here?? That means it is only
** set if sent to a DA, not when it is
** "looked at" by the scheduler. Hence if
** the address doesn't resolve, last_try
** is never set.
*/
aq_rcpt_nxt->aqr_last_try = time_now;
}
/* Send task to DA */
ret = sm_rcbcom_endrep(&qsc_ctx->qsc_com, qsc_ctx->qsc_com.rcbcom_tsk, true /* XXX HACK */, &rcbe);
if (sm_is_err(ret)) {
aq_rcpt_P aq_rcpt_r;
if (rcbe != NULL) {
/*
** rcbe has not been added to write
** list hence it needs to be cleaned up
*/
aq_rsnd_ctx_free(aq_rsnd_ctx);
aq_rsnd_ctx = NULL;
sm_rcbe_free(rcbe);
rcbe = NULL;
}
/* undo status change for added recipients */
for (aq_rcpt_nxt = AQR_DA_PRED(aq_rcpt);
aq_rcpt_nxt != aq_rcpt;
aq_rcpt_nxt = aq_rcpt_r)
{
/* capture the predecessor before aq_rcpt_nxt is unlinked below */
aq_rcpt_r = AQR_DA_PRED(aq_rcpt_nxt);
AQR_CLR_FLAG(aq_rcpt_nxt, AQR_FL_SCHED);
res = aq_rdq_add(aq_ctx, aq_rcpt_nxt, NULL /* &aqrdq_flags */, THR_NO_LOCK);
if (sm_is_err(res)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_add=%m, flags=%#x, where=more_rcpts2todo",
res, aq_rcpt_nxt->aqr_flags);
}
/* XXX also remove entry from list? */
AQR_DA_DELENTRY(aq_rcpt_nxt);
/*
** XXX Other things to undo?
** Make a list of things "done"
** so they can be "undone" here.
*/
}
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched_dlvry, sm_qsc_endrep=%m",
ret);
stopit = true;
break;
}
else
aq_rsnd_ctx = NULL;
/*
** At this point the information about the session/
** transaction is in the RCB list for the DA, but
** it isn't sent yet. Hence AQR_FL_WAIT4UPD can't
** be set here, it will be set when the RCB
** is sent to the DA.
*/
#if 0
// if (!se_reuse)
// {
// r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
// SM_LOCK_OK(r);
// if (r != 0)
// {
// sm_log_write(qmgr_ctx->qmgr_lctx,
// QM_LCAT_SCHED, QM_LMOD_SCHED,
// SM_LOG_CRIT, 4,
// "sev=CRIT, func=qmgr_sched_dlvry, lock_qsc=%d", r);
// }
// else
// {
// qsc_ctx->qsc_curactive++;
// r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
// SM_ASSERT(r == 0);
// }
// }
//QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, qsc_ctx=%p, curactive=%d, max=%d\n",
//qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#endif /* 0 */
AQR_SET_FLAG(aq_rcpt, AQR_FL_SCHED);
aq_rcpt->aqr_last_try = time_now;
*pqsc_bits |= qsc_bit;
}
/*
** If stopit is set then the rdq should be rotated so
** the next time the scheduler starts at the one that
** couldn't be finished this time. XXX
** See the beginning of the outer loop; it's currently rotated
** for every entry.
*/
if (rcbe != NULL) {
/*
** Possible leftover from previous iteration.
** Maybe it should be reused?
*/
(void) aq_rsnd_ctx_free(aq_rsnd_ctx);
aq_rsnd_ctx = NULL;
sm_rcbe_free(rcbe);
rcbe = NULL;
}
SM_ASSERT(NULL == aq_rsnd_ctx);
}
res = ret;
/*
** XXX Crude resource control...
** This needs to be more sophisticated and take more resources
** into account. Moreover, it needs to free memory if required,
** and it needs to change the limits on other data structures
** (esp. IQDB) too.
*/
if (sm_is_err(res) && sm_error_value(res) == ENOMEM) {
#if 0
/* throttle servers; do this in caller?? */
(void) qm_control(qmgr_ctx, 1, 100, QMGR_RFL_MEM_I, THR_LOCK_UNLOCK);
#endif
ret = aq_lower_limit(aq_ctx, 0, THR_NO_LOCK);
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_WARN, 8,
"sev=WARN, func=qmgr_sched_dlvry, aq_lower_limit=%m",
ret);
}
else
{
/*
** XXX This should NOT be done always... but as long
** as we don't have a way to measure memory usage...
** How about a way to gradually increase it, similar
** to qss_control()?
*/
ret = aq_raise_limit(aq_ctx, UINT_MAX, THR_NO_LOCK);
}
if (rcbe != NULL) {
aq_rsnd_ctx_free(aq_rsnd_ctx);
aq_rsnd_ctx = NULL;
sm_rcbe_free(rcbe);
rcbe = NULL;
}
SM_ASSERT(NULL == aq_rsnd_ctx);
return res;
}
/*
** QMGR_SCHED -- scheduler
** this is running as a task (timeout activated)
**
** Parameters:
** tsk -- evthr task
**
** Returns:
** usual sm_error code
**
** Locking: locks entire aq_ctx during operation, returns unlocked
**
** Last code review:
** Last code change:
*/
sm_ret_T
qmgr_sched(sm_evthr_task_P tsk)
{
int r;
sm_ret_T ret;
qmgr_ctx_P qmgr_ctx;
aq_ctx_P aq_ctx;
bool err;
uint32_t qsc_bits;
timeval_T timeval_now, delay;
int delay_next_try;
#if QMGR_TEST
int delay_next_try_tst;
#endif
SM_IS_EVTHR_TSK(tsk);
qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx;
SM_IS_QMGR_CTX(qmgr_ctx);
aq_ctx = qmgr_ctx->qmgr_aq;
SM_IS_AQ(aq_ctx);
err = false;
qsc_bits = 0;
/* Check ret?? */
ret = evthr_timeval(tsk->evthr_t_ctx, &timeval_now);
delay_next_try = 0;
#if QMGR_TEST
delay_next_try_tst = 0;
if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_NO_SCHED)) {
delay.tv_usec = 0;
delay.tv_sec = 100;
timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
return EVTHR_SLPQ;
}
if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_INT_SRC)) {
ret = qm_tst_fill_aq(qmgr_ctx);
if (sm_is_err(ret))
delay_next_try_tst = sm_is_warn_err(ret) ?
(0 - sm_error_value(ret)) : 300;
else if (SM_NOTDONE == ret)
delay_next_try_tst = 1;
#if QMGR_STATS
if (qmgr_ctx->qmgr_cnf.q_cnf_tot_tas <= qmgr_ctx->qmgr_tas_sent)
sm_io_fprintf(smioerr, "q_cnf_tot_tas=%u, tas_sent=%lu, aq_entries=%u\n", qmgr_ctx->qmgr_cnf.q_cnf_tot_tas, qmgr_ctx->qmgr_tas_sent, aq_ctx->aq_entries);
if (qmgr_ctx->qmgr_cnf.q_cnf_tot_tas <= qmgr_ctx->qmgr_tas_sent
&& aq_ctx->aq_entries <= 2)
{
qm_info(qmgr_ctx, smioout);
return EVTHR_TERM;
}
#endif
}
if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SCHED_DLY)) {
static time_T sched_start_delay = 0;
if (0 == sched_start_delay)
sched_start_delay = timeval_now.tv_sec + 10;
if (sched_start_delay > timeval_now.tv_sec) {
tsk->evthr_t_sleep.tv_sec = sched_start_delay;
return EVTHR_SLPQ;
}
SM_CLR_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SCHED_DLY);
}
#else
/* XXX TESTING XXX */
if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE)) {
static time_T sched_start_delay = 0;
if (0 == sched_start_delay)
sched_start_delay = timeval_now.tv_sec + 10;
if (sched_start_delay > timeval_now.tv_sec) {
tsk->evthr_t_sleep.tv_sec = sched_start_delay;
return EVTHR_SLPQ;
}
}
#endif /* QMGR_TEST */
ret = qm_get_edb_entries(qmgr_ctx, &delay_next_try);
if (delay_next_try < 0)
delay_next_try = 1; /* XXX HACK */
/* HACK notify qar task */
if (QDA_ACT_SMAR(ret)) {
sm_evthr_task_P qar_tsk;
qar_ctx_P qar_ctx;
qar_ctx = qmgr_ctx->qmgr_ar_ctx;
SM_IS_QAR_CTX(qar_ctx);
qar_tsk = qmgr_ctx->qmgr_ar_tsk;
if (qar_tsk != NULL)
ret = evthr_en_wr(qar_tsk);
else
ret = sm_error_temp(SM_EM_Q_SCHED, SM_E_NO_AR);
}
QM_LEV_DPRINTFC(QDC_SCHED, 6, (QM_DEBFP, "sev=DBG, func=qmgr_sched, tsk=%p\n", tsk));
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_CRIT, 4,
"sev=CRIT, func=qmgr_sched, lock_aq=%d", r);
goto error;
}
/* XXX just wait till someone wakes this up? */
if (aq_ctx->aq_nextrun.tv_sec != 0) {
if (timercmp(&timeval_now, &aq_ctx->aq_nextrun, <)) {
tsk->evthr_t_sleep = aq_ctx->aq_nextrun;
QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP, "sev=DBG, func=qmgr_sched, delay=forced, next_try=%ld\n", aq_ctx->aq_nextrun.tv_sec));
goto unlock;
}
aq_ctx->aq_nextrun.tv_usec = 0;
aq_ctx->aq_nextrun.tv_sec = 0;
}
if (aq_is_empty(aq_ctx)) {
delay.tv_usec = 0;
delay.tv_sec = delay_next_try == 0 ? 3000 : delay_next_try;
QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP, "sev=DBG, func=qmgr_sched, aq=empty, next_try=%d, delay=%ld\n", delay_next_try, delay.tv_sec));
}
else {
/* do something.... */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_INFO, 14,
"func=qmgr_sched, aq_entries=%u, aq_t_da=%u",
aq_ctx->aq_entries, aq_ctx->aq_t_da);
ret = qmgr_sched_dlvry(qmgr_ctx, aq_ctx, &qsc_bits, &delay_next_try);
if (sm_is_err(ret)) {
/* XXX Map error type to logging category and sev? */
err = sm_is_error(ret);
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
err ? SM_LOG_ERR : SM_LOG_WARN,
err ? 2 :
QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_DA)
? 14 : 12,
"sev=%s, func=qmgr_sched, qmgr_sched_dlvry=%m",
err ? "ERROR" : "WARN",
ret);
}
/* XXX interval... */
delay.tv_usec = 0;
#if QMGR_TEST
if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_INT_SRC)) {
r = aq_usage(aq_ctx, AQ_USAGE_ALL);
if (r < qmgr_ctx->qmgr_cnf.q_cnf_max_fill_aq &&
0 == delay_next_try_tst)
{
delay.tv_sec = 0;
delay.tv_usec = r;
}
else if (delay_next_try_tst < 0)
{
delay.tv_sec = 0;
delay.tv_usec = (-delay_next_try_tst) * 500;
}
else
delay.tv_sec = delay_next_try_tst == 0 ?
1 : delay_next_try_tst;
sm_io_fprintf(smioerr, "usage=%d, delay_next_try_tst=%d, delay_next_try=%d, delay=%d.%06ld\n", r, delay_next_try_tst, delay_next_try, delay.tv_sec, delay.tv_usec);
}
else
#endif
delay.tv_sec = delay_next_try == 0 ? 600 : delay_next_try;
QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "sev=DBG, func=qmgr_sched, next_try=%d, delay=%ld\n", delay_next_try, delay.tv_sec));
}
QM_LEV_DPRINTFC(QDC_SCHED, 4, (QM_DEBFP, "sev=DBG, func=qmgr_sched, old t_sleep=%ld\n", tsk->evthr_t_sleep.tv_sec));
timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
QM_LEV_DPRINTFC(QDC_SCHED, 4, (QM_DEBFP, "sev=DBG, func=qmgr_sched, timeval_now=%ld, delay=%ld, t_sleep=%ld\n", timeval_now.tv_sec, delay.tv_sec, tsk->evthr_t_sleep.tv_sec));
if (err)
aq_ctx->aq_nextrun = tsk->evthr_t_sleep;
unlock:
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched, unlock_aq=%d", r);
goto error;
}
/*
** notify qsc task after aq has been unlocked, otherwise
** there might be a deadlock (qsc task locked, waiting for aq,
** while this task has the lock for aq and tries to notify
** (which requires locking) the qsc task).
*/
if (qsc_bits != 0) {
uint i;
uint32_t j;
qsc_ctx_P qsc_ctx;
r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
SM_LOG_CRIT, 4,
"sev=CRIT, func=qmgr_sched, lock_qmgr=%d",
r);
goto error;
}
/* XXX HACK! Must match what qmgr_sched_dlvry does! */
for (j = 1, i = 0; i < QM_N_SC_GLI(qmgr_ctx); i++, j *= 2)
{
if ((qsc_bits & j) == 0)
continue;
qsc_ctx = qmgr_li_sc(qmgr_ctx, i);
if (qsc_ctx->qsc_com.rcbcom_tsk == NULL)
continue;
SM_IS_EVTHR_TSK(qsc_ctx->qsc_com.rcbcom_tsk);
/* tell someone to send the tasks to the DAs */
ret = evthr_en_wr(qsc_ctx->qsc_com.rcbcom_tsk);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_SCHED,
SM_LOG_ERR, 1,
"sev=ERROR, func=qmgr_sched, evthr_en_wr=%m",
ret);
/* XXX what now my friend? */
}
}
r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
SM_ASSERT(r == 0);
}
return EVTHR_SLPQ;
error:
delay.tv_usec = 0;
delay.tv_sec = 1;
timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
return EVTHR_SLPQ;
}