/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: qm_fr_sc.c,v 1.136 2007/06/18 04:42:31 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/reccom.h"
#include "sm/da.h"
#include "qmgr.h"
#include "log.h"

/*
**  QMGR_TMO_SCHED -- recompute qmgr_ctx->qmgr_tmo_sched
**
**	If the AQ size is bigger than the number of DA threads, the
**	scheduler timeout is increased; otherwise the configured value
**	is used as is.
**	Expects a variable named qmgr_ctx in scope; the call sites in this
**	file invoke it while holding qmgr_ctx->qmgr_mutex.
*/

#define QMGR_TMO_SCHED	\
	do {	\
		if (qmgr_ctx->qmgr_cnf.q_cnf_aq_size <=	\
		    qmgr_ctx->qmgr_max_da_threads)	\
			qmgr_ctx->qmgr_tmo_sched =	\
				qmgr_ctx->qmgr_cnf.q_cnf_tmo_sched;	\
		else	\
			qmgr_ctx->qmgr_tmo_sched =	\
			  (uint) (qmgr_ctx->qmgr_cnf.q_cnf_aq_size /	\
				(qmgr_ctx->qmgr_max_da_threads + 1))	\
			  * AQR_SCHED_TMOUT_FACT	\
			  + qmgr_ctx->qmgr_cnf.q_cnf_tmo_sched;	\
	} while (0)

/*
**  QM_FR_SC_RCPTS -- Read (failed) recipient status from SMTPC and update AQ
**
**	Note: this only updates the recipient status in AQ, qda_update_ta_stat()
**		must be called afterwards to update the various DBs and counters
**		What happens if this (partially) fails? The cleanup task should
**		take care of that, i.e., the recipients will be removed (tempfail?)
**		and tried later on again.
**
**	Record layout read from the rcb (as decoded below):
**		RT_C2Q_RCPT_N (count), then per recipient:
**		RT_C2Q_RCPT_IDX (index), RT_C2Q_RCPT_ST (status),
**		RT_C2Q_RCPT_STT (status text).
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		da_ta_id -- DA transaction id
**		rcb -- rcb containing info about recipient status,
**			this can contain several entries.
**
**	Side Effects:
**		recipient status in AQ is updated via aq_rcpt_status();
**		on error some recipients may already have been updated.
**
**	Returns:
**		<0: usual sm_error code: protocol errors, ENOMEM,
**		>0: number of recipients with status information
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
qm_fr_sc_rcpts(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_rcb_P rcb)
{
	/*
	**  Zero-initialized: these values are written to the log on error
	**  paths, even if the rcb getter failed before storing into them.
	*/

	uint32_t l = 0, rt = 0, idx = 0, v = 0;
	uint i, nrcpts;
	sm_ret_T ret, rv;
	sm_ret_T rcpt_status;
	aq_ctx_P aq_ctx;
	sm_str_P errmsg;

	SM_IS_QMGR_CTX(qmgr_ctx);
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);
	errmsg = NULL;
	rv = SM_SUCCESS;
	nrcpts = 0;
	QM_LEV_DPRINTTC(QDC_C2Q, 2, (QM_DEBFP,
		"func=qm_fr_sc_rcpts, da_ta=%s\n", da_ta_id),
		qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

	/* number of recipient entries that follow */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_N || v > INT_MAX)
	{
		rv = sm_is_err(ret) ? ret
			: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_DASTAT, QM_LMOD_DASTAT,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, l=%d, v=%#x, ret=%m"
			, da_ta_id, rt, RT_C2Q_RCPT_N, l, v, ret);
		goto error;
	}
	nrcpts = v;

	for (i = 0; i < nrcpts && !SM_RCB_ISEOB(rcb); i++)
	{
		/* recipient index */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &idx);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_IDX)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, idx=%u, ret=%m"
				, da_ta_id, rt, RT_C2Q_RCPT_IDX, idx
				, ret);
			break;
		}

		/* recipient status */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_ST)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

			/* argument order must match: rt, v, expected, idx */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, rt=%#x, v=%u, expected=%#x, idx=%u, ret=%m",
				rt, v, RT_C2Q_RCPT_ST, idx, ret);
			break;
		}
		rcpt_status = STATUS2SMTPCODE(v);
		QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP,
			"func=qm_fr_sc_rcpts, idx=%u, stat=%d\n",
			idx, rcpt_status));

		/* status text: header first, then the string itself */
		errmsg = NULL;
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		if (sm_is_err(ret))
		{
			rv = ret;
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, sm_rcb_get2uint(errmsg)=%m",
				ret);
			break;
		}
		if (rt != RT_C2Q_RCPT_STT)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

			/* the record type expected here is RT_C2Q_RCPT_STT */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, idx=%u, ret=%m"
				, da_ta_id, rt, RT_C2Q_RCPT_STT, idx
				, ret);
			break;
		}

#if QMGR_TEST
		/* trigger an error if requested */
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests,
				QMGR_TEST_RCPT_STAT) &&
		    rcpt_status == QMGR_TEST_RSR_ST)
		{
			QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP,
				"func=qm_fr_sc_rcpts, where=test_error\n"));
			rv = sm_error_perm(SM_EM_STR, ENOMEM);
			break;
		}
#endif /* QMGR_TEST */

		ret = sm_rcb_getnstr(rcb, &errmsg, l);
		if (sm_is_err(ret))
		{
			rv = ret;
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, sm_rcb_getnstr(errmsg)=%m",
				ret);
			break;
		}
		QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP,
			"func=qm_fr_sc_rcpts, rt=RT_C2Q_RCPT_STT, idx=%u, msg=%S\n",
			idx, errmsg));
		sm_str_sanitize(errmsg);

		/* do something more with this?? */
		if (smtp_is_reply_temp(rcpt_status) ||
		    smtp_is_reply_fail(rcpt_status))
		{
			/*
			**  XXX This could be more efficient depending on
			**  the implementation of AQ: we could "just"
			**  walk through the recipients for this delivery
			**  attempt.
			**
			**  Should error state always be DA_TA_ERR_RCPT_R?
			**  It is an individual error, so it must have been
			**  the reply to a RCPT command, right?
			**  The error state coming back from the DA is only
			**  one "global" state (for the entire transaction),
			**  but the error for a recipient can only(?) have
			**  occurred during RCPT, otherwise no individual
			**  error would have been returned by the DA.
			*/

			ret = aq_rcpt_status(aq_ctx, da_ta_id, idx,
					rcpt_status, DA_TA_ERR_RCPT_R, errmsg);

			/* errmsg is not touched (nor freed) here afterwards */
			errmsg = NULL;
			if (sm_is_err(ret))
			{
				/* can happen if rcpt was too long in AQ */
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_DASTAT, QM_LMOD_DASTAT,
					SM_LOG_ERR,
					(ret == sm_error_perm(SM_EM_AQ,
							SM_E_NOTFOUND))
						? 8 : 1,
					"sev=ERROR, func=qm_fr_sc_rcpts, stat=cannot_update_rcpt, da_ta=%s, idx=%u, ret=%m",
					da_ta_id, idx, ret);

				/* "not found" is tolerated, others are not */
				if (ret != sm_error_perm(SM_EM_AQ,
							SM_E_NOTFOUND))
				{
					rv = ret;
					break;
				}
				continue;
			}
		}
		else
		{
			/*
			**  Neither a temporary nor a permanent failure:
			**  remember the protocol error but keep reading
			**  the remaining entries (no break here).
			*/

			SM_STR_FREE(errmsg);
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, status=bad_value, da_ta=%s, idx=%u, rcpt_stat=%d",
				da_ta_id, idx, rcpt_status);
		}
	}

	/* rcb ended before all announced recipients were read */
	if (rv == SM_SUCCESS && i < nrcpts)
		rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

  error:
	return (SM_SUCCESS == rv) ? (sm_ret_T) nrcpts : rv;
}

/*
**  QM_FR_SC_REACT -- Decode data received from SMTPC and act accordingly
**
**	The record starts with its total length and the protocol version,
**	followed by one of RT_C2Q_NID (new SMTPC), RT_C2Q_ID (existing SMTPC)
**	or RT_C2Q_CID (SMTPC shuts down). Only for RT_C2Q_ID more data is
**	decoded: an optional RT_C2Q_STAT part, then either a session
**	(RT_C2Q_SEID) or a transaction (RT_C2Q_TAID, RT_C2Q_TAID_CS) status.
**
**	Parameters:
**		qsc_ctx -- SMTPC context
**		pfrt -- (only if QM_TIMING) (pointer to) record type of the
**			part following the id; 0 if it was not reached (output)
**
**	Returns:
**		usual sm_error code
**		EVTHR_DEL if SMTPC announced its shutdown (RT_C2Q_CID)
**		EVTHR_TERM if QM_DA_STAT_FATAL() holds for the error
**
**	NOTE(review): several error exits below jump to err2 without
**	setting rv, hence the function returns SM_SUCCESS in those cases;
**	they are marked individually.
*/

static sm_ret_T
qm_fr_sc_react(qsc_ctx_P qsc_ctx
#if QM_TIMING
	, uint32_t *pfrt
#endif
	)
{
	/*
	**  Zero/NULL-initialized: the debug output at the error label reads
	**  rt, v, and l even if decoding failed before they were assigned
	**  (e.g., if sm_rcb_open_dec() fails).
	*/

	uint32_t v = 0, l = 0, rt = 0, tl = 0, where = 0;
	sm_ret_T ret, rv;
	sm_rcb_P rcb;
	qmgr_ctx_P qmgr_ctx;
	aq_ctx_P aq_ctx;
	aq_ta_P aq_ta = NULL;
	dadb_entry_P dadb_entry = NULL;
	sm_str_P errmsg;

	SM_IS_QSC_CTX(qsc_ctx);
	qmgr_ctx = qsc_ctx->qsc_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	ret = rv = SM_SUCCESS;
	errmsg = NULL;
#if QM_TIMING
	*pfrt = 0;
#endif

	/* Decode rcb */
	rcb = qsc_ctx->qsc_com.rcbcom_rdrcb;
	ret = sm_rcb_open_dec(rcb);
	if (sm_is_err(ret))
	{
		rv = ret;
		goto error;
	}
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);

	/* Total length of record */
	ret = sm_rcb_getuint32(rcb, &tl);
	if (sm_is_err(ret) || tl > QM_SS_MAX_REC_LEN ||
	    tl > sm_rcb_getlen(rcb))
	{
		rv = sm_is_err(ret) ? ret
			: sm_error_perm(SM_EM_Q_SC2Q, SM_E_RCB2LONG);
		goto err2;
	}

	/* Decode data, act accordingly... */

	/* Protocol header: version */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	if (sm_is_err(ret))
	{
		rv = ret;
		goto err2;
	}
	if (l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT)
	{
		rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_V_MISM);
		goto err2;
	}

	/* queuemanager.func.tex: */

	/* SMTPC (new/existing/close) ID */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP,
		"func=qm_fr_sc_react,1 id=%d, stat=%d, rt=%#x, v=%d\n",
		qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v),
		qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
	if (sm_is_err(ret) || l != 4)
	{
		rv = sm_is_err(ret) ? ret
			: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
		goto err2;
	}

	if (RT_C2Q_NID == rt)
	{
		int r;

		/* New SMTPC */
		/* XXX protect access? */

		/*
		**  XXX perform similar checks as in qm_fr_ss().c
		**  to prevent multiple clients with the same id!
		*/

		if (qsc_ctx->qsc_status != QSC_ST_NONE ||
		    qsc_ctx->qsc_id != QSC_ID_NONE)
		{
			/* XXX Internal error? Stop task? */
			/* NOTE(review): rv is still SM_SUCCESS here */
			goto err2;
		}
		qsc_ctx->qsc_id = v;

		/* Max number of threads; must not be 0 */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_MAXTHR ||
		    0 == v)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}
#if 0
		qsc_ctx->qsc_maxthreads = v;
#endif
		QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP,
			"func=qm_fr_sc_react, new id=%d, maxthreads=%d\n",
			qsc_ctx->qsc_id, v));

		ret = dadb_new(&qsc_ctx->qsc_dadb_ctx, v);
		if (sm_is_err(ret))
		{
			/* NOTE(review): rv is still SM_SUCCESS here */
			goto err2;
		}
		qsc_ctx->qsc_status = QSC_ST_START;

		/* nothing else may follow in this record */
		if (!SM_RCB_ISEOB(rcb))
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}
		qsc_ctx->qsc_status = QSC_ST_OK;

		/* account for the threads of the new DA */
		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qm_fr_sc_react, lock=%d", r);
		}
		else
		{
			qmgr_ctx->qmgr_max_da_threads += v;
			QMGR_TMO_SCHED;
			r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
			SM_ASSERT(0 == r);
		}
		goto done;
	}
	else if (RT_C2Q_ID == rt)
	{
		/* SMTPC id just for identification */
		/* XXX protect access? */
		if (qsc_ctx->qsc_status == QSC_ST_NONE ||
		    qsc_ctx->qsc_id != v)
		{
			/* NOTE(review): rv is still SM_SUCCESS here */
			goto err2;
		}
	}
	else if (RT_C2Q_CID == rt)
	{
		/* SMTPC shuts down */
		/* XXX protect access? */
		if (qsc_ctx->qsc_status == QSC_ST_NONE ||
		    qsc_ctx->qsc_id != v)
		{
			/* XXX Internal error? Stop task? */
			/* NOTE(review): rv is still SM_SUCCESS here */
			goto err2;
		}

		/* Check for EOB? not really: we shut down anyway */
		qsc_ctx->qsc_status = QSC_ST_SH_DOWN;
		(void) sm_rcb_close_dec(qsc_ctx->qsc_com.rcbcom_rdrcb);

		/*
		**  We assume that no open sessions/transactions exist,
		**  i.e., SMTPC properly terminated them before sending this
		**  message.
		*/

		/* Terminate (delete) this task, qsc_ctx is cleaned in caller */
		return EVTHR_DEL;
	}
	else
	{
		/* NOTE(review): rv is still SM_SUCCESS here */
		goto err2;
	}

	/* rt == RT_C2Q_ID is the only case in which we continue here */
#if 0
	oldstatus = qsc_ctx->qsc_status;
#endif

	/* what's next? */
	ret = sm_rcb_get2uint32(rcb, &l, &rt);
	QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP,
		"func=qm_fr_sc_react, where=2, id=%d, stat=%d, rt=%#x, v=%d\n",
		qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v));
	if (sm_is_err(ret))
	{
		rv = ret;
		goto err2;
	}
#if QM_TIMING
	*pfrt = rt;
#endif

	if (RT_C2Q_STAT == rt)
	{
		if (l != 4)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Status of SMTPC */
		ret = sm_rcb_getuint32(rcb, &v);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto err2;
		}

		/* XXX protect access? */
		qsc_ctx->qsc_status = v;
		QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP,
			"func=qm_fr_sc_react, id=%d, stat=%d\n",
			qsc_ctx->qsc_id, qsc_ctx->qsc_status));

		/* Max number of threads */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_MAXTHR)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

#if 0
		r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 1,
				"sev=CRIT, func=qm_fr_sc_react, lock_qsc=%d"
				, r);
		}
		else
		{
			qsc_ctx->qsc_maxthreads = v;
			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
			SM_ASSERT(0 == r);
		}
#else /* 0 */
		/* a failure to set the limit is only reported, not fatal */
		ret = dadb_set_limit(qsc_ctx->qsc_dadb_ctx, v,
				THR_LOCK_UNLOCK);
		if (sm_is_err(ret))
		{
			QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP,
				"sev=ERROR, func=qm_fr_sc_react, dadb_set_limit=%r\n",
				ret));
			/* XXX */
		}
#endif /* 0 */

		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP,
			"func=qm_fr_sc_react, id=%d, maxthreads=%d\n",
			qsc_ctx->qsc_id, v));

		if (SM_RCB_ISEOB(rcb))
			goto done;

		/* more to come: should be session status! */
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto err2;
		}
	}

	if (RT_C2Q_SEID == rt)
	{
		sessta_id_T da_se_id;

		if (l != SMTP_STID_SIZE)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Session id */
		ret = sm_rcb_getn(rcb, (uchar *) da_se_id, l);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto errseid;
		}

		/*
		**  XXX What to do here?
		**  The session open failed, so it was impossible to start any
		**  transaction. Back to the scheduler to try something
		**  else... where/how do we record this failure?
		*/

		ret = dadb_se_find(qsc_ctx->qsc_dadb_ctx, da_se_id,
				&dadb_entry);
		if (sm_is_err(ret))
		{
			/* do not look at dadb_entry if the lookup failed */
			QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP,
				"func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_se_find=%r\n",
				qsc_ctx->qsc_id, da_se_id, ret));

			/* NOTE(review): rv is still SM_SUCCESS here */
			goto errseid;
		}
		QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP,
			"func=qm_fr_sc_react, id=%d, da_se-id=%s, ta-id=%s, dadb_se_find=%r\n",
			qsc_ctx->qsc_id, da_se_id,
			dadb_entry->dadbe_da_ta_id, ret));

		/* Session status */
		ret = sm_rcb_get4uint32(rcb, &l, &rt, &v, &where);
		QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP,
			"func=qm_fr_sc_react, where=RT_C2Q_SEID id=%d, stat=%d, rt=%#x, v=%#x, where=%#x, ret=%r\n",
			qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, where,
			ret));
		if (sm_is_err(ret) || l != 8 ||
		    (rt != RT_C2Q_SESTAT && rt != RT_C2Q_SECLSD))
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto errseid;
		}

		/* Note: rt must be preserved! at least until (RT_C2Q_SECLSD == rt) */

		/*
		**  XXX HACK XXX Always a temporary error.
		**  Should this be done in SMTPC or should the conversion
		**  happen here?
		**  What about error codes during EHLO (i.e., the SMTP
		**  session negotiation)?
		**  What about 5xy codes? sm8 tries the next host no
		**  matter what...
		**  XXX HACK: if 5xy code -> change to 4xy to make it
		**  a temporary error to try the next host.
		**  NOTE: this should NOT be done here... Instead some
		**  other logic should decide whether another try should
		**  be made... this logic should depend on "where" the
		**  error occurred, i.e., a session error should(?) be
		**  handled differently than a transaction error.
		*/

		if (sm_is_err(v))
		{
			if ((sm_ret_T) v ==
			    sm_error_perm(SM_EM_SMTPC, SM_E_TTMYSELF))
				v = SMTPC_SE_TTMYSELF;
			else if (sm_error_value(v) == ETIMEDOUT)
				v = SMTPC_SE_OP_TMO;
			else if (sm_error_value(v) == ECONNREFUSED)
				v = SMTPC_SE_OP_REFUSED;
			else if (sm_error_value(v) == ENETUNREACH ||
				 sm_error_value(v) == EHOSTUNREACH)
				v = SMTPC_SE_OP_UNREACH;
			else
				v = SMTPC_SE_OPEN_ST;
		}
		else if (IS_SMTP_REPLY(v) && smtp_is_reply_fail(v))
			v -= 100;	/* XXX 5xy -> 4xy; see above */

		/* keep the code...??? */

		if (!SM_RCB_ISEOB(rcb))
		{
			uint32_t rtl;

			/* Error message */
			ret = sm_rcb_get2uint32(rcb, &l, &rtl);
			if (sm_is_err(ret) || rtl != RT_C2Q_STATT)
			{
				rv = sm_is_err(ret) ? ret
					: sm_error_perm(SM_EM_Q_SC2Q,
							SM_E_PR_ERR);
				goto errseid;
			}
			ret = sm_rcb_getnstr(rcb, &errmsg, l);
			if (sm_is_err(ret))
			{
				rv = ret;
				goto errseid;
			}
			QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP,
				"func=qm_fr_sc_react, where=seid, rt=RT_C2Q_STATT, id=%d, msg=%S\n",
				qsc_ctx->qsc_id, errmsg));
			sm_str_sanitize(errmsg);
		}

		/* don't decrease curactive before closing session */
#if 0
#define QSC_CURACTIVE_DECR	do {				\
		r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);	\
		SM_LOCK_OK(r);					\
		if (r != 0) {					\
			sm_log_write(qmgr_ctx->qmgr_lctx,	\
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC, \
				SM_LOG_CRIT, 1,			\
				"sev=CRIT, func=qm_fr_sc_react, lock_qsc=%d", r);\
		}						\
		else {						\
			SM_ASSERT(qsc_ctx->qsc_curactive > 0);	\
			qsc_ctx->qsc_curactive--;		\
			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);\
			SM_ASSERT(0 == r);			\
		}						\
	} while (0)
#else /* 0 */
#define QSC_CURACTIVE_DECR	SM_NOOP
#endif /* 0 */

		/* session is just closed? */
		if (RT_C2Q_SECLSD == rt)
		{
			/*
			**  "race condition":
			**  smtpc may close a session due to timeout
			**  qmgr may want to reuse it
			**  the messages "cross" each other, i.e.,
			**  smtpc closed the session before it got the
			**  new task and qmgr sent the new task before it
			**  knows about the close.
			**  how to detect this?
			*/

			if (DADBE_IS_FLAG(dadb_entry, DADBE_FL_BUSY) &&
			    !DADBE_IS_FLAG(dadb_entry, DADBE_FL_IDLE))
				goto done;

			/* need to close entry in DADB */
			ret = dadb_sess_close_entry(qmgr_ctx,
					qsc_ctx->qsc_dadb_ctx, dadb_entry,
					false, NULL, THR_LOCK_UNLOCK);
			QSC_CURACTIVE_DECR;
			if (sm_is_err(ret))
			{
				QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP,
					"sev=ERROR, func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_sess_close_entry=%r\n",
					qsc_ctx->qsc_id, da_se_id, ret));

				/* NOTE(review): rv is still SM_SUCCESS here */
				goto errseid;
			}
			goto done;
		}

		/*
		**  XXX Update session status XXX
		**  Update all (open) transactions within the session.
		**  Currently there can be only one transaction.
		**  Look it up and mark it (all recipients) as
		**  "temporarily failed".
		*/

		/* QM_HRBT_DPRINTF(QDC_C2Q_TM, (QM_DEBFP, "func=qm_fr_sc_react, before=dadb_ta_find\n")); */
		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx,
				dadb_entry->dadbe_da_ta_id, &dadb_entry);
		if (sm_is_err(ret))
		{
			/* do not look at dadb_entry if the lookup failed */
			QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP,
				"func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_ta_find=%r\n",
				qsc_ctx->qsc_id, da_se_id, ret));

			/* NOTE(review): rv is still SM_SUCCESS here */
			goto errseid;
		}
		QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP,
			"func=qm_fr_sc_react, id=%d, da_se-id=%s, ta-id=%s, dadb_ta_find=%r\n",
			qsc_ctx->qsc_id, da_se_id,
			dadb_entry->dadbe_da_ta_id, ret));
		SM_ASSERT(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_ASSERT(*dadb_entry->dadbe_ss_ta_id != '\0');

		/* XXX Hack, more or less copy from below XXX */

		/* QM_HRBT_DPRINTF(QDC_C2Q_TM, (QM_DEBFP, "func=qm_fr_sc_react, before=aq_ta_find\n")); */

		/* use the TA referenced by the rcpt if available, else search */
		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL)
		{
			SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
					aq_ta->aqt_ss_ta_id));
		}
		else
			ret = aq_ta_find(qmgr_ctx->qmgr_aq,
					dadb_entry->dadbe_ss_ta_id, true,
					&aq_ta);
		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP,
			"sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s, aq_ta_find=%r\n",
			qsc_ctx->qsc_id, dadb_entry->dadbe_da_ta_id,
			dadb_entry->dadbe_ss_ta_id, ret));
		if (sm_is_err(ret) &&
		    sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) == ret)
		{
			/*
			 *  entry could have been expired... ??? WHAT NOW?
			 *  we cannot simply stop reading; at least we
			 *  have to skip over the rest, but we also
			 *  need to clean up the DA session cache!
			 */

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_WARN, 8,
				"sev=WARN, func=qm_fr_sc_react, stat=cannot_update_session, da_ta=%s, aq_ta_find=not_found",
				dadb_entry->dadbe_ss_ta_id);
			(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
			(void) qda_dadb_close(qmgr_ctx,
					dadb_entry->dadbe_ss_ta_id,
					qsc_ctx->qsc_dadb_ctx, dadb_entry,
					v, &ret);
			goto done2;
		}
		if (sm_is_err(ret))
		{
			/* NOTE(review): rv is still SM_SUCCESS here */
			goto errseid;
		}

		/* session must be closed */
		DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);

		/* QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP, "func=qm_fr_sc_react, before=qda_update_ta_stat\n")); */

		/* Update transaction status */
		ret = qda_update_ta_stat(qmgr_ctx,
				dadb_entry->dadbe_da_ta_id, v, where,
				qsc_ctx->qsc_dadb_ctx, dadb_entry, aq_ta,
				NULL, errmsg, THR_LOCK_UNLOCK);
		QSC_CURACTIVE_DECR;
		if (sm_is_err(ret))
		{
			QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP,
				"sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s qda_update_ta_stat=%r\n",
				qsc_ctx->qsc_id, dadb_entry->dadbe_da_ta_id,
				dadb_entry->dadbe_ss_ta_id, ret));
			rv = ret;
			goto errseid;
		}

		/* XXX */
		goto done;

  errseid:
		/* More cleanup? */
		goto err2;
	}
	else if (rt == RT_C2Q_TAID || rt == RT_C2Q_TAID_CS)
	{
		sessta_id_T da_ta_id;

		if (l != SMTP_STID_SIZE)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Transaction id */
		ret = sm_rcb_getn(rcb, (uchar *) da_ta_id, l);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto errtaid;
		}
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 0, (QM_DEBFP,
			"func=qm_fr_sc_react, da_ta_id=%s\n", da_ta_id));

		/*
		**  XXX Which TA do we want to find here?
		**  That is, how do we store the DA TA in the QMGR?
		**  Do we look up all recipients (sequentially)?
		**  Yes -> write a function update_da_ta_status()
		**  which updates the status for all recipients in
		**  this DA TA.
		*/

		/* QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=dadb_ta_find\n")); */
		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx, da_ta_id,
				&dadb_entry);
		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP,
			"sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, dadb_ta_find=%r\n",
			qsc_ctx->qsc_id, da_ta_id, ret));
		if (sm_is_err(ret))
		{
			/* NOTE(review): rv is still SM_SUCCESS here */
			goto errtaid;
		}

		/* "last transaction, close session" */
		if (RT_C2Q_TAID_CS == rt)
		{
			QSC_CURACTIVE_DECR;
			DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);
		}
		else if (RT_C2Q_TAID == rt)
			DADBE_SET_FLAG(dadb_entry, DADBE_FL_TA_CL);

		SM_ASSERT(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_ASSERT(*dadb_entry->dadbe_ss_ta_id != '\0');

		/* QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=aq_ta_find\n")); */

		/* use the TA referenced by the rcpt if available, else search */
		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL)
		{
			SM_IS_AQ_RCPT(dadb_entry->dadbe_rcpt);
			SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
					aq_ta->aqt_ss_ta_id));
		}
		else
			ret = aq_ta_find(qmgr_ctx->qmgr_aq,
					dadb_entry->dadbe_ss_ta_id, true,
					&aq_ta);
		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP,
			"sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s aq_ta_find=%r\n",
			qsc_ctx->qsc_id, da_ta_id,
			dadb_entry->dadbe_ss_ta_id, ret));
		if (sm_is_err(ret) &&
		    sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) == ret)
		{
			/*
			 *  ??? WHAT NOW????
			 *  see other aq_ta_find() call and comment!
			 */

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_WARN, 8,
				"sev=WARN, func=qm_fr_sc_react, stat=cannot_update_TA, da_ta=%s, aq_ta_find=not_found",
				dadb_entry->dadbe_ss_ta_id);
			(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
			goto done2;
		}
		if (sm_is_err(ret))
		{
			/* NOTE(review): rv is still SM_SUCCESS here */
			goto errtaid;
		}

		/* Status */
		ret = sm_rcb_get4uint32(rcb, &l, &rt, &v, &where);
		QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP,
			"sev=DBG, func=qm_fr_sc_react, where=RT_C2Q_status, id=%d, stat=%d, rt=%r, v=%d, where=%r, ret=%#x\n",
			qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, where,
			ret));
		if (sm_is_err(ret) || l != 8)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto errtaid;
		}

		if (RT_C2Q_SESTAT == rt)
		{
			/* XXX Update session status XXX */
			/* Can this happen? Check protocol! */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 1,
				"sev=CRIT, func=qm_fr_sc_react, rt=%#x, status=unexpected"
				, rt);
		}
		else if (rt == RT_C2Q_TASTAT || rt == RT_C2Q_TARSTAT)
		{
			if (RT_C2Q_TARSTAT == rt)
			{
				/* Update recipient status */
				ret = qm_fr_sc_rcpts(qmgr_ctx, da_ta_id, rcb);
				if (sm_is_err(ret))
				{
					QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP,
						"sev=DBG, func=qm_fr_sc_react, qm_fr_sc_rcpts=%r\n",
						ret));
					rv = ret;
					goto errtaid;
				}
			}
			else if (!SM_RCB_ISEOB(rcb))
			{
				/* Error message */
				ret = sm_rcb_get2uint32(rcb, &l, &rt);
				if (sm_is_err(ret) || rt != RT_C2Q_STATT)
				{
					rv = sm_is_err(ret) ? ret
						: sm_error_perm(SM_EM_Q_SC2Q,
								SM_E_PR_ERR);
					goto errtaid;
				}
				ret = sm_rcb_getnstr(rcb, &errmsg, l);
				if (sm_is_err(ret))
				{
					rv = ret;
					goto errtaid;
				}
				QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP,
					"sev=DBG, func=qm_fr_sc_react, where=taid, rt=RT_C2Q_STATT, id=%d, msg=%S\n",
					qsc_ctx->qsc_id, errmsg));
				sm_str_sanitize(errmsg);
			}

			/* Update (rest of) transaction status */

			/* QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=qda_update_ta_stat\n")); */
			ret = qda_update_ta_stat(qmgr_ctx, da_ta_id,
					STATUS2SMTPCODE(v), where,
					qsc_ctx->qsc_dadb_ctx, dadb_entry,
					aq_ta, NULL, errmsg,
					THR_LOCK_UNLOCK);
			QM_HRBT_DPRINTF(QDC_C2Q_TM, 0, (QM_DEBFP,
				"func=qda_update_ta_stat, da_ta_id=%s\n",
				da_ta_id));
			if (sm_is_err(ret))
			{
				rv = ret;
				goto errtaid;
			}
		}
		else
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_ERR, 8,
				"sev=ERROR, func=qm_fr_sc_react, rt=%#x, status=unexpected"
				, rt);
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* XXX */
		goto done;

  errtaid:
		/* more cleanup? */
		goto err2;
	}
#if 0
	/* Other cases? */
	else if (XXX == rt)
#endif /* 0 */
	else
	{
		/* NOTE(review): rv is still SM_SUCCESS here */
		goto err2;
	}

  done:
	ret = sm_rcb_close_dec(qsc_ctx->qsc_com.rcbcom_rdrcb);
  done2:
	/* Open rcb for next record */
	(void) sm_rcb_open_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);
	SM_STR_FREE(errmsg);
	return rv;

	/* Preserve original error code! */
  err2:
	/* Use rcb functions that don't check the state */
	(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
  error:
	/* Open rcb for next record */
	(void) sm_rcb_open_rcvn(qsc_ctx->qsc_com.rcbcom_rdrcb);
	QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP,
		"sev=ERROR, func=qm_fr_sc_react, id=%d, status=%#x, status=error_out, rt=%#x, v=%d, l=%d, ret=%r\n",
		qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, l, ret));
	SM_STR_FREE(errmsg);

	/*
	**  Question: Fail on all errors? Only on those returned from
	**  qda_update_ta_stat() or qm_fr_sc_rcpts(), not protocol errors.
	*/

	if (QM_DA_STAT_FATAL(rv))
	{
		/* which resource? doesn't matter? */
		(void) qm_rsr_problem(qmgr_ctx, QMGR_RFL_MEM_I,
				THR_LOCK_UNLOCK);
		rv = EVTHR_TERM;
	}
	return rv;
}

/*
**  QM_FR_SC -- SMTPC to QMGR interface
**
**	Receives data from SMTPC into the read rcb of the SMTPC context
**	and, once sm_rcb_rcv() returns 0, hands the record to
**	qm_fr_sc_react(). On EOF, on an error from qm_fr_sc_react(), or if
**	the latter returns EVTHR_DEL, the connection is closed and the DA
**	threads of this SMTPC are subtracted from qmgr_max_da_threads.
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		EVTHR_WAITQ if sm_rcb_rcv() returned a value > 0 or
**			qm_fr_sc_react() returned QMGR_R_WAITQ
**		EVTHR_OK if qm_fr_sc_react() returned QMGR_R_ASYNC
**		EVTHR_DEL if the task is terminated (see above) or
**			sm_rcb_rcv() failed
**		otherwise: the (non-error) result of qm_fr_sc_react()
*/

sm_ret_T
qm_fr_sc(sm_evthr_task_P tsk)
{
	int fd, r;
	sm_ret_T ret;
	uint da_threads;
	qmgr_ctx_P qmgr_ctx;
	qsc_ctx_P qsc_ctx;
#if QM_TIMING
	uint32_t frt;
#endif

	SM_IS_EVTHR_TSK(tsk);
	qsc_ctx = (qsc_ctx_P) tsk->evthr_t_actx;
	SM_IS_QSC_CTX(qsc_ctx);
	qmgr_ctx = qsc_ctx->qsc_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);

	fd = tsk->evthr_t_fd;	/* Checked in caller */
	ret = sm_rcb_rcv(fd, qsc_ctx->qsc_com.rcbcom_rdrcb, QSS_RC_MINSZ);

	QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP,
		"sev=DBG, func=qm_fr_sc, fd=%d, ret=%r, buf=%d, len=%d, qsc_status=%#x\n",
		fd, ret, qsc_ctx->qsc_com.rcbcom_rdrcb->sm_rcb_base[0],
		sm_rcb_getlen(qsc_ctx->qsc_com.rcbcom_rdrcb),
		qsc_ctx->qsc_status),
		qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

	if (ret > 0)
		return EVTHR_WAITQ;
	else if (0 == ret)
	{
		ret = sm_rcb_close_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);

		/* start appropriate function ... */
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP,
			"func=qm_fr_sc, qm_fr_sc_react=before\n"));
		ret = qm_fr_sc_react(qsc_ctx
#if QM_TIMING
			, &frt
#endif
			);
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP,
			"func=qm_fr_sc, qm_fr_sc_react=after, rt=%x\n", frt));
		QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP,
			"sev=DBG, func=qm_fr_sc, qm_fr_sc_react=%r\n", ret),
			qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

#if QMGR_TEST
		/* trigger an error if requested */
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests,
				QMGR_TEST_SC_RD) &&
		    !sm_is_err(ret))
		{
			QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP,
				"sev=DBG, func=qm_fr_sc, where=stop_reading\n"),
				qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
			return EVTHR_WAITQ|evthr_r_no(EVTHR_EV_RD);
		}
#endif /* QMGR_TEST */

		if (sm_is_err(ret))
			goto termit;	/* too harsh? */
		else if (QMGR_R_WAITQ == ret)
			return EVTHR_WAITQ;
		else if (QMGR_R_ASYNC == ret)
			return EVTHR_OK;
		else if (EVTHR_DEL == ret)
			goto termit;	/* terminate this client */
		else
			return ret;
	}
	else if (SM_IO_EOF == ret)
	{
		ret = sm_rcb_close_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);

		/* also reached via goto from the branch above */
  termit:
		QM_LEV_DPRINTTC(QDC_C2Q, 1, (QM_DEBFP,
			"sev=DBG, func=qm_fr_sc, task=%p, status=terminate, ret=%r\n",
			qsc_ctx->qsc_com.rcbcom_tsk, ret),
			qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
		close(fd);

		/*
		**  Need to close all outstanding sessions and update
		**  internal data!
		*/

		/* remember the thread count before the context is closed */
		if (qsc_ctx->qsc_dadb_ctx != NULL)
			da_threads = qsc_ctx->qsc_dadb_ctx->dadb_entries_max;
		else
			da_threads = 0;	/* oops */
		(void) qsc_ctx_close(qsc_ctx);

		/* XXX see comment in qm_fr_ss() */
		tsk->evthr_t_fd = INVALID_FD;	/* make it invalid */
		qsc_ctx->qsc_status = QSC_ST_SH_DOWN;

		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qm_fr_sc, lock=%d", r);
			goto error;
		}

		/* never let the (unsigned) counter wrap around */
		if (qmgr_ctx->qmgr_max_da_threads >= da_threads)
		{
			qmgr_ctx->qmgr_max_da_threads -= da_threads;
			QMGR_TMO_SCHED;
		}
		else
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_INCONS, 5,
				"sev=INCONS, func=qm_fr_sc, qmgr_max_da_threads=%lu, da_threads=%u, status=inconsistent"
				, qmgr_ctx->qmgr_max_da_threads, da_threads);
		}

		/* XXX don't crash while holding lock? */
		SM_ASSERT(qmgr_ctx->qmgr_sc_li.qm_gli_nfd > 0);

		/* free li ctx */
		qmgr_ctx->qmgr_sc_li.qm_gli_nfd--;
		qmgr_ctx->qmgr_sc_li.qm_gli_used &= ~(qsc_ctx->qsc_bit);
		if (qsc_ctx->qsc_com.rcbcom_rdrcb != NULL)
			(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);

		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		SM_ASSERT(0 == r);

		/* free qsc_ctx? */
		qsc_ctx->qsc_com.rcbcom_tsk = NULL;

		/*
		**  XXX QMGR should ask "someone" (MCP) to start a new DA.
		*/

		return EVTHR_DEL;
	}
	else /* if (ret < 0) */
	{
		QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP,
			"sev=ERROR, func=qm_fr_sc, ret=%r, errno=%d\n",
			ret, errno));
	}
	QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP,
		"sev=ERROR, func=qm_fr_sc, fd=%d\n", fd));

  error:
	return EVTHR_DEL;
}