/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 * All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: sched.c,v 1.263 2007/06/24 02:52:57 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/reccom.h"
#include "sm/da.h"
#include "sm/das.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/misc.h"
#include "qmgr.h"
#include "sm/aqrdq.h"
#include "log.h"

/*
**  Which access methods to recipients does the scheduler need?
**  DA/Host: for session reuse.
**  SMTPS TA-Id: to find recipients for the same TA (that could be
**	delivered in one TA if the DA/Host is the same).
**
**  Which other (recipient) data does the scheduler need?
**  Number of recipients/TAs destined for the same DA/Host.
*/

/*
**  Can both recipients be delivered in the same transaction?
**  They are from the same incoming transaction, but do they share the
**  same delivery information (MX piggybacking)?
**  Bounces can't be piggybacked: they have content in the rcpt itself.
*/

#define SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt)	\
	((aq_rcpt)->aqr_addrs[(aq_rcpt)->aqr_addr_cur].aqra_ipv4 ==	\
	 (aq_rcpt_nxt)->aqr_addrs[(aq_rcpt_nxt)->aqr_addr_cur].aqra_ipv4 \
	&& (aq_rcpt)->aqr_da_idx == (aq_rcpt_nxt)->aqr_da_idx	\
	&& (aq_rcpt)->aqr_port == (aq_rcpt_nxt)->aqr_port	\
	&& (aq_rcpt)->aqr_owner_idx == (aq_rcpt_nxt)->aqr_owner_idx	\
	&& !AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN)	\
	&& !AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_IS_DSN)	\
	&& !AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP)	\
	&& !AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_HAS_VERP)	\
	)

/* primitive version of setting an error value if it isn't set already */
#define SM_SET_RET(res, ret) do {	\
		if (sm_is_err(ret) && !sm_is_err(res))	\
			(res) = (ret);	\
	} while (0)
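/*
**  Usage sketch for SM_SET_RET (illustrative only; cleanup_a() and
**  cleanup_b() are hypothetical): collect the first error from a
**  sequence of calls without letting a later success overwrite it.
**
**	res = SM_SUCCESS;
**	ret = cleanup_a();
**	SM_SET_RET(res, ret);
**	ret = cleanup_b();
**	SM_SET_RET(res, ret);
**	return res;	first error if any, otherwise SM_SUCCESS
*/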
/*
**  QM_AR_ACTSCHED -- Decide whether to activate scheduler after getting
**		result from AR.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_rcpt -- AQ recipient
**		pnotify_sched -- (pointer to) notify scheduler? (output)
**
**	Returns:
**		usual sm_error code; only (un)lock
**
**	Last code review: 2005-04-04 20:39:02
**	Last code change: 2005-04-04 20:37:48
*/

sm_ret_T
qm_ar_actsched(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt, bool *pnotify_sched)
{
#undef SMFCT
#define SMFCT "qm_ar_actsched"
	sm_ret_T ret;
	int r;
	occ_entry_P occ_entry;

	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	SM_REQUIRE(pnotify_sched != NULL);

	/* default: activate scheduler */
	*pnotify_sched = true;
	ret = SM_SUCCESS;
	if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF) && aq_rcpt->aqr_addrs != NULL)
	{
		occ_entry = NULL;
		SM_ASSERT(aq_rcpt->aqr_addr_cur < aq_rcpt->aqr_addr_max);
		ret = occ_entry_find(qmgr_ctx->qmgr_occ_ctx->occx_ht,
			aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
			&occ_entry, &qmgr_ctx->qmgr_occ_ctx->occx_mutex,
			THR_LOCK_UNLERR);
		if (SM_SUCCESS == ret && occ_entry != NULL)
		{
			if (occ_entry->occe_open_se >= occ_entry->occe_cur_conc
			    && occ_entry->occe_open_se == occ_entry->occe_open_ta)
			{
#if 0
				QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP,
					"func=qm_ar_actsched, open_se=%d, cur_conc=%d, open_se=%d, open_ta=%d\n"
					, occ_entry->occe_open_se
					, occ_entry->occe_cur_conc
					, occ_entry->occe_open_se
					, occ_entry->occe_open_ta));
#endif /* 0 */
				*pnotify_sched = false;
				OCCE_SET_FLAG(occ_entry, OCCE_FL_SE_WAIT);
			}
			r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
			SM_ASSERT(r == 0);
			if (r != 0)
				ret = sm_error_perm(SM_EM_Q_SCHED, r);
		}
		else
			ret = SM_SUCCESS;
	}
#if 0
	/* else: scheduler might be waiting for result, see below: */
	if (!((aq_ta->aqt_rcpts_ar == 0)
	      || (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1)
		  && (aq_ctx->aq_entries < (aq_ctx->aq_t_da >> 4)))
#endif /* 0 */
	return ret;
}

/*
**  QM_SC_CONF -- find smtpc configuration data for server
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		ipv4 -- IPv4 address of server
**		dadb_entry -- DA DB entry
**
**	Returns:
**		usual sm_error code
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
qm_sc_conf(qmgr_ctx_P qmgr_ctx, ipv4_T ipv4, dadb_entry_P dadb_entry)
{
#if MTA_USE_TLS
#undef SMFCT
#define SMFCT "qm_sc_conf"
	sm_ret_T ret;

	dadb_entry->dadbe_se_maprescnf = sm_err_perm(SM_E_NOTFOUND);
	if (NULL == qmgr_ctx->qmgr_conf_map
	    || !QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SC_LKP_SE_CONF))
	{
		return SM_SUCCESS;
	}
	if (NULL == dadb_entry->dadbe_se_conf)
		dadb_entry->dadbe_se_conf = sm_str_new(NULL, 256, 1024); /* check size */
	if (NULL == dadb_entry->dadbe_lhs)
		dadb_entry->dadbe_lhs = sm_str_new(NULL, 10, 16); /* check size */
	if (NULL == dadb_entry->dadbe_tag)
		dadb_entry->dadbe_tag = sm_str_new(NULL, 16, 32); /* check size */
	if (NULL == dadb_entry->dadbe_se_conf || NULL == dadb_entry->dadbe_lhs
	    || NULL == dadb_entry->dadbe_tag)
	{
		return sm_err_temp(ENOMEM);
	}
	sm_str_clr(dadb_entry->dadbe_se_conf);
	sm_str_clr(dadb_entry->dadbe_lhs);
	sm_str_clr(dadb_entry->dadbe_tag);

#define SC_SE_TAG "smtpc_sess_conf:"
	ret = sm_err_temp(SM_E_OVFLW_NS);
	if (sm_str_scat(dadb_entry->dadbe_tag, SC_SE_TAG) == SM_SUCCESS
	    && sm_inet_ipv4str(ipv4, dadb_entry->dadbe_lhs) == SM_SUCCESS
	    && (ret = sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
			dadb_entry->dadbe_lhs, dadb_entry->dadbe_tag,
			SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
			dadb_entry->dadbe_se_conf)) == SM_SUCCESS
	    && sm_str_getlen(dadb_entry->dadbe_se_conf) > 0)
	{
		QM_LEV_DPRINTFC(QDC_S2Q, 4, (QM_DEBFP,
			"func=%s, ipv4=%A, %slookup=%r, rhs=%S\n",
			SMFCT, ipv4, SC_SE_TAG, ret,
			dadb_entry->dadbe_se_conf));
		dadb_entry->dadbe_se_maprescnf = ret;
	}
	return ret;
#else /* MTA_USE_TLS */
	return SM_SUCCESS;
#endif /* MTA_USE_TLS */
}
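/*
**  Lookup sketch for qm_sc_conf() (illustrative; the addresses are made
**  up): the key is the tag SC_SE_TAG plus the server address, and
**  SMMAP_LFL_SUBNETS lets a host-specific entry take precedence over a
**  less specific one, e.g. for 192.0.2.25:
**
**	smtpc_sess_conf:192.0.2.25	<session conf for this host>
**	smtpc_sess_conf:192.0.2		<session conf for the net>
**
**  The exact fallback rules are those of sm_map_lookup_ip().
*/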
/*
**  QM_TO_SC_TASK -- create one session with one task for SMTPC
**	XXX HACK just one session with one transaction with one (or more)
**	recipients (see qm_to_sc_add_rcpt())
**
**	Parameters:
**		qsc_ctx -- QMGR SMTPC context
**		se_reuse -- reuse an open session?
**		one_ta -- only one transaction? (close session after this TA?)
**		aq_ta -- AQ transaction
**		aq_rcpt -- AQ recipient
**		rcbe -- RCB entry
**		dadb_entry -- DA DB entry
**
**	Returns:
**		usual sm_error code; see sm_rcb_putv()
**
**	Side Effects: does not clean up rcb in case of an error.
**
**	Last code review: 2005-04-04 21:03:45
**	Last code change: 2006-02-21 05:55:16
*/

static sm_ret_T
qm_to_sc_task(qsc_ctx_P qsc_ctx, bool se_reuse, bool one_ta, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe, dadb_entry_P dadb_entry)
{
	sm_rcb_P rcb;
	sm_ret_T ret;
	uint32_t cdb_rt, ta_flags, se_flags;
	bool dsn;
	extern sm_str_P NullSender;
	sm_str_P sender, verp;

	SM_ASSERT(!aq_rcpt_has_owner(aq_rcpt)
		  || (aq_rcpt->aqr_owner_idx > 0
		      && aq_rcpt->aqr_owner_idx <= aq_ta->aqt_owners_n));
	verp = NULL;
	rcb = &rcbe->rcbe_rcb;

	/* treat double bounce differently?? */
	dsn = AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN);
	se_flags = ta_flags = 0;
	cdb_rt = RT_Q2C_CDBID;
	if (dsn
	    && (QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_HDR)
		|| (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_NO_BODY)
		    && QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_BODY_HDR))))
	{
		cdb_rt = RT_Q2C_CDBID_HDR;
		ta_flags |= DA_FL_HDR_ONLY;
	}
	if (dsn && QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_B_DSN_MIME))
		ta_flags |= DA_FL_DSN_MIME;
	if (!one_ta)
		se_flags |= DA_FL_SE_KEEP;
	if (se_reuse)
		se_flags |= DA_FL_SE_REUSE_EX;

	if (dsn)
		sender = NullSender;
	else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP)
		 && aq_rcpt_has_owner(aq_rcpt))
	{
		size_t len;
		sm_str_P owner;

		owner = aq_ta->aqt_owners_pa[aq_rcpt->aqr_owner_idx - 1];
		len = sm_str_getlen(owner) + sm_str_getlen(aq_rcpt->aqr_pa);
		verp = sm_str_new(NULL, len, len + 4);
		if (NULL == verp)
		{
			ret = sm_err_temp(ENOMEM);
			goto error;
		}
		ret = sm_verpify(owner, aq_rcpt->aqr_pa, '\0', '\0', verp);
		if (sm_is_err(ret))
			goto error;
		sender = verp;
	}
	else if (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP))
	{
		size_t len;

		len = sm_str_getlen(aq_ta->aqt_mail->aqm_pa)
		      + sm_str_getlen(aq_rcpt->aqr_pa);
		verp = sm_str_new(NULL, len, len + 4);
		if (NULL == verp)
		{
			ret = sm_err_temp(ENOMEM);
			goto error;
		}
		ret = sm_verpify(aq_ta->aqt_mail->aqm_pa, aq_rcpt->aqr_pa,
				'\0', '\0', verp);
		if (sm_is_err(ret))
			goto error;
		sender = verp;
	}
	else if (aq_rcpt_has_owner(aq_rcpt))
		sender = aq_ta->aqt_owners_pa[aq_rcpt->aqr_owner_idx - 1];
	else
		sender = aq_ta->aqt_mail->aqm_pa;
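	/*
	**  VERP sketch for the sender computed above (illustrative; the
	**  exact encoding is determined by sm_verpify()): for
	**  owner=owner-list@example.org and rcpt=user@example.com the
	**  generated sender typically has the form
	**	owner-list+user=example.com@example.org
	**  so a bounce to this sender identifies the failing recipient.
	*/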
	ret = sm_rcb_putv(rcb, RCB_PUTV_FIRST,
		SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
		SM_RCBV_INT, RT_Q2C_ID, qsc_ctx->qsc_id,
#ifdef RT_Q2C_DCID
		SM_RCBV_INT, RT_Q2C_DCID, 0, /* delivery class, cur. unused */
#endif
		SM_RCBV_INT, RT_Q2C_SE_FLAGS, se_flags,
		SM_RCBV_BUF, se_reuse
			? (one_ta ? RT_Q2C_SEID_1TA : RT_Q2C_SEID)
			: (one_ta ? RT_Q2C_NSEID_1TA : RT_Q2C_NSEID),
			dadb_entry->dadbe_da_se_id, SMTP_STID_SIZE,
		SM_RCBV_INT, (0 == aq_rcpt->aqr_port)
			? RT_NOSEND : RT_Q2C_SRVPORT,
			(uint32_t) aq_rcpt->aqr_port,
		SM_RCBV_INT, (DA_IDX_ESMTP == aq_rcpt->aqr_da_idx)
			? RT_NOSEND : RT_Q2C_DA_IDX,
			aq_rcpt->aqr_da_idx, /* XXX HACK XXX */
		SM_RCBV_INT, RT_Q2C_SRVIPV4,
			aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
		SM_RCBV_BUF, AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DLY) ? RT_Q2C_NTAIDD :
			AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_BNC) ? RT_Q2C_NTAIDB :
			AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DBNC) ? RT_Q2C_NTAIDDB :
			RT_Q2C_NTAID,
			dadb_entry->dadbe_da_ta_id, SMTP_STID_SIZE,
		SM_RCBV_BUF, RT_Q2C_SSTAID, aq_ta->aqt_ss_ta_id, SMTP_STID_SIZE,
		SM_RCBV_OFF, RT_Q2C_SIZE_B, aq_ta->aqt_msg_sz_b,
		SM_RCBV_STR, RT_Q2C_MAIL, sender,
		SM_RCBV_CSTR, cdb_rt, aq_ta->aqt_cdb_id,
		SM_RCBV_INT, RT_Q2C_TA_FLAGS, ta_flags,
		SM_RCBV_END);
#if MTA_USE_TLS
	if (sm_is_success(ret) && sm_is_success(dadb_entry->dadbe_se_maprescnf)
	    && dadb_entry->dadbe_se_conf != NULL)
	{
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_SRV,
				dadb_entry->dadbe_se_maprescnf,
			SM_RCBV_STR, RT_Q2C_RHS_CNF_SRV,
				dadb_entry->dadbe_se_conf,
			SM_RCBV_END);
	}
#endif /* MTA_USE_TLS */
	if (sm_is_success(ret))
		ret = sm_hdrmodl_wr(aq_ta->aqt_hdrmodhd, rcb, RT_Q2C_HM_T_P,
				RT_Q2C_HM_HDR);
	if (sm_is_success(ret))
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_RCPT_IDX, aq_rcpt->aqr_idx,
			SM_RCBV_STR, RT_Q2C_RCPT, aq_rcpt->aqr_pa,
			SM_RCBV_END);
#if MTA_USE_TLS
	if (sm_is_success(ret) && sm_is_success(aq_rcpt->aqr_maprescnf)
	    && aq_rcpt->aqr_conf != NULL)
	{
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_RCPT,
				aq_rcpt->aqr_maprescnf,
			SM_RCBV_STR, RT_Q2C_RHS_CNF_RCPT, aq_rcpt->aqr_conf,
			SM_RCBV_END);
	}
#endif /* MTA_USE_TLS */
	QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP,
		"func=qm_to_sc_task, ss_ta=%s, cdb=%s, da_ta=%s, ta_flags=%#x, dsn=%d, ret=%r\n",
		aq_ta->aqt_ss_ta_id, sm_cstr_data(aq_ta->aqt_cdb_id),
		dadb_entry->dadbe_da_ta_id, aq_ta->aqt_flags, dsn, ret));
	if (sm_is_err(ret))
		goto error;
	if (dsn)
	{
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_STR, RT_Q2C_B_MSG, aq_rcpt->aqr_dsn_msg,
			SM_RCBV_END);
		if (sm_is_err(ret))
			goto error;
		AQ_TA_SET_FLAG(aq_ta, AQ_TA_FL_NO_BODY);
	}
	SM_STR_FREE(verp);
	return ret;

  error:
	SM_STR_FREE(verp);
	QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP,
		"sev=ERROR, func=qm_to_sc_task, ret=%r\n", ret));
	return ret;
}
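/*
**  Informal layout of the task RCB built by qm_to_sc_task() (summary of
**  the code above, informative only; brackets mark optional records):
**	PROT_VER, Q2C_ID, [Q2C_DCID], SE_FLAGS,
**	session id ([N]SEID[_1TA]), [SRVPORT], [DA_IDX], SRVIPV4,
**	DA TA id (NTAID/NTAIDD/NTAIDB/NTAIDDB), SSTAID, SIZE_B,
**	MAIL, CDB id, TA_FLAGS, [TLS server conf],
**	header modifications, RCPT_IDX + RCPT (repeated per recipient
**	via qm_to_sc_add_rcpt()), [TLS rcpt conf], [DSN text]
*/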
#if QMGR_DEBUG > 1
void
aq_rcpt_print(aq_rcpt_P aq_rcpt)
{
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aq_rcpt=%p\n", aq_rcpt));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "ss_ta=%s\n", aq_rcpt->aqr_ss_ta_id));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpt=%S\n", aq_rcpt->aqr_pa));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_idx=%u\n", aq_rcpt->aqr_idx));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_tries=%u\n", aq_rcpt->aqr_tries));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "srv_ipv4=%A\n",
		aq_rcpt->aqr_addrs == NULL ? -1 : aq_rcpt->aqr_addrs[0].aqra_ipv4));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_da_idx=%u\n", aq_rcpt->aqr_da_idx));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_st_time=%ld\n", aq_rcpt->aqr_st_time));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_last_try=%ld\n", aq_rcpt->aqr_last_try));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_next_try=%ld\n", aq_rcpt->aqr_next_try));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_status=%d\n", aq_rcpt->aqr_status));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqr_flags=%#x\n", aq_rcpt->aqr_flags));
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_ss_link.sm_rg_succ=%p\n", AQR_SS_SUCC(aq_rcpt)));
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_ss_link.sm_rg_pred=%p\n", AQR_SS_PRED(aq_rcpt)));
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_da_link.sm_rg_succ=%p\n", AQR_DA_SUCC(aq_rcpt)));
	QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "aqr_da_link.sm_rg_pred=%p\n", AQR_DA_PRED(aq_rcpt)));
}

void
aq_ta_print(aq_ta_P aq_ta)
{
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aq_ta=%p\n", aq_ta));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "ss_ta=%s\n", aq_ta->aqt_ss_ta_id));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "mail=%S\n", aq_ta->aqt_mail->aqm_pa));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "time=%ld\n", (long) aq_ta->aqt_st_time));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "cdb=%s\n", sm_cstr_data(aq_ta->aqt_cdb_id)));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts=%u\n", aq_ta->aqt_rcpts_inaq));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_tot=%u\n", aq_ta->aqt_rcpts_tot));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_left=%u\n", aq_ta->aqt_rcpts_left));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_temp=%u\n", aq_ta->aqt_rcpts_temp));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "rcpts_perm=%u\n", aq_ta->aqt_rcpts_perm));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "state=%d\n", aq_ta->aqt_state));
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "aqt_rcpts_ar=%d\n", aq_ta->aqt_rcpts_ar));
}
#endif /* QMGR_DEBUG > 1 */

/*
**  QM_TO_SC_ADD_RCPT -- add another recipient to a transaction for SMTPC
**
**	Parameters:
**		qsc_ctx -- QMGR SMTPC context (currently unused)
**		aq_rcpt -- AQ recipient
**		rcbe -- RCB entry
**
**	Returns:
**		usual sm_error code; see sm_rcb_putv()
**
**	Side Effects: does not clean up rcb in case of an error.
**
**	Last code review: 2005-04-04 21:05:04
**	Last code change:
*/

static sm_ret_T
qm_to_sc_add_rcpt(qsc_ctx_P qsc_ctx, aq_rcpt_P aq_rcpt, sm_rcbe_P rcbe)
{
	sm_rcb_P rcb;
	sm_ret_T ret;

	rcb = &rcbe->rcbe_rcb;
	ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
		SM_RCBV_INT, RT_Q2C_RCPT_IDX, aq_rcpt->aqr_idx,
		SM_RCBV_STR, RT_Q2C_RCPT, aq_rcpt->aqr_pa,
		SM_RCBV_END);
#if MTA_USE_TLS
	if (sm_is_success(ret) && sm_is_success(aq_rcpt->aqr_maprescnf)
	    && aq_rcpt->aqr_conf != NULL)
	{
		ret = sm_rcb_putv(rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_Q2C_MAP_RES_CNF_RCPT,
				aq_rcpt->aqr_maprescnf,
			SM_RCBV_STR, RT_Q2C_RHS_CNF_RCPT, aq_rcpt->aqr_conf,
			SM_RCBV_END);
	}
#endif /* MTA_USE_TLS */
	if (sm_is_err(ret))
		goto error;
	return ret;

  error:
	QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP,
		"sev=ERROR, func=qm_to_sc_add_rcpt, ret=%r\n", ret));
	return ret;
}
/*
**  QMGR_FIND_DA -- Find an available DA
**	NOTE: this requires that all DAs can do all kinds of deliveries!
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		pqsc_bit -- (pointer to) QSC bit (output)
**		pqsc_ctx -- (pointer to) QSC context (output)
**			if this points to a valid qsc_ctx: don't return the
**			same one.
**		pda_avail -- (pointer to) how many entries are free? (output)
**		pda_idle -- (pointer to) how many entries are idle? (output)
**
**	Returns:
**		usual sm_error code; SM_E_NO_DA, (un)lock
**
**	Locking: locks qmgr_ctx
**
**	Last code review: 2005-04-04 22:07:13
**	Last code change: 2006-01-29 05:03:32
*/

static sm_ret_T
qmgr_find_da(qmgr_ctx_P qmgr_ctx, uint32_t *pqsc_bit, qsc_ctx_P *pqsc_ctx, uint *pda_avail, uint *pda_idle)
{
	int i, r;
	uint32_t j;
	uint da_avail, da_idle;
	sm_ret_T ret;
	qsc_ctx_P qsc_ctx;

	SM_REQUIRE(pqsc_ctx != NULL);
	SM_REQUIRE(pqsc_bit != NULL);
	*pqsc_bit = 0;
	r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_FROM_SMTPS,
			SM_LOG_CRIT, 4,
			"sev=CRIT, func=qmgr_find_da, lock=%d\n", r);
		return sm_error_temp(SM_EM_Q_SCHED, r);
	}
	ret = sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA);	/* XXX */
	for (j = 1, i = 0; i < QM_N_SC_GLI(qmgr_ctx); i++, j *= 2)
	{
		if ((qmgr_ctx->qmgr_sc_li.qm_gli_used & j) != 0
		    && (qsc_ctx = qmgr_li_sc(qmgr_ctx, i)) != NULL
		    && qsc_ctx->qsc_com.rcbcom_tsk != NULL
		    && QSC_IS_RUNNING(qsc_ctx)
		    && qsc_ctx != *pqsc_ctx)
		{
			/* check whether the DA has free capacity */
			(void) dadb_entry_avail(qsc_ctx->qsc_dadb_ctx,
					&da_avail, &da_idle, THR_LOCK_UNLOCK);
			if (da_avail > 0 || da_idle > 0)
			{
				if (pda_avail != NULL)
					*pda_avail = da_avail;
				if (pda_idle != NULL)
					*pda_idle = da_idle;
				*pqsc_ctx = qsc_ctx;
				*pqsc_bit = j;
				ret = SM_SUCCESS;
				break;
			}
		}
	}
	r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
	SM_ASSERT(r == 0);
	SM_ASSERT(ret != SM_SUCCESS || *pqsc_ctx != NULL);
	if (r != 0 && sm_is_success(ret))
		ret = sm_error_perm(SM_EM_Q_SCHED, r);
	return ret;
}
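/*
**  Bitmask sketch (illustrative, plain C): qmgr_find_da() reports the
**  selected DA as a single bit (*pqsc_bit = j with j == 1 << i) so a
**  caller can OR several selections into one mask and later walk the
**  mask with the same iteration pattern:
**
**	uint32_t j, mask;
**	int i;
**	for (j = 1, i = 0; i < n; i++, j *= 2)
**		if ((mask & j) != 0)
**			... DA with list index i was selected ...
*/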
/*
**  SCHED_MV2NXTMX -- Maybe move rcpt to next MX (if same priority)
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		time_now -- current time
**
**	Returns:
**		usual sm_error code; ENOMEM, SM_E_UNEXPECTED et al.
**
**	Locking: aq_ctx must be locked
**
**	Side Effects: may remove aq_rcpt from RDQ without adding it
**		to the next one due to a resource problem
**
**	Last code review: 2005-04-04 22:19:52
**	Last code change:
*/

static sm_ret_T
sched_mv2nxtmx(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T time_now)
{
	sm_ret_T ret;
	uint addr_cur;

	ret = SM_SUCCESS;
	addr_cur = aq_rcpt->aqr_addr_cur;
	if (AQR_MORE_ADDR(aq_rcpt)
	    && aq_rcpt->aqr_addrs[addr_cur].aqra_pref ==
	       aq_rcpt->aqr_addrs[addr_cur + 1].aqra_pref
	    && aq_rcpt->aqr_addrs[addr_cur].aqra_expt >= time_now)
	{
		ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK,
				qmgr_ctx->qmgr_lctx);
		if (sm_is_err(ret))
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=sched_mv2nxtmx, aq_rdq_rm=%m, flags=%#x",
				ret, aq_rcpt->aqr_flags);
			return ret;
		}
		aq_rcpt->aqr_addr_cur++;
		ret = aq_rdq_add(aq_ctx, aq_rcpt, NULL /* &aqrdq_flags */,
				THR_NO_LOCK);
		if (sm_is_err(ret))
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=sched_mv2nxtmx, aq_rdq_add=%m, flags=%#x",
				ret, aq_rcpt->aqr_flags);
			return ret;
		}
	}
	return ret;
}
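/*
**  Example (illustrative): with an address list sorted by MX preference
**	[0] pref=10 ip=192.0.2.1	<- aqr_addr_cur
**	[1] pref=10 ip=192.0.2.2
**	[2] pref=20 ip=192.0.2.3
**  sched_mv2nxtmx() advances from [0] to [1] (same preference, not
**  expired) but never from [1] to [2]; addresses with a lower priority
**  are left to the regular retry logic.
*/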
/*
**  SCHED2LONG -- AQ rcpt takes too long to schedule
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		time_now -- current time
**
**	Returns:
**		usual sm_error code
**
**	Locking: aq_ctx must be locked
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
sched2long(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, time_T time_now)
{
	sm_ret_T ret;

	ret = SM_SUCCESS;
	sm_log_write(qmgr_ctx->qmgr_lctx,
		QM_LCAT_SCHED, QM_LMOD_SCHED,
		SM_LOG_ERR, 1,
		"sev=ERROR, func=sched2long, ss_ta=%s, idx=%u, now-entered=%ld, status=timeout_in_scheduler"
		, aq_rcpt->aqr_ss_ta_id
		, aq_rcpt->aqr_idx
		, (long) (time_now - aq_rcpt->aqr_entered));

	/*
	**  XXX AND NOW?
	**  what's the simplest way to remove this from AQ?
	**  move it from rcpt dest queue to wait queue and
	**  "cheat" on the timeout ("immediate")?
	*/

	++aq_ctx->aq_t_da;
	AQR_DA_INIT(aq_rcpt);
	AQR_SET_FLAG(aq_rcpt, AQR_FL_SCHEDF);

	/* Was: rdq from todo to busy? check cleanup code (and how rdq is used) */
	ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, qmgr_ctx->qmgr_lctx);
	if (sm_is_err(ret))
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched2long, aq_rdq_rm=%m, flags=%#x",
			ret, aq_rcpt->aqr_flags);
		return ret;
	}

	/* ??? use DA waitq for this? or yet another queue??? */
	ret = aq_waitq_add(aq_ctx, aq_rcpt, time_now, AQWQ_DA, THR_NO_LOCK);
	if (sm_is_err(ret))
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched2long, aq_waitq_add=%m", ret);
		return ret;
	}
	ret = qmgr_set_aq_cleanup(qmgr_ctx->qmgr_cleanup_ctx, time_now, true);
	if (sm_is_err(ret))
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=sched2long, qmgr_set_aq_cleanup=%m",
			ret);
		return ret;
	}
	return ret;
}

/*
**  SAMEDOMAINOK -- Are all recipients for the same domain ready for
**		scheduling?
**
**	Parameters:
**		aq_rcpt -- AQ recipient
**		defaultdomain -- domain to use if recipient doesn't have one
**
**	Returns:
**		true: all ok
**		false: at least one is not yet ready
**
**	Locking: aq_ctx must be locked
**
**	Last code review: 2005-04-04 23:49:13
**	Last code change: 2006-06-11 04:14:31
*/

static bool
samedomainok(aq_rcpt_P aq_rcpt, sm_str_P defaultdomain)
{
	aq_rcpt_P aq_rcpt_nxt;
	bool ok;

	/*
	**  Check whether all recipients with the same domain
	**  are ready for scheduling.
	**  Notes:
	**  1. it is necessary to look in both directions.
	**     this assumes that recipients for the same domain
	**     are treated "similarly" by smar; it is not required
	**     that they actually have the same destination,
	**     but if it takes much longer for some of the
	**     recipients to determine the resolved addresses
	**     than for others (even with the same domain, i.e.,
	**     routing based on other criteria), then this may
	**     add unnecessary delays.
	**  2. this function requires that the rcpts are sorted by domain.
	*/

	ok = true;
	for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt);
	     ok && aq_rcpt_nxt != aq_rcpt
	     && aq_rcpt_eq_domain(aq_rcpt, aq_rcpt_nxt, defaultdomain) == SM_SUCCESS;
	     aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt))
	{
		ok = AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_RDY4DLVRY);
	}
	for (aq_rcpt_nxt = AQR_SS_PRED(aq_rcpt);
	     ok && aq_rcpt_nxt != aq_rcpt
	     && aq_rcpt_eq_domain(aq_rcpt, aq_rcpt_nxt, defaultdomain) == SM_SUCCESS;
	     aq_rcpt_nxt = AQR_SS_PRED(aq_rcpt_nxt))
	{
		ok = AQR_IS_FLAG(aq_rcpt_nxt, AQR_FL_RDY4DLVRY);
	}
	return ok;
}
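/*
**  Sketch (illustrative): the recipients of one incoming TA form a
**  circular list sorted by domain, e.g.
**	a@x.org -> b@x.org -> c@x.org -> d@y.org -> (back to a@x.org)
**  For aq_rcpt = b@x.org, samedomainok() walks the successors (c@x.org)
**  and the predecessors (a@x.org) until the domain changes; d@y.org is
**  never inspected.
*/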
/*
**  QMGR_RPCTS_TA -- return max number of recipients for this transaction
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_rcpt -- AQ recipient
**
**	Returns:
**		max number of recipients for this transaction
*/

static uint
qmgr_rpcts_ta(qmgr_ctx_P qmgr_ctx, aq_rcpt_P aq_rcpt)
{
	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	if (aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4 == LMTP_IPV4_ADDR)
		return qmgr_ctx->qmgr_cnf.q_cnf_lmtp_rcpts_ta;
	return qmgr_ctx->qmgr_cnf.q_cnf_smtp_rcpts_ta;
}

/*
**  QM_SESS_KEEP_OPEN -- is it useful to keep a session open?
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ta -- AQ transaction
**		aq_rcpt -- AQ recipient
**		max_rcpts_ta -- max recipients per transaction
**
**	Returns:
**		true iff it is useful to keep a session open
*/

static bool
qm_sess_keep_open(qmgr_ctx_P qmgr_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, uint max_rcpts_ta)
{
	uint nrcpts;
	aq_rcpt_P aq_rcpt_nxt;

	if (!QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE))
		return false;
	aq_rcpt_nxt = AQ_RDQ_NEXT(aq_rcpt);
	if (aq_rcpt_nxt == NULL || aq_rcpt_nxt == aq_rcpt)
		return false;
	for (nrcpts = 0;
	     aq_rcpt_nxt != NULL && aq_rcpt_nxt != aq_rcpt
	     && nrcpts < max_rcpts_ta;
	     aq_rcpt_nxt = AQ_RDQ_NEXT(aq_rcpt_nxt))
	{
		if (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)
		    && AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP)
		    && SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt))
			return false;
		++nrcpts;
	}
	return true;
}

/*
**  QMGR_SCHED_DLVRY -- simple scheduler...
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		aq_ctx -- AQ context
**		pqsc_bits -- (bitmask) which DAs need to be started (output)
**		pdelay_next_try -- (pointer to) delay until next try (output)
**
**	Returns:
**		usual sm_error code
**
**	Locking: aq_ctx must be locked
**
**	Todo: this is not very sophisticated.
**		split this into multiple functions? it's too long...
**
**	Last code review:
**	Last code change:
*/

/*
**  Delay for the scheduler after encountering too many open connections.
**  The scheduler should be woken up after a TA close RCB is received
**  from a DA for which a TA is waiting. This data should be recorded
**  in the DA connection cache.
*/

#define DELAY_TOO_MANY	300

#define SET_DELAY_NEXT_TRY(d, where)	\
	do	\
	{	\
		if ((d) < *pdelay_next_try || *pdelay_next_try == 0)	\
		{	\
			*pdelay_next_try = (d);	\
			QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP, "func=qmgr_sched_dlvry, where=%s, delay=%d\n", (where), (d)));	\
		}	\
	} while (0)
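/*
**  SET_DELAY_NEXT_TRY keeps the smallest delay proposed so far:
**  starting from *pdelay_next_try == 0 ("no delay scheduled"), the
**  sequence of proposals 300, 60, 120 leaves *pdelay_next_try == 60.
*/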
static sm_ret_T
qmgr_sched_dlvry(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, uint32_t *pqsc_bits, int *pdelay_next_try)
{
#undef SMFCT
#define SMFCT "qmgr_sched_dlvry"
	sm_ret_T ret, res;
	aq_rcpt_P aq_rcpt, aq_rcpt_nxt;
	aq_ta_P aq_ta;
	qsc_ctx_P qsc_ctx;
	sm_rcbe_P rcbe;
	dadb_entry_P dadb_entry;
	aq_rsnd_ctx_P aq_rsnd_ctx;
	uint i, nrcpts, connlimit, conncur, max_rcpts_ta, da_avail, da_idle;
	int r;
	uint32_t qsc_bit;
	bool stopit, se_reuse, se_keep_open, occe_locked;
	time_T time_now;
	sessta_id_T da_ta_id;
	occ_entry_P occ_entry;
	aqrdq_ctx_P aqrdq;
	uint todo_entries, rdq_entries;

	/*
	**  Macro to move rcpt to end of list unless it's there already.
	*/

#define SM_MV_RCPT2END	\
	if (aq_rcpt == AQ_RDQ_LAST(aqrdq->aqrdq_rcpts))	\
		break;	\
	AQ_RDQ_REMOVE(aqrdq->aqrdq_rcpts, aq_rcpt);	\
	AQ_RDQ_INSERT_TAIL(aqrdq->aqrdq_rcpts, aq_rcpt);	\
	continue

#define QM_SCHED_RCBE_FREE	\
	while (rcbe != NULL)	\
	{	\
		aq_rsnd_ctx_free(aq_rsnd_ctx);	\
		aq_rsnd_ctx = NULL;	\
		sm_rcbe_free(rcbe);	\
		rcbe = NULL;	\
	}

	SM_IS_QMGR_CTX(qmgr_ctx);
	SM_IS_AQ(aq_ctx);
	SM_REQUIRE(pqsc_bits != NULL);
	SM_REQUIRE(pdelay_next_try != NULL);
	*pqsc_bits = 0;
	da_ta_id[0] = '\0';
	rcbe = NULL;
	aq_rsnd_ctx = NULL;
	occe_locked = stopit = false;
	max_rcpts_ta = INT_MAX / 2;

	/*
	**  Do we have a DA? Just one...
	**  Note: this may sleep() while holding the AQ mutex, but only
	**  if there was never a DA available before (QMGR_SFL_HAD_DA),
	**  i.e., during startup.
	*/

	qsc_ctx = NULL;
	while (!sm_is_success(ret = qmgr_find_da(qmgr_ctx, &qsc_bit, &qsc_ctx,
						NULL, NULL)))
	{
		if (qmgr_ctx->qmgr_st_time + qmgr_ctx->qmgr_cnf.q_cnf_wait4clt
		    <= evthr_time(qmgr_ctx->qmgr_ev_ctx)
		    || QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA))
		{
			if (!QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_DA))
			{
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_WARN, 8,
					"sev=WARN, func=qmgr_sched_dlvry, status=no delivery agent available");
				QMGR_SET_SFLAG(qmgr_ctx, QMGR_SFL_DA);
			}
			qmgr_ctx->qmgr_tm_no_da = evthr_time(qmgr_ctx->qmgr_ev_ctx);
			return sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA);
		}
		sleep(1);
	}
	if (!QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA))
		QMGR_SET_SFLAG(qmgr_ctx, QMGR_SFL_HAD_DA);
	QMGR_CLR_SFLAG(qmgr_ctx, QMGR_SFL_DA);
	SM_IS_QSC_CTX(qsc_ctx);
	QM_LEV_DPRINTFC(QDC_SCHED, 7, (QM_DEBFP,
		"func=qmgr_sched_dlvry, qsc_ctx=%p\n", qsc_ctx));
	SM_IS_EVTHR_TSK(qsc_ctx->qsc_com.rcbcom_tsk);

#if 0
//	r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
//	SM_LOCK_OK(r);
//	if (r != 0)
//	{
//		sm_log_write(qmgr_ctx->qmgr_lctx,
//			QM_LCAT_SCHED, QM_LMOD_SCHED,
//			SM_LOG_CRIT, 1,
//			"sev=CRIT, func=qmgr_sched_dlvry, lock_qsc=%d", r);
//	}
//	else
//	{
//		/* XXX might be able to use an existing session */
//		if (qsc_ctx->qsc_curactive >= qsc_ctx->qsc_maxthreads
//		    && !QCNF_IS_FLAG(qsc_ctx->qsc_qmgr_ctx, QCNF_FL_SE_REUSE))
//		{
//			QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=1, qsc_ctx=%p, curactive=%d >= max=%d\n",
//				qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
//			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
//			SM_ASSERT(r == 0);
//			return sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA_FREE);
//		}
//		r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
//		SM_ASSERT(r == 0);
//	}
#endif /* 0 */

	ret = SM_SUCCESS;
	time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
	QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP,
		"func=qmgr_sched_dlvry, rdq_entries=%d\n", aq_ctx->aq_rdq_used));

	/*
	**  Go through all rcpt destination queues.
	**
	**  Note: currently this code assumes that all recipients can be
	**  delivered by all delivery agents.
	**  Question: what "breaks" if different delivery agents or different
	**  delivery classes need to be selected?
	**  Is it necessary to have rcpt destination queues per delivery class?
	**  Is it necessary to have delivery superclasses, i.e., sets of
	**  delivery classes which are implemented by the same set of DAs
	**  (such that it doesn't make a difference if any of the delivery
	**  classes inside a superclass is selected because they all share
	**  the same DAs)?
	*/

	for (rdq_entries = aq_ctx->aq_rdq_used; rdq_entries > 0; rdq_entries--)
	{
		/*
		**  Take the head of the list, and "rotate" it to the end.
		**  If there were a "rotate" operation, then this wouldn't
		**  be necessary unless a problem occurs, in which case the
		**  current entry would become the first element of the list
		**  to achieve some kind of fairness.
		*/

		aqrdq = AQ_RDQS_FIRST(aq_ctx->aq_rdqs);
		AQ_RDQS_REMOVE(aq_ctx->aq_rdqs, aqrdq);
		AQ_RDQS_INSERT_TAIL(aq_ctx->aq_rdqs, aqrdq);
		AQRDQ_CLR_FLAG(aqrdq, AQRDQ_FL_OCEXC);
		todo_entries = aqrdq->aqrdq_entries;
		QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP,
			"func=qmgr_sched_dlvry, todo_entries=%d\n",
			todo_entries));
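		/*
		**  Rotation sketch (illustrative): with rdqs = [A, B, C] this
		**  iteration processes A and requeues it, leaving [B, C, A];
		**  the next scheduler run therefore starts at B even if A
		**  could not be drained completely.
		*/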
		/*
		**  Note: the loop construct is a problem:
		**  1. aq_rcpt might be removed (most likely),
		**     hence AQ_RDQ_NEXT(aq_rcpt) can't be done inside
		**     the "for" statement.
		**  2. aq_rcpt_nxtl might be removed from the TODO queue too
		**     by the "add more recipients to this transaction" code.
		**  Possible solutions:
		**  1. when removing a recipient from the TODO list, check it
		**     against aq_rcpt_nxtl: if it is the same: go to the next.
		**  2. check whether aq_rcpt_nxtl is still in the TODO list;
		**     if not: what then? we can't go to the next entry: this
		**     isn't in the right list...
		**
		**  It seems the loop should be restructured:
		**  all entries in the TODO list are ready for delivery,
		**  hence after each iteration the head of the list should
		**  have been removed from the list, i.e.,
		**	while (!empty) { rcpt = head(todo_list); schedule }
		**
		**  However, even that is incorrect because some recipients
		**  might not be scheduled despite being in the todo list.
		**  See below for two cases.
		**  The hack applied in those cases (move the item to the
		**  end of the list) doesn't work if this happens more than
		**  once in a list. Hence this restriction must be used
		**  unless a better way, e.g., a flag, can be found.
		**
		**  The current solution:
		**  use a for loop restricted by the number of todo entries;
		**  always access the first element of the list;
		**  rcpts that aren't scheduled must be moved to the end;
		**  stop if the list is empty --
		**  this can happen if the scheduler adds more rcpts
		**  inside the loop to a transaction.
		*/

		/* go through all "todo" rcpts for each rcpt dest queue */
		for (i = 0;
		     i < todo_entries && !stopit
		     && !AQ_RDQ_EMPTY(aqrdq->aqrdq_rcpts);
		     i++)
		{
			da_avail = da_idle = 0;
			aq_rcpt = AQ_RDQ_FIRST(aqrdq->aqrdq_rcpts);
			SM_IS_AQ_RCPT(aq_rcpt);
#if QMGR_TEST
			if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_DLY_SCHED)
			    && AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DLY))
			{
				SM_ASSERT(false);	/* abort */
			}
#endif
			aq_ta = aq_rcpt->aqr_ss_ta;
			QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP,
				"sev=DBG, func=qmgr_sched_dlvry, aq_rcpt=%p, aqr_pa=%S, ss_ta=%s, idx=%u, aqr_flags=%#x, sched=%d, aq_ta=%p\n",
				aq_rcpt, aq_rcpt->aqr_pa,
				aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx,
				aq_rcpt->aqr_flags, AQR_SCHEDULE(aq_rcpt),
				aq_ta));
#if QMGR_DEBUG > 1
			aq_rcpt_print(aq_rcpt);
#endif /* QMGR_DEBUG > 1 */
			SM_IS_AQ_TA(aq_ta);
#if QMGR_DEBUG > 1
			aq_ta_print(aq_ta);
#endif
			SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAITQ_AR));
			SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_TMOUT|AQR_FL_SCHED|AQR_FL_PERM));
			SM_ASSERT(!AQR_IS_DSNFL(aq_rcpt, AQR_DSNFL_F_HBG));
			SM_ASSERT(AQR_SCHEDULE(aq_rcpt));
			if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN))
				QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP,
					"sev=DBG, func=qmgr_sched_dlvry, aq_rcpt=%p, aq_rcpt_pa=%S, idx=%u, type=bounce, tried=%u, total=%u, now-entered=%ld\n",
					aq_rcpt, aq_rcpt->aqr_pa,
					aq_rcpt->aqr_idx,
					aq_ta->aqt_rcpts_tried,
					aq_ta->aqt_rcpts_tot,
					(long) (time_now - aq_rcpt->aqr_entered)));

			/*
			**  Note: the timeout must be significantly larger
			**  than the SMTPC delivery timeout!
			**  Check whether the item itself is too old:
			**  this can cause problems if there are many
			**  destinations that are unavailable (many delivery
			**  attempts);
			**  and check whether the time since the last
			**  scheduling attempt is exceeded too.
			**  Should this check the time since the last
			**  "status update" (e.g., scheduled, result from DA,
			**  result from AR, others)?
			*/

			if (aq_rcpt->aqr_entered + qmgr_ctx->qmgr_tmo_sched < time_now
			    && aq_rcpt->aqr_last_try + qmgr_ctx->qmgr_tmo_sched < time_now
			    && qmgr_ctx->qmgr_tm_no_da + qmgr_ctx->qmgr_tmo_sched < time_now)
			{
				/*
				**  This is taking too long...
				**  something must be wrong.
				*/

				ret = sched2long(qmgr_ctx, aq_ctx, aq_rcpt,
						time_now);
				if (sm_is_err(ret))
				{
					stopit = true;
					break;
				}
				continue;
			}

			/*
			**  If this is a (double) bounce then skip it if
			**  - there aren't enough tried recipients (XXX?)
			**  or
			**  - the timeout hasn't occurred yet.
			**
			**  Note: this is one place where a recipient can be
			**  in the todo queue but is skipped by the scheduler.
			**  See also the next if statement.
			*/

			if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_DSN)
			    && !(aq_ta->aqt_rcpts_tot - aq_ta->aqt_rcpts_tried
				 <= (aq_ta_has_bounce(aq_ta) ? 1 : 0)
				    + (aq_ta_has_dbl_bounce(aq_ta) ? 1 : 0)
				 || aq_rcpt->aqr_entered
				    + qmgr_ctx->qmgr_cnf.q_cnf_t_sched_dsn
				    < time_now))
			{
				int d;

				d = aq_rcpt->aqr_entered
				    + qmgr_ctx->qmgr_cnf.q_cnf_t_sched_dsn
				    - time_now;
				if (d == 0)
					d = 1;	/* need a minimum delay */
				SET_DELAY_NEXT_TRY(d, "bounce/timeout");
				SM_MV_RCPT2END;
			}
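			/*
			**  Example for the test above (illustrative):
			**  aqt_rcpts_tot = 5, one of them a bounce; the
			**  bounce is held back until aqt_rcpts_tried reaches
			**  4 (tot - tried <= 1) or q_cnf_t_sched_dsn has
			**  expired, so the DSN can report the outcome of the
			**  other recipients.
			*/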
			/*
			**  XXX Hack: need
			**	no more recipients to receive from AR
			**  or
			#if 0
			**	(at least half of the recipients resolved and
			**	current number of entries greater than
			**	(entries being delivered / 16))
			#else * 0 *
			**	the number of recipients is less than 10
			**	(i.e., for small numbers all recipients must
			**	be resolved before continuing).
			#endif * 0 *
			**
			**  This is only a hack anyway, we need a real
			**  scheduler...
			**
			**  Note: aqt_rcpts_ar also counts bounce recipients
			**  that have been sent to AR! Should it do that?
			**
			**  Note: this is another place where a recipient can
			**  be in the todo queue but is skipped by the
			**  scheduler. See also the previous if statement.
			*/

			QM_LEV_DPRINTFC(QDC_SCHED, 6, (QM_DEBFP,
				"sev=DBG, func=qmgr_sched_dlvry, aqr_pa=%S, status=%d, aqt_rcpts_ar=%d, aqt_rcpts_inaq=%d, aq_entries=%d, t_da=%d, skip=%d\n",
				aq_rcpt->aqr_pa, aq_rcpt->aqr_status,
				aq_ta->aqt_rcpts_ar, aq_ta->aqt_rcpts_inaq,
				aq_ctx->aq_entries, aq_ctx->aq_t_da,
				!((aq_ta->aqt_rcpts_ar == 0)
				  || (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1)
				      && aq_ta->aqt_rcpts_inaq > 10))));
			if (!((aq_ta->aqt_rcpts_ar == 0)
			      || (aq_ta->aqt_rcpts_ar <= (aq_ta->aqt_rcpts_inaq >> 1)
#if 0
				  && (aq_ctx->aq_entries > (aq_ctx->aq_t_da >> 4))
#else
				  && aq_ta->aqt_rcpts_inaq > 10
#endif
				 )))
			{
				SM_MV_RCPT2END;
			}
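			/*
			**  Example for the test above (illustrative): with
			**  aqt_rcpts_inaq = 40, scheduling starts once
			**  aqt_rcpts_ar (recipients still at the address
			**  resolver) has dropped to 20 (half), or immediately
			**  once it reaches 0; with aqt_rcpts_inaq = 8 (<= 10)
			**  all recipients must be resolved first.
			*/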
			/*
			**  Check whether all recipients with the same domain
			**  are ready for scheduling.
			*/

			if (!samedomainok(aq_rcpt, qmgr_ctx->qmgr_hostname))
			{
				QM_LEV_DPRINTFC(QDC_SCHED, 5, (QM_DEBFP,
					"sev=DBG, func=qmgr_sched_dlvry, aqr_pa=%S, status=skipped_because_other_entry_is_not_ready\n",
					aq_rcpt->aqr_pa));
				SM_MV_RCPT2END;
			}

			/* check whether current DA can be used */
			ret = dadb_entry_avail(qsc_ctx->qsc_dadb_ctx,
					&da_avail, &da_idle, THR_LOCK_UNLOCK);
			if (da_avail <= 0 && da_idle <= 0)
			{
				/* find another DA */
				ret = qmgr_find_da(qmgr_ctx, &qsc_bit,
						&qsc_ctx, &da_avail, &da_idle);
				if (sm_is_err(ret))
				{
					if (sm_error_warn(SM_EM_Q_SCHED, SM_E_NO_DA) == ret)
						qmgr_ctx->qmgr_tm_no_da = time_now;
#if 0
					QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=3, qsc_ctx=%p, curactive=%d >= max=%d\n",
						qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#else /* 0 */
					QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=3, qsc_ctx=%p, status=no_free_DA\n",
						qsc_ctx));
#endif /* 0 */
					stopit = true;
					break;
				}
				QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP,
					"sev=DBG, func=qmgr_sched_dlvry, switch, qsc_ctx=%p\n",
					qsc_ctx));
			}

#if 0
//			/* XXX What's this? 2003-11-12 */
//			ret = aq_ta_find(aq_ctx, aq_rcpt->aqr_ss_ta_id, false, &aq_ta);
//			if (sm_is_err(ret)) {
//				/* COMPLAIN */
//				QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=ERROR, func=qmgr_sched_dlvry, can't find ta for ss_ta=%s, ret=%r\n",
//					aq_rcpt->aqr_ss_ta_id, ret));
//				continue;
//			}
#endif /* 0 */

			/* add delivery information to SMTPC RCB entry */
			ret = qm_rcbe_new(qmgr_ctx, &rcbe, -1);
			if (sm_is_err(ret))
			{
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, qm_rcbe_new=%m",
					ret);
				stopit = true;
				break;
			}

			/* create callback context for RCB send */
			ret = aq_rsnd_ctx_new(aq_ctx, aq_rcpt, &aq_rsnd_ctx);
			if (sm_is_err(ret) || NULL == aq_rsnd_ctx)
			{
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, aq_rsnd_ctx_new=%m",
					ret);
				sm_rcbe_free(rcbe);
				rcbe = NULL;
				stopit = true;
				break;
			}
			sm_rcbe_setcb(rcbe, aq_rcptsent2da, (void *) aq_rsnd_ctx);

			dadb_entry = NULL;
			occ_entry = NULL;
			connlimit = qmgr_ctx->qmgr_cnf.q_cnf_max_conc_conn;
			conncur = 0;
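			/*
			**  Concurrency sketch (illustrative): suppose
			**  occe_cur_conc = 4 and occe_open_se = 4 with all
			**  four sessions busy (occe_open_se == occe_open_ta):
			**  the destination is "full", the rdq is marked
			**  AQRDQ_FL_OCEXC, and the rcpt may be moved to the
			**  next MX below. If some session were idle
			**  (open_se > open_ta) it could be reused instead.
			*/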
			/* Need to keep occ_entry locked! */
			res = occ_entry_find(qmgr_ctx->qmgr_occ_ctx->occx_ht,
				aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
				&occ_entry, &qmgr_ctx->qmgr_occ_ctx->occx_mutex,
				THR_LOCK_UNLERR);
			if (SM_SUCCESS == res && occ_entry != NULL)
			{
				occe_locked = true;
				SM_ASSERT(occ_entry->occe_open_se >= occ_entry->occe_open_ta);
				connlimit = occ_entry->occe_cur_conc;
				conncur = occ_entry->occe_open_se;
				if (conncur >= connlimit
				    && occ_entry->occe_open_se == occ_entry->occe_open_ta)
				{
					AQRDQ_SET_FLAG(aqrdq, AQRDQ_FL_OCEXC);
					OCCE_SET_FLAG(occ_entry, OCCE_FL_SE_WAIT);
					if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE))
					{
						/* always? */
						OCCE_SET_FLAG(occ_entry, OCCE_FL_TA_WAIT);
					}
#if 0
//					r = DELAY_TOO_MANY;
//					SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded");
#else /* 0 */
					r = occ_entry->occe_last_upd
					    + occ_entry->occe_timeout - time_now;
					QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP,
						"sev=DBG, func=qmgr_sched_dlvry, occ=limit_exceeded, last_upd=%ld, timeout=%u, now=%ld, r=%d\n",
						(long) occ_entry->occe_last_upd,
						occ_entry->occe_timeout,
						(long) time_now, r));
					if (r <= 0)
						r = DELAY_TOO_MANY;
					SET_DELAY_NEXT_TRY(r, "exceeded");
#endif /* 0 */
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_INFO,
						OCCE_IS_FLAG(occ_entry, OCCE_FL_LOGEXC) ? 14 : 11,
						"sev=INFO, func=qmgr_sched_dlvry, ss_ta=%s, idx=%d, ip=%A, open_connections=%d, cur_limit=%u, status=limit_exceeded, timeout=%d",
						aq_rcpt->aqr_ss_ta_id,
						aq_rcpt->aqr_idx,
						aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
						occ_entry->occe_open_se,
						occ_entry->occe_cur_conc, r);
					OCCE_SET_FLAG(occ_entry, OCCE_FL_LOGEXC);
					r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
					SM_ASSERT(r == 0);
					if (r == 0)
						occe_locked = false;

					/* try next MX for this rcpt? */
					ret = sched_mv2nxtmx(qmgr_ctx, aq_ctx,
							aq_rcpt, time_now);
					QM_SCHED_RCBE_FREE;
					break;
				}
#if 0
//				else if (occ_entry->occe_open_se > occ_entry->occe_open_ta)
//				{
//					sm_log_write(qmgr_ctx->qmgr_lctx,
//						QM_LCAT_SCHED, QM_LMOD_SCHED,
//						SM_LOG_INFO, 11,
//						"func=qmgr_sched_dlvry, ip=%A, open_se=%u, open_ta=%u, dadbe=%p, flags=%#x"
//						, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4
//						, occ_entry->occe_open_se
//						, occ_entry->occe_open_ta
//						, occ_entry->occe_dadbe
//						, occ_entry->occe_dadbe->dadbe_flags
//						);
//					/* first entry should be an available session */
//					if (DADBE_IS_CONN(occ_entry->occe_dadbe))
//					{
//						dadb_entry = occ_entry->occe_dadbe;
//						res = occ_entry->occe_open_se;
//					}
//					else
//						QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP, "sev=ERROR, func=qmgr_sched_dlvry, ip=%A, occ_entry=%p, dadb_entry=%p, open_se=%u, open_ta=%u, flags=%#x, status=expected_available_session\n"
//							, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4
//							, occ_entry, dadb_entry
//							, occ_entry->occe_open_se, occ_entry->occe_open_ta
//							, occ_entry->occe_dadbe->dadbe_flags
//							));
//				}
#endif /* 0 */
				else
				{
					OCCE_CLR_FLAG(occ_entry, OCCE_FL_LOGEXC);
				}
			}

			/*
			**  move into previous if-block: an open session can
			**  only be found if occ_entry_find() found an entry.
			*/

			if (NULL == dadb_entry)
			{
				/*
				**  Find a DA that has a session open
				**  to the required IP addr.
				*/

				res = dadb_se_find_by_ipv4(qsc_ctx->qsc_dadb_ctx,
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					time_now, &dadb_entry);
			}
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				SM_LOG_INFO, 15,
				"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, dadb_entry=%p",
				aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
				res, dadb_entry);

			/* restrict number of connections */
			if (dadb_entry == NULL
			    && ((res > 0 && (uint) res >= connlimit)
				|| conncur >= connlimit))
			{
				r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
				SM_ASSERT(r == 0);
				if (r == 0)
					occe_locked = false;
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_INFO, 13,
					"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, status=limit_exceeded",
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					conncur > 0 ? conncur : res);
				SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded2");
				break;
			}
			se_reuse = res > 0 && dadb_entry != NULL
				   && QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE);
			if (se_reuse)
			{
				SM_ASSERT(occe_locked);

				/* let's reuse this ... */
				ret = dadb_sess_reuse(qsc_ctx,
					qsc_ctx->qsc_dadb_ctx,
					aq_rcpt->aqr_ss_ta_id, dadb_entry,
					occ_entry, aq_rcpt);
				r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
				SM_ASSERT(r == 0);
				if (r == 0)
					occe_locked = false;
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_INFO, 9,
					"func=qmgr_sched_dlvry, open_connections=%d, ip=%A, dadb_sess_reuse=%m",
					conncur > 0 ? conncur : res,
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					ret);
				if (sm_is_err(ret))
				{
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_ERR, 1,
						"sev=ERROR, func=qmgr_sched_dlvry, dadb_sess_reuse=%m",
						ret);

					/*
					**  if session can't be reused: check
					**  whether connection limit is
					**  exceeded
					*/

					if ((uint) res >= connlimit
					    || conncur >= connlimit)
					{
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED, QM_LMOD_SCHED,
							SM_LOG_INFO, 11,
							"func=qmgr_sched_dlvry, ip=%A, open_connections=%d, status=session_cannot_be_reused+limit_exceeded",
							aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
							res);
						SET_DELAY_NEXT_TRY(DELAY_TOO_MANY, "exceeded3");
						break;
					}
				}
			}
			else if (occe_locked)
			{
				r = smthread_mutex_unlock(&qmgr_ctx->qmgr_occ_ctx->occx_mutex);
				SM_ASSERT(r == 0);
				if (r == 0)
					occe_locked = false;
			}

			/* can't reuse an existing connection? try a new one */
			if ((res == 0 || dadb_entry == NULL || sm_is_err(ret))
			    && da_avail > 0)
			{
				se_reuse = false;

				/* open a DA session */
				ret = dadb_sess_open(qsc_ctx,
					qsc_ctx->qsc_dadb_ctx,
					aq_rcpt->aqr_ss_ta_id,
					aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					aq_rcpt, &dadb_entry);
				if (sm_is_err(ret))
				{
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_ERR, 1,
						"sev=ERROR, func=qmgr_sched_dlvry, qsc_ctx=%p, dadb_sess_open=%m",
						qsc_ctx, ret);
#if QMGR_DEBUG
					{
						dadb_entry_P dadb_entryh;
						dadb_ctx_P dadb_ctx;
						uint l;

						dadb_ctx = qsc_ctx->qsc_dadb_ctx;
#if 0
						QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=4, qsc_ctx=%p, curactive=%d, max=%d\n",
							qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#else /* 0 */
						QM_LEV_DPRINTFC(QDC_SCHED, 0, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, where=4, qsc_ctx=%p, entries_cur=%d, entries_max=%d\n",
							qsc_ctx, dadb_ctx->dadb_entries_cur, dadb_ctx->dadb_entries_max));
#endif /* 0 */
						for (l = 0; l < dadb_ctx->dadb_entries_max; l++)
						{
							dadb_entryh = (dadb_ctx->dadb_entries)[l];
							QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP,
								"sev=DBG, func=qmgr_sched_dlvry, dadb_entry=%p, flags=%#x\n",
								dadb_entryh,
								dadb_entryh == NULL ? UINT_MAX : dadb_entryh->dadbe_flags));
						}
					}
#endif /* QMGR_DEBUG */

					/* should this stop the scheduler??? */
					stopit = true;
					break;
				}
				SM_ASSERT(dadb_entry != NULL);
			}
			if (NULL == dadb_entry)
			{
				/* no free da for this one: try next rdq */
				QM_SCHED_RCBE_FREE;
				break;
			}
			ret = qm_sc_conf(qmgr_ctx,
				aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
				dadb_entry);
			if (SM_IS_TEMP_ERR(ret))
			{
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_INFO, 9,
					"sev=INFO, func=qmgr_sched_dlvry, ip=%A, qm_sc_conf=%r"
					, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
					ret);
				QM_SCHED_RCBE_FREE;
				break;
			}
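			/*
			**  Summary of the selection above (informative only):
			**	1. idle session to the destination and
			**	   QCNF_FL_SE_REUSE set -> dadb_sess_reuse()
			**	2. otherwise, if the DA has a free slot
			**	   -> dadb_sess_open()
			**	3. otherwise -> leave the rcpt in the rdq and
			**	   try the next one.
			*/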
			/*
			**  Keep the connection open?
			**  Only if it is configured and if there is at least
			**  one other entry in this "ready queue".
			*/

			se_keep_open = qm_sess_keep_open(qmgr_ctx, aq_ta,
						aq_rcpt, max_rcpts_ta);
			QM_LEV_DPRINTTC(QDC_SCHED, 1, (QM_DEBFP,
				"sev=DBG, func=qmgr_sched_dlvry, session_reuse=%d, keep_open=%d, da_sess=%s, da_ta=%s\n",
				se_reuse, se_keep_open,
				dadb_entry->dadbe_da_se_id,
				dadb_entry->dadbe_da_ta_id),
				qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
			ret = qm_to_sc_task(qsc_ctx, se_reuse, !se_keep_open,
					aq_ta, aq_rcpt, rcbe, dadb_entry);
			if (sm_is_err(ret))
			{
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, qm_to_sc_task=%m",
					ret);
				stopit = true;
				break;
			}
			++aq_ctx->aq_t_da;
			SESSTA_COPY(aq_rcpt->aqr_da_ta_id, dadb_entry->dadbe_da_ta_id);
			AQR_DA_INIT(aq_rcpt);
			ret = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK,
					qmgr_ctx->qmgr_lctx);
			if (sm_is_err(ret))
			{
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_rm=%m, flags=%#x",
					ret, aq_rcpt->aqr_flags);
				stopit = true;
				break;
			}
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				SM_LOG_INFO, 9,
				"func=qmgr_sched_dlvry, ss_ta=%s, da_sess=%s, da_ta=%s, rcpt=%@S, idx=%d, state=%d, ip=%A, i=%u, reuse=%d, keep_open=%d, entries_cur=%d, entries_max=%d"
				, aq_rcpt->aqr_ss_ta_id
				, dadb_entry->dadbe_da_se_id
				, dadb_entry->dadbe_da_ta_id
				, aq_rcpt->aqr_pa
				, aq_rcpt->aqr_idx, aq_rcpt->aqr_status
				, aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4, i
				, se_reuse, se_keep_open
				, qsc_ctx->qsc_dadb_ctx->dadb_entries_cur
				, qsc_ctx->qsc_dadb_ctx->dadb_entries_max
				);
			max_rcpts_ta = qmgr_rpcts_ta(qmgr_ctx, aq_rcpt);

			/* Add more recipients to this DA transaction? */
			for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt), nrcpts = 1;
			     aq_rcpt_nxt != aq_rcpt && nrcpts < max_rcpts_ta
			     && !AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_VERP)
			     && !AQR_IS_FLAG(aq_rcpt, AQR_FL_HAS_VERP);
			     aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt))
			{
				/* XXX "append" also undeliverable recipients so they are counted in qda... */
				/* check whether DA and host are the same */
				if (AQR_SCHEDULE(aq_rcpt_nxt)
				    && SAME_TRANSACTION(aq_rcpt, aq_rcpt_nxt))
				{
					QM_LEV_DPRINTFC(QDC_SCHED, 1, (QM_DEBFP,
						"sev=DBG, func=qmgr_sched_dlvry, status=more, i=%u, ta=%s, rcpt=%@S, idx=%d, ip=%A, rcpt=%p, rcpt_nxt=%p, nrcpts=%u\n",
						i, aq_rcpt->aqr_ss_ta_id,
						aq_rcpt_nxt->aqr_pa,
						aq_rcpt_nxt->aqr_idx,
						aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4,
						aq_rcpt, aq_rcpt_nxt, nrcpts));
					ret = qm_to_sc_add_rcpt(qsc_ctx,
							aq_rcpt_nxt, rcbe);
					if (sm_is_err(ret))
					{
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED, QM_LMOD_SCHED,
							SM_LOG_ERR, 1,
							"sev=ERROR, func=qmgr_sched_dlvry, qm_to_sc_add_rcpt=%m",
							ret);
						stopit = true;
						break;
					}
					sm_log_write(qmgr_ctx->qmgr_lctx,
						QM_LCAT_SCHED, QM_LMOD_SCHED,
						SM_LOG_INFO, 9,
						"func=qmgr_sched_dlvry, sched=more, ss_ta=%s, da_ta=%s, rcpt=%@S, idx=%d, ip=%A",
						aq_rcpt->aqr_ss_ta_id,
						dadb_entry->dadbe_da_ta_id,
						aq_rcpt_nxt->aqr_pa,
						aq_rcpt_nxt->aqr_idx,
						aq_rcpt->aqr_addrs[aq_rcpt->aqr_addr_cur].aqra_ipv4);
					AQR_SET_FLAG(aq_rcpt_nxt, AQR_FL_SCHED);
					SESSTA_COPY(aq_rcpt_nxt->aqr_da_ta_id,
						dadb_entry->dadbe_da_ta_id);

					/*
					**  Prepend new recipient to original
					**  one to keep the correct order.
					*/

					AQR_DA_PRE(aq_rcpt, aq_rcpt_nxt);
					++aq_ctx->aq_t_da;
					++nrcpts;
					ret = aq_rdq_rm(aq_ctx, aq_rcpt_nxt,
							THR_NO_LOCK,
							qmgr_ctx->qmgr_lctx);
					if (sm_is_err(ret))
					{
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED, QM_LMOD_SCHED,
							SM_LOG_ERR, 1,
							"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_rm=%m, flags=%#x, where=more_rcpts2busy",
							ret, aq_rcpt_nxt->aqr_flags);
						stopit = true;
						break;
					}
				}

				/*
				**  Set this here?? That means it is only
				**  set if sent to a DA, not when it is
				**  "looked at" by the scheduler. Hence if
				**  the address doesn't resolve, last_try
				**  is never set.
				*/

				aq_rcpt_nxt->aqr_last_try = time_now;
			}
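			/*
			**  Piggybacking example (illustrative): an incoming
			**  TA with rcpts a@x.org, b@x.org, c@y.org that all
			**  resolve to the same server IP, DA, and port:
			**  a@x.org is scheduled above, and the loop just
			**  executed adds b@x.org and c@y.org (up to
			**  max_rcpts_ta) to the same DA transaction instead
			**  of opening separate ones. VERP TAs are excluded:
			**  each rcpt needs its own MAIL address.
			*/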
			/* Send task to DA */
			ret = sm_rcbcom_endrep(&qsc_ctx->qsc_com,
					qsc_ctx->qsc_com.rcbcom_tsk,
					true /* XXX HACK */, &rcbe);
			if (sm_is_err(ret))
			{
				aq_rcpt_P aq_rcpt_r;

				if (rcbe != NULL)
				{
					/*
					**  rcbe has not been added to the
					**  write list, hence it needs to be
					**  cleaned up.
					*/

					aq_rsnd_ctx_free(aq_rsnd_ctx);
					aq_rsnd_ctx = NULL;
					sm_rcbe_free(rcbe);
					rcbe = NULL;
				}

				/* undo status change for added recipients */
				for (aq_rcpt_nxt = AQR_DA_PRED(aq_rcpt);
				     aq_rcpt_nxt != aq_rcpt;
				     aq_rcpt_nxt = aq_rcpt_r)
				{
					/* fetch next entry before aq_rcpt_nxt is unlinked below */
					aq_rcpt_r = AQR_DA_PRED(aq_rcpt_nxt);
					AQR_CLR_FLAG(aq_rcpt_nxt, AQR_FL_SCHED);
					res = aq_rdq_add(aq_ctx, aq_rcpt_nxt,
							NULL /* &aqrdq_flags */,
							THR_NO_LOCK);
					if (sm_is_err(res))
					{
						sm_log_write(qmgr_ctx->qmgr_lctx,
							QM_LCAT_SCHED, QM_LMOD_SCHED,
							SM_LOG_ERR, 1,
							"sev=ERROR, func=qmgr_sched_dlvry, aq_rdq_add=%m, flags=%#x, where=more_rcpts2todo",
							res, aq_rcpt_nxt->aqr_flags);
					}

					/* XXX also remove entry from list? */
					AQR_DA_DELENTRY(aq_rcpt_nxt);

					/*
					**  XXX Other things to undo?
					**  Make a list of things "done"
					**  so they can be "undone" here.
					*/
				}
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched_dlvry, sm_qsc_endrep=%m",
					ret);
				stopit = true;
				break;
			}
			else
				aq_rsnd_ctx = NULL;

			/*
			**  At this point the information about the session/
			**  transaction is in the RCB list for the DA, but
			**  it isn't sent yet. Hence AQR_FL_WAIT4UPD can't
			**  be set here; it will be set when the RCB
			**  is sent to the DA.
			*/

#if 0
//			if (!se_reuse)
//			{
//				r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
//				SM_LOCK_OK(r);
//				if (r != 0)
//				{
//					sm_log_write(qmgr_ctx->qmgr_lctx,
//						QM_LCAT_SCHED, QM_LMOD_SCHED,
//						SM_LOG_CRIT, 4,
//						"sev=CRIT, func=qmgr_sched_dlvry, lock_qsc=%d", r);
//				}
//				else
//				{
//					qsc_ctx->qsc_curactive++;
//					r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
//					SM_ASSERT(r == 0);
//				}
//			}
//			QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP, "sev=DBG, func=qmgr_sched_dlvry, qsc_ctx=%p, curactive=%d, max=%d\n",
//				qsc_ctx, qsc_ctx->qsc_curactive, qsc_ctx->qsc_maxthreads));
#endif /* 0 */

			AQR_SET_FLAG(aq_rcpt, AQR_FL_SCHED);
			aq_rcpt->aqr_last_try = time_now;
			*pqsc_bits |= qsc_bit;
		}

		/*
		**  If stopit is set then the rdq should be rotated so
		**  the next time the scheduler starts at the one that
		**  couldn't be finished this time. XXX
		**  See begin of outer loop; it's currently rotated for
		**  every entry.
		*/

		if (rcbe != NULL)
		{
			/*
			**  Possible leftover from previous iteration.
			**  Maybe it should be reused?
			*/

			(void) aq_rsnd_ctx_free(aq_rsnd_ctx);
			aq_rsnd_ctx = NULL;
			sm_rcbe_free(rcbe);
			rcbe = NULL;
		}
		SM_ASSERT(NULL == aq_rsnd_ctx);
	}
	res = ret;

	/*
	**  XXX Crude resource control...
	**  This needs to be more sophisticated and take more resources
	**  into account. Moreover, it needs to free memory if required,
	**  and it needs to change the limits on other data structures
	**  (esp. IQDB) too.
	*/

	if (sm_is_err(res) && sm_error_value(res) == ENOMEM)
	{
#if 0
		/* throttle servers; do this in caller?? */
		(void) qm_control(qmgr_ctx, 1, 100, QMGR_RFL_MEM_I, THR_LOCK_UNLOCK);
#endif
		ret = aq_lower_limit(aq_ctx, 0, THR_NO_LOCK);
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_WARN, 8,
			"sev=WARN, func=qmgr_sched_dlvry, aq_lower_limit=%m",
			ret);
	}
	else
	{
		/*
		**  XXX This should NOT be done always... but as long
		**  as we don't have a way to measure memory usage...
		**  How about a way to gradually increase it, similar
		**  to qss_control()?
		*/

		ret = aq_raise_limit(aq_ctx, UINT_MAX, THR_NO_LOCK);
	}
	if (rcbe != NULL)
	{
		aq_rsnd_ctx_free(aq_rsnd_ctx);
		aq_rsnd_ctx = NULL;
		sm_rcbe_free(rcbe);
		rcbe = NULL;
	}
	SM_ASSERT(NULL == aq_rsnd_ctx);
	return res;
}

/*
**  QMGR_SCHED -- scheduler
**	this is running as a task (timeout activated)
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		usual sm_error code
**
**	Locking: locks entire aq_ctx during operation, returns unlocked
**
**	Last code review:
**	Last code change:
*/
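/*
**  Task model sketch (informative): each invocation of qmgr_sched()
**  computes an absolute wakeup time in tsk->evthr_t_sleep (now + delay)
**  and returns EVTHR_SLPQ to be put back on the sleep queue; after an
**  error the earliest next run is recorded in aq_ctx->aq_nextrun,
**  which is checked at the start of the next invocation.
*/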
sm_ret_T
qmgr_sched(sm_evthr_task_P tsk)
{
	int r;
	sm_ret_T ret;
	qmgr_ctx_P qmgr_ctx;
	aq_ctx_P aq_ctx;
	bool err;
	uint32_t qsc_bits;
	timeval_T timeval_now, delay;
	int delay_next_try;
#if QMGR_TEST
	int delay_next_try_tst;
#endif

	SM_IS_EVTHR_TSK(tsk);
	qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);
	err = false;
	qsc_bits = 0;

	/* Check ret?? */
	ret = evthr_timeval(tsk->evthr_t_ctx, &timeval_now);
	delay_next_try = 0;
#if QMGR_TEST
	delay_next_try_tst = 0;
	if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_NO_SCHED))
	{
		delay.tv_usec = 0;
		delay.tv_sec = 100;
		timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
		return EVTHR_SLPQ;
	}
	if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_INT_SRC))
	{
		ret = qm_tst_fill_aq(qmgr_ctx);
		if (sm_is_err(ret))
			delay_next_try_tst = sm_is_warn_err(ret)
				? (0 - sm_error_value(ret)) : 300;
		else if (SM_NOTDONE == ret)
			delay_next_try_tst = 1;
#if QMGR_STATS
		if (qmgr_ctx->qmgr_cnf.q_cnf_tot_tas <= qmgr_ctx->qmgr_tas_sent)
			sm_io_fprintf(smioerr,
				"q_cnf_tot_tas=%u, tas_sent=%lu, aq_entries=%u\n",
				qmgr_ctx->qmgr_cnf.q_cnf_tot_tas,
				qmgr_ctx->qmgr_tas_sent,
				aq_ctx->aq_entries);
		if (qmgr_ctx->qmgr_cnf.q_cnf_tot_tas <= qmgr_ctx->qmgr_tas_sent
		    && aq_ctx->aq_entries <= 2)
		{
			qm_info(qmgr_ctx, smioout);
			return EVTHR_TERM;
		}
#endif
	}
	if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SCHED_DLY))
	{
		static time_T sched_start_delay = 0;

		if (0 == sched_start_delay)
			sched_start_delay = timeval_now.tv_sec + 10;
		if (sched_start_delay > timeval_now.tv_sec)
		{
			tsk->evthr_t_sleep.tv_sec = sched_start_delay;
			return EVTHR_SLPQ;
		}
		SM_CLR_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SCHED_DLY);
	}
#else /* XXX TESTING XXX */
	if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE))
	{
		static time_T sched_start_delay = 0;

		if (0 == sched_start_delay)
			sched_start_delay = timeval_now.tv_sec + 10;
		if (sched_start_delay > timeval_now.tv_sec)
		{
			tsk->evthr_t_sleep.tv_sec = sched_start_delay;
			return EVTHR_SLPQ;
		}
	}
#endif /* QMGR_TEST */

	ret = qm_get_edb_entries(qmgr_ctx, &delay_next_try);
	if (delay_next_try < 0)
		delay_next_try = 1;	/* XXX HACK */

	/* HACK notify qar task */
	if (QDA_ACT_SMAR(ret))
	{
		sm_evthr_task_P qar_tsk;
		qar_ctx_P qar_ctx;

		qar_ctx = qmgr_ctx->qmgr_ar_ctx;
		SM_IS_QAR_CTX(qar_ctx);
		qar_tsk = qmgr_ctx->qmgr_ar_tsk;
		if (qar_tsk != NULL)
			ret = evthr_en_wr(qar_tsk);
		else
			ret = sm_error_temp(SM_EM_Q_SCHED, SM_E_NO_AR);
	}
	QM_LEV_DPRINTFC(QDC_SCHED, 6, (QM_DEBFP,
		"sev=DBG, func=qmgr_sched, tsk=%p\n", tsk));
	r = pthread_mutex_lock(&aq_ctx->aq_mutex);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_CRIT, 4,
			"sev=CRIT, func=qmgr_sched, lock_aq=%d", r);
		goto error;
	}

	/* XXX just wait till someone wakes this up? */
	if (aq_ctx->aq_nextrun.tv_sec != 0)
	{
		if (timercmp(&timeval_now, &aq_ctx->aq_nextrun, <))
		{
			tsk->evthr_t_sleep = aq_ctx->aq_nextrun;
			QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP,
				"sev=DBG, func=qmgr_sched, delay=forced, next_try=%ld\n",
				aq_ctx->aq_nextrun.tv_sec));
			goto unlock;
		}
		aq_ctx->aq_nextrun.tv_usec = 0;
		aq_ctx->aq_nextrun.tv_sec = 0;
	}
	if (aq_is_empty(aq_ctx))
	{
		delay.tv_usec = 0;
		delay.tv_sec = delay_next_try == 0 ? 3000 : delay_next_try;
		QM_LEV_DPRINTFC(QDC_SCHED, 2, (QM_DEBFP,
			"sev=DBG, func=qmgr_sched, aq=empty, next_try=%d, delay=%ld\n",
			delay_next_try, delay.tv_sec));
	}
	else
	{
		/* do something.... */
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_INFO, 14,
			"func=qmgr_sched, aq_entries=%u, aq_t_da=%u",
			aq_ctx->aq_entries, aq_ctx->aq_t_da);
		ret = qmgr_sched_dlvry(qmgr_ctx, aq_ctx, &qsc_bits,
				&delay_next_try);
		if (sm_is_err(ret))
		{
			/* XXX Map error type to logging category and sev? */
			err = sm_is_error(ret);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_SCHED,
				err ? SM_LOG_ERR : SM_LOG_WARN,
				err ? 2
				    : QMGR_IS_SFLAG(qmgr_ctx, QMGR_SFL_DA) ? 14 : 12,
				"sev=%s, func=qmgr_sched, qmgr_sched_dlvry=%m",
				err ? "ERROR" : "WARN", ret);
		}

		/* XXX interval... */
		delay.tv_usec = 0;
#if QMGR_TEST
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_INT_SRC))
		{
			r = aq_usage(aq_ctx, AQ_USAGE_ALL);
			if (r < qmgr_ctx->qmgr_cnf.q_cnf_max_fill_aq
			    && 0 == delay_next_try_tst)
			{
				delay.tv_sec = 0;
				delay.tv_usec = r;
			}
			else if (delay_next_try_tst < 0)
			{
				delay.tv_sec = 0;
				delay.tv_usec = (-delay_next_try_tst) * 500;
			}
			else
				delay.tv_sec = delay_next_try_tst == 0
					? 1 : delay_next_try_tst;
			sm_io_fprintf(smioerr,
				"usage=%d, delay_next_try_tst=%d, delay_next_try=%d, delay=%d.%06ld\n",
				r, delay_next_try_tst, delay_next_try,
				delay.tv_sec, delay.tv_usec);
		}
		else
#endif
		delay.tv_sec = delay_next_try == 0 ? 600 : delay_next_try;
		QM_LEV_DPRINTFC(QDC_SCHED, 3, (QM_DEBFP,
			"sev=DBG, func=qmgr_sched, next_try=%d, delay=%ld\n",
			delay_next_try, delay.tv_sec));
	}
	QM_LEV_DPRINTFC(QDC_SCHED, 4, (QM_DEBFP,
		"sev=DBG, func=qmgr_sched, old t_sleep=%ld\n",
		tsk->evthr_t_sleep.tv_sec));
	timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
	QM_LEV_DPRINTFC(QDC_SCHED, 4, (QM_DEBFP,
		"sev=DBG, func=qmgr_sched, timeval_now=%ld, delay=%ld, t_sleep=%ld\n",
		timeval_now.tv_sec, delay.tv_sec, tsk->evthr_t_sleep.tv_sec));
	if (err)
		aq_ctx->aq_nextrun = tsk->evthr_t_sleep;

  unlock:
	r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
	if (r != 0)
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SCHED, QM_LMOD_SCHED,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=qmgr_sched, unlock_aq=%d", r);
		goto error;
	}

	/*
	**  notify qsc task after aq has been unlocked, otherwise
	**  there might be a deadlock (qsc task locked, waiting for aq,
	**  while this task has the lock for aq and tries to notify
	**  (which requires locking) the qsc task).
	*/

	if (qsc_bits != 0)
	{
		uint i;
		uint32_t j;
		qsc_ctx_P qsc_ctx;

		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qmgr_sched, lock_qmgr=%d", r);
			goto error;
		}

		/* XXX HACK! Must match what qmgr_sched_dlvry does! */
		for (j = 1, i = 0; i < QM_N_SC_GLI(qmgr_ctx); i++, j *= 2)
		{
			if ((qsc_bits & j) == 0)
				continue;
			qsc_ctx = qmgr_li_sc(qmgr_ctx, i);
			if (qsc_ctx->qsc_com.rcbcom_tsk == NULL)
				continue;
			SM_IS_EVTHR_TSK(qsc_ctx->qsc_com.rcbcom_tsk);

			/* tell someone to send the tasks to the DAs */
			ret = evthr_en_wr(qsc_ctx->qsc_com.rcbcom_tsk);
			if (sm_is_err(ret))
			{
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_SCHED, QM_LMOD_SCHED,
					SM_LOG_ERR, 1,
					"sev=ERROR, func=qmgr_sched, evthr_en_wr=%m",
					ret);
				/* XXX what now my friend? */
			}
		}
		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		SM_ASSERT(r == 0);
	}
	return EVTHR_SLPQ;

  error:
	delay.tv_usec = 0;
	delay.tv_sec = 1;
	timeradd(&timeval_now, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;
}