/* * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ /* ** SMTPC - QMGR communication module. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: c2q.c,v 1.141 2007/10/23 03:57:39 ca Exp $") #include "sm/assert.h" #include "sm/error.h" #include "sm/memops.h" #include "sm/reccom.h" #include "statethreads/st.h" #include "sm/rcbst.h" #include "sm/fcntl.h" #include "sm/unixsock.h" #include "sm/stsock.h" #include "sm/stthreads.h" #include "sm/qmgrcomm.h" #include "sm/da.h" #include "smtpc.h" #include "c2q.h" #include "log.h" #include "sm/qmgr-int.h" /* HACK: for QSC_ST_SLOW_2 */ #if SC_DEBUG static void show_sess(c2q_ctx_P c2q_ctx) { int i; for (i = 0; i < c2q_ctx->c2q_max_ses; i++) { SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr, "c2q_sess[%d]=%lx, thread=%u, state=%u\n", i, (long) c2q_ctx->c2q_sess[i], c2q_ctx->c2q_sess[i] == NULL ? 9999u : c2q_ctx->c2q_sess[i]->scse_sct_ctx->sct_thr_id, c2q_ctx->c2q_sess[i] == NULL ? 9999u : c2q_ctx->c2q_sess[i]->scse_state)); } } #endif /* SC_DEBUG */ /* ** SC_GET_FREE_THR -- find free thread that can take care of a task from QMGR ** ** Parameters: ** sc_ctx -- SMTPC context ** ** Returns: ** pointer to thread context ** (NULL if no free threads is available) */ static sc_t_ctx_P sc_get_free_thr(sc_ctx_P sc_ctx) { uint i; sc_t_ctx_P sc_t_ctx; /* May want to use some round robin search?? */ for (i = 0; i < SC_MAX_THREADS(sc_ctx); i++) { if ((sc_t_ctx = (sc_ctx->scc_scts)[i]) != NULL && SC_T_FREE == sc_t_ctx->sct_status) { return sc_t_ctx; } } return NULL; } /* ** SC_RM_SESS_RQ -- remove a session from the list of active sessions ** ** Parameters: ** sc_sess -- session context ** c2q_ctx -- C2Q context ** ** Returns: ** usual sm_error code */ sm_ret_T sc_rm_sess_rq(sc_sess_P sc_sess, c2q_ctx_P c2q_ctx) { int i; SM_IS_C2Q_CTX(c2q_ctx); SM_IS_SC_SE(sc_sess); i = sc_sess->scse_c2q_idx; SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 5, (smioerr, "sc_rm_sess_rq: sc_sess=%lx, cq2_idx=%d\n", (long) sc_sess, i)); if (SCSE_C2Q_IDX_NONE == i) return SM_SUCCESS; SM_REQUIRE(i >= 0); SM_REQUIRE((uint)i < c2q_ctx->c2q_max_ses); c2q_ctx->c2q_sess[(uint)i] = NULL; return SM_SUCCESS; } /* ** SC_ADD_SESS_RQ -- add a session to the list of active sessions ** (see also sc_find_sess_rq()) ** ** Parameters: ** sc_sess -- session context ** c2q_ctx -- C2Q context ** ** Returns: ** usual sm_error code */ static sm_ret_T sc_add_sess_rq(sc_sess_P sc_sess, c2q_ctx_P c2q_ctx) { uint i; SM_IS_SC_SE(sc_sess); SM_IS_C2Q_CTX(c2q_ctx); SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 1, (smioerr, "sc_add_sess_rq: sc_sess=%lx\n", (long) sc_sess)); for (i = 0; i < c2q_ctx->c2q_max_ses; i++) { if (c2q_ctx->c2q_sess[i] == NULL || c2q_ctx->c2q_sess[i]->scse_state == SCSE_ST_NONE) { c2q_ctx->c2q_sess[i] = sc_sess; sc_sess->scse_c2q_idx = i; sc_sess->scse_state = SCSE_ST_NEW; return SM_SUCCESS; } } SC_LEV_DPRINTF(sc_sess->scse_sct_ctx->sct_sc_ctx, 0, (smioerr, "sc_add_sess_rq: FATAL: sc_sess=%lx, i=%d\n", (long) sc_sess, i)); #if SC_DEBUG show_sess(c2q_ctx); /* abort... */ SM_ASSERT(i < c2q_ctx->c2q_max_ses); #endif return sm_error_perm(SM_EM_SMTPC, SM_E_FULL); } /* ** SC_FIND_SESS_RQ -- find session id in the list of active sessions ** ** Parameters: ** c2q_ctx -- C2Q context ** sess_id -- session id ** ** Returns: ** session context (NULL if not found) */ static sc_sess_P sc_find_sess_rq(c2q_ctx_P c2q_ctx, sessta_id_P sess_id) { uint i; sc_sess_P sc_sess; SM_IS_C2Q_CTX(c2q_ctx); for (i = 0; i < c2q_ctx->c2q_max_ses; i++) { if (c2q_ctx->c2q_sess[i] != NULL && SESSTA_EQ(c2q_ctx->c2q_sess[i]->scse_id, sess_id)) { sc_sess = c2q_ctx->c2q_sess[i]; return sc_sess; } } return NULL; } /* ** SC_RCB_SEND -- send a (closed) RCB to QMGR ** ** Parameters: ** rcb -- RCB ** c2q_ctx -- C2Q context ** ** Returns: ** usual sm_error code */ sm_ret_T sc_rcb_send(sm_rcb_P rcb, c2q_ctx_P c2q_ctx) { sm_ret_T ret; int r; #if SC_TIMING st_utime_t uta, utb; #endif SM_IS_C2Q_CTX(c2q_ctx); if (C2Q_IS_IOERR(c2q_ctx)) return sm_error_temp(SM_EM_SMTPC, EIO); ret = sm_rcb_open_snd(rcb); if (sm_is_err(ret)) goto error; #if SC_TIMING utb = st_utime(); #endif /* make sure there aren't multiple sends */ r = st_mutex_lock(c2q_ctx->c2q_wr_mutex); if (r != 0) { SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr, "ERROR: sc_rcb_send: st_mutex_lock=%d, errno=%d\n", r, errno)); goto err1; } ret = sm_rcb_snd(c2q_ctx->c2q_fd, rcb, c2q_ctx->c2q_sc_ctx->scc_qmgr_tmo); r = st_mutex_unlock(c2q_ctx->c2q_wr_mutex); if (r != 0) { SC_LEV_DPRINTF(c2q_ctx->c2q_sc_ctx, 0, (smioerr, "ERROR: sc_rcb_send: st_mutex_unlock=%d, errno=%d\n", r, errno)); SM_ASSERT(0 == r); /* abort */ if (r != 0 && sm_is_success(ret)) ret = sm_error_perm(SM_EM_SMTPC, r); } if (sm_is_err(ret)) { C2Q_SET_IOERR(c2q_ctx); /* ** NOTE: we ask for a shutdown here as the communication ** with qmgr is broken. This is not the best place to do it, ** such a decision should be made by the upper layers. ** Moreover, a better solution is to reconnect to qmgr, ** but currently that is not worth the effort (as we don't ** know whether just the connection is lost or whether ** qmgr died; in the latter case smtpc should be restarted ** by mcp anyway). */ SC_SET_FLAG(c2q_ctx->c2q_sc_ctx, SCC_FL_SHUTDOWN); goto err1; } #if SC_TIMING uta = st_utime(); if (uta > utb) sm_io_fprintf(smioerr, "send=%lu\n", (unsigned long) (uta - utb)); #endif ret = sm_rcb_close_snd(rcb); if (sm_is_err(ret)) goto error; return ret; err1: (void) sm_rcb_close_snd(rcb); error: return ret; } /* ** SC_C2Q -- creates RCB for sending status back to QMGR; ** RCB is only sent for session status, not for TA status (needs to ** be done by caller: invoke sc_rcb_send()). ** ** Parameters: ** sc_t_ctx -- SMTPC thread context ** whichstatus -- session/transaction/... status? ** status -- status (simplified!!!) ** c2q_ctx -- C2Q context ** ** Returns: ** usual sm_error code */ sm_ret_T sc_c2q(sc_t_ctx_P sc_t_ctx, uint32_t whichstatus, int status, c2q_ctx_P c2q_ctx) { sm_ret_T ret; sc_sess_P sc_sess; uint32_t idt, err_state; sessta_id_P id; sc_ta_P sc_ta; bool rcptstats; SM_IS_C2Q_CTX(c2q_ctx); SM_IS_SC_T_CTX(sc_t_ctx); sc_sess = sc_t_ctx->sct_sess; SM_IS_SC_SE(sc_sess); SM_IS_RCB(sc_t_ctx->sct_rcb); sc_ta = NULL; (void) sm_rcb_close_decn(sc_t_ctx->sct_rcb); if (C2Q_IS_IOERR(c2q_ctx)) return sm_error_temp(SM_EM_SMTPC, EIO); ret = sm_rcb_open_enc(sc_t_ctx->sct_rcb, -1); if (sm_is_err(ret)) { SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_open=%m\n", ret)); goto error; } /* ** Which status?? If whichstatus is something else than ** RT_C2Q_SESTAT and RT_C2Q_TASTAT, then we may need to send ** more data back? Or do we have to do that only in case of ** RT_C2Q_RCPT_ST? ** Also change recordtype and _id (session/ta/...) accordingly. */ rcptstats = false; if (RT_C2Q_SESTAT == whichstatus || RT_C2Q_SECLSD == whichstatus) { idt = RT_C2Q_SEID; id = sc_sess->scse_id; err_state = sc_sess->scse_err_st; } else { sc_ta = sc_sess->scse_ta; SM_IS_SC_TA(sc_ta); /* Close session? */ idt = SCSE_IS_FLAG(sc_sess, SCSE_FL_CLOSE) ? RT_C2Q_TAID_CS : RT_C2Q_TAID; id = sc_ta->scta_id; err_state = sc_ta->scta_err_state; if (DA_TA_ERR_RCPT_S == sc_ta->scta_err_state || SCTA_IS_STATE(sc_ta, SCTA_R_PERM|SCTA_R_TEMP)) { rcptstats = true; whichstatus = RT_C2Q_TARSTAT; } /* Other cases?? */ } ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_FIRST, SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT, SM_RCBV_INT, RT_C2Q_ID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id, SM_RCBV_BUF, idt, id, SMTP_STID_SIZE, SM_RCBV_INT2, whichstatus, (uint32_t) status, err_state, SM_RCBV_END); if (sm_is_err(ret)) { SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-1=%m\n", ret)); goto err2; } if (RT_C2Q_SESTAT == whichstatus && sc_sess->scse_err_st != DA_SE_ERR_TTMYSLEF_S && sc_sess->scse_reply != NULL && sm_str_getlen(sc_sess->scse_reply) > 0) { ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE, SM_RCBV_STR, RT_C2Q_STATT, sc_sess->scse_reply, SM_RCBV_END); if (sm_is_err(ret)) { SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-2=%m\n", ret)); goto err2; } } if (rcptstats) { uint nrcpts; sc_rcpt_P sc_rcpt; sc_rcpts_P sc_rcpt_lst; nrcpts = 0; sc_rcpt_lst = &sc_ta->scta_rcpts; for (sc_rcpt = SC_RCPTS_FIRST(sc_rcpt_lst); sc_rcpt != SC_RCPTS_END(sc_rcpt_lst); sc_rcpt = SC_RCPTS_NEXT(sc_rcpt)) { if (SMTP_OK == sc_rcpt->scr_st) continue; ++nrcpts; } SM_ASSERT(nrcpts > 0); ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE, SM_RCBV_INT, RT_C2Q_RCPT_N, nrcpts, SM_RCBV_END); if (sm_is_err(ret)) { SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-3=%m\n", ret)); goto err2; } sc_rcpt_lst = &sc_ta->scta_rcpts; for (sc_rcpt = SC_RCPTS_FIRST(sc_rcpt_lst); sc_rcpt != SC_RCPTS_END(sc_rcpt_lst); sc_rcpt = SC_RCPTS_NEXT(sc_rcpt)) { if (SMTP_OK == sc_rcpt->scr_st) continue; SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3, (smioerr, "func=sc_c2q, where=rcpt, stat=%d, len=%u, text=%S\n", sc_rcpt->scr_st, NULL == sc_rcpt->scr_reply ? 0 : sm_str_getlen(sc_rcpt->scr_reply), sc_rcpt->scr_reply)); ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE, SM_RCBV_INT, RT_C2Q_RCPT_IDX, sc_rcpt->scr_idx, SM_RCBV_INT, RT_C2Q_RCPT_ST, (uint32_t) sc_rcpt->scr_st, SM_RCBV_STR, RT_C2Q_RCPT_STT, sc_rcpt->scr_reply, SM_RCBV_END); if (sm_is_err(ret)) { SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-4=%m\n", ret)); goto err2; } SM_ASSERT(nrcpts > 0); --nrcpts; } SM_ASSERT(0 == nrcpts); } else if (sc_ta != NULL && DA_TA_ERR_MAIL_S == sc_ta->scta_err_state && sc_ta->scta_mail->scm_reply != NULL) { ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE, SM_RCBV_STR, RT_C2Q_STATT, sc_ta->scta_mail->scm_reply, SM_RCBV_END); if (sm_is_err(ret)) { SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-5=%m\n", ret)); goto err2; } } else if (sc_ta != NULL && (DA_TA_ERR_DATA_S == sc_ta->scta_err_state || DA_TA_ERR_DOT_S == sc_ta->scta_err_state) && sc_ta->scta_reply != NULL) { /* scta_reply is currently only set after DATA */ ret = sm_rcb_putv(sc_t_ctx->sct_rcb, RCB_PUTV_NONE, SM_RCBV_STR, RT_C2Q_STATT, sc_ta->scta_reply, SM_RCBV_END); if (sm_is_err(ret)) { SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 0, (smioerr, "sev=ERROR, func=sc_c2q, sm_rcb_putv-6=%m\n" , ret)); goto err2; } } /* other error messages, e.g., from DATA, final dot? */ ret = sm_rcb_close_enc(sc_t_ctx->sct_rcb); if (sm_is_err(ret)) goto err2; if (RT_C2Q_SESTAT == whichstatus || RT_C2Q_SECLSD == whichstatus) { ret = sc_rcb_send(sc_t_ctx->sct_rcb, c2q_ctx); if (sm_is_err(ret)) goto err2; } if (NULL == sc_ta) SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3, (smioerr, "sev=DBG, func=sc_c2q, whichstatus=%#x, stat=%d\n", whichstatus, status)); else SC_LEV_DPRINTF(sc_t_ctx->sct_sc_ctx, 3, (smioerr, "sev=DBG, func=sc_c2q, whichstatus=%#x, stat=%d, err_st=%#x, ta_state=%#x\n", whichstatus, status, sc_ta->scta_err_state, sc_ta->scta_state)); return ret; error: (void) sm_rcb_close_n(sc_t_ctx->sct_rcb); err2: sm_log_write(sc_t_ctx->sct_sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 6, "sev=ERROR, func=sc_c2q, ret=%m", ret); return ret; } /* ** SC_RCB_FROM_QMGR -- receive an RCB from QMGR, notify thread (session) ** This runs permanently as a thread. ** ** Parameters: ** arg -- C2Q context ** ** Returns: ** NULL on termination */ static void * sc_rcb_from_qmgr(void *arg) { uint32_t v, l, rt, tl, se_flags; int r; sm_ret_T ret; sm_rcb_P rcb, rcbt; c2q_ctx_P c2q_ctx; sessta_id_T sess_id, sess_id2; sc_sess_P sc_sess; sc_t_ctx_P sc_t_ctx; sc_ctx_P sc_ctx; SM_REQUIRE(arg != NULL); c2q_ctx = (c2q_ctx_P) arg; SM_IS_C2Q_CTX(c2q_ctx); sc_ctx = c2q_ctx->c2q_sc_ctx; rcb = sm_rcb_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ); if (NULL == rcb) { SC_SET_FLAG(sc_ctx, SCC_FL_SHUTDOWN); /* currently unused as nobody checks a return value */ ret = sm_error_temp(SM_EM_SMTPC, ENOMEM); goto error; } sc_t_ctx = NULL; /* avoid bogus compiler warning */ /* ** Don't start running before at least one thread is ready ** and has initialized its data. ** Slight abuse of condition variable... */ while (!SC_IS_FLAG(sc_ctx, SCC_FL_COMMOK)) { /* CONF timeout */ r = st_cond_timedwait(c2q_ctx->c2q_cond_rd, SEC2USEC(10)); #if 0 if (0 == r) /* complain? */ ; #endif } while (!SC_IS_FLAG(sc_ctx, SCC_FL_SHUTDOWN)) { sc_sess = NULL; ret = sm_rcb_open_rcv(rcb); if (sm_is_err(ret)) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_COMM, SC_LMOD_COMM, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_open_rcv=%m", ret); goto error; } /* note: no timeout, this will just wait for an RCB */ ret = sm_rcb_rcv(c2q_ctx->c2q_fd, rcb, 12, (st_utime_t) -1); if (sm_is_err(ret)) { if (!E_IS_TEMP(sm_error_value(ret))) { if (ret != SM_IO_EOF) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_COMM, SC_LMOD_COMM, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_rcv=%m", ret); } goto error; } ret = sm_rcb_close_rcv(rcb); if (sm_is_err(ret)) goto error; continue; } ret = sm_rcb_close_rcv(rcb); if (sm_is_err(ret)) goto error; /* ** format: ** RT_Q2C_ID: int smtpc-id ** either RT_Q2C_*SEID: str session-id ** or RT_Q2C_*TAID: str transaction-id */ ret = sm_rcb_open_dec(rcb); if (sm_is_err(ret)) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_open_dec=%m", ret); continue; } /* total length of record */ ret = sm_rcb_getuint32(rcb, &tl); SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, tl=%d, ret=%m\n", tl, ret)); if (sm_is_err(ret) || tl > QSS_RC_MAXSZ) goto errdec; /* protocol header: version */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=prot, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret)); if (sm_is_err(ret) || l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT) goto errdec; /* RT_Q2C_ID: int smtpc-id */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=smtpc-id, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret)); if (sm_is_err(ret) || l != 4 || rt != RT_Q2C_ID || v != c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id) goto errdec; ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret) || l != 4 || rt != RT_Q2C_SE_FLAGS) goto errdec; /* XXX do something with those flags... */ se_flags = v; #ifdef RT_Q2C_DCID /* RT_Q2C_DCID: delivery class (cur. unused) */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=delivery-class, l=%d, rt=%#x, v=%d, ret=%m\n", l, rt, v, ret)); if (sm_is_err(ret) || l != 4 || rt != RT_Q2C_DCID) goto errdec; #endif /* RT_Q2C_DCID */ ret = sm_rcb_get2uint32(rcb, &l, &rt); SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=sc_sess-id, l=%d, rt=%#x, ret=%m\n", l, rt, ret)); if (l != SMTP_STID_SIZE || (rt != RT_Q2C_NSEID && rt != RT_Q2C_SEID && rt != RT_Q2C_CSEID && rt != RT_Q2C_NSEID_1TA && rt != RT_Q2C_SEID_1TA)) goto errdec; /* session id */ ret = sm_rcb_getn(rcb, (uchar *) sess_id, l); if (sm_is_err(ret)) goto errdec; if (RT_Q2C_CSEID == rt) { ret = sm_rcb_get2uint32(rcb, &l, &rt); SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sev=DBG, func=sc_rcb_from_qmgr, where=csc_sess-id, l=%d, rt=%#x, ret=%m\n", l, rt, ret)); if (l != SMTP_STID_SIZE || (rt != RT_Q2C_NSEID && rt != RT_Q2C_SEID && rt != RT_Q2C_NSEID_1TA && rt != RT_Q2C_SEID_1TA)) goto errdec; /* session id */ ret = sm_rcb_getn(rcb, (uchar *) sess_id2, l); if (sm_is_err(ret)) goto errdec; } /* use an existing session? */ if (RT_Q2C_SEID == rt || RT_Q2C_SEID_1TA == rt) { sc_sess = sc_find_sess_rq(c2q_ctx, sess_id); SC_LEV_DPRINTF(sc_ctx, 6, (smioerr, "sc_rcb_from_qmgr: found sc_sess=%lx, id=%s\n", (long) sc_sess, sess_id)); if (NULL == sc_sess && (RT_Q2C_SEID == rt || RT_Q2C_SEID_1TA == rt)) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_WARN, 6, "sev=WARN, func=sc_rcb_from_qmgr, se_id=%s, status=not_found, action=continue_as_if_NSEID, rt=%#x", sess_id, rt); rt = (RT_Q2C_SEID == rt) ? RT_Q2C_NSEID : RT_Q2C_NSEID_1TA; /* see below... */ } else { /* (sc_sess != NULL || rt != RT_Q2C_SEID) */ if (NULL == sc_sess) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, se_id=%s, status=not_found", sess_id); goto errdec; } sc_t_ctx = sc_sess->scse_sct_ctx; if (NULL == sc_t_ctx) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=NULL"); goto errdec; } SM_IS_SC_T_CTX(sc_t_ctx); if (SC_T_BUSY == sc_t_ctx->sct_status || SC_T_NOTRDY == sc_t_ctx->sct_status) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=%p, stat=%u", sc_t_ctx, sc_t_ctx->sct_status); goto errdec; } if (sc_t_ctx->sct_status != SC_T_IDLE) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_WARN, 9, "sev=WARN, func=sc_rcb_from_qmgr, where=reuse_session, sc_t_ctx=%p, stat=%u", sc_t_ctx, sc_t_ctx->sct_status); } if (SC_T_IDLE == sc_t_ctx->sct_status) { SM_ASSERT(SC_IDLE_THREADS(sc_ctx) > 0); SC_IDLE_THREADS(sc_ctx)--; #if SC_STATS SC_SE_REUSE(sc_ctx)++; #endif } if (SC_T_CLOSING == sc_t_ctx->sct_status) { /* ** Note: this blocks this thread and hence ** no new tasks can be received. ** Possible solutions: ** 1. use alternative implementation ** (each thread does its own task receiption) ** 2. just use a different sc_t_ctx: ** the current one is closing anyway, ** so why reuse it? ** a) because any upper limit on the number ** of open connections won't be exceeded ** b) there might not be a free thread */ /* inform thread that we want to reuse it */ sc_t_ctx->sct_status = SC_T_REUSE; /* CONF timeout */ r = st_cond_timedwait(c2q_ctx->c2q_cond_rd, SEC2USEC(5)); } if (SC_T_REUSE == sc_t_ctx->sct_status) { /* ** session not closed: "fall through" to ** get a new thread. */ sm_log_write(sc_ctx->scc_lctx, SC_LCAT_CLIENT, SC_LMOD_CLIENT, SM_LOG_WARN, 4, "sev=WARN, func=sc_rcb_from_qmgr, sess=%p, se_id=%s, status=still_closing, busy=%u, wait=%u, idle=%u, closing=%u" , sc_sess , sc_sess->scse_id , SC_BUSY_THREADS(sc_ctx) , SC_WAIT_THREADS(sc_ctx) , SC_IDLE_THREADS(sc_ctx) , SC_CLOSING_THREADS(sc_ctx) ); rt = (RT_Q2C_SEID == rt) ? RT_Q2C_NSEID : RT_Q2C_NSEID_1TA; } else { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_CLIENT, SC_LMOD_CLIENT, SM_LOG_INFO, 9, "sev=INFO, func=sc_rcb_from_qmgr, sess=%p, se_id=%s, status=%u, busy=%u, wait=%u, idle=%u, closing=%u, rt=%#x" , sc_sess , sc_sess->scse_id , sc_t_ctx->sct_status , SC_BUSY_THREADS(sc_ctx) , SC_WAIT_THREADS(sc_ctx) , SC_IDLE_THREADS(sc_ctx) , SC_CLOSING_THREADS(sc_ctx) , rt ); sc_t_ctx->sct_status = SC_T_BUSY; /* check sc_sess->scse_state */ /* XXX RT_Q2C_TAID_CS doesn't seem to be possible? */ if (RT_Q2C_CSEID == rt || RT_Q2C_TAID_CS == rt) SCSE_SET_FLAG(sc_sess, SCSE_FL_CLOSE); } } } /* XXX "fall through" when SEID but session already closed? */ if (RT_Q2C_NSEID == rt || RT_Q2C_NSEID_1TA == rt) { uint tried; for (tried = 0;;) { sc_t_ctx = sc_get_free_thr(sc_ctx); if (sc_t_ctx != NULL) break; sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_WARN, 8, "sev=WARN, func=sc_rcb_from_qmgr, sc_get_free_thr=failed, id=%s, waitthr=%u, idle=%u, closing=%u, busy=%u, total=%u, max=%u, tried=%u" , sess_id , SC_WAIT_THREADS(sc_ctx) , SC_IDLE_THREADS(sc_ctx) , SC_CLOSING_THREADS(sc_ctx) , SC_BUSY_THREADS(sc_ctx) , SC_TOTAL_THREADS(sc_ctx) , SC_MAX_THREADS(sc_ctx) , tried); SM_ASSERT(SC_WAIT_THREADS(sc_ctx) >= SC_IDLE_THREADS(sc_ctx)); if (SC_TOTAL_THREADS(sc_ctx) < SC_MAX_THREADS(sc_ctx) && !SC_IS_FLAG(sc_ctx, SCC_FL_SHUTDOWN)) { /* Create a thread */ if (st_thread_create(sc_hdl_requests, (void *) sc_ctx, 0, 0) == NULL) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 1, "sev=ERROR, func=sc_rcb_from_qmgr, status=cannot_create_thread, func=sc_rcb_from_qmgr, sc_get_free_thr=failed, id=%s, maxthr=%u, wait=%u, min_wait=%u, idle=%u, closing=%u, busy=%u, total=%u, tried=%u, errno=%d", sess_id, SC_MAX_THREADS(sc_ctx), SC_WAIT_THREADS(sc_ctx), SC_MIN_WAIT_THREADS(sc_ctx), SC_IDLE_THREADS(sc_ctx), SC_CLOSING_THREADS(sc_ctx), SC_BUSY_THREADS(sc_ctx), SC_TOTAL_THREADS(sc_ctx), tried, errno); } else { SC_WAIT_THREADS(sc_ctx)++; /* let sc_hdl_requests() start*/ st_sleep(0); sc_t_ctx = sc_get_free_thr(sc_ctx); sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_INFO, 14, "sev=INFO, func=sc_rcb_form_qmgr, wait=%u, idle=%u, closing=%u, busy=%u, sc_hdl_requests=created, sc_t_ctx=%p" , SC_WAIT_THREADS(sc_ctx) , SC_IDLE_THREADS(sc_ctx) , SC_CLOSING_THREADS(sc_ctx) , SC_BUSY_THREADS(sc_ctx) , sc_t_ctx ); if (sc_t_ctx != NULL) break; } } if (SC_TOTAL_THREADS(sc_ctx) >= SC_MAX_THREADS(sc_ctx) || tried > SC_MAX_THREADS(sc_ctx)) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 4, "sev=ERROR, func=sc_rcb_from_qmgr, sc_get_free_thr=failed" ", id=%s, max=%u, wait=%u, min_wait=%u" ", idle=%u, closing=%u, busy=%u, total=%u, tried=%u" , sess_id, SC_MAX_THREADS(sc_ctx) , SC_WAIT_THREADS(sc_ctx) , SC_MIN_WAIT_THREADS(sc_ctx) , SC_IDLE_THREADS(sc_ctx) , SC_CLOSING_THREADS(sc_ctx) , SC_BUSY_THREADS(sc_ctx) , SC_TOTAL_THREADS(sc_ctx) , tried); #if SC_DEBUG for (r = 0; r < SC_MAX_THREADS(sc_ctx); r++) { sc_t_ctx = (sc_ctx->scc_scts)[r]; sm_io_fprintf(smioerr, "i=%3d, sc_t_ctx=%p, status=%u\n" , r, sc_t_ctx , sc_t_ctx != NULL ? sc_t_ctx->sct_status : 9 , (sc_t_ctx != NULL && sc_t_ctx->sct_sess != NULL) ? sc_t_ctx->sct_sess->scse_id : "-" ); } SM_ASSERT(0); #endif /* SC_DEBUG */ /* fixme: Better error code... */ ret = sm_error_temp(SM_EM_SMTPC, ENOMEM); goto errslow; } /* let sc_hdl_request() do something */ st_sleep(0); ++tried; } #if 0 SM_ASSERT(sc_t_ctx != NULL); #endif SC_LEV_DPRINTF(sc_ctx, 2, (smioerr, "sc_rcb_from_qmgr: sc_get_free_thr=%u, wait=%u, busy=%u, id=%s, sc_sess=%p\n", sc_t_ctx->sct_thr_id, SC_WAIT_THREADS(sc_ctx), SC_BUSY_THREADS(sc_ctx), sess_id, (long) sc_t_ctx->sct_sess)); sc_sess = sc_t_ctx->sct_sess; sm_log_write(sc_ctx->scc_lctx, SC_LCAT_CLIENT, SC_LMOD_CLIENT, SM_LOG_DEBUG, 11, "sev=DBG, func=sc_rcb_from_qmgr, where=new_task, sess=%p, se_id=%s, status=%u, busy=%u, wait=%u, idle=%u, closing=%u" , sc_sess, sess_id , sc_t_ctx->sct_status , SC_BUSY_THREADS(sc_ctx) , SC_WAIT_THREADS(sc_ctx) , SC_IDLE_THREADS(sc_ctx) , SC_CLOSING_THREADS(sc_ctx) ); sc_t_ctx->sct_status = SC_T_BUSY; ret = sc_add_sess_rq(sc_sess, c2q_ctx); if (sm_is_err(ret)) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 5, "sev=ERROR, func=sc_rcb_from_qmgr, se_id=%s, sc_add_sess_rq=%m", sess_id, ret); sc_t_ctx->sct_status = SC_T_FREE; goto errdec; } SESSTA_COPY(sc_sess->scse_id, sess_id); /* ** sc_get_free_thr() must return a thread context ** with an initialized sc_sess. */ SCSE_INHERIT_FLAG(sc_sess); sc_sess->scse_cap = SCSE_CAP_NONE; /* only one shot? */ if (RT_Q2C_SEID_1TA == rt || RT_Q2C_NSEID_1TA == rt) SCSE_SET_FLAG(sc_sess, SCSE_FL_LAST_TA); } /* sc_t_ctx != NULL && sc_sess != NULL */ SM_IS_SC_T_CTX(sc_t_ctx); SM_IS_SC_SE(sc_sess); if (!SM_IS_FLAG(v, DA_FL_SE_KEEP)) SCSE_SET_FLAG(sc_sess, SCSE_FL_LAST_TA); /* ** Just "exchange" our RCB and the one of the session ** Notice: this requires that there is only one outstanding ** request per session. Otherwise we may exchange the RCB ** while the session is using it (which can only happen if ** the RCB is used across a scheduling point). */ rcbt = sc_t_ctx->sct_rcb; SM_ASSERT(rcbt != NULL); sc_t_ctx->sct_rcb = rcb; rcb = rcbt; /* notify session */ r = st_cond_signal(sc_t_ctx->sct_cond_rd); if (r != 0) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INTERN, SC_LMOD_INTERN, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, st_cond_signal=%d", r); goto errdec; } /* ** Do NOT close the RCB: we exchanged it and we should ** have now a "clean" (closed) one. */ continue; errslow: /* ** Can't process request from QMGR due to resource shortage. ** Need to tell qmgr about the problem, at least: ** task could not be accepted due to resource ** problems. It might be nice to tell it also: ** only N total threads will be available from ** now on (how to increase that number later on again?) */ ret = sm_rcb_putrec(c2q_ctx->c2q_notify_rcb, RCB_PUTR_DFLT, 0, -1, SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT, SM_RCBV_INT, RT_C2Q_ID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id, SM_RCBV_INT, RT_C2Q_STAT, QSC_ST_SLOW_2, SM_RCBV_INT, RT_C2Q_MAXTHR, SC_TOTAL_THREADS(sc_ctx), SM_RCBV_BUF, RT_C2Q_SEID, sess_id, SMTP_STID_SIZE, SM_RCBV_INT2, RT_C2Q_SESTAT, ret, DA_SE_ERR_CRT_I, SM_RCBV_END); if (!sm_is_err(ret)) { ret = sc_rcb_send(c2q_ctx->c2q_notify_rcb, c2q_ctx); if (sm_is_err(ret)) sm_log_write(sc_ctx->scc_lctx, SC_LCAT_COMM, SC_LMOD_COMM, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, sc_rcb_send=%m", ret); } else sm_log_write(sc_ctx->scc_lctx, SC_LCAT_COMM, SC_LMOD_COMM, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, sm_rcb_putrec=%m", ret); errdec: sm_log_write(sc_ctx->scc_lctx, SC_LCAT_COMM, SC_LMOD_COMM, SM_LOG_ERR, 6, "sev=ERROR, func=sc_rcb_from_qmgr, l=%d, rt=%#x, ret=%m", l, rt, ret); ret = sm_rcb_close_dec(rcb); if (sm_is_err(ret)) { /* COMPLAIN */ continue; } } goto error; error: if (rcb != NULL) sm_rcb_free(rcb); return NULL; } /* ** C2Q_CONNECT -- connect to server ** ** Parameters: ** sc_ctx -- SMTPC context ** c2q_ctx -- C2Q context ** ** Returns: ** usual sm_error code */ static sm_ret_T c2q_connect(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx) { sm_ret_T ret; sm_rcb_P rcb; time_t time1; SM_IS_C2Q_CTX(c2q_ctx); SM_IS_SC_CTX(sc_ctx); time1 = st_time(); for (;;) { ret = un_st_client_connect(sc_ctx->scc_cnf.sc_cnf_smtpcsock_abs, SEC2USEC(sc_ctx->scc_cnf.sc_cnf_wait4srv), &c2q_ctx->c2q_fd); if (sm_is_err(ret) && time1 + sc_ctx->scc_cnf.sc_cnf_wait4srv > st_time()) { st_sleep(1); } else break; } if (sm_is_err(ret)) { sm_log_write(sc_ctx->scc_lctx, SC_LCAT_INIT, SC_LMOD_TO_QMGR, SM_LOG_ERR, 1, "sev=ERROR, func=c2q_connect, socket=%s, connect=%m" , sc_ctx->scc_cnf.sc_cnf_smtpcsock_abs, ret); goto error; } rcb = c2q_ctx->c2q_notify_rcb; ret = sm_rcb_putrec(rcb, RCB_PUTR_DFLT, 0, -1, SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT, SM_RCBV_INT, RT_C2Q_NID, c2q_ctx->c2q_sc_ctx->scc_cnf.sc_cnf_id, SM_RCBV_INT, RT_C2Q_MAXTHR, SC_MAX_THREADS(sc_ctx), SM_RCBV_END); if (sm_is_err(ret)) goto error; ret = sc_rcb_send(rcb, c2q_ctx); if (sm_is_err(ret)) goto error; return SM_SUCCESS; error: /* cleanup? */ return ret; } /* ** C2Q_INIT -- initialize C2Q ** ** Parameters: ** sc_ctx -- SMTPC context ** c2q_ctx -- C2Q context ** max_ses -- maximum number of open sessions ** ** Returns: ** usual sm_error code */ sm_ret_T c2q_init(sc_ctx_P sc_ctx, c2q_ctx_P c2q_ctx, uint max_ses) { sm_ret_T ret; sm_rcb_P rcb; st_thread_t rdtsk; size_t n; /* should we allocate the context here? */ SM_REQUIRE(c2q_ctx != NULL); SM_REQUIRE(sc_ctx != NULL); /* clear out data */ sm_memzero(c2q_ctx, sizeof(*c2q_ctx)); rcb = NULL; c2q_ctx->c2q_sc_ctx = sc_ctx; c2q_ctx->c2q_fd = INVALID_NETFD; c2q_ctx->c2q_cond_rd = st_cond_new(); if (NULL == c2q_ctx->c2q_cond_rd) { ret = sm_error_temp(SM_EM_SMTPC, errno); goto error; } c2q_ctx->c2q_wr_mutex = st_mutex_new(); if (NULL == c2q_ctx->c2q_wr_mutex) { ret = sm_error_temp(SM_EM_SMTPC, errno); goto error; } rcb = sm_rcb_new(NULL, C2Q_RCB_SIZE, QSS_RC_MAXSZ); if (NULL == rcb) goto errnomem; c2q_ctx->c2q_notify_rcb = rcb; c2q_ctx->c2q_max_ses = max_ses; #if 0 c2q_ctx->c2q_cur_ses = 0; #endif n = max_ses * sizeof(*c2q_ctx->c2q_sess); c2q_ctx->c2q_sess = (sc_sess_P *) sm_zalloc(n); if (NULL == c2q_ctx->c2q_sess) goto errnomem; c2q_ctx->sm_magic = SM_C2Q_CTX_MAGIC; ret = c2q_connect(sc_ctx, c2q_ctx); if (sm_is_err(ret)) goto error; c2q_ctx->c2q_status = C2Q_ST_OK; rdtsk = st_thread_create(sc_rcb_from_qmgr, (void *) c2q_ctx, 0, 0); if (NULL == rdtsk) { ret = sm_error_temp(SM_EM_SMTPC, errno); goto error; } return SM_SUCCESS; errnomem: ret = sm_error_temp(SM_EM_SMTPC, ENOMEM); error: SM_FREE(c2q_ctx->c2q_sess); if (rcb != NULL) sm_rcb_free(rcb); if (c2q_ctx->c2q_wr_mutex != NULL) { st_mutex_destroy(c2q_ctx->c2q_wr_mutex); c2q_ctx->c2q_wr_mutex = NULL; } if (c2q_ctx->c2q_cond_rd != NULL) { st_cond_destroy(c2q_ctx->c2q_cond_rd); c2q_ctx->c2q_cond_rd = NULL; } if (c2q_ctx->c2q_fd != INVALID_NETFD) { (void) un_st_socket_close(c2q_ctx->c2q_fd); c2q_ctx->c2q_fd = INVALID_NETFD; } return ret; } /* ** C2Q_CLOSE -- close connection to server ** ** Parameters: ** c2q_ctx -- C2Q context ** ** Returns: ** usual sm_error code */ static sm_ret_T c2q_close(c2q_ctx_P c2q_ctx) { sm_ret_T ret; SM_IS_C2Q_CTX(c2q_ctx); if (c2q_ctx->c2q_fd != INVALID_NETFD) { ret = un_st_socket_close(c2q_ctx->c2q_fd); if (sm_is_err(ret)) goto error; c2q_ctx->c2q_fd = INVALID_NETFD; } c2q_ctx->c2q_status = C2Q_ST_CLOSED; return SM_SUCCESS; error: return ret; } /* ** C2Q_STOP -- stop C2Q ** ** Parameters: ** c2q_ctx -- C2Q context ** ** Returns: ** usual sm_error code */ sm_ret_T c2q_stop(c2q_ctx_P c2q_ctx) { sm_ret_T ret; SM_IS_C2Q_CTX(c2q_ctx); ret = c2q_close(c2q_ctx); if (c2q_ctx->c2q_wr_mutex != NULL) { st_mutex_destroy(c2q_ctx->c2q_wr_mutex); c2q_ctx->c2q_wr_mutex = NULL; } if (c2q_ctx->c2q_cond_rd != NULL) { st_cond_destroy(c2q_ctx->c2q_cond_rd); c2q_ctx->c2q_cond_rd = NULL; } SM_FREE(c2q_ctx->c2q_sess); sm_memzero(c2q_ctx, sizeof(*c2q_ctx)); c2q_ctx->sm_magic = SM_MAGIC_NULL; /* not needed if this is 0 */ /* free c2q_ctx? only if allocated above! */ return ret; }