/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: rcbcomm.c,v 1.44 2006/12/29 01:21:31 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/rcbl.h"
#include "sm/rcbcomm.h"
#include "sm/reccom.h"
#include "sm/qmgrcomm.h"

/*
**  This uses some Q* constants; should it do that??
**  Rename Q* constants and move them into different include file (rcb.h)??
*/

/*
**  SM_RCBE_NEW_ENC -- allocate an RCB entry and open its RCB for encoding
**
**	Parameters:
**		prcbe -- pointer to RCB entry (output)
**		minsz -- parameter "n" for sm_rcb_open_enc()
**			(see comments there); it is also used as the
**			size passed to sm_rcbe_new() if it is larger
**			than QSS_RC_SZ.
**		maxlen -- Maximum length of buf data, if 0 then
**			QSS_RC_MAXSZ is used.
**
**	Returns:
**		usual sm_error code; ENOMEM, SM_E_RANGE, SM_E_OVFLW_NS
**
**	Side Effects:
**		*prcbe is left untouched if the allocation fails;
**		it is set to NULL if opening the RCB for encoding fails
**		(the freshly allocated entry is released in that case).
**
**	Last code review: 2005-03-22 17:46:33
**	Last code change:
*/

sm_ret_T
sm_rcbe_new_enc(sm_rcbe_P *prcbe, int minsz, uint maxlen)
{
	sm_rcbe_P new_rcbe;
	sm_ret_T res;

	SM_REQUIRE(prcbe != NULL);

	/* XXX really QSS_RC_MAXSZ? */
	new_rcbe = sm_rcbe_new(NULL,
			(minsz <= QSS_RC_SZ) ? QSS_RC_SZ : minsz,
			(maxlen != 0) ? maxlen : QSS_RC_MAXSZ);
	if (NULL == new_rcbe)
		return sm_error_temp(SM_EM_RECCOM, ENOMEM);

	res = sm_rcb_open_enc(&new_rcbe->rcbe_rcb, minsz);
	if (!sm_is_err(res)) {
		/* hand the ready-to-encode entry to the caller */
		*prcbe = new_rcbe;
		return SM_SUCCESS;
	}

	/* could not open for encoding: give the entry back */
	sm_rcbe_free(new_rcbe);
	*prcbe = NULL;
	return res;
}

/*
**  SM_RCBCOM_OPEN -- Open (initialize) RCB communication structure context
**
**	Initializes the write mutex, allocates the read RCB and the
**	spare ("fallback") write RCB entry, and resets the write list
**	and its length counters.
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**
**	Returns:
**		usual sm_error code; ENOMEM, etc
**		(a pthread_mutex_init() failure is returned as permanent
**		error carrying the pthread error code).
**
**	Side Effects:
**		on error nothing stays allocated and rcbcom_rdrcb as well
**		as rcbcom_wrrcbe are NULL, hence the context does not
**		contain dangling pointers.
*/

sm_ret_T
sm_rcbcom_open(rcbcom_ctx_P rcbcom_ctx)
{
	int r;
	sm_ret_T ret;

	SM_REQUIRE(rcbcom_ctx != NULL);

	/*
	**  Put the pointers into a known state first: the context may
	**  not have been zeroed by the caller and the cleanup code must
	**  never look at (or free) uninitialized values.
	*/

	rcbcom_ctx->rcbcom_rdrcb = NULL;
	rcbcom_ctx->rcbcom_wrrcbe = NULL;

	r = pthread_mutex_init(&rcbcom_ctx->rcbcom_wrmutex,
			SM_PTHREAD_MUTEXATTR);
	if (r != 0) {
		/* nothing has been acquired yet, nothing to clean up */
		return sm_error_perm(SM_EM_RECCOM, r);
	}
	rcbcom_ctx->rcbcom_rdrcb = sm_rcb_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ);
	if (NULL == rcbcom_ctx->rcbcom_rdrcb)
		goto err2;
	rcbcom_ctx->rcbcom_wrrcbe = sm_rcbe_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ);
	if (NULL == rcbcom_ctx->rcbcom_wrrcbe)
		goto err2;
	RCBL_INIT(&rcbcom_ctx->rcbcom_wrrcbl);
	rcbcom_ctx->rcbcom_wr_len_cur = 0;
	rcbcom_ctx->rcbcom_wr_len_max = UINT_MAX / 2;
	return SM_SUCCESS;

  err2:
	/* an allocation failed: undo everything done so far */
	ret = sm_error_temp(SM_EM_RECCOM, ENOMEM);
	if (rcbcom_ctx->rcbcom_wrrcbe != NULL) {
		sm_rcbe_free(rcbcom_ctx->rcbcom_wrrcbe);
		rcbcom_ctx->rcbcom_wrrcbe = NULL;
	}
	if (rcbcom_ctx->rcbcom_rdrcb != NULL) {
		sm_rcb_free(rcbcom_ctx->rcbcom_rdrcb);
		rcbcom_ctx->rcbcom_rdrcb = NULL;
	}
	(void) pthread_mutex_destroy(&rcbcom_ctx->rcbcom_wrmutex);
	return ret;
}

/*
**  SM_RCBCOM_CLOSE -- Close a RCB communication structure
**
**	Frees the read RCB, the spare write RCB entry, and every entry
**	that is still in the write list, resets the bookkeeping fields
**	of the context, and destroys the write mutex.
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**			(may be NULL, in which case nothing is done)
**
**	Returns:
**		SM_SUCCESS
**
**	Side Effects:
**		rcbcom_rdrcb and rcbcom_wrrcbe are set to NULL after they
**		have been freed so the context does not keep dangling
**		pointers.
*/

sm_ret_T
sm_rcbcom_close(rcbcom_ctx_P rcbcom_ctx)
{
	sm_rcbe_P rcbe, rcbe_nxt;

	if (NULL == rcbcom_ctx)
		return SM_SUCCESS;
	if (rcbcom_ctx->rcbcom_rdrcb != NULL) {
		sm_rcb_free(rcbcom_ctx->rcbcom_rdrcb);
		rcbcom_ctx->rcbcom_rdrcb = NULL;
	}
	if (rcbcom_ctx->rcbcom_wrrcbe != NULL) {
		sm_rcbe_free(rcbcom_ctx->rcbcom_wrrcbe);
		rcbcom_ctx->rcbcom_wrrcbe = NULL;
	}

	/* fetch the successor before freeing the current entry */
	for (rcbe = RCBL_FIRST(&rcbcom_ctx->rcbcom_wrrcbl); rcbe != NULL;
	     rcbe = rcbe_nxt)
	{
		rcbe_nxt = RCBL_NEXT(rcbe);
		sm_rcbe_free(rcbe);
	}

	/* ugly... */
	rcbcom_ctx->rcbcom_tsk = NULL;
	rcbcom_ctx->rcbcom_wr_len_cur = 0;
	RCBL_INIT(&rcbcom_ctx->rcbcom_wrrcbl);

	(void) pthread_mutex_destroy(&rcbcom_ctx->rcbcom_wrmutex);
	return SM_SUCCESS;
}

/*
**  SM_RCBCOM_PREREP -- Prepare reply to some module:
**	- verify that the read RCB has been read completely (EOB),
**	- close the read RCB from decoding and open it for receiving
**	  (this is done in every case, also on error),
**	- obtain an RCB entry for the reply (prcbe): a new one, or the
**	  spare entry of the context if allocation fails with ENOMEM,
**	- put the task back in the waitq (unless tsk is NULL).
**
**	Note: caller should not access rcbcom_rdrcb afterwards
**		(if tsk != NULL)!
**		This read RCB belongs to the (communication) task and will
**		be reused as soon as the task is activated again.
**
**	NOTE(review): the spare entry (rcbcom_wrrcbe) is handed out as is,
**		this function does not call sm_rcb_open_enc() on it;
**		confirm that it is in the state callers expect.
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**		tsk -- evthr task
**		prcbe -- (pointer to) RCB entry (output);
**			NULL on error
**
**	Returns:
**		usual sm_error code; SM_E_PR_ERR, etc
*/

sm_ret_T
sm_rcbcom_prerep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, sm_rcbe_P *prcbe)
{
	sm_rcbe_P reply;
	sm_ret_T res;

	SM_REQUIRE(prcbe != NULL);
	*prcbe = NULL;
	reply = NULL;

	/* the request must have been consumed completely */
	if (!SM_RCB_ISEOB(rcbcom_ctx->rcbcom_rdrcb)) {
		/* fixme: better error code */
		res = sm_error_perm(SM_EM_RECCOM, SM_E_PR_ERR);
		goto fail;
	}

	/* those two calls won't fail, nevertheless, this isn't really ok */
	(void) sm_rcb_close_dec(rcbcom_ctx->rcbcom_rdrcb);
	(void) sm_rcb_open_rcv(rcbcom_ctx->rcbcom_rdrcb);

	res = sm_rcbe_new_enc(&reply, -1, 0);
	if (sm_is_err(res)) {
		/* only an out of memory condition can use the spare entry */
		if (sm_error_value(res) != ENOMEM || NULL == rcbcom_ctx->rcbcom_wrrcbe)
			goto fail;

		/* take the spare entry out of the context */
		reply = rcbcom_ctx->rcbcom_wrrcbe;
		rcbcom_ctx->rcbcom_wrrcbe = NULL;
	}

	/* put task back in waitq now */
	if (tsk != NULL) {
		res = evthr_waitq_app(tsk);
		if (sm_is_err(res))
			goto fail;
	}

	*prcbe = reply;
	return SM_SUCCESS;

  fail:
	if (reply != NULL)
		sm_rcbe_free(reply);

	/* leave the read RCB opened for receiving in any case */
	(void) sm_rcb_close_decn(rcbcom_ctx->rcbcom_rdrcb);
	(void) sm_rcb_open_rcvn(rcbcom_ctx->rcbcom_rdrcb);
	return res;
}

/*
**  SM_RCBCOM_ENDREP -- finish a reply: close the RCB from encoding,
**		open it for sending, append it to the rcbcom_ctx write
**		list, and enable the WRite event for tsk unless notified
**		is set (or tsk is NULL).
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**		tsk -- evthr task
**		notified -- tsk already notified?
**		prcbe -- (pointer to) RCB entry;
**			will be set to NULL if appended to the write list
**
**	Returns:
**		usual sm_error code;
**		SM_E_FULL (temporary) if the write list already contains
**		rcbcom_wr_len_max entries; in that case the entry is not
**		appended and *prcbe is left unchanged.
**
**	Side Effects: rcbe is appended to write list even if evthr_en_wr()
**		fails (see above: prcbe).
**
**	Implementation note: notified can be removed and tsk != NULL
**		can be used instead.
**
**	NOTE: do not use something like this in an application:
**		rcbe = some_context->..._rbce;
**		sm_rcbcom_endrep( ...., &rcbe)
**		because this function "takes over" rcbe and sets it to NULL,
**		hence it must NOT be free()d by the caller, which means that
**		some_context->..._rbce must not be freed...
**		Summary: use some_context->..._rbce directly in a call:
**		sm_rcbcom_endrep( ...., &(some_context->..._rbce))
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
sm_rcbcom_endrep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, bool notified, sm_rcbe_P *prcbe)
{
	int r;
	sm_ret_T res;
	sm_rcbe_P entry;
	sm_rcb_P out;

	RCBCOMM_LEV_DPRINTF(3, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom_endrep, tsk=%p, notified=%d\n", tsk, notified));
	SM_REQUIRE(prcbe != NULL && *prcbe != NULL);
	entry = *prcbe;
	out = &entry->rcbe_rcb;

	/* switch the RCB from encoding to sending */
	res = sm_rcb_close_enc(out);
	if (!sm_is_err(res))
		res = sm_rcb_open_snd(out);
	if (sm_is_err(res))
		goto fail;

	r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		res = sm_error_perm(SM_EM_RECCOM, r);
		goto fail;
	}
	if (rcbcom_ctx->rcbcom_wr_len_cur >= rcbcom_ctx->rcbcom_wr_len_max) {
		/* list is full: caller keeps the entry */
		res = sm_error_temp(SM_EM_RECCOM, SM_E_FULL);
		notified = true;	/* just to avoid enabling write */
	}
	else {
		/* append entry to sender list, which owns it from now on */
		RCBL_APP(&rcbcom_ctx->rcbcom_wrrcbl, entry);
		++rcbcom_ctx->rcbcom_wr_len_cur;
		*prcbe = NULL;
	}

	r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex);
	SM_ASSERT(0 == r);	/* r is checked below */

	/* trigger send */
	if (tsk != NULL && !notified) {
		res = evthr_en_wr(tsk);
		if (sm_is_err(res))
			goto fail;
	}

	/* report an unlock failure unless there is already an error */
	if (sm_is_success(res) && r != 0)
		res = sm_error_perm(SM_EM_RECCOM, r);
	RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom_endrep, status=successful, tsk=%p\n", tsk));
	return res;

  fail:
	RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=ERROR, func=sm_rcbcom_endrep, tsk=%p, ret=%x\n", tsk, res));
	return res;
}

/*
**  SM_RCBCOM2MOD -- Send the head of the write RCB list
**	(must be opened for snd) to an external module
**
**	The head entry stays in the list while it is being sent; it is
**	removed only after sm_rcb_snd() returned 0.  After removal the
**	write callback of the entry (if any) is invoked, and the entry
**	is either kept as spare entry in rcbcom_wrrcbe (if that slot is
**	empty) or freed.
**
**	Parameters:
**		tsk -- evthr task
**		rcbcom_ctx -- RCB communication structure context
**
**	Returns:
**		evthr return code, e.g., EVTHR_WAITQ;
**		evthr_r_no(EVTHR_EV_WR) is added if the write list is empty;
**		the error code of sm_rcb_snd() if that reports an error;
**		a permanent error with the pthread error code if locking
**		the mutex fails.
**
**	Locking:
**		The WRite RCB list is protected by mutex, but this
**		function must not be called concurrently (it does not
**		remove the first element immediately), which is
**		guaranteed by the event thread system.
**
**	NOTE(review): the result of sm_rcb_close_snd() is stored in ret
**		but never examined.
**	NOTE(review): rcbcom_wrrcbe is read and written after the mutex
**		has been released; confirm that it does not need the
**		protection of rcbcom_wrmutex.
*/

sm_ret_T
sm_rcbcom2mod(sm_evthr_task_P tsk, rcbcom_ctx_P rcbcom_ctx)
{
	int fd, r;
	sm_ret_T ret, rv;
	bool empty;
	sm_rcbe_P rcbe;
	sm_rcb_P rcb;

	SM_IS_EVTHR_TSK(tsk);
	RCBCOMM_LEV_DPRINTF(3, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, tsk=%p\n", tsk));

	/* look at the head of the write list while holding the mutex */
	r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		ret = sm_error_perm(SM_EM_RECCOM, r);
		goto error;
	}

	empty = RCBL_EMPTY(&rcbcom_ctx->rcbcom_wrrcbl);
	rcbe = RCBL_FIRST(&rcbcom_ctx->rcbcom_wrrcbl);
	r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex);
	SM_ASSERT(0 == r);
	/* r isn't checked further; will fail on next iteration */

	/* nothing to send: return WAITQ together with "no WR" */
	if (empty)
		return EVTHR_WAITQ|evthr_r_no(EVTHR_EV_WR);

	/* send the head entry; the mutex is not held during the send */
	rv = EVTHR_WAITQ;
	rcb = &rcbe->rcbe_rcb;
	SM_IS_RCB(rcb);
	fd = tsk->evthr_t_fd;
	ret = sm_rcb_snd(fd, rcb);
	RCBCOMM_LEV_DPRINTF(2, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, sm_rcb_snd=%d, len=%d, tsk=%p, rcbe=%p\n", ret, rcb->sm_rcb_len, tsk, rcbe));
	if (ret > 0)
		/* presumably a partial write; entry stays at the head -- TODO confirm */
		return EVTHR_WAITQ;
	else if (0 == ret) {
		void *h_rcbe_ctx;
		rcb_wr_cb_F h_rcbe_wr_cb;

		/* done with this RCB: close it and take it off the list */
		/* NOTE(review): this value of ret is never checked */
		ret = sm_rcb_close_snd(rcb);
		r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			ret = sm_error_perm(SM_EM_RECCOM, r);
			goto error;
		}
		RCBL_REMOVE(&rcbcom_ctx->rcbcom_wrrcbl);
		SM_ASSERT(rcbcom_ctx->rcbcom_wr_len_cur > 0);
		--rcbcom_ctx->rcbcom_wr_len_cur;
		empty = RCBL_EMPTY(&rcbcom_ctx->rcbcom_wrrcbl);

		/* temporary variable to use outside the locked region */
		h_rcbe_wr_cb = rcbe->rcbe_wr_cb;
		h_rcbe_ctx = rcbe->rcbe_ctx;
		/* clear them so the entry does not carry a stale callback */
		rcbe->rcbe_wr_cb = NULL;
		rcbe->rcbe_ctx = NULL;

		r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex);
		SM_ASSERT(0 == r);
		/* r isn't checked further; will fail on next iteration */

		/* invoke callback if it exists */
		if (h_rcbe_wr_cb != NULL)
			(void) h_rcbe_wr_cb(h_rcbe_ctx);
			/* XXX check return value? */

		/* list is empty now: add "no WR" to the return code */
		if (empty) {
			RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, status=queue_empty, tsk=%p\n", tsk));
			rv |= evthr_r_no(EVTHR_EV_WR);
		}

		/* keep the entry as spare if there is none, otherwise free it */
		/* NOTE(review): done without holding rcbcom_wrmutex */
		if (NULL == rcbcom_ctx->rcbcom_wrrcbe)
			rcbcom_ctx->rcbcom_wrrcbe = rcbe;
		else
			sm_rcbe_free(rcbe);
	}
	else if (sm_is_err(ret)) {
		/* send failed: return the error; entry stays in the list */
		RCBCOMM_LEV_DPRINTF(0, (RCBCOMM_DEBFP, "sev=ERR, func=sm_rcbcom2mod, where=done, tsk=%p, ret=%#x\n", tsk, ret));
		rv = ret;
	}
	RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, where=done, tsk=%p, rv=%x\n", tsk, rv));
	return rv;

  error:
	/* reached only if locking the mutex failed */
	return ret;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */