/*
* Copyright (c) 2003-2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: edbr.c,v 1.77 2006/11/27 03:40:40 ca Exp $")
#include "sm/assert.h"
#include "sm/magic.h"
#include "sm/error.h"
#include "sm/memops.h"
#include "sm/edbc.h"
#include "sm/qmgr-int.h"
#include "sm/actdb-int.h"
#include "sm/aqrdq.h"
#include "qmgr.h"
#include "log.h"
#define AQ_USAGE_ALL_RDEDB 99	/* stop reading DEFEDB entries if total AQ usage reaches this limit */
#define AQ_USAGE_DEF_RDEDB 10	/* limit for AQ usage by DEFEDB entries while AQ is close to throttling */
/*
** QM_EDBR_GEN_DSN -- generate a DSN for a (failed) rcpt that has been
** read from DEFEDB, unless a bounce for it already exists; take care
** of the EDB cache etc.
**
** Parameters:
** qmgr_ctx -- QMGR context
** aq_ctx -- AQ context
** aq_ta -- AQ transaction
** aq_rcpt -- AQ recipient
** edb_ctx -- EDB context
** edb_req_hd -- head of request list for (DEF)EDB
** edbc_ctx -- EDBC context
** edbc_node -- current EDBC node
** time_now -- current time
**
** Returns:
** usual sm_error code
**
** Side Effects: aq_rcpt is freed
**
** Called by: qm_get_edb_entries()
**
** Last code review: 2005-04-18 01:30:54; see comments
** Last code change: 2005-04-18 05:41:10
*/
static sm_ret_T
qm_edbr_gen_dsn(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, edb_ctx_P edb_ctx, edb_req_hd_T edb_req_hd, edbc_ctx_P edbc_ctx, edbc_node_P edbc_node, time_T time_now)
{
sm_ret_T ret, flags;
time_T bounce_expire;
flags = 0;
bounce_expire = 0;
/* is there already a bounce? */
/* XXX does this have to check for double bounce too? qm_bounce_add() */
if (aq_rcpt_has_bounce(aq_rcpt)) {
aq_rcpt_P aq_rcpt_2;
/* check whether bounce is still in AQ */
ret = aq_rcpt_find_ss(aq_ctx, aq_rcpt->aqr_ss_ta_id,
aq_rcpt->aqr_dsn_idx, THR_LOCK_UNLOCK, &aq_rcpt_2);
if (sm_is_success(ret) && aq_rcpt_2 != NULL)
bounce_expire = aq_rcpt_2->aqr_expire;
}
else
ret = sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND);
if (bounce_expire == 0)
(void) qm_rcpt_da_expire(qmgr_ctx, aq_rcpt, time_now, &bounce_expire);
/* avoid endless loop in qm_get_edb_entries() */
if (bounce_expire <= time_now)
bounce_expire = time_now + 1;
if (sm_is_err(ret)) {
/* generate bounce... */
ret = qm_bounce_add(qmgr_ctx, aq_rcpt->aqr_ss_ta, aq_rcpt, NULL, NULL);
if (sm_is_success(ret))
flags |= ret;
}
if (sm_is_err(ret)) {
/* XXX What to do now? Log and try again later? */
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, qm_bounce_add=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret));
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 4,
"sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, rcpt_pa=%S, stat=%d, qm_bounce_add=%m"
, aq_rcpt->aqr_ss_ta_id
, aq_rcpt->aqr_pa
, aq_rcpt->aqr_status, ret);
/* set return value; "fall through" for aq_rcpt removal */
flags = ret;
}
else {
rcpt_id_T rcpt_id;
flags |= ret;
sm_snprintf(rcpt_id, sizeof(rcpt_id), SMTP_RCPTID_FORMAT,
aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx);
aq_rcpt->aqr_next_try = bounce_expire;
/* create an "edbc_mv" function? */
ret = edbc_rm(edbc_ctx, edbc_node);
if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, edbc_rm=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret));
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 4,
"sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, idx=%u, rcpt_pa=%S, stat=%d, edbc_rm=%m"
, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx
, aq_rcpt->aqr_pa, aq_rcpt->aqr_status, ret);
}
ret = edbc_add(edbc_ctx, rcpt_id, aq_rcpt->aqr_next_try, false);
if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, edbc_add=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret));
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 4,
"sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, idx=%u, rcpt_pa=%S, stat=%d, edbc_add=%m"
, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx
, aq_rcpt->aqr_pa, aq_rcpt->aqr_status, ret);
}
ret = edb_rcpt_app(edb_ctx, aq_rcpt, &edb_req_hd, aq_rcpt->aqr_status);
if (sm_is_err(ret)) {
/*
** XXX How to deal with this error?
** Just ignore it and hope that it
** works next time the queue is read?
*/
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 4,
"sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, idx=%u, rcpt_pa=%S, stat=%d, edb_rcpt_app=%m"
, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx
, aq_rcpt->aqr_pa, aq_rcpt->aqr_status, ret);
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, ss_ta=%s, rcpt_idx=%u, edb_rcpt_app=%r\n", aq_rcpt, aq_ta->aqt_ss_ta_id, aq_rcpt->aqr_idx, ret));
}
}
(void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK);
SM_ASSERT(aq_ta->aqt_rcpts_inaq > 0);
--aq_ta->aqt_rcpts_inaq;
#if 0
if (aq_ta->aqt_rcpts_inaq == 0)
/* XXX remove TA from AQ; maybe do it later? */;
#endif
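/*
** Hedged sketch of the removal deferred above (not compiled in; it
** assumes that aq_ta_rm(), called as at the other call sites in this
** file, is safe here with respect to AQ locking -- not verified):
*/
#if 0
if (aq_ta->aqt_rcpts_inaq == 0)
	(void) aq_ta_rm(aq_ctx, aq_ta, true);
#endif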
return flags;
}
/*
** Should this be a timer-activated task?
** We could set the "sleep" time to the "next time to try" from EDBC.
** Currently it is called from the scheduler, which is time-triggered
** as well as event-triggered.
*/
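/*
** Hedged sketch (not compiled in): how a caller such as qmgr_sched()
** might use the pnext_try output of qm_get_edb_entries() to bound its
** own wait time; "sleeptime" is a hypothetical variable of that caller,
** not code from this file.
*/
#if 0
int next_try;
sm_ret_T ret;

ret = qm_get_edb_entries(qmgr_ctx, &next_try);
if (!sm_is_err(ret) && next_try > 0 && next_try < sleeptime)
	sleeptime = next_try;	/* wake up when the next EDBC entry is due */
#endif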
/*
** QM_GET_EDB_ENTRIES -- read some EDB entries and add them to AQ
** The entries are read in the order specified by EDBC.
**
** Parameters:
** qmgr_ctx -- QMGR context
** pnext_try -- (pointer to) seconds until next try (output)
**
** Returns:
** usual sm_error code
** should it instead be >=0: the number of elements not read due to
** some error or because AQ is almost full?
**
** Called by: qmgr_sched()
**
** Locking:
** locks edbc_mutex
** locks aq_mutex when accessing AQ
** This will perform disk I/O and hence keeping aq_ctx
** locked during the entire function is "bad"...
** AQ needs to be locked if aq_ctx is manipulated (add/rm entries)
** As long as the scheduler does not run in parallel with this
** function, we don't need to lock aq_ta or aq_rcpt since only
** the scheduler will access them.
** AQ is also accessed when results come back from a DA; however,
** those functions lock edbc_mutex first, just as cleanup does.
** This means that this function will block any updates to AQ.
**
** Notes: should this skip over "bad" entries instead of stopping?
**
** Last code review: 2005-04-19 00:45:57; see comments/notes
** Last code change:
*/
sm_ret_T
qm_get_edb_entries(qmgr_ctx_P qmgr_ctx, int *pnext_try)
{
sm_ret_T ret, flags;
int r, usage;
uint fct_state, edbc_entries_cnt, edbc_entries_lim;
bool edbc_aqfull;
time_T time_now;
aq_ctx_P aq_ctx;
edb_ctx_P edb_ctx;
edbc_ctx_P edbc_ctx;
edbc_node_P edbc_node, edbc_node_nxt;
edb_req_P edb_req;
aq_rcpt_P aq_rcpt;
aq_ta_P aq_ta, aq_ta_2;
edb_req_hd_T edb_req_hd;
#if QMGR_DEBUG
bool chkaqta;
#endif
/* function state flags */
#define FST_EDBC_LCK 0x01 /* edbc is locked */
#define FST_AQTA_ALLOC 0x02 /* aq_ta is allocated */
#define FST_WR_EDB 0x04 /* write edb */
SM_IS_QMGR_CTX(qmgr_ctx);
aq_ctx = qmgr_ctx->qmgr_aq;
SM_IS_AQ(aq_ctx);
edb_ctx = qmgr_ctx->qmgr_edb;
edbc_ctx = qmgr_ctx->qmgr_edbc;
edb_req = NULL;
aq_rcpt = NULL;
aq_ta = NULL;
edbc_node = NULL;
flags = 0;
fct_state = 0;
edbc_aqfull = EDBC_IS_FLAG(edbc_ctx, EDBC_FL_AQFULL);
time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
if (pnext_try != NULL)
*pnext_try = 0;
#if QMGR_DEBUG
chkaqta = false;
#endif
/*R: N:edb_req, N:aq_ta, N:aq_rcpt, N:edbc_node */
/* only used for perm. failed rcpts to update "next time to try" */
EDBREQL_INIT(&edb_req_hd);
/*
** Note: it might be useful to check aq_usage() before trying
** to read any data: if the limits are exceeded, then there's no
** need to do anything. However, this keeps us from determining
** next_try. This could be solved by keeping the minimum value
** in edbc; however, that requires locking edbc. Overall, it
** does not seem to be worth the effort.
*/
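/*
** Hedged sketch of the early check discussed above (not compiled in;
** it reuses the aq_usage() test from the loop below and simply skips
** this run, at the cost of leaving *pnext_try at 0; nothing has been
** allocated or locked at this point, so returning is clean):
*/
#if 0
if (aq_usage(aq_ctx, AQ_USAGE_ALL) >= AQ_USAGE_ALL_RDEDB)
	return flags;	/* AQ too full; nothing to read this time */
#endif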
ret = edb_req_new(edb_ctx, EDB_RQF_NONE, &edb_req, true);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERROR, 4,
"sev=ERROR, func=qm_get_edb_entries, edb_req_new=%m"
, ret);
goto error;
}
/*R: A:edb_req, N:aq_ta, N:aq_rcpt, N:edbc_node */
/*
** Is it ok to keep EDBC locked? It's used in da_stat.
** However, if we don't keep it locked the order might be
** screwed up.
*/
r = pthread_mutex_lock(&edbc_ctx->edbc_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_CRIT, 2,
"sev=CRIT, func=qm_get_edb_entries, lock_edbc=%d", r);
ret = sm_error_perm(SM_EM_AQ, r);
goto error;
}
SM_SET_FLAG(fct_state, FST_EDBC_LCK);
#if QMGR_DEBUG > 2
edbc_print(edbc_ctx);
#endif
edbc_node = edbc_first(edbc_ctx);
/*R: A:edb_req, N:aq_ta, N:aq_rcpt, P:edbc_node */
QM_LEV_DPRINTFC(QDC_EDBR, 3, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, now=%6ld, entries=%d\n", (long) time_now, edbc_ctx->edbc_entries));
edbc_entries_cnt = 0;
edbc_entries_lim = edbc_ctx->edbc_entries;
while (edbc_node != NULL && edbc_node->ecn_next_try <= time_now
&& edbc_entries_cnt <= edbc_entries_lim)
{
edbc_node_nxt = edbc_next(edbc_ctx, edbc_node);
++edbc_entries_cnt;
QM_LEV_DPRINTFC(QDC_EDBR, 3, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, id=%s, next_try=%6ld, now=%6ld\n", edbc_node->ecn_rcpt_id, (long) edbc_node->ecn_next_try, (long) time_now));
/*
** Check whether there is enough space in AQ.
** Get recipient from DEFEDB and put it into AQ.
** If the corresponding TA isn't in AQ, fetch it too.
*/
usage = aq_usage(aq_ctx, AQ_USAGE_ALL);
if (usage >= AQ_USAGE_ALL_RDEDB) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_INFO, edbc_aqfull ? 19 : 9,
"sev=INFO, func=qm_get_edb_entries, aq_usage=%d, status=stop_adding_entries_from_defebd"
, usage);
EDBC_SET_FLAG(edbc_ctx, EDBC_FL_AQFULL);
break;
}
/*
** The DEFEDB share of AQ usage must not exceed AQ_USAGE_DEF_RDEDB if
** total usage exceeds the lower threshold for throttling.
** This makes sure that defedb entries do not cause SMTPS
** to throttle (if the values are set properly).
*/
r = aq_usage(aq_ctx, AQ_USAGE_DEFEDB);
if (r >= AQ_USAGE_DEF_RDEDB &&
usage >= qmgr_ctx->qmgr_lower[QMGR_RFL_AQ_I])
{
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_INFO, edbc_aqfull ? 19 : 9,
"sev=INFO, func=qm_get_edb_entries, aq_usage=%d, aq_usage_defedb=%d, status=stop_adding_entries_from_defebd"
, usage, r);
EDBC_SET_FLAG(edbc_ctx, EDBC_FL_AQFULL);
break;
}
EDBC_CLR_FLAG(edbc_ctx, EDBC_FL_AQFULL);
/* Should we immediately add this to AQ? */
ret = aq_ta_add_new(aq_ctx, &aq_ta, AQ_TA_FL_DEFEDB, 1, THR_LOCK_UNLOCK);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERROR, 4,
"sev=ERROR, func=qm_get_edb_entries, aq_ta_add_new=%m"
, ret);
goto error;
}
/*R: A:edb_req, A/L:aq_ta, N:aq_rcpt, P:edbc_node */
SM_SET_FLAG(fct_state, FST_AQTA_ALLOC);
ret = aq_rcpt_add_new(aq_ctx, aq_ta, &aq_rcpt, AQR_FL_DEFEDB, THR_LOCK_UNLOCK);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERROR, 4,
"sev=ERROR, func=qm_get_edb_entries, aq_rcpt_add_new=%m"
, ret);
goto error;
}
/*R: A:edb_req, A/L:aq_ta, A:aq_rcpt, P:edbc_node */
#if QMGR_DEBUG
chkaqta = true; /* really? */
#endif
/* Note: we have to avoid starvation of DEFEDB entries! */
RCPT_ID_COPY(edb_req->edb_req_id, edbc_node->ecn_rcpt_id);
edb_req->edb_req_type = EDB_REQ_RCPT;
ret = edb_rd_req(edb_ctx, edb_req);
if (sm_is_err(ret)) {
if (ret == sm_error_perm(SM_EM_EDB, DB_NOTFOUND)) {
QM_LEV_DPRINTFC(QDC_EDBR, 1, (QM_DEBFP, "sev=WARN, func=qm_get_edb_entries, edb_rd_req(%s)=%r\n", edbc_node->ecn_rcpt_id, ret));
goto next_item;
}
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERROR, 4,
"sev=ERROR, func=qm_get_edb_entries, rcpt_id=%s, edb_rd_req=%m"
, edbc_node->ecn_rcpt_id, ret);
goto error;
}
/*R: A/R:edb_req, A/L:aq_ta, A:aq_rcpt, P:edbc_node */
/* AQ rcpt must be locked? Only when we update the flags. */
ret = edb_rcpt_dec(edb_req, aq_rcpt);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERROR, 4,
"sev=ERROR, func=qm_get_edb_entries, rcpt_id=%s, edrcpt_dec=%m"
, edbc_node->ecn_rcpt_id, ret);
goto error;
}
/*R: A/R:edb_req, A/L:aq_ta, A/R:aq_rcpt, P:edbc_node */
AQR_DA_INIT(aq_rcpt);
aq_rcpt->aqr_entered = time_now;
/* ignore result, a failure can be tolerated */
(void) aq_rcpt_set_domain(aq_rcpt, qmgr_ctx->qmgr_hostname);
/* Check whether TA is already in AQ */
ret = aq_ta_find(aq_ctx, aq_rcpt->aqr_ss_ta_id, true, &aq_ta_2);
QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_ta_find(%s)=%r\n", aq_rcpt->aqr_ss_ta_id, ret));
if (SM_SUCCESS == ret) {
aq_rcpt_P aq_rcpt_2;
/*R: A/R:edb_req, A/L:aq_ta, P:aq_ta_2, A/R:aq_rcpt, P:edbc_node */
/* already there; move rcpt to this TA, remove other */
ret = aq_rcpt_find_one_ss(aq_ctx, aq_rcpt->aqr_ss_ta_id,
THR_LOCK_UNLOCK, &aq_rcpt_2);
if (sm_is_err(ret) || NULL == aq_rcpt_2) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERROR, 4,
"sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, aq_rcpt_find_one_ss=%m"
, aq_rcpt->aqr_ss_ta_id, ret);
AQR_SS_INIT(aq_rcpt);
}
/*R: A/R:edb_req, A/L:aq_ta, P:aq_ta_2, A/R:aq_rcpt, P:aq_rcpt_2, P:edbc_node */
else if (aq_rcpt_2 == aq_rcpt) {
/* ??? are there others? */
AQR_SS_INIT(aq_rcpt);
}
else {
/* currently this doesn't fail */
aq_rcpt_ss_insert(aq_rcpt_2, aq_rcpt);
}
QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, old_aq_ta=%p, aq_rcpt=%p, flags=%#x\n", aq_ta_2, aq_rcpt, aq_rcpt->aqr_flags ));
#if QMGR_DEBUG
{
aq_rcpt_P aq_rcpt_nxt;
QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_rcpt=%p, aq_rcpt_2=%p\n", aq_rcpt, aq_rcpt_2));
for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt);
aq_rcpt_nxt != NULL &&
aq_rcpt_nxt != aq_rcpt;
aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt))
{
QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_rcpt_nxt=%p\n", aq_rcpt_nxt));
}
}
#endif /* QMGR_DEBUG */
/* XXX Increment AQ TA counters? */
++aq_ta_2->aqt_rcpts_inaq;
aq_rcpt->aqr_ss_ta = aq_ta_2;
/* free aq_ta, it's not used anymore */
ret = aq_ta_rm(aq_ctx, aq_ta, true);
SM_CLR_FLAG(fct_state, FST_AQTA_ALLOC);
/*R: A/R:edb_req, F:aq_ta, P:aq_ta_2, A/R:aq_rcpt, U:aq_rcpt_2, P:edbc_node */
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 2,
"sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, aq_ta_rm=%m"
, aq_ta->aqt_ss_ta_id, ret);
}
/* for further checks down below */
aq_ta = aq_ta_2;
/*R: A/R:edb_req, P:aq_ta, U:aq_ta_2, A/R:aq_rcpt, U:aq_rcpt_2, P:edbc_node */
}
else
{
/*R: A/R:edb_req, A/L:aq_ta, U:aq_ta_2, A/R:aq_rcpt, P:edbc_node */
/* not found, get it from DEFEDB */
SESSTA_COPY(edb_req->edb_req_id, aq_rcpt->aqr_ss_ta_id);
edb_req->edb_req_type = EDB_REQ_TA;
ret = edb_rd_req(edb_ctx, edb_req);
if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edb_rd_req(%s)=%r\n", edb_req->edb_req_id, ret));
goto error;
}
ret = edb_ta_dec(edb_req, aq_ta);
if (sm_is_err(ret)) {
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edb_ta_dec(%s)=%r\n", edb_req->edb_req_id, ret));
goto error;
}
/*R: A/R:edb_req, A/L/R:aq_ta, U:aq_ta_2, A/R:aq_rcpt, P:edbc_node */
QM_LEV_DPRINTFC(QDC_EDBR, 6, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_rcpt=%p, aq_ta=%p, ss_ta=%s, mail=%@S, cdb=%C\n", aq_rcpt, aq_ta, aq_ta->aqt_ss_ta_id, aq_ta->aqt_mail->aqm_pa, aq_ta->aqt_cdb_id));
QM_LEV_DPRINTFC(QDC_EDBR, 6, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, tried=%u, left=%u, temp=%u, perm=%u\n", aq_ta->aqt_rcpts_tried, aq_ta->aqt_rcpts_left, aq_ta->aqt_rcpts_temp, aq_ta->aqt_rcpts_perm));
AQR_SS_INIT(aq_rcpt);
aq_ta->aqt_rcpts_inaq = 1;
}
/*R: "merge" of then/else: problem with aq_ta? added FST flag */
/*R: A/R:edb_req, P:aq_ta, A/R:aq_rcpt, P:edbc_node */
/*R: A/R:edb_req, A/L/R:aq_ta, A/R:aq_rcpt, P:edbc_node */
/*R: after "merge": */
/*R: A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */
if (AQR_IS_FLAG(aq_rcpt, AQR_FL_PERM)) {
ret = qm_edbr_gen_dsn(qmgr_ctx, aq_ctx, aq_ta,
aq_rcpt, edb_ctx, edb_req_hd, edbc_ctx,
edbc_node, time_now);
aq_rcpt = NULL;
if (sm_is_err(ret)) {
if (sm_error_value(ret) == ENOMEM) {
edbc_node = NULL;
goto error;
}
}
else {
flags |= ret;
SM_SET_FLAG(fct_state, FST_WR_EDB);
}
/*R: A/R:edb_req, (A/L/R|P):aq_ta, N:aq_rcpt, (P|F):edbc_node */
/* keep this edbc_node for retry in next run */
goto next_item;
}
else if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) &&
!AQR_IS_FLAG(aq_rcpt, AQR_FL_RCVD4AR))
{
/* send it to AR */
ret = qmgr_rcpt2ar(qmgr_ctx, aq_rcpt, THR_LOCK_UNLOCK);
if (sm_is_err(ret)) {
/* and now? remove it again? */
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, qmgr_rcpt2ar=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret));
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 4,
"sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, rcpt_pa=%S, stat=%d, qmgr_rcpt2ar=%m"
, aq_rcpt->aqr_ss_ta_id
, aq_rcpt->aqr_pa
, aq_rcpt->aqr_status, ret);
/* stop on other errors?? */
if (sm_error_value(ret) == SM_E_NO_AR ||
sm_error_value(ret) == ENOMEM)
goto error;
/* keep aq_rcpt? cleanup will take care of it */
}
else
flags |= QDA_FL_ACT_SMAR;
/*R: [no change] A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */
}
else {
/* add rcpt to todo queue if address is available */
ret = aq_rdq_add(aq_ctx, aq_rcpt, NULL, THR_LOCK_UNLOCK);
if (sm_is_err(ret)) {
/* What to do on error? Stop for now. */
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, aq_rdq_add=%r\n", ret));
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 4,
"sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, rcpt_pa=%S, stat=%d, aq_rdq_add=%m"
, aq_rcpt->aqr_ss_ta_id
, aq_rcpt->aqr_pa
, aq_rcpt->aqr_status, ret);
goto error;
}
/*R: [no change] A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */
}
/*R: [no change] A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */
/* Remove edbc_node */
if (edbc_node != NULL && sm_is_err(ret = edbc_rm(edbc_ctx, edbc_node)))
{
/* XXX the node might be freed! hence data is wrong! */
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edbc_rm(%p, %ld, %s)=%r\n", edbc_node, (long) edbc_node->ecn_next_try, edbc_node->ecn_rcpt_id, ret));
}
/*R: A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, (P|F):edbc_node */
/* check for temp fail, increase counter */
if (aq_rcpt != NULL &&
smtp_is_reply_temp(aq_rcpt->aqr_status) &&
aq_ta->aqt_rcpts_temp == 0)
{
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 1,
"sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, rcpt_pa=%S, stat=%d, aqt_rcpts_temp=0, status=inconsistent_data"
, aq_rcpt->aqr_ss_ta_id
, aq_rcpt->aqr_pa
, aq_rcpt->aqr_status);
++aq_ta->aqt_rcpts_temp;
}
next_item:
edbc_node = edbc_node_nxt;
/* Reset pointers to avoid accidental free */
aq_ta = NULL;
aq_rcpt = NULL;
/*R: A/R:edb_req, N:aq_ta, N:aq_rcpt, P:edbc_node */
}
if (edbc_entries_cnt > edbc_entries_lim) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_ERR, 1,
"sev=ERROR, func=qm_get_edb_entries, status=loop_exceeded_limit, count=%u, expected=%u"
, edbc_entries_cnt, edbc_entries_lim);
}
if (edbc_node != NULL) {
if (pnext_try != NULL)
*pnext_try = edbc_node->ecn_next_try - time_now;
QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, stop loop due to next_try; id=%s, next_try=%6ld, now=%6ld, diff=%ld\n", edbc_node->ecn_rcpt_id, (long) edbc_node->ecn_next_try, (long) time_now, (long) (edbc_node->ecn_next_try - time_now)));
}
else {
QM_LEV_DPRINTFC(QDC_EDBR, 5, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, stop loop, all entries done\n"));
}
if (aq_rcpt != NULL) {
(void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK);
aq_rcpt = NULL;
}
if (aq_ta != NULL && SM_IS_FLAG(fct_state, FST_AQTA_ALLOC)) {
(void) aq_ta_rm(aq_ctx, aq_ta, true);
aq_ta = NULL; /* not really necessary */
SM_CLR_FLAG(fct_state, FST_AQTA_ALLOC);
}
if (SM_IS_FLAG(fct_state, FST_WR_EDB)) {
ret = edb_wr_status(edb_ctx, &edb_req_hd);
if (sm_is_err(ret)) {
/* XXX What to do? */
QM_LEV_DPRINTFC(QDC_EDBR, 2, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edb_wr_status=%r\n", ret));
}
}
else
(void) edb_reql_free(edb_ctx, &edb_req_hd);
SM_ASSERT(SM_IS_FLAG(fct_state, FST_EDBC_LCK));
r = pthread_mutex_unlock(&edbc_ctx->edbc_mutex);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_CRIT, 1,
"sev=CRIT, func=qm_get_edb_entries, unlock_edbc=%d"
, r);
SM_ASSERT(r == 0);
}
SM_CLR_FLAG(fct_state, FST_EDBC_LCK);
if (edb_req != NULL) {
(void) edb_req_rel(edb_ctx, edb_req, 0, THR_LOCK_UNLOCK);
edb_req = NULL;
}
#if QMGR_DEBUG
if (chkaqta)
chkaq_ta(qmgr_ctx, AQ_TAS_FIRST(aq_ctx));
#endif
return flags;
error:
/* invoke edb_wr_status(edb_ctx, &edb_req_hd); ?? */
if (SM_IS_FLAG(fct_state, FST_WR_EDB))
(void) edb_wr_status(edb_ctx, &edb_req_hd);
else
(void) edb_reql_free(edb_ctx, &edb_req_hd);
if (aq_rcpt != NULL) {
(void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK);
aq_rcpt = NULL; /* not really necessary */
}
if (aq_ta != NULL && SM_IS_FLAG(fct_state, FST_AQTA_ALLOC)) {
(void) aq_ta_rm(aq_ctx, aq_ta, true);
aq_ta = NULL; /* not really necessary */
SM_CLR_FLAG(fct_state, FST_AQTA_ALLOC);
}
if (SM_IS_FLAG(fct_state, FST_EDBC_LCK)) {
r = pthread_mutex_unlock(&edbc_ctx->edbc_mutex);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_SCHED, QM_LMOD_DEFEDB,
SM_LOG_CRIT, 1,
"sev=CRIT, func=qm_get_edb_entries, unlock_edbc=%d"
, r);
SM_ASSERT(r == 0);
}
SM_CLR_FLAG(fct_state, FST_EDBC_LCK);
}
if (edb_req != NULL) {
(void) edb_req_rel(edb_ctx, edb_req,
(sm_is_err(ret) && sm_error_value(ret) == ENOMEM)
? EDB_RQF_FREE : 0,
THR_LOCK_UNLOCK);
edb_req = NULL; /* not really necessary */
}
QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, qm_get_edb_entries=%r\n", ret));
return ret;
}