/* * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: rcbcomm.c,v 1.44 2006/12/29 01:21:31 ca Exp $") #include "sm/error.h" #include "sm/assert.h" #include "sm/memops.h" #include "sm/io.h" #include "sm/rcb.h" #include "sm/rcbl.h" #include "sm/rcbcomm.h" #include "sm/reccom.h" #include "sm/qmgrcomm.h" /* ** This uses some Q* constants; should it do that?? ** Rename Q* constants and move them into different include file (rcb.h)?? */ /* ** SM_RCBE_NEW_ENC -- create a new RCB entry, open it for encoding ** ** Parameters: ** prcbe -- pointer to RCB entry (output) ** minsz -- parameter "n" for sm_rcb_open_enc() ** See comments there! ** maxlen -- Maximum length of buf data, if 0 a system ** default will be used. ** ** Returns: ** usual sm_error code; ENOMEM, SM_E_RANGE, SM_E_OVFLW_NS ** ** Side Effects: none on error ** ** Last code review: 2005-03-22 17:46:33 ** Last code change: */ sm_ret_T sm_rcbe_new_enc(sm_rcbe_P *prcbe, int minsz, uint maxlen) { sm_rcbe_P rcbe; sm_ret_T ret; SM_REQUIRE(prcbe != NULL); /* XXX really QSS_RC_MAXSZ? */ rcbe = sm_rcbe_new(NULL, (minsz > QSS_RC_SZ) ? minsz : QSS_RC_SZ, (0 == maxlen) ? QSS_RC_MAXSZ : maxlen); if (NULL == rcbe) return sm_error_temp(SM_EM_RECCOM, ENOMEM); ret = sm_rcb_open_enc(&rcbe->rcbe_rcb, minsz); if (sm_is_err(ret)) goto error; *prcbe = rcbe; return SM_SUCCESS; error: if (rcbe != NULL) sm_rcbe_free(rcbe); *prcbe = NULL; return ret; } /* ** SM_RCBCOM_OPEN -- Open (initialize) RCB communication structure context ** ** Parameters: ** rcbcom_ctx -- RCB communication structure context ** ** Returns: ** usual sm_error code; ENOMEM, etc */ sm_ret_T sm_rcbcom_open(rcbcom_ctx_P rcbcom_ctx) { int r; sm_ret_T ret; SM_REQUIRE(rcbcom_ctx != NULL); r = pthread_mutex_init(&rcbcom_ctx->rcbcom_wrmutex, SM_PTHREAD_MUTEXATTR); if (r != 0) { ret = sm_error_perm(SM_EM_RECCOM, r); goto error; } rcbcom_ctx->rcbcom_rdrcb = sm_rcb_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ); if (NULL == rcbcom_ctx->rcbcom_rdrcb) goto err2; rcbcom_ctx->rcbcom_wrrcbe = sm_rcbe_new(NULL, QSS_RC_SZ, QSS_RC_MAXSZ); if (NULL == rcbcom_ctx->rcbcom_wrrcbe) goto err2; RCBL_INIT(&rcbcom_ctx->rcbcom_wrrcbl); rcbcom_ctx->rcbcom_wr_len_cur = 0; rcbcom_ctx->rcbcom_wr_len_max = UINT_MAX / 2; return SM_SUCCESS; err2: ret = sm_error_temp(SM_EM_RECCOM, ENOMEM); (void) pthread_mutex_destroy(&rcbcom_ctx->rcbcom_wrmutex); error: if (rcbcom_ctx->rcbcom_wrrcbe != NULL) sm_rcbe_free(rcbcom_ctx->rcbcom_wrrcbe); if (rcbcom_ctx->rcbcom_rdrcb != NULL) sm_rcb_free(rcbcom_ctx->rcbcom_rdrcb); return ret; } /* ** SM_COM_CLOSE -- Close a RCB communication structure ** ** Parameters: ** rcbcom_ctx -- RCB communication structure context ** ** Returns: ** SM_SUCCESS */ sm_ret_T sm_rcbcom_close(rcbcom_ctx_P rcbcom_ctx) { sm_rcbe_P rcbe, rcbe_nxt; if (NULL == rcbcom_ctx) return SM_SUCCESS; if (rcbcom_ctx->rcbcom_rdrcb != NULL) sm_rcb_free(rcbcom_ctx->rcbcom_rdrcb); if (rcbcom_ctx->rcbcom_wrrcbe != NULL) sm_rcbe_free(rcbcom_ctx->rcbcom_wrrcbe); for (rcbe = RCBL_FIRST(&rcbcom_ctx->rcbcom_wrrcbl); rcbe != NULL; rcbe = rcbe_nxt) { rcbe_nxt = RCBL_NEXT(rcbe); sm_rcbe_free(rcbe); } /* ugly... */ rcbcom_ctx->rcbcom_tsk = NULL; rcbcom_ctx->rcbcom_wr_len_cur = 0; RCBL_INIT(&rcbcom_ctx->rcbcom_wrrcbl); (void) pthread_mutex_destroy(&rcbcom_ctx->rcbcom_wrmutex); return SM_SUCCESS; } /* ** SM_RCBCOM_PREREP -- Prepare reply to some module: ** check whether read RCB is completely read, ** close read RCB from decoding, open it for receiving (always); ** Note: caller should not access rcbcom_rdrcb afterwards (if tsk != NULL)! ** This read RCB belongs to the (communication) task and will ** be reused as soon as the task is activated again. ** create a new RCB entry (prcbe); ** put task back in waitq (unless tsk is NULL). ** ** Parameters: ** rcbcom_ctx -- RCB communication structure context ** tsk -- evthr task ** prcbe -- (pointer to) RCB entry (output) ** ** Returns: ** usual sm_error code; SM_E_PR_ERR, etc */ sm_ret_T sm_rcbcom_prerep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, sm_rcbe_P *prcbe) { sm_rcbe_P rcbe; sm_ret_T ret; SM_REQUIRE(prcbe != NULL); rcbe = NULL; *prcbe = NULL; /* check for EOB */ if (!SM_RCB_ISEOB(rcbcom_ctx->rcbcom_rdrcb)) { /* fixme: better error code */ ret = sm_error_perm(SM_EM_RECCOM, SM_E_PR_ERR); goto error; } /* those two calls won't fail, nevertheless, this isn't really ok */ (void) sm_rcb_close_dec(rcbcom_ctx->rcbcom_rdrcb); (void) sm_rcb_open_rcv(rcbcom_ctx->rcbcom_rdrcb); ret = sm_rcbe_new_enc(&rcbe, -1, 0); if (sm_is_err(ret)) { /* Try "fallback" RCB if out of memory */ if (sm_error_value(ret) == ENOMEM && rcbcom_ctx->rcbcom_wrrcbe != NULL) { rcbe = rcbcom_ctx->rcbcom_wrrcbe; rcbcom_ctx->rcbcom_wrrcbe = NULL; } else goto error; } if (tsk != NULL) { /* put task back in waitq now */ ret = evthr_waitq_app(tsk); if (sm_is_err(ret)) goto error; } *prcbe = rcbe; return SM_SUCCESS; error: if (rcbe != NULL) sm_rcbe_free(rcbe); (void) sm_rcb_close_decn(rcbcom_ctx->rcbcom_rdrcb); (void) sm_rcb_open_rcvn(rcbcom_ctx->rcbcom_rdrcb); return ret; } /* ** SM_RCBCOM_ENDREP -- close RCB, append it to rcbcom_ctx write list, ** enable WRite event for tsk unless notified is set ** ** Parameters: ** rcbcom_ctx -- RCB communication structure context ** tsk -- evthr task ** notified -- tsk already notified? ** prcbe -- (pointer to) RCB entry; ** will be set to NULL if appended to the write list ** ** Returns: ** usual sm_error code ** ** Side Effects: rcbe is appended to write list even if evthr_en_wr() ** fails (see above: prcbe). ** ** Implementation note: notified can be removed and tsk != NULL ** can be used instead. ** ** NOTE: do not use something like this in an application: ** rcbe = some_context->..._rbce; ** sm_rcbcom_endrep( ...., &rcbe) ** because this function "takes over" rcbe and sets it to NULL, ** hence it must NOT be free()d by the caller, which means that ** some_context->..._rbce must not be freed... ** Summary: use some_context->..._rbce directly in a call: ** sm_rcbcom_endrep( ...., &(some_context->..._rbce)) ** ** Last code review: ** Last code change: */ sm_ret_T sm_rcbcom_endrep(rcbcom_ctx_P rcbcom_ctx, sm_evthr_task_P tsk, bool notified, sm_rcbe_P *prcbe) { int r; sm_ret_T ret; sm_rcb_P rcb; sm_rcbe_P rcbe; RCBCOMM_LEV_DPRINTF(3, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom_endrep, tsk=%p, notified=%d\n", tsk, notified)); SM_REQUIRE(prcbe != NULL && *prcbe != NULL); rcbe = *prcbe; rcb = &rcbe->rcbe_rcb; ret = sm_rcb_close_enc(rcb); if (sm_is_err(ret)) goto error; ret = sm_rcb_open_snd(rcb); if (sm_is_err(ret)) goto error; r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex); SM_LOCK_OK(r); if (r != 0) { ret = sm_error_perm(SM_EM_RECCOM, r); goto error; } if (rcbcom_ctx->rcbcom_wr_len_cur < rcbcom_ctx->rcbcom_wr_len_max) { /* append rcbe to sender list */ RCBL_APP(&rcbcom_ctx->rcbcom_wrrcbl, rcbe); ++rcbcom_ctx->rcbcom_wr_len_cur; *prcbe = NULL; } else { notified = true; /* just to avoid enabling write */ ret = sm_error_temp(SM_EM_RECCOM, SM_E_FULL); } r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex); SM_ASSERT(0 == r); /* r is checked below */ if (!notified && tsk != NULL) { /* trigger send */ ret = evthr_en_wr(tsk); if (sm_is_err(ret)) goto error; } if (r != 0 && sm_is_success(ret)) ret = sm_error_perm(SM_EM_RECCOM, r); RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom_endrep, status=successful, tsk=%p\n", tsk)); return ret; error: RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=ERROR, func=sm_rcbcom_endrep, tsk=%p, ret=%x\n", tsk, ret)); return ret; } /* ** SM_RCBCOM2MOD -- Send the head of the write RCB list ** (must be opened for snd) to an external module ** ** Parameters: ** tsk -- evthr task ** rcbcom_ctx -- RCB communication structure context ** ** Returns: ** evthr return code, e.g., EVTHR_WAITQ ** ** Locking: ** The WRite RCB list is protected by mutex, but this ** function must not be called concurrently (it does not ** remove the first element immediately), which is ** guaranteed by the event thread system. */ sm_ret_T sm_rcbcom2mod(sm_evthr_task_P tsk, rcbcom_ctx_P rcbcom_ctx) { int fd, r; sm_ret_T ret, rv; bool empty; sm_rcbe_P rcbe; sm_rcb_P rcb; SM_IS_EVTHR_TSK(tsk); RCBCOMM_LEV_DPRINTF(3, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, tsk=%p\n", tsk)); r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex); SM_LOCK_OK(r); if (r != 0) { ret = sm_error_perm(SM_EM_RECCOM, r); goto error; } empty = RCBL_EMPTY(&rcbcom_ctx->rcbcom_wrrcbl); rcbe = RCBL_FIRST(&rcbcom_ctx->rcbcom_wrrcbl); r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex); SM_ASSERT(0 == r); /* r isn't checked further; will fail on next iteration */ if (empty) return EVTHR_WAITQ|evthr_r_no(EVTHR_EV_WR); rv = EVTHR_WAITQ; rcb = &rcbe->rcbe_rcb; SM_IS_RCB(rcb); fd = tsk->evthr_t_fd; ret = sm_rcb_snd(fd, rcb); RCBCOMM_LEV_DPRINTF(2, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, sm_rcb_snd=%d, len=%d, tsk=%p, rcbe=%p\n", ret, rcb->sm_rcb_len, tsk, rcbe)); if (ret > 0) return EVTHR_WAITQ; else if (0 == ret) { void *h_rcbe_ctx; rcb_wr_cb_F h_rcbe_wr_cb; ret = sm_rcb_close_snd(rcb); r = pthread_mutex_lock(&rcbcom_ctx->rcbcom_wrmutex); SM_LOCK_OK(r); if (r != 0) { ret = sm_error_perm(SM_EM_RECCOM, r); goto error; } RCBL_REMOVE(&rcbcom_ctx->rcbcom_wrrcbl); SM_ASSERT(rcbcom_ctx->rcbcom_wr_len_cur > 0); --rcbcom_ctx->rcbcom_wr_len_cur; empty = RCBL_EMPTY(&rcbcom_ctx->rcbcom_wrrcbl); /* temporary variable to use outside the locked region */ h_rcbe_wr_cb = rcbe->rcbe_wr_cb; h_rcbe_ctx = rcbe->rcbe_ctx; rcbe->rcbe_wr_cb = NULL; rcbe->rcbe_ctx = NULL; r = pthread_mutex_unlock(&rcbcom_ctx->rcbcom_wrmutex); SM_ASSERT(0 == r); /* r isn't checked further; will fail on next iteration */ /* invoke callback if it exists */ if (h_rcbe_wr_cb != NULL) (void) h_rcbe_wr_cb(h_rcbe_ctx); /* XXX check return value? */ if (empty) { RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, status=queue_empty, tsk=%p\n", tsk)); rv |= evthr_r_no(EVTHR_EV_WR); } if (NULL == rcbcom_ctx->rcbcom_wrrcbe) rcbcom_ctx->rcbcom_wrrcbe = rcbe; else sm_rcbe_free(rcbe); } else if (sm_is_err(ret)) { RCBCOMM_LEV_DPRINTF(0, (RCBCOMM_DEBFP, "sev=ERR, func=sm_rcbcom2mod, where=done, tsk=%p, ret=%#x\n", tsk, ret)); rv = ret; } RCBCOMM_LEV_DPRINTF(1, (RCBCOMM_DEBFP, "sev=DBG, func=sm_rcbcom2mod, where=done, tsk=%p, rv=%x\n", tsk, rv)); return rv; error: return ret; }