/*
 * Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: qss_control.c,v 1.37 2007/06/22 14:49:56 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/reccom.h"
#include "qmgr.h"
#include "qm_throttle.h"
#include "log.h"

/*
**  XXX Note: these functions apply only to a single SMTP server. qss_control()
**  and qss_unthrottle() should probably apply to all available servers.
*/

/*
**  Q2S_THROTTLE -- Tell (single) SMTPS to (un)throttle
**
**	Builds a record containing the protocol version, the id of the
**	SMTPS and the new thread limit, and passes it to
**	sm_rcbcom_endrep() on the communication context of qss_ctx.
**
**	Parameters:
**		qss_ctx -- QMGR/SMTPS context
**		tsk -- evthr task
**		nthreads -- number of threads for SMTPS
**
**	Returns:
**		usual sm_error code
**
**	Locking:
**		qss_ctx should be "locked", i.e., under control of the caller.
**
**	Last code review: 2005-04-22 00:11:44
**	Last code change:
*/

sm_ret_T
q2s_throttle(qss_ctx_P qss_ctx, sm_evthr_task_P tsk, uint nthreads)
{
	sm_rcb_P rcb;
	sm_rcbe_P rcbe;
	sm_ret_T ret;

	SM_IS_QSS_CTX(qss_ctx);
	SM_IS_EVTHR_TSK(tsk);

	/*
	**  The error path below tests rcbe against NULL, hence it must
	**  have a defined value even if sm_rcbe_new_enc() fails without
	**  storing anything into its output parameter.
	*/

	rcbe = NULL;
	ret = sm_rcbe_new_enc(&rcbe, -1, 0);
	if (sm_is_err(ret))
		goto error;
	rcb = &rcbe->rcbe_rcb;

	/* record: protocol version, id of this SMTPS, new thread limit */
	ret = sm_rcb_putv(rcb, RCB_PUTV_FIRST,
		SM_RCBV_INT, RT_PROT_VER, PROT_VER_RT,
		SM_RCBV_INT, RT_Q2S_ID, qss_ctx->qss_id,
		SM_RCBV_INT, RT_Q2S_THRDS, (uint32_t) nthreads,
		SM_RCBV_END);
	if (sm_is_err(ret))
		goto error;

	/*
	**  NOTE(review): rcbe is passed by reference; presumably
	**  sm_rcbcom_endrep() takes it over (and clears the pointer)
	**  when it succeeds -- confirm against its implementation.
	*/

	ret = sm_rcbcom_endrep(&qss_ctx->qss_com, tsk, false, &rcbe);
	if (sm_is_err(ret))
		goto error;
	return SM_SUCCESS;

  error:
	/* release the entry if it (still) belongs to this function */
	if (rcbe != NULL)
		sm_rcbe_free(rcbe);
	QM_LEV_DPRINTFC(QDC_RSRC, 0, (QM_DEBFP, "sev=ERROR, func=q2s_throttle, ret=%r\n", ret));
	return ret;
}

/*
**  QSS_CONTROL -- (un)throttle (single) SMTPS as needed
**
**	Depending on direction and on how "use" compares to the upper
**	and lower thresholds of the resource, the limit on the number of
**	threads of the SMTPS is lowered (and sent via q2s_throttle()),
**	or an attempt is made to raise it again (QS_UNTHROTTLE()).
**
**	Parameters:
**		qss_ctx -- QMGR/SMTPS context
**		direction -- up/down/any direction
**		use -- fullness of cache/DB/resource (0..100)
**		resource -- which resource? (must be <= QMGR_RFL_LAST_I)
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code
**
**	Called by:
**		qss_comp_control()
**		sm_q_ctaid()
**		sm_q_nseid()
**		sm_q_rcptid()
**		qm_fr_ss_react()
**
**	Locking:
**		qss_ctx should be "locked", i.e., under control of the caller.
**		qmgr_ctx is locked and unlocked here as requested by
**			locktype (which allows a caller that already
**			holds the lock on qmgr_ctx to say so).
**
**	Last code review: 2005-04-23 21:31:00
**	Last code change:
*/

sm_ret_T
qss_control(qss_ctx_P qss_ctx, int direction, uint use, uint resource, thr_lock_T locktype)
{
	sm_ret_T ret;
	int r;
	sm_evthr_task_P tsk;
	qmgr_ctx_P qmgr_ctx;

	SM_IS_QSS_CTX(qss_ctx);
	SM_REQUIRE(resource <= QMGR_RFL_LAST_I);
	tsk = qss_ctx->qss_com.rcbcom_tsk;
	SM_IS_EVTHR_TSK(tsk);
	qmgr_ctx = qss_ctx->qss_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	ret = SM_SUCCESS;

	/*
	**  XXX This should be "smoother" and more fine-grained.
	**  The number of threads is set based on the overall usage, i.e.,
	**  if there's n% usage then there should be (100-n)% of maximum
	**  threads.
	**  However, it's not that simple: all resources are combined into
	**  a single value which however probably has no direct
	**  relationship to the number of allowed threads in SMTPS
	**  (when "unthrottling" SMTPS).
	**  Currently the number of threads is either set to the number
	**  of open sessions or simply divided by 2 every time the usage
	**  exceeds the upper resource limit.
	**
	**  Moreover, we should change the priority of tasks within the
	**  QMGR if that helps to overcome a resource shortage, e.g.,
	**  if SMTPS receives mail too fast and IQDB overflows, then
	**  SMAR and the scheduler need to run faster/more often.
	**  However, currently we don't have priorities for tasks...
	*/

	/*
	**  Throttle the system iff
	**  - asked for and
	**  - the use of the resource exceeds the upper threshold and
	**  - the current max.number of threads is greater than 0
	**
	**  else unthrottle the system iff
	**  - asked for and
	**  - the use of the resource is below the lower threshold and
	**  - the current max.number of threads is less than the allowed
	**    max. and
	**  - the system is not in state OK (???) and
	**  - there is no resource shortage at all
	**    (this last condition is tested inside the branch, after the
	**    flag for this resource has been cleared)
	*/

	QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qss_control, id=%d, dir=%d, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, direction, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags));

	/* acquire the qmgr_ctx mutex only if locktype asks for it */
	if (thr_lock_it(locktype))
	{
		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
		{
			/* nothing has been modified yet: log and give up */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_CONTROL, QM_LMOD_CONTROL,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qss_control, lock=%d\n", r);
			return sm_error_temp(SM_EM_Q, r);
		}
	}

	if (qmgr_is_throttle(direction)
	    && use > qmgr_ctx->qmgr_upper[resource]
	    && qss_ctx->qss_max_cur_thrs > 0)
	{
		bool change, logged;

		QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, cur_se=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, qss_ctx->qss_cur_session, resource, qmgr_ctx->qmgr_rflags));

		/*
		**  Remember whether the INFO message (log level 10) is
		**  written at all; if not, a WARN message (log level 2)
		**  is written below when the limit drops to 0.
		*/

		logged = sm_log_wouldlog(qmgr_ctx->qmgr_lctx,
			QM_LCAT_CONTROL, QM_LMOD_CONTROL, 10);
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_CONTROL, QM_LMOD_CONTROL,
			SM_LOG_INFO, 10,
			"sev=INFO, func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
			, qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs
			, qss_ctx->qss_max_thrs, resource
			, qmgr_ctx->qmgr_rflags);
		qmgr_ctx->qmgr_usage[resource] = use;

		/*
		**  NOTE(review): macro defined elsewhere; it gets only
		**  the resource index, so it presumably accesses the
		**  local qmgr_ctx by name -- confirm.
		*/

		QMGR_SET_USE_MAX(resource);

		/*
		**  Select the new thread limit:
		**  - resource is full: no threads at all
		**  - limit is above the number of current sessions:
		**    reduce it to that number
		**  - limit is greater than 1: halve it
		**  - otherwise (limit is 1): keep it, nothing to send
		*/

		change = true;
		if (use >= QMGR_R_USE_FULL)
			qss_ctx->qss_max_cur_thrs = 0;
		else if (qss_ctx->qss_max_cur_thrs > qss_ctx->qss_cur_session)
			qss_ctx->qss_max_cur_thrs = qss_ctx->qss_cur_session;
		else if (qss_ctx->qss_max_cur_thrs > 1)
			qss_ctx->qss_max_cur_thrs >>= 1;
		else
			change = false;
		if (change) {
			/* the result of q2s_throttle() is returned to the caller */
			ret = q2s_throttle(qss_ctx, tsk, qss_ctx->qss_max_cur_thrs);
			if (0 == qss_ctx->qss_max_cur_thrs && !logged) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_CONTROL, QM_LMOD_CONTROL,
					SM_LOG_WARN, 2,
					"sev=WARN, func=qss_control, id=%d, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
					, qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs
					, qss_ctx->qss_max_thrs, resource
					, qmgr_ctx->qmgr_rflags);
			}
		}

		/*
		**  Status and resource flag are set even if the limit
		**  was not changed and even if q2s_throttle() failed.
		*/

		qss_ctx->qss_status = QSS_ST_SLOW_0;
		QMGR_SET_RFLAG_I(qmgr_ctx, resource);
	}
	else if (qmgr_is_un_throttle(direction)
		 && use < qmgr_ctx->qmgr_lower[resource]
		 && qss_ctx->qss_status != QSS_ST_OK
		 && qss_ctx->qss_max_cur_thrs < qss_ctx->qss_max_thrs)
	{
		QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qss_control, id=%d, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", qss_ctx->qss_id, use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags));

		/* this resource is fine again: record usage, clear its flag */
		qmgr_ctx->qmgr_usage[resource] = use;
		QMGR_CLR_RFLAG_I(qmgr_ctx, resource);

		/*
		**  Unthrottle only if QMGR_RFL_IS_NO_RSR() holds
		**  (presumably: no resource flag is set anymore -- confirm).
		*/

		if (QMGR_RFL_IS_NO_RSR(qmgr_ctx))
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_CONTROL, QM_LMOD_CONTROL,
				SM_LOG_INFO, 9,
				"sev=INFO, func=qss_control, id=%d, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
				, qss_ctx->qss_id, use
				, qss_ctx->qss_max_cur_thrs
				, qss_ctx->qss_max_thrs, resource
				, qmgr_ctx->qmgr_rflags);

			/* the mutex is handled in this function: THR_NO_LOCK */
			ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK);
			SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage);

			/*
			**  NOTE(review): macro defined elsewhere; it gets
			**  only the function name, so it presumably uses
			**  the locals qss_ctx, qmgr_ctx, tsk, and ret
			**  by name -- confirm before renaming any of them.
			*/

			QS_UNTHROTTLE("qss_control");
		}
	}

	/* release the qmgr_ctx mutex if locktype asks for it */
	if (thr_unl_always(locktype))
	{
		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		SM_ASSERT(0 == r);

		/* do not overwrite an earlier error with the unlock error */
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_Q, r);
	}
	return ret;
}

/*
**  QSS_UNTHROTTLE -- unthrottle (single) SMTPS as needed
**
**	If the SMTPS is not in state OK, its current thread limit is
**	below the maximum, and QMGR_RFL_IS_NO_RSR() holds for qmgr_ctx,
**	then the resource usage is recomputed and QS_UNTHROTTLE() is
**	invoked.
**
**	Parameters:
**		qss_ctx -- QMGR/SMTPS context
**
**	Returns:
**		usual sm_error code
**
**	Called by: sm_smtps_wakeup()
**
**	Locking:
**		qss_ctx should be "locked", i.e., under control of the caller.
**		qmgr_ctx is always locked here (XXX add a parameter in case
**			a caller has a lock on qmgr_ctx?)
**
**	Last code review: 2005-04-23 21:36:45
**	Last code change:
*/

sm_ret_T
qss_unthrottle(qss_ctx_P qss_ctx)
{
	sm_ret_T ret;
	int r;
	sm_evthr_task_P tsk;
	qmgr_ctx_P qmgr_ctx;

	SM_IS_QSS_CTX(qss_ctx);

	/*
	**  NOTE(review): tsk is not referenced explicitly after the
	**  check below; presumably QS_UNTHROTTLE() uses it -- confirm.
	*/

	tsk = qss_ctx->qss_com.rcbcom_tsk;
	SM_IS_EVTHR_TSK(tsk);
	qmgr_ctx = qss_ctx->qss_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	ret = SM_SUCCESS;

	r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		/* nothing has been modified yet: log and give up */
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_CONTROL, QM_LMOD_CONTROL,
			SM_LOG_CRIT, 4,
			"sev=CRIT, func=qss_unthrottle, lock=%d\n", r);
		return sm_error_temp(SM_EM_Q, r);
	}

	QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qss_unthrottle, id=%d, max_cur_thrs=%u, max_thrs=%u, flags=%#x\n", qss_ctx->qss_id, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, qmgr_ctx->qmgr_rflags));

	/*
	**  Unthrottle iff
	**  - the SMTPS is not in state OK and
	**  - the current max.number of threads is less than the allowed
	**    max. and
	**  - QMGR_RFL_IS_NO_RSR() holds (presumably: no resource flag
	**    is set -- confirm)
	*/

	if (qss_ctx->qss_status != QSS_ST_OK
	    && qss_ctx->qss_max_cur_thrs < qss_ctx->qss_max_thrs
	    && QMGR_RFL_IS_NO_RSR(qmgr_ctx))
	{
		/* the mutex is already held here: THR_NO_LOCK */
		ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK);
		SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage);

		/*
		**  NOTE(review): macro defined elsewhere; it gets only
		**  the function name, so it presumably uses the locals
		**  qss_ctx, qmgr_ctx, tsk, and ret by name -- confirm
		**  before renaming any of them.
		*/

		QS_UNTHROTTLE("qss_unthrottle");
	}

	r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
	SM_ASSERT(0 == r);

	/* do not overwrite an earlier error with the unlock error */
	if (r != 0 && sm_is_success(ret))
		ret = sm_error_perm(SM_EM_Q, r);
	return ret;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */