/*
* Copyright (c) 2003-2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: aqrdq.c,v 1.32 2006/12/31 22:38:30 ca Exp $")
#include "sm/types.h"
#include "sm/assert.h"
#include "sm/magic.h"
#include "sm/bhtable.h"
#include "sm/actdb-int.h"
#include "sm/aqrdq.h"
#include "aqrdq.h"
#include "sm/qmgrdbg.h"
#include "log.h"
/*
**  AQ_RDQ_ADD -- append a recipient to the rcpt dest queue (rdq) entry
**	for its current destination; the rdq entry is created on demand.
**
**	Parameters:
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient (must not yet be in an rdq)
**		paqrdq_flags -- flags of the rdq entry (output, may be NULL);
**			only written if the recipient was actually queued
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code; ENOMEM, etc
**		Note: success is also returned if the recipient has no
**		address left to try; in that case it is NOT queued.
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks aq_ctx if requested
**
**	Last code review: 2005-03-25 00:55:35; see comments below
**	Last code change: 2006-11-11 18:23:17
*/

sm_ret_T
aq_rdq_add(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, uint32_t *paqrdq_flags, thr_lock_T locktype)
{
	sm_ret_T ret;
	int rc;
	aqrdq_ctx_P rdq;

	SM_IS_AQ(aq_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);

	if (thr_lock_it(locktype)) {
		rc = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(rc);
		if (rc != 0)
			return sm_error_perm(SM_EM_AQ, rc);
	}

	/*
	**  Open question: is the aq_ctx lock required for touching aq_rcpt?
	**  At the moment only one caller comes in without holding the AQ
	**  lock: qm_get_edb_entries(), which builds a fresh entry and only
	**  afterwards puts it into the rcpt dest queue.
	**  Who accesses aq_rcpt should be documented somewhere so it can be
	**  decided whether aq must be locked before aq_rcpt is accessed.
	*/

	ret = SM_SUCCESS;
	SM_ASSERT(!AQR_IS_FLAG(aq_rcpt, AQR_FL_RDQ));

	if (aq_rcpt->aqr_addr_cur >= aq_rcpt->aqr_addr_max) {
		/*
		**  Every address has been used up, hence there is no
		**  destination under which the rcpt could be queued.
		**  Success is reported anyway (SM_NOTDONE was considered)
		**  so callers do not have to deal with this case.
		*/

		QM_LEV_DPRINTF(0, (QM_DEBFP, "sev=ERROR, func=aq_rdq_add, rcpt=%p, cur=%d, max=%d\n"
			, aq_rcpt, aq_rcpt->aqr_addr_cur, aq_rcpt->aqr_addr_max));
		goto done;
	}

	/* look up the queue for the current destination; create if missing */
	rdq = bht_find(aq_ctx->aq_rdq_ht, (void *) &(AQR_CUR_DEST(aq_rcpt)),
			sizeof(AQR_CUR_DEST(aq_rcpt)));
	if (NULL == rdq) {
		ret = aq_rdq_new(aq_ctx, aq_rcpt->aqr_da_idx,
				AQR_CUR_DEST(aq_rcpt), &rdq);
		if (sm_is_err(ret))
			goto done;
	}

	if (paqrdq_flags != NULL)
		*paqrdq_flags = rdq->aqrdq_flags;

	/* other values to change? */
	AQ_RDQ_INSERT_TAIL(rdq->aqrdq_rcpts, aq_rcpt);
	rdq->aqrdq_entries++;
	AQR_SET_FLAG(aq_rcpt, AQR_FL_RDQ);

  done:
	/* the unlock policy depends on whether an error is being returned */
	if (sm_is_err(ret) ? thr_unl_if_err(locktype)
			   : thr_unl_no_err(locktype))
	{
		rc = pthread_mutex_unlock(&aq_ctx->aq_mutex);
		SM_ASSERT(0 == rc);
		if (rc != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_AQ, rc);
	}
	return ret;
}
/*
**  AQ_RDQ_RM -- remove rcpt from rdq entry
**
**	Parameters:
**		aq_ctx -- AQ context
**		aq_rcpt -- AQ recipient
**		locktype -- kind of locking
**		lctx -- logging context
**
**	Returns:
**		usual sm_error code; SM_E_UNEXPECTED et al.
**		Success is also returned if the recipient is not in any
**		rcpt dest queue (nothing is changed in that case).
**
**	Side Effects:
**		clears AQR_FL_RDQ in aq_rcpt;
**		removes the rdq entry from the hash table when its entry
**		counter is zero afterwards.
**
**	Locking: locks aq_ctx if requested; on return the mutex is released
**		according to locktype: the "if error" policy applies whenever
**		an error is returned, the "no error" policy otherwise.
**
**	Last code review: 2005-03-20 03:32:40
**	Last code change: 2005-03-20 03:32:40
*/

sm_ret_T
aq_rdq_rm(aq_ctx_P aq_ctx, aq_rcpt_P aq_rcpt, thr_lock_T locktype, sm_log_ctx_P lctx)
{
	sm_ret_T ret;
	int r;
	aqrdq_ctx_P aqrdq_ctx;

	SM_IS_AQ(aq_ctx);
	SM_IS_AQ_RCPT(aq_rcpt);
	if (thr_lock_it(locktype)) {
		r = pthread_mutex_lock(&aq_ctx->aq_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_AQ, r);
	}
	ret = SM_SUCCESS;

	if (aq_rcpt->aqr_addr_cur >= aq_rcpt->aqr_addr_max ||
	    !AQR_IS_FLAG(aq_rcpt, AQR_FL_RDQ))
	{
		/*
		**  Not in any queue. This can happen because the caller
		**  doesn't have much of a clue, e.g., updating recipient
		**  status due to an AR timeout will cause this.
		**  Be nice and return success.
		*/

		QM_LEV_DPRINTF(3, (QM_DEBFP, "sev=WARN, func=aq_rdq_rm, aq_rcpt=%p, cur=%d, max=%d, aqr_flags=%x, ss_ta_id=%s, da_ta_id=%s, idx=%u\n"
			, aq_rcpt, aq_rcpt->aqr_addr_cur, aq_rcpt->aqr_addr_max, aq_rcpt->aqr_flags, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_da_ta_id, aq_rcpt->aqr_idx));
		goto done;
	}

	aqrdq_ctx = bht_find(aq_ctx->aq_rdq_ht, (void *) &(AQR_CUR_DEST(aq_rcpt)),
			sizeof(AQR_CUR_DEST(aq_rcpt)));
	if (NULL == aqrdq_ctx) {
		/* OOPS shouldn't happen: flag is set but there is no queue */
		ret = sm_error_perm(SM_EM_AQ, SM_E_UNEXPECTED);
		goto done;
	}

	/*
	**  AQR_FL_RDQ is known to be set at this point (tested above and
	**  aq_rcpt has not been modified since), so no second test is needed.
	*/

	SM_IS_AQ_RCPT(AQ_RDQ_FIRST(aqrdq_ctx->aqrdq_rcpts));
	AQ_RDQ_REMOVE(aqrdq_ctx->aqrdq_rcpts, aq_rcpt);
	if (aqrdq_ctx->aqrdq_entries <= 0) {
		/*
		**  Counter and list disagree: report the inconsistency
		**  instead of letting the counter wrap/underflow.
		*/

		ret = sm_error_perm(SM_EM_AQ, SM_E_UNEXPECTED);
		sm_log_write(lctx, ACTDB_LCAT_ACTDB, ACTDB_LMOD_ACTDB,
			SM_LOG_INCONS, 2,
			"sev=FATAL, func=aq_rdq_rm, aqrdq_entries=0, id=%s, rcpt_idx=%u"
			, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx);
	}
	else
		--aqrdq_ctx->aqrdq_entries;
	AQR_CLR_FLAG(aq_rcpt, AQR_FL_RDQ);

	/* no entries left: take the rdq entry out of the hash table */
	if (0 == aqrdq_ctx->aqrdq_entries) {
		bht_rm(aq_ctx->aq_rdq_ht,
			(void *) &(AQR_CUR_DEST(aq_rcpt)),
			sizeof(AQR_CUR_DEST(aq_rcpt)),
			aq_rdq_ht_free, aq_ctx);
	}

	/* change some other values? */

  done:
	/*
	**  Select the unlock policy from the result that is actually
	**  returned: an inconsistency detected above is an error and must
	**  not be treated like a successful return with respect to locking.
	*/

	if (sm_is_err(ret) ? thr_unl_if_err(locktype)
			   : thr_unl_no_err(locktype))
	{
		r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
		SM_ASSERT(0 == r);
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_AQ, r);
	}
	return ret;
}
#if 0
///*
//** AQ_RDQ_FIND -- find rdq entry for an IPv4 address
//**
//** Parameters:
//** aq_ctx -- AQ context
//** ipv4 -- IPv4 address
//** paqrdq_ctx -- (pointer to) AQ RDQ entry (output)
//** locktype -- kind of locking
//**
//** Returns:
//** usual sm_error code; SM_E_NOTFOUND, (un)lock
//**
//** Last code review: 2005-04-13 23:12:15
//** Last code change:
//*/
//
//sm_ret_T
//aq_rdq_find(aq_ctx_P aq_ctx, ipv4_T ipv4, aqrdq_ctx_P *paqrdq_ctx, thr_lock_T locktype)
//{
// sm_ret_T ret;
// int r;
// aqrdq_ctx_P aqrdq_ctx;
//
// SM_IS_AQ(aq_ctx);
// if (thr_lock_it(locktype))
// {
// r = pthread_mutex_lock(&aq_ctx->aq_mutex);
// SM_LOCK_OK(r);
// if (r != 0)
// return sm_error_perm(SM_EM_AQ, r);
// }
// ret = SM_SUCCESS;
// aqrdq_ctx = bht_find(aq_ctx->aq_rdq_ht, (void *) &ipv4, sizeof(ipv4));
// if (aqrdq_ctx != NULL)
// {
// if (paqrdq_ctx != NULL)
// *paqrdq_ctx = aqrdq_ctx;
// }
// else
// ret = sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND);
//
// if ((!sm_is_err(ret) && thr_unl_no_err(locktype))
// || (sm_is_err(ret) && thr_unl_if_err(locktype)))
// {
// r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
// SM_ASSERT(0 == r);
// if (r != 0 && sm_is_success(ret))
// ret = sm_error_perm(SM_EM_AQ, r);
// }
// return ret;
//}
#endif
/* syntax highlighted by Code2HTML, v. 0.9.1 */