/* * Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: qss_control.c,v 1.37 2007/06/22 14:49:56 ca Exp $") #include "sm/error.h" #include "sm/assert.h" #include "sm/io.h" #include "sm/rcb.h" #include "sm/qmgr.h" #include "sm/qmgr-int.h" #include "sm/reccom.h" #include "qmgr.h" #include "qm_throttle.h" #include "log.h" /* ** XXX Note: these functions apply only to a single SMTP server. qss_control() ** and qss_unthrottle() should probably apply to all available servers. */ /* ** Q2S_THROTTLE -- Tell (single) SMTPS to (un)throttle ** ** Parameters: ** qss_ctx -- QMGR/SMTPS context ** tsk -- evthr task ** nthreads -- number of threads for SMTPS ** ** Returns: ** usual sm_error code ** ** Locking: ** qss_ctx should be "locked", i.e., under control of the caller. ** ** Last code review: 2005-04-22 00:11:44 ** Last code change: */ sm_ret_T q2s_throttle(qss_ctx_P qss_ctx, sm_evthr_task_P tsk, uint nthreads) { sm_rcb_P rcb; sm_rcbe_P rcbe; sm_ret_T ret; SM_IS_QSS_CTX(qss_ctx); SM_IS_EVTHR_TSK(tsk); ret = sm_rcbe_new_enc(&rcbe, -1, 0); if (sm_is_err(ret)) goto error; rcb = &rcbe->rcbe_rcb; ret = sm_rcb_putv(rcb, RCB_PUTV_FIRST, SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT, SM_RCBV_INT, RT_Q2S_ID, qss_ctx->qss_id, SM_RCBV_INT, RT_Q2S_THRDS, (uint32_t) nthreads, SM_RCBV_END); if (sm_is_err(ret)) goto error; ret = sm_rcbcom_endrep(&qss_ctx->qss_com, tsk, false, &rcbe); if (sm_is_err(ret)) goto error; return SM_SUCCESS; error: /* cleanup? */ if (rcbe != NULL) sm_rcbe_free(rcbe); QM_LEV_DPRINTFC(QDC_RSRC, 0, (QM_DEBFP, "sev=ERROR, func=q2s_throttle, ret=%r\n", ret)); return ret; } /* ** QSS_CONTROL -- (un)throttle (single) SMTPS as needed ** ** Parameters: ** qss_ctx -- QMGR/SMTPS context ** direction -- up/down/any direction ** use -- fullness of cache/DB/resource (0..100) ** resource -- which resource? ** locktype -- kind of locking ** ** Returns: ** usual sm_error code ** ** Called by: ** qss_comp_control() ** sm_q_ctaid() ** sm_q_nseid() ** sm_q_rcptid() ** qm_fr_ss_react() ** ** Locking: ** qss_ctx should be "locked", i.e., under control of the caller. ** qmgr_ctx is locked here (additional parameter in case a caller ** has a lock on qmgr_ctx?) ** ** Last code review: 2005-04-23 21:31:00 ** Last code change: */ sm_ret_T qss_control(qss_ctx_P qss_ctx, int direction, uint use, uint resource, thr_lock_T locktype) { sm_ret_T ret; int r; sm_evthr_task_P tsk; qmgr_ctx_P qmgr_ctx; SM_IS_QSS_CTX(qss_ctx); SM_REQUIRE(resource <= QMGR_RFL_LAST_I); tsk = qss_ctx->qss_com.rcbcom_tsk; SM_IS_EVTHR_TSK(tsk); qmgr_ctx = qss_ctx->qss_qmgr_ctx; SM_IS_QMGR_CTX(qmgr_ctx); ret = SM_SUCCESS; /* ** XXX This should be "smoother" and more fine-grained. ** The number of threads is set based on the overall usage, i.e., ** if there's n% usage then there should be (100-n)% of maximum ** threads. ** However, it's not that simple: all resources are combined into ** a single value which however probably has no direct ** relationship to the number of allowed threads in SMTPS ** (when "unthrottling" SMTPS). ** Currently the number of threads is either set to the number ** of open sessions or simply divided by 2 every time the usage ** exceeds the upper resource limit. ** ** Moreover, we should change the priority of tasks within the ** QMGR if that helps to overcome a resource shortage, e.g., ** if SMTPS receives mail too fast and IQDB overflows, then ** SMAR and the scheduler need to run faster/more often. ** However, currently we don't have priorities for tasks... */ /* ** Throttle the system iff ** - asked for and ** - the use of the resource exceeds the upper threshold and ** - the current max.number of threads is greater than 0 ** ** else unthrottle the system iff ** - asked for and ** - the use of the resource is below the lower threshold and ** - the current max.number of threads is less the allowed max. and ** - the system is not in state OK (???) and ** - there is no resource shortage at all */ QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qss_control, id=%d, dir=%d, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, direction, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags)); if (thr_lock_it(locktype)) { r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CONTROL, QM_LMOD_CONTROL, SM_LOG_CRIT, 4, "sev=CRIT, func=qss_control, lock=%d\n", r); return sm_error_temp(SM_EM_Q, r); } } if (qmgr_is_throttle(direction) && use > qmgr_ctx->qmgr_upper[resource] && qss_ctx->qss_max_cur_thrs > 0) { bool change, logged; QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, cur_se=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, qss_ctx->qss_cur_session, resource, qmgr_ctx->qmgr_rflags)); logged = sm_log_wouldlog(qmgr_ctx->qmgr_lctx, QM_LCAT_CONTROL, QM_LMOD_CONTROL, 10); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CONTROL, QM_LMOD_CONTROL, SM_LOG_INFO, 10, "sev=INFO, func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x" , qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs , qss_ctx->qss_max_thrs, resource , qmgr_ctx->qmgr_rflags); qmgr_ctx->qmgr_usage[resource] = use; QMGR_SET_USE_MAX(resource); change = true; if (use >= QMGR_R_USE_FULL) qss_ctx->qss_max_cur_thrs = 0; else if (qss_ctx->qss_max_cur_thrs > qss_ctx->qss_cur_session) qss_ctx->qss_max_cur_thrs = qss_ctx->qss_cur_session; else if (qss_ctx->qss_max_cur_thrs > 1) qss_ctx->qss_max_cur_thrs >>= 1; else change = false; if (change) { ret = q2s_throttle(qss_ctx, tsk, qss_ctx->qss_max_cur_thrs); if (0 == qss_ctx->qss_max_cur_thrs && !logged) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CONTROL, QM_LMOD_CONTROL, SM_LOG_WARN, 2, "sev=WARN, func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x" , qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs , qss_ctx->qss_max_thrs, resource , qmgr_ctx->qmgr_rflags); } } qss_ctx->qss_status = QSS_ST_SLOW_0; QMGR_SET_RFLAG_I(qmgr_ctx, resource); } else if (qmgr_is_un_throttle(direction) && use < qmgr_ctx->qmgr_lower[resource] && qss_ctx->qss_status != QSS_ST_OK && qss_ctx->qss_max_cur_thrs < qss_ctx->qss_max_thrs) { QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qss_control, id=%d, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags)); qmgr_ctx->qmgr_usage[resource] = use; QMGR_CLR_RFLAG_I(qmgr_ctx, resource); if (QMGR_RFL_IS_NO_RSR(qmgr_ctx)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CONTROL, QM_LMOD_CONTROL, SM_LOG_INFO, 9, "sev=INFO, func=qss_control, id=%d, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x" , qss_ctx->qss_id, use , qss_ctx->qss_max_cur_thrs , qss_ctx->qss_max_thrs, resource , qmgr_ctx->qmgr_rflags); ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK); SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage); QS_UNTHROTTLE("qss_control"); } } if (thr_unl_always(locktype)) { r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex); SM_ASSERT(0 == r); if (r != 0 && sm_is_success(ret)) ret = sm_error_perm(SM_EM_Q, r); } return ret; } /* ** QSS_UNTHROTTLE -- unthrottle (single) SMTPS as needed ** ** Parameters: ** qss_ctx -- QMGR/SMTPS context ** ** Returns: ** usual sm_error code ** ** Called by: sm_smtps_wakeup() ** ** Locking: ** qss_ctx should be "locked", i.e., under control of the caller. ** qmgr_ctx is locked here (additional parameter in case a caller ** has a lock on qmgr_ctx?) ** ** Last code review: 2005-04-23 21:36:45 ** Last code change: */ sm_ret_T qss_unthrottle(qss_ctx_P qss_ctx) { sm_ret_T ret; int r; sm_evthr_task_P tsk; qmgr_ctx_P qmgr_ctx; SM_IS_QSS_CTX(qss_ctx); tsk = qss_ctx->qss_com.rcbcom_tsk; SM_IS_EVTHR_TSK(tsk); qmgr_ctx = qss_ctx->qss_qmgr_ctx; SM_IS_QMGR_CTX(qmgr_ctx); ret = SM_SUCCESS; r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CONTROL, QM_LMOD_CONTROL, SM_LOG_CRIT, 4, "sev=CRIT, func=qss_unthrottle, lock=%d\n", r); return sm_error_temp(SM_EM_Q, r); } QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qss_unthrottle, id=%d, max_cur_thrs=%u, max_thrs=%u, flags=%#x\n", qss_ctx->qss_id, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, qmgr_ctx->qmgr_rflags)); if (qss_ctx->qss_status != QSS_ST_OK && qss_ctx->qss_max_cur_thrs < qss_ctx->qss_max_thrs && QMGR_RFL_IS_NO_RSR(qmgr_ctx)) { ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK); SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage); QS_UNTHROTTLE("qss_unthrottle"); } r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex); SM_ASSERT(0 == r); if (r != 0 && sm_is_success(ret)) ret = sm_error_perm(SM_EM_Q, r); return ret; }