/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

/*
**  SMTPC - QMGR communication module.
*/

#include "sm/generic.h"
SM_RCSID("@(#)$Id: c2q.c,v 1.141 2007/10/23 03:57:39 ca Exp $")

#include "sm/assert.h"
#include "sm/error.h"
#include "sm/memops.h"
#include "sm/reccom.h"
#include "statethreads/st.h"
#include "sm/rcbst.h"
#include "sm/fcntl.h"
#include "sm/unixsock.h"
#include "sm/stsock.h"
#include "sm/stthreads.h"
#include "sm/qmgrcomm.h"
#include "sm/da.h"
#include "smtpc.h"
#include "c2q.h"
#include "log.h"
#include "sm/qmgr-int.h"	/* HACK: for QSC_ST_SLOW_2 */

#if SC_DEBUG
/*
**  SHOW_SESS -- print every slot of the session table (debugging aid)
**
**	Parameters:
**		c2q_ctx -- C2Q context
**
**	Returns:
**		nothing
*/

static void
show_sess(c2q_ctx_P c2q_ctx)
{
	int idx;
	sc_sess_P sess;

	idx = 0;
	while (idx < c2q_ctx->c2q_max_ses) {
		sess = c2q_ctx->c2q_sess[idx];

		/* empty slots are shown with the placeholder value 9999 */
		SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr,
			"c2q_sess[%d]=%lx, thread=%u, state=%u\n",
			idx, (long) sess,
			NULL == sess ? 9999u : sess->scse_sct_ctx->sct_thr_id,
			NULL == sess ? 9999u : sess->scse_state));
		idx++;
	}
}
#endif /* SC_DEBUG */

/*
**  SC_GET_FREE_THR -- find a free thread that can take care of a task
**	from QMGR
**
**	Parameters:
**		sc_ctx -- SMTPC context
**
**	Returns:
**		pointer to thread context
**		(NULL if no free thread is available)
*/

static sc_t_ctx_P
sc_get_free_thr(sc_ctx_P sc_ctx)
{
	uint slot;
	sc_t_ctx_P cand;

	/*
	**  Plain linear scan, the first free context wins.
	**  May want to use some round robin search??
	*/

	slot = 0;
	while (slot < SC_MAX_THREADS(sc_ctx)) {
		cand = (sc_ctx->scc_scts)[slot];
		if (cand != NULL && SC_T_FREE == cand->sct_status)
			return cand;
		++slot;
	}
	return NULL;
}

/*
**  SC_RM_SESS_RQ -- remove a session from the list of active sessions
**
**	The slot to clear is taken from the index stored in the session
**	(scse_c2q_idx); a session without a slot (SCSE_C2Q_IDX_NONE) is
**	silently accepted. The index stored in the session itself is
**	not modified here.
**
**	Parameters:
**		sc_sess -- session context
**		c2q_ctx -- C2Q context
**
**	Returns:
**		usual sm_error code (always SM_SUCCESS)
*/

sm_ret_T
sc_rm_sess_rq(sc_sess_P sc_sess, c2q_ctx_P c2q_ctx)
{
	int idx;

	SM_IS_C2Q_CTX(c2q_ctx);
	SM_IS_SC_SE(sc_sess);
	idx = sc_sess->scse_c2q_idx;
	SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 5,
			(smioerr, "sc_rm_sess_rq: sc_sess=%lx, cq2_idx=%d\n",
			(long) sc_sess, idx));

	if (idx != SCSE_C2Q_IDX_NONE) {
		/* the stored index must denote a valid slot */
		SM_REQUIRE(idx >= 0);
		SM_REQUIRE((uint) idx < c2q_ctx->c2q_max_ses);
		c2q_ctx->c2q_sess[(uint) idx] = NULL;
	}
	return SM_SUCCESS;
}

/*
**  SC_ADD_SESS_RQ -- add a session to the list of active sessions
**	(see also sc_find_sess_rq())
**
**	The first slot that is either empty or holds a session in state
**	SCSE_ST_NONE is taken over; the session remembers its slot index
**	and is put into state SCSE_ST_NEW.
**
**	Parameters:
**		sc_sess -- session context
**		c2q_ctx -- C2Q context
**
**	Returns:
**		usual sm_error code
**		(permanent SM_E_FULL error if there is no usable slot)
*/

static sm_ret_T
sc_add_sess_rq(sc_sess_P sc_sess, c2q_ctx_P c2q_ctx)
{
	uint i;
	sc_sess_P occupant;

	SM_IS_SC_SE(sc_sess);
	SM_IS_C2Q_CTX(c2q_ctx);
	SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 1,
			(smioerr, "sc_add_sess_rq: sc_sess=%lx\n", (long) sc_sess));

	/* look for the first slot that can be (re)used */
	for (i = 0; i < c2q_ctx->c2q_max_ses; i++) {
		occupant = c2q_ctx->c2q_sess[i];
		if (NULL == occupant || SCSE_ST_NONE == occupant->scse_state)
			break;
	}

	if (i < c2q_ctx->c2q_max_ses) {
		c2q_ctx->c2q_sess[i] = sc_sess;
		sc_sess->scse_c2q_idx = i;
		sc_sess->scse_state = SCSE_ST_NEW;
		return SM_SUCCESS;
	}

	/* table is full */
	SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 0, (smioerr,
		"sc_add_sess_rq: FATAL: sc_sess=%lx, i=%d\n", (long) sc_sess, i));
#if SC_DEBUG
	show_sess(c2q_ctx);

	/* i equals c2q_max_ses here, hence this aborts in debug builds */
	SM_ASSERT(i < c2q_ctx->c2q_max_ses);
#endif
	return sm_error_perm(SM_EM_SMTPC, SM_E_FULL);
}

/*
**  SC_FIND_SESS_RQ -- find session id in the list of active sessions
**
**	Parameters:
**		c2q_ctx -- C2Q context
**		sess_id -- session id
**
**	Returns:
**		session context of the first slot whose session id matches
**		(NULL if not found)
*/

static sc_sess_P
sc_find_sess_rq(c2q_ctx_P c2q_ctx, sessta_id_P sess_id)
{
	uint idx;
	sc_sess_P cur;

	SM_IS_C2Q_CTX(c2q_ctx);
	for (idx = 0; idx < c2q_ctx->c2q_max_ses; idx++) {
		cur = c2q_ctx->c2q_sess[idx];

		/* skip unused slots */
		if (NULL == cur)
			continue;
		if (SESSTA_EQ(cur->scse_id, sess_id))
			return cur;
	}
	return NULL;
}

/*
**  SC_RCB_SEND -- send a (closed) RCB to QMGR
**
**	The RCB is opened for sending, written to the QMGR connection
**	while holding the write mutex of the C2Q context (to serialize
**	concurrent senders), and closed again.
**	If sending fails, the C2Q context is flagged with an I/O error
**	and a shutdown of SMTPC is requested.
**
**	Parameters:
**		rcb -- RCB
**		c2q_ctx -- C2Q context
**
**	Returns:
**		usual sm_error code
**		(temporary EIO error if the C2Q context is already flagged
**		with an I/O error; temporary error if the write mutex
**		cannot be acquired)
*/

sm_ret_T
sc_rcb_send(sm_rcb_P rcb, c2q_ctx_P c2q_ctx)
{
	sm_ret_T ret;
	int r, save_errno;
#if SC_TIMING
	st_utime_t uta, utb;
#endif

	SM_IS_C2Q_CTX(c2q_ctx);
	if (C2Q_IS_IOERR(c2q_ctx))
		return sm_error_temp(SM_EM_SMTPC, EIO);

	ret = sm_rcb_open_snd(rcb);
	if (sm_is_err(ret)) goto error;
#if SC_TIMING
	utb = st_utime();
#endif

	/* make sure there aren't multiple sends */
	r = st_mutex_lock(c2q_ctx->c2q_wr_mutex);
	if (r != 0) {
		/* save errno before any other call can overwrite it */
		save_errno = errno;
		SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr,
			"ERROR: sc_rcb_send: st_mutex_lock=%d, errno=%d\n", r, save_errno));

		/*
		**  ret still holds the (successful) result of
		**  sm_rcb_open_snd(); without an explicit error code the
		**  caller would be told that the RCB has been sent.
		*/

		ret = sm_error_temp(SM_EM_SMTPC,
				save_errno != 0 ? save_errno : EIO);
		goto err1;
	}
	ret = sm_rcb_snd(c2q_ctx->c2q_fd, rcb, c2q_ctx->c2q_sc_ctx->scc_qmgr_tmo);
	r = st_mutex_unlock(c2q_ctx->c2q_wr_mutex);
	if (r != 0) {
		SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr,
			"ERROR: sc_rcb_send: st_mutex_unlock=%d, errno=%d\n", r, errno));
		SM_ASSERT(0 == r);	/* abort */

		/* only reached if assertions are disabled */
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_SMTPC, r);
	}
	if (sm_is_err(ret)) {
		C2Q_SET_IOERR(c2q_ctx);

		/*
		**  NOTE: we ask for a shutdown here as the communication
		**  with qmgr is broken. This is not the best place to do it,
		**  such a decision should be made by the upper layers.
		**  Moreover, a better solution is to reconnect to qmgr,
		**  but currently that is not worth the effort (as we don't
		**  know whether just the connection is lost or whether
		**  qmgr died; in the latter case smtpc should be restarted
		**  by mcp anyway).
		*/

		SC_SET_FLAG(c2q_ctx->c2q_sc_ctx, SCC_FL_SHUTDOWN);
		goto err1;
	}
#if SC_TIMING
	uta = st_utime();
	if (uta > utb)
		sm_io_fprintf(smioerr, "send=%lu\n", (unsigned long) (uta - utb));
#endif
	ret = sm_rcb_close_snd(rcb);
	if (sm_is_err(ret)) goto error;
	return ret;

  err1:
	/* RCB is open for sending: close it, but keep the original error */
	(void) sm_rcb_close_snd(rcb);
  error:
	return ret;
}

/*
**  SC_C2Q -- creates RCB for sending status back to QMGR;
**	RCB is only sent for session status, not for TA status (needs to
**		be done by caller: invoke sc_rcb_send()).
**
**	The RCB of the thread context (sct_rcb) is used: its decoding
**	state is closed first, then it is opened for encoding and filled
**	with protocol version, SMTPC id, session or transaction id, and
**	the status record (record type, status, error state).
**	Depending on the state, a reply text or a list of per-recipient
**	status records is appended.
**
**	Parameters:
**		sc_t_ctx -- SMTPC thread context
**		whichstatus -- session/transaction/... status?
**		status -- status (simplified!!!)
**		c2q_ctx -- C2Q context
**
**	Returns:
**		usual sm_error code
**		(temporary EIO error if the C2Q context is flagged with
**		an I/O error)
*/

sm_ret_T
sc_c2q(sc_t_ctx_P sc_t_ctx, uint32_t whichstatus, int status, c2q_ctx_P c2q_ctx)
{
	sm_ret_T ret;
	sc_sess_P sc_sess;
	uint32_t idt, err_state;
	sessta_id_P id;
	sc_ta_P sc_ta;
	bool rcptstats;

	SM_IS_C2Q_CTX(c2q_ctx);
	SM_IS_SC_T_CTX(sc_t_ctx);
	sc_sess = sc_t_ctx->sct_sess;
	SM_IS_SC_SE(sc_sess);
	SM_IS_RCB(sc_t_ctx->sct_rcb);

	/* sc_ta stays NULL if a session status is reported */
	sc_ta = NULL;

	/* the decoding state is closed even if nothing is sent below */
	(void) sm_rcb_close_decn(sc_t_ctx->sct_rcb);
	if (C2Q_IS_IOERR(c2q_ctx))
		return sm_error_temp(SM_EM_SMTPC, EIO);
	ret = sm_rcb_open_enc(sc_t_ctx->sct_rcb, -1);
	if (sm_is_err(ret)) {
		SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
			(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_open=%m\n", ret));
		goto error;
	}

	/*
	**  Which status??  If whichstatus is something else than
	**  RT_C2Q_SESTAT and RT_C2Q_TASTAT, then we may need to send
	**  more data back? Or do we have to do that only in case of
	**  RT_C2Q_RCPT_ST?
	**  Also change recordtype and _id (session/ta/...) accordingly.
	*/

	rcptstats = false;
	if (RT_C2Q_SESTAT == whichstatus || RT_C2Q_SECLSD == whichstatus) {
		/* session status: identify the record by the session id */
		idt = RT_C2Q_SEID;
		id = sc_sess->scse_id;
		err_state = sc_sess->scse_err_st;
	}
	else {
		/* everything else is reported for the current transaction */
		sc_ta = sc_sess->scse_ta;
		SM_IS_SC_TA(sc_ta);

		/* Close session? */
		idt = SCSE_IS_FLAG(sc_sess, SCSE_FL_CLOSE)
				? RT_C2Q_TAID_CS : RT_C2Q_TAID;
		id = sc_ta->scta_id;
		err_state = sc_ta->scta_err_state;

		/*
		**  Recipient errors: switch the record type and send
		**  the status of the individual recipients (see below).
		*/

		if (DA_TA_ERR_RCPT_S == sc_ta->scta_err_state ||
		    SCTA_IS_STATE(sc_ta, SCTA_R_PERM|SCTA_R_TEMP))
		{
			rcptstats = true;
			whichstatus = RT_C2Q_TARSTAT;
		}
		/* Other cases?? */
	}

	/* header: protocol version, SMTPC id, session/TA id, status */
	ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_FIRST,
		SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
		SM_RCBV_INT, RT_C2Q_ID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id,
		SM_RCBV_BUF, idt, id, SMTP_STID_SIZE,
		SM_RCBV_INT2, whichstatus, (uint32_t) status, err_state,
		SM_RCBV_END);
	if (sm_is_err(ret)) {
		SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
			(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-1=%m\n", ret));
		goto err2;
	}

	/*
	**  Session status: add the reply text of the session if there
	**  is a non-empty one (not for error state DA_SE_ERR_TTMYSLEF_S).
	*/

	if (RT_C2Q_SESTAT == whichstatus &&
	    sc_sess->scse_err_st != DA_SE_ERR_TTMYSLEF_S &&
	    sc_sess->scse_reply != NULL &&
	    sm_str_getlen(sc_sess->scse_reply) > 0)
	{
		ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
			SM_RCBV_STR, RT_C2Q_STATT, sc_sess->scse_reply,
			SM_RCBV_END);
		if (sm_is_err(ret)) {
			SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
				(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-2=%m\n", ret));
			goto err2;
		}
	}
	if (rcptstats) {
		uint nrcpts;
		sc_rcpt_P sc_rcpt;
		sc_rcpts_P sc_rcpt_lst;

		/* first pass: count the recipients whose status isn't SMTP_OK */
		nrcpts = 0;
		sc_rcpt_lst = &sc_ta->scta_rcpts;
		for (sc_rcpt = SC_RCPTS_FIRST(sc_rcpt_lst);
		     sc_rcpt != SC_RCPTS_END(sc_rcpt_lst);
		     sc_rcpt = SC_RCPTS_NEXT(sc_rcpt))
		{
			if (SMTP_OK == sc_rcpt->scr_st)
				continue;
			++nrcpts;
		}

		/* the count is sent ahead of the individual records */
		SM_ASSERT(nrcpts > 0);
		ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
			SM_RCBV_INT, RT_C2Q_RCPT_N, nrcpts,
			SM_RCBV_END);
		if (sm_is_err(ret)) {
			SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
				(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-3=%m\n", ret));
			goto err2;
		}

		/*
		**  second pass: index, status, and reply text for each
		**  of those recipients; nrcpts is counted down to verify
		**  that both passes agree.
		*/

		sc_rcpt_lst = &sc_ta->scta_rcpts;
		for (sc_rcpt = SC_RCPTS_FIRST(sc_rcpt_lst);
		     sc_rcpt != SC_RCPTS_END(sc_rcpt_lst);
		     sc_rcpt = SC_RCPTS_NEXT(sc_rcpt))
		{
			if (SMTP_OK == sc_rcpt->scr_st)
				continue;

			SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3,
				(smioerr,
				"func=sc_c2q, where=rcpt, stat=%d, len=%u, text=%S\n",
				sc_rcpt->scr_st,
				NULL == sc_rcpt->scr_reply ? 0 :
				   sm_str_getlen(sc_rcpt->scr_reply),
				sc_rcpt->scr_reply));

			/*
			**  NOTE(review): scr_reply is passed without a NULL
			**  check although the debug output above allows for
			**  NULL -- confirm that sm_rcb_putv() accepts that.
			*/

			ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
				SM_RCBV_INT, RT_C2Q_RCPT_IDX, sc_rcpt->scr_idx,
				SM_RCBV_INT, RT_C2Q_RCPT_ST, (uint32_t) sc_rcpt->scr_st,
				SM_RCBV_STR, RT_C2Q_RCPT_STT, sc_rcpt->scr_reply,
				SM_RCBV_END);
			if (sm_is_err(ret)) {
				SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
					(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-4=%m\n", ret));
				goto err2;
			}
			SM_ASSERT(nrcpts > 0);
			--nrcpts;
		}
		SM_ASSERT(0 == nrcpts);
	}
	else if (sc_ta != NULL && DA_TA_ERR_MAIL_S == sc_ta->scta_err_state &&
		 sc_ta->scta_mail->scm_reply != NULL)
	{
		/* error state MAIL: add the reply text stored for MAIL */
		ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
			SM_RCBV_STR, RT_C2Q_STATT, sc_ta->scta_mail->scm_reply,
			SM_RCBV_END);
		if (sm_is_err(ret)) {
			SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
				(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-5=%m\n", ret));
			goto err2;
		}
	}
	else if (sc_ta != NULL &&
		 (DA_TA_ERR_DATA_S == sc_ta->scta_err_state ||
		  DA_TA_ERR_DOT_S == sc_ta->scta_err_state) &&
		 sc_ta->scta_reply != NULL)
	{
		/* scta_reply is currently only set after DATA */
		ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
			SM_RCBV_STR, RT_C2Q_STATT, sc_ta->scta_reply,
			SM_RCBV_END);
		if (sm_is_err(ret)) {
			SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr,
				"sev=ERROR, func=sc_c2q, sm_rcb_putv-6=%m\n"
				, ret));
			goto err2;
		}
	}
	/* other error messages, e.g., from DATA, final dot? */

	ret = sm_rcb_close_enc(sc_t_ctx->sct_rcb);
	if (sm_is_err(ret)) goto err2;

	/* only session status is sent here, TA status is sent by the caller */
	if (RT_C2Q_SESTAT == whichstatus || RT_C2Q_SECLSD == whichstatus) {
		ret = sc_rcb_send(sc_t_ctx->sct_rcb, c2q_ctx);
		if (sm_is_err(ret)) goto err2;
	}
	if (NULL == sc_ta)
		SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3, (smioerr,
			"sev=DBG, func=sc_c2q, whichstatus=%#x, stat=%d\n",
			whichstatus, status));
	else
		SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3, (smioerr,
			"sev=DBG, func=sc_c2q, whichstatus=%#x, stat=%d, err_st=%#x, ta_state=%#x\n",
			whichstatus, status, sc_ta->scta_err_state,
			sc_ta->scta_state));
	return ret;

	/* error: only reached if the RCB could not be opened for encoding */
  error:
	(void) sm_rcb_close_n(sc_t_ctx->sct_rcb);

	/* err2: log the error; the state of the RCB is not changed here */
  err2:
	sm_log_write(sc_t_ctx->sct_sc_ctx->scc_lctx,
		SC_LCAT_INTERN, SC_LMOD_INTERN,
		SM_LOG_ERR, 6,
		"sev=ERROR, func=sc_c2q, ret=%m", ret);
	return ret;
}

/*
**  SC_RCB_FROM_QMGR -- receive an RCB from QMGR, notify thread (session)
**	This runs permanently as a thread.
**
**	For each RCB received: decode the header (total length, protocol
**	version, SMTPC id, session flags, optionally delivery class,
**	session id), select the thread context that has to deal with it
**	(the one of an existing session or a free/new one), exchange the
**	received RCB with the RCB of that thread context, and signal the
**	thread.
**	If no thread is available QMGR is informed via the notify RCB.
**
**	Parameters:
**		arg -- C2Q context
**
**	Returns:
**		NULL on termination
*/

static void *
sc_rcb_from_qmgr(void *arg)
{
	uint32_t v, l, rt, tl, se_flags;
	int r;
	sm_ret_T ret;
	sm_rcb_P rcb, rcbt;
	c2q_ctx_P c2q_ctx;
	sessta_id_T sess_id, sess_id2;
	sc_sess_P sc_sess;
	sc_t_ctx_P sc_t_ctx;
	sc_ctx_P sc_ctx;

	SM_REQUIRE(arg != NULL);
	c2q_ctx = (c2q_ctx_P) arg;
	SM_IS_C2Q_CTX(c2q_ctx);
	sc_ctx = c2q_ctx->c2q_sc_ctx;
	rcb = sm_rcb_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ);
	if (NULL == rcb) {
		SC_SET_FLAG(sc_ctx, SCC_FL_SHUTDOWN);

		/* currently unused as nobody checks a return value */
		ret = sm_error_temp(SM_EM_SMTPC, ENOMEM);
		goto error;
	}
	sc_t_ctx = NULL;	/* avoid bogus compiler warning */

	/*
	**  Don't start running before at least one thread is ready
	**  and has initialized its data.
	**  Slight abuse of condition variable...
	*/

	while (!SC_IS_FLAG(sc_ctx, SCC_FL_COMMOK)) {
		/* CONF timeout */
		r = st_cond_timedwait(c2q_ctx->c2q_cond_rd, SEC2USEC(10));
#if 0
		if (0 == r)
			/* complain? */ ;
#endif
	}

	/* main loop: one iteration per RCB from QMGR */
	while (!SC_IS_FLAG(sc_ctx, SCC_FL_SHUTDOWN)) {
		sc_sess = NULL;
		ret = sm_rcb_open_rcv(rcb);
		if (sm_is_err(ret)) {
			sm_log_write(sc_ctx->scc_lctx,
				SC_LCAT_COMM, SC_LMOD_COMM,
				SM_LOG_ERR, 6,
				"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_open_rcv=%m",
				ret);
			goto error;
		}

		/* note: no timeout, this will just wait for an RCB */
		ret = sm_rcb_rcv(c2q_ctx->c2q_fd, rcb, 12, (st_utime_t) -1);
		if (sm_is_err(ret)) {
			/* terminate unless the error is a temporary one */
			if (!E_IS_TEMP(sm_error_value(ret))) {
				if (ret != SM_IO_EOF) {
					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_COMM, SC_LMOD_COMM,
						SM_LOG_ERR, 6,
						"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_rcv=%m",
						ret);
				}
				goto error;
			}
			ret = sm_rcb_close_rcv(rcb);
			if (sm_is_err(ret)) goto error;
			continue;
		}
		ret = sm_rcb_close_rcv(rcb);
		if (sm_is_err(ret)) goto error;

		/*
		**  format:
		**  RT_Q2C_ID: int smtpc-id
		**  either RT_Q2C_*SEID: str session-id
		**  or RT_Q2C_*TAID: str transaction-id
		*/

		ret = sm_rcb_open_dec(rcb);
		if (sm_is_err(ret)) {
			sm_log_write(sc_ctx->scc_lctx,
				SC_LCAT_INTERN, SC_LMOD_INTERN,
				SM_LOG_ERR, 6,
				"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_open_dec=%m",
				ret);
			continue;
		}

		/* total length of record */
		ret = sm_rcb_getuint32(rcb, &tl);
		SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, tl=%d, ret=%m\n", tl, ret));
		if (sm_is_err(ret) || tl > QSS_RC_MAXSZ)
			goto errdec;

		/* protocol header: version */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=prot, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret));
		if (sm_is_err(ret) || l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT)
			goto errdec;

		/* RT_Q2C_ID: int smtpc-id */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=smtpc-id, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret));
		if (sm_is_err(ret) || l != 4 ||
		    rt != RT_Q2C_ID || v != c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id)
			goto errdec;

		/* RT_Q2C_SE_FLAGS: session flags */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_Q2C_SE_FLAGS)
			goto errdec;

		/*
		**  Save the flags: v is reused for the following records,
		**  se_flags is checked further down (DA_FL_SE_KEEP).
		*/

		se_flags = v;

#ifdef RT_Q2C_DCID
		/* RT_Q2C_DCID: delivery class (cur. unused) */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=delivery-class, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret));
		if (sm_is_err(ret) || l != 4 || rt != RT_Q2C_DCID)
			goto errdec;
#endif /* RT_Q2C_DCID */

		/* length and record type of the session id */
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=sc_sess-id, l=%d, rt=%#x, ret=%m\n", l, rt, ret));
		if (sm_is_err(ret) || l != SMTP_STID_SIZE ||
		    (rt != RT_Q2C_NSEID && rt != RT_Q2C_SEID &&
		     rt != RT_Q2C_CSEID &&
		     rt != RT_Q2C_NSEID_1TA && rt != RT_Q2C_SEID_1TA))
			goto errdec;

		/* session id */
		ret = sm_rcb_getn(rcb, (uchar *) sess_id, l);
		if (sm_is_err(ret)) goto errdec;

		if (RT_Q2C_CSEID == rt) {
			/*
			**  A second session id follows; note that rt is
			**  replaced by the record type of that second id.
			*/

			ret = sm_rcb_get2uint32(rcb, &l, &rt);
			SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=csc_sess-id, l=%d, rt=%#x, ret=%m\n", l, rt, ret));
			if (sm_is_err(ret) || l != SMTP_STID_SIZE ||
			    (rt != RT_Q2C_NSEID && rt != RT_Q2C_SEID &&
			     rt != RT_Q2C_NSEID_1TA && rt != RT_Q2C_SEID_1TA))
				goto errdec;

			/* session id */
			ret = sm_rcb_getn(rcb, (uchar *) sess_id2, l);
			if (sm_is_err(ret)) goto errdec;
		}

		/* use an existing session? */
		if (RT_Q2C_SEID == rt || RT_Q2C_SEID_1TA == rt) {
			sc_sess = sc_find_sess_rq(c2q_ctx, sess_id);
			SC_LEV_DPRINTF(sc_ctx, 6, (smioerr,
				"sc_rcb_from_qmgr: found sc_sess=%lx, id=%s\n",
				(long) sc_sess, sess_id));
			if (NULL == sc_sess && (RT_Q2C_SEID == rt || RT_Q2C_SEID_1TA == rt))
			{
				/* unknown session: treat the request as a new session */
				sm_log_write(sc_ctx->scc_lctx,
					SC_LCAT_INTERN, SC_LMOD_INTERN,
					SM_LOG_WARN, 6,
					"sev=WARN, func=sc_rcb_from_qmgr, se_id=%s, status=not_found, action=continue_as_if_NSEID, rt=%#x",
					sess_id, rt);
				rt = (RT_Q2C_SEID == rt) ? RT_Q2C_NSEID : RT_Q2C_NSEID_1TA;
				/* see below... */
			}
			else { /* (sc_sess != NULL || rt != RT_Q2C_SEID) */
				if (NULL == sc_sess) {
					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_INTERN, SC_LMOD_INTERN,
						SM_LOG_ERR, 6,
						"sev=ERROR, func=sc_rcb_from_qmgr, se_id=%s, status=not_found",
						sess_id);
					goto errdec;
				}
				sc_t_ctx = sc_sess->scse_sct_ctx;
				if (NULL == sc_t_ctx) {
					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_INTERN, SC_LMOD_INTERN,
						SM_LOG_ERR, 6,
						"sev=ERROR, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=NULL");
					goto errdec;
				}
				SM_IS_SC_T_CTX(sc_t_ctx);

				/* a busy or not yet ready thread can't take a task */
				if (SC_T_BUSY == sc_t_ctx->sct_status ||
				    SC_T_NOTRDY == sc_t_ctx->sct_status)
				{
					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_INTERN, SC_LMOD_INTERN,
						SM_LOG_ERR, 6,
						"sev=ERROR, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=%p, stat=%u",
						sc_t_ctx, sc_t_ctx->sct_status);
					goto errdec;
				}
				if (sc_t_ctx->sct_status != SC_T_IDLE) {
					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_INTERN, SC_LMOD_INTERN,
						SM_LOG_WARN, 9,
						"sev=WARN, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=%p, stat=%u",
						sc_t_ctx, sc_t_ctx->sct_status);
				}
				if (SC_T_IDLE == sc_t_ctx->sct_status) {
					SM_ASSERT(SC_IDLE_THREADS(sc_ctx) > 0);
					SC_IDLE_THREADS(sc_ctx)--;
#if SC_STATS
					SC_SE_REUSE(sc_ctx)++;
#endif
				}

				if (SC_T_CLOSING == sc_t_ctx->sct_status) {
					/*
					**  Note: this blocks this thread and hence
					**  no new tasks can be received.
					**  Possible solutions:
					**  1. use alternative implementation
					**  (each thread does its own task reception)
					**  2. just use a different sc_t_ctx:
					**  the current one is closing anyway,
					**  so why reuse it?
					**    a) because any upper limit on the number
					**       of open connections won't be exceeded
					**    b) there might not be a free thread
					*/

					/* inform thread that we want to reuse it */
					sc_t_ctx->sct_status = SC_T_REUSE;

					/* CONF timeout */
					r = st_cond_timedwait(c2q_ctx->c2q_cond_rd, SEC2USEC(5));
				}
				if (SC_T_REUSE == sc_t_ctx->sct_status) {
					/*
					**  session not closed: "fall through" to
					**  get a new thread.
					*/

					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_CLIENT, SC_LMOD_CLIENT,
						SM_LOG_WARN, 4,
						"sev=WARN, func=sc_rcb_from_qmgr, sess=%p, se_id=%s, status=still_closing, busy=%u, wait=%u, idle=%u, closing=%u"
						, sc_sess
						, sc_sess->scse_id
						, SC_BUSY_THREADS(sc_ctx)
						, SC_WAIT_THREADS(sc_ctx)
						, SC_IDLE_THREADS(sc_ctx)
						, SC_CLOSING_THREADS(sc_ctx)
						);
					rt = (RT_Q2C_SEID == rt) ? RT_Q2C_NSEID : RT_Q2C_NSEID_1TA;
				}
				else {
					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_CLIENT, SC_LMOD_CLIENT,
						SM_LOG_INFO, 9,
						"sev=INFO, func=sc_rcb_from_qmgr, sess=%p, se_id=%s, status=%u, busy=%u, wait=%u, idle=%u, closing=%u, rt=%#x"
						, sc_sess
						, sc_sess->scse_id
						, sc_t_ctx->sct_status
						, SC_BUSY_THREADS(sc_ctx)
						, SC_WAIT_THREADS(sc_ctx)
						, SC_IDLE_THREADS(sc_ctx)
						, SC_CLOSING_THREADS(sc_ctx)
						, rt
						);
					sc_t_ctx->sct_status = SC_T_BUSY;

					/* check sc_sess->scse_state */

					/* XXX RT_Q2C_TAID_CS doesn't seem to be possible? */
					if (RT_Q2C_CSEID == rt || RT_Q2C_TAID_CS == rt)
						SCSE_SET_FLAG(sc_sess, SCSE_FL_CLOSE);
				}
			}
		}
/* XXX "fall through" when SEID but session already closed? */
		if (RT_Q2C_NSEID == rt || RT_Q2C_NSEID_1TA == rt) {
			uint tried;

			/* new session: find a free thread, create one if necessary */
			for (tried = 0;;) {
				sc_t_ctx = sc_get_free_thr(sc_ctx);
				if (sc_t_ctx != NULL)
					break;

				sm_log_write(sc_ctx->scc_lctx,
					SC_LCAT_INTERN, SC_LMOD_INTERN,
					SM_LOG_WARN, 8,
					"sev=WARN, func=sc_rcb_from_qmgr, sc_get_free_thr=failed, id=%s, waitthr=%u, idle=%u, closing=%u, busy=%u, total=%u, max=%u, tried=%u"
					, sess_id
					, SC_WAIT_THREADS(sc_ctx)
					, SC_IDLE_THREADS(sc_ctx)
					, SC_CLOSING_THREADS(sc_ctx)
					, SC_BUSY_THREADS(sc_ctx)
					, SC_TOTAL_THREADS(sc_ctx)
					, SC_MAX_THREADS(sc_ctx)
					, tried);
				SM_ASSERT(SC_WAIT_THREADS(sc_ctx) >= SC_IDLE_THREADS(sc_ctx));

				if (SC_TOTAL_THREADS(sc_ctx) < SC_MAX_THREADS(sc_ctx)
				    && !SC_IS_FLAG(sc_ctx, SCC_FL_SHUTDOWN))
				{
					/* Create a thread */
					if (st_thread_create(sc_hdl_requests, (void *) sc_ctx, 0, 0)
					    == NULL)
					{
						sm_log_write(sc_ctx->scc_lctx,
							SC_LCAT_INTERN, SC_LMOD_INTERN,
							SM_LOG_ERR, 1,
							"sev=ERROR, func=sc_rcb_from_qmgr, status=cannot_create_thread, func=sc_rcb_from_qmgr, sc_get_free_thr=failed, id=%s, maxthr=%u, wait=%u, min_wait=%u, idle=%u, closing=%u, busy=%u, total=%u, tried=%u, errno=%d",
							sess_id,
							SC_MAX_THREADS(sc_ctx),
							SC_WAIT_THREADS(sc_ctx),
							SC_MIN_WAIT_THREADS(sc_ctx),
							SC_IDLE_THREADS(sc_ctx),
							SC_CLOSING_THREADS(sc_ctx),
							SC_BUSY_THREADS(sc_ctx),
							SC_TOTAL_THREADS(sc_ctx),
							tried, errno);
					}
					else {
						SC_WAIT_THREADS(sc_ctx)++;

						/* let sc_hdl_requests() start*/
						st_sleep(0);
						sc_t_ctx = sc_get_free_thr(sc_ctx);
						sm_log_write(sc_ctx->scc_lctx,
							SC_LCAT_INTERN, SC_LMOD_INTERN,
							SM_LOG_INFO, 14,
							"sev=INFO, func=sc_rcb_form_qmgr, wait=%u, idle=%u, closing=%u, busy=%u, sc_hdl_requests=created, sc_t_ctx=%p"
							, SC_WAIT_THREADS(sc_ctx)
							, SC_IDLE_THREADS(sc_ctx)
							, SC_CLOSING_THREADS(sc_ctx)
							, SC_BUSY_THREADS(sc_ctx)
							, sc_t_ctx
							);
						if (sc_t_ctx != NULL)
							break;
					}
				}

				/* give up: thread limit reached or too many attempts */
				if (SC_TOTAL_THREADS(sc_ctx) >= SC_MAX_THREADS(sc_ctx)
				    || tried > SC_MAX_THREADS(sc_ctx))
				{
					sm_log_write(sc_ctx->scc_lctx,
						SC_LCAT_INTERN, SC_LMOD_INTERN,
						SM_LOG_ERR, 4,
						"sev=ERROR, func=sc_rcb_from_qmgr, sc_get_free_thr=failed"
						", id=%s, max=%u, wait=%u, min_wait=%u"
						", idle=%u, closing=%u, busy=%u, total=%u, tried=%u"
						, sess_id, SC_MAX_THREADS(sc_ctx)
						, SC_WAIT_THREADS(sc_ctx)
						, SC_MIN_WAIT_THREADS(sc_ctx)
						, SC_IDLE_THREADS(sc_ctx)
						, SC_CLOSING_THREADS(sc_ctx)
						, SC_BUSY_THREADS(sc_ctx)
						, SC_TOTAL_THREADS(sc_ctx)
						, tried);

#if SC_DEBUG
					/* dump all thread contexts, then abort */
					for (r = 0; r < SC_MAX_THREADS(sc_ctx); r++) {
						sc_t_ctx = (sc_ctx->scc_scts)[r];
						sm_io_fprintf(smioerr,
							"i=%3d, sc_t_ctx=%p, status=%u, id=%s\n"
							, r, sc_t_ctx
							, sc_t_ctx != NULL ?  sc_t_ctx->sct_status : 9
							, (sc_t_ctx != NULL && sc_t_ctx->sct_sess != NULL) ?
									sc_t_ctx->sct_sess->scse_id : "-"
							);
					}
					SM_ASSERT(0);
#endif /* SC_DEBUG */

					/* fixme: Better error code... */
					ret = sm_error_temp(SM_EM_SMTPC, ENOMEM);
					goto errslow;
				}

				/* let sc_hdl_request() do something */
				st_sleep(0);
				++tried;
			}
#if 0
			SM_ASSERT(sc_t_ctx != NULL);
#endif
			SC_LEV_DPRINTF(sc_ctx, 2, (smioerr, "sc_rcb_from_qmgr: sc_get_free_thr=%u, wait=%u, busy=%u, id=%s, sc_sess=%p\n",
sc_t_ctx->sct_thr_id, SC_WAIT_THREADS(sc_ctx), SC_BUSY_THREADS(sc_ctx), sess_id, (long) sc_t_ctx->sct_sess));
			sc_sess = sc_t_ctx->sct_sess;

			sm_log_write(sc_ctx->scc_lctx,
				SC_LCAT_CLIENT, SC_LMOD_CLIENT,
				SM_LOG_DEBUG, 11,
				"sev=DBG, func=sc_rcb_from_qmgr, where=new_task, sess=%p, se_id=%s, status=%u, busy=%u, wait=%u, idle=%u, closing=%u"
				, sc_sess, sess_id
				, sc_t_ctx->sct_status
				, SC_BUSY_THREADS(sc_ctx)
				, SC_WAIT_THREADS(sc_ctx)
				, SC_IDLE_THREADS(sc_ctx)
				, SC_CLOSING_THREADS(sc_ctx)
				);

			sc_t_ctx->sct_status = SC_T_BUSY;
			ret = sc_add_sess_rq(sc_sess, c2q_ctx);
			if (sm_is_err(ret)) {
				sm_log_write(sc_ctx->scc_lctx,
					SC_LCAT_INTERN, SC_LMOD_INTERN,
					SM_LOG_ERR, 5,
					"sev=ERROR, func=sc_rcb_from_qmgr, se_id=%s, sc_add_sess_rq=%m",
					sess_id, ret);

				/* give the thread context back */
				sc_t_ctx->sct_status = SC_T_FREE;
				goto errdec;
			}
			SESSTA_COPY(sc_sess->scse_id, sess_id);

			/*
			**  sc_get_free_thr() must return a thread context
			**  with an initialized sc_sess.
			*/

			SCSE_INHERIT_FLAG(sc_sess);
			sc_sess->scse_cap = SCSE_CAP_NONE;

			/* only one shot? */
			if (RT_Q2C_SEID_1TA == rt || RT_Q2C_NSEID_1TA == rt)
				SCSE_SET_FLAG(sc_sess, SCSE_FL_LAST_TA);
		}

		/* sc_t_ctx != NULL && sc_sess != NULL */
		SM_IS_SC_T_CTX(sc_t_ctx);

		SM_IS_SC_SE(sc_sess);

		/*
		**  Use the saved session flags: v does not hold them
		**  anymore if a delivery class record (RT_Q2C_DCID)
		**  has been read after RT_Q2C_SE_FLAGS.
		*/

		if (!SM_IS_FLAG(se_flags, DA_FL_SE_KEEP))
			SCSE_SET_FLAG(sc_sess, SCSE_FL_LAST_TA);

		/*
		**  Just "exchange" our RCB and the one of the session
		**  Notice: this requires that there is only one outstanding
		**  request per session. Otherwise we may exchange the RCB
		**  while the session is using it (which can only happen if
		**  the RCB is used across a scheduling point).
		*/

		rcbt = sc_t_ctx->sct_rcb;
		SM_ASSERT(rcbt != NULL);
		sc_t_ctx->sct_rcb = rcb;
		rcb = rcbt;

		/* notify session */
		r = st_cond_signal(sc_t_ctx->sct_cond_rd);
		if (r != 0) {
			sm_log_write(sc_ctx->scc_lctx,
				SC_LCAT_INTERN, SC_LMOD_INTERN,
				SM_LOG_ERR, 6,
				"sev=ERROR, func=sc_rcb_from_qmgr, st_cond_signal=%d",
				r);
			goto errdec;
		}

		/*
		**  Do NOT close the RCB: we exchanged it and we should
		**  have now a "clean" (closed) one.
		*/

		continue;

	  errslow:

		/*
		**  Can't process request from QMGR due to resource shortage.
		**  Need to tell qmgr about the problem, at least:
		**  task could not be accepted due to resource
		**  problems. It might be nice to tell it also:
		**  only N total threads will be available from
		**  now on (how to increase that number later on again?)
		*/

		ret = sm_rcb_putrec(c2q_ctx->c2q_notify_rcb,
			RCB_PUTR_DFLT, 0, -1,
			SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
			SM_RCBV_INT, RT_C2Q_ID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id,
			SM_RCBV_INT, RT_C2Q_STAT, QSC_ST_SLOW_2,
			SM_RCBV_INT, RT_C2Q_MAXTHR, SC_TOTAL_THREADS(sc_ctx),
			SM_RCBV_BUF, RT_C2Q_SEID, sess_id, SMTP_STID_SIZE,
			SM_RCBV_INT2, RT_C2Q_SESTAT, ret, DA_SE_ERR_CRT_I,
			SM_RCBV_END);
		if (!sm_is_err(ret)) {
			ret = sc_rcb_send(c2q_ctx->c2q_notify_rcb, c2q_ctx);
			if (sm_is_err(ret))
				sm_log_write(sc_ctx->scc_lctx,
					SC_LCAT_COMM, SC_LMOD_COMM,
					SM_LOG_ERR, 6,
					"sev=ERROR, func=sc_rcb_from_qmgr, sc_rcb_send=%m",
					ret);
		}
		else
			sm_log_write(sc_ctx->scc_lctx,
				SC_LCAT_COMM, SC_LMOD_COMM,
				SM_LOG_ERR, 6,
				"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_putrec=%m",
				ret);

		/* errslow falls through: the received RCB must be closed too */
	  errdec:
		sm_log_write(sc_ctx->scc_lctx,
			SC_LCAT_COMM, SC_LMOD_COMM,
			SM_LOG_ERR, 6,
			"sev=ERROR, func=sc_rcb_from_qmgr, l=%d, rt=%#x, ret=%m",
			l, rt, ret);
		ret = sm_rcb_close_dec(rcb);
		if (sm_is_err(ret))
		{
			/* COMPLAIN */
			continue;
		}
	}
	goto error;

  error:
	if (rcb != NULL)
		sm_rcb_free(rcb);
	return NULL;
}

/*
**  C2Q_CONNECT -- connect to server
**
**	The connection attempt is repeated (one second apart) until it
**	succeeds or the configured time to wait for the server is over.
**	After that the SMTPC id and the maximum number of threads are
**	sent to the server using the notify RCB.
**
**	Parameters:
**		sc_ctx -- SMTPC context
**		c2q_ctx -- C2Q context
**
**	Returns:
**		usual sm_error code
*/

static sm_ret_T
c2q_connect(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx)
{
	sm_ret_T ret;
	sm_rcb_P notify;
	time_t start;

	SM_IS_C2Q_CTX(c2q_ctx);
	SM_IS_SC_CTX(sc_ctx);

	start = st_time();
	for (;;) {
		ret = un_st_client_connect(sc_ctx->scc_cnf.sc_cnf_smtpcsock_abs,
				SEC2USEC(sc_ctx->scc_cnf.sc_cnf_wait4srv), &c2q_ctx->c2q_fd);
		if (!sm_is_err(ret))
			break;

		/* out of time? */
		if (start + sc_ctx->scc_cnf.sc_cnf_wait4srv <= st_time())
			break;
		st_sleep(1);
	}
	if (sm_is_err(ret)) {
		sm_log_write(sc_ctx->scc_lctx,
			SC_LCAT_INIT, SC_LMOD_TO_QMGR,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=c2q_connect, socket=%s, connect=%m"
			, sc_ctx->scc_cnf.sc_cnf_smtpcsock_abs, ret);
		return ret;
	}

	/* introduce ourselves: protocol version, id, max. threads */
	notify = c2q_ctx->c2q_notify_rcb;
	ret = sm_rcb_putrec(notify, RCB_PUTR_DFLT, 0, -1,
		SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
		SM_RCBV_INT, RT_C2Q_NID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id,
		SM_RCBV_INT, RT_C2Q_MAXTHR, SC_MAX_THREADS(sc_ctx),
		SM_RCBV_END);
	if (!sm_is_err(ret))
		ret = sc_rcb_send(notify, c2q_ctx);

	/* cleanup in case of an error? */
	if (sm_is_err(ret))
		return ret;
	return SM_SUCCESS;
}

/*
**  C2Q_INIT -- initialize C2Q
**
**	Clears the context, creates condition variable, write mutex,
**	notify RCB, and session table, connects to the server, and
**	starts the thread which receives RCBs from the server
**	(sc_rcb_from_qmgr()).
**	In case of an error everything that has been acquired here
**	is released again.
**
**	Parameters:
**		sc_ctx -- SMTPC context
**		c2q_ctx -- C2Q context
**		max_ses  -- maximum number of open sessions
**
**	Returns:
**		usual sm_error code
*/

sm_ret_T
c2q_init(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx, uint max_ses)
{
	sm_ret_T ret;
	sm_rcb_P rcb;
	st_thread_t rdtsk;
	size_t n;

	/* should we allocate the context here? */
	SM_REQUIRE(c2q_ctx != NULL);
	SM_REQUIRE(sc_ctx != NULL);

	/* clear out data */
	sm_memzero(c2q_ctx, sizeof(*c2q_ctx));
	rcb = NULL;
	c2q_ctx->c2q_sc_ctx = sc_ctx;
	c2q_ctx->c2q_fd = INVALID_NETFD;
	c2q_ctx->c2q_cond_rd = st_cond_new();
	if (NULL == c2q_ctx->c2q_cond_rd) {
		ret = sm_error_temp(SM_EM_SMTPC, errno);
		goto error;
	}
	c2q_ctx->c2q_wr_mutex = st_mutex_new();
	if (NULL == c2q_ctx->c2q_wr_mutex) {
		ret = sm_error_temp(SM_EM_SMTPC, errno);
		goto error;
	}

	rcb = sm_rcb_new(NULL, C2Q_RCB_SIZE, QSS_RC_MAXSZ);
	if (NULL == rcb) goto errnomem;
	c2q_ctx->c2q_notify_rcb = rcb;
	c2q_ctx->c2q_max_ses = max_ses;
#if 0
	c2q_ctx->c2q_cur_ses = 0;
#endif

	/* session table: one (initially empty) slot per session */
	n = max_ses * sizeof(*c2q_ctx->c2q_sess);
	c2q_ctx->c2q_sess = (sc_sess_P *) sm_zalloc(n);
	if (NULL == c2q_ctx->c2q_sess) goto errnomem;

	/* c2q_connect() checks the magic number, hence set it first */
	c2q_ctx->sm_magic = SM_C2Q_CTX_MAGIC;
	ret = c2q_connect(sc_ctx, c2q_ctx);
	if (sm_is_err(ret)) goto error;

	c2q_ctx->c2q_status = C2Q_ST_OK;

	rdtsk = st_thread_create(sc_rcb_from_qmgr, (void *) c2q_ctx, 0, 0);
	if (NULL == rdtsk) {
		ret = sm_error_temp(SM_EM_SMTPC, errno);
		goto error;
	}
	return SM_SUCCESS;

  errnomem:
	ret = sm_error_temp(SM_EM_SMTPC, ENOMEM);
  error:
	SM_FREE(c2q_ctx->c2q_sess);
	if (rcb != NULL) {
		sm_rcb_free(rcb);

		/* don't leave a pointer to the freed RCB in the context */
		c2q_ctx->c2q_notify_rcb = NULL;
	}
	if (c2q_ctx->c2q_wr_mutex != NULL) {
		st_mutex_destroy(c2q_ctx->c2q_wr_mutex);
		c2q_ctx->c2q_wr_mutex = NULL;
	}
	if (c2q_ctx->c2q_cond_rd != NULL) {
		st_cond_destroy(c2q_ctx->c2q_cond_rd);
		c2q_ctx->c2q_cond_rd = NULL;
	}
	if (c2q_ctx->c2q_fd != INVALID_NETFD) {
		(void) un_st_socket_close(c2q_ctx->c2q_fd);
		c2q_ctx->c2q_fd = INVALID_NETFD;
	}
	return ret;
}

/*
**  C2Q_CLOSE -- close connection to server
**
**	The status is only changed to C2Q_ST_CLOSED if there is no open
**	connection or if closing it succeeded.
**
**	Parameters:
**		c2q_ctx -- C2Q context
**
**	Returns:
**		usual sm_error code
*/

static sm_ret_T
c2q_close(c2q_ctx_P c2q_ctx)
{
	sm_ret_T res;

	SM_IS_C2Q_CTX(c2q_ctx);

	/* nothing to close? */
	if (INVALID_NETFD == c2q_ctx->c2q_fd) {
		c2q_ctx->c2q_status = C2Q_ST_CLOSED;
		return SM_SUCCESS;
	}

	res = un_st_socket_close(c2q_ctx->c2q_fd);
	if (sm_is_err(res))
		return res;
	c2q_ctx->c2q_fd = INVALID_NETFD;
	c2q_ctx->c2q_status = C2Q_ST_CLOSED;
	return SM_SUCCESS;
}

/*
**  C2Q_STOP -- stop C2Q
**
**	Closes the connection to the server, destroys write mutex and
**	condition variable, frees the session table, and clears the
**	context. The context itself is not freed.
**
**	NOTE(review): c2q_notify_rcb is not released here before the
**	context is cleared -- confirm whether it is freed elsewhere.
**
**	Parameters:
**		c2q_ctx -- C2Q context
**
**	Returns:
**		usual sm_error code (result of closing the connection)
*/

sm_ret_T
c2q_stop(c2q_ctx_P c2q_ctx)
{
	sm_ret_T close_ret;

	SM_IS_C2Q_CTX(c2q_ctx);

	/* the remaining cleanup is done even if closing failed */
	close_ret = c2q_close(c2q_ctx);

	if (NULL != c2q_ctx->c2q_wr_mutex) {
		st_mutex_destroy(c2q_ctx->c2q_wr_mutex);
		c2q_ctx->c2q_wr_mutex = NULL;
	}

	if (NULL != c2q_ctx->c2q_cond_rd) {
		st_cond_destroy(c2q_ctx->c2q_cond_rd);
		c2q_ctx->c2q_cond_rd = NULL;
	}

	SM_FREE(c2q_ctx->c2q_sess);

	/* wipe the context and invalidate its magic number */
	sm_memzero(c2q_ctx, sizeof(*c2q_ctx));
	c2q_ctx->sm_magic = SM_MAGIC_NULL;	/* not needed if this is 0 */

	/* free c2q_ctx? only if allocated above! */
	return close_ret;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */