/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: rcbcomm.c,v 1.44 2006/12/29 01:21:31 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/rcbl.h"
#include "sm/rcbcomm.h"
#include "sm/reccom.h"
#include "sm/qmgrcomm.h"
/*
** This uses some Q* constants; should it do that??
** Rename Q* constants and move them into different include file (rcb.h)??
*/
/*
**  SM_RCBE_NEW_ENC -- allocate an RCB entry and open its RCB for encoding
**
**	Parameters:
**		prcbe -- where the new RCB entry is stored (output)
**		minsz -- requested size; the entry is created with at least
**			QSS_RC_SZ, and minsz is also handed on as parameter
**			"n" of sm_rcb_open_enc() (see comments there!)
**		maxlen -- maximum length of buf data; 0 selects the
**			default QSS_RC_MAXSZ.
**
**	Returns:
**		usual sm_error code; ENOMEM, SM_E_RANGE, SM_E_OVFLW_NS
**
**	Side Effects:
**		*prcbe is left untouched if the allocation fails and is
**		set to NULL if opening the RCB for encoding fails.
**
**	Last code review: 2005-03-22 17:46:33
**	Last code change:
*/
sm_ret_T
sm_rcbe_new_enc(sm_rcbe_P *prcbe, int minsz, uint maxlen)
{
	sm_rcbe_P entry;
	sm_ret_T res;

	SM_REQUIRE(prcbe != NULL);

	/* XXX really QSS_RC_MAXSZ? */
	entry = sm_rcbe_new(NULL,
			(minsz <= QSS_RC_SZ) ? QSS_RC_SZ : minsz,
			(maxlen != 0) ? maxlen : QSS_RC_MAXSZ);
	if (entry == NULL)
		return sm_error_temp(SM_EM_RECCOM, ENOMEM);

	res = sm_rcb_open_enc(&entry->rcbe_rcb, minsz);
	if (!sm_is_err(res)) {
		/* hand the ready-to-encode entry to the caller */
		*prcbe = entry;
		return SM_SUCCESS;
	}

	/* could not open for encoding: give the entry back */
	sm_rcbe_free(entry);
	*prcbe = NULL;
	return res;
}
/*
**  SM_RCBCOM_OPEN -- Open (initialize) RCB communication structure context
**
**	Initializes the write mutex, allocates the read RCB and the spare
**	write RCB entry, and resets the write list and its length counters.
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**
**	Returns:
**		SM_SUCCESS on success, otherwise usual sm_error code:
**		a permanent error carrying the pthread_mutex_init() result,
**		or a temporary ENOMEM error if an allocation fails.
**
**	Side Effects:
**		On an allocation failure the mutex is destroyed again.
**		On any failure the read RCB and spare write RCB entry are
**		freed (if set) and their pointers are reset to NULL, so the
**		context never keeps pointers to freed memory.
**
**	NOTE(review): the error path inspects rcbcom_rdrcb and
**	rcbcom_wrrcbe even if this function has not assigned them yet;
**	this assumes the caller passes in a zeroed context -- confirm.
*/
sm_ret_T
sm_rcbcom_open(rcbcom_ctx_P rcbcom_ctx)
{
	int r;
	sm_ret_T ret;

	SM_REQUIRE(rcbcom_ctx != NULL);
	r = pthread_mutex_init(&rcbcom_ctx->rcbcom_wrmutex,
			SM_PTHREAD_MUTEXATTR);
	if (r != 0) {
		ret = sm_error_perm(SM_EM_RECCOM, r);
		goto error;
	}

	rcbcom_ctx->rcbcom_rdrcb = sm_rcb_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ);
	if (NULL == rcbcom_ctx->rcbcom_rdrcb)
		goto err2;
	rcbcom_ctx->rcbcom_wrrcbe = sm_rcbe_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ);
	if (NULL == rcbcom_ctx->rcbcom_wrrcbe)
		goto err2;

	RCBL_INIT(&rcbcom_ctx->rcbcom_wrrcbl);
	rcbcom_ctx->rcbcom_wr_len_cur = 0;
	rcbcom_ctx->rcbcom_wr_len_max = UINT_MAX / 2;
	return SM_SUCCESS;

err2:
	/* only allocation failures get here; the mutex exists at this point */
	ret = sm_error_temp(SM_EM_RECCOM, ENOMEM);
	(void) pthread_mutex_destroy(&rcbcom_ctx->rcbcom_wrmutex);
error:
	/*
	**  Clear each pointer after freeing: sm_rcbcom_close() frees
	**  whatever is non-NULL, so a stale pointer left behind here
	**  would be freed a second time.
	*/

	if (rcbcom_ctx->rcbcom_wrrcbe != NULL) {
		sm_rcbe_free(rcbcom_ctx->rcbcom_wrrcbe);
		rcbcom_ctx->rcbcom_wrrcbe = NULL;
	}
	if (rcbcom_ctx->rcbcom_rdrcb != NULL) {
		sm_rcb_free(rcbcom_ctx->rcbcom_rdrcb);
		rcbcom_ctx->rcbcom_rdrcb = NULL;
	}
	return ret;
}
/*
**  SM_RCBCOM_CLOSE -- Close a RCB communication structure
**
**	Frees the read RCB, the spare write RCB entry and every entry that
**	is still on the write list, resets the task pointer, the write list
**	and its current length, and destroys the write mutex.
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**			(may be NULL, in which case nothing is done)
**
**	Returns:
**		SM_SUCCESS
**
**	Side Effects:
**		rcbcom_rdrcb and rcbcom_wrrcbe are set to NULL after being
**		freed so the context does not keep pointers to freed memory.
**
**	NOTE(review): the write list is walked without holding the write
**	mutex; presumably no other thread uses the context anymore when
**	this is called -- confirm against callers.
*/
sm_ret_T
sm_rcbcom_close(rcbcom_ctx_P rcbcom_ctx)
{
	sm_rcbe_P rcbe, rcbe_nxt;

	if (NULL == rcbcom_ctx)
		return SM_SUCCESS;

	if (rcbcom_ctx->rcbcom_rdrcb != NULL) {
		sm_rcb_free(rcbcom_ctx->rcbcom_rdrcb);
		rcbcom_ctx->rcbcom_rdrcb = NULL;
	}
	if (rcbcom_ctx->rcbcom_wrrcbe != NULL) {
		sm_rcbe_free(rcbcom_ctx->rcbcom_wrrcbe);
		rcbcom_ctx->rcbcom_wrrcbe = NULL;
	}

	/* fetch the successor first: the current entry is gone after free */
	for (rcbe = RCBL_FIRST(&rcbcom_ctx->rcbcom_wrrcbl); rcbe != NULL;
	     rcbe = rcbe_nxt)
	{
		rcbe_nxt = RCBL_NEXT(rcbe);
		sm_rcbe_free(rcbe);
	}

	/* ugly... */
	rcbcom_ctx->rcbcom_tsk = NULL;
	rcbcom_ctx->rcbcom_wr_len_cur = 0;
	RCBL_INIT(&rcbcom_ctx->rcbcom_wrrcbl);
	(void) pthread_mutex_destroy(&rcbcom_ctx->rcbcom_wrmutex);
	return SM_SUCCESS;
}
/*
**  SM_RCBCOM_PREREP -- Prepare reply to some module:
**	check whether read RCB is completely read,
**	close read RCB from decoding, open it for receiving (always);
**	Note: caller should not access rcbcom_rdrcb afterwards (if tsk != NULL)!
**		This read RCB belongs to the (communication) task and will
**		be reused as soon as the task is activated again.
**	create a new RCB entry (prcbe), opened for encoding;
**	if that fails with ENOMEM, the spare entry kept in the context
**	(rcbcom_wrrcbe) is taken over and opened for encoding instead;
**	put task back in waitq (unless tsk is NULL).
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**		tsk -- evthr task (may be NULL)
**		prcbe -- (pointer to) RCB entry (output);
**			set to NULL on error
**
**	Returns:
**		usual sm_error code; SM_E_PR_ERR (read RCB not at EOB), etc
**
**	Side Effects:
**		On error a reply entry obtained so far is freed; this
**		includes the spare entry if it had been taken over.
**		The read RCB is switched to receiving on every path; the
**		error path uses the "n" variants of close/open (presumably
**		the non-checking versions -- confirm in sm/rcb.h).
*/
sm_ret_T
sm_rcbcom_prerep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, sm_rcbe_P *prcbe)
{
	sm_rcbe_P rcbe;
	sm_ret_T ret;

	SM_REQUIRE(rcbcom_ctx != NULL);
	SM_REQUIRE(prcbe != NULL);
	rcbe = NULL;
	*prcbe = NULL;

	/* check for EOB */
	if (!SM_RCB_ISEOB(rcbcom_ctx->rcbcom_rdrcb)) {
		/* fixme: better error code */
		ret = sm_error_perm(SM_EM_RECCOM, SM_E_PR_ERR);
		goto error;
	}

	/* those two calls won't fail, nevertheless, this isn't really ok */
	(void) sm_rcb_close_dec(rcbcom_ctx->rcbcom_rdrcb);
	(void) sm_rcb_open_rcv(rcbcom_ctx->rcbcom_rdrcb);

	ret = sm_rcbe_new_enc(&rcbe, -1, 0);
	if (sm_is_err(ret)) {
		/* Try "fallback" RCB if out of memory */
		if (sm_error_value(ret) == ENOMEM && rcbcom_ctx->rcbcom_wrrcbe != NULL)
		{
			rcbe = rcbcom_ctx->rcbcom_wrrcbe;
			rcbcom_ctx->rcbcom_wrrcbe = NULL;

			/*
			**  The spare entry was only created or recycled
			**  after sending; it must be opened for encoding
			**  like a fresh entry from sm_rcbe_new_enc(),
			**  because the caller fills it and
			**  sm_rcbcom_endrep() closes it from encoding.
			*/

			ret = sm_rcb_open_enc(&rcbe->rcbe_rcb, -1);
			if (sm_is_err(ret))
				goto error;
		}
		else
			goto error;
	}

	if (tsk != NULL) {
		/* put task back in waitq now */
		ret = evthr_waitq_app(tsk);
		if (sm_is_err(ret))
			goto error;
	}

	*prcbe = rcbe;
	return SM_SUCCESS;

error:
	if (rcbe != NULL)
		sm_rcbe_free(rcbe);
	(void) sm_rcb_close_decn(rcbcom_ctx->rcbcom_rdrcb);
	(void) sm_rcb_open_rcvn(rcbcom_ctx->rcbcom_rdrcb);
	return ret;
}
/*
**  SM_RCBCOM_ENDREP -- close RCB from encoding, open it for sending,
**	append it to rcbcom_ctx write list,
**	enable WRite event for tsk unless notified is set
**
**	Parameters:
**		rcbcom_ctx -- RCB communication structure context
**		tsk -- evthr task (may be NULL: no write event is enabled)
**		notified -- tsk already notified?
**		prcbe -- (pointer to) RCB entry;
**			will be set to NULL if appended to the write list
**
**	Returns:
**		usual sm_error code; SM_E_FULL (temporary) if the write
**		list already holds rcbcom_wr_len_max entries
**
**	Side Effects: rcbe is appended to write list even if evthr_en_wr()
**		fails (see above: prcbe).
**		If the entry is NOT appended (close/open/lock failure or
**		full list) *prcbe is left untouched and the caller still
**		owns the entry.
**
**	Implementation note: notified can be removed and tsk != NULL
**		can be used instead.
**
**	NOTE: do not use something like this in an application:
**		rcbe = some_context->..._rcbe;
**		sm_rcbcom_endrep( ...., &rcbe)
**	because this function "takes over" rcbe and sets it to NULL,
**	hence it must NOT be free()d by the caller, which means that
**	some_context->..._rcbe must not be freed...
**	Summary: use some_context->..._rcbe directly in a call:
**		sm_rcbcom_endrep( ...., &(some_context->..._rcbe))
**
**	Last code review:
**	Last code change:
*/
sm_ret_T
sm_rcbcom_endrep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, bool notified, sm_rcbe_P *prcbe)
{
	int r;
	sm_ret_T ret;
	sm_rcb_P rcb;
	sm_rcbe_P rcbe;

	RCBCOMM_LEV_DPRINTF(3, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom_endrep, tsk=%p, notified=%d\n", tsk, notified));
	SM_REQUIRE(rcbcom_ctx != NULL);
	SM_REQUIRE(prcbe != NULL && *prcbe != NULL);
	rcbe = *prcbe;
	rcb = &rcbe->rcbe_rcb;

	/* switch the RCB from "being filled" to "ready to be sent" */
	ret = sm_rcb_close_enc(rcb);
	if (sm_is_err(ret))
		goto error;
	ret = sm_rcb_open_snd(rcb);
	if (sm_is_err(ret))
		goto error;

	r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		ret = sm_error_perm(SM_EM_RECCOM, r);
		goto error;
	}
	if (rcbcom_ctx->rcbcom_wr_len_cur < rcbcom_ctx->rcbcom_wr_len_max) {
		/* append rcbe to sender list; the list owns it from now on */
		RCBL_APP(&rcbcom_ctx->rcbcom_wrrcbl, rcbe);
		++rcbcom_ctx->rcbcom_wr_len_cur;
		*prcbe = NULL;
	}
	else {
		notified = true;	/* just to avoid enabling write */
		ret = sm_error_temp(SM_EM_RECCOM, SM_E_FULL);
	}
	r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex);
	SM_ASSERT(0 == r);	/* r is checked below */

	if (!notified && tsk != NULL) {
		/* trigger send */
		ret = evthr_en_wr(tsk);
		if (sm_is_err(ret))
			goto error;
	}
	if (r != 0 && sm_is_success(ret))
		ret = sm_error_perm(SM_EM_RECCOM, r);

	/*
	**  A full write list or a failed unlock leaves an error in ret;
	**  report it through the error trace instead of logging
	**  "status=successful" for a failing call.
	*/

	if (sm_is_err(ret))
		goto error;

	RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom_endrep, status=successful, tsk=%p\n", tsk));
	return ret;

error:
	RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=ERROR, func=sm_rcbcom_endrep, tsk=%p, ret=%x\n", tsk, ret));
	return ret;
}
/*
** SM_RCBCOM2MOD -- Send the head of the write RCB list
** (must be opened for snd) to an external module
**
** The head entry stays on the write list while sm_rcb_snd() works on
** it; it is removed only after sm_rcb_snd() returned 0. A write
** callback stored in the entry is then invoked (outside the lock), and
** the entry is either kept as the spare entry of the context
** (rcbcom_wrrcbe, if that is NULL) or freed.
**
** Parameters:
** tsk -- evthr task (its evthr_t_fd is the fd written to)
** rcbcom_ctx -- RCB communication structure context
**
** Returns:
** evthr return code, e.g., EVTHR_WAITQ;
** EVTHR_WAITQ|evthr_r_no(EVTHR_EV_WR) if the write list is (or has
** become) empty;
** an sm_error code if locking the mutex or sm_rcb_snd() fails.
**
** Locking:
** The WRite RCB list is protected by mutex, but this
** function must not be called concurrently (it does not
** remove the first element immediately), which is
** guaranteed by the event thread system.
**
** NOTE(review): rcbcom_wrrcbe is read and written after the mutex has
** been released; sm_rcbcom_prerep() accesses it without the mutex as
** well -- confirm that these cannot run concurrently.
*/
sm_ret_T
sm_rcbcom2mod(sm_evthr_task_P tsk, rcbcom_ctx_P rcbcom_ctx)
{
int fd, r;
sm_ret_T ret, rv;
bool empty;
sm_rcbe_P rcbe;
sm_rcb_P rcb;
SM_IS_EVTHR_TSK(tsk);
RCBCOMM_LEV_DPRINTF(3, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, tsk=%p\n", tsk));
/* look at the head of the write list under the lock; it is not removed yet */
r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex);
SM_LOCK_OK(r);
if (r != 0) {
ret = sm_error_perm(SM_EM_RECCOM, r);
goto error;
}
empty = RCBL_EMPTY(&rcbcom_ctx->rcbcom_wrrcbl);
rcbe = RCBL_FIRST(&rcbcom_ctx->rcbcom_wrrcbl);
r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex);
SM_ASSERT(0 == r);
/* r isn't checked further; will fail on next iteration */
/* nothing queued: presumably evthr_r_no(EVTHR_EV_WR) turns off the write event -- confirm */
if (empty)
return EVTHR_WAITQ|evthr_r_no(EVTHR_EV_WR);
rv = EVTHR_WAITQ;
rcb = &rcbe->rcbe_rcb;
SM_IS_RCB(rcb);
fd = tsk->evthr_t_fd;
ret = sm_rcb_snd(fd, rcb);
RCBCOMM_LEV_DPRINTF(2, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, sm_rcb_snd=%d, len=%d, tsk=%p, rcbe=%p\n", ret, rcb->sm_rcb_len, tsk, rcbe));
/* ret > 0: presumably the RCB is only partially written; keep it at the head and wait -- confirm */
if (ret > 0)
return EVTHR_WAITQ;
else if (0 == ret) {
/* 0: sending of this entry is finished, take it off the list */
void *h_rcbe_ctx;
rcb_wr_cb_F h_rcbe_wr_cb;
/* NOTE(review): this result is never examined; it is only overwritten if the lock below fails */
ret = sm_rcb_close_snd(rcb);
r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex);
SM_LOCK_OK(r);
if (r != 0) {
/* the entry stays on the write list in this case */
ret = sm_error_perm(SM_EM_RECCOM, r);
goto error;
}
/* RCBL_REMOVE() gets only the list; presumably it removes the head, i.e., rcbe -- confirm */
RCBL_REMOVE(&rcbcom_ctx->rcbcom_wrrcbl);
SM_ASSERT(rcbcom_ctx->rcbcom_wr_len_cur > 0);
--rcbcom_ctx->rcbcom_wr_len_cur;
empty = RCBL_EMPTY(&rcbcom_ctx->rcbcom_wrrcbl);
/* temporary variable to use outside the locked region */
h_rcbe_wr_cb = rcbe->rcbe_wr_cb;
h_rcbe_ctx = rcbe->rcbe_ctx;
/* clear callback and context in the entry: it is reused as spare entry or freed below */
rcbe->rcbe_wr_cb = NULL;
rcbe->rcbe_ctx = NULL;
r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex);
SM_ASSERT(0 == r);
/* r isn't checked further; will fail on next iteration */
/* invoke callback if it exists */
if (h_rcbe_wr_cb != NULL)
(void) h_rcbe_wr_cb(h_rcbe_ctx);
/* XXX check return value? */
if (empty) {
RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, status=queue_empty, tsk=%p\n", tsk));
rv |= evthr_r_no(EVTHR_EV_WR);
}
/* keep the entry as spare ("fallback") entry if the context has none, otherwise free it */
if (NULL == rcbcom_ctx->rcbcom_wrrcbe)
rcbcom_ctx->rcbcom_wrrcbe = rcbe;
else
sm_rcbe_free(rcbe);
}
else if (sm_is_err(ret)) {
/* sending failed: hand the error back instead of an evthr code */
RCBCOMM_LEV_DPRINTF(0, (RCBCOMM_DEBFP, "sev=ERR, func=sm_rcbcom2mod, where=done, tsk=%p, ret=%#x\n", tsk, ret));
rv = ret;
}
RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, where=done, tsk=%p, rv=%x\n", tsk, rv));
return rv;
error:
/* only reached if locking the write mutex failed */
return ret;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */