/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
/*
** SMTPC - QMGR communication module.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: c2q.c,v 1.141 2007/10/23 03:57:39 ca Exp $")
#include "sm/assert.h"
#include "sm/error.h"
#include "sm/memops.h"
#include "sm/reccom.h"
#include "statethreads/st.h"
#include "sm/rcbst.h"
#include "sm/fcntl.h"
#include "sm/unixsock.h"
#include "sm/stsock.h"
#include "sm/stthreads.h"
#include "sm/qmgrcomm.h"
#include "sm/da.h"
#include "smtpc.h"
#include "c2q.h"
#include "log.h"
#include "sm/qmgr-int.h" /* HACK: for QSC_ST_SLOW_2 */
#if SC_DEBUG
static void
show_sess(c2q_ctx_P c2q_ctx)
{
int i;
for (i = 0; i < c2q_ctx->c2q_max_ses; i++) {
SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr,
"c2q_sess[%d]=%lx, thread=%u, state=%u\n",
i, (long) c2q_ctx->c2q_sess[i],
c2q_ctx->c2q_sess[i] == NULL ? 9999u :
c2q_ctx->c2q_sess[i]->scse_sct_ctx->sct_thr_id,
c2q_ctx->c2q_sess[i] == NULL ? 9999u :
c2q_ctx->c2q_sess[i]->scse_state));
}
}
#endif /* SC_DEBUG */
/*
** SC_GET_FREE_THR -- find free thread that can take care of a task from QMGR
**
** Parameters:
** sc_ctx -- SMTPC context
**
** Returns:
** pointer to thread context
** (NULL if no free threads is available)
*/
static sc_t_ctx_P
sc_get_free_thr(sc_ctx_P sc_ctx)
{
uint i;
sc_t_ctx_P sc_t_ctx;
/* May want to use some round robin search?? */
for (i = 0; i < SC_MAX_THREADS(sc_ctx); i++) {
if ((sc_t_ctx = (sc_ctx->scc_scts)[i]) != NULL &&
SC_T_FREE == sc_t_ctx->sct_status)
{
return sc_t_ctx;
}
}
return NULL;
}
/*
** SC_RM_SESS_RQ -- remove a session from the list of active sessions
**
** Parameters:
** sc_sess -- session context
** c2q_ctx -- C2Q context
**
** Returns:
** usual sm_error code
*/
sm_ret_T
sc_rm_sess_rq(sc_sess_P sc_sess, c2q_ctx_P c2q_ctx)
{
int i;
SM_IS_C2Q_CTX(c2q_ctx);
SM_IS_SC_SE(sc_sess);
i = sc_sess->scse_c2q_idx;
SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 5,
(smioerr, "sc_rm_sess_rq: sc_sess=%lx, cq2_idx=%d\n",
(long) sc_sess, i));
if (SCSE_C2Q_IDX_NONE == i)
return SM_SUCCESS;
SM_REQUIRE(i >= 0);
SM_REQUIRE((uint)i < c2q_ctx->c2q_max_ses);
c2q_ctx->c2q_sess[(uint)i] = NULL;
return SM_SUCCESS;
}
/*
** SC_ADD_SESS_RQ -- add a session to the list of active sessions
** (see also sc_find_sess_rq())
**
** Parameters:
** sc_sess -- session context
** c2q_ctx -- C2Q context
**
** Returns:
** usual sm_error code
*/
static sm_ret_T
sc_add_sess_rq(sc_sess_P sc_sess, c2q_ctx_P c2q_ctx)
{
uint i;
SM_IS_SC_SE(sc_sess);
SM_IS_C2Q_CTX(c2q_ctx);
SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 1,
(smioerr, "sc_add_sess_rq: sc_sess=%lx\n", (long) sc_sess));
for (i = 0; i < c2q_ctx->c2q_max_ses; i++) {
if (c2q_ctx->c2q_sess[i] == NULL ||
c2q_ctx->c2q_sess[i]->scse_state == SCSE_ST_NONE)
{
c2q_ctx->c2q_sess[i] = sc_sess;
sc_sess->scse_c2q_idx = i;
sc_sess->scse_state = SCSE_ST_NEW;
return SM_SUCCESS;
}
}
SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 0, (smioerr,
"sc_add_sess_rq: FATAL: sc_sess=%lx, i=%d\n", (long) sc_sess, i));
#if SC_DEBUG
show_sess(c2q_ctx);
/* abort... */
SM_ASSERT(i < c2q_ctx->c2q_max_ses);
#endif
return sm_error_perm(SM_EM_SMTPC, SM_E_FULL);
}
/*
** SC_FIND_SESS_RQ -- find session id in the list of active sessions
**
** Parameters:
** c2q_ctx -- C2Q context
** sess_id -- session id
**
** Returns:
** session context (NULL if not found)
*/
static sc_sess_P
sc_find_sess_rq(c2q_ctx_P c2q_ctx, sessta_id_P sess_id)
{
uint i;
sc_sess_P sc_sess;
SM_IS_C2Q_CTX(c2q_ctx);
for (i = 0; i < c2q_ctx->c2q_max_ses; i++) {
if (c2q_ctx->c2q_sess[i] != NULL &&
SESSTA_EQ(c2q_ctx->c2q_sess[i]->scse_id, sess_id))
{
sc_sess = c2q_ctx->c2q_sess[i];
return sc_sess;
}
}
return NULL;
}
/*
** SC_RCB_SEND -- send a (closed) RCB to QMGR
**
** Parameters:
** rcb -- RCB
** c2q_ctx -- C2Q context
**
** Returns:
** usual sm_error code
*/
sm_ret_T
sc_rcb_send(sm_rcb_P rcb, c2q_ctx_P c2q_ctx)
{
sm_ret_T ret;
int r;
#if SC_TIMING
st_utime_t uta, utb;
#endif
SM_IS_C2Q_CTX(c2q_ctx);
if (C2Q_IS_IOERR(c2q_ctx))
return sm_error_temp(SM_EM_SMTPC, EIO);
ret = sm_rcb_open_snd(rcb);
if (sm_is_err(ret)) goto error;
#if SC_TIMING
utb = st_utime();
#endif
/* make sure there aren't multiple sends */
r = st_mutex_lock(c2q_ctx->c2q_wr_mutex);
if (r != 0) {
SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr,
"ERROR: sc_rcb_send: st_mutex_lock=%d, errno=%d\n", r, errno));
goto err1;
}
ret = sm_rcb_snd(c2q_ctx->c2q_fd, rcb, c2q_ctx->c2q_sc_ctx->scc_qmgr_tmo);
r = st_mutex_unlock(c2q_ctx->c2q_wr_mutex);
if (r != 0) {
SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr,
"ERROR: sc_rcb_send: st_mutex_unlock=%d, errno=%d\n", r, errno));
SM_ASSERT(0 == r); /* abort */
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_SMTPC, r);
}
if (sm_is_err(ret)) {
C2Q_SET_IOERR(c2q_ctx);
/*
** NOTE: we ask for a shutdown here as the communication
** with qmgr is broken. This is not the best place to do it,
** such a decision should be made by the upper layers.
** Moreover, a better solution is to reconnect to qmgr,
** but currently that is not worth the effort (as we don't
** know whether just the connection is lost or whether
** qmgr died; in the latter case smtpc should be restarted
** by mcp anyway).
*/
SC_SET_FLAG(c2q_ctx->c2q_sc_ctx, SCC_FL_SHUTDOWN);
goto err1;
}
#if SC_TIMING
uta = st_utime();
if (uta > utb)
sm_io_fprintf(smioerr, "send=%lu\n", (unsigned long) (uta - utb));
#endif
ret = sm_rcb_close_snd(rcb);
if (sm_is_err(ret)) goto error;
return ret;
err1:
(void) sm_rcb_close_snd(rcb);
error:
return ret;
}
/*
** SC_C2Q -- creates RCB for sending status back to QMGR;
** RCB is only sent for session status, not for TA status (needs to
** be done by caller: invoke sc_rcb_send()).
**
** Parameters:
** sc_t_ctx -- SMTPC thread context
** whichstatus -- session/transaction/... status?
** status -- status (simplified!!!)
** c2q_ctx -- C2Q context
**
** Returns:
** usual sm_error code
*/
sm_ret_T
sc_c2q(sc_t_ctx_P sc_t_ctx, uint32_t whichstatus, int status, c2q_ctx_P c2q_ctx)
{
sm_ret_T ret;
sc_sess_P sc_sess;
uint32_t idt, err_state;
sessta_id_P id;
sc_ta_P sc_ta;
bool rcptstats;
SM_IS_C2Q_CTX(c2q_ctx);
SM_IS_SC_T_CTX(sc_t_ctx);
sc_sess = sc_t_ctx->sct_sess;
SM_IS_SC_SE(sc_sess);
SM_IS_RCB(sc_t_ctx->sct_rcb);
sc_ta = NULL;
(void) sm_rcb_close_decn(sc_t_ctx->sct_rcb);
if (C2Q_IS_IOERR(c2q_ctx))
return sm_error_temp(SM_EM_SMTPC, EIO);
ret = sm_rcb_open_enc(sc_t_ctx->sct_rcb, -1);
if (sm_is_err(ret)) {
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_open=%m\n", ret));
goto error;
}
/*
** Which status?? If whichstatus is something else than
** RT_C2Q_SESTAT and RT_C2Q_TASTAT, then we may need to send
** more data back? Or do we have to do that only in case of
** RT_C2Q_RCPT_ST?
** Also change recordtype and _id (session/ta/...) accordingly.
*/
rcptstats = false;
if (RT_C2Q_SESTAT == whichstatus || RT_C2Q_SECLSD == whichstatus) {
idt = RT_C2Q_SEID;
id = sc_sess->scse_id;
err_state = sc_sess->scse_err_st;
}
else {
sc_ta = sc_sess->scse_ta;
SM_IS_SC_TA(sc_ta);
/* Close session? */
idt = SCSE_IS_FLAG(sc_sess, SCSE_FL_CLOSE)
? RT_C2Q_TAID_CS : RT_C2Q_TAID;
id = sc_ta->scta_id;
err_state = sc_ta->scta_err_state;
if (DA_TA_ERR_RCPT_S == sc_ta->scta_err_state ||
SCTA_IS_STATE(sc_ta, SCTA_R_PERM|SCTA_R_TEMP))
{
rcptstats = true;
whichstatus = RT_C2Q_TARSTAT;
}
/* Other cases?? */
}
ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_FIRST,
SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
SM_RCBV_INT, RT_C2Q_ID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id,
SM_RCBV_BUF, idt, id, SMTP_STID_SIZE,
SM_RCBV_INT2, whichstatus, (uint32_t) status, err_state,
SM_RCBV_END);
if (sm_is_err(ret)) {
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-1=%m\n", ret));
goto err2;
}
if (RT_C2Q_SESTAT == whichstatus &&
sc_sess->scse_err_st != DA_SE_ERR_TTMYSLEF_S &&
sc_sess->scse_reply != NULL &&
sm_str_getlen(sc_sess->scse_reply) > 0)
{
ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
SM_RCBV_STR, RT_C2Q_STATT, sc_sess->scse_reply,
SM_RCBV_END);
if (sm_is_err(ret)) {
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-2=%m\n", ret));
goto err2;
}
}
if (rcptstats) {
uint nrcpts;
sc_rcpt_P sc_rcpt;
sc_rcpts_P sc_rcpt_lst;
nrcpts = 0;
sc_rcpt_lst = &sc_ta->scta_rcpts;
for (sc_rcpt = SC_RCPTS_FIRST(sc_rcpt_lst);
sc_rcpt != SC_RCPTS_END(sc_rcpt_lst);
sc_rcpt = SC_RCPTS_NEXT(sc_rcpt))
{
if (SMTP_OK == sc_rcpt->scr_st)
continue;
++nrcpts;
}
SM_ASSERT(nrcpts > 0);
ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
SM_RCBV_INT, RT_C2Q_RCPT_N, nrcpts,
SM_RCBV_END);
if (sm_is_err(ret)) {
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-3=%m\n", ret));
goto err2;
}
sc_rcpt_lst = &sc_ta->scta_rcpts;
for (sc_rcpt = SC_RCPTS_FIRST(sc_rcpt_lst);
sc_rcpt != SC_RCPTS_END(sc_rcpt_lst);
sc_rcpt = SC_RCPTS_NEXT(sc_rcpt))
{
if (SMTP_OK == sc_rcpt->scr_st)
continue;
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3,
(smioerr,
"func=sc_c2q, where=rcpt, stat=%d, len=%u, text=%S\n",
sc_rcpt->scr_st,
NULL == sc_rcpt->scr_reply ? 0 :
sm_str_getlen(sc_rcpt->scr_reply),
sc_rcpt->scr_reply));
ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
SM_RCBV_INT, RT_C2Q_RCPT_IDX, sc_rcpt->scr_idx,
SM_RCBV_INT, RT_C2Q_RCPT_ST, (uint32_t) sc_rcpt->scr_st,
SM_RCBV_STR, RT_C2Q_RCPT_STT, sc_rcpt->scr_reply,
SM_RCBV_END);
if (sm_is_err(ret)) {
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-4=%m\n", ret));
goto err2;
}
SM_ASSERT(nrcpts > 0);
--nrcpts;
}
SM_ASSERT(0 == nrcpts);
}
else if (sc_ta != NULL && DA_TA_ERR_MAIL_S == sc_ta->scta_err_state &&
sc_ta->scta_mail->scm_reply != NULL)
{
ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
SM_RCBV_STR, RT_C2Q_STATT, sc_ta->scta_mail->scm_reply,
SM_RCBV_END);
if (sm_is_err(ret)) {
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0,
(smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-5=%m\n", ret));
goto err2;
}
}
else if (sc_ta != NULL &&
(DA_TA_ERR_DATA_S == sc_ta->scta_err_state ||
DA_TA_ERR_DOT_S == sc_ta->scta_err_state) &&
sc_ta->scta_reply != NULL)
{
/* scta_reply is currently only set after DATA */
ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE,
SM_RCBV_STR, RT_C2Q_STATT, sc_ta->scta_reply,
SM_RCBV_END);
if (sm_is_err(ret)) {
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr,
"sev=ERROR, func=sc_c2q, sm_rcb_putv-6=%m\n"
, ret));
goto err2;
}
}
/* other error messages, e.g., from DATA, final dot? */
ret = sm_rcb_close_enc(sc_t_ctx->sct_rcb);
if (sm_is_err(ret)) goto err2;
if (RT_C2Q_SESTAT == whichstatus || RT_C2Q_SECLSD == whichstatus) {
ret = sc_rcb_send(sc_t_ctx->sct_rcb, c2q_ctx);
if (sm_is_err(ret)) goto err2;
}
if (NULL == sc_ta)
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3, (smioerr,
"sev=DBG, func=sc_c2q, whichstatus=%#x, stat=%d\n",
whichstatus, status));
else
SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3, (smioerr,
"sev=DBG, func=sc_c2q, whichstatus=%#x, stat=%d, err_st=%#x, ta_state=%#x\n",
whichstatus, status, sc_ta->scta_err_state,
sc_ta->scta_state));
return ret;
error:
(void) sm_rcb_close_n(sc_t_ctx->sct_rcb);
err2:
sm_log_write(sc_t_ctx->sct_sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_c2q, ret=%m", ret);
return ret;
}
/*
** SC_RCB_FROM_QMGR -- receive an RCB from QMGR, notify thread (session)
** This runs permanently as a thread.
**
** Parameters:
** arg -- C2Q context
**
** Returns:
** NULL on termination
*/
static void *
sc_rcb_from_qmgr(void *arg)
{
uint32_t v, l, rt, tl, se_flags;
int r;
sm_ret_T ret;
sm_rcb_P rcb, rcbt;
c2q_ctx_P c2q_ctx;
sessta_id_T sess_id, sess_id2;
sc_sess_P sc_sess;
sc_t_ctx_P sc_t_ctx;
sc_ctx_P sc_ctx;
SM_REQUIRE(arg != NULL);
c2q_ctx = (c2q_ctx_P) arg;
SM_IS_C2Q_CTX(c2q_ctx);
sc_ctx = c2q_ctx->c2q_sc_ctx;
rcb = sm_rcb_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ);
if (NULL == rcb) {
SC_SET_FLAG(sc_ctx, SCC_FL_SHUTDOWN);
/* currently unused as nobody checks a return value */
ret = sm_error_temp(SM_EM_SMTPC, ENOMEM);
goto error;
}
sc_t_ctx = NULL; /* avoid bogus compiler warning */
/*
** Don't start running before at least one thread is ready
** and has initialized its data.
** Slight abuse of condition variable...
*/
while (!SC_IS_FLAG(sc_ctx, SCC_FL_COMMOK)) {
/* CONF timeout */
r = st_cond_timedwait(c2q_ctx->c2q_cond_rd, SEC2USEC(10));
#if 0
if (0 == r)
/* complain? */ ;
#endif
}
while (!SC_IS_FLAG(sc_ctx, SCC_FL_SHUTDOWN)) {
sc_sess = NULL;
ret = sm_rcb_open_rcv(rcb);
if (sm_is_err(ret)) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_COMM, SC_LMOD_COMM,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_open_rcv=%m",
ret);
goto error;
}
/* note: no timeout, this will just wait for an RCB */
ret = sm_rcb_rcv(c2q_ctx->c2q_fd, rcb, 12, (st_utime_t) -1);
if (sm_is_err(ret)) {
if (!E_IS_TEMP(sm_error_value(ret))) {
if (ret != SM_IO_EOF) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_COMM, SC_LMOD_COMM,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_rcv=%m",
ret);
}
goto error;
}
ret = sm_rcb_close_rcv(rcb);
if (sm_is_err(ret)) goto error;
continue;
}
ret = sm_rcb_close_rcv(rcb);
if (sm_is_err(ret)) goto error;
/*
** format:
** RT_Q2C_ID: int smtpc-id
** either RT_Q2C_*SEID: str session-id
** or RT_Q2C_*TAID: str transaction-id
*/
ret = sm_rcb_open_dec(rcb);
if (sm_is_err(ret)) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_open_dec=%m",
ret);
continue;
}
/* total length of record */
ret = sm_rcb_getuint32(rcb, &tl);
SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, tl=%d, ret=%m\n", tl, ret));
if (sm_is_err(ret) || tl > QSS_RC_MAXSZ)
goto errdec;
/* protocol header: version */
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=prot, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret));
if (sm_is_err(ret) || l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT)
goto errdec;
/* RT_Q2C_ID: int smtpc-id */
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=smtpc-id, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret));
if (sm_is_err(ret) || l != 4 ||
rt != RT_Q2C_ID || v != c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id)
goto errdec;
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret) || l != 4 || rt != RT_Q2C_SE_FLAGS)
goto errdec;
/* XXX do something with those flags... */
se_flags = v;
#ifdef RT_Q2C_DCID
/* RT_Q2C_DCID: delivery class (cur. unused) */
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=delivery-class, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret));
if (sm_is_err(ret) || l != 4 || rt != RT_Q2C_DCID)
goto errdec;
#endif /* RT_Q2C_DCID */
ret = sm_rcb_get2uint32(rcb, &l, &rt);
SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=sc_sess-id, l=%d, rt=%#x, ret=%m\n", l, rt, ret));
if (l != SMTP_STID_SIZE ||
(rt != RT_Q2C_NSEID && rt != RT_Q2C_SEID &&
rt != RT_Q2C_CSEID &&
rt != RT_Q2C_NSEID_1TA && rt != RT_Q2C_SEID_1TA))
goto errdec;
/* session id */
ret = sm_rcb_getn(rcb, (uchar *) sess_id, l);
if (sm_is_err(ret)) goto errdec;
if (RT_Q2C_CSEID == rt) {
ret = sm_rcb_get2uint32(rcb, &l, &rt);
SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=csc_sess-id, l=%d, rt=%#x, ret=%m\n", l, rt, ret));
if (l != SMTP_STID_SIZE ||
(rt != RT_Q2C_NSEID && rt != RT_Q2C_SEID &&
rt != RT_Q2C_NSEID_1TA && rt != RT_Q2C_SEID_1TA))
goto errdec;
/* session id */
ret = sm_rcb_getn(rcb, (uchar *) sess_id2, l);
if (sm_is_err(ret)) goto errdec;
}
/* use an existing session? */
if (RT_Q2C_SEID == rt || RT_Q2C_SEID_1TA == rt) {
sc_sess = sc_find_sess_rq(c2q_ctx, sess_id);
SC_LEV_DPRINTF(sc_ctx, 6, (smioerr,
"sc_rcb_from_qmgr: found sc_sess=%lx, id=%s\n",
(long) sc_sess, sess_id));
if (NULL == sc_sess && (RT_Q2C_SEID == rt || RT_Q2C_SEID_1TA == rt))
{
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_WARN, 6,
"sev=WARN, func=sc_rcb_from_qmgr, se_id=%s, status=not_found, action=continue_as_if_NSEID, rt=%#x",
sess_id, rt);
rt = (RT_Q2C_SEID == rt) ? RT_Q2C_NSEID : RT_Q2C_NSEID_1TA;
/* see below... */
}
else { /* (sc_sess != NULL || rt != RT_Q2C_SEID) */
if (NULL == sc_sess) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, se_id=%s, status=not_found",
sess_id);
goto errdec;
}
sc_t_ctx = sc_sess->scse_sct_ctx;
if (NULL == sc_t_ctx) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=NULL");
goto errdec;
}
SM_IS_SC_T_CTX(sc_t_ctx);
if (SC_T_BUSY == sc_t_ctx->sct_status ||
SC_T_NOTRDY == sc_t_ctx->sct_status)
{
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=%p, stat=%u",
sc_t_ctx, sc_t_ctx->sct_status);
goto errdec;
}
if (sc_t_ctx->sct_status != SC_T_IDLE) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_WARN, 9,
"sev=WARN, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=%p, stat=%u",
sc_t_ctx, sc_t_ctx->sct_status);
}
if (SC_T_IDLE == sc_t_ctx->sct_status) {
SM_ASSERT(SC_IDLE_THREADS(sc_ctx) > 0);
SC_IDLE_THREADS(sc_ctx)--;
#if SC_STATS
SC_SE_REUSE(sc_ctx)++;
#endif
}
if (SC_T_CLOSING == sc_t_ctx->sct_status) {
/*
** Note: this blocks this thread and hence
** no new tasks can be received.
** Possible solutions:
** 1. use alternative implementation
** (each thread does its own task receiption)
** 2. just use a different sc_t_ctx:
** the current one is closing anyway,
** so why reuse it?
** a) because any upper limit on the number
** of open connections won't be exceeded
** b) there might not be a free thread
*/
/* inform thread that we want to reuse it */
sc_t_ctx->sct_status = SC_T_REUSE;
/* CONF timeout */
r = st_cond_timedwait(c2q_ctx->c2q_cond_rd, SEC2USEC(5));
}
if (SC_T_REUSE == sc_t_ctx->sct_status) {
/*
** session not closed: "fall through" to
** get a new thread.
*/
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_CLIENT, SC_LMOD_CLIENT,
SM_LOG_WARN, 4,
"sev=WARN, func=sc_rcb_from_qmgr, sess=%p, se_id=%s, status=still_closing, busy=%u, wait=%u, idle=%u, closing=%u"
, sc_sess
, sc_sess->scse_id
, SC_BUSY_THREADS(sc_ctx)
, SC_WAIT_THREADS(sc_ctx)
, SC_IDLE_THREADS(sc_ctx)
, SC_CLOSING_THREADS(sc_ctx)
);
rt = (RT_Q2C_SEID == rt) ? RT_Q2C_NSEID : RT_Q2C_NSEID_1TA;
}
else {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_CLIENT, SC_LMOD_CLIENT,
SM_LOG_INFO, 9,
"sev=INFO, func=sc_rcb_from_qmgr, sess=%p, se_id=%s, status=%u, busy=%u, wait=%u, idle=%u, closing=%u, rt=%#x"
, sc_sess
, sc_sess->scse_id
, sc_t_ctx->sct_status
, SC_BUSY_THREADS(sc_ctx)
, SC_WAIT_THREADS(sc_ctx)
, SC_IDLE_THREADS(sc_ctx)
, SC_CLOSING_THREADS(sc_ctx)
, rt
);
sc_t_ctx->sct_status = SC_T_BUSY;
/* check sc_sess->scse_state */
/* XXX RT_Q2C_TAID_CS doesn't seem to be possible? */
if (RT_Q2C_CSEID == rt || RT_Q2C_TAID_CS == rt)
SCSE_SET_FLAG(sc_sess, SCSE_FL_CLOSE);
}
}
}
/* XXX "fall through" when SEID but session already closed? */
if (RT_Q2C_NSEID == rt || RT_Q2C_NSEID_1TA == rt) {
uint tried;
for (tried = 0;;) {
sc_t_ctx = sc_get_free_thr(sc_ctx);
if (sc_t_ctx != NULL)
break;
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_WARN, 8,
"sev=WARN, func=sc_rcb_from_qmgr, sc_get_free_thr=failed, id=%s, waitthr=%u, idle=%u, closing=%u, busy=%u, total=%u, max=%u, tried=%u"
, sess_id
, SC_WAIT_THREADS(sc_ctx)
, SC_IDLE_THREADS(sc_ctx)
, SC_CLOSING_THREADS(sc_ctx)
, SC_BUSY_THREADS(sc_ctx)
, SC_TOTAL_THREADS(sc_ctx)
, SC_MAX_THREADS(sc_ctx)
, tried);
SM_ASSERT(SC_WAIT_THREADS(sc_ctx) >= SC_IDLE_THREADS(sc_ctx));
if (SC_TOTAL_THREADS(sc_ctx) < SC_MAX_THREADS(sc_ctx)
&& !SC_IS_FLAG(sc_ctx, SCC_FL_SHUTDOWN))
{
/* Create a thread */
if (st_thread_create(sc_hdl_requests, (void *) sc_ctx, 0, 0)
== NULL)
{
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 1,
"sev=ERROR, func=sc_rcb_from_qmgr, status=cannot_create_thread, func=sc_rcb_from_qmgr, sc_get_free_thr=failed, id=%s, maxthr=%u, wait=%u, min_wait=%u, idle=%u, closing=%u, busy=%u, total=%u, tried=%u, errno=%d",
sess_id,
SC_MAX_THREADS(sc_ctx),
SC_WAIT_THREADS(sc_ctx),
SC_MIN_WAIT_THREADS(sc_ctx),
SC_IDLE_THREADS(sc_ctx),
SC_CLOSING_THREADS(sc_ctx),
SC_BUSY_THREADS(sc_ctx),
SC_TOTAL_THREADS(sc_ctx),
tried, errno);
}
else {
SC_WAIT_THREADS(sc_ctx)++;
/* let sc_hdl_requests() start*/
st_sleep(0);
sc_t_ctx = sc_get_free_thr(sc_ctx);
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_INFO, 14,
"sev=INFO, func=sc_rcb_form_qmgr, wait=%u, idle=%u, closing=%u, busy=%u, sc_hdl_requests=created, sc_t_ctx=%p"
, SC_WAIT_THREADS(sc_ctx)
, SC_IDLE_THREADS(sc_ctx)
, SC_CLOSING_THREADS(sc_ctx)
, SC_BUSY_THREADS(sc_ctx)
, sc_t_ctx
);
if (sc_t_ctx != NULL)
break;
}
}
if (SC_TOTAL_THREADS(sc_ctx) >= SC_MAX_THREADS(sc_ctx)
|| tried > SC_MAX_THREADS(sc_ctx))
{
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 4,
"sev=ERROR, func=sc_rcb_from_qmgr, sc_get_free_thr=failed"
", id=%s, max=%u, wait=%u, min_wait=%u"
", idle=%u, closing=%u, busy=%u, total=%u, tried=%u"
, sess_id, SC_MAX_THREADS(sc_ctx)
, SC_WAIT_THREADS(sc_ctx)
, SC_MIN_WAIT_THREADS(sc_ctx)
, SC_IDLE_THREADS(sc_ctx)
, SC_CLOSING_THREADS(sc_ctx)
, SC_BUSY_THREADS(sc_ctx)
, SC_TOTAL_THREADS(sc_ctx)
, tried);
#if SC_DEBUG
for (r = 0; r < SC_MAX_THREADS(sc_ctx); r++) {
sc_t_ctx = (sc_ctx->scc_scts)[r];
sm_io_fprintf(smioerr,
"i=%3d, sc_t_ctx=%p, status=%u\n"
, r, sc_t_ctx
, sc_t_ctx != NULL ? sc_t_ctx->sct_status : 9
, (sc_t_ctx != NULL && sc_t_ctx->sct_sess != NULL) ?
sc_t_ctx->sct_sess->scse_id : "-"
);
}
SM_ASSERT(0);
#endif /* SC_DEBUG */
/* fixme: Better error code... */
ret = sm_error_temp(SM_EM_SMTPC, ENOMEM);
goto errslow;
}
/* let sc_hdl_request() do something */
st_sleep(0);
++tried;
}
#if 0
SM_ASSERT(sc_t_ctx != NULL);
#endif
SC_LEV_DPRINTF(sc_ctx, 2, (smioerr, "sc_rcb_from_qmgr: sc_get_free_thr=%u, wait=%u, busy=%u, id=%s, sc_sess=%p\n",
sc_t_ctx->sct_thr_id, SC_WAIT_THREADS(sc_ctx), SC_BUSY_THREADS(sc_ctx), sess_id, (long) sc_t_ctx->sct_sess));
sc_sess = sc_t_ctx->sct_sess;
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_CLIENT, SC_LMOD_CLIENT,
SM_LOG_DEBUG, 11,
"sev=DBG, func=sc_rcb_from_qmgr, where=new_task, sess=%p, se_id=%s, status=%u, busy=%u, wait=%u, idle=%u, closing=%u"
, sc_sess, sess_id
, sc_t_ctx->sct_status
, SC_BUSY_THREADS(sc_ctx)
, SC_WAIT_THREADS(sc_ctx)
, SC_IDLE_THREADS(sc_ctx)
, SC_CLOSING_THREADS(sc_ctx)
);
sc_t_ctx->sct_status = SC_T_BUSY;
ret = sc_add_sess_rq(sc_sess, c2q_ctx);
if (sm_is_err(ret)) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 5,
"sev=ERROR, func=sc_rcb_from_qmgr, se_id=%s, sc_add_sess_rq=%m",
sess_id, ret);
sc_t_ctx->sct_status = SC_T_FREE;
goto errdec;
}
SESSTA_COPY(sc_sess->scse_id, sess_id);
/*
** sc_get_free_thr() must return a thread context
** with an initialized sc_sess.
*/
SCSE_INHERIT_FLAG(sc_sess);
sc_sess->scse_cap = SCSE_CAP_NONE;
/* only one shot? */
if (RT_Q2C_SEID_1TA == rt || RT_Q2C_NSEID_1TA == rt)
SCSE_SET_FLAG(sc_sess, SCSE_FL_LAST_TA);
}
/* sc_t_ctx != NULL && sc_sess != NULL */
SM_IS_SC_T_CTX(sc_t_ctx);
SM_IS_SC_SE(sc_sess);
if (!SM_IS_FLAG(v, DA_FL_SE_KEEP))
SCSE_SET_FLAG(sc_sess, SCSE_FL_LAST_TA);
/*
** Just "exchange" our RCB and the one of the session
** Notice: this requires that there is only one outstanding
** request per session. Otherwise we may exchange the RCB
** while the session is using it (which can only happen if
** the RCB is used across a scheduling point).
*/
rcbt = sc_t_ctx->sct_rcb;
SM_ASSERT(rcbt != NULL);
sc_t_ctx->sct_rcb = rcb;
rcb = rcbt;
/* notify session */
r = st_cond_signal(sc_t_ctx->sct_cond_rd);
if (r != 0) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INTERN, SC_LMOD_INTERN,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, st_cond_signal=%d",
r);
goto errdec;
}
/*
** Do NOT close the RCB: we exchanged it and we should
** have now a "clean" (closed) one.
*/
continue;
errslow:
/*
** Can't process request from QMGR due to resource shortage.
** Need to tell qmgr about the problem, at least:
** task could not be accepted due to resource
** problems. It might be nice to tell it also:
** only N total threads will be available from
** now on (how to increase that number later on again?)
*/
ret = sm_rcb_putrec(c2q_ctx->c2q_notify_rcb,
RCB_PUTR_DFLT, 0, -1,
SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
SM_RCBV_INT, RT_C2Q_ID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id,
SM_RCBV_INT, RT_C2Q_STAT, QSC_ST_SLOW_2,
SM_RCBV_INT, RT_C2Q_MAXTHR, SC_TOTAL_THREADS(sc_ctx),
SM_RCBV_BUF, RT_C2Q_SEID, sess_id, SMTP_STID_SIZE,
SM_RCBV_INT2, RT_C2Q_SESTAT, ret, DA_SE_ERR_CRT_I,
SM_RCBV_END);
if (!sm_is_err(ret)) {
ret = sc_rcb_send(c2q_ctx->c2q_notify_rcb, c2q_ctx);
if (sm_is_err(ret))
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_COMM, SC_LMOD_COMM,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, sc_rcb_send=%m",
ret);
}
else
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_COMM, SC_LMOD_COMM,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_putrec=%m",
ret);
errdec:
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_COMM, SC_LMOD_COMM,
SM_LOG_ERR, 6,
"sev=ERROR, func=sc_rcb_from_qmgr, l=%d, rt=%#x, ret=%m",
l, rt, ret);
ret = sm_rcb_close_dec(rcb);
if (sm_is_err(ret))
{
/* COMPLAIN */
continue;
}
}
goto error;
error:
if (rcb != NULL)
sm_rcb_free(rcb);
return NULL;
}
/*
** C2Q_CONNECT -- connect to server
**
** Parameters:
** sc_ctx -- SMTPC context
** c2q_ctx -- C2Q context
**
** Returns:
** usual sm_error code
*/
static sm_ret_T
c2q_connect(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx)
{
sm_ret_T ret;
sm_rcb_P rcb;
time_t time1;
SM_IS_C2Q_CTX(c2q_ctx);
SM_IS_SC_CTX(sc_ctx);
time1 = st_time();
for (;;) {
ret = un_st_client_connect(sc_ctx->scc_cnf.sc_cnf_smtpcsock_abs,
SEC2USEC(sc_ctx->scc_cnf.sc_cnf_wait4srv), &c2q_ctx->c2q_fd);
if (sm_is_err(ret) &&
time1 + sc_ctx->scc_cnf.sc_cnf_wait4srv > st_time())
{
st_sleep(1);
}
else
break;
}
if (sm_is_err(ret)) {
sm_log_write(sc_ctx->scc_lctx,
SC_LCAT_INIT, SC_LMOD_TO_QMGR,
SM_LOG_ERR, 1,
"sev=ERROR, func=c2q_connect, socket=%s, connect=%m"
, sc_ctx->scc_cnf.sc_cnf_smtpcsock_abs, ret);
goto error;
}
rcb = c2q_ctx->c2q_notify_rcb;
ret = sm_rcb_putrec(rcb, RCB_PUTR_DFLT, 0, -1,
SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
SM_RCBV_INT, RT_C2Q_NID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id,
SM_RCBV_INT, RT_C2Q_MAXTHR, SC_MAX_THREADS(sc_ctx),
SM_RCBV_END);
if (sm_is_err(ret))
goto error;
ret = sc_rcb_send(rcb, c2q_ctx);
if (sm_is_err(ret))
goto error;
return SM_SUCCESS;
error:
/* cleanup? */
return ret;
}
/*
** C2Q_INIT -- initialize C2Q
**
** Parameters:
** sc_ctx -- SMTPC context
** c2q_ctx -- C2Q context
** max_ses -- maximum number of open sessions
**
** Returns:
** usual sm_error code
*/
sm_ret_T
c2q_init(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx, uint max_ses)
{
sm_ret_T ret;
sm_rcb_P rcb;
st_thread_t rdtsk;
size_t n;
/* should we allocate the context here? */
SM_REQUIRE(c2q_ctx != NULL);
SM_REQUIRE(sc_ctx != NULL);
/* clear out data */
sm_memzero(c2q_ctx, sizeof(*c2q_ctx));
rcb = NULL;
c2q_ctx->c2q_sc_ctx = sc_ctx;
c2q_ctx->c2q_fd = INVALID_NETFD;
c2q_ctx->c2q_cond_rd = st_cond_new();
if (NULL == c2q_ctx->c2q_cond_rd) {
ret = sm_error_temp(SM_EM_SMTPC, errno);
goto error;
}
c2q_ctx->c2q_wr_mutex = st_mutex_new();
if (NULL == c2q_ctx->c2q_wr_mutex) {
ret = sm_error_temp(SM_EM_SMTPC, errno);
goto error;
}
rcb = sm_rcb_new(NULL, C2Q_RCB_SIZE, QSS_RC_MAXSZ);
if (NULL == rcb) goto errnomem;
c2q_ctx->c2q_notify_rcb = rcb;
c2q_ctx->c2q_max_ses = max_ses;
#if 0
c2q_ctx->c2q_cur_ses = 0;
#endif
n = max_ses * sizeof(*c2q_ctx->c2q_sess);
c2q_ctx->c2q_sess = (sc_sess_P *) sm_zalloc(n);
if (NULL == c2q_ctx->c2q_sess) goto errnomem;
c2q_ctx->sm_magic = SM_C2Q_CTX_MAGIC;
ret = c2q_connect(sc_ctx, c2q_ctx);
if (sm_is_err(ret)) goto error;
c2q_ctx->c2q_status = C2Q_ST_OK;
rdtsk = st_thread_create(sc_rcb_from_qmgr, (void *) c2q_ctx, 0, 0);
if (NULL == rdtsk) {
ret = sm_error_temp(SM_EM_SMTPC, errno);
goto error;
}
return SM_SUCCESS;
errnomem:
ret = sm_error_temp(SM_EM_SMTPC, ENOMEM);
error:
SM_FREE(c2q_ctx->c2q_sess);
if (rcb != NULL)
sm_rcb_free(rcb);
if (c2q_ctx->c2q_wr_mutex != NULL) {
st_mutex_destroy(c2q_ctx->c2q_wr_mutex);
c2q_ctx->c2q_wr_mutex = NULL;
}
if (c2q_ctx->c2q_cond_rd != NULL) {
st_cond_destroy(c2q_ctx->c2q_cond_rd);
c2q_ctx->c2q_cond_rd = NULL;
}
if (c2q_ctx->c2q_fd != INVALID_NETFD) {
(void) un_st_socket_close(c2q_ctx->c2q_fd);
c2q_ctx->c2q_fd = INVALID_NETFD;
}
return ret;
}
/*
** C2Q_CLOSE -- close connection to server
**
** Parameters:
** c2q_ctx -- C2Q context
**
** Returns:
** usual sm_error code
*/
static sm_ret_T
c2q_close(c2q_ctx_P c2q_ctx)
{
sm_ret_T ret;
SM_IS_C2Q_CTX(c2q_ctx);
if (c2q_ctx->c2q_fd != INVALID_NETFD) {
ret = un_st_socket_close(c2q_ctx->c2q_fd);
if (sm_is_err(ret)) goto error;
c2q_ctx->c2q_fd = INVALID_NETFD;
}
c2q_ctx->c2q_status = C2Q_ST_CLOSED;
return SM_SUCCESS;
error:
return ret;
}
/*
** C2Q_STOP -- stop C2Q
**
** Parameters:
** c2q_ctx -- C2Q context
**
** Returns:
** usual sm_error code
*/
sm_ret_T
c2q_stop(c2q_ctx_P c2q_ctx)
{
sm_ret_T ret;
SM_IS_C2Q_CTX(c2q_ctx);
ret = c2q_close(c2q_ctx);
if (c2q_ctx->c2q_wr_mutex != NULL) {
st_mutex_destroy(c2q_ctx->c2q_wr_mutex);
c2q_ctx->c2q_wr_mutex = NULL;
}
if (c2q_ctx->c2q_cond_rd != NULL) {
st_cond_destroy(c2q_ctx->c2q_cond_rd);
c2q_ctx->c2q_cond_rd = NULL;
}
SM_FREE(c2q_ctx->c2q_sess);
sm_memzero(c2q_ctx, sizeof(*c2q_ctx));
c2q_ctx->sm_magic = SM_MAGIC_NULL; /* not needed if this is 0 */
/* free c2q_ctx? only if allocated above! */
return ret;
}
syntax highlighted by Code2HTML, v. 0.9.1