/* * Copyright (c) 2003-2005 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: edbr.c,v 1.77 2006/11/27 03:40:40 ca Exp $") #include "sm/assert.h" #include "sm/magic.h" #include "sm/error.h" #include "sm/memops.h" #include "sm/edbc.h" #include "sm/qmgr-int.h" #include "sm/actdb-int.h" #include "sm/aqrdq.h" #include "qmgr.h" #include "log.h" #define AQ_USAGE_ALL_RDEDB 99 #define AQ_USAGE_DEF_RDEDB 10 /* ** QM_EDBR_GEN_DSN -- generate a dsn for a (failed) rcpt that has been ** read from DEFEDB unless there still is one, take care of EDB cache etc. ** ** Parameters: ** qmgr_ctx -- QMGR context ** aq_ctx -- AQ context ** aq_ta -- AQ transaction ** aq_rcpt -- AQ recipient ** edb_ctx -- EDB context ** edb_req_hd -- head of request list for (DEF)EDB ** edbc_ctx -- EDBC context ** edbc_node -- current EDBC node ** time_now -- current time ** ** Returns: ** usual sm_error code ** ** Side Effects: aq_rcpt is freed ** ** Called by: qm_get_edb_entries() ** ** Last code review: 2005-04-18 01:30:54; see comments ** Last code change: 2005-04-18 05:41:10 */ static sm_ret_T qm_edbr_gen_dsn(qmgr_ctx_P qmgr_ctx, aq_ctx_P aq_ctx, aq_ta_P aq_ta, aq_rcpt_P aq_rcpt, edb_ctx_P edb_ctx, edb_req_hd_T edb_req_hd, edbc_ctx_P edbc_ctx, edbc_node_P edbc_node, time_T time_now) { sm_ret_T ret, flags; time_T bounce_expire; flags = 0; bounce_expire = 0; /* is there already a bounce? */ /* XXX does this have to check for double bounce too? qm_bounce_add() */ if (aq_rcpt_has_bounce(aq_rcpt)) { aq_rcpt_P aq_rcpt_2; /* check whether bounce is still in AQ */ ret = aq_rcpt_find_ss(aq_ctx, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_dsn_idx, THR_LOCK_UNLOCK, &aq_rcpt_2); if (sm_is_success(ret) && aq_rcpt_2 != NULL) bounce_expire = aq_rcpt_2->aqr_expire; } else ret = sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND); if (bounce_expire == 0) (void) qm_rcpt_da_expire(qmgr_ctx, aq_rcpt, time_now, &bounce_expire); /* avoid endless loop in qm_get_edb_entries() */ if (bounce_expire <= time_now) bounce_expire = time_now + 1; if (sm_is_err(ret)) { /* generate bounce... */ ret = qm_bounce_add(qmgr_ctx, aq_rcpt->aqr_ss_ta, aq_rcpt, NULL, NULL); if (sm_is_success(ret)) flags |= ret; } if (sm_is_err(ret)) { /* XXX What to do now? Log and try again later? */ QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, qm_bounce_add=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret)); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, rcpt_pa=%S, stat=%d, qm_bounce_add=%m" , aq_rcpt->aqr_ss_ta_id , aq_rcpt->aqr_pa , aq_rcpt->aqr_status, ret); /* set return value; "fall through" for aq_rcpt removal */ flags = ret; } else { rcpt_id_T rcpt_id; flags |= ret; sm_snprintf(rcpt_id, sizeof(rcpt_id), SMTP_RCPTID_FORMAT, aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx); aq_rcpt->aqr_next_try = bounce_expire; /* create a "edbc_mv" function? */ ret = edbc_rm(edbc_ctx, edbc_node); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, edbc_rm=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret)); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, idx=%u, rcpt_pa=%S, stat=%d, edbc_rm=%m" , aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx , aq_rcpt->aqr_pa, aq_rcpt->aqr_status, ret); } ret = edbc_add(edbc_ctx, rcpt_id, aq_rcpt->aqr_next_try, false); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, edbc_add=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret)); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, idx=%u, rcpt_pa=%S, stat=%d, edbc_add=%m" , aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx , aq_rcpt->aqr_pa, aq_rcpt->aqr_status, ret); } ret = edb_rcpt_app(edb_ctx, aq_rcpt, &edb_req_hd, aq_rcpt->aqr_status); if (sm_is_err(ret)) { /* ** XXX How to deal with this error? ** Just ignore it and hope that it ** works next time the queue is read? */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_edbr_gen_dsn, ss_ta=%s, idx=%u, rcpt_pa=%S, stat=%d, edb_rcpt_app=%m" , aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx , aq_rcpt->aqr_pa, aq_rcpt->aqr_status, ret); QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_edbr_gen_dsn, aq_rcpt=%p, ss_ta=%s, rcpt_idx=%u, edb_rcpt_app=%r\n", aq_rcpt, aq_ta->aqt_ss_ta_id, aq_rcpt->aqr_idx, ret)); } } (void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK); SM_ASSERT(aq_ta->aqt_rcpts_inaq > 0); --aq_ta->aqt_rcpts_inaq; #if 0 if (aq_ta->aqt_rcpts_inaq == 0) /* XXX remove TA from AQ; maybe do it later? */; #endif return flags; } /* ** Should this be a timer activated task? ** We could set the "sleep" time to the "next time to try" from EDBC. ** It's called from the scheduler, which is time triggered as well as ** event triggered. */ /* ** QM_GET_EDB_ENTRIES -- read some EDB entries and add them to AQ ** The entries are read in the order specified by EDBC. ** ** Parameters: ** qmgr_ctx -- QMGR context ** pnext_try -- (pointer to) seconds until next try (output) ** ** Returns: ** usual sm_error code ** should it be: >=0: number of elements not read due to some error ** or because AQ is almost filled? ** ** Called by: qmgr_sched() ** ** Locking: ** locks edbc_mutex ** locks aq_mutex when accessing AQ ** This will perform disk I/O and hence keeping aq_ctx ** locked during the entire function is "bad"... ** AQ needs to be locked if aq_ctx is manipulated (add/rm entries) ** As long as the scheduler does not run in parallel with this ** function, we don't need to lock aq_ta or aq_rcpt since only ** the scheduler will access them. ** AQ is also accessed if results come back from a DA, however, ** those functions lock edbc_mutex first, just as cleanup does. ** This means that this function will block any updates to AQ. ** ** Notes: should this skip over "bad" entries instead of stopping? ** ** Last code review: 2005-04-19 00:45:57; see comments/notes ** Last code change: */ sm_ret_T qm_get_edb_entries(qmgr_ctx_P qmgr_ctx, int *pnext_try) { sm_ret_T ret, flags; int r, usage; uint fct_state, edbc_entries_cnt, edbc_entries_lim ; bool edbc_aqfull; time_T time_now; aq_ctx_P aq_ctx; edb_ctx_P edb_ctx; edbc_ctx_P edbc_ctx; edbc_node_P edbc_node, edbc_node_nxt; edb_req_P edb_req; aq_rcpt_P aq_rcpt; aq_ta_P aq_ta, aq_ta_2; edb_req_hd_T edb_req_hd; #if QMGR_DEBUG bool chkaqta; #endif /* function state flags */ #define FST_EDBC_LCK 0x01 /* edbc is locked */ #define FST_AQTA_ALLOC 0x02 /* aq_ta is allocated */ #define FST_WR_EDB 0x04 /* write edb */ SM_IS_QMGR_CTX(qmgr_ctx); aq_ctx = qmgr_ctx->qmgr_aq; SM_IS_AQ(aq_ctx); edb_ctx = qmgr_ctx->qmgr_edb; edbc_ctx = qmgr_ctx->qmgr_edbc; edb_req = NULL; aq_rcpt = NULL; aq_ta = NULL; edbc_node = NULL; flags = 0; fct_state = 0; edbc_aqfull = EDBC_IS_FLAG(edbc_ctx, EDBC_FL_AQFULL); time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx); if (pnext_try != NULL) *pnext_try = 0; #if QMGR_DEBUG chkaqta = false; #endif /*R: N:edb_req, N:aq_ta, N:aq_rcpt, N:edbc_node */ /* only used for perm. failed rcpts to update "next time to try" */ EDBREQL_INIT(&edb_req_hd); /* ** Note: it might be useful to check aq_usage() before trying ** to read any data: if the limits are exceeded, then there's no ** need to do anything. However, this keeps us from determining ** next_try. This could be solved by keeping the minimum value ** in edbc, however, that requires locking edbc. Overall, it ** does not seem to be worth the effort. */ ret = edb_req_new(edb_ctx, EDB_RQF_NONE, &edb_req, true); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_get_edb_entries, edb_req_new=%m" , ret); goto error; } /*R: A:edb_req, N:aq_ta, N:aq_rcpt, N:edbc_node */ /* ** Is it ok to keep EDBC locked? It's used in da_stat. ** However, if we don't keep it locked the order might be ** screwed up. */ r = pthread_mutex_lock(&edbc_ctx->edbc_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_CRIT, 2, "sev=CRIT, func=qm_get_edb_entries, lock_edbc=%d", r); ret = sm_error_perm(SM_EM_AQ, r); goto error; } SM_SET_FLAG(fct_state, FST_EDBC_LCK); #if QMGR_DEBUG > 2 ebdc_print(edbc_ctx); #endif edbc_node = edbc_first(edbc_ctx); /*R: A:edb_req, N:aq_ta, N:aq_rcpt, P:edbc_node */ QM_LEV_DPRINTFC(QDC_EDBR, 3, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, now=%6ld, entries=%d\n", (long) time_now, edbc_ctx->edbc_entries)); edbc_entries_cnt = 0; edbc_entries_lim = edbc_ctx->edbc_entries; while (edbc_node != NULL && edbc_node->ecn_next_try <= time_now && edbc_entries_cnt <= edbc_entries_lim ) { edbc_node_nxt = edbc_next(edbc_ctx, edbc_node); ++edbc_entries_cnt; QM_LEV_DPRINTFC(QDC_EDBR, 3, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, id=%s, next_try=%6ld, now=%6ld\n", edbc_node->ecn_rcpt_id, (long) edbc_node->ecn_next_try, (long) time_now)); /* ** Check whether there is enough space in AQ. ** Get recipient from DEFEDB and put it into AQ. ** If corresponding TA isn't in AQ: fetch that too. */ usage = aq_usage(aq_ctx, AQ_USAGE_ALL); if (usage >= AQ_USAGE_ALL_RDEDB) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_INFO, edbc_aqfull ? 19 : 9, "sev=INFO, func=qm_get_edb_entries, aq_usage=%d, status=stop_adding_entries_from_defebd" , usage); EDBC_SET_FLAG(edbc_ctx, EDBC_FL_AQFULL); break; } /* ** defedb/AQ usage must not exceed AQ_USAGE_DEF_RDEDB if ** total usage exceeds lower threshold for throttling. ** This makes sure that defedb entries do not cause SMTPS ** to throttle (if the values are set properly). */ r = aq_usage(aq_ctx, AQ_USAGE_DEFEDB); if (r >= AQ_USAGE_DEF_RDEDB && usage >= qmgr_ctx->qmgr_lower[QMGR_RFL_AQ_I]) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_INFO, edbc_aqfull ? 19 : 9, "sev=INFO, func=qm_get_edb_entries, aq_usage=%d, aq_usage_defedb=%d, status=stop_adding_entries_from_defebd" , usage, r); EDBC_SET_FLAG(edbc_ctx, EDBC_FL_AQFULL); break; } EDBC_CLR_FLAG(edbc_ctx, EDBC_FL_AQFULL); /* Should we immediately add this to AQ? */ ret = aq_ta_add_new(aq_ctx, &aq_ta, AQ_TA_FL_DEFEDB, 1, THR_LOCK_UNLOCK); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_get_edb_entries, aq_ta_add_new=%m" , ret); goto error; } /*R: A:edb_req, A/L:aq_ta, N:aq_rcpt, P:edbc_node */ SM_SET_FLAG(fct_state, FST_AQTA_ALLOC); ret = aq_rcpt_add_new(aq_ctx, aq_ta, &aq_rcpt, AQR_FL_DEFEDB, THR_LOCK_UNLOCK); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_get_edb_entries, aq_rcpt_add_new=%m" , ret); goto error; } /*R: A:edb_req, A/L:aq_ta, A:aq_rcpt, P:edbc_node */ #if QMGR_DEBUG chkaqta = true; /* really? */ #endif /* Note: we have to avoid starvation of DEFEDB entries! */ RCPT_ID_COPY(edb_req->edb_req_id, edbc_node->ecn_rcpt_id); edb_req->edb_req_type = EDB_REQ_RCPT; ret = edb_rd_req(edb_ctx, edb_req); if (sm_is_err(ret)) { if (ret == sm_error_perm(SM_EM_EDB, DB_NOTFOUND)) { QM_LEV_DPRINTFC(QDC_EDBR, 1, (QM_DEBFP, "sev=WARN, func=qm_get_edb_entries, edb_rd_req(%s)=%r\n", edbc_node->ecn_rcpt_id, ret)); goto next_item; } sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_get_edb_entries, rcpt_id=%s, edb_rd_req=%m" , edbc_node->ecn_rcpt_id, ret); goto error; } /*R: A/R:edb_req, A/L:aq_ta, A:aq_rcpt, P:edbc_node */ /* AQ rcpt must be locked? Only when we update the flags. */ ret = edb_rcpt_dec(edb_req, aq_rcpt); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_get_edb_entries, rcpt_id=%s, edrcpt_dec=%m" , edbc_node->ecn_rcpt_id, ret); goto error; } /*R: A/R:edb_req, A/L:aq_ta, A/R:aq_rcpt, P:edbc_node */ AQR_DA_INIT(aq_rcpt); aq_rcpt->aqr_entered = time_now; /* ignore result, a failure can be tolerated */ (void) aq_rcpt_set_domain(aq_rcpt, qmgr_ctx->qmgr_hostname); /* Check whether TA is already in AQ */ ret = aq_ta_find(aq_ctx, aq_rcpt->aqr_ss_ta_id, true, &aq_ta_2); QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_ta_find(%s)=%r\n", aq_rcpt->aqr_ss_ta_id, ret)); if (SM_SUCCESS == ret) { aq_rcpt_P aq_rcpt_2; /*R: A/R:edb_req, A/L:aq_ta, P:aq_ta_2, A/R:aq_rcpt, P:edbc_node */ /* already there; move rcpt to this TA, remove other */ ret = aq_rcpt_find_one_ss(aq_ctx, aq_rcpt->aqr_ss_ta_id, THR_LOCK_UNLOCK, &aq_rcpt_2); if (sm_is_err(ret) || NULL == aq_rcpt_2) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERROR, 4, "sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, aq_rcpt_find_one_ss=%m" , aq_rcpt->aqr_ss_ta_id, ret); AQR_SS_INIT(aq_rcpt); } /*R: A/R:edb_req, A/L:aq_ta, P:aq_ta_2, A/R:aq_rcpt, P:aq_rcpt_2, P:edbc_node */ else if (aq_rcpt_2 == aq_rcpt) { /* ??? are there others? */ AQR_SS_INIT(aq_rcpt); } else { /* currently this doesn't fail */ aq_rcpt_ss_insert(aq_rcpt_2, aq_rcpt); } QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, old_aq_ta=%p, aq_rcpt=%p, flags=%#x\n", aq_ta_2, aq_rcpt, aq_rcpt->aqr_flags )); #if QMGR_DEBUG { aq_rcpt_P aq_rcpt_nxt; QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_rcpt=%p, aq_rcpt_2=%p\n", aq_rcpt, aq_rcpt_2)); for (aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt); aq_rcpt_nxt != NULL && aq_rcpt_nxt != aq_rcpt; aq_rcpt_nxt = AQR_SS_SUCC(aq_rcpt_nxt)) { QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_rcpt_nxt=%p\n", aq_rcpt_nxt)); } } #endif /* QMGR_DEBUG */ /* XXX Increment AQ TA counters? */ ++aq_ta_2->aqt_rcpts_inaq; aq_rcpt->aqr_ss_ta = aq_ta_2; /* free aq_ta, it's not used anymore */ ret = aq_ta_rm(aq_ctx, aq_ta, true); SM_CLR_FLAG(fct_state, FST_AQTA_ALLOC); /*R: A/R:edb_req, F:aq_ta, P:aq_ta_2, A/R:aq_rcpt, U:aq_rcpt_2, P:edbc_node */ if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 2, "sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, aq_ta_rm=%m" , aq_ta->aqt_ss_ta_id, ret); } /* for further checks down below */ aq_ta = aq_ta_2; /*R: A/R:edb_req, P:aq_ta, U:aq_ta_2, A/R:aq_rcpt, U:aq_rcpt_2, P:edbc_node */ } else { /*R: A/R:edb_req, A/L:aq_ta, U:aq_ta_2, A/R:aq_rcpt, P:edbc_node */ /* not found, get it from DEFEDB */ SESSTA_COPY(edb_req->edb_req_id, aq_rcpt->aqr_ss_ta_id); edb_req->edb_req_type = EDB_REQ_TA; ret = edb_rd_req(edb_ctx, edb_req); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edb_rd_req(%s)=%r\n", edb_req->edb_req_id, ret)); goto error; } ret = edb_ta_dec(edb_req, aq_ta); if (sm_is_err(ret)) { QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edb_ta_dec(%s)=%r\n", edb_req->edb_req_id, ret)); goto error; } /*R: A/R:edb_req, A/L/R:aq_ta, U:aq_ta_2, A/R:aq_rcpt, P:edbc_node */ QM_LEV_DPRINTFC(QDC_EDBR, 6, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, aq_rcpt=%p, aq_ta=%p, ss_ta=%s, mail=%@S, cdb=%C\n", aq_rcpt, aq_ta, aq_ta->aqt_ss_ta_id, aq_ta->aqt_mail->aqm_pa, aq_ta->aqt_cdb_id)); QM_LEV_DPRINTFC(QDC_EDBR, 6, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, tried=%u, left=%u, temp=%u, perm=%u\n", aq_ta->aqt_rcpts_tried, aq_ta->aqt_rcpts_left, aq_ta->aqt_rcpts_temp, aq_ta->aqt_rcpts_perm)); AQR_SS_INIT(aq_rcpt); aq_ta->aqt_rcpts_inaq = 1; } /*R: "merge" of then/else: problem with aq_ta? added FST flag */ /*R: A/R:edb_req, P:aq_ta, A/R:aq_rcpt, P:edbc_node */ /*R: A/R:edb_req, A/L/R:aq_ta, A/R:aq_rcpt, P:edbc_node */ /*R: after "merge": */ /*R: A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */ if (AQR_IS_FLAG(aq_rcpt, AQR_FL_PERM)) { ret = qm_edbr_gen_dsn(qmgr_ctx, aq_ctx, aq_ta, aq_rcpt, edb_ctx, edb_req_hd, edbc_ctx, edbc_node, time_now); aq_rcpt = NULL; if (sm_is_err(ret)) { if (sm_error_value(ret) == ENOMEM) { edbc_node = NULL; goto error; } } else { flags |= ret; SM_SET_FLAG(fct_state, FST_WR_EDB); } /*R: A/R:edb_req, (A/L/R|P):aq_ta, N:aq_rcpt, (P|F):edbc_node */ /* keep this edbc_node for retry in next run */ goto next_item; } else if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) && !AQR_IS_FLAG(aq_rcpt, AQR_FL_RCVD4AR)) { /* send it to AR */ ret = qmgr_rcpt2ar(qmgr_ctx, aq_rcpt, THR_LOCK_UNLOCK); if (sm_is_err(ret)) { /* and now? remove it again? */ QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, aq_rcpt=%p, rcpt_idx=%d, ss_ta=%s, qmgr_rcpt2ar=%r\n", aq_rcpt, aq_rcpt->aqr_idx, aq_ta->aqt_ss_ta_id, ret)); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, rcpt_pa=%S, stat=%d, qmgr_rcpt2ar=%m" , aq_rcpt->aqr_ss_ta_id , aq_rcpt->aqr_pa , aq_rcpt->aqr_status, ret); /* stop on other errors?? */ if (sm_error_value(ret) == SM_E_NO_AR || sm_error_value(ret) == ENOMEM) goto error; /* keep aq_rcpt? cleanup will take care of it */ } else flags |= QDA_FL_ACT_SMAR; /*R: [no change] A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */ } else { /* add rcpt to todo queue if address is available */ ret = aq_rdq_add(aq_ctx, aq_rcpt, NULL, THR_LOCK_UNLOCK); if (sm_is_err(ret)) { /* what to do on error?? stop for now.*/ QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, aq_rdq_add=%r\n", ret)); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, rcpt_pa=%S, stat=%d, aq_rdq_add=%m" , aq_rcpt->aqr_ss_ta_id , aq_rcpt->aqr_pa , aq_rcpt->aqr_status, ret); goto error; } /*R: [no change] A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */ } /*R: [no change] A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, P:edbc_node */ /* Remove edbc_node */ if (edbc_node != NULL && sm_is_err(ret = edbc_rm(edbc_ctx, edbc_node))) { /* XXX the node might be freed! hence data is wrong! */ QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edbc_rm(%p, %ld, %s)=%r\n", edbc_node, (long) edbc_node->ecn_next_try, edbc_node->ecn_rcpt_id, ret)); } /*R: A/R:edb_req, (A/L/R|P):aq_ta, A/R:aq_rcpt, (P|F):edbc_node */ /* check for temp fail, increase counter */ if (aq_rcpt != NULL && smtp_is_reply_temp(aq_rcpt->aqr_status) && aq_ta->aqt_rcpts_temp == 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 1, "sev=ERROR, func=qm_get_edb_entries, ss_ta=%s, rcpt_pa=%S, stat=%d, aqt_rcpts_temp=0, status=inconsistent_data" , aq_rcpt->aqr_ss_ta_id , aq_rcpt->aqr_pa , aq_rcpt->aqr_status); ++aq_ta->aqt_rcpts_temp; } next_item: edbc_node = edbc_node_nxt; /* Reset pointers to avoid accidental free */ aq_ta = NULL; aq_rcpt = NULL; /*R: A/R:edb_req, N:aq_ta, N:aq_rcpt, P:edbc_node */ } if (edbc_entries_cnt > edbc_entries_lim ) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_ERR, 1, "sev=ERROR, func=qm_get_edb_entries, status=loop_exceeded_limit, count=%u, expected=%u" , edbc_entries_cnt, edbc_entries_lim ); } if (edbc_node != NULL) { if (pnext_try != NULL) *pnext_try = edbc_node->ecn_next_try - time_now; QM_LEV_DPRINTFC(QDC_EDBR, 4, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, stop loop due to next_try; id=%s, next_try=%6ld, now=%6ld, diff=%ld\n", edbc_node->ecn_rcpt_id, (long) edbc_node->ecn_next_try, (long) time_now, (long) (edbc_node->ecn_next_try - time_now))); } else { QM_LEV_DPRINTFC(QDC_EDBR, 5, (QM_DEBFP, "sev=DBG, func=qm_get_edb_entries, stop loop, all entries done\n")); } if (aq_rcpt != NULL) { (void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK); aq_rcpt = NULL; } if (aq_ta != NULL && SM_IS_FLAG(fct_state, FST_AQTA_ALLOC)) { (void) aq_ta_rm(aq_ctx, aq_ta, true); aq_ta = NULL; /* not really necessary */ SM_CLR_FLAG(fct_state, FST_AQTA_ALLOC); } if (SM_IS_FLAG(fct_state, FST_WR_EDB)) { ret = edb_wr_status(edb_ctx, &edb_req_hd); if (sm_is_err(ret)) { /* XXX What to do? */ QM_LEV_DPRINTFC(QDC_EDBR, 2, (QM_DEBFP, "sev=ERROR, func=qm_get_edb_entries, edb_wr_stat=%r\n", ret)); } } else (void) edb_reql_free(edb_ctx, &edb_req_hd); SM_ASSERT(SM_IS_FLAG(fct_state, FST_EDBC_LCK)); r = pthread_mutex_unlock(&edbc_ctx->edbc_mutex); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_CRIT, 1, "sev=CRIT, func=qm_get_edb_entries, unlock_edbc=%d" , r); SM_ASSERT(r == 0); } SM_CLR_FLAG(fct_state, FST_EDBC_LCK); if (edb_req != NULL) { (void) edb_req_rel(edb_ctx, edb_req, 0, THR_LOCK_UNLOCK); edb_req = NULL; } #if QMGR_DEBUG if (chkaqta) chkaq_ta(qmgr_ctx, AQ_TAS_FIRST(aq_ctx)); #endif return flags; error: /* invoke edb_wr_status(edb_ctx, &edb_req_hd); ?? */ if (SM_IS_FLAG(fct_state, FST_WR_EDB)) (void) edb_wr_status(edb_ctx, &edb_req_hd); else (void) edb_reql_free(edb_ctx, &edb_req_hd); if (aq_rcpt != NULL) { (void) aq_rcpt_rm(aq_ctx, aq_rcpt, AQR_RM_LOCK); aq_rcpt = NULL; /* not really necessary */ } if (aq_ta != NULL && SM_IS_FLAG(fct_state, FST_AQTA_ALLOC)) { (void) aq_ta_rm(aq_ctx, aq_ta, true); aq_ta = NULL; /* not really necessary */ SM_CLR_FLAG(fct_state, FST_AQTA_ALLOC); } if (SM_IS_FLAG(fct_state, FST_EDBC_LCK)) { r = pthread_mutex_unlock(&edbc_ctx->edbc_mutex); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SCHED, QM_LMOD_DEFEDB, SM_LOG_CRIT, 1, "sev=CRIT, func=qm_get_edb_entries, unlock_edbc=%d" , r); SM_ASSERT(r == 0); } SM_CLR_FLAG(fct_state, FST_EDBC_LCK); } if (edb_req != NULL) { (void) edb_req_rel(edb_ctx, edb_req, (sm_is_err(ret) && sm_error_value(ret) == ENOMEM) ? EDB_RQF_FREE : 0, THR_LOCK_UNLOCK); edb_req = NULL; /* not really necessary */ } QM_LEV_DPRINTFC(QDC_EDBR, 0, (QM_DEBFP, "sev=ERROR, qm_get_edb_entries=%r\n", ret)); return ret; }