/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: actdb.c,v 1.106 2007/06/03 02:18:49 ca Exp $")
#include "sm/types.h"
#include "sm/assert.h"
#include "sm/magic.h"
#include "sm/str.h"
#include "sm/time.h"
#include "sm/mta.h"
#include "sm/memops.h"
#include "sm/qmgr.h"
#include "sm/actdb-int.h"
#include "sm/qmgr-int.h"
#include "sm/aqrdq.h"
#include "adb.h"
#include "aqrdq.h"
/*
** Simple version of active envelope database
**
** Which access methods do we need?
** DA/Host: for session reuse (scheduler).
** DA TA-Id: to associate a result from a DA with the recipients (da_status).
** SMTPS TA-Id: to find recipients for the same TA (that could be
** be delivered in one TA if the DA/Host is the same) (scheduler).
*/
/*
** Implementation note(s):
**
** It might be useful to keep a list of "freed" entries, instead of actually
** free()ing the data. There are other places (which?) where this is done.
*/
/*
** AQ_MAIL_NEW -- allocate new mail entry
**
** Parameters:
** aq_ta -- AQ transaction
**
** Returns:
** usual sm_error code; ENOMEM
**
** Side Effects: none on error
**
** Last code review: 2005-03-24 23:34:17
** Last code change:
*/
static sm_ret_T
aq_mail_new(aq_ta_P aq_ta)
{
aq_mail_P aq_mail;
SM_IS_AQ_TA(aq_ta);
aq_mail = (aq_mail_P) sm_zalloc(sizeof(*aq_mail));
if (NULL == aq_mail)
return sm_error_temp(SM_EM_AQ, ENOMEM);
aq_ta->aqt_mail = aq_mail;
return SM_SUCCESS;
}
/*
** AQ_MAIL_FREE -- free mail entry
**
** Parameters:
** aq_ta -- AQ transaction
**
** Returns:
** SM_SUCCESS
**
** Last code review: 2005-03-24 23:36:26
** Last code change:
*/
static sm_ret_T
aq_mail_free(aq_ta_P aq_ta)
{
SM_IS_AQ_TA(aq_ta);
if (NULL == aq_ta->aqt_mail)
return SM_SUCCESS;
SM_STR_FREE(aq_ta->aqt_mail->aqm_pa);
SM_FREE_SIZE(aq_ta->aqt_mail, sizeof(aq_ta->aqt_mail));
return SM_SUCCESS;
}
/*
** AQ_RCPT_FREE -- free a single recipient address
**
** Parameters:
** aq_rcpt -- AQ recipient
**
** Returns:
** SM_SUCCESS
**
** Last code review: 2005-03-18 19:56:16
** Last code change:
*/
sm_ret_T
aq_rcpt_free(aq_rcpt_P aq_rcpt)
{
if (NULL == aq_rcpt)
return SM_SUCCESS;
if (aq_rcpt->aqr_addrs != NULL
&& !AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR)
&& aq_rcpt->aqr_addrs != &aq_rcpt->aqr_addr_mf)
SM_FREE(aq_rcpt->aqr_addrs);
SM_STR_FREE(aq_rcpt->aqr_pa);
SM_STR_FREE(aq_rcpt->aqr_orig_pa);
SM_STR_FREE(aq_rcpt->aqr_domain);
SM_STR_FREE(aq_rcpt->aqr_msg);
SM_STR_FREE(aq_rcpt->aqr_dsn_msg);
if (aq_rcpt->aqr_dsns != NULL) {
size_t size_dsns;
SM_ASSERT(aq_rcpt->aqr_dsn_rcpts_max > 0);
size_dsns = sizeof(*(aq_rcpt->aqr_dsns)) * aq_rcpt->aqr_dsn_rcpts_max;
SM_ASSERT(aq_rcpt->aqr_dsn_rcpts_max <= size_dsns);
sm_free_size(aq_rcpt->aqr_dsns, size_dsns);
}
#if MTA_USE_TLS
SM_STR_FREE(aq_rcpt->aqr_conf);
#endif
#if AQ_RCPT_CHECK
aq_rcpt->sm_magic = SM_MAGIC_NULL;
#endif
sm_free_size(aq_rcpt, sizeof(*aq_rcpt));
return SM_SUCCESS;
}
/*
** AQ_RCPT_RM -- free a single recipient address, remove it from AQ
** and all of its lists.
**
** Parameters:
** aq_ctx -- AQ context
** aq_rcpt -- AQ recipient
** flags -- flags (AQR_RM_*)
**
** Returns:
** usual sm_error code; SM_E_UNEXPECTED (aq_rdq_rm) et.al.
**
** Note:
** This doesn't decrease the aq_ta counter aqt_rcpts_inaq,
** the caller has to take care of that;
** however, it changes the aq_ctx counters.
**
** Locking: locks entire aq_ctx if requested
**
** Last code review: 2005-03-18 19:56:57
** Last code change: 2005-06-14 17:16:38
*/
sm_ret_T
aq_rcpt_rm(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, uint flags)
{
sm_ret_T ret, rv;
int r;
if (NULL == aq_rcpt)
return SM_SUCCESS;
SM_IS_AQ(aq_ctx);
rv = SM_SUCCESS;
if (SM_IS_FLAG(flags, AQR_RM_LOCK)) {
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
if (!SM_IS_FLAG(flags, AQR_RM_N_RDQ)) {
rv = aq_rdq_rm(aq_ctx, aq_rcpt, THR_NO_LOCK, NULL /* XXX lctx */);
if (SM_IS_FLAG(flags, AQR_RM_I_RDQ))
rv = SM_SUCCESS;
}
if (!SM_IS_FLAG(flags, AQR_RM_N_WAITQ)) {
ret = aq_waitq_rm(aq_ctx, aq_rcpt,
SM_IS_FLAG(flags, AQR_RM_WAITQ_AR) ? AQWQ_AR :
SM_IS_FLAG(flags, AQR_RM_WAITQ_DA) ? AQWQ_DA : AQWQ_ANY,
false);
if (sm_is_err(ret) && !sm_is_err(rv) && !SM_IS_FLAG(flags, AQR_RM_I_WAITQ))
rv = ret;
}
AQR_REMOVE(aq_ctx, aq_rcpt);
AQR_DA_DELENTRY(aq_rcpt);
AQR_SS_DELENTRY(aq_rcpt);
QM_LEV_DPRINTF(7, (QM_DEBFP, "sev=DBG, func=aq_rcpt_rm, ss_ta=%s, flags=%x, aq_d_entries=%u, aq_entries=%u\n", aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_flags, aq_ctx->aq_d_entries, aq_ctx->aq_entries));
if (AQR_IS_FLAG(aq_rcpt, AQR_FL_DEFEDB)) {
SM_ASSERT(aq_ctx->aq_d_entries > 0);
aq_ctx->aq_d_entries--;
}
SM_ASSERT(aq_ctx->aq_entries > 0);
aq_ctx->aq_entries--;
SM_ASSERT(aq_ctx->aq_d_entries <= aq_ctx->aq_entries);
(void) aq_rcpt_free(aq_rcpt);
if (SM_IS_FLAG(flags, AQR_RM_LOCK)) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0 && sm_is_success(rv))
rv = sm_error_perm(SM_EM_AQ, r);
}
return rv;
}
/*
** AQ_RCPT_NEW -- allocate a new recipient
**
** Parameters:
** paq_rcpt -- recipient (return parameter)
**
** Returns:
** usual sm_error code; ENOMEM
**
** Last code review: 2005-03-30 04:55:36
** Last code change:
*/
sm_ret_T
aq_rcpt_new(aq_rcpt_P *paq_rcpt)
{
aq_rcpt_P aq_rcpt;
SM_REQUIRE(paq_rcpt != NULL);
aq_rcpt = (aq_rcpt_P) sm_zalloc(sizeof(*aq_rcpt));
if (NULL == aq_rcpt)
return sm_error_temp(SM_EM_AQ, ENOMEM);
#if MTA_USE_TLS
aq_rcpt->aqr_maprescnf = sm_err_perm(SM_E_NOTFOUND);
#endif
#if AQ_RCPT_CHECK
aq_rcpt->sm_magic = SM_AQ_RCPT_MAGIC;
#endif
/* Initialize DA and SS lists?? */
*paq_rcpt = aq_rcpt;
return SM_SUCCESS;
}
/*
** AQ_RCPT_ADD_NEW -- create a new recipient and add it to AQ
**
** Parameters:
** aq_ctx -- AQ context
** aq_ta -- AQ TA (for aqr_ss_ta)
** paq_rcpt -- recipient (return parameter)
** flags -- flags
** locktype -- kind of locking
**
** Returns:
** usual sm_error code; ENOMEM, SM_E_FULL
**
** Side Effects: none on error (except if unlock fails)
** if ok: creates new aq_rcpt, adds it to AQ;
** increases the aq_ctx counters.
**
** Note:
** This doesn't increase the aq_ta counter aqt_rcpts_inaq,
** the caller has to take care of that;
**
** Locking: locks/unlocks entire aq_ctx as requested
**
** Last code review: 2005-03-18 02:26:40
** Last code change:
*/
sm_ret_T
aq_rcpt_add_new(aq_ctx_P aq_ctx, aq_ta_P aq_ta, aq_rcpt_P *paq_rcpt, uint32_t flags, thr_lock_T locktype)
{
int r;
sm_ret_T ret;
aq_rcpt_P aq_rcpt;
SM_IS_AQ(aq_ctx);
SM_REQUIRE(paq_rcpt != NULL);
if (thr_lock_it(locktype)) {
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
aq_rcpt = NULL;
if (aq_ctx->aq_entries >= aq_ctx->aq_limit) {
ret = sm_error_temp(SM_EM_AQ, SM_E_FULL);
goto error;
}
aq_rcpt = (aq_rcpt_P) sm_zalloc(sizeof(*aq_rcpt));
if (NULL == aq_rcpt) {
ret = sm_error_temp(SM_EM_AQ, ENOMEM);
goto error;
}
#if MTA_USE_TLS
aq_rcpt->aqr_maprescnf = sm_err_perm(SM_E_NOTFOUND);
#endif
#if AQ_RCPT_CHECK
aq_rcpt->sm_magic = SM_AQ_RCPT_MAGIC;
#endif
AQR_INSERT_TAIL(aq_ctx, aq_rcpt);
#if 0
/* Initialize also DA and SS lists?? */
// AQR_DA_INIT(aq_rcpt);
// AQR_SS_INIT(aq_rcpt);
#endif /* 0 */
aq_rcpt->aqr_ss_ta = aq_ta;
aq_rcpt->aqr_flags = flags;
*paq_rcpt = aq_rcpt;
QM_LEV_DPRINTF(7, (QM_DEBFP, "sev=DBG, func=aq_rcpt_add_new, ss_ta=%s, flags=%x, aq_d_entries=%u, aq_entries=%u\n", aq_rcpt->aqr_ss_ta_id, flags, aq_ctx->aq_d_entries, aq_ctx->aq_entries));
if (SM_IS_FLAG(flags, AQR_FL_DEFEDB))
aq_ctx->aq_d_entries++;
aq_ctx->aq_entries++;
SM_ASSERT(aq_ctx->aq_d_entries <= aq_ctx->aq_entries);
if (thr_unl_no_err(locktype)) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
return SM_SUCCESS;
error:
SM_FREE_SIZE(aq_rcpt, sizeof(*aq_rcpt));
if (thr_unl_if_err(locktype)) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_AQ, r);
}
return ret;
}
/*
** AQ_RCPTS_FREE -- free entire recipient list (in AQ)
**
** Parameters:
** aq_ctx -- AQ context
**
** Returns:
** usual sm_error code; SM_E_UNEXPECTED (aq_rcpt_rm), (un)lock
**
** Locking: locks entire aq_ctx during operation, returns unlocked
**
** Last code review: 2005-03-30 21:36:34; see comment below
** Last code change: 2005-03-30 21:34:18
*/
static sm_ret_T
aq_rcpts_free(aq_ctx_P aq_ctx)
{
int r;
sm_ret_T ret, rv;
aq_rcpt_P aq_rcpt, aq_rcpt_nxt;
SM_IS_AQ(aq_ctx);
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
ret = SM_SUCCESS;
for (aq_rcpt = AQR_FIRST(aq_ctx); aq_rcpt != AQR_END(aq_ctx); aq_rcpt = aq_rcpt_nxt)
{
aq_rcpt_nxt = AQR_NEXT(aq_rcpt);
rv = aq_rcpt_rm(aq_ctx, aq_rcpt, 0);
if (sm_is_err(rv) && !sm_is_err(ret))
ret = rv; /* don't stop on error?? */
}
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_AQ, r);
return ret;
}
/*
** AQ_TA_RM -- free a transaction, remove it from AQ
**
** Parameters:
** aq_ctx -- AQ context
** aq_ta -- AQ transaction
** lockit -- needs locking?
**
** Returns:
** usual sm_error code; only (un)lock
**
** Side Effects: none on error (except if unlock fails)
**
** Last code review: 2005-03-28 22:35:42
** Last code change:
*/
sm_ret_T
aq_ta_rm(aq_ctx_P aq_ctx, aq_ta_P aq_ta, bool lockit)
{
int r;
if (NULL == aq_ta)
return SM_SUCCESS;
SM_IS_AQ(aq_ctx);
SM_IS_AQ_TA(aq_ta);
if (lockit) {
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
(void) aq_mail_free(aq_ta);
CDB_PRT(aq_ta->aqt_cdb_id, aq_ta->aqt_ss_ta_id, "aq_ta_rm");
SM_CSTR_FREE(aq_ta->aqt_cdb_id);
AQ_TAS_REMOVE(aq_ctx, aq_ta);
#if AQ_TA_CHECK
aq_ta->sm_magic = SM_MAGIC_NULL;
#endif
QM_LEV_DPRINTF(7, (QM_DEBFP, "sev=DBG, func=aq_ta_rm, ss_ta=%s, ta_flags=%x, aq_d_entries=%u, aq_entries=%u\n", aq_ta->aqt_ss_ta_id, aq_ta->aqt_flags, aq_ctx->aq_d_entries, aq_ctx->aq_entries));
if (AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_DEFEDB)) {
SM_ASSERT(aq_ctx->aq_d_entries > 0);
aq_ctx->aq_d_entries--;
}
sm_hdrmodl_free(&aq_ta->aqt_hdrmodhd);
SM_ASSERT(aq_ctx->aq_entries > 0);
aq_ctx->aq_entries--;
SM_ASSERT(aq_ctx->aq_d_entries <= aq_ctx->aq_entries);
sm_free_size(aq_ta, sizeof(*aq_ta));
if (lockit) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
return SM_SUCCESS;
}
/*
** AQ_TAS_FREE -- free all transactions in AQ
**
** Parameters:
** aq_ctx -- AQ context
**
** Returns:
** usual sm_error code; only (un)lock
**
** Locking: locks entire aq_ctx during operation, returns unlocked
**
** Last code review: 2005-03-30 21:37:18
** Last code change:
*/
static sm_ret_T
aq_tas_free(aq_ctx_P aq_ctx)
{
int r;
aq_ta_P aq_ta, aq_ta_nxt;
SM_IS_AQ(aq_ctx);
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
for (aq_ta = AQ_TAS_FIRST(aq_ctx); aq_ta != AQ_TAS_END(aq_ctx); aq_ta = aq_ta_nxt)
{
aq_ta_nxt = AQ_TAS_NEXT(aq_ta);
aq_ta_rm(aq_ctx, aq_ta, false);
}
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
return SM_SUCCESS;
}
/*
** AQ_TA_FREE -- free transaction
**
** Parameters:
** aq_ta -- AQ transaction
**
** Returns:
** SM_SUCCESS
**
** Last code review: 2005-03-30 21:57:32
** Last code change: 2005-03-30 21:49:32
*/
sm_ret_T
aq_ta_free(aq_ta_P aq_ta)
{
if (NULL == aq_ta)
return SM_SUCCESS;
SM_IS_AQ_TA(aq_ta);
(void) aq_mail_free(aq_ta);
SM_FREE(aq_ta->aqt_owners_pa);
#if AQ_TA_CHECK
aq_ta->sm_magic = SM_MAGIC_NULL;
#endif
sm_free_size(aq_ta, sizeof(*aq_ta));
return SM_SUCCESS;
}
/*
** AQ_TA_NEW -- allocate new transaction
**
** Parameters:
** paq_ta -- pointer to AQ transaction (output)
**
** Returns:
** usual sm_error code; ENOMEM
**
** Last code review: 2005-03-30 21:58:04
** Last code change: 2005-03-30 21:47:52
*/
sm_ret_T
aq_ta_new(aq_ta_P *paq_ta)
{
sm_ret_T ret;
aq_ta_P aq_ta;
SM_REQUIRE(paq_ta != NULL);
aq_ta = (aq_ta_P) sm_zalloc(sizeof(*aq_ta));
if (NULL == aq_ta) {
ret = sm_error_temp(SM_EM_AQ, ENOMEM);
goto error;
}
#if AQ_TA_CHECK
/* set this early, otherwise the rest of the routines will fail */
aq_ta->sm_magic = SM_AQ_TA_MAGIC;
#endif
ret = aq_mail_new(aq_ta);
if (sm_is_err(ret))
goto error;
*paq_ta = aq_ta;
return SM_SUCCESS;
error:
(void) aq_ta_free(aq_ta);
return ret;
}
/*
** AQ_TA_ADD_NEW -- create new transaction in AQ
**
** Parameters:
** aq_ctx -- AQ context
** paq_ta -- pointer to AQ transaction (output)
** flags -- flags
** nrcpts -- number of recipients
** locktype -- kind of locking
**
** Returns:
** usual sm_error code; ENOMEM, SM_E_FULL, et.al.
**
** Side Effects: none on error (except if unlock fails)
**
** Last code review: 2005-03-18 02:20:49; see comments below
** Last code change: 2005-03-18 02:20:46
*/
sm_ret_T
aq_ta_add_new(aq_ctx_P aq_ctx, aq_ta_P *paq_ta, uint32_t flags, uint nrcpts, thr_lock_T locktype)
{
sm_ret_T ret;
int r;
aq_ta_P aq_ta;
SM_IS_AQ(aq_ctx);
SM_REQUIRE(paq_ta != NULL);
if (thr_lock_it(locktype)) {
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
/*
** This isn't really correct: this way we can never add
** a transaction (from SMTPS) with a lot of recipients...
** But for now it's "good enough" (it does not affect aliases,
** those are added differently).
** todo: maybe fix this.
*/
if (aq_ctx->aq_entries + nrcpts >= aq_ctx->aq_limit) {
ret = sm_error_temp(SM_EM_AQ, SM_E_FULL);
goto errunl;
}
aq_ta = (aq_ta_P) sm_zalloc(sizeof(*aq_ta));
if (NULL == aq_ta) {
ret = sm_error_temp(SM_EM_AQ, ENOMEM);
goto errunl;
}
#if AQ_TA_CHECK
/* set this early, otherwise the rest of the routines will fail */
aq_ta->sm_magic = SM_AQ_TA_MAGIC;
#endif
AQ_TAS_INSERT_TAIL(aq_ctx, aq_ta);
ret = aq_mail_new(aq_ta);
if (sm_is_err(ret))
goto error;
aq_ta->aqt_flags = flags;
QM_LEV_DPRINTF(7, (QM_DEBFP, "sev=DBG, func=aq_ta_add_new, ss_ta=%s, ta_flags=%x, aq_d_entries=%u, aq_entries=%u\n", aq_ta->aqt_ss_ta_id, flags, aq_ctx->aq_d_entries, aq_ctx->aq_entries));
if (SM_IS_FLAG(flags, AQ_TA_FL_DEFEDB))
aq_ctx->aq_d_entries++;
aq_ctx->aq_entries++;
SM_ASSERT(aq_ctx->aq_d_entries <= aq_ctx->aq_entries);
/* fill in more data?? */
*paq_ta = aq_ta;
if (thr_unl_no_err(locktype)) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
return SM_SUCCESS;
error:
/* clean up... */
(void) aq_ta_rm(aq_ctx, aq_ta, false);
errunl:
if (thr_unl_if_err(locktype)) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_AQ, r);
}
*paq_ta = NULL; /* just a courtesy for the caller */
return ret;
}
/*
** AQ_CLOSE -- close an AQ
**
** Parameters:
** aq_ctx -- AQ context
**
** Returns:
** SM_SUCCESS
**
** Last code review: 2005-03-30 22:00:51
** Last code change: 2005-03-30 21:59:31
*/
sm_ret_T
aq_close(aq_ctx_P aq_ctx)
{
if (NULL == aq_ctx)
return SM_SUCCESS;
SM_IS_AQ(aq_ctx);
/* walk through queues, free them! */
aq_tas_free(aq_ctx);
aq_rcpts_free(aq_ctx);
if (aq_ctx->aq_rdq_ht != NULL)
bht_destroy(aq_ctx->aq_rdq_ht, aq_rdq_ht_free, NULL);
(void) pthread_mutex_destroy(&aq_ctx->aq_mutex);
#if AQ_CHECK
aq_ctx->sm_magic = SM_MAGIC_NULL;
#endif
sm_free_size(aq_ctx, sizeof(*aq_ctx));
return SM_SUCCESS;
}
/*
** AQ_OPEN -- open a new AQ
**
** Parameters:
** qmgr_ctx -- QMGR context
** paq_ctx -- pointer to AQ context (output)
** max_entries -- maximum number of entries in AQ
** flags -- various flags
**
** Returns:
** usual sm_error code; ENOMEM, lock creation
**
** Last code review: 2005-03-30 22:10:57
** Last code change: 2005-03-30 22:04:58
*/
sm_ret_T
aq_open(qmgr_ctx_P qmgr_ctx, aq_ctx_P *paq_ctx, uint max_entries, uint flags)
{
int r;
sm_ret_T ret;
aq_ctx_P aq_ctx;
SM_REQUIRE(paq_ctx != NULL);
aq_ctx = (aq_ctx_P) sm_zalloc(sizeof(*aq_ctx));
if (NULL == aq_ctx)
goto enomem;
r = pthread_mutex_init(&aq_ctx->aq_mutex, SM_PTHREAD_MUTEXATTR);
if (r != 0) {
ret = sm_error_perm(SM_EM_AQ, r);
goto error;
}
AQ_TAS_INIT(aq_ctx);
AQR_INIT(aq_ctx);
aq_ctx->aq_limit = max_entries;
aq_ctx->aq_max_entries = max_entries;
aq_ctx->aq_qmgr_ctx = qmgr_ctx;
if (!SM_IS_FLAG(flags, AQ_OPEN_FL_NOHT)) {
aq_ctx->aq_rdq_ht = bht_new(max_entries * 2, max_entries);
if (NULL == aq_ctx->aq_rdq_ht)
goto enomem;
}
AQ_RDQS_INIT(aq_ctx->aq_rdqs);
AQ_RDQS_INIT(aq_ctx->aq_rdqs_free);
AQR_WAITQ_INIT(aq_ctx);
#if AQ_CHECK
aq_ctx->sm_magic = SM_AQ_MAGIC;
#endif
*paq_ctx = aq_ctx;
return SM_SUCCESS;
enomem:
ret = sm_error_temp(SM_EM_AQ, ENOMEM);
error:
/* complain */
SM_FREE_SIZE(aq_ctx, sizeof(*aq_ctx));
return ret;
}
/*
** AQ_USAGE -- return percentage of # of item in AQ (* 100)
**
** Parameters:
** aq_ctx -- AQ context
** which -- deferred or all entries
**
** Returns:
** fill level (0...100)
**
** Locking: doesn't lock aq_ctx -> may return bogus data
**
** Last code review: 2005-03-18 02:14:10
** Last code change:
*/
int
aq_usage(aq_ctx_P aq_ctx, int which)
{
uint entries;
SM_IS_AQ(aq_ctx);
if (AQ_USAGE_DEFEDB == which)
entries = aq_ctx->aq_d_entries;
else
entries = aq_ctx->aq_entries;
return (int) ((entries * 100) / aq_ctx->aq_limit);
}
#if 0
///*
//** AQ_TA_WALK -- walk through list of transactions, apply a function
//**
//** Parameters:
//** aq_ctx -- AQ context
//** f -- function to apply
//** ctx -- context for f
//**
//** Returns:
//** usual sm_error code
//**
//** Locking: doesn't lock aq_ctx!
//*/
//
//sm_ret_T
//aq_ta_walk(aq_ctx_P aq_ctx, aq_ta_F f, void *ctx)
//{
// sm_ret_T ret;
// aq_ta_P aq_ta, aq_nxt_ta;
//
// SM_IS_AQ(aq_ctx);
// for (aq_ta = AQ_TAS_FIRST(aq_ctx);
// aq_ta != AQ_TAS_END(aq_ctx);
// aq_ta = aq_nxt_ta)
// {
// aq_nxt_ta = AQ_TAS_NEXT(aq_ta);
// ret = f(aq_ta, ctx);
//
// /* stop on error?? */
// if (sm_is_err(ret))
// return ret;
// }
//
// return SM_SUCCESS;
//}
#endif /* 0 */
#if 0
///*
//** AQ_RCPT_WALK -- walk through list of recipients, apply a function
//**
//** Parameters:
//** aq_ctx -- AQ context
//** f -- function to apply
//** ctx -- context for f
//**
//** Returns:
//** usual sm_error code
//**
//** Locking: doesn't lock aq_ctx!
//*/
//
//sm_ret_T
//aq_rcpt_walk(aq_ctx_P aq_ctx, aq_rcpt_F f, void *ctx)
//{
// sm_ret_T ret;
// aq_rcpt_P aq_rcpt, aq_nxt_rcpt;
//
// SM_IS_AQ(aq_ctx);
// for (aq_rcpt = AQR_FIRST(aq_ctx);
// aq_rcpt != AQR_END(aq_ctx);
// aq_rcpt = aq_nxt_rcpt)
// {
// aq_nxt_rcpt = AQR_NEXT(aq_rcpt);
// ret = f(aq_rcpt, ctx);
//
// /* stop on error?? */
// if (sm_is_err(ret))
// return ret;
// }
// return SM_SUCCESS;
//}
#endif /* 0 */
/*
** AQ_TA_FIND -- find a transaction based on SMTPS transaction id
**
** Parameters:
** aq_ctx -- AQ context
** ta_id -- SMTPS transaction id
** lockit -- needs locking?
** paq_ta -- pointer to transaction (output)
**
** Returns:
** usual sm_error code; SM_E_NOTFOUND, (un)lock
**
** Side Effects: none
**
** Last code review: 2005-03-18 21:48:50
** Last code change:
*/
sm_ret_T
aq_ta_find(aq_ctx_P aq_ctx, sessta_id_T ta_id, bool lockit, aq_ta_P *paq_ta)
{
int r;
aq_ta_P aq_ta;
SM_IS_AQ(aq_ctx);
SM_REQUIRE(paq_ta != NULL);
*paq_ta = NULL;
if (lockit) {
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
for (aq_ta = AQ_TAS_FIRST(aq_ctx); aq_ta != AQ_TAS_END(aq_ctx); aq_ta = AQ_TAS_NEXT(aq_ta))
{
if (SESSTA_EQ(aq_ta->aqt_ss_ta_id, ta_id)) {
*paq_ta = aq_ta;
break;
}
}
if (lockit) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
return (*paq_ta == NULL) ? sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND)
: SM_SUCCESS;
}
#define AQ_RCPT_FIND(ta_id_x) \
{ \
int r; \
aq_rcpt_P aq_rcpt; \
\
SM_IS_AQ(aq_ctx); \
SM_REQUIRE(paq_rcpt != NULL); \
*paq_rcpt = NULL; \
if (thr_lock_it(locktype)) { \
r = pthread_mutex_lock(&aq_ctx->aq_mutex); \
SM_LOCK_OK(r); \
if (r != 0) \
return sm_error_perm(SM_EM_AQ, r); \
} \
for (aq_rcpt = AQR_FIRST(aq_ctx); \
aq_rcpt != AQR_END(aq_ctx); \
aq_rcpt = AQR_NEXT(aq_rcpt)) \
{ \
if (SESSTA_EQ(aq_rcpt->ta_id_x, ta_id) && \
aq_rcpt->aqr_idx == rcpt_idx) \
{ \
*paq_rcpt = aq_rcpt; \
break; \
} \
} \
if (thr_unl_no_err(locktype) || \
(*paq_rcpt == NULL && thr_unl_if_err(locktype))) \
{ \
r = pthread_mutex_unlock(&aq_ctx->aq_mutex); \
SM_ASSERT(0 == r); \
if (r != 0) \
return sm_error_perm(SM_EM_AQ, r); \
} \
return (*paq_rcpt == NULL) ? sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) \
: SM_SUCCESS; \
}
/*
** AQ_RCPT_FIND_SS -- find rcpt based on SMTPS transaction id and rcpt idx
**
** Note: do not change parameter names, they are used in the macro above
**
** Parameters:
** aq_ctx -- AQ context
** ta_id -- SMTPS transaction id
** rcpt_idx -- recipient index
** locktype -- kind of locking
** paq_rcpt -- pointer to rcpt (output)
**
** Returns:
** usual sm_error code: SM_E_NOTFOUND, (un)lock
**
** Side Effects: none
**
** Last code review: 2005-03-18 23:59:02
** Last code change:
*/
sm_ret_T
aq_rcpt_find_ss(aq_ctx_P aq_ctx, sessta_id_T ta_id, rcpt_idx_T rcpt_idx, thr_lock_T locktype, aq_rcpt_P *paq_rcpt)
AQ_RCPT_FIND(aqr_ss_ta_id)
/*
** AQ_RCPT_FIND_DA -- find rcpt based on DA transaction id and rcpt idx
**
** Note: do not change parameter names, they are used in the macro above
**
** Parameters:
** aq_ctx -- AQ context
** ta_id -- DA transaction id
** rcpt_idx -- recipient index
** locktype -- kind of locking
** paq_rcpt -- pointer to rcpt (output)
**
** Returns:
** usual sm_error code: SM_E_NOTFOUND, (un)lock
**
** Side Effects: none
**
** Last code review: 2005-03-18 23:59:02
** Last code change:
*/
sm_ret_T
aq_rcpt_find_da(aq_ctx_P aq_ctx, sessta_id_T ta_id, rcpt_idx_T rcpt_idx, thr_lock_T locktype, aq_rcpt_P *paq_rcpt)
AQ_RCPT_FIND(aqr_da_ta_id)
#define AQ_RCPT_FIND_ONE(ta_id_x) \
{ \
int r; \
aq_rcpt_P aq_rcpt; \
\
SM_IS_AQ(aq_ctx); \
SM_REQUIRE(paq_rcpt != NULL); \
*paq_rcpt = NULL; \
if (thr_lock_it(locktype)) \
{ \
r = pthread_mutex_lock(&aq_ctx->aq_mutex); \
SM_LOCK_OK(r); \
if (r != 0) \
return sm_error_perm(SM_EM_AQ, r); \
} \
for (aq_rcpt = AQR_FIRST(aq_ctx); \
aq_rcpt != AQR_END(aq_ctx); \
aq_rcpt = AQR_NEXT(aq_rcpt)) \
{ \
if (SESSTA_EQ(aq_rcpt->ta_id_x, ta_id)) \
{ \
*paq_rcpt = aq_rcpt; \
break; \
} \
} \
if (thr_unl_no_err(locktype) || \
(*paq_rcpt == NULL && thr_unl_if_err(locktype))) \
{ \
r = pthread_mutex_unlock(&aq_ctx->aq_mutex); \
SM_ASSERT(0 == r); \
if (r != 0) \
return sm_error_perm(SM_EM_AQ, r); \
} \
return (*paq_rcpt == NULL) ? sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND) \
: SM_SUCCESS; \
}
/*
** AQ_RCPT_FIND_ONE_SS -- find rcpt based on SMTPS transaction id
**
** Note: do not change parameter names, they are used in the macro above
**
** Parameters:
** aq_ctx -- AQ context
** ta_id -- SMTPS transaction id
** locktype -- kind of locking
** paq_rcpt -- pointer to rcpt (output)
**
** Returns:
** usual sm_error code: SM_E_NOTFOUND, (un)lock
**
** Side Effects: none
**
** Last code review: 2005-03-18 21:50:29
** Last code change:
*/
sm_ret_T
aq_rcpt_find_one_ss(aq_ctx_P aq_ctx, sessta_id_T ta_id, thr_lock_T locktype, aq_rcpt_P *paq_rcpt)
AQ_RCPT_FIND_ONE(aqr_ss_ta_id)
/*
** AQ_RCPT_FIND_ONE_DA -- find rcpt based on DA transaction id
**
** Note: do not change parameter names, they are used in the macro above
**
** Parameters:
** aq_ctx -- AQ context
** ta_id -- DA transaction id
** locktype -- kind of locking
** paq_rcpt -- pointer to rcpt (output)
**
** Returns:
** usual sm_error code: SM_E_NOTFOUND, (un)lock
** Side Effects: none
**
** Last code review: 2005-03-18 21:50:29
** Last code change:
*/
sm_ret_T
aq_rcpt_find_one_da(aq_ctx_P aq_ctx, sessta_id_T ta_id, thr_lock_T locktype, aq_rcpt_P *paq_rcpt)
AQ_RCPT_FIND_ONE(aqr_da_ta_id)
/*
** AQ_RCPT_LOCKOP -- lock/unlock rcpt
** currently: entire AQ; if this doesn't change, then it's simpler
** to lock it directly in the application.
** Moreover, it might be better to have two different functions.
**
** Parameters:
** aq_ctx -- AQ context
** aq_rcpt -- rcpt
** locktype -- kind of locking
**
** Returns:
** usual sm_error code; only (un)lock
**
** Last code review: 2005-03-29 17:39:47
** Last code change:
*/
sm_ret_T
aq_rcpt_lockop(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, thr_lock_T locktype)
{
int r;
SM_IS_AQ(aq_ctx);
if (thr_lock_it(locktype)) {
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
if (thr_unl_always(locktype)) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
SM_ASSERT(0 == r);
if (r != 0)
return sm_error_perm(SM_EM_AQ, r);
}
return SM_SUCCESS;
}
syntax highlighted by Code2HTML, v. 0.9.1