/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: qm_fr_sc.c,v 1.136 2007/06/18 04:42:31 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/reccom.h"
#include "sm/da.h"
#include "qmgr.h"
#include "log.h"
/*
**  QMGR_TMO_SCHED -- recompute qmgr_ctx->qmgr_tmo_sched
**
**	Requires a variable named qmgr_ctx in the scope where it is used.
**
**	If the configured AQ size is bigger than the number of DA threads,
**	the configured scheduler timeout is increased by
**	AQR_SCHED_TMOUT_FACT for each multiple of (DA threads + 1)
**	contained in the AQ size; otherwise the configured timeout is
**	used unchanged.
*/

#define QMGR_TMO_SCHED							\
	do								\
	{								\
		if (qmgr_ctx->qmgr_cnf.q_cnf_aq_size >			\
		    qmgr_ctx->qmgr_max_da_threads)			\
		{							\
			qmgr_ctx->qmgr_tmo_sched =			\
			    (uint) (qmgr_ctx->qmgr_cnf.q_cnf_aq_size /	\
				    (qmgr_ctx->qmgr_max_da_threads + 1)) \
			    * AQR_SCHED_TMOUT_FACT			\
			    + qmgr_ctx->qmgr_cnf.q_cnf_tmo_sched;	\
		}							\
		else							\
		{							\
			qmgr_ctx->qmgr_tmo_sched =			\
			    qmgr_ctx->qmgr_cnf.q_cnf_tmo_sched;		\
		}							\
	} while (0)
/*
**  QM_FR_SC_RCPTS -- Read (failed) recipient status from SMTPC and update AQ
**
**	Note: this only updates the recipient status in AQ;
**	qda_update_ta_stat() must be called afterwards to update the
**	various DBs and counters.
**	What happens if this (partially) fails?  The cleanup task should
**	take care of that, i.e., the recipients will be removed (tempfail?)
**	and tried later on again.
**
**	Data read from rcb:
**		RT_C2Q_RCPT_N	number of recipient entries that follow
**	and for each recipient entry:
**		RT_C2Q_RCPT_IDX	recipient index
**		RT_C2Q_RCPT_ST	recipient status
**		RT_C2Q_RCPT_STT	status text
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		da_ta_id -- DA transaction id
**		rcb -- rcb containing info about recipient status,
**			this can contain several entries.
**
**	Side Effects:
**		updates recipient status in AQ via aq_rcpt_status()
**
**	Returns:
**		<0: usual sm_error code: protocol errors, ENOMEM,
**		>=0: number of recipients announced in the rcb
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
qm_fr_sc_rcpts(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_rcb_P rcb)
{
	uint32_t l, rt, idx, v;
	uint i, nrcpts;
	sm_ret_T ret, rv;
	sm_ret_T rcpt_status;
	aq_ctx_P aq_ctx;
	sm_str_P errmsg;

	SM_IS_QMGR_CTX(qmgr_ctx);
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);
	errmsg = NULL;
	rv = SM_SUCCESS;
	nrcpts = 0;

	/*
	**  These values are written to the log on error paths, including
	**  the case that the decoding function itself failed; make sure
	**  they are never read while indeterminate.
	*/

	l = rt = idx = v = 0;

	QM_LEV_DPRINTTC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_rcpts, da_ta=%s\n", da_ta_id), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

	/* number of recipient entries */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_N || v > INT_MAX)
	{
		rv = sm_is_err(ret) ? ret
			: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_DASTAT, QM_LMOD_DASTAT,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, l=%d, v=%#x, ret=%m"
			, da_ta_id, rt, RT_C2Q_RCPT_N, l, v, ret);
		goto error;
	}
	nrcpts = v;

	for (i = 0; i < nrcpts && !SM_RCB_ISEOB(rcb); i++)
	{
		/* recipient index */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &idx);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_IDX)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, idx=%u, ret=%m"
				, da_ta_id, rt, RT_C2Q_RCPT_IDX, idx
				, ret);
			break;
		}

		/* recipient status */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_ST)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

			/* argument order must match the format string */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, rt=%#x, v=%u, expected=%#x, idx=%u, ret=%m",
				rt, v, RT_C2Q_RCPT_ST, idx, ret);
			break;
		}
		rcpt_status = STATUS2SMTPCODE(v);
		QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_rcpts, idx=%u, stat=%d\n", idx, rcpt_status));

		/* status text: length and record type first */
		errmsg = NULL;
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		if (sm_is_err(ret))
		{
			rv = ret;
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, sm_rcb_get2uint(errmsg)=%m",
				ret);
			break;
		}
		if (rt != RT_C2Q_RCPT_STT)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

			/* the record type expected here is RT_C2Q_RCPT_STT */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, idx=%u, ret=%m"
				, da_ta_id, rt, RT_C2Q_RCPT_STT, idx
				, ret);
			break;
		}

#if QMGR_TEST
		/* trigger an error if requested */
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_RCPT_STAT)
		    && rcpt_status == QMGR_TEST_RSR_ST)
		{
			QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_rcpts, where=test_error\n"));
			rv = sm_error_perm(SM_EM_STR, ENOMEM);
			break;
		}
#endif /* QMGR_TEST */

		/* ... then the text itself (l bytes) */
		ret = sm_rcb_getnstr(rcb, &errmsg, l);
		if (sm_is_err(ret))
		{
			rv = ret;
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, sm_rcb_getnstr(errmsg)=%m",
				ret);
			break;
		}
		QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_rcpts, rt=RT_C2Q_RCPT_STT, idx=%u, msg=%S\n", idx, errmsg));
		sm_str_sanitize(errmsg);

		/* do something more with this?? */
		if (smtp_is_reply_temp(rcpt_status) || smtp_is_reply_fail(rcpt_status))
		{
			/*
			**  XXX This could be more efficient depending on
			**  the implementation of AQ: we could "just"
			**  walk through the recipients for this delivery
			**  attempt.
			**
			**  Should error state always be DA_TA_ERR_RCPT_R?
			**  It is an individual error, so it must have been
			**  the reply to a RCPT command, right?
			**  The error state coming back from the DA is only
			**  one "global" state (for the entire transaction),
			**  but the error for a recipient can only(?) occur
			**  during RCPT, otherwise no individual error would
			**  have been returned by the DA.
			*/

			ret = aq_rcpt_status(aq_ctx, da_ta_id, idx, rcpt_status,
					DA_TA_ERR_RCPT_R, errmsg);

			/*
			**  errmsg has been handed to aq_rcpt_status();
			**  it is not freed here (presumably AQ owns it now).
			*/

			errmsg = NULL;
			if (sm_is_err(ret))
			{
				/* can happen if rcpt was too long in AQ */
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_DASTAT, QM_LMOD_DASTAT,
					SM_LOG_ERR,
					(ret == sm_error_perm(SM_EM_AQ,
						SM_E_NOTFOUND)) ? 8 : 1,
					"sev=ERROR, func=qm_fr_sc_rcpts, stat=cannot_update_rcpt, da_ta=%s, idx=%u, ret=%m",
					da_ta_id, idx, ret);

				/* "not found" is tolerated; anything else ends the loop */
				if (ret != sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND))
				{
					rv = ret;
					break;
				}
				continue;
			}
		}
		else
		{
			/*
			**  Neither a temporary nor a permanent failure code.
			**  NOTE(review): the loop is not left here, i.e.,
			**  the remaining entries are still processed while
			**  rv carries the protocol error; confirm that this
			**  is intended.
			*/

			SM_STR_FREE(errmsg);
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, status=bad_value, da_ta=%s, idx=%u, rcpt_stat=%d",
				da_ta_id, idx, rcpt_status);
		}
	}

	/* rcb ended before all announced recipients were read */
	if (rv == SM_SUCCESS && i < nrcpts)
		rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

  error:
	return (SM_SUCCESS == rv) ? (sm_ret_T) nrcpts : rv;
}
/*
**  QM_FR_SC_REACT -- Decode data received from SMTPC and act accordingly
**
**	Decodes one record from the read rcb of the SMTPC context:
**	total length, protocol version, SMTPC id (new/existing/close),
**	optionally followed by SMTPC status, session status, or
**	transaction status (with optional per-recipient status).
**	Afterwards the rcb is reopened for receiving the next record.
**
**	Parameters:
**		qsc_ctx -- SMTPC context
**		pfrt -- (only if QM_TIMING) (output) record type that
**			followed the SMTPC id, 0 if it has not been read
**
**	Returns:
**		usual sm_error code
**		EVTHR_DEL: SMTPC announced that it shuts down
**		EVTHR_TERM: error is fatal according to QM_DA_STAT_FATAL()
**
**	Note: several error exits (unexpected SMTPC id/state, failed
**	lookups in DADB/AQ, unknown record types) do not set an error
**	code, i.e., the record is dropped and SM_SUCCESS is returned.
*/

static sm_ret_T
qm_fr_sc_react(qsc_ctx_P qsc_ctx
#if QM_TIMING
	, uint32_t *pfrt
#endif
	)
{
	uint32_t v, l, rt, tl, where;
	sm_ret_T ret, rv;
	sm_rcb_P rcb;
	qmgr_ctx_P qmgr_ctx;
	aq_ctx_P aq_ctx;
	aq_ta_P aq_ta;
	dadb_entry_P dadb_entry;
	sm_str_P errmsg;

	SM_IS_QSC_CTX(qsc_ctx);
	qmgr_ctx = qsc_ctx->qsc_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	ret = rv = SM_SUCCESS;
	errmsg = NULL;
	dadb_entry = NULL;

	/*
	**  These values are used in debug output (also in the common
	**  error exit) even if nothing has been decoded yet.
	*/

	v = l = rt = where = 0;
#if QM_TIMING
	*pfrt = 0;
#endif

	/* Decode rcb */
	rcb = qsc_ctx->qsc_com.rcbcom_rdrcb;
	ret = sm_rcb_open_dec(rcb);
	if (sm_is_err(ret))
	{
		rv = ret;
		goto error;
	}
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);

	/* Total length of record */
	ret = sm_rcb_getuint32(rcb, &tl);
	if (sm_is_err(ret) || tl > QM_SS_MAX_REC_LEN || tl > sm_rcb_getlen(rcb))
	{
		rv = sm_is_err(ret) ? ret
			: sm_error_perm(SM_EM_Q_SC2Q, SM_E_RCB2LONG);
		goto err2;
	}

	/* Decode data, act accordingly... */

	/* Protocol header: version */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	if (sm_is_err(ret))
	{
		rv = ret;
		goto err2;
	}
	if (l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT)
	{
		rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_V_MISM);
		goto err2;
	}

	/*
	queuemanager.func.tex:
	*/

	/* SMTPC (new/existing/close) ID */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_react,1 id=%d, stat=%d, rt=%#x, v=%d\n",
		qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
	if (sm_is_err(ret) || l != 4)
	{
		rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
		goto err2;
	}

	if (RT_C2Q_NID == rt)
	{
		int r;

		/* New SMTPC */
		/* XXX protect access? */

		/*
		**  XXX perform similar checks as in qm_fr_ss().c
		**  to prevent multiple clients with the same id!
		*/

		if (qsc_ctx->qsc_status != QSC_ST_NONE ||
		    qsc_ctx->qsc_id != QSC_ID_NONE)
		{
			/* XXX Internal error? Stop task? */
			goto err2;
		}
		qsc_ctx->qsc_id = v;

		/* Max number of threads */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_MAXTHR || v <= 0)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}
#if 0
		qsc_ctx->qsc_maxthreads = v;
#endif
		QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "func=qm_fr_sc_react, new id=%d, maxthreads=%d\n", qsc_ctx->qsc_id, v));

		/*
		**  NOTE(review): a failure of dadb_new() is not propagated
		**  (rv is not set) although qsc_id has been set already;
		**  confirm that this is intended.
		*/

		ret = dadb_new(&qsc_ctx->qsc_dadb_ctx, v);
		if (sm_is_err(ret))
			goto err2;
		qsc_ctx->qsc_status = QSC_ST_START;

		if (!SM_RCB_ISEOB(rcb))
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}
		qsc_ctx->qsc_status = QSC_ST_OK;

		/* account for the threads of the new DA */
		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qm_fr_sc_react, lock=%d",
				r);
		}
		else
		{
			qmgr_ctx->qmgr_max_da_threads += v;
			QMGR_TMO_SCHED;
			r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
			SM_ASSERT(0 == r);
		}
		goto done;
	}
	else if (RT_C2Q_ID == rt)
	{
		/* SMTPC id just for identification */
		/* XXX protect access? */
		if (qsc_ctx->qsc_status == QSC_ST_NONE ||
		    qsc_ctx->qsc_id != v)
			goto err2;
	}
	else if (RT_C2Q_CID == rt)
	{
		/* SMTPC shuts down */
		/* XXX protect access? */
		if (qsc_ctx->qsc_status == QSC_ST_NONE || qsc_ctx->qsc_id != v)
		{
			/* XXX Internal error? Stop task? */
			goto err2;
		}

		/* Check for EOB? not really: we shut down anyway */
		qsc_ctx->qsc_status = QSC_ST_SH_DOWN;
		(void) sm_rcb_close_dec(qsc_ctx->qsc_com.rcbcom_rdrcb);

		/*
		**  We assume that no open sessions/transactions exist,
		**  i.e., SMTPC properly terminated them before sending this
		**  message.
		*/

		/* Terminate (delete) this task, qsc_ctx is cleaned in caller */
		return EVTHR_DEL;
	}
	else
		goto err2;

	/* rt == RT_C2Q_ID is the only case in which we continue here */
#if 0
	oldstatus = qsc_ctx->qsc_status;
#endif

	/* what's next? */
	ret = sm_rcb_get2uint32(rcb, &l, &rt);
	QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, where=2, id=%d, stat=%d, rt=%#x, v=%d\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v));
	if (sm_is_err(ret))
	{
		rv = ret;
		goto err2;
	}
#if QM_TIMING
	*pfrt = rt;
#endif

	if (RT_C2Q_STAT == rt)
	{
		if (l != 4)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Status of SMTPC */
		ret = sm_rcb_getuint32(rcb, &v);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto err2;
		}

		/* XXX protect access? */
		qsc_ctx->qsc_status = v;
		QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, stat=%d\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status));

		/* Max number of threads */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_MAXTHR)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}
#if 0
		r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 1,
				"sev=CRIT, func=qm_fr_sc_react, lock_qsc=%d"
				, r);
		}
		else {
			qsc_ctx->qsc_maxthreads = v;
			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
			SM_ASSERT(0 == r);
		}
#else /* 0 */
		ret = dadb_set_limit(qsc_ctx->qsc_dadb_ctx, v, THR_LOCK_UNLOCK);
		if (sm_is_err(ret))
		{
			QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc_react, dadb_set_limit=%r\n", ret));
			/* XXX */
		}
#endif /* 0 */
		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, maxthreads=%d\n", qsc_ctx->qsc_id, v));
		if (SM_RCB_ISEOB(rcb))
			goto done;

		/* more to come: should be session status! */
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto err2;
		}
	}

	if (RT_C2Q_SEID == rt)
	{
		sessta_id_T da_se_id;

		if (l != SMTP_STID_SIZE)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Session id */
		ret = sm_rcb_getn(rcb, (uchar *) da_se_id, l);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto errseid;
		}

		/*
		**  XXX What to do here?
		**  The session open failed, so it was impossible to start any
		**  transaction. Back to the scheduler to try something
		**  else... where/how do we record this failure?
		*/

		ret = dadb_se_find(qsc_ctx->qsc_dadb_ctx, da_se_id, &dadb_entry);
		if (sm_is_err(ret))
		{
			/*
			**  Do not touch dadb_entry here: a failed lookup
			**  may not have set it.
			*/

			QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_se_find=%r\n", qsc_ctx->qsc_id, da_se_id, ret));
			goto errseid;
		}
		QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, ta-id=%s, dadb_se_find=%r\n", qsc_ctx->qsc_id, da_se_id, dadb_entry->dadbe_da_ta_id, ret));

		/* Session status */
		ret = sm_rcb_get4uint32(rcb, &l, &rt, &v, &where);
		QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, where=RT_C2Q_SEID id=%d, stat=%d, rt=%#x, v=%#x, where=%#x, ret=%r\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, where, ret));
		if (sm_is_err(ret) || l != 8 ||
		    (rt != RT_C2Q_SESTAT && rt != RT_C2Q_SECLSD))
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto errseid;
		}

		/* Note: rt must be preserved! at least until (RT_C2Q_SECLSD == rt) */

		/*
		**  XXX HACK XXX Always a temporary error.
		**  Should this be done in SMTPC or should the conversion
		**  happen here?
		**  What about error codes during EHLO (i.e., the SMTP
		**  session negotiation)?
		**  What about 5xy codes? sm8 tries the next host no
		**  matter what...
		**  XXX HACK: if 5xy code -> change to 4xy to make it
		**  a temporary error to try the next host.
		**  NOTE: this should NOT be done here... Instead some
		**  other logic should decide whether another try should
		**  be made... this logic should depend on "where" the
		**  error occurred, i.e., a session error should(?) be
		**  handled differently than a transaction error.
		*/

		if (sm_is_err(v))
		{
			if ((sm_ret_T) v == sm_error_perm(SM_EM_SMTPC, SM_E_TTMYSELF))
				v = SMTPC_SE_TTMYSELF;
			else if (sm_error_value(v) == ETIMEDOUT)
				v = SMTPC_SE_OP_TMO;
			else if (sm_error_value(v) == ECONNREFUSED)
				v = SMTPC_SE_OP_REFUSED;
			else if (sm_error_value(v) == ENETUNREACH ||
				 sm_error_value(v) == EHOSTUNREACH)
				v = SMTPC_SE_OP_UNREACH;
			else
				v = SMTPC_SE_OPEN_ST;
		}
		else if (IS_SMTP_REPLY(v) && smtp_is_reply_fail(v))
			v -= 100;	/* XXX 5xy -> 4xy; see above */

		/* keep the code...??? */
		if (!SM_RCB_ISEOB(rcb))
		{
			uint32_t rtl;

			/*
			**  Error message; a separate variable is used for
			**  the record type because rt is still needed below.
			*/

			ret = sm_rcb_get2uint32(rcb, &l, &rtl);
			if (sm_is_err(ret) || rtl != RT_C2Q_STATT)
			{
				rv = sm_is_err(ret) ? ret
					: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
				goto errseid;
			}
			ret = sm_rcb_getnstr(rcb, &errmsg, l);
			if (sm_is_err(ret))
			{
				rv = ret;
				goto errseid;
			}
			QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_react, where=seid, rt=RT_C2Q_STATT, id=%d, msg=%S\n", qsc_ctx->qsc_id, errmsg));
			sm_str_sanitize(errmsg);
		}

		/* don't decrease curactive before closing session */
#if 0
#define QSC_CURACTIVE_DECR do {					\
		r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);	\
		SM_LOCK_OK(r);					\
		if (r != 0) {					\
			sm_log_write(qmgr_ctx->qmgr_lctx,	\
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,	\
				SM_LOG_CRIT, 1,			\
				"sev=CRIT, func=qm_fr_sc_react, lock_qsc=%d", r);\
		}						\
		else {						\
			SM_ASSERT(qsc_ctx->qsc_curactive > 0);	\
			qsc_ctx->qsc_curactive--;		\
			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);\
			SM_ASSERT(0 == r);			\
		}						\
	} while (0)
#else /* 0 */
#define QSC_CURACTIVE_DECR	SM_NOOP
#endif /* 0 */

		/* session is just closed? */
		if (RT_C2Q_SECLSD == rt)
		{
			/*
			**  "race condition":
			**  smtpc may close a session due to timeout
			**  qmgr may want to reuse it
			**  the messages "cross" each other, i.e.,
			**  smtpc closed the session before it got the
			**  new task and qmgr sent the new task before it
			**  knows about the close.
			**  how to detect this?
			*/

			if (DADBE_IS_FLAG(dadb_entry, DADBE_FL_BUSY) &&
			    !DADBE_IS_FLAG(dadb_entry, DADBE_FL_IDLE))
				goto done;

			/* need to close entry in DADB */
			ret = dadb_sess_close_entry(qmgr_ctx, qsc_ctx->qsc_dadb_ctx,
					dadb_entry, false, NULL, THR_LOCK_UNLOCK);
			QSC_CURACTIVE_DECR;
			if (sm_is_err(ret))
			{
				QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_sess_close_entry=%r\n", qsc_ctx->qsc_id, da_se_id, ret));
				goto errseid;
			}
			goto done;
		}

		/*
		**  XXX Update session status XXX
		**  Update all (open) transactions within the session.
		**  Currently there can be only one transaction.
		**  Look it up and mark it (all recipients) as
		**  "temporarily failed".
		*/

		/*
		QM_HRBT_DPRINTF(QDC_C2Q_TM, (QM_DEBFP, "func=qm_fr_sc_react, before=dadb_ta_find\n"));
		*/

		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx, dadb_entry->dadbe_da_ta_id, &dadb_entry);
		if (sm_is_err(ret))
		{
			/* dadb_entry may have been changed by the failed lookup */
			QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_ta_find=%r\n", qsc_ctx->qsc_id, da_se_id, ret));
			goto errseid;
		}
		QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, ta-id=%s, dadb_ta_find=%r\n", qsc_ctx->qsc_id, da_se_id, dadb_entry->dadbe_da_ta_id, ret));
		SM_ASSERT(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_ASSERT(*dadb_entry->dadbe_ss_ta_id != '\0');

		/* XXX Hack, more or less copy from below XXX */
		/*
		QM_HRBT_DPRINTF(QDC_C2Q_TM, (QM_DEBFP, "func=qm_fr_sc_react, before=aq_ta_find\n"));
		*/

		/* use the transaction referenced by the recipient if available */
		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL)
		{
			SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
					aq_ta->aqt_ss_ta_id));
		}
		else
			ret = aq_ta_find(qmgr_ctx->qmgr_aq, dadb_entry->dadbe_ss_ta_id, true, &aq_ta);
		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s, aq_ta_find=%r\n", qsc_ctx->qsc_id, dadb_entry->dadbe_da_ta_id, dadb_entry->dadbe_ss_ta_id, ret));
		if (sm_is_err(ret) && sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) == ret)
		{
			/*
			**  entry could have been expired... ??? WHAT NOW?
			**  we cannot simply stop reading; at least we
			**  have to skip over the rest, but we also
			**  need to clean up the DA session cache!
			*/

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_WARN, 8,
				"sev=WARN, func=qm_fr_sc_react, stat=cannot_update_session, da_ta=%s, aq_ta_find=not_found",
				dadb_entry->dadbe_ss_ta_id);
			(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
			(void) qda_dadb_close(qmgr_ctx, dadb_entry->dadbe_ss_ta_id, qsc_ctx->qsc_dadb_ctx, dadb_entry, v, &ret);
			goto done2;
		}
		if (sm_is_err(ret))
			goto errseid;

		/* session must be closed */
		DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);

		/*
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP, "func=qm_fr_sc_react, before=qda_update_ta_stat\n"));
		*/

		/* Update transaction status */
		ret = qda_update_ta_stat(qmgr_ctx, dadb_entry->dadbe_da_ta_id,
				v, where, qsc_ctx->qsc_dadb_ctx, dadb_entry,
				aq_ta, NULL, errmsg, THR_LOCK_UNLOCK);
		QSC_CURACTIVE_DECR;
		if (sm_is_err(ret))
		{
			QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s qda_update_ta_stat=%r\n", qsc_ctx->qsc_id, dadb_entry->dadbe_da_ta_id, dadb_entry->dadbe_ss_ta_id, ret));
			rv = ret;
			goto errseid;
		}

		/* XXX */
		goto done;

  errseid:
		/* More cleanup? */
		goto err2;
	}
	else if (rt == RT_C2Q_TAID || rt == RT_C2Q_TAID_CS)
	{
		sessta_id_T da_ta_id;

		if (l != SMTP_STID_SIZE)
		{
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Transaction id */
		ret = sm_rcb_getn(rcb, (uchar *) da_ta_id, l);
		if (sm_is_err(ret))
		{
			rv = ret;
			goto errtaid;
		}
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 0, (QM_DEBFP, "func=qm_fr_sc_react, da_ta_id=%s\n", da_ta_id));

		/*
		**  XXX Which TA do we want to find here?
		**  That is, how do we store the DA TA in the QMGR?
		**  Do we look up all recipients (sequentially)?
		**  Yes -> write a function update_da_ta_status()
		**  which updates the status for all recipients in
		**  this DA TA.
		*/

		/*
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=dadb_ta_find\n"));
		*/

		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx, da_ta_id, &dadb_entry);
		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, dadb_ta_find=%r\n", qsc_ctx->qsc_id, da_ta_id, ret));
		if (sm_is_err(ret))
			goto errtaid;

		/* "last transaction, close session" */
		if (RT_C2Q_TAID_CS == rt)
		{
			QSC_CURACTIVE_DECR;
			DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);
		}
		else if (RT_C2Q_TAID == rt)
			DADBE_SET_FLAG(dadb_entry, DADBE_FL_TA_CL);
		SM_ASSERT(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_ASSERT(*dadb_entry->dadbe_ss_ta_id != '\0');

		/*
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=aq_ta_find\n"));
		*/

		/* use the transaction referenced by the recipient if available */
		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL)
		{
			SM_IS_AQ_RCPT(dadb_entry->dadbe_rcpt);
			SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
					aq_ta->aqt_ss_ta_id));
		}
		else
			ret = aq_ta_find(qmgr_ctx->qmgr_aq, dadb_entry->dadbe_ss_ta_id, true, &aq_ta);
		QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s aq_ta_find=%r\n", qsc_ctx->qsc_id, da_ta_id, dadb_entry->dadbe_ss_ta_id, ret));
		if (sm_is_err(ret) && sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) == ret)
		{
			/*
			**  ??? WHAT NOW????
			**  see other aq_ta_find() call and comment!
			*/

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_WARN, 8,
				"sev=WARN, func=qm_fr_sc_react, stat=cannot_update_TA, da_ta=%s, aq_ta_find=not_found",
				dadb_entry->dadbe_ss_ta_id);
			(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
			goto done2;
		}
		if (sm_is_err(ret))
			goto errtaid;

		/* Status */
		ret = sm_rcb_get4uint32(rcb, &l, &rt, &v, &where);
		QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, where=RT_C2Q_status, id=%d, stat=%d, rt=%#x, v=%d, where=%#x, ret=%r\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, where, ret));
		if (sm_is_err(ret) || l != 8)
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto errtaid;
		}

		if (RT_C2Q_SESTAT == rt)
		{
			/* XXX Update session status XXX */
			/* Can this happen? Check protocol! */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 1,
				"sev=CRIT, func=qm_fr_sc_react, rt=%#x, status=unexpected"
				, rt);
		}
		else if (rt == RT_C2Q_TASTAT || rt == RT_C2Q_TARSTAT)
		{
			if (RT_C2Q_TARSTAT == rt)
			{
				/* Update recipient status */
				ret = qm_fr_sc_rcpts(qmgr_ctx, da_ta_id, rcb);
				if (sm_is_err(ret))
				{
					QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, qm_fr_sc_rcpts=%r\n", ret));
					rv = ret;
					goto errtaid;
				}
			}
			else if (!SM_RCB_ISEOB(rcb))
			{
				/* Error message */
				ret = sm_rcb_get2uint32(rcb, &l, &rt);
				if (sm_is_err(ret) || rt != RT_C2Q_STATT)
				{
					rv = sm_is_err(ret) ? ret
						: sm_error_perm(SM_EM_Q_SC2Q,
							SM_E_PR_ERR);
					goto errtaid;
				}
				ret = sm_rcb_getnstr(rcb, &errmsg, l);
				if (sm_is_err(ret))
				{
					rv = ret;
					goto errtaid;
				}
				QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, where=taid, rt=RT_C2Q_STATT, id=%d, msg=%S\n", qsc_ctx->qsc_id, errmsg));
				sm_str_sanitize(errmsg);
			}

			/* Update (rest of) transaction status */
			/*
			QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=qda_update_ta_stat\n"));
			*/

			ret = qda_update_ta_stat(qmgr_ctx, da_ta_id,
					STATUS2SMTPCODE(v),
					where, qsc_ctx->qsc_dadb_ctx,
					dadb_entry, aq_ta, NULL,
					errmsg, THR_LOCK_UNLOCK);
			QM_HRBT_DPRINTF(QDC_C2Q_TM, 0, (QM_DEBFP, "func=qda_update_ta_stat, da_ta_id=%s\n", da_ta_id));
			if (sm_is_err(ret))
			{
				rv = ret;
				goto errtaid;
			}
		}
		else
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_ERR, 8,
				"sev=ERROR, func=qm_fr_sc_react, rt=%#x, status=unexpected"
				, rt);
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* XXX */
		goto done;

  errtaid:
		/* more cleanup? */
		goto err2;
	}
#if 0
	/* Other cases? */
	else if (XXX == rt)
#endif /* 0 */
	else
		goto err2;

  done:
	ret = sm_rcb_close_dec(qsc_ctx->qsc_com.rcbcom_rdrcb);
  done2:
	/* Open rcb for next record */
	(void) sm_rcb_open_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);
	SM_STR_FREE(errmsg);
	return rv;

	/* Preserve original error code! */
  err2:
	/* Use rcb functions that don't check the state */
	(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);

  error:
	/* Open rcb for next record */
	(void) sm_rcb_open_rcvn(qsc_ctx->qsc_com.rcbcom_rdrcb);
	QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc_react, id=%d, status=%#x, status=error_out, rt=%#x, v=%d, l=%d, ret=%r\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, l, ret));
	SM_STR_FREE(errmsg);

	/*
	**  Question: Fail on all errors? Only on those returned from
	**  qda_update_ta_stat() or qm_fr_sc_rcpts(), not protocol errors.
	*/

	if (QM_DA_STAT_FATAL(rv))
	{
		/* which resource? doesn't matter? */
		(void) qm_rsr_problem(qmgr_ctx, QMGR_RFL_MEM_I, THR_LOCK_UNLOCK);
		rv = EVTHR_TERM;
	}
	return rv;
}
/*
**  QM_FR_SC -- SMTPC to QMGR interface
**
**	Receives data from SMTPC into the read rcb of the SMTPC context.
**	Once a record is complete it is handed to qm_fr_sc_react().
**	On EOF, or if qm_fr_sc_react() returns an error or EVTHR_DEL,
**	the connection is closed, the SMTPC context is shut down and
**	its threads are subtracted from the DA thread count of QMGR.
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		EVTHR_WAITQ: wait for more data/next record
**		EVTHR_OK: qm_fr_sc_react() returned QMGR_R_ASYNC
**		EVTHR_DEL: task has been terminated (or read failed)
**		otherwise: result of qm_fr_sc_react()
*/

sm_ret_T
qm_fr_sc(sm_evthr_task_P tsk)
{
	int fd, lockres;
	sm_ret_T ret;
	uint nthreads;
	qmgr_ctx_P qmgr_ctx;
	qsc_ctx_P qsc_ctx;
#if QM_TIMING
	uint32_t frt;
#endif

	SM_IS_EVTHR_TSK(tsk);
	qsc_ctx = (qsc_ctx_P) tsk->evthr_t_actx;
	SM_IS_QSC_CTX(qsc_ctx);
	qmgr_ctx = qsc_ctx->qsc_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);

	/* Checked in caller */
	fd = tsk->evthr_t_fd;

	ret = sm_rcb_rcv(fd, qsc_ctx->qsc_com.rcbcom_rdrcb, QSS_RC_MINSZ);
	QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, fd=%d, ret=%r, buf=%d, len=%d, qsc_status=%#x\n",
		fd, ret, qsc_ctx->qsc_com.rcbcom_rdrcb->sm_rcb_base[0], sm_rcb_getlen(qsc_ctx->qsc_com.rcbcom_rdrcb), qsc_ctx->qsc_status), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

	/* record is not yet complete */
	if (ret > 0)
		return EVTHR_WAITQ;

	if (0 == ret)
	{
		/* record is complete: decode it and act on it */
		ret = sm_rcb_close_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);

		/* start appropriate function ... */
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP, "func=qm_fr_sc, qm_fr_sc_react=before\n"));
		ret = qm_fr_sc_react(qsc_ctx
#if QM_TIMING
				, &frt
#endif
				);
		QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP, "func=qm_fr_sc, qm_fr_sc_react=after, rt=%x\n", frt));
		QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, qm_fr_sc_react=%r\n", ret), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

#if QMGR_TEST
		/* trigger an error if requested */
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SC_RD)
		    && !sm_is_err(ret))
		{
			QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, where=stop_reading\n", ret), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
			return EVTHR_WAITQ|evthr_r_no(EVTHR_EV_RD);
		}
#endif /* QMGR_TEST */

		if (!sm_is_err(ret))
		{
			if (QMGR_R_WAITQ == ret)
				return EVTHR_WAITQ;
			if (QMGR_R_ASYNC == ret)
				return EVTHR_OK;
			if (EVTHR_DEL != ret)
				return ret;
		}

		/*
		**  Either an error (too harsh?) or EVTHR_DEL:
		**  terminate this client below.
		*/
	}
	else if (SM_IO_EOF == ret)
	{
		ret = sm_rcb_close_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);
	}
	else
	{
		/* some other result of sm_rcb_rcv(): just delete the task */
		QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc, ret=%r, errno=%d\n", ret, errno));
		QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc, fd=%d\n", fd));
		return EVTHR_DEL;
	}

	/* terminate the connection to this SMTPC */
	QM_LEV_DPRINTTC(QDC_C2Q, 1, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, task=%p, status=terminate, ret=%r\n", qsc_ctx->qsc_com.rcbcom_tsk, ret), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
	close(fd);

	/*
	**  Need to close all outstanding sessions and update
	**  internal data!
	*/

	/* remember the thread count before the context is closed */
	nthreads = (qsc_ctx->qsc_dadb_ctx != NULL)
		? qsc_ctx->qsc_dadb_ctx->dadb_entries_max
		: 0;	/* oops */
	(void) qsc_ctx_close(qsc_ctx);

	/* XXX see comment in qm_fr_ss() */
	tsk->evthr_t_fd = INVALID_FD;	/* make it invalid */
	qsc_ctx->qsc_status = QSC_ST_SH_DOWN;

	lockres = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(lockres);
	if (lockres != 0)
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
			SM_LOG_CRIT, 4,
			"sev=CRIT, func=qm_fr_sc, lock=%d",
			lockres);
		return EVTHR_DEL;
	}

	if (qmgr_ctx->qmgr_max_da_threads < nthreads)
	{
		/* more threads to remove than accounted for */
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
			SM_LOG_INCONS, 5,
			"sev=INCONS, func=qm_fr_sc, qmgr_max_da_threads=%lu, da_threads=%u, status=inconsistent"
			, qmgr_ctx->qmgr_max_da_threads, nthreads);
	}
	else
	{
		qmgr_ctx->qmgr_max_da_threads -= nthreads;
		QMGR_TMO_SCHED;
	}

	/* XXX don't crash while holding lock? */
	SM_ASSERT(qmgr_ctx->qmgr_sc_li.qm_gli_nfd > 0);

	/* free li ctx */
	qmgr_ctx->qmgr_sc_li.qm_gli_nfd--;
	qmgr_ctx->qmgr_sc_li.qm_gli_used &= ~(qsc_ctx->qsc_bit);
	if (qsc_ctx->qsc_com.rcbcom_rdrcb != NULL)
		(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
	lockres = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
	SM_ASSERT(0 == lockres);

	/* free qsc_ctx? */
	qsc_ctx->qsc_com.rcbcom_tsk = NULL;

	/*
	**  XXX QMGR should ask "someone" (MCP) to start a new DA.
	*/

	return EVTHR_DEL;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */