/* * Copyright (c) 2004, 2005 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: qm_ctl.c,v 1.20 2006/12/31 22:40:16 ca Exp $") #include "sm/error.h" #include "sm/assert.h" #include "sm/io.h" #include "sm/rcb.h" #include "sm/qmgr.h" #include "sm/qmgr-int.h" #include "qmgr.h" #include "sm/rcbcomm.h" #include "sm/reccom.h" #include "log.h" /* ** Control/Query interface to/for QMGR */ /* ** QM_TRIGGER_RCPT -- try to "trigger" a recipient so it will be scheduled ** ** Parameters: ** qmgr_ctx -- QMGR context ** rcpt_id -- id of recipient ** ** Returns: ** usual sm_error code */ static sm_ret_T qm_trigger_rcpt(qmgr_ctx_P qmgr_ctx, rcpt_id_T rcpt_id) { sm_ret_T ret; int r; bool notify_sched; timeval_T now_tv; time_T next_try; edbc_node_P edbc_node; notify_sched = false; ret = evthr_timeval(qmgr_ctx->qmgr_ar_tsk->evthr_t_ctx, &now_tv); if (sm_is_err(ret)) return ret; next_try = now_tv.tv_sec; r = pthread_mutex_lock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_ERR, 1, "sev=CRIT, func=qm_trigger_rcpt, lock_edbc=%m", sm_err_perm(r)); return sm_err_perm(r); } if (edbc_exists(qmgr_ctx->qmgr_edbc, rcpt_id, &edbc_node)) { ret = edbc_mv(qmgr_ctx->qmgr_edbc, edbc_node, next_try); QM_LEV_DPRINTFC(QDC_CTL2Q, 7, (QM_DEBFP, "sev=DBG, func=qm_trigger_rcpt, edbc_mv=%r\n", ret)); if (sm_is_success(ret)) notify_sched = true; } else { edb_req_P edb_req; edb_req = NULL; ret = edb_req_new(qmgr_ctx->qmgr_edb, EDB_RQF_NONE, &edb_req, true); if (sm_is_success(ret)) { RCPT_ID_COPY(edb_req->edb_req_id, rcpt_id); edb_req->edb_req_type = EDB_REQ_RCPT; ret = edb_rd_req(qmgr_ctx->qmgr_edb, edb_req); if (sm_is_success(ret)) { ret = edbc_add(qmgr_ctx->qmgr_edbc, rcpt_id, next_try, false); QM_LEV_DPRINTFC(QDC_CTL2Q, 7, (QM_DEBFP, "sev=DBG, func=qm_trigger_rcpt, edbc_add=%r\n", ret)); if (sm_is_success(ret)) notify_sched = true; } } if (edb_req != NULL) { (void) edb_req_rel(qmgr_ctx->qmgr_edb, edb_req, (sm_is_err(ret) && sm_error_value(ret) == ENOMEM) ? EDB_RQF_FREE : 0, THR_LOCK_UNLOCK); edb_req = NULL; /* not really necessary */ } } r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_ASSERT(0 == r); if (notify_sched) { /* activate scheduler. */ ret = evthr_new_sl(qmgr_ctx->qmgr_sched, now_tv, false); if (sm_is_err(ret)) { /* what to do? */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_ERR, 4, "sev=ERROR, func=qm_trigger_rcpt, evthr_new_sl=%m" , ret); } } sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_INFO, 10, "sev=INFO, func=qm_trigger_rcpt, notify=%d", notify_sched); return SM_SUCCESS; } /* ** QCTL_INFO -- put QMGR information into RCB ** ** Parameters: ** qmgr_ctx -- QMGR context ** tsk -- task context ** ** Returns: ** usual sm_error code */ static sm_ret_T qctl_info(qmgr_ctx_P qmgr_ctx, sm_evthr_task_P tsk) { sm_ret_T ret; sm_str_P str; sm_rcb_P rcb; sm_file_T file; sm_rcbe_P rcbe; SM_IS_QMGR_CTX(qmgr_ctx); str = NULL; rcbe = NULL; ret = sm_rcbe_new_enc(&rcbe, -1, 0); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_CTL2Q, 0, (QM_DEBFP, "sev=ERROR, func=qctl_info, sm_rcbe_new=%r", ret)); goto error; } /* ** note: max and current size MUST be identical to avoid deadlock ** in heap module. */ str = sm_str_new(NULL, 16 * 1024, 16 * 1024); if (NULL == str) { ret = sm_error_temp(SM_EM_Q, ENOMEM); goto error; } ret = sm_str2file(str, &file); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_CTL2Q, 0, (QM_DEBFP, "sev=ERROR, func=qctl_info, str2file=%r", ret)); goto error; } rcb = &rcbe->rcbe_rcb; ret = qm_info(qmgr_ctx, &file); if (sm_is_err(ret)) goto error; ret = sm_rcb_putv(rcb, RCB_PUTV_FIRST, SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT, SM_RCBV_STR, RT_Q2CTL_INFO, str, SM_RCBV_END); if (sm_is_err(ret)) goto error; ret = sm_rcbcom_endrep(&qmgr_ctx->qmgr_ctl_com, tsk, true, &rcbe); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_CTL2Q, 0, (QM_DEBFP, "sev=ERROR, func=qctl_info, sm_rcbcom_endrep=%r", ret)); goto error; } ret = evthr_en_wr(tsk); return ret; error: sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_ERR, 8, "sev=ERROR, func=qctl_info, ret=%m", ret); SM_STR_FREE(str); if (rcbe != NULL) sm_rcbe_free(rcbe); return ret; } /* ** QMGR2CTL -- QMGR - CTL interface (just send RCB) ** ** Parameters: ** tsk -- evthr task ** ** Returns: ** usual sm_error code */ static sm_ret_T qmgr2ctl(sm_evthr_task_P tsk) { qmgr_ctx_P qmgr_ctx; SM_IS_EVTHR_TSK(tsk); qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx; return sm_rcbcom2mod(tsk, &qmgr_ctx->qmgr_ctl_com); } /* ** QM_FR_CTL_REACT -- CTL - QMGR interface (read commands) ** ** Parameters: ** qmgr_ctx -- QMGR context ** ** Returns: ** usual sm_error code */ static sm_ret_T qm_fr_ctl_react(qmgr_ctx_P qmgr_ctx, sm_evthr_task_P tsk) { uint32_t v, v2, l, rt, tl; sm_ret_T ret, rv; sm_rcb_P rcb; rcpt_id_T rcpt_id; sessta_id_T ta_ss_id; SM_IS_QMGR_CTX(qmgr_ctx); ret = rv = SM_SUCCESS; /* decode rcb */ rcb = qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb; ret = sm_rcb_open_dec(rcb); if (sm_is_err(ret)) { rv = ret; goto error; } /* total length of record */ ret = sm_rcb_getuint32(rcb, &tl); if (sm_is_err(ret) || tl > QM_AR_MAX_REC_LEN || tl > sm_rcb_getlen(rcb)) { rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_CTL2Q, SM_E_RCB2LONG); goto err2; } /* protocol header: version */ ret = sm_rcb_get3uint32(rcb, &l, &rt, &v); if (sm_is_err(ret)) { rv = ret; goto err2; } if (l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT) { rv = sm_error_perm(SM_EM_Q_CTL2Q, SM_E_PR_V_MISM); goto err2; } /* XXX define protocol first in smX docs! */ /* XXX decode data, act accordingly... */ ret = sm_rcb_get2uint32(rcb, &l, &rt); if (sm_is_err(ret)) goto err2; /* first: get rest of data */ switch (rt) { /* int */ case RT_CTL2Q_INFO: case RT_CTL2Q_R_MAP: case RT_CTL2Q_DBG: if (l != 4) { rv = sm_error_perm(SM_EM_Q_CTL2Q, SM_E_PR_ERR); goto err2; } ret = sm_rcb_getuint32(rcb, &v); if (sm_is_err(ret)) goto err2; break; /* 2 ints */ case RT_CTL2Q_DBG_C: if (l != 8) { rv = sm_error_perm(SM_EM_Q_CTL2Q, SM_E_PR_ERR); goto err2; } ret = sm_rcb_get2uint32(rcb, &v, &v2); if (sm_is_err(ret)) goto err2; break; /* rcpt-id */ case RT_CTL2Q_S_RCPT: case RT_CTL2Q_D_RCPT: if (l != SMTP_RCPTID_SIZE) { rv = sm_error_perm(SM_EM_Q_CTL2Q, SM_E_PR_ERR); goto err2; } ret = sm_rcb_getn(rcb, (uchar *) rcpt_id, l); if (sm_is_err(ret)) { rv = ret; goto error; } break; /* ta-id */ case RT_CTL2Q_D_TA: if (l != SMTP_STID_SIZE) { rv = sm_error_perm(SM_EM_Q_CTL2Q, SM_E_PR_ERR); goto err2; } ret = sm_rcb_getn(rcb, (uchar *) ta_ss_id, l); if (sm_is_err(ret)) { rv = ret; goto error; } break; default: break; } /* next: invoke appropriate function */ switch (rt) { case RT_CTL2Q_INFO: ret = qctl_info(qmgr_ctx, tsk); /* pass v? */ if (sm_is_err(ret)) goto err2; break; case RT_CTL2Q_DBG: for (l = 0; l < SM_ARRAY_SIZE(qm_debug); l++) qm_debug[l] = v; break; case RT_CTL2Q_DBG_C: if (v < SM_ARRAY_SIZE(qm_debug)) qm_debug[v] = v2; break; case RT_CTL2Q_S_RCPT: ret = qm_trigger_rcpt(qmgr_ctx, rcpt_id); break; case RT_CTL2Q_D_RCPT: break; case RT_CTL2Q_D_TA: break; case RT_CTL2Q_R_MAP: ret = qm_reopen_maps(qmgr_ctx); break; default: break; } ret = sm_rcb_close_dec(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb); (void) sm_rcb_open_rcv(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb); return ret; err2: /* use rcb functions that don't do check the state */ (void) sm_rcb_close_decn(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb); error: /* open rcb for receiving next record */ (void) sm_rcb_open_rcvn(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb); #if 0 errret: #endif /* 0 */ if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_CTL2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ctl_react, rt=%#x, v=%d, l=%d, ret=%r, rv=%r\n", rt, v, l, ret, rv)); return rv; } /* ** QM_FR_CTL -- CTL to QMGR interface ** ** Parameters: ** tsk -- evthr task ** ** Returns: ** usual sm_error code ** ** Locking: ** ** Last code review: */ static sm_ret_T qm_fr_ctl(sm_evthr_task_P tsk) { int fd, r; sm_ret_T ret; qmgr_ctx_P qmgr_ctx; SM_IS_EVTHR_TSK(tsk); qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx; SM_IS_QMGR_CTX(qmgr_ctx); fd = tsk->evthr_t_fd; /* checked in caller */ /* minsize: 4 + 4 + something */ ret = sm_rcb_rcv(fd, qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb, 11); QM_LEV_DPRINTTC(QDC_CTL2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_ctl, tsk=%p, fd=%d, ret=%r, buf=%d, len=%d\n", tsk, fd, ret, qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb->sm_rcb_base[0], sm_rcb_getlen(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb)), qmgr_ctx->qmgr_ev_ctx->evthr_c_time); if (ret > 0) { return EVTHR_WAITQ; } else if (0 == ret) { ret = sm_rcb_close_rcv(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb); /* start appropriate function ... */ ret = qm_fr_ctl_react(qmgr_ctx, tsk); QM_LEV_DPRINTFC(QDC_CTL2Q, 5, (QM_DEBFP, "sev=DBG, func=qm_fr_ctl, qm_fr_ctl_react=%r\n", ret)); if (sm_is_err(ret)) goto termit; /* too harsh? */ else if (QMGR_R_WAITQ == ret) return EVTHR_WAITQ; else if (QMGR_R_ASYNC == ret) return EVTHR_OK; else if (EVTHR_DEL == ret) goto termit; /* terminate this SMTPC */ else return ret; } else if (SM_IO_EOF == ret) { ret = sm_rcb_close_rcv(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb); termit: QM_LEV_DPRINTFC(QDC_CTL2Q, 1, (QM_DEBFP, "sev=DBG, func=qm_fr_ctl, task=%p, status=terminate, ret=%r\n", qmgr_ctx->qmgr_ctl_com.rcbcom_tsk, ret)); close(fd); /* XXX see comment in qm_fr_ss() */ tsk->evthr_t_fd = INVALID_FD; /* make it invalid */ r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_CRIT, 1, "sev=CRIT, func=qm_fr_ctl, lock=%d", r); goto error; } qmgr_ctx->qmgr_ctl_com.rcbcom_tsk = NULL; r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex); SM_ASSERT(0 == r); return EVTHR_DEL; } else { /* if (ret < 0) */ QM_LEV_DPRINTFC(QDC_CTL2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ctl, ret=%r, errno=%d\n", ret, errno)); } QM_LEV_DPRINTFC(QDC_CTL2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ctl, fd=%d\n", fd)); error: return EVTHR_DEL; } /* ** QMGR_CTL -- QMGR - CTL interface ** This runs as a task. ** ** Parameters: ** tsk -- evthr task ** ** Returns: ** usual sm_error code ** ** Last code review: */ static sm_ret_T qmgr_ctl(sm_evthr_task_P tsk) { sm_ret_T ret; SM_IS_EVTHR_TSK(tsk); QM_LEV_DPRINTFC(QDC_Q2C, 5, (QM_DEBFP, "func=qmgr_ctl, fd=%d, ev=%#x\n", tsk->evthr_t_fd, evthr_rqevents(tsk))); if (is_valid_fd(tsk->evthr_t_fd)) { ret = EVTHR_WAITQ; if (evthr_got_wr(tsk)) { ret = qmgr2ctl(tsk); /* XXX check ret here? */ if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_Q2C, 1, (QM_DEBFP, "sev=ERROR, func=qmgr_ctl, qmgr2ctl=%r\n", ret)); } if (evthr_got_rd(tsk)) { ret = qm_fr_ctl(tsk); if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_Q2C, 1, (QM_DEBFP, "sev=ERROR, func=qmgr_ctl, qm_fr_ctl=%r\n", ret)); } return ret; } return EVTHR_DEL; } /* ** QM_CTL_LI -- Handle new connections from CTL ** This runs as a (listen) task. ** ** Parameters: ** tsk -- evthr task ** ** Returns: ** usual sm_error code ** ** Last code review: 2003-10-17 16:20:14, see below ** ** Todo: Can this be a special case of qm_gen_li()? */ sm_ret_T qm_ctl_li(sm_evthr_task_P tsk) { int fd, r; sm_ret_T ret; sm_evthr_task_P task; qmgr_ctx_P qmgr_ctx; SM_IS_EVTHR_TSK(tsk); SM_REQUIRE(tsk->evthr_t_nc != NULL); fd = INVALID_FD; qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx; SM_IS_QMGR_CTX(qmgr_ctx); QM_LEV_DPRINTFC(QDC_Q2C, 2, (QM_DEBFP, "func=qm_ctl_li, new connection\n")); /* add the new connection */ if (tsk->evthr_t_nc != NULL && is_valid_fd(fd = tsk->evthr_t_nc->evthr_a_fd)) { r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_CRIT, 4, "sev=CRIT, func=qm_ctl_li, lock=%d\n", r); goto error; } QM_LEV_DPRINTFC(QDC_Q2C, 5, (QM_DEBFP, "func=qm_ctl_li, lock=gotit\n")); if (is_valid_fd(qmgr_ctx->qmgr_ctlfd)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_ERR, 8, "sev=ERROR, func=qm_ctl_li, ctlfd=%d" , qmgr_ctx->qmgr_ctlfd); goto err2; } ret = sm_fd_nonblock(fd, true); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_QUERY, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_ctl_li, sm_fd_nonblock=%m" , ret); goto err2; } ret = sm_rcb_open_rcvn(qmgr_ctx->qmgr_ctl_com.rcbcom_rdrcb); if (sm_is_err(ret)) goto error; ret = evthr_task_new(qmgr_ctx->qmgr_ev_ctx, &task, EVTHR_EV_RD, fd, NULL, qmgr_ctl, qmgr_ctx); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_COMM, QM_LMOD_COMM, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_ctl_li, evthr_task_new=%m" , ret); goto err2; } else QM_LEV_DPRINTFC(QDC_Q2C, 2, (QM_DEBFP, "sev=DBG, func=qm_ctl_li, fd=%d, task=%p\n", fd, task)); SM_IS_EVTHR_TSK(task); qmgr_ctx->qmgr_ctl_com.rcbcom_tsk = task; } r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex); SM_ASSERT(0 == r); return EVTHR_WAITQ; err2: r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex); SM_ASSERT(0 == r); error: if (is_valid_fd(fd)) close(fd); return EVTHR_WAITQ; }