/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: da_stat.c,v 1.214 2007/10/19 02:45:05 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/edb.h"
#include "qmgr.h"
#include "log.h"
/*
** Question: Use a struct to pass data between the various update functions?
** That struct would "collect" all necessary parameters in a single
** entity and it could also be used to set status flags, e.g., about
** some failures (e.g., generating a request failed), as well as
** returning results (e.g., iqdb_rcpts_done and return flags).
** This however requires to specify where the "packing" is done:
** only in "internal" update functions?
*/
/*
** QDA_UPD_IQDB -- Update IQDB status for one transaction; if all recipients
** have been delivered then remove the transaction from IQDB and
** add a request to remove it from IBDB.
**
** Parameters:
** qmgr_ctx -- QMGR context
** iqdb_rcpts_done -- number of recipients done
** ss_ta_id -- SMTPS transaction id
** cdb_id -- CDB id
** ibdb_req_hd -- head of request list for IBDB
**
** Returns:
** usual sm_error code; ENOMEM, SM_E_NOTFOUND,
** Caller should throttle/shutdown system in case of resource
** problems.
**
** Side Effects:
** decrements qss_ta->qssta_rcpts_tot (even on error)
** removes qss_ta even if IBDB update fails, this can cause
** duplicate deliveries on restart.
**
** Called by: qda_upd_ta_rcpt_stat(), qar_alias()
**
** Locking: locks qss_ta during operation, returns unlocked
**
** Last code review: 2005-04-03 22:30:08; see comments!
** Last code change:
*/
sm_ret_T
qda_upd_iqdb(qmgr_ctx_P qmgr_ctx, uint iqdb_rcpts_done, sessta_id_T ss_ta_id, cdb_id_P cdb_id, ibdb_req_hd_P ibdb_req_hd)
{
sm_ret_T ret, rv;
int r;
qss_ta_P qss_ta;
ibdb_ta_T ibdb_ta;
rv = SM_SUCCESS;
qss_ta = (qss_ta_P) iqdb_lookup(qmgr_ctx->qmgr_iqdb, ss_ta_id,
SMTP_STID_SIZE, THR_LOCK_UNLOCK);
/*
** Race condition: qss_ta is not locked! However, this is only
** a problem if some other function removes qss_ta which cannot
** happen as there are still "references" to it (qssta_rcpts_tot).
*/
if (NULL == qss_ta) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 0,
"sev=FATAL, func=qda_upd_iqdb, ss_ta=%s, iqdb_lookup=not_found",
ss_ta_id);
return sm_error_perm(SM_EM_Q_IQDB, SM_E_NOTFOUND);
}
r = pthread_mutex_lock(&qss_ta->qssta_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 1,
"sev=FATAL, func=qda_upd_iqdb, ss_ta=%s, lock=%d",
ss_ta_id, r);
return sm_error_perm(SM_EM_Q, r);
}
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_upd_iqdb, qss_ta=%p, locked=%d, qssta_rcpts_tot=%d, iqdb_rcpts_done=%d\n", qss_ta, r, qss_ta->qssta_rcpts_tot, iqdb_rcpts_done));
SM_ASSERT(qss_ta->qssta_rcpts_tot >= iqdb_rcpts_done);
/* Update recipient counter */
qss_ta->qssta_rcpts_tot -= iqdb_rcpts_done;
if (qss_ta->qssta_rcpts_tot > 0) {
r = pthread_mutex_unlock(&qss_ta->qssta_mutex);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 1,
"sev=FATAL, func=qda_upd_iqdb, ss_ta=%s, unlock=%d",
ss_ta_id, r);
}
SM_ASSERT(0 == r);
return SM_SUCCESS;
}
/*
** No recipients left: remove transaction from IQDB and IBDB.
*/
ret = iqdb_trans_rm(qmgr_ctx->qmgr_iqdb, qss_ta->qssta_id,
SMTP_STID_SIZE, THR_LOCK_UNLOCK);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_upd_iqdb, iqdb_trans_rm(%s)\n", qss_ta->qssta_id));
if (sm_is_err(ret)) {
/* can this happen if cleanup() removed it? */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 1,
"sev=FATAL, func=qda_upd_iqdb, ss_ta=%s, iqdb_trans_rm=%m",
ss_ta_id, ret);
}
if (QMGR_IS_RFLAG_I(qmgr_ctx, QMGR_RFL_IQD)) {
ret = qm_resource(qmgr_ctx, QMGR_UN_THROTTLE,
iqdb_usage(qmgr_ctx->qmgr_iqdb), QMGR_RFL_IQD);
}
ibdb_ta.ibt_ta_id = qss_ta->qssta_id;
ibdb_ta.ibt_mail_pa = qss_ta->qssta_mail->qsm_pa;
ibdb_ta.ibt_cdb_id = cdb_id;
ibdb_ta.ibt_nrcpts = qss_ta->qssta_rcpts_tot;
/*
** This could be written directly to IBDB if all recipients
** have been successfully delivered. (enhancement? pass a flag?)
*/
ret = ibdb_ta_app(qmgr_ctx->qmgr_ibdb, &ibdb_ta, ibdb_req_hd,
IBDB_TA_DONE);
if (sm_is_err(ret)) {
rv = ret;
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_iqdb, ta=%s, ibdb_ta_app=%m",
ibdb_ta.ibt_ta_id, ret);
}
/*
** That's not really correct if an error occurred before...
** However, it is complicated to handle removal errors:
** e.g., removal from IQDB can't fail temporarily, but
** ibdb_ta_app() can fail temporarily due to "out of memory".
** Nevertheless, the TA has been taken care of so it must be removed
** from the internal DBs.
**
** really QSS_TA_FL_AQ_RM??
*/
QSS_TA_SET_FLAG(qss_ta,
QSS_TA_FL_IBDB_RM|QSS_TA_FL_IQDB_RM|QSS_TA_FL_AQ_RM);
/* qss_ta_free() unlocks qss_ta */
ret = qss_ta_free(qss_ta, true, QSS_TA_FREE_DA, 0);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_iqdb, ss_ta=%s, qss_ta_free=%m"
, ss_ta_id, ret);
if (!sm_is_err(rv))
rv = ret;
}
return rv;
}
#define DA_TA_ID_IS_VALID(da_ta_id) ((da_ta_id)[0] != '\0')
/*
** QDA_DADB_CLOSE -- close DADB session/transaction etc
**
** Parameters:
** qmgr_ctx -- QMGR context
** da_ta_id -- DA transaction id
** dadb_ctx -- DA DB context (might be NULL)
** dadb_entry -- DA DB entry
** status -- status of (entire) transaction
** pflags -- (pointer to) flags (output)
** (currently can only set QDA_FL_ACT_SCHED)
**
** Returns:
** usual sm_error code
*/
sm_ret_T
qda_dadb_close(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry, sm_ret_T status, sm_ret_T *pflags)
{
sm_ret_T ret;
uint32_t daflags;
ret = SM_SUCCESS;
if (!DA_TA_ID_IS_VALID(da_ta_id) || NULL == dadb_ctx)
return SM_SUCCESS;
SM_REQUIRE(pflags != NULL);
daflags = 0;
if (DADBE_IS_FLAG(dadb_entry, DADBE_FL_SE_CL)) {
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_dadb_close, dadb_entry=se_close, da_se=%s, da_ta=%s, flags=%#x, status=%r\n", dadb_entry->dadbe_da_se_id, dadb_entry->dadbe_da_ta_id, dadb_entry->dadbe_flags, status));
ret = dadb_sess_close_entry(qmgr_ctx, dadb_ctx,
dadb_entry, SM_SUCCESS == status, &daflags,
THR_LOCK_UNLOCK);
if (sm_is_err(ret)) {
/* error is only logged, OK?? */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_dadb_close, da_ta=%s, dadb_sess_close_entry=%m",
da_ta_id, ret);
}
else if (QCNF_IS_FLAG(qmgr_ctx, QCNF_FL_SE_REUSE)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_INFO, 9,
"sev=INFO, func=qda_dadb_close, da_ta=%s, where=dadb_sess_close_entry, cur=%u",
da_ta_id, dadb_ctx->dadb_entries_cur);
}
QM_LEV_DPRINTFC(QDC_WKSCHED, 2, (QM_DEBFP, "sev=DBG, func=qda_dadb_close, daflags=%#x\n", ret));
DADBE_CLR_FLAG(dadb_entry, DADBE_FL_SE_CL);
if (SM_IS_FLAG(daflags, OCCE_FL_SE_WAIT))
*pflags |= QDA_FL_ACT_SCHED;
else if (SM_IS_FLAG(daflags, DADBE_FL_AVAIL) &&
SM_IS_FLAG(daflags, OCCE_FL_BLW_LIM))
*pflags |= QDA_FL_ACT_SCHED;
}
if (DADBE_IS_FLAG(dadb_entry, DADBE_FL_TA_CL)) {
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_dadb_close, dadb_entry=ta_close, da_se=%s, da_ta=%s, flags=%#x\n", dadb_entry->dadbe_da_se_id, dadb_entry->dadbe_da_ta_id, dadb_entry->dadbe_flags));
ret = dadb_ta_close_entry(qmgr_ctx, dadb_ctx,
dadb_entry, &daflags, THR_LOCK_UNLOCK);
if (sm_is_err(ret)) {
/* error is only logged, OK?? */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_dadb_close, da_ta=%s, dadb_ta_close_entry=%m",
da_ta_id, ret);
}
DADBE_CLR_FLAG(dadb_entry, DADBE_FL_TA_CL);
if (SM_IS_FLAG(daflags, OCCE_FL_TA_WAIT))
*pflags |= QDA_FL_ACT_SCHED;
}
QM_LEV_DPRINTFC(QDC_UPDRCPT, 2, (QM_DEBFP, "sev=DBG, func=qda_dadb_close, where=se/ta_close, daflags=%#x\n", daflags));
return ret;
}
/*
** QDA_UPD_TA_RCPT_STAT -- Update DA status for one transaction (da_ta_id)
** or one recipient (aq_rcpt).
**
** This function may append entries to the update queues for
** DEFEDB and IBDB which need to be taken care of by the caller.
**
** Parameters:
** qmgr_ctx -- QMGR context
** da_ta_id -- DA transaction id
** status -- status of (entire) transaction
** err_st -- state which cause error
** dadb_ctx -- DA DB context (might be NULL)
** dadb_entry -- DA DB entry
** aq_ta -- AQ transaction
** aq_rcpt -- AQ recipient
** edb_req_hd -- head of request list for (DEF)EDB
** ibdb_req_hd -- head of request list for IBDB
** errmsg -- error message (might be NULL)
** (must be "sanitized" by caller)
** pdelay_next_try -- (pointer to) delay until next try (output)
**
** Returns:
** success: flags as shown in sm/qmgr-int.h
** to activate scheduler or SMAR.
** Alternatively this might be an output parameter.
** error: usual sm_error code
**
** Side Effects:
** on error: see q_upd_rcpt_stat(),
** e.g., may write to ibdb, change ibdb/edb request lists.
**
** Called by:
** qda_update_ta_stat()
** qmgr_aq_cleanup()
**
** Locking: edbc and aq_ctx must be locked by caller
** may lock (unconditionally) dadb_ctx (if valid)
**
** Operation:
** Either a DA TA id must be specified (da_ta_id) or a single
** recipient (aq_rcpt). In the former case all recipients for
** that transactions will get a status update, in the latter case
** only the specified recipient.
*/
sm_ret_T
qda_upd_ta_rcpt_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_ret_T status,
uint err_st, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry,
aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, edb_req_hd_P edb_req_hd,
ibdb_req_hd_P ibdb_req_hd, sm_str_P errmsg, int *pdelay_next_try)
{
uint iqdb_rcpts_done, nrcpts, entries;
sm_ret_T ret, flags, rv;
aq_rcpt_P aq_rcpt_nxt;
aq_ctx_P aq_ctx;
sessta_id_T ss_ta_id;
cdb_id_P cdb_id;
bool rmcdb;
/* One of these must be set */
SM_REQUIRE(DA_TA_ID_IS_VALID(da_ta_id) || aq_rcpt != NULL);
SM_IS_QMGR_CTX(qmgr_ctx);
SM_IS_AQ_TA(aq_ta);
if (DA_TA_ID_IS_VALID(da_ta_id))
SM_IS_DADBE(dadb_entry);
aq_ctx = qmgr_ctx->qmgr_aq;
SM_IS_AQ(aq_ctx);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, stat=%d, err_st=%#x, da_ta=%s, aqt_rcpts_inaq=%u\n", status, err_st, da_ta_id, aq_ta->aqt_rcpts_inaq));
if (DA_TA_ID_IS_VALID(da_ta_id))
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DA, QM_LMOD_DASTAT,
SM_LOG_DEBUG, 9,
"func=qda_upd_ta_rcpt_stat, da_ta=%s, stat=%d, err_state=%#x, aqt_rcpts_inaq=%u",
da_ta_id, status, err_st, aq_ta->aqt_rcpts_inaq);
else if (aq_rcpt != NULL) { /* Must be true, see above */
SM_IS_AQ_RCPT(aq_rcpt);
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DA, QM_LMOD_DASTAT,
SM_LOG_DEBUG, 8,
"func=qda_upd_ta_rcpt_stat, ss_ta=%s, rcpt=%@S, rcpt_idx=%d, stat=%d, err_state=%#x",
aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_pa,
aq_rcpt->aqr_idx, status, err_st);
}
rmcdb = false;
flags = 0;
nrcpts = 0;
cdb_id = SM_CSTR_DUP(aq_ta->aqt_cdb_id);
iqdb_rcpts_done = 0;
rv = SM_SUCCESS;
/* Find one recipient from this DA transaction */
if (NULL == aq_rcpt) {
/* If aq_rcpt is not passed in, then dadb_entry must be valid */
SM_ASSERT(dadb_entry != NULL);
/* Get SMTPS transaction ID */
SESSTA_COPY(ss_ta_id, dadb_entry->dadbe_ss_ta_id);
aq_rcpt = dadb_entry->dadbe_rcpt;
if (NULL == aq_rcpt) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_ta_rcpt_stat, da_ta=%s, aq_rcpt=NULL",
da_ta_id);
}
}
else {
/* Get SMTPS transaction ID from recipient */
SESSTA_COPY(ss_ta_id, aq_rcpt->aqr_ss_ta_id);
}
/*
** Go through all recipients for this DA transaction
** and update their status. Also update AQ TA status
** (recipient counters) and the queues in which the recipients
** are actually stored (IQDB, IBDB, DEFED).
*/
entries = aq_ctx->aq_entries;
for (; aq_rcpt != NULL; aq_rcpt = aq_rcpt_nxt) {
/*
** Get next entry; if it points back to current: it's the
** last one, but only because every element is removed
** at the end of the loop.
*/
aq_rcpt_nxt = AQR_DA_SUCC(aq_rcpt);
#if QMGR_DEBUG > 1
QM_LEV_DPRINTFC(QDC_UPDRCPT, 3, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, aq_rcpt=%p, aq_rcpt_nxt=%p\n", aq_rcpt, aq_rcpt_nxt));
aq_rcpt_print(aq_rcpt);
#endif
if (aq_rcpt_nxt == aq_rcpt)
aq_rcpt_nxt = NULL;
if (nrcpts++ > entries) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_FATAL, 0,
"sev=FATAL, func=qda_upd_ta_rcpt_stat, nrcpts=%u, aq_entries=%u",
nrcpts, entries);
/* ABORT for now */
SM_ASSERT(nrcpts <= entries);
break;
}
QM_LEV_DPRINTFC(QDC_UPDRCPT, 5, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, da_ta=%s, rcpt_da=%s, idx=%d, aq_rcpt=%p, aq_rcpt_nxt=%p\n", da_ta_id, aq_rcpt->aqr_da_ta_id, aq_rcpt->aqr_idx, aq_rcpt, aq_rcpt_nxt));
/*
** Only perform checks if da_ta_id is set and it's not a
** recipient with an SMAR failure
*/
if (DA_TA_ID_IS_VALID(da_ta_id)) {
/* Sanity check: must have same DA transaction id */
if (!SESSTA_EQ(aq_rcpt->aqr_da_ta_id, da_ta_id)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_FATAL, 0,
"sev=FATAL, func=qda_upd_ta_rcpt_stat, da_ta=%s rcpt_da_ta=%s, stat=%d",
da_ta_id, aq_rcpt->aqr_da_ta_id,
aq_rcpt->aqr_status);
/* ABORT for now */
SM_ASSERT(SESSTA_EQ(aq_rcpt->aqr_da_ta_id, da_ta_id));
continue;
}
/* Sanity check; see also actdb-int.h */
/* XXX Should also check for AQR_FL_WAIT4UPD */
if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_SCHED)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_ta_rcpt_stat, da_ta=%s, stat=%d, flags=%#x, ss_ta=%s, idx=%d",
da_ta_id, aq_rcpt->aqr_status,
aq_rcpt->aqr_flags, ss_ta_id,
aq_rcpt->aqr_idx);
continue;
}
}
ret = q_upd_rcpt_stat(qmgr_ctx, ss_ta_id, status,
err_st, aq_ta, aq_rcpt, edb_req_hd,
ibdb_req_hd, errmsg, &iqdb_rcpts_done,
pdelay_next_try);
if (!sm_is_err(ret))
flags |= ret;
else {
rv = ret;
goto error;
}
}
/*
** Updated all recipients, now take care of the transaction
*/
/* Removed any entries from INCEDB? */
if (iqdb_rcpts_done > 0) {
ret = qda_upd_iqdb(qmgr_ctx, iqdb_rcpts_done, ss_ta_id, cdb_id,
ibdb_req_hd);
/* COMPLAIN on error? already done in qda_upd_iqdb() */
QM_LEV_DPRINTFC(QDC_UPDRCPT, 2, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, iqdb_rcpts_done=%d, qda_upd_iqdb=%r\n", iqdb_rcpts_done, ret));
if (sm_is_err(ret))
goto error;
}
/*
** No deliverable recipients in AQ anymore or need to update TA?
** Note: this may update TA after each delivery attempt which
** can be costly. It might be useful to collect those updates
** and perform them only if there are enough of them (or a
** timeout occurred or something "requires" an update).
**
** XXX The second condition -- one recipient but that's a bounce --
** is not always correct: there can be multiple bounces; do we
** need a counter instead of a simple flag?
** It's not really important because it will be updated later on
** in that case. The only important part is that the TA is not
** removed from IBDB without being added to DEFEDB if it is
** still needed, i.e., some recipient is still there.
*/
if (aq_ta->aqt_rcpts_inaq == 0
|| (aq_ta->aqt_rcpts_inaq == 1 && aq_ta_has_bounce(aq_ta))
|| AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_EDB_UPD_R))
{
#if QMGR_DEBUG > 2
/* just a hack to stop testing... */
if (aq_ta->aqt_rcpts_inaq == 1 && aq_ta_has_bounce(aq_ta))
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=ERROR, func=qda_upd_ta_rcpt_stat, aq_ta_rm(%p=%s), aqt_rcpts_inaq=%u, aqt_rcpts_tot=%u, aqt_rcpts_left=%u, edb_upd=%d, hasbounce=%d\n", aq_ta, aq_ta->aqt_ss_ta_id, aq_ta->aqt_rcpts_inaq, aq_ta->aqt_rcpts_tot, aq_ta->aqt_rcpts_left, AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_EDB_UPD_C), aq_ta_has_bounce(aq_ta)));
#endif /* QMGR_DEBUG */
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, aq_ta_rm(%p=%s), aqt_rcpts_tot=%u, aqt_rcpts_left=%u, edb_upd=%d\n", aq_ta, aq_ta->aqt_ss_ta_id, aq_ta->aqt_rcpts_tot, aq_ta->aqt_rcpts_left, AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_EDB_UPD_R)));
/* No recipients at all (in DEFEDB)? */
rmcdb = aq_ta->aqt_rcpts_left == 0;
/*
When to update a TA in DEFEDB?
1. When a RCPT is written to/removed from DEFEDB (R)
and the counters in the TA change (C) or the TA is not in DEFEDB (!D).
R && (C || !D)
Note: When a RCPT is updated in DEFEDB then TA is in DEFEDB or
the counters change (the counters do not change iff the RCPT status does
not change; if the RCPT status does not change, then the only reason
the RCPT is written to DEFEDB is because it was there earlier
and hence TA was there too (that is a pre-requirement of the algorithm:
a RCPT is only in DEFEDB iff its TA is there too; this may change if
the scheduler pre-empts recipients!)
Hence we can simplify
R && (C || !D) to R && C
2. When the counters in TA change (C) and it is in DEFEDB (D).
C && D
Is this correct?
1. || 2. == (R && C) || (C && D) == C && (R || D)
*/
if (!rmcdb &&
AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_EDB_UPD_C) &&
(AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_DEFEDB) ||
AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_EDB_UPD_R)))
{
ret = edb_ta_app(qmgr_ctx->qmgr_edb, aq_ta, edb_req_hd, status);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, ss_ta=%s, edb_ta_app=%r, stat=%d, ta_flags=%#x, aqt_rcpts_left=%u\n", ss_ta_id, ret, status, aq_ta->aqt_flags, aq_ta->aqt_rcpts_left));
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_ta_rcpt_stat, ta=%s, edb_ta_app=%m"
, aq_ta->aqt_ss_ta_id, ret);
rv = ret;
goto error;
}
else {
QM_LEV_DPRINTFC(QDC_UPDRCPT, 4, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, ss_ta=%s, edb_ta_app=%r\n", aq_ta->aqt_ss_ta_id, ret));
}
if (!AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_DEFEDB)) {
/* Not yet?? Only if it is actually written */
AQ_TA_SET_FLAG(aq_ta, AQ_TA_FL_DEFEDB);
aq_ctx->aq_d_entries++;
SM_ASSERT(aq_ctx->aq_d_entries <= aq_ctx->aq_entries);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 0, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, ss_ta=%s, ta_flags=%#x, aq_d_entries=%u, aq_entries=%u\n", ss_ta_id, aq_ta->aqt_flags, aq_ctx->aq_d_entries, aq_ctx->aq_entries));
}
AQ_TA_CLR_FLAG(aq_ta, AQ_TA_FL_EDB_UPD_C|AQ_TA_FL_EDB_UPD_R);
}
QM_LEV_DPRINTFC(QDC_UPDRCPT, 2, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, aqt_rcpts_inaq=%d, aqt_rcpts_left=%d, temp=%u, perm=%u, flags=%#x, rmcdb=%d, AQ_TA_FL_DEFEDB=%d\n", aq_ta->aqt_rcpts_inaq, aq_ta->aqt_rcpts_left, aq_ta->aqt_rcpts_temp, aq_ta->aqt_rcpts_perm, aq_ta->aqt_flags, rmcdb, AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_DEFEDB)));
if (rmcdb && AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_DEFEDB)) {
ret = edb_ta_rm_req(qmgr_ctx->qmgr_edb, aq_ta->aqt_ss_ta_id, edb_req_hd);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_ta_rcpt_stat, ta=%s, edb_ta_rm_req=%m",
aq_ta->aqt_ss_ta_id, ret);
rv = ret;
goto error;
}
else {
QM_LEV_DPRINTFC(QDC_UPDRCPT, 4, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, edb_ta_rm_req(%s)=%r\n", aq_ta->aqt_ss_ta_id, ret));
}
}
/* protected by fs_* mutex */
if (rmcdb && cdb_id != NULL) {
ret = cdb_sz_rm(qmgr_ctx->qmgr_cdb_fsctx,
(char *) sm_cstr_data(cdb_id),
SM_B2KB(aq_ta->aqt_msg_sz_b),
&qmgr_ctx->qmgr_cdb_kbfree);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 4, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, cdb_sz_rm=%#x, size=%lu, kbfree=%lu\n", ret, (ulong) aq_ta->aqt_msg_sz_b, qmgr_ctx->qmgr_cdb_kbfree));
if (sm_is_err(ret)) {
/* just log, don't "error out" */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_ta_rcpt_stat, aq_ta=%p, ss_ta=%s, cdb=%C, msg_size=%lu, cdb_sz_rm=%#x",
aq_ta, aq_ta->aqt_ss_ta_id,
cdb_id, (ulong) aq_ta->aqt_msg_sz_b,
ret);
}
else if (qmgr_ctx->qmgr_cdb_kbfree > qmgr_ctx->qmgr_cnf.q_cnf_min_df)
{
ret = qm_resource(qmgr_ctx, QMGR_UN_THROTTLE,
DISK_USAGE(qmgr_ctx->qmgr_cdb_kbfree,
qmgr_ctx),
QMGR_RFL_CDB_I);
}
}
/* No recipients in AQ for this TA? */
if (0 == aq_ta->aqt_rcpts_inaq) {
#if QMGR_DEBUG
aq_ta_P aq_ta_p = aq_ta;
#endif
ret = aq_ta_rm(aq_ctx, aq_ta, false);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_ta_rcpt_stat, aq_ta=%p, ss_ta=%s, aq_ta_rm=%m",
aq_ta, aq_ta->aqt_ss_ta_id, ret);
rv = ret;
goto error;
}
else {
QM_LEV_DPRINTFC(QDC_UPDRCPT, 4, (QM_DEBFP, "sev=DBG, func=qda_upd_ta_rcpt_stat, aq_ta_rm(%p)=%r\n", aq_ta_p, ret));
ret = qm_resource(qmgr_ctx, QMGR_UN_THROTTLE,
aq_usage(qmgr_ctx->qmgr_aq,
AQ_USAGE_ALL),
QMGR_RFL_AQ_I);
}
}
}
/* remove cdb? */
if (rmcdb && cdb_id != NULL) {
#if QMGR_TEST
if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_INT_SRC))
ret = SM_SUCCESS;
else
#endif
/*
** Note: maybe this should be done outside the locked code section?
** On some OS this may cause delays due to disk I/O and shouldn't be
** done while AQ and EDB are locked.
*/
ret = cdb_unlink(qmgr_ctx->qmgr_cdb_ctx, (char *) sm_cstr_data(cdb_id));
if (sm_is_err(ret)) {
/* just log, don't "error out" */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_upd_ta_rcpt_stat, ss_ta=%s, cdb=%C, cdb_unlink=%m",
ss_ta_id, cdb_id, ret);
}
else {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_INFO, 9,
"sev=INFO, func=qda_upd_ta_rcpt_stat, ss_ta=%s, cdb=%C, status=done",
ss_ta_id, cdb_id);
}
}
SM_CSTR_FREE(cdb_id);
/* close DA session if necessary */
if (DA_TA_ID_IS_VALID(da_ta_id) && dadb_ctx != NULL) {
ret = qda_dadb_close(qmgr_ctx, da_ta_id, dadb_ctx, dadb_entry, status, &flags);
}
#if 0
if (rmcdb)
SM_ASSERT(0 == aq_ta->aqt_rcpts_inaq);
#endif /* 0 */
#if 0
/*
** Reset bounce indices to avoid that other bounced recipients
** will be added. It might be possible to defer the check to
** qm_bounce_add() by checking whether the bounce is already
** scheduled for delivery and only if it is then a new bounce
** envelope will be created (the envelope as such doesn't exist,
** only the bounce recipient).
*/
aq_ta->aqt_dbl_bounce_idx = 0;
aq_ta->aqt_bounce_idx = 0;
#endif /* 0 */
error:
return sm_is_err(rv) ? rv : flags;
}
/*
** QDA_UPDATE_TA_STAT -- Update DA status for one transaction
**
** This function is a wrapper for qda_upd_ta_rcpt_stat() which may
** append entries to the update queues for DEFEDB and IBDB.
** These entries are written to disk at the end of this function and
** hence may cause delays.
**
** Parameters:
** qmgr_ctx -- QMGR context
** da_ta_id -- DA transaction id
** status -- status of (entire) transaction
** err_st -- state which cause error
** dadb_ctx -- DA DB context
** dadb_entry -- DA DB entry
** aq_ta -- AQ transaction
** aq_rcpt -- AQ recipient
** errmsg -- error message (might be NULL)
** (must be "sanitized" by caller)
** locktype -- kind of locking
**
** Returns:
** usual sm_error code
**
** Side Effects:
** see qda_upd_ta_rcpt_stat() and ibdb_wr_status()
**
** Called by: qm_fr_sc_react()
**
** Locking: locktype: locks entire edbc and aq_ctx during operation
** (in that order) returns unlocked
** It's not possible to lock only one of these!
** may lock (unconditionally) dadb_ctx
*/
sm_ret_T
qda_update_ta_stat(qmgr_ctx_P qmgr_ctx, sessta_id_T da_ta_id, sm_ret_T status,
uint err_st, dadb_ctx_P dadb_ctx, dadb_entry_P dadb_entry,
aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, sm_str_P errmsg, thr_lock_T locktype)
{
sm_ret_T ret, flags, rv;
int r, delay_next_try;
ibdb_req_hd_T ibdb_req_hd;
edb_req_hd_T edb_req_hd;
aq_ctx_P aq_ctx;
SM_IS_QMGR_CTX(qmgr_ctx);
SM_IS_AQ_TA(aq_ta);
aq_ctx = qmgr_ctx->qmgr_aq;
SM_IS_AQ(aq_ctx);
IBDBREQL_INIT(&(ibdb_req_hd));
delay_next_try = 0;
rv = SM_SUCCESS;
#if !QMGR_TEST_NO_AQUPDATE
if (thr_lock_it(locktype)) {
r = pthread_mutex_lock(&qmgr_ctx->qmgr_edbc->edbc_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 1,
"sev=CRIT, func=qda_update_ta_stat, lock_edbc=%m",
sm_err_perm(r));
return sm_error_perm(SM_EM_Q, r);
}
r = pthread_mutex_lock(&aq_ctx->aq_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 1,
"sev=CRIT, func=qda_update_ta_stat, lock_aq=%m",
sm_err_perm(r));
(void) pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex);
return sm_error_perm(SM_EM_Q, r);
}
}
EDBREQL_INIT(&edb_req_hd);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 1, (QM_DEBFP, "sev=DBG, func=qda_update_ta_stat, stat=%d, err_st=%#x, da_ta=%s, aqt_rcpts_inaq=%u\n", status, err_st, da_ta_id, aq_ta->aqt_rcpts_inaq));
#if QMGR_STATS
if (SMTP_OK == status)
++qmgr_ctx->qmgr_tas_sent;
#endif
flags = qda_upd_ta_rcpt_stat(qmgr_ctx, da_ta_id, status, err_st,
dadb_ctx, dadb_entry, aq_ta, aq_rcpt, &edb_req_hd,
&ibdb_req_hd, errmsg, &delay_next_try);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 4, (QM_DEBFP, "sev=DBG, func=qda_update_ta_stat, qda_upd_ta_rcpt_stat=%r, delay_next_try=%d\n", flags, delay_next_try));
/* check unlocking! XXX */
if (thr_unl_always(locktype)) {
r = pthread_mutex_unlock(&aq_ctx->aq_mutex);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 1,
"sev=CRIT, func=qda_update_ta_stat, unlock_aq=%m",
sm_err_perm(r));
SM_ASSERT(0 == r);
}
}
if (sm_is_success(flags)) {
ret = edb_wr_status(qmgr_ctx->qmgr_edb, &edb_req_hd);
QM_LEV_DPRINTF(sm_is_err(ret) ? 1 : 5, (QM_DEBFP, "sev=DBG, func=qda_update_ta_stat, edb_wr_status=%r\n", ret));
if (sm_is_err(ret))
{
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_update_ta_stat, edb_wr_status=%m",
ret);
rv = ret;
}
}
else {
(void) edb_reql_free(qmgr_ctx->qmgr_edb, &edb_req_hd);
rv = ret = flags; /* set error code to avoid ibdb_wr_status */
flags = 0;
}
if (thr_unl_always(locktype)) {
r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_CRIT, 1,
"sev=CRIT, func=qda_update_ta_stat, unlock_edbc=%m",
sm_err_perm(r));
SM_ASSERT(0 == r);
}
}
/* successfully written to DEFEDB? */
if (sm_is_success(ret)) {
ret = ibdb_wr_status(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd);
if (sm_is_err(ret)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_DASTAT, QM_LMOD_DASTAT,
SM_LOG_ERR, 1,
"sev=ERROR, func=qda_update_ta_stat, ibdb_wr_status=%m",
ret);
rv = ret;
}
}
else {
/* ignore result */
(void) ibdb_req_cancel(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd);
}
#else /* !QMGR_TEST_NO_AQUPDATE */
if (DA_TA_ID_IS_VALID(da_ta_id) && dadb_ctx != NULL)
ret = qda_dadb_close(qmgr_ctx, da_ta_id, dadb_ctx, dadb_entry, status, &flags);
flags = QDA_FL_ACT_SCHED;
#endif /* !QMGR_TEST_NO_AQUPDATE */
if (!sm_is_err(rv) && (QDA_ACT_SCHED(flags) || delay_next_try > 0) &&
qmgr_ctx->qmgr_sched != NULL)
{
timeval_T slpt, nowt, newt;
/* activate scheduler */
ret = evthr_timeval(qmgr_ctx->qmgr_sched->evthr_t_ctx, &nowt);
slpt.tv_usec = 0;
slpt.tv_sec = delay_next_try;
timeradd(&nowt, &slpt, &newt);
ret = evthr_new_sl(qmgr_ctx->qmgr_sched, newt, false);
/* XXX check ret? */
QM_LEV_DPRINTFC(QDC_WKSCHED, 1, (QM_DEBFP, "sev=DBG, func=qda_update_ta_stat, evthr_new_sl=%r, newt=%ld\n", ret, newt.tv_sec));
}
/* HACK notify qar task */
if (!sm_is_err(rv) && QDA_ACT_SMAR(flags)) {
sm_evthr_task_P qar_tsk;
qar_ctx_P qar_ctx;
qar_ctx = qmgr_ctx->qmgr_ar_ctx;
SM_IS_QAR_CTX(qar_ctx);
qar_tsk = qmgr_ctx->qmgr_ar_tsk;
ret = evthr_en_wr(qar_tsk);
QM_LEV_DPRINTFC(QDC_UPDRCPT, 5, (QM_DEBFP, "sev=DBG, func=qda_update_ta_stat, enable_smar_wr=%r\n", ret));
}
#if QMGR_TEST
/* trigger an error if requested (add some condition??) */
if (SM_IS_FLAG(qmgr_ctx->qmgr_cnf.q_cnf_tests, QMGR_TEST_UPD_ABORT)) {
SM_ASSERT(false); /* abort */
}
#endif /* QMGR_TEST */
return rv;
}
syntax highlighted by Code2HTML, v. 0.9.1