/*
* Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: qss_control.c,v 1.37 2007/06/22 14:49:56 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/reccom.h"
#include "qmgr.h"
#include "qm_throttle.h"
#include "log.h"
/*
** XXX Note: these functions apply only to a single SMTP server. qss_control()
** and qss_unthrottle() should probably apply to all available servers.
*/
/*
** Q2S_THROTTLE -- Tell (single) SMTPS to (un)throttle
**
** Parameters:
** qss_ctx -- QMGR/SMTPS context
** tsk -- evthr task
** nthreads -- number of threads for SMTPS
**
** Returns:
** usual sm_error code
**
** Locking:
** qss_ctx should be "locked", i.e., under control of the caller.
**
** Last code review: 2005-04-22 00:11:44
** Last code change:
*/
sm_ret_T
q2s_throttle(qss_ctx_P qss_ctx, sm_evthr_task_P tsk, uint nthreads)
{
sm_rcb_P rcb;
sm_rcbe_P rcbe;
sm_ret_T ret;
SM_IS_QSS_CTX(qss_ctx);
SM_IS_EVTHR_TSK(tsk);
ret = sm_rcbe_new_enc(&rcbe, -1, 0);
if (sm_is_err(ret))
goto error;
rcb = &rcbe->rcbe_rcb;
ret = sm_rcb_putv(rcb, RCB_PUTV_FIRST,
SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
SM_RCBV_INT, RT_Q2S_ID, qss_ctx->qss_id,
SM_RCBV_INT, RT_Q2S_THRDS, (uint32_t) nthreads,
SM_RCBV_END);
if (sm_is_err(ret))
goto error;
ret = sm_rcbcom_endrep(&qss_ctx->qss_com, tsk, false, &rcbe);
if (sm_is_err(ret))
goto error;
return SM_SUCCESS;
error:
/* cleanup? */
if (rcbe != NULL)
sm_rcbe_free(rcbe);
QM_LEV_DPRINTFC(QDC_RSRC, 0, (QM_DEBFP, "sev=ERROR, func=q2s_throttle, ret=%r\n", ret));
return ret;
}
/*
** QSS_CONTROL -- (un)throttle (single) SMTPS as needed
**
** Parameters:
** qss_ctx -- QMGR/SMTPS context
** direction -- up/down/any direction
** use -- fullness of cache/DB/resource (0..100)
** resource -- which resource?
** locktype -- kind of locking
**
** Returns:
** usual sm_error code
**
** Called by:
** qss_comp_control()
** sm_q_ctaid()
** sm_q_nseid()
** sm_q_rcptid()
** qm_fr_ss_react()
**
** Locking:
** qss_ctx should be "locked", i.e., under control of the caller.
** qmgr_ctx is locked here (additional parameter in case a caller
** has a lock on qmgr_ctx?)
**
** Last code review: 2005-04-23 21:31:00
** Last code change:
*/
sm_ret_T
qss_control(qss_ctx_P qss_ctx, int direction, uint use, uint resource, thr_lock_T locktype)
{
sm_ret_T ret;
int r;
sm_evthr_task_P tsk;
qmgr_ctx_P qmgr_ctx;
SM_IS_QSS_CTX(qss_ctx);
SM_REQUIRE(resource <= QMGR_RFL_LAST_I);
tsk = qss_ctx->qss_com.rcbcom_tsk;
SM_IS_EVTHR_TSK(tsk);
qmgr_ctx = qss_ctx->qss_qmgr_ctx;
SM_IS_QMGR_CTX(qmgr_ctx);
ret = SM_SUCCESS;
/*
** XXX This should be "smoother" and more fine-grained.
** The number of threads is set based on the overall usage, i.e.,
** if there's n% usage then there should be (100-n)% of maximum
** threads.
** However, it's not that simple: all resources are combined into
** a single value which however probably has no direct
** relationship to the number of allowed threads in SMTPS
** (when "unthrottling" SMTPS).
** Currently the number of threads is either set to the number
** of open sessions or simply divided by 2 every time the usage
** exceeds the upper resource limit.
**
** Moreover, we should change the priority of tasks within the
** QMGR if that helps to overcome a resource shortage, e.g.,
** if SMTPS receives mail too fast and IQDB overflows, then
** SMAR and the scheduler need to run faster/more often.
** However, currently we don't have priorities for tasks...
*/
/*
** Throttle the system iff
** - asked for and
** - the use of the resource exceeds the upper threshold and
** - the current max.number of threads is greater than 0
**
** else unthrottle the system iff
** - asked for and
** - the use of the resource is below the lower threshold and
** - the current max.number of threads is less the allowed max. and
** - the system is not in state OK (???) and
** - there is no resource shortage at all
*/
QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qss_control, id=%d, dir=%d, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, direction, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags));
if (thr_lock_it(locktype))
{
r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
SM_LOCK_OK(r);
if (r != 0)
{
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_CONTROL, QM_LMOD_CONTROL,
SM_LOG_CRIT, 4,
"sev=CRIT, func=qss_control, lock=%d\n", r);
return sm_error_temp(SM_EM_Q, r);
}
}
if (qmgr_is_throttle(direction)
&& use > qmgr_ctx->qmgr_upper[resource]
&& qss_ctx->qss_max_cur_thrs > 0)
{
bool change, logged;
QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, cur_se=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, qss_ctx->qss_cur_session, resource, qmgr_ctx->qmgr_rflags));
logged = sm_log_wouldlog(qmgr_ctx->qmgr_lctx,
QM_LCAT_CONTROL, QM_LMOD_CONTROL, 10);
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_CONTROL, QM_LMOD_CONTROL,
SM_LOG_INFO, 10,
"sev=INFO, func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
, qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs
, qss_ctx->qss_max_thrs, resource
, qmgr_ctx->qmgr_rflags);
qmgr_ctx->qmgr_usage[resource] = use;
QMGR_SET_USE_MAX(resource);
change = true;
if (use >= QMGR_R_USE_FULL)
qss_ctx->qss_max_cur_thrs = 0;
else if (qss_ctx->qss_max_cur_thrs > qss_ctx->qss_cur_session)
qss_ctx->qss_max_cur_thrs = qss_ctx->qss_cur_session;
else if (qss_ctx->qss_max_cur_thrs > 1)
qss_ctx->qss_max_cur_thrs >>= 1;
else
change = false;
if (change) {
ret = q2s_throttle(qss_ctx, tsk, qss_ctx->qss_max_cur_thrs);
if (0 == qss_ctx->qss_max_cur_thrs && !logged) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_CONTROL, QM_LMOD_CONTROL,
SM_LOG_WARN, 2,
"sev=WARN, func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
, qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs
, qss_ctx->qss_max_thrs, resource
, qmgr_ctx->qmgr_rflags);
}
}
qss_ctx->qss_status = QSS_ST_SLOW_0;
QMGR_SET_RFLAG_I(qmgr_ctx, resource);
}
else if (qmgr_is_un_throttle(direction)
&& use < qmgr_ctx->qmgr_lower[resource]
&& qss_ctx->qss_status != QSS_ST_OK
&& qss_ctx->qss_max_cur_thrs < qss_ctx->qss_max_thrs)
{
QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qss_control, id=%d, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags));
qmgr_ctx->qmgr_usage[resource] = use;
QMGR_CLR_RFLAG_I(qmgr_ctx, resource);
if (QMGR_RFL_IS_NO_RSR(qmgr_ctx))
{
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_CONTROL, QM_LMOD_CONTROL,
SM_LOG_INFO, 9,
"sev=INFO, func=qss_control, id=%d, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
, qss_ctx->qss_id, use
, qss_ctx->qss_max_cur_thrs
, qss_ctx->qss_max_thrs, resource
, qmgr_ctx->qmgr_rflags);
ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK);
SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage);
QS_UNTHROTTLE("qss_control");
}
}
if (thr_unl_always(locktype))
{
r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
SM_ASSERT(0 == r);
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_Q, r);
}
return ret;
}
/*
** QSS_UNTHROTTLE -- unthrottle (single) SMTPS as needed
**
** Parameters:
** qss_ctx -- QMGR/SMTPS context
**
** Returns:
** usual sm_error code
**
** Called by: sm_smtps_wakeup()
**
** Locking:
** qss_ctx should be "locked", i.e., under control of the caller.
** qmgr_ctx is locked here (additional parameter in case a caller
** has a lock on qmgr_ctx?)
**
** Last code review: 2005-04-23 21:36:45
** Last code change:
*/
sm_ret_T
qss_unthrottle(qss_ctx_P qss_ctx)
{
sm_ret_T ret;
int r;
sm_evthr_task_P tsk;
qmgr_ctx_P qmgr_ctx;
SM_IS_QSS_CTX(qss_ctx);
tsk = qss_ctx->qss_com.rcbcom_tsk;
SM_IS_EVTHR_TSK(tsk);
qmgr_ctx = qss_ctx->qss_qmgr_ctx;
SM_IS_QMGR_CTX(qmgr_ctx);
ret = SM_SUCCESS;
r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
SM_LOCK_OK(r);
if (r != 0)
{
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_CONTROL, QM_LMOD_CONTROL,
SM_LOG_CRIT, 4,
"sev=CRIT, func=qss_unthrottle, lock=%d\n", r);
return sm_error_temp(SM_EM_Q, r);
}
QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qss_unthrottle, id=%d, max_cur_thrs=%u, max_thrs=%u, flags=%#x\n", qss_ctx->qss_id, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, qmgr_ctx->qmgr_rflags));
if (qss_ctx->qss_status != QSS_ST_OK
&& qss_ctx->qss_max_cur_thrs < qss_ctx->qss_max_thrs
&& QMGR_RFL_IS_NO_RSR(qmgr_ctx))
{
ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK);
SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage);
QS_UNTHROTTLE("qss_unthrottle");
}
r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
SM_ASSERT(0 == r);
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_Q, r);
return ret;
}
syntax highlighted by Code2HTML, v. 0.9.1