/* * Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: qm_fr_ar.c,v 1.151 2007/06/03 02:19:15 ca Exp $") #include "sm/error.h" #include "sm/assert.h" #include "sm/memops.h" #include "sm/io.h" #include "sm/rcb.h" #include "sm/common.h" #include "sm/qmgr.h" #include "sm/qmgr-int.h" #include "qmgr.h" #include "sm/edb.h" #include "sm/reccom.h" #include "sm/da.h" #include "sm/dns.h" #include "sm/smar.h" #include "sm/aqrdq.h" #include "qmgr.h" #include "log.h" /* ** Context structure for owner handling ** roc_old_owners: old number of owners (aq_ta->aqt_owners_n) ** roc_add_owners: number of owner addresses in this invocation ** roc_map: mapping owner indices (as given by SMAR) to indices in ** aq_ta->aqt_owners_pa */ struct rcpt_owner_ctx_S { rcpt_idx_T roc_old_owners; rcpt_idx_T roc_add_owners; size_t roc_map_size; rcpt_idx_T *roc_map; }; typedef struct rcpt_owner_ctx_S rcpt_owner_ctx_T, *rcpt_owner_ctx_P; /* ** QAR_RCPT_ST -- Decode error status received from AR ** ** Parameters: ** aq_rcpt -- AQ rcpt ** v -- status code ** ** Returns: ** usual sm_error code; SM_E_UNEXPECTED ** ** Side Effects: ** updates aq_rcpt, esp. aqr_status_new ** ** Last code review: 2005-03-29 18:07:23 ** Last code change: */ static sm_ret_T qar_rcpt_st(aq_rcpt_P aq_rcpt, uint32_t v) { smtp_status_T rcpt_status; #if 0 QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qar_rcpt_st, where=updated, rcpt_id=%s, ss_ta=%s, idx=%d, state=%#x, addr_max=%d\n", rcpt_id, ta_ss_id, rcpt_idx, v, aq_rcpt->aqr_addr_max)); #endif if (sm_is_temp_err(v) || (IS_SMTP_REPLY(v) && smtp_is_reply_temp(v))) { rcpt_status = AQR_ST_TEMP; AQR_SET_FLAG(aq_rcpt, AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_RCVD4AR|AQR_FL_STAT_NEW); } else if (sm_is_perm_err(v) || (IS_SMTP_REPLY(v) && smtp_is_reply_fail(v))) { rcpt_status = AQR_ST_PERM; AQR_SET_FLAG(aq_rcpt, AQR_FL_PERM|AQR_FL_ARF|AQR_FL_RCVD4AR|AQR_FL_STAT_NEW); } else { #if 0 QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_rcpt_st, where=update, rcpt_id=%s, ss_ta=%s, idx=%d, unknown_state=%#x\n", rcpt_id, ta_ss_id, rcpt_idx, v)); #endif return sm_error_perm(SM_EM_Q_AR2Q, SM_E_UNEXPECTED); } switch ((sm_ret_T) v) { case DNSR_NOTFOUND: rcpt_status = SMTP_AR_NOTF; break; case DNSR_TEMP: rcpt_status = SMTP_AR_TEMP; break; case sm_error_perm(SM_EM_AR, SM_E_ALIASEXP): case DNSR_PERM: rcpt_status = SMTP_AR_PERM; break; case DNSR_TIMEOUT: rcpt_status = SMTP_AR_TMO; break; case sm_error_perm(SM_EM_AR, SM_E_MXEMPTY): rcpt_status = SMTP_AR_MXEMPTY; break; case sm_error_temp(SM_EM_AR, SM_E_ALIASEXP): rcpt_status = SMTP_AR_ALIAS; break; case sm_error_perm(SM_EM_AR, SM_E_ALIAS_REC): rcpt_status = SMTP_AR_AL_REC; break; case sm_error_perm(SM_EM_AR, ELOOP): rcpt_status = SMTP_AR_LOOP; break; default: if (IS_SMTP_REPLY(v)) rcpt_status = v; break; } /* ** Need to update recipient in various DBs (also status ** and counters). See da_stat.c, need to split ** qda_update_ta_stat() to allow for update of aq_rcpts. ** Instead of doing this now (for each recipient when it ** comes in, which may cause a lot of "small" updates), ** do it in the scheduler: aq_rcpt is marked as "failed" ** anyway and hence it is possible to go through the list ** and update several at once. That code should be similar ** to qda_update_ta_stat(). ** We could keep track of the number of failures, and ** link those entries to allow for "fast" access. ** When a certain threshold is reached, those recipients ** are removed from AQ and the various DBs and counters ** are updated in a single "transaction". */ aq_rcpt->aqr_err_st = DA_AR_ERR; AQR_SET_FLAG(aq_rcpt, AQR_FL_ERRST_UPD); aq_rcpt->aqr_status_new = rcpt_status; aq_rcpt->aqr_addr_max = 0; return SM_SUCCESS; } /* ** QAR_OWNIDXMAP -- map owner index to local offset ** ** Parameters: ** owner_idx -- owner index as returned from AR ** roc -- owner context ** ** Returns: ** >=0: index in map ** <0: usual sm_error code: SM_E_NOTFOUND ** ** Last code review: 2005-03-29 23:01:13 ** Last code change: */ static sm_ret_T qar_ownidxmap(rcpt_idx_T owner_idx, rcpt_owner_ctx_P roc) { rcpt_idx_T i; SM_ASSERT(roc != NULL); for (i = 0; i < roc->roc_add_owners; i++) { if (owner_idx == roc->roc_map[i]) return i + roc->roc_old_owners + 1; } return sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND); } /* ** QAR_GET_ADDRS -- Get (IPv4) addresses from AR ** ** Parameters: ** aq_rcpt -- AQ rcpt ** rcb -- RCB from AR ** now -- current time ** nr -- number of addresses ** prv -- (pointer to) return value for caller to use (output) ** returns an error if "fatal" for communication, e.g., ** protocol error. ** ** Returns: ** usual sm_error code; SM_E_OVFLW_SC, SM_E_PR_ERR ** ** Side Effects: allocate aqr_addrs (usually), must be free()d by caller ** on error. may fill in aqr_addrs only partially. ** ** Last code review: 2005-03-30 17:49:10 ** Last code change: */ static sm_ret_T qar_get_addrs(aq_rcpt_P aq_rcpt, sm_rcb_P rcb, time_T now, uint32_t nr, sm_ret_T *prv) { sm_ret_T ret; uint32_t v, l, rt, i, n_A; size_t addrs_size; *prv = ret = SM_SUCCESS; n_A = (nr < SM_DNS_A_MAX) ? nr : SM_DNS_A_MAX; addrs_size = n_A * sizeof(*(aq_rcpt->aqr_addrs)); if (addrs_size < n_A || addrs_size < sizeof(*(aq_rcpt->aqr_addrs))) { ret = sm_error_perm(SM_EM_Q_AR2Q, SM_E_OVFLW_SC); goto error; } /* Later on: check size and realloc() if necessary */ aq_rcpt->aqr_addrs = (aq_raddr_P) sm_zalloc(addrs_size); /* Use a fallback if memory allocation fails */ if (NULL == aq_rcpt->aqr_addrs) { AQR_SET_FLAG(aq_rcpt, AQR_FL_MEMAR|AQR_FL_ARINCOMPL); aq_rcpt->aqr_addr_max = 1; aq_rcpt->aqr_addrs = &aq_rcpt->aqr_addr_mf; } aq_rcpt->aqr_addr_cur = 0; /* Even if alloc failed: read all data (or ignore it?) */ for (i = 0; i < nr; i++) { ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_IPV4) { QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_get_addrs, expected=%#x, rt=%#x, v=%d, l=%d, ret=%r, i=%d, nr=%d\n", RT_R2Q_RCPT_IPV4, rt, v, l, ret, i, nr)); *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if ((!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) || i == 0) && i < n_A) aq_rcpt->aqr_addrs[i].aqra_ipv4 = v; #if 0 QM_LEV_DPRINTFC(QDC_A2Q, 6, (QM_DEBFP, "sev=DBG, func=qar_get_addrs, rcpt_id=%s, %d/%d, ip=%A\n", rcpt_id, i, nr, (ipv4_T) v)); #endif ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_PRE) { QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_get_addrs, expected=%#x, rt=%#x, v=%d, l=%d, ret=%r\n", RT_R2Q_RCPT_PRE, rt, v, l, ret)); *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if ((!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) || i == 0) && i < n_A) aq_rcpt->aqr_addrs[i].aqra_pref = v; ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_TTL) { QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_get_addrs, expected=%#x, rt=%#x, v=%d, l=%d, ret=%r\n", RT_R2Q_RCPT_TTL, rt, v, l, ret)); *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if ((!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) || i == 0) && i < n_A) aq_rcpt->aqr_addrs[i].aqra_expt = now + v; } AQR_SET_FLAG(aq_rcpt, AQR_FL_RCVD4AR|AQR_FL_RDY4DLVRY); if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR)) aq_rcpt->aqr_addr_max = nr; return ret; error: if (!sm_is_err(ret)) ret = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); return ret; } /* ** QAR_ALIAS -- Decode data received from AR for alias expansion ** recipients are written to EDB (using an EDB request list), not to AQ! ** ** Parameters: ** qar_ctx -- AR context ** aq_rcpt -- AQ rcpt ** nae -- number of aliases ** rcb -- RCB from which to read data ** prv -- (pointer to) return value for caller to use (output) ** returns an error if "fatal" for communication, e.g., ** protocol error. ** ** Returns: ** usual sm_error code ** ** Locking: ** aq_ctx and defedb should be locked by caller ** ** Called by: qm_fr_ar_react() ** ** Todo: error handling! ** need to undo all changes done so far for any resource problem. ** what then? how is caller supposed to deal with that? just log it ** and try again later? theoretically cleanup should take care of it: ** it will remove aq_rcpt after a timeout and try to write it to EDB. ** we could try to do this "now" (maybe it makes sense to have one ** rcb entry for this as "emergency backup" available?) ** ** Last code review: ** Last code change: */ static sm_ret_T qar_alias(qar_ctx_P qar_ctx, aq_rcpt_P aq_rcpt, uint32_t nae, sm_rcb_P rcb, int *prv, rcpt_owner_ctx_P roc) { uint32_t v, l, rt, alias_idx; sm_ret_T ret, rcpt_status; qmgr_ctx_P qmgr_ctx; aq_ctx_P aq_ctx; edb_ctx_P edb_ctx; aq_rcpt_P aq_rcpt_a; aq_ta_P aq_ta; uint iqdb_rcpts_done; uint32_t fct_state; rcpt_id_T rcpt_id; time_T now; ibdb_req_hd_T ibdb_req_hd; edb_req_hd_T edb_req_hd; #define FST_IBDB_USED 0x0001 /* ibdb req list is in use */ #define FST_EDBRQL_USED 0x0002 /* edb req list is in use */ #define FST_EBD_WRITTEN 0x0004 /* edb has been written */ #define FST_IBBD_WRITTEN 0x0008 /* ibdb has been written */ #define FST_AQR_NEW 0x0010 /* aq_rcpt_new failed */ #define FST_EDB_RCPT_APP 0x0020 /* edb_rcpt_app failed */ #define FST_EDBC_ADD 0x0040 /* edbc_add failed */ #define FST_EDB_RCPT_RM 0x0080 /* edb_rcpt_rm_req failed */ SM_IS_QAR_CTX(qar_ctx); qmgr_ctx = qar_ctx->qar_qmgr_ctx; SM_IS_QMGR_CTX(qmgr_ctx); aq_ctx = qmgr_ctx->qmgr_aq; SM_IS_AQ(aq_ctx); edb_ctx = qmgr_ctx->qmgr_edb;; ret = SM_SUCCESS; now = evthr_time(qmgr_ctx->qmgr_ev_ctx); aq_rcpt_a = NULL; aq_ta = aq_rcpt->aqr_ss_ta; iqdb_rcpts_done = 0; fct_state = 0; IBDBREQL_INIT(&ibdb_req_hd); SM_SET_FLAG(fct_state, FST_IBDB_USED); EDBREQL_INIT(&edb_req_hd); SM_SET_FLAG(fct_state, FST_EDBRQL_USED); for (alias_idx = 0; alias_idx < nae; alias_idx++) { ret = aq_rcpt_new(&aq_rcpt_a); if (sm_is_err(ret)) { SM_SET_FLAG(fct_state, FST_AQR_NEW); goto error; } aq_rcpt_a->aqr_idx = aq_ta->aqt_nxt_idx + alias_idx; SESSTA_COPY(aq_rcpt_a->aqr_ss_ta_id, aq_rcpt->aqr_ss_ta_id); aq_rcpt_a->aqr_alias_idx = aq_rcpt->aqr_idx; aq_rcpt_a->aqr_st_time = aq_rcpt->aqr_st_time; aq_rcpt_a->aqr_ss_ta = aq_ta; /* which flags to copy?? */ aq_rcpt_a->aqr_flags = AQR_FL_ALIAS; /* |aq_rcpt->aqr_flags */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret)) { *prv = ret; goto error; } rcpt_status = AQR_ST_NEW; if (RT_R2Q_RCPT_ST == rt) { /* handle error code */ ret = qar_rcpt_st(aq_rcpt_a, v); QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qar_alias, alias=%d/%d, status=%r, qar_rcpt_st=%r\n", alias_idx, nae, v, ret)); if (sm_is_err(ret)) { *prv = ret; goto error; } /* update recipient counters in TA */ ret = aq_upd_ta_rcpt_cnts(aq_ta, AQR_ST_NONE, aq_rcpt_a->aqr_status_new, qmgr_ctx->qmgr_lctx); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_alias, alias=%d/%d, status=%r, aq_upd_ta_rcpt_cnts=%r\n", alias_idx, nae, v, ret)); goto error; } rcpt_status = aq_rcpt_a->aqr_status_new; /* write rcpt to edb, see below */ } else if (RT_R2Q_RCPT_DA == rt) aq_rcpt_a->aqr_da_idx = v; else { *prv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } /* RT_R2Q_RCPT_PORT (optional) or RT_R2Q_OWN_REF */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4) { *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (RT_R2Q_RCPT_PORT == rt) { aq_rcpt_a->aqr_port = (short) v; ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); } if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_OWN_REF) { *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (v > 0) { ret = qar_ownidxmap(v, roc); if (sm_is_err(ret)) goto error; aq_rcpt_a->aqr_owner_idx = (rcpt_idx_T) ret; } if (sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) && 4 == l && RT_R2Q_RCPT_FL == rt) { ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_FL) { *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (SM_IS_FLAG(v, SMARRT_FL_VERP)) AQR_SET_FLAG(aq_rcpt_a, AQR_FL_HAS_VERP); } #if MTA_USE_TLS if (!SM_RCB_ISEOB(rcb) && sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) && 4 == l && RT_R2Q_MAP_RES_CNF_RCPT == rt) { ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_MAP_RES_CNF_RCPT) { *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } aq_rcpt_a->aqr_maprescnf = v; if (sm_is_temp_err(v) && !AQR_IS_FLAG(aq_rcpt, AQR_FL_TEMP)) { aq_rcpt->aqr_status_new = SMTP_MAP_TEMP; AQR_SET_FLAG(aq_rcpt, AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_STAT_NEW); } ret = sm_rcb_get2uint32(rcb, &l, &rt); if (sm_is_err(ret) || rt != RT_R2Q_RHS_CNF_RCPT) { *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } ret = sm_rcb_getnstr(rcb, &aq_rcpt_a->aqr_conf, l); if (sm_is_err(ret)) { *prv = ret; goto error; } QM_LEV_DPRINTFC(QDC_A2Q, 8, (QM_DEBFP, "func=qar_alias, conf=%x\n", aq_rcpt->aqr_conf)); } #endif /* MTA_USE_TLS */ ret = sm_rcb_get2uint32(rcb, &l, &rt); if (sm_is_err(ret) || rt != RT_R2Q_RCPT_PA) { *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } ret = sm_rcb_getnstr(rcb, &aq_rcpt_a->aqr_pa, l); QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=INFO, func=qar_alias, alias=%d/%d, new_addr=%S, owner_idx=%d, port=%hd, ret=%r\n", alias_idx, nae, aq_rcpt_a->aqr_pa, aq_rcpt_a->aqr_owner_idx, aq_rcpt_a->aqr_port, ret)); if (AQR_ST_NEW == rcpt_status) { ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || rt != RT_R2Q_RCPT_NAR || l != 4) { *prv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (v < 1) { *prv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } ret = qar_get_addrs(aq_rcpt_a, rcb, now, v, prv); if (sm_is_err(ret)) goto error; } sm_snprintf(rcpt_id, sizeof(rcpt_id), SMTP_RCPTID_FORMAT, aq_rcpt_a->aqr_ss_ta_id, aq_rcpt_a->aqr_idx); ret = edb_rcpt_app(edb_ctx, aq_rcpt_a, &edb_req_hd, rcpt_status); if (sm_is_err(ret)) { SM_SET_FLAG(fct_state, FST_EDB_RCPT_APP); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, rcpt_id=%s, edb_rcpt_app=%m", rcpt_id, ret); goto error; } ret = edbc_add(qmgr_ctx->qmgr_edbc, rcpt_id, aq_rcpt_a->aqr_next_try, false); if (sm_is_err(ret)) { SM_SET_FLAG(fct_state, FST_EDBC_ADD); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, rcpt_id=%s, edbc_add=%m", rcpt_id, ret); goto error; } /* reuse it?? later on... */ (void) aq_rcpt_free(aq_rcpt_a); aq_rcpt_a = NULL; } /* XXX remove rcpt from aq after EDB has been written?! */ AQR_SET_FLAG(aq_rcpt, AQR_FL_REPLACED); sm_snprintf(rcpt_id, sizeof(rcpt_id), SMTP_RCPTID_FORMAT, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx); if (AQR_IS_FLAG(aq_rcpt, AQR_FL_DEFEDB)) { ret = edb_rcpt_rm_req(edb_ctx, rcpt_id, &edb_req_hd); if (sm_is_err(ret)) { /* XXX deal with error ... */ SM_SET_FLAG(fct_state, FST_EDB_RCPT_RM); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 0, "sev=ERROR, func=qar_alias, rcpt_id=%s, remove=%m" , rcpt_id, ret); goto error; } } if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IQDB)) { ibdb_rcpt_T ibdb_rcpt; ret = iqdb_rcpt_rm(qmgr_ctx->qmgr_iqdb, rcpt_id, SMTP_RCPTID_SIZE, THR_LOCK_UNLOCK); if (sm_is_err(ret)) { /* XXX */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, rcpt_id=%s, iqdb_rcpt_rm=%m", rcpt_id, ret); goto error; } else { /* One rcpt successfully removed from IQDB */ ++iqdb_rcpts_done; } ibdb_rcpt.ibr_ta_id = aq_rcpt->aqr_ss_ta_id; ibdb_rcpt.ibr_pa = aq_rcpt->aqr_pa; ibdb_rcpt.ibr_idx = aq_rcpt->aqr_idx; ret = ibdb_rcpt_status(qmgr_ctx->qmgr_ibdb, &ibdb_rcpt, IBDB_RCPT_DONE, IBDB_FL_NOROLL, THR_LOCK_UNLOCK); if (sm_is_err(ret)) { /* ** XXX How to handle this error? ** Set a QMGR status flag? ** The system is probably out of memory. */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, ss_ta=%s, ibdb_rcpt_app=%m", aq_rcpt->aqr_ss_ta_id, ret); goto error; } else QM_LEV_DPRINTFC(QDC_A2Q, 5, (QM_DEBFP, "sev=DBG, func=qar_alias, ss_ta=%s, ibdb_rcpt_app=%r\n", aq_rcpt->aqr_ss_ta_id, ret)); } /* update aq_ta; does this need to be undone if an error occurs?? */ aq_ta->aqt_nxt_idx += nae; aq_ta->aqt_rcpts_tot += nae - 1; aq_ta->aqt_rcpts_left += nae - 1; ret = edb_ta_app(edb_ctx, aq_ta, &edb_req_hd, 0 /* XXX? */); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, ss_ta=%s, edb_ta_app=%m", aq_rcpt->aqr_ss_ta_id, ret); goto error; } /* Removed any entries from INCEDB? */ if (iqdb_rcpts_done > 0) { ret = qda_upd_iqdb(qmgr_ctx, iqdb_rcpts_done, aq_rcpt->aqr_ss_ta_id, aq_ta->aqt_cdb_id, &ibdb_req_hd); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, ss_ta=%s, qda_upd_iqdb=%m", aq_rcpt->aqr_ss_ta_id, ret); goto error; } } /* write request list ... */ ret = edb_wr_status(edb_ctx, &edb_req_hd); QM_LEV_DPRINTFC(QDC_A2Q, 2, (QM_DEBFP, "sev=DBG, func=qar_alias, edb_wr_status=%r\n", ret)); /* write ibdb... if necessary */ if (sm_is_success(ret)) { SM_SET_FLAG(fct_state, FST_EBD_WRITTEN); if (!AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_DEFEDB)) { AQ_TA_SET_FLAG(aq_ta, AQ_TA_FL_DEFEDB); aq_ctx->aq_d_entries++; QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=DBG, func=qar_alias, ss_ta=%s, ta_flags=%#x, aq_d_entries=%u, aq_entries=%u\n", aq_ta->aqt_ss_ta_id, aq_ta->aqt_flags, aq_ctx->aq_d_entries, aq_ctx->aq_entries)); SM_ASSERT(aq_ctx->aq_d_entries <= aq_ctx->aq_entries); } SM_CLR_FLAG(fct_state, FST_EDBRQL_USED); ret = ibdb_wr_status(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, ibdb_wr_status=%m", ret); } else { SM_CLR_FLAG(fct_state, FST_IBDB_USED); SM_SET_FLAG(fct_state, FST_IBBD_WRITTEN); } } else { /* XXX things to cancel? */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=ERROR, func=qar_alias, rcpt_id=%s, edb_wr_status=%m", rcpt_id, ret); goto error; } return SM_SUCCESS; error: /* CONTINUE */ if (SM_IS_FLAG(fct_state, FST_EDBRQL_USED)) { (void) edb_reql_free(edb_ctx, &edb_req_hd); SM_CLR_FLAG(fct_state, FST_EDBRQL_USED); } if (SM_IS_FLAG(fct_state, FST_IBDB_USED)) { (void) ibdb_req_cancel(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd); SM_CLR_FLAG(fct_state, FST_IBDB_USED); } if (aq_rcpt_a != NULL) { (void) aq_rcpt_free(aq_rcpt_a); aq_rcpt_a = NULL; } if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_alias, rt=%#x, v=%d, l=%d, ret=%r, rv=%r\n", rt, v, l, ret, *prv)); /* a protocol error but ret does not indicate an error -> set ret too */ if (sm_is_err(*prv) && !sm_is_err(ret)) ret = *prv; return ret; } #undef FST_IBDB_USED #undef FST_EDBRQL_USED /* ** QAR_REACT -- Decode data received from AR and act accordingly ** ** Parameters: ** qar_ctx -- AR context ** ** Returns: ** usual sm_error code ** Only returns an error if "fatal" for communication, e.g., ** protocol error. ** ** Locking: ** qar_ctx: locked by access via task ** aq_ctx: locked if aq_rcpt is found ** ** Called by: qm_fr_ar() ** ** Last code review: 2003-10-29 16:30:16, see comments below. ** Last change: 2005-03-29 21:35:38 ** ** Todo: error handling! */ static sm_ret_T qm_fr_ar_react(qar_ctx_P qar_ctx) { uint32_t v, l, rt, tl, nae; sm_ret_T ret, rv; sm_rcb_P rcb; qmgr_ctx_P qmgr_ctx; aq_ctx_P aq_ctx; aq_rcpt_P aq_rcpt; aq_ta_P aq_ta; rcpt_id_T rcpt_id; sessta_id_T ta_ss_id; rcpt_idx_T rcpt_idx; int r; time_T now; bool notify_sched; uint fct_state; rcpt_owner_ctx_T roc; #define FST_EDB_LOCKED 0x01 #define FST_AQ_RCPT_LOCKED 0x02 #define FST_RCB_OPEN_DEC 0x04 SM_IS_QAR_CTX(qar_ctx); qmgr_ctx = qar_ctx->qar_qmgr_ctx; SM_IS_QMGR_CTX(qmgr_ctx); aq_ctx = qmgr_ctx->qmgr_aq; SM_IS_AQ(aq_ctx); ret = rv = SM_SUCCESS; now = evthr_time(qmgr_ctx->qmgr_ev_ctx); notify_sched = true; nae = 1; /* number of alias expansions */ sm_memzero(&roc, sizeof(roc)); fct_state = 0; aq_rcpt = NULL; /* decode rcb */ rcb = qar_ctx->qar_com.rcbcom_rdrcb; ret = sm_rcb_open_dec(rcb); if (sm_is_err(ret)) { rv = ret; goto error; } SM_SET_FLAG(fct_state, FST_RCB_OPEN_DEC); /* total length of record */ ret = sm_rcb_getuint32(rcb, &tl); if (sm_is_err(ret) || tl > QM_AR_MAX_REC_LEN || tl > sm_rcb_getlen(rcb)) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_RCB2LONG); goto error; } /* protocol header: version */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret)) { rv = ret; goto error; } if (l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT) { rv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_V_MISM); goto error; } /* define protocol first in smX docs! */ /* see smar/rcpt.c for the current protocol! RT_R2Q_RCPT_ID, smar_rcpt->arr_id, - RT_R2Q_RCPT_ST: error out or - RT_R2Q_RCPT_DA: continue optional: RT_R2Q_RCPT_PA: 1-1 alias replacement RT_R2Q_RCPT_NAR loop for number of addresses RT_R2Q_RCPT_IPV4 RT_R2Q_RCPT_PRE RT_R2Q_RCPT_TTL or - RT_R2Q_RCPT_AE: alias expansion (1-n, n>1) The protocol isn't good... too many cases to distinguish, to much replicated code? How about: RT_R2Q_RCPT_NR: number of recipients for each recipient: either RT_R2Q_RCPT_ST: error from AR or RT_R2Q_RCPT_DA optional (only for #rcpts==1): RT_R2Q_RCPT_PA: 1-1 alias replacement RT_R2Q_RCPT_NAR loop for number of addresses RT_R2Q_RCPT_IPV4 RT_R2Q_RCPT_PRE RT_R2Q_RCPT_TTL doesn't work: no alias expansion (or 1-1) is very different from 1-n (n>1) the latter requires: n (number of new addresses) rcpt_id and rcpt_pa for each new address moreover, the original rcpt_id is needed to mark it as "replaced" nevertheless, it might be better to send RT_R2Q_RCPT_AE before all other data */ /* decode data, act accordingly... */ ret = sm_rcb_get2uint32(rcb, &l, &rt); if (sm_is_err(ret) || l != SMTP_RCPTID_SIZE || rt != RT_R2Q_RCPT_ID) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } ret = sm_rcb_getn(rcb, (uchar *) rcpt_id, l); if (sm_is_err(ret)) { rv = ret; goto error; } /* "decode" rcpt_id: ta_id and idx */ RCPTID2SESSTAID(rcpt_id, ta_ss_id); r = RCPTID2IDX(rcpt_id, rcpt_idx); if (r != 1) goto error; /* ** NOTE: edbc MUST be locked before aq if aliases are returned! ** For now edbc is always locked, it shouldn't cause any significant ** contention: edbc isn't accessed without aq being locked too, so ** since aq must be locked here, it is ok to lock edb too. ** Why is it edbc and not edb? There seems to be some ** "cross locking" (if edbc is locked, don't access edb?) ** See README.dev. */ r = pthread_mutex_lock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 1, "sev=CRIT, func=qmgr_aq_cleanup, lock_edbc=%m", sm_err_perm(r)); goto error; } SM_SET_FLAG(fct_state, FST_EDB_LOCKED); /* lookup rcpt_id in AQ */ ret = aq_rcpt_find_ss(aq_ctx, ta_ss_id, rcpt_idx, THR_LOCK_UNLERR, &aq_rcpt); if (sm_is_err(ret)) { /* ** This can happen if a recipient has been removed due to ** a timeout, just ignore it, the recipient will be tried ** later on again. */ if (ret == sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND)) { QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=WARN, func=qm_fr_ar_react, aq_rcpt_find_ss=NOTFOUND, rcpt_id=%s, ss_ta=%s, idx=%d\n", rcpt_id, ta_ss_id, rcpt_idx)); ret = 0; } else QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_rcpt_find_ss=%r, rcpt_id=%s, ss_ta=%s, idx=%d\n", ret, rcpt_id, ta_ss_id, rcpt_idx)); goto error; } SM_SET_FLAG(fct_state, FST_AQ_RCPT_LOCKED); aq_ta = aq_rcpt->aqr_ss_ta; SM_IS_AQ_TA(aq_ta); if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_SENT2AR)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_WARN, 7, "sev=WARN, func=qm_fr_ar_react, rcpt_id=%s, status=not_sent2ar" , rcpt_id); goto unlock; } /* aq_rcpt (aq_ctx) is locked! */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (RT_R2Q_RCPT_ST == rt) { QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qm_fr_ar_react, rcpt_id=%s, ss_ta=%s, idx=%d, state=%#x, addr_max=%d, where=global\n", rcpt_id, ta_ss_id, rcpt_idx, v, aq_rcpt->aqr_addr_max)); ret = qar_rcpt_st(aq_rcpt, v); if (sm_is_err(ret)) goto error; /* See comments in qar_rcpt_st() */ ++aq_ta->aqt_rcpts_arf; goto done; /* XXX Really? */ } if (RT_R2Q_OWN_N == rt) { uint n, map_idx, total_owners; rcpt_idx_T owner_idx; sm_str_P *owners_pa; sm_str_P pa; owners_pa = NULL; pa = NULL; roc.roc_add_owners = v; owner_idx = roc.roc_old_owners = aq_ta->aqt_owners_n; total_owners = roc.roc_add_owners + roc.roc_old_owners; SM_ASSERT(total_owners >= roc.roc_add_owners); SM_ASSERT(total_owners >= roc.roc_old_owners); owners_pa = sm_zalloc(total_owners * sizeof(*owners_pa)); if (NULL == owners_pa) { ret = sm_error_temp(SM_EM_Q_AR2Q, ENOMEM); goto error; } roc.roc_map_size = v * sizeof(*roc.roc_map); if (roc.roc_map_size < v || roc.roc_map_size < sizeof(*roc.roc_map)) { ret = sm_error_perm(SM_EM_Q_AR2Q, SM_E_OVFLW_SC); goto err_own; } roc.roc_map = sm_zalloc(roc.roc_map_size); if (roc.roc_map == NULL) { ret = sm_error_temp(SM_EM_Q_AR2Q, ENOMEM); goto err_own; } QM_LEV_DPRINTFC(QDC_A2Q, 6, (QM_DEBFP, "func=qm_fr_ar_react, owners=%d\n", v)); for (n = v, map_idx = 0; n > 0; n--) { ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_OWN_IDX) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto err_own; } SM_ASSERT(map_idx < roc.roc_add_owners); roc.roc_map[map_idx++] = v; ret = sm_rcb_get2uint32(rcb, &l, &rt); if (sm_is_err(ret) || rt != RT_R2Q_OWN_PA) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto err_own; } pa = NULL; ret = sm_rcb_getnstr(rcb, &pa, l); if (sm_is_err(ret)) goto err_own; QM_LEV_DPRINTFC(QDC_A2Q, 6, (QM_DEBFP, "func=qm_fr_ar_react, owner_idx=%d, rcpt_idx=%d, pa=%S\n", owner_idx, roc.roc_map[map_idx - 1], pa)); SM_ASSERT(owner_idx < total_owners); owners_pa[owner_idx] = pa; pa = NULL; ++owner_idx; } ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto err_own; } /* copy over old data */ if (aq_ta->aqt_owners_pa != NULL) { SM_ASSERT(aq_ta->aqt_owners_n > 0); /* use sm_memcpy()? */ for (n = 0; n < aq_ta->aqt_owners_n; n++) owners_pa[n] = aq_ta->aqt_owners_pa[n]; SM_FREE(aq_ta->aqt_owners_pa); } aq_ta->aqt_owners_pa = owners_pa; owners_pa = NULL; aq_ta->aqt_owners_n += roc.roc_add_owners; err_own: if (sm_is_err(ret)) { if (owners_pa != NULL) { for (n = roc.roc_old_owners; n < roc.roc_add_owners; n++) SM_STR_FREE(owners_pa[n]); SM_FREE(owners_pa); } SM_FREE_SIZE(roc.roc_map, roc.roc_map_size); goto error; } } if (RT_R2Q_RCPT_AE == rt) { nae = v; if (nae <= 1) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 3, "sev=ERROR, func=qm_fr_ar_react, number_of_aliases=%u, status=illegal_value" , v); goto error; } } if (1 == nae) { sm_ret_T rcpt_state; rcpt_state = SM_SUCCESS; if (RT_R2Q_RCPT_ST == rt) { QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qm_fr_ar_react, rcpt_id=%s, ss_ta=%s, idx=%d, state=%#x, addr_max=%d, where=individual\n", rcpt_id, ta_ss_id, rcpt_idx, v, aq_rcpt->aqr_addr_max)); ret = qar_rcpt_st(aq_rcpt, v); if (sm_is_err(ret)) goto error; rcpt_state = v; /* XXX See comments in qar_rcpt_st() */ ++aq_ta->aqt_rcpts_arf; } else if (rt != RT_R2Q_RCPT_DA) { rv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } /* SM_ASSERT(RT_R2Q_RCPT_DA == rt) */ aq_rcpt->aqr_da_idx = v; /* RT_R2Q_RCPT_PORT (optional) or RT_R2Q_OWN_REF */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (RT_R2Q_RCPT_PORT == rt) { aq_rcpt->aqr_port = (short) v; ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); } if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_OWN_REF) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (v > 0) { ret = qar_ownidxmap(v, &roc); if (sm_is_err(ret)) goto error; aq_rcpt->aqr_owner_idx = (rcpt_idx_T) ret; } if (sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) && 4 == l && RT_R2Q_RCPT_FL == rt) { ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_FL) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } if (SM_IS_FLAG(v, SMARRT_FL_VERP)) AQR_SET_FLAG(aq_rcpt, AQR_FL_HAS_VERP); } #if MTA_USE_TLS if (!SM_RCB_ISEOB(rcb) && sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) && 4 == l && RT_R2Q_MAP_RES_CNF_RCPT == rt) { ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_MAP_RES_CNF_RCPT) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } aq_rcpt->aqr_maprescnf = v; if (sm_is_temp_err(v) && !AQR_IS_FLAG(aq_rcpt, AQR_FL_TEMP)) { aq_rcpt->aqr_status_new = SMTP_MAP_TEMP; AQR_SET_FLAG(aq_rcpt, AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_STAT_NEW); } ret = sm_rcb_get2uint32(rcb, &l, &rt); if (sm_is_err(ret) || rt != RT_R2Q_RHS_CNF_RCPT) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } ret = sm_rcb_getnstr(rcb, &aq_rcpt->aqr_conf, l); if (sm_is_err(ret)) { rv = ret; goto error; } QM_LEV_DPRINTFC(QDC_A2Q, 8, (QM_DEBFP, "func=qm_fr_ar_react, conf=%x\n", aq_rcpt->aqr_conf)); } #endif /* MTA_USE_TLS */ if (rcpt_state != SM_SUCCESS && SM_RCB_ISEOB(rcb)) goto done; ret = sm_rcb_get2uint32(rcb, &l, &rt); if (sm_is_err(ret)) { rv = ret; goto error; } if (RT_R2Q_RCPT_PA == rt) { sm_str_P pa; /* save original address, get new one, assign old one */ pa = aq_rcpt->aqr_pa; aq_rcpt->aqr_pa = NULL; ret = sm_rcb_getnstr(rcb, &aq_rcpt->aqr_pa, l); QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "func=qm_fr_ar_react, new_addr=%S, ret=%r\n", aq_rcpt->aqr_pa, ret)); if (sm_is_err(ret)) { sm_str_free(pa); goto error; } aq_rcpt->aqr_orig_pa = pa; if (rcpt_state != SM_SUCCESS) goto done; /* get RT_R2Q_RCPT_NAR, value is read below */ ret = sm_rcb_get2uint32(rcb, &l, &rt); } if (sm_is_err(ret) || rt != RT_R2Q_RCPT_NAR || l != 4) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR); goto error; } /* get value for RT_R2Q_RCPT_NAR */ ret = sm_rcb_getuint32(rcb, &v); if (sm_is_err(ret)) { rv = ret; goto error; } if (v < 1) goto error; ret = qar_get_addrs(aq_rcpt, rcb, now, v, &rv); if (sm_is_err(ret)) goto error; ret = aq_rdq_add(aq_ctx, aq_rcpt, &v, THR_NO_LOCK); if (sm_is_err(ret)) { /* XXX what to do on error? */ QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_rdq_add=%r\n", ret)); goto error; } else { /* notify_sched = !SM_IS_FLAG(v, AQRDQ_FL_OCEXC); */ QM_LEV_DPRINTFC(QDC_A2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_ar_react, aq_rdq_add=%r, flags=%#x\n", ret, aq_rcpt->aqr_flags)); } } else { /* ** Who does the proper error handling? ** - failed temporarily: mark aq_rcpt properly ** AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_RCVD4AR|AQR_FL_STAT_NEW ** - failed permanently: trigger bounce? ** - other kind of errors? */ ret = qar_alias(qar_ctx, aq_rcpt, nae, rcb, &rv, &roc); if (sm_is_err(ret)) goto error; } done: ret = aq_waitq_rm(qmgr_ctx->qmgr_aq, aq_rcpt, AQWQ_AR, false); if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_waitq_rm=%r\n", ret)); if (AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF)) { /* first rm and then add: just move it? */ ret = aq_waitq_add(qmgr_ctx->qmgr_aq, aq_rcpt, 0, AQWQ_AR, false); if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_waitq_add=%r\n", ret)); else { ret = qmgr_set_aq_cleanup(qmgr_ctx->qmgr_cleanup_ctx, now, true); if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, qmgr_set_aq_cleanup=%r\n", ret)); } } /* call this before decrementing the counter? See the function! */ ret = qm_ar_actsched(qmgr_ctx, aq_rcpt, ¬ify_sched); if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_BNC|AQR_FL_IS_DBNC)) { SM_ASSERT(aq_ta->aqt_rcpts_ar > 0); --aq_ta->aqt_rcpts_ar; } unlock: if (SM_IS_FLAG(fct_state, FST_EDB_LOCKED)) { r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_ASSERT(0 == r); if (0 == r) SM_CLR_FLAG(fct_state, FST_EDB_LOCKED); } /* remove expanded address from AQ */ if (nae > 1) { ret = aq_rcpt_rm(aq_ctx, aq_rcpt, 0); if (sm_is_success(ret)) { SM_ASSERT(aq_ta->aqt_rcpts_inaq > 0); --aq_ta->aqt_rcpts_inaq; aq_rcpt = NULL; } } if (SM_IS_FLAG(fct_state, FST_AQ_RCPT_LOCKED)) { /* NOTE: aq_rcpt might be NULL already! */ ret = aq_rcpt_lockop(aq_ctx, aq_rcpt, THR_UNLOCK_IT); SM_CLR_FLAG(fct_state, FST_AQ_RCPT_LOCKED); } QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_ar_react, rcpt_id=%s, status=updated, idx=%d, state=%d, aqt_rcpts_ar=%u, ret=%r, notify_sched=%d\n", rcpt_id, rcpt_idx, aq_rcpt == NULL ? -1 : aq_rcpt->aqr_status, aq_ta->aqt_rcpts_ar, ret, notify_sched)); /* ** Activate scheduler, but how? Always or use some test? ** Note: the scheduler must also be activated when a recipient has ** has been resolved because it tests the number of "outstanding" ** recipients before it schedules a transaction. ** Does this mean it should always be called? It might be nice ** to provide a simple function that tests whether the scheduler ** should be activated or some flags that tell the scheduler ** what has changed so it doesn't need to "walk" through all ** the ready queues... */ if (notify_sched) { timeval_T nowt; /* activate scheduler */ ret = evthr_timeval(qmgr_ctx->qmgr_ar_tsk->evthr_t_ctx, &nowt); ret = evthr_new_sl(qmgr_ctx->qmgr_sched, nowt, false); if (sm_is_err(ret)) { /* what to do? */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_ERR, 4, "sev=ERROR, func=qm_fr_ar_react, evthr_new_sl=%m" , ret); } else QM_LEV_DPRINTFC(QDC_A2Q, 4, (QM_DEBFP, "func=qm_fr_ar_react, evthr_new_sl=%r\n", ret)); } SM_FREE_SIZE(roc.roc_map, roc.roc_map_size); ret = sm_rcb_close_dec(qar_ctx->qar_com.rcbcom_rdrcb); SM_CLR_FLAG(fct_state, FST_RCB_OPEN_DEC); (void) sm_rcb_open_rcv(qar_ctx->qar_com.rcbcom_rdrcb); return ret; error: if (aq_rcpt != NULL && aq_rcpt->aqr_addrs != NULL && !AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) && aq_rcpt->aqr_addrs != &aq_rcpt->aqr_addr_mf) { SM_FREE(aq_rcpt->aqr_addrs); } if (SM_IS_FLAG(fct_state, FST_AQ_RCPT_LOCKED)) { (void) aq_rcpt_lockop(aq_ctx, aq_rcpt, THR_UNLOCK_IT); SM_CLR_FLAG(fct_state, FST_AQ_RCPT_LOCKED); } if (SM_IS_FLAG(fct_state, FST_EDB_LOCKED)) { r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_ASSERT(0 == r); if (0 == r) SM_CLR_FLAG(fct_state, FST_EDB_LOCKED); } if (SM_IS_FLAG(fct_state, FST_RCB_OPEN_DEC)) { /* use rcb functions that don't do check the state */ (void) sm_rcb_close_decn(qar_ctx->qar_com.rcbcom_rdrcb); SM_CLR_FLAG(fct_state, FST_RCB_OPEN_DEC); } /* open rcb to receive next record */ (void) sm_rcb_open_rcvn(qar_ctx->qar_com.rcbcom_rdrcb); SM_FREE_SIZE(roc.roc_map, roc.roc_map_size); if (sm_is_err(rv)) QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, rt=%#x, v=%d, l=%d, ret=%r, rv=%r\n", rt, v, l, ret, rv)); return rv; } #undef FST_EDB_LOCKED #undef FST_AQ_RCPT_LOCKED #undef FST_RCB_OPEN_DEC /* ** AR2QMGR -- AR - QMGR interface ** ** Parameters: ** tsk -- evthr task ** ** Returns: ** usual sm_error code ** ** Locking: ** qar_ctx: locked by access via task ** ** Called by: qmgr_ar() for read events ** ** Last code review: 2003-10-16 */ sm_ret_T qm_fr_ar(sm_evthr_task_P tsk) { int fd, r; sm_ret_T ret; qmgr_ctx_P qmgr_ctx; qar_ctx_P qar_ctx; SM_IS_EVTHR_TSK(tsk); qar_ctx = (qar_ctx_P) tsk->evthr_t_actx; SM_IS_QAR_CTX(qar_ctx); qmgr_ctx = qar_ctx->qar_qmgr_ctx; SM_IS_QMGR_CTX(qmgr_ctx); fd = tsk->evthr_t_fd; /* checked in caller */ ret = sm_rcb_rcv(fd, qar_ctx->qar_com.rcbcom_rdrcb, QSS_RC_MINSZ); QM_LEV_DPRINTTC(QDC_A2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_ar, tsk=%p, fd=%d, ret=%r, buf=%d, len=%d\n", tsk, fd, ret, qar_ctx->qar_com.rcbcom_rdrcb->sm_rcb_base[0], sm_rcb_getlen(qar_ctx->qar_com.rcbcom_rdrcb)), qmgr_ctx->qmgr_ev_ctx->evthr_c_time); if (ret > 0) return EVTHR_WAITQ; else if (0 == ret) { ret = sm_rcb_close_rcv(qar_ctx->qar_com.rcbcom_rdrcb); /* start appropriate function ... */ ret = qm_fr_ar_react(qar_ctx); QM_LEV_DPRINTFC(QDC_A2Q, 5, (QM_DEBFP, "sev=DBG, func=qm_fr_ar, qm_fr_ar_react=%r\n", ret)); if (sm_is_err(ret)) goto termit; /* too harsh? */ else if (QMGR_R_WAITQ == ret) return EVTHR_WAITQ; else if (QMGR_R_ASYNC == ret) return EVTHR_OK; else if (EVTHR_DEL == ret) goto termit; /* terminate this SMTPC */ else return ret; } else if (SM_IO_EOF == ret) { ret = sm_rcb_close_rcv(qar_ctx->qar_com.rcbcom_rdrcb); termit: QM_LEV_DPRINTFC(QDC_A2Q, 1, (QM_DEBFP, "sev=DBG, func=qm_fr_ar, task=%p, status=terminate, ret=%r\n", qar_ctx->qar_com.rcbcom_tsk, ret)); close(fd); /* XXX see comment in qm_fr_ss */ tsk->evthr_t_fd = INVALID_FD; /* make it invalid */ qar_ctx->qar_status = QSC_ST_SH_DOWN; r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_FROM_SMAR, SM_LOG_CRIT, 1, "sev=CRIT, func=qm_fr_ar, lock=%d", r); goto error; } qmgr_ctx->qmgr_ar_tsk = NULL; r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex); SM_ASSERT(0 == r); /* free qar_ctx? done in qmgr_stop() */ qar_ctx->qar_com.rcbcom_tsk = NULL; return EVTHR_DEL; } else { /* if (ret < 0) */ QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar, ret=%r, errno=%d\n", ret, errno)); } QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar, fd=%d\n", fd)); error: return EVTHR_DEL; }