/*
 * Copyright (c) 2003-2005 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: rcb.c,v 1.15 2007/03/14 16:08:00 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "qmgr.h"

/*
*/

/*
**  QM_RCBE_NEW -- create a new RCB entry, open it for encoding
**  XXX similar to sm_rcbe_new()
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		prcbe -- pointer to RCB entry (output)
**		minsz -- minimum size parameter for rcb_open_enc()
**
**	Returns:
**		usual sm_error code
**
**	Side Effects: none (transactional behavior)
**
**	Locking:
**		qmgr_ctx will be locked here.
**		XXX parameter to decide whether caller has lock already?
**
**	Last code review:
*/

sm_ret_T
qm_rcbe_new(qmgr_ctx_P qmgr_ctx, sm_rcbe_P *prcbe, int minsz)
{
	sm_ret_T ret;
	int r;
	sm_rcbe_P rcbe;

	SM_REQUIRE(prcbe != NULL);
	rcbe = NULL;

	r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(r);
	if (r == 0)
	{
		if (!RCBL_EMPTY(&qmgr_ctx->qmgr_rcbh))
		{
			rcbe = RCBL_FIRST(&qmgr_ctx->qmgr_rcbh);
			RCBL_REMOVE(&qmgr_ctx->qmgr_rcbh);
			SM_ASSERT(qmgr_ctx->qmgr_rcbn > 0);
			--qmgr_ctx->qmgr_rcbn;
		}
		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		if (r != 0)
			ret = sm_error_temp(SM_EM_Q, r);
	}

	if (rcbe == NULL)
		rcbe = sm_rcbe_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ);
	if (rcbe == NULL)
		return sm_error_temp(SM_EM_Q, ENOMEM);
	ret = sm_rcb_open_enc(&rcbe->rcbe_rcb, minsz);
	if (sm_is_err(ret))
		goto error;

	*prcbe = rcbe;
	return SM_SUCCESS;

  error:
	if (rcbe != NULL)
		sm_rcbe_free(rcbe);
	*prcbe = NULL;
	return ret;
}

/*
**  QM_RCBCOM_PREREP -- Prepare reply to some module:
**	check whether read RCB is completely read,
**	close read RCB from decoding, open it for receiving (always);
**	Note: caller should not access rcbcom_rdrcb afterwards (if tsk != NULL)!
**		This read RCB belongs to the (communication) task and will
**		be reused as soon as the task is activated again.
**	create a new RCB entry (prcbe);
**	put task back in waitq (unless tsk is NULL).
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		rcbcom_ctx -- RCB communication structure context
**		tsk -- evthr task
**		prcbe -- (pointer to) RCB entry (output)
**			if *prcbe is not NULL, then it is used as input too.
**
**	Returns:
**		usual sm_error code
**
**	Locking:
**		qmgr_ctx will be in qm_rcbe_new()
**		XXX parameter to decide whether caller has lock already?
*/

sm_ret_T
qm_rcbcom_prerep(qmgr_ctx_P qmgr_ctx, rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, sm_rcbe_P *prcbe)
{
	sm_rcbe_P rcbe;
	sm_ret_T ret;

	SM_REQUIRE(prcbe != NULL);
	rcbe = NULL;
	*prcbe = NULL;
	ret = SM_SUCCESS;

	/* check for EOB */
	if (!SM_RCB_ISEOB(rcbcom_ctx->rcbcom_rdrcb))
	{
		ret = sm_error_perm(SM_EM_Q, SM_E_PR_ERR);

		/*
		**  Better error code?
		**  Fall through to make sure that the RCB is closed,
		**  ret is checked below.
		*/
	}

	/* those two calls won't fail, nevertheless, this isn't really ok */
	(void) sm_rcb_close_decn(rcbcom_ctx->rcbcom_rdrcb);
	(void) sm_rcb_open_rcvn(rcbcom_ctx->rcbcom_rdrcb);

	if (sm_is_err(ret))
		return ret;

	ret = qm_rcbe_new(qmgr_ctx, &rcbe, -1);
	if (sm_is_err(ret))
		goto error;

	if (tsk != NULL)
	{
		/* put task back in waitq now */
		ret = evthr_waitq_app(tsk);
		if (sm_is_err(ret))
			goto error;
	}

	*prcbe = rcbe;
	return SM_SUCCESS;

  error:
	if (rcbe != NULL)
		sm_rcbe_free(rcbe);
	return ret;
}

/*
**  QM_RCBE_PUT -- Put an RCB entry back into a list
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		rcbe -- RCB entry
**
**	Returns:
**		usual sm_error code
**
**	Locking:
**		qmgr_ctx will be locked here.
**		XXX parameter to decide whether caller has lock already?
*/

/* maximum number of entries in RCB list, make this configurable XXX */
#define QMGR_RCBN_THRESHOLD	8

sm_ret_T
qm_rcbe_put(qmgr_ctx_P qmgr_ctx, sm_rcbe_P rcbe)
{
	sm_ret_T ret;
	int r;

	if (rcbe == NULL)
		return SM_SUCCESS;
	ret = SM_SUCCESS;
	r = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(r);
	if (r == 0)
	{
		if (qmgr_ctx->qmgr_rcbn > QMGR_RCBN_THRESHOLD)
		{
			sm_rcbe_free(rcbe);
		}
		else
		{
			RCBL_APP(&qmgr_ctx->qmgr_rcbh, rcbe);
			++qmgr_ctx->qmgr_rcbn;
		}
		r = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
		if (r != 0)
		{
QM_LEV_DPRINTF(0, (QM_DEBFP, "sev=ERROR, func=qm_rcbe_put, lock=%d\n", r));
			ret = sm_error_temp(SM_EM_Q, r);
		}
	}
	else
	{
		/* Hmm, can't lock mutex? Wierd... XXX */
		sm_rcbe_free(rcbe);
QM_LEV_DPRINTF(0, (QM_DEBFP, "sev=ERROR, func=qm_rcbe_put, lock=%d\n", r));
		/* ret = sm_error_temp(SM_EM_Q, r); */
	}
	return ret;
}


syntax highlighted by Code2HTML, v. 0.9.1