/*
 * Copyright (c) 2005 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: qm_throttle.c,v 1.9 2007/06/22 14:49:56 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/reccom.h"
#include "qmgr.h"
#include "qm_throttle.h"
#include "log.h"

/*
**  (Un)throttle all SMTP servers.
**  These functions apply to all SMTP servers.
**  Note: this is almost the same as qss_control.c! It's basically just
**  a loop around the corresponding code in qss_control/qss_unthrottle.
**  The main difference is that loop invariants are extracted such that
**  they are not computed each time.
**
**  NOTE: qss_ctx is not locked... but the load relevant data
**  is indirectly protected by qmgr_mutex.
*/

/*
**  QM_CONTROL -- (un)throttle SMTPS as needed
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		direction -- up/down/any direction
**		use -- fullness of cache/DB/resource (0..100)
**		resource -- which resource?
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code
**
**	Called by:
**
**	Locking:
**		locks qmgr_ctx if requested
**
**	Last code review: 2005-04-22 00:12:28
**	Last code change:
*/

sm_ret_T
qm_control(qmgr_ctx_P qmgr_ctx, int direction, uint use, uint resource, thr_lock_T locktype)
{
	sm_ret_T ret;
	int i, r;
	uint8_t	j;
	sm_evthr_task_P tsk;
	qss_ctx_P qss_ctx;

	SM_IS_QMGR_CTX(qmgr_ctx);

	SM_REQUIRE(resource <= QMGR_RFL_LAST_I);
	ret = SM_SUCCESS;

	QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qm_control, dir=%d, use=%u, resource=%d, flags=%#x, upper=%d, lower=%d, no_rsr=%d\n", direction, use, resource, qmgr_ctx->qmgr_rflags, qmgr_ctx->qmgr_upper[resource], qmgr_ctx->qmgr_lower[resource], QMGR_RFL_IS_NO_RSR(qmgr_ctx)));

	if (thr_lock_it(locktype)) {
		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_CONTROL, QM_LMOD_CONTROL,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qm_control, lock=%d\n", r);
			return sm_error_temp(SM_EM_Q, r);
		}
	}

	if (qmgr_is_throttle(direction) && use > qmgr_ctx->qmgr_upper[resource]) {
		qmgr_ctx->qmgr_usage[resource] = use;
		QMGR_SET_USE_MAX(resource);
		QMGR_SET_RFLAG_I(qmgr_ctx, resource);
		ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK);
		SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage);
		for (j = 1, i = 0; i < QM_N_SS_GLI(qmgr_ctx); i++, j *= 2) {
			bool logged;

			if ((qmgr_ctx->qmgr_ss_li.qm_gli_used & j) == 0)
				continue;
			qss_ctx = qmgr_li_ss(qmgr_ctx, i);
			if (qss_ctx->qss_max_cur_thrs <= 0)
				continue;
			tsk = qss_ctx->qss_com.rcbcom_tsk;
			SM_IS_EVTHR_TSK(tsk);
			QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qm_control, throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags));
			logged = sm_log_wouldlog(qmgr_ctx->qmgr_lctx,
				QM_LCAT_CONTROL, QM_LMOD_CONTROL, 10);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_CONTROL, QM_LMOD_CONTROL,
				SM_LOG_INFO, 10,
				"sev=INFO, func=qm_control, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
				, use, qss_ctx->qss_max_cur_thrs
				, qss_ctx->qss_max_thrs, resource
				, qmgr_ctx->qmgr_rflags);
			if (use == QMGR_R_USE_FULL)
				qss_ctx->qss_max_cur_thrs = 0;
			else if (qss_ctx->qss_max_cur_thrs >
				 qss_ctx->qss_cur_session)
				qss_ctx->qss_max_cur_thrs =
					qss_ctx->qss_cur_session;
			else
				qss_ctx->qss_max_cur_thrs >>= 1;
			ret = q2s_throttle(qss_ctx, tsk, qss_ctx->qss_max_cur_thrs);
			if (0 == qss_ctx->qss_max_cur_thrs && !logged) {
				sm_log_write(qmgr_ctx->qmgr_lctx,
					QM_LCAT_CONTROL, QM_LMOD_CONTROL,
					SM_LOG_WARN, 2,
					"sev=WARN, func=qm_control, status=throttle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
					, use, qss_ctx->qss_max_cur_thrs
					, qss_ctx->qss_max_thrs, resource
					, qmgr_ctx->qmgr_rflags);
			}
			qss_ctx->qss_status = QSS_ST_SLOW_0;
		}
	}
	else if (qmgr_is_un_throttle(direction)
		 && use < qmgr_ctx->qmgr_lower[resource])
	{
		qmgr_ctx->qmgr_usage[resource] = use;
		QMGR_CLR_RFLAG_I(qmgr_ctx, resource);
		ret = qm_comp_resource(qmgr_ctx, THR_NO_LOCK);
		SM_ASSERT(100 >= qmgr_ctx->qmgr_total_usage);
		for (j = 1, i = 0;
		     QMGR_RFL_IS_NO_RSR(qmgr_ctx) && i < QM_N_SS_GLI(qmgr_ctx);
		     i++, j *= 2)
		{
			if ((qmgr_ctx->qmgr_ss_li.qm_gli_used & j) == 0)
				continue;
			qss_ctx = qmgr_li_ss(qmgr_ctx, i);
			if (qss_ctx->qss_status == QSS_ST_OK
			    || qss_ctx->qss_max_cur_thrs >= qss_ctx->qss_max_thrs)
				continue;
			tsk = qss_ctx->qss_com.rcbcom_tsk;
			SM_IS_EVTHR_TSK(tsk);
			QM_LEV_DPRINTFC(QDC_RSRC, 2, (QM_DEBFP, "func=qm_control, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x\n", use, qss_ctx->qss_max_cur_thrs, qss_ctx->qss_max_thrs, resource, qmgr_ctx->qmgr_rflags));
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SCHED, QM_LMOD_DEFEDB,
				SM_LOG_INFO, 9,
				"sev=INFO, func=qm_control, status=unthrottle, use=%u, max_cur_thrs=%u, max_thrs=%u, resource=%d, flags=%#x"
				, use, qss_ctx->qss_max_cur_thrs
				, qss_ctx->qss_max_thrs, resource
				, qmgr_ctx->qmgr_rflags);
			QS_UNTHROTTLE("qm_control");
		}
	}
	if (thr_unl_always(locktype)) {
		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		SM_ASSERT(r == 0);
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_Q, r);
	}
	return ret;
}

/*
**  QM_UNTHROTTLE -- unthrottle SMTPS as needed
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code
**
**	Locking:
**		locks qmgr_ctx if requested
**
**	Last code review: 2005-04-22 00:13:02
**	Last code change:
*/

sm_ret_T
qm_unthrottle(qmgr_ctx_P qmgr_ctx, thr_lock_T locktype)
{
	sm_ret_T ret;
	int i, r;
	uint8_t	j;
	sm_evthr_task_P tsk;
	qss_ctx_P qss_ctx;

	SM_IS_QMGR_CTX(qmgr_ctx);
	ret = SM_SUCCESS;

	if (thr_lock_it(locktype)) {
		r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_CONTROL, QM_LMOD_CONTROL,
				SM_LOG_CRIT, 4,
				"sev=CRIT, func=qm_unthrottle, lock=%d\n", r);
			return sm_error_temp(SM_EM_Q, r);
		}
	}

	QM_LEV_DPRINTFC(QDC_RSRC, 6, (QM_DEBFP, "func=qm_unthrottle, flags=%#x\n", qmgr_ctx->qmgr_rflags));

	for (j = 1, i = 0; i < QM_N_SS_GLI(qmgr_ctx); i++, j *= 2) {
		if ((qmgr_ctx->qmgr_ss_li.qm_gli_used & j) == 0)
			continue;
		qss_ctx = qmgr_li_ss(qmgr_ctx, i);
		if (qss_ctx->qss_status != QSS_ST_OK
		    && qss_ctx->qss_max_cur_thrs < qss_ctx->qss_max_thrs
		    && QMGR_RFL_IS_NO_RSR(qmgr_ctx))
		{
			tsk = qss_ctx->qss_com.rcbcom_tsk;
			SM_IS_EVTHR_TSK(tsk);
			QS_UNTHROTTLE("qss_unthrottle");
		}
	}

	if (thr_unl_always(locktype)) {
		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		SM_ASSERT(r == 0);
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_Q, r);
	}
	return ret;
}


syntax highlighted by Code2HTML, v. 0.9.1