/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: qm_fr_sc.c,v 1.136 2007/06/18 04:42:31 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/reccom.h"
#include "sm/da.h"
#include "qmgr.h"
#include "log.h"

/*
**  QMGR_TMO_SCHED -- recompute qmgr_ctx->qmgr_tmo_sched
**
**	Requires a variable named qmgr_ctx in the scope where it is used.
**	If the configured AQ size exceeds the number of DA threads, the
**	configured scheduler timeout is increased by AQR_SCHED_TMOUT_FACT
**	times (AQ size / (DA threads + 1)); otherwise the configured
**	timeout is used as is.
*/
#define QMGR_TMO_SCHED	\
	do {								\
		if (qmgr_ctx->qmgr_cnf.q_cnf_aq_size >			\
		    qmgr_ctx->qmgr_max_da_threads)			\
			qmgr_ctx->qmgr_tmo_sched =			\
				(uint) (qmgr_ctx->qmgr_cnf.q_cnf_aq_size / \
					(qmgr_ctx->qmgr_max_da_threads + 1)) \
				* AQR_SCHED_TMOUT_FACT			\
				+ qmgr_ctx->qmgr_cnf.q_cnf_tmo_sched;	\
		else							\
			qmgr_ctx->qmgr_tmo_sched =			\
				qmgr_ctx->qmgr_cnf.q_cnf_tmo_sched;	\
	} while (0)

/*
**  QM_FR_SC_RCPTS -- Read (failed) recipient status from SMTPC and update AQ
**
**	Note: this only updates the recipient status in AQ, qda_update_ta_stat()
**	must be called afterwards to update the various DBs and counters
**	What happens if this (partially) fails? The cleanup task should
**	take care of that, i.e., the recipients will be removed (tempfail?)
**	and tried later on again.
**
**	Expected layout in rcb: one RT_C2Q_RCPT_N record (number of entries)
**	followed by that many groups of
**	RT_C2Q_RCPT_IDX (recipient index), RT_C2Q_RCPT_ST (status),
**	RT_C2Q_RCPT_STT (status text).
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		da_ta_id -- DA transaction id
**		rcb -- rcb containing info about recipient status,
**			this can contain several entries.
**
**	Side Effects:
**		calls aq_rcpt_status() for each entry with a temporary or
**		permanent failure status; the error text is handed over to
**		that function.
**
**	Returns:
**		<0: usual sm_error code: protocol errors, ENOMEM,
**		>=0: number of recipients with status information
**			(as announced in the RT_C2Q_RCPT_N record)
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
qm_fr_sc_rcpts(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_rcb_P rcb)
{
	uint32_t l, rt, idx, v;
	uint i, nrcpts;
	sm_ret_T ret, rv;
	sm_ret_T rcpt_status;
	aq_ctx_P aq_ctx;
	sm_str_P errmsg;

	SM_IS_QMGR_CTX(qmgr_ctx);
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);
	errmsg = NULL;
	rv = SM_SUCCESS;
	nrcpts = 0;

	/*
	**  These are logged in the error paths below even if the read that
	**  should have set them failed, hence give them defined values.
	*/

	l = rt = idx = v = 0;

	QM_LEV_DPRINTTC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_rcpts, da_ta=%s\n", da_ta_id), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

	/* number of recipient entries that follow */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_N || v > INT_MAX) {
		rv = sm_is_err(ret) ? ret
			: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_DASTAT, QM_LMOD_DASTAT,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, l=%d, v=%#x, ret=%m"
			, da_ta_id, rt, RT_C2Q_RCPT_N, l, v, ret);
		goto error;
	}

	nrcpts = v;
	for (i = 0; i < nrcpts && !SM_RCB_ISEOB(rcb); i++) {
		/* recipient index */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &idx);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_IDX) {
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, idx=%u, ret=%m"
				, da_ta_id, rt, RT_C2Q_RCPT_IDX, idx
				, ret);
			break;
		}

		/* recipient status */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_RCPT_ST) {
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

			/* argument order must match the format string */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, rt=%#x, v=%u, expected=%#x, idx=%u, ret=%m",
				rt, v, RT_C2Q_RCPT_ST, idx, ret);
			break;
		}
		rcpt_status = STATUS2SMTPCODE(v);
QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_rcpts, idx=%u, stat=%d\n", idx, rcpt_status));

		/* status text: header first, then the string itself */
		errmsg = NULL;
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		if (sm_is_err(ret)) {
			rv = ret;
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, sm_rcb_get2uint(errmsg)=%m",
				ret);
			break;
		}
		if (rt != RT_C2Q_RCPT_STT) {
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

			/* the record type expected here is RT_C2Q_RCPT_STT */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, da_ta=%s, rt=%#x, expected=%#x, idx=%u, ret=%m"
				, da_ta_id, rt, RT_C2Q_RCPT_STT, idx
				, ret);
			break;
		}
#if QMGR_TEST
		/* trigger an error if requested */
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_RCPT_STAT)
		    && rcpt_status == QMGR_TEST_RSR_ST)
		{
QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_rcpts, where=test_error\n"));
			rv = sm_error_perm(SM_EM_STR, ENOMEM);
			break;
		}
#endif /* QMGR_TEST */

		ret = sm_rcb_getnstr(rcb, &errmsg, l);
		if (sm_is_err(ret)) {
			rv = ret;
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, sm_rcb_getnstr(errmsg)=%m",
				ret);
			break;
		}
QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_rcpts, rt=RT_C2Q_RCPT_STT, idx=%u, msg=%S\n", idx, errmsg));
		sm_str_sanitize(errmsg);

		/* do something more with this?? */
		if (smtp_is_reply_temp(rcpt_status) || smtp_is_reply_fail(rcpt_status))
		{
			/*
			**  XXX This could be more efficient depending on
			**  the implementation of AQ: we could "just"
			**  walk through the recipients for this delivery
			**  attempt.
			**
			**  Should error state always be DA_TA_ERR_RCPT_R?
			**  It is an individual error, so it must have been
			**  the reply to a RCPT command, right?
			**  The error state coming back from the DA is only
			**  one "global" state (for the entire transaction),
			**  but the error for a recipient can only(?) occur
			**  during RCPT, otherwise no individual error would
			**  have been returned by the DA.
			*/

			ret = aq_rcpt_status(aq_ctx, da_ta_id, idx, rcpt_status,
						DA_TA_ERR_RCPT_R, errmsg);

			/* errmsg has been handed over, don't touch it again */
			errmsg = NULL;
			if (sm_is_err(ret)) {
				/* can happen if rcpt was too long in AQ */
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_DASTAT, QM_LMOD_DASTAT,
					SM_LOG_ERR,
					(ret == sm_error_perm(SM_EM_AQ,
							SM_E_NOTFOUND)) ? 8 : 1,
					"sev=ERROR, func=qm_fr_sc_rcpts, stat=cannot_update_rcpt, da_ta=%s, idx=%u, ret=%m",
					da_ta_id, idx, ret);

				/* "not found" is tolerated, everything else is not */
				if (ret != sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND)) {
					rv = ret;
					break;
				}
				continue;
			}
		}
		else {
			/*
			**  Neither temporary nor permanent failure: protocol
			**  error. The loop is not left here: the remaining
			**  entries are still processed, rv reports the error.
			*/

			SM_STR_FREE(errmsg);
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qm_fr_sc_rcpts, status=bad_value, da_ta=%s, idx=%u, rcpt_stat=%d",
				da_ta_id, idx, rcpt_status);
		}
	}

	/* fewer entries than announced (EOB reached early)? */
	if (rv == SM_SUCCESS && i < nrcpts)
		rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);

  error:
	return (SM_SUCCESS == rv) ? (sm_ret_T) nrcpts : rv;
}

/*
**  QM_FR_SC_REACT -- Decode data received from SMTPC and act accordingly
**
**	The record in qsc_ctx->qsc_com.rcbcom_rdrcb is decoded in this order:
**	total length, protocol version, SMTPC id record
**	(RT_C2Q_NID: new client, RT_C2Q_ID: known client,
**	RT_C2Q_CID: client shuts down).
**	For RT_C2Q_ID an optional RT_C2Q_STAT record (client status and
**	thread limit) may follow, and then either RT_C2Q_SEID (session
**	status) or RT_C2Q_TAID/RT_C2Q_TAID_CS (transaction status).
**
**	Parameters:
**		qsc_ctx -- SMTPC context
**		pfrt -- (only if QM_TIMING) (output) record type that
**			follows the SMTPC id record, 0 if not reached
**
**	Returns:
**		usual sm_error code
**		EVTHR_DEL if SMTPC announced its shutdown (RT_C2Q_CID)
**		EVTHR_TERM if the error is fatal according to
**			QM_DA_STAT_FATAL()
**
**	Note: several error paths jump to err2 without setting rv
**	(unexpected id/state, entry not found in DA DB, unknown record
**	type); in those cases the record is discarded and SM_SUCCESS is
**	returned.
*/

static sm_ret_T
qm_fr_sc_react(qsc_ctx_P qsc_ctx
#if QM_TIMING
	, uint32_t *pfrt
#endif
	)
{
	uint32_t v, l, rt, tl, where;
	sm_ret_T ret, rv;
	sm_rcb_P rcb;
	qmgr_ctx_P qmgr_ctx;
	aq_ctx_P aq_ctx;
	aq_ta_P aq_ta;
	dadb_entry_P dadb_entry;
	sm_str_P errmsg;

	SM_IS_QSC_CTX(qsc_ctx);
	qmgr_ctx = qsc_ctx->qsc_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	ret = rv = SM_SUCCESS;
	errmsg = NULL;

	/*
	**  The debug output (e.g., at the error label) prints these even
	**  if no read succeeded, hence give them defined values.
	*/

	v = l = rt = tl = where = 0;
	aq_ta = NULL;
	dadb_entry = NULL;
#if QM_TIMING
	*pfrt = 0;
#endif

	/* Decode rcb */
	rcb = qsc_ctx->qsc_com.rcbcom_rdrcb;
	ret = sm_rcb_open_dec(rcb);
	if (sm_is_err(ret)) {
		rv = ret;
		goto error;
	}
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);

	/* Total length of record */
	ret = sm_rcb_getuint32(rcb, &tl);
	if (sm_is_err(ret) || tl > QM_SS_MAX_REC_LEN || tl > sm_rcb_getlen(rcb)) {
		rv = sm_is_err(ret) ? ret
			: sm_error_perm(SM_EM_Q_SC2Q, SM_E_RCB2LONG);
		goto err2;
	}

	/* Decode data, act accordingly... */

	/* Protocol header: version */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
	if (sm_is_err(ret)) {
		rv = ret;
		goto err2;
	}
	if (l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT) {
		rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_V_MISM);
		goto err2;
	}

/*
queuemanager.func.tex:


*/

	/* SMTPC (new/existing/close) ID */
	ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_react,1 id=%d, stat=%d, rt=%#x, v=%d\n",
qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
	if (sm_is_err(ret) || l != 4) {
		rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
		goto err2;
	}
	if (RT_C2Q_NID == rt) {
		int r;

		/* New SMTPC */
		/* XXX protect access? */

		/*
		**  XXX perform similar checks as in qm_fr_ss().c
		**  to prevent multiple clients with the same id!
		*/

		if (qsc_ctx->qsc_status != QSC_ST_NONE ||
		    qsc_ctx->qsc_id != QSC_ID_NONE)
		{
			/* XXX Internal error? Stop task? */
			goto err2;
		}
		qsc_ctx->qsc_id = v;

		/* Max number of threads */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_MAXTHR || v <= 0) {
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}
#if 0
		qsc_ctx->qsc_maxthreads = v;
#endif
QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "func=qm_fr_sc_react, new id=%d, maxthreads=%d\n", qsc_ctx->qsc_id, v));
		ret = dadb_new(&qsc_ctx->qsc_dadb_ctx, v);
		if (sm_is_err(ret)) {
			/*
			**  Report the failure to the caller: without a
			**  DA DB this client cannot be used.
			*/

			rv = ret;
			goto err2;
		}
		qsc_ctx->qsc_status = QSC_ST_START;
		if (!SM_RCB_ISEOB(rcb)) {
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}
		qsc_ctx->qsc_status = QSC_ST_OK;
		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qm_fr_sc_react, lock=%d",
				r);
		}
		else {
			/* account for the threads of the new DA */
			qmgr_ctx->qmgr_max_da_threads += v;
			QMGR_TMO_SCHED;
			r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
			SM_ASSERT(0 == r);
		}
		goto done;
	}
	else if (RT_C2Q_ID == rt) {
		/* SMTPC id just for identification */
		/* XXX protect access? */
		if (qsc_ctx->qsc_status == QSC_ST_NONE ||
		    qsc_ctx->qsc_id != v)
			goto err2;
	}
	else if (RT_C2Q_CID == rt) {
		/* SMTPC shuts down */
		/* XXX protect access? */
		if (qsc_ctx->qsc_status == QSC_ST_NONE || qsc_ctx->qsc_id != v) {
			/* XXX Internal error? Stop task? */
			goto err2;
		}

		/* Check for EOB? not really: we shut down anyway */
		qsc_ctx->qsc_status = QSC_ST_SH_DOWN;
		(void) sm_rcb_close_dec(qsc_ctx->qsc_com.rcbcom_rdrcb);

		/*
		**  We assume that no open sessions/transactions exist,
		**  i.e., SMTPC properly terminated them before sending this
		**  message.
		*/

		/* Terminate (delete) this task, qsc_ctx is cleaned in caller */
		return EVTHR_DEL;
	}
	else
		goto err2;

	/* rt == RT_C2Q_ID is the only case in which we continue here */
#if 0
	oldstatus = qsc_ctx->qsc_status;
#endif

	/* what's next? */
	ret = sm_rcb_get2uint32(rcb, &l, &rt);
QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, where=2, id=%d, stat=%d, rt=%#x, v=%d\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v));
	if (sm_is_err(ret)) {
		rv = ret;
		goto err2;
	}
#if QM_TIMING
	*pfrt = rt;
#endif

	if (RT_C2Q_STAT == rt) {
		if (l != 4) {
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Status of SMTPC */
		ret = sm_rcb_getuint32(rcb, &v);
		if (sm_is_err(ret)) {
			rv = ret;
			goto err2;
		}

		/* XXX protect access? */
		qsc_ctx->qsc_status = v;
QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, stat=%d\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status));

		/* Max number of threads */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_C2Q_MAXTHR) {
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

#if 0
		r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 1,
				"sev=CRIT, func=qm_fr_sc_react, lock_qsc=%d"
				, r);
		}
		else {
			qsc_ctx->qsc_maxthreads = v;
			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);
			SM_ASSERT(0 == r);
		}
#else /* 0 */
		ret = dadb_set_limit(qsc_ctx->qsc_dadb_ctx, v, THR_LOCK_UNLOCK);
		if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc_react, dadb_set_limit=%r\n", ret));
		/* XXX */
		}
#endif /* 0 */

QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, maxthreads=%d\n", qsc_ctx->qsc_id, v));

		if (SM_RCB_ISEOB(rcb))
			goto done;

		/* more to come: should be session status! */
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		if (sm_is_err(ret)) {
			rv = ret;
			goto err2;
		}
	}

	if (RT_C2Q_SEID == rt) {
		sessta_id_T da_se_id;

		if (l != SMTP_STID_SIZE) {
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Session id */
		ret = sm_rcb_getn(rcb, (uchar *) da_se_id, l);
		if (sm_is_err(ret)) {
			rv = ret;
			goto errseid;
		}

		/*
		**  XXX What to do here?
		**  The session open failed, so it was impossible to start any
		**  transaction. Back to the scheduler to try something
		**  else... where/how do we record this failure?
		*/

		ret = dadb_se_find(qsc_ctx->qsc_dadb_ctx, da_se_id, &dadb_entry);
		if (sm_is_err(ret)) {
			/* dadb_entry must not be accessed in this case */
QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_se_find=%r\n", qsc_ctx->qsc_id, da_se_id, ret));
			goto errseid;
		}
QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, ta-id=%s, dadb_se_find=%r\n", qsc_ctx->qsc_id, da_se_id, dadb_entry->dadbe_da_ta_id, ret));

		/* Session status */
		ret = sm_rcb_get4uint32(rcb, &l, &rt, &v, &where);
QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, where=RT_C2Q_SEID id=%d, stat=%d, rt=%#x, v=%#x, where=%#x, ret=%r\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, where, ret));
		if (sm_is_err(ret) || l != 8 ||
		    (rt != RT_C2Q_SESTAT && rt != RT_C2Q_SECLSD))
		{
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto errseid;
		}

		/* Note: rt must be preserved! at least until (RT_C2Q_SECLSD == rt) */

		/*
		**  XXX HACK XXX Always a temporary error.
		**  Should this be done in SMTPC or should the conversion
		**  happen here?
		**  What about error codes during EHLO (i.e., the SMTP
		**  session negotiation)?
		**  What about 5xy codes? sm8 tries the next host no
		**  matter what...
		**  XXX HACK: if 5xy code -> change to 4xy to make it
		**  a temporary error to try the next host.
		**  NOTE: this should NOT be done here... Instead some
		**  other logic should decide whether another try should
		**  be made...  this logic should depend on "where" the
		**  error occurred, i.e., a session error should(?) be
		**  handled differently than a transaction error.
		*/

		if (sm_is_err(v)) {
			if ((sm_ret_T) v == sm_error_perm(SM_EM_SMTPC, SM_E_TTMYSELF))
				v = SMTPC_SE_TTMYSELF;
			else if (sm_error_value(v) == ETIMEDOUT)
				v = SMTPC_SE_OP_TMO;
			else if (sm_error_value(v) == ECONNREFUSED)
				v = SMTPC_SE_OP_REFUSED;
			else if (sm_error_value(v) == ENETUNREACH ||
				 sm_error_value(v) == EHOSTUNREACH)
				v = SMTPC_SE_OP_UNREACH;
			else
				v = SMTPC_SE_OPEN_ST;
		}
		else if (IS_SMTP_REPLY(v) && smtp_is_reply_fail(v))
			v -= 100;	/* XXX 5xy -> 4xy; see above */
		/* keep the code...??? */

		if (!SM_RCB_ISEOB(rcb)) {
			uint32_t rtl;

			/* Error message */
			ret = sm_rcb_get2uint32(rcb, &l, &rtl);
			if (sm_is_err(ret) || rtl != RT_C2Q_STATT) {
				rv = sm_is_err(ret) ? ret
					: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
				goto errseid;
			}
			ret = sm_rcb_getnstr(rcb, &errmsg, l);
			if (sm_is_err(ret)) {
				rv = ret;
				goto errseid;
			}
QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "func=qm_fr_sc_react, where=seid, rt=RT_C2Q_STATT, id=%d, msg=%S\n", qsc_ctx->qsc_id, errmsg));
			sm_str_sanitize(errmsg);
		}

/* don't decrease curactive before closing session */
#if 0
#define QSC_CURACTIVE_DECR do {								\
		r = pthread_mutex_lock(&qsc_ctx->qsc_mutex);		\
		SM_LOCK_OK(r);						\
		if (r != 0) {							\
			sm_log_write(qmgr_ctx->qmgr_lctx,		\
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,	\
				SM_LOG_CRIT, 1,				\
				"sev=CRIT, func=qm_fr_sc_react, lock_qsc=%d", r);\
		}							\
		else {							\
			SM_ASSERT(qsc_ctx->qsc_curactive > 0);		\
			qsc_ctx->qsc_curactive--;			\
			r = pthread_mutex_unlock(&qsc_ctx->qsc_mutex);\
			SM_ASSERT(0 == r);				\
		}							\
	} while (0)
#else /* 0 */
#define QSC_CURACTIVE_DECR	SM_NOOP
#endif /* 0 */

		/* session is just closed? */
		if (RT_C2Q_SECLSD == rt) {
			/*
			**  "race condition":
			**  smtpc may close a session due to timeout
			**  qmgr may want to reuse it
			**  the messages "cross" each other, i.e.,
			**  smtpc closed the session before it got the
			**  new task and qmgr sent the new task before it
			**  knows about the close.
			**  how to detect this?
			*/

			if (DADBE_IS_FLAG(dadb_entry, DADBE_FL_BUSY) &&
			    !DADBE_IS_FLAG(dadb_entry, DADBE_FL_IDLE))
				goto done;

			/* need to close entry in DADB */
			ret = dadb_sess_close_entry(qmgr_ctx, qsc_ctx->qsc_dadb_ctx,
					dadb_entry, false, NULL, THR_LOCK_UNLOCK);
			QSC_CURACTIVE_DECR;
			if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_sess_close_entry=%r\n", qsc_ctx->qsc_id, da_se_id, ret));
				goto errseid;
			}
			goto done;
		}

		/*
		**  XXX Update session status XXX
		**  Update all (open) transactions within the session.
		**  Currently there can be only one transaction.
		**  Look it up and mark it (all recipients) as
		**  "temporarily failed".
		*/

/*
QM_HRBT_DPRINTF(QDC_C2Q_TM, (QM_DEBFP, "func=qm_fr_sc_react, before=dadb_ta_find\n"));
*/
		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx, dadb_entry->dadbe_da_ta_id, &dadb_entry);
		if (sm_is_err(ret)) {
			/* dadb_entry must not be accessed in this case */
QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, dadb_ta_find=%r\n", qsc_ctx->qsc_id, da_se_id, ret));
			goto errseid;
		}
QM_LEV_DPRINTFC(QDC_C2Q, 2, (QM_DEBFP, "func=qm_fr_sc_react, id=%d, da_se-id=%s, ta-id=%s, dadb_ta_find=%r\n", qsc_ctx->qsc_id, da_se_id, dadb_entry->dadbe_da_ta_id, ret));
		SM_ASSERT(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_ASSERT(*dadb_entry->dadbe_ss_ta_id != '\0');

		/* XXX Hack, more or less copy from below XXX */
/*
QM_HRBT_DPRINTF(QDC_C2Q_TM, (QM_DEBFP, "func=qm_fr_sc_react, before=aq_ta_find\n"));
*/
		/* use the TA referenced by the recipient if available */
		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL) {
				SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
				                    aq_ta->aqt_ss_ta_id));
		}
		else
			ret = aq_ta_find(qmgr_ctx->qmgr_aq, dadb_entry->dadbe_ss_ta_id, true, &aq_ta);
QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s, aq_ta_find=%r\n", qsc_ctx->qsc_id, dadb_entry->dadbe_da_ta_id, dadb_entry->dadbe_ss_ta_id, ret));
		if (sm_is_err(ret) && sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) == ret) {
			/*
			 *  entry could have been expired... ??? WHAT NOW?
			 *  we cannot simply stop reading; at least we
			 *  have to skip over the rest, but we also
			 *  need to clean up the DA session cache!
			 */

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_WARN, 8,
				"sev=WARN, func=qm_fr_sc_react, stat=cannot_update_session, da_ta=%s, aq_ta_find=not_found",
				dadb_entry->dadbe_ss_ta_id);
			(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
			(void) qda_dadb_close(qmgr_ctx, dadb_entry->dadbe_ss_ta_id, qsc_ctx->qsc_dadb_ctx, dadb_entry, v, &ret);
			goto done2;
		}
		if (sm_is_err(ret))
			goto errseid;

		/* session must be closed */
		DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);

/*
QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP, "func=qm_fr_sc_react, before=qda_update_ta_stat\n"));
*/
		/* Update transaction status */
		ret = qda_update_ta_stat(qmgr_ctx, dadb_entry->dadbe_da_ta_id,
				v, where, qsc_ctx->qsc_dadb_ctx, dadb_entry,
				aq_ta, NULL, errmsg, THR_LOCK_UNLOCK);

		QSC_CURACTIVE_DECR;

		if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s qda_update_ta_stat=%r\n", qsc_ctx->qsc_id, dadb_entry->dadbe_da_ta_id, dadb_entry->dadbe_ss_ta_id, ret));
			rv = ret;
			goto errseid;
		}

		/* XXX */
		goto done;

	  errseid:
		/* More cleanup? */
		goto err2;
	}
	else if (rt == RT_C2Q_TAID || rt == RT_C2Q_TAID_CS) {
		sessta_id_T da_ta_id;

		if (l != SMTP_STID_SIZE) {
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* Transaction id */
		ret = sm_rcb_getn(rcb, (uchar *) da_ta_id, l);
		if (sm_is_err(ret)) {
			rv = ret;
			goto errtaid;
		}
QM_HRBT_DPRINTF(QDC_C2Q_TM, 0, (QM_DEBFP, "func=qm_fr_sc_react, da_ta_id=%s\n", da_ta_id));

		/*
		**  XXX Which TA do we want to find here?
		**  That is, how do we store the DA TA in the QMGR?
		**  Do we look up all recipients (sequentially)?
		**  Yes -> write a function update_da_ta_status()
		**  which updates the status for all recipients in
		**  this DA TA.
		*/

/*
QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=dadb_ta_find\n"));
*/
		ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx, da_ta_id, &dadb_entry);
QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, dadb_ta_find=%r\n", qsc_ctx->qsc_id, da_ta_id, ret));
		if (sm_is_err(ret))
			goto errtaid;

		/* "last transaction, close session" */
		if (RT_C2Q_TAID_CS == rt) {
			QSC_CURACTIVE_DECR;
			DADBE_SET_FLAG(dadb_entry, DADBE_FL_SE_CL);
		}
		else if (RT_C2Q_TAID == rt)
			DADBE_SET_FLAG(dadb_entry, DADBE_FL_TA_CL);

		SM_ASSERT(dadb_entry->dadbe_ss_ta_id != NULL);
		SM_ASSERT(*dadb_entry->dadbe_ss_ta_id != '\0');
/*
QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=aq_ta_find\n"));
*/
		/* use the TA referenced by the recipient if available */
		if (dadb_entry->dadbe_rcpt != NULL &&
		    (aq_ta = dadb_entry->dadbe_rcpt->aqr_ss_ta) != NULL) {
				SM_IS_AQ_RCPT(dadb_entry->dadbe_rcpt);
				SM_ASSERT(SESSTA_EQ(dadb_entry->dadbe_ss_ta_id,
				                    aq_ta->aqt_ss_ta_id));
		}
		else
			ret = aq_ta_find(qmgr_ctx->qmgr_aq, dadb_entry->dadbe_ss_ta_id, true, &aq_ta);
QM_LEV_DPRINTFC(QDC_C2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, id=%d, da_ta-id=%s, ss_ta-id=%s aq_ta_find=%r\n", qsc_ctx->qsc_id, da_ta_id, dadb_entry->dadbe_ss_ta_id, ret));
		if (sm_is_err(ret) && sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) == ret) {
			/*
			 *  ??? WHAT NOW????
			 *  see other aq_ta_find() call and comment!
			 */

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_DASTAT, QM_LMOD_DASTAT,
				SM_LOG_WARN, 8,
				"sev=WARN, func=qm_fr_sc_react, stat=cannot_update_TA, da_ta=%s, aq_ta_find=not_found",
				dadb_entry->dadbe_ss_ta_id);
			(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
			goto done2;
		}
		if (sm_is_err(ret))
			goto errtaid;

		/* Status */
		ret = sm_rcb_get4uint32(rcb, &l, &rt, &v, &where);
QM_LEV_DPRINTFC(QDC_C2Q, 1, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, where=RT_C2Q_status, id=%d, stat=%d, rt=%r, v=%d, where=%r, ret=%#x\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, where, ret));
		if (sm_is_err(ret) || l != 8) {
			rv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto errtaid;
		}

		if (RT_C2Q_SESTAT == rt) {
			/* XXX Update session status XXX */
			/* Can this happen? Check protocol! */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_CRIT, 1,
				"sev=CRIT, func=qm_fr_sc_react, rt=%#x, status=unexpected"
				, rt);
		}
		else if (rt == RT_C2Q_TASTAT || rt == RT_C2Q_TARSTAT) {
			if (RT_C2Q_TARSTAT == rt) {
				/* Update recipient status */
				ret = qm_fr_sc_rcpts(qmgr_ctx, da_ta_id, rcb);
				if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, qm_fr_sc_rcpts=%r\n", ret));
					rv = ret;
					goto errtaid;
				}
			}
			else if (!SM_RCB_ISEOB(rcb)) {
				/* Error message */
				ret = sm_rcb_get2uint32(rcb, &l, &rt);
				if (sm_is_err(ret) || rt != RT_C2Q_STATT) {
					rv = sm_is_err(ret) ? ret
						: sm_error_perm(SM_EM_Q_SC2Q,
								SM_E_PR_ERR);
					goto errtaid;
				}
				ret = sm_rcb_getnstr(rcb, &errmsg, l);
				if (sm_is_err(ret)) {
					rv = ret;
					goto errtaid;
				}
QM_LEV_DPRINTFC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc_react, where=taid, rt=RT_C2Q_STATT, id=%d, msg=%S\n", qsc_ctx->qsc_id, errmsg));
				sm_str_sanitize(errmsg);
			}

			/* Update (rest of) transaction status */
/*
QM_HRBT_DPRINTF(QDC_C2Q_TM, 2, (QM_DEBFP, "func=qm_fr_sc_react, before=qda_update_ta_stat\n"));
*/
			ret = qda_update_ta_stat(qmgr_ctx, da_ta_id,
					STATUS2SMTPCODE(v),
					where, qsc_ctx->qsc_dadb_ctx,
					dadb_entry, aq_ta, NULL,
					errmsg, THR_LOCK_UNLOCK);
QM_HRBT_DPRINTF(QDC_C2Q_TM, 0, (QM_DEBFP, "func=qda_update_ta_stat, da_ta_id=%s\n", da_ta_id));
			if (sm_is_err(ret)) {
				rv = ret;
				goto errtaid;
			}
		}
		else {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
				SM_LOG_ERR, 8,
				"sev=ERROR, func=qm_fr_sc_react, rt=%#x, status=unexpected"
				, rt);
			rv = sm_error_perm(SM_EM_Q_SC2Q, SM_E_PR_ERR);
			goto err2;
		}

		/* XXX */
		goto done;

	  errtaid:
		/* more cleanup? */
		goto err2;
	}

#if 0
	/* Other cases? */
	else if (XXX == rt)

#endif /* 0 */
	else
		goto err2;

  done:
	ret = sm_rcb_close_dec(qsc_ctx->qsc_com.rcbcom_rdrcb);
  done2:
	/* Open rcb for next record */
	(void) sm_rcb_open_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);
	SM_STR_FREE(errmsg);
	return rv;

	/* Preserve original error code! */
  err2:
	/* Use rcb functions that don't check the state */
	(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);
  error:
	/* Open rcb for next record */
	(void) sm_rcb_open_rcvn(qsc_ctx->qsc_com.rcbcom_rdrcb);

QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc_react, id=%d, status=%#x, status=error_out, rt=%#x, v=%d, l=%d, ret=%r\n", qsc_ctx->qsc_id, qsc_ctx->qsc_status, rt, v, l, ret));
	SM_STR_FREE(errmsg);

	/*
	**  Question: Fail on all errors? Only on those returned from
	**  qda_update_ta_stat() or qm_fr_sc_rcpts(), not protocol errors.
	*/

	if (QM_DA_STAT_FATAL(rv)) {
		/* which resource? doesn't matter? */
		(void) qm_rsr_problem(qmgr_ctx, QMGR_RFL_MEM_I, THR_LOCK_UNLOCK);
		rv = EVTHR_TERM;
	}
	return rv;
}

/*
**  QM_FR_SC -- SMTPC to QMGR interface
**
**	Reads (part of) a record from the SMTPC connection of this task.
**	A complete record is handed to qm_fr_sc_react(). On EOF, on a read
**	error, or if qm_fr_sc_react() returns an error or EVTHR_DEL, the
**	connection is closed and the bookkeeping for this client in
**	qmgr_ctx (DA thread count, scheduler timeout, listener slot) is
**	undone before the task is deleted.
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		EVTHR_WAITQ: wait for more data/next record
**		EVTHR_OK: qm_fr_sc_react() returned QMGR_R_ASYNC
**		EVTHR_DEL: connection is terminated, delete task
**		otherwise: value returned by qm_fr_sc_react()
*/

sm_ret_T
qm_fr_sc(sm_evthr_task_P tsk)
{
	int fd, r;
	sm_ret_T ret;
	uint da_threads;
	qmgr_ctx_P qmgr_ctx;
	qsc_ctx_P qsc_ctx;
#if QM_TIMING
	uint32_t frt;
#endif

	SM_IS_EVTHR_TSK(tsk);
	qsc_ctx = (qsc_ctx_P) tsk->evthr_t_actx;
	SM_IS_QSC_CTX(qsc_ctx);
	qmgr_ctx = qsc_ctx->qsc_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);

	fd = tsk->evthr_t_fd;	/* Checked in caller */
	ret = sm_rcb_rcv(fd, qsc_ctx->qsc_com.rcbcom_rdrcb, QSS_RC_MINSZ);

	QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, fd=%d, ret=%r, buf=%d, len=%d, qsc_status=%#x\n",
		fd, ret, qsc_ctx->qsc_com.rcbcom_rdrcb->sm_rcb_base[0], sm_rcb_getlen(qsc_ctx->qsc_com.rcbcom_rdrcb), qsc_ctx->qsc_status), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

	/* record not yet complete: wait for more data */
	if (ret > 0)
		return EVTHR_WAITQ;

	if (0 == ret) {
		ret = sm_rcb_close_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);

		/* start appropriate function ... */
QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP, "func=qm_fr_sc, qm_fr_sc_react=before\n"));
		ret = qm_fr_sc_react(qsc_ctx
#if QM_TIMING
				, &frt
#endif
				);
QM_HRBT_DPRINTF(QDC_C2Q_TM, 3, (QM_DEBFP, "func=qm_fr_sc, qm_fr_sc_react=after, rt=%x\n", frt));
QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, qm_fr_sc_react=%r\n", ret), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
#if QMGR_TEST
		/* trigger an error if requested */
		if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_SC_RD)
		    && !sm_is_err(ret)) {
QM_LEV_DPRINTTC(QDC_C2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, where=stop_reading\n", ret), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
			return EVTHR_WAITQ|evthr_r_no(EVTHR_EV_RD);
		}
#endif /* QMGR_TEST */
		if (sm_is_err(ret))
			goto termit;	/* too harsh? */
		else if (QMGR_R_WAITQ == ret)
			return EVTHR_WAITQ;
		else if (QMGR_R_ASYNC == ret)
			return EVTHR_OK;
		else if (EVTHR_DEL == ret)
			goto termit;	/* terminate this client */
		else
			return ret;
	}
	else if (SM_IO_EOF == ret) {
		ret = sm_rcb_close_rcv(qsc_ctx->qsc_com.rcbcom_rdrcb);
	}
	else /* if (ret < 0) */ {
		/*
		**  Read error: the task is deleted, hence the client must
		**  be cleaned up in the same way as for EOF, otherwise
		**  its DA threads and its listener slot stay allocated.
		*/

QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc, ret=%r, errno=%d\n", ret, errno));
QM_LEV_DPRINTFC(QDC_C2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_sc, fd=%d\n", fd));
	}

  termit:
QM_LEV_DPRINTTC(QDC_C2Q, 1, (QM_DEBFP, "sev=DBG, func=qm_fr_sc, task=%p, status=terminate, ret=%r\n", qsc_ctx->qsc_com.rcbcom_tsk, ret), qmgr_ctx->qmgr_ev_ctx->evthr_c_time);
	close(fd);

	/*
	**  Need to close all outstanding sessions and update
	**  internal data!
	*/

	/* get the number of DA threads before the context is closed */
	if (qsc_ctx->qsc_dadb_ctx != NULL)
		da_threads = qsc_ctx->qsc_dadb_ctx->dadb_entries_max;
	else
		da_threads = 0;	/* oops */
	(void) qsc_ctx_close(qsc_ctx);

	/* XXX see comment in qm_fr_ss() */
	tsk->evthr_t_fd = INVALID_FD;	/* make it invalid */
	qsc_ctx->qsc_status = QSC_ST_SH_DOWN;

	r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
			SM_LOG_CRIT, 4,
			"sev=CRIT, func=qm_fr_sc, lock=%d",
			r);

		/* cannot update the shared data without the lock */
		return EVTHR_DEL;
	}

	if (qmgr_ctx->qmgr_max_da_threads >= da_threads) {
		qmgr_ctx->qmgr_max_da_threads -= da_threads;
		QMGR_TMO_SCHED;
	}
	else {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPC, QM_LMOD_FROM_SMTPC,
			SM_LOG_INCONS, 5,
			"sev=INCONS, func=qm_fr_sc, qmgr_max_da_threads=%lu, da_threads=%u, status=inconsistent"
			, qmgr_ctx->qmgr_max_da_threads, da_threads);
	}

	/* XXX don't crash while holding lock? */
	SM_ASSERT(qmgr_ctx->qmgr_sc_li.qm_gli_nfd > 0);

	/* free li ctx */
	qmgr_ctx->qmgr_sc_li.qm_gli_nfd--;
	qmgr_ctx->qmgr_sc_li.qm_gli_used &= ~(qsc_ctx->qsc_bit);
	if (qsc_ctx->qsc_com.rcbcom_rdrcb != NULL)
		(void) sm_rcb_close_decn(qsc_ctx->qsc_com.rcbcom_rdrcb);

	r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
	SM_ASSERT(0 == r);

	/* free qsc_ctx? */
	qsc_ctx->qsc_com.rcbcom_tsk = NULL;

	/*
	**  XXX QMGR should ask "someone" (MCP) to start a new DA.
	*/

	return EVTHR_DEL;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */