/*
* Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: qm_fr_ar.c,v 1.151 2007/06/03 02:19:15 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/common.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "qmgr.h"
#include "sm/edb.h"
#include "sm/reccom.h"
#include "sm/da.h"
#include "sm/dns.h"
#include "sm/smar.h"
#include "sm/aqrdq.h"
#include "qmgr.h"
#include "log.h"
/*
**  Bookkeeping for the owner addresses announced in one AR reply
**
**	roc_old_owners: number of owners the transaction had before this
**		reply was processed (copy of aq_ta->aqt_owners_n)
**	roc_add_owners: number of owner addresses carried by this reply
**	roc_map_size: size in bytes of the roc_map allocation
**	roc_map: array with roc_add_owners entries; entry i holds the owner
**		index as given by SMAR for the i-th owner of this reply and is
**		used to translate it to an index in aq_ta->aqt_owners_pa
*/
typedef struct rcpt_owner_ctx_S
{
	rcpt_idx_T	 roc_old_owners;
	rcpt_idx_T	 roc_add_owners;
	size_t		 roc_map_size;
	rcpt_idx_T	*roc_map;
} rcpt_owner_ctx_T, *rcpt_owner_ctx_P;
/*
**  QAR_RCPT_ST -- Turn a failure status reported by AR into recipient state
**
**	Parameters:
**		aq_rcpt -- AQ rcpt to update
**		v -- status value from AR (sm_error code or SMTP reply code)
**
**	Returns:
**		SM_SUCCESS if v denotes a temporary or permanent failure,
**		otherwise a permanent SM_E_UNEXPECTED error (aq_rcpt is
**		left untouched in that case).
**
**	Side Effects:
**		sets failure flags in aq_rcpt and updates aqr_err_st,
**		aqr_status_new, and aqr_addr_max
**
**	Last code review: 2005-03-29 18:07:23
**	Last code change:
*/
static sm_ret_T
qar_rcpt_st(aq_rcpt_P aq_rcpt, uint32_t v)
{
	smtp_status_T new_status;

#if 0
	QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qar_rcpt_st, where=updated, rcpt_id=%s, ss_ta=%s, idx=%d, state=%#x, addr_max=%d\n", rcpt_id, ta_ss_id, rcpt_idx, v, aq_rcpt->aqr_addr_max));
#endif

	/* coarse classification first: temporary, permanent, or unusable */
	if (sm_is_temp_err(v) || (IS_SMTP_REPLY(v) && smtp_is_reply_temp(v)))
	{
		AQR_SET_FLAG(aq_rcpt, AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_RCVD4AR|AQR_FL_STAT_NEW);
		new_status = AQR_ST_TEMP;
	}
	else if (sm_is_perm_err(v) || (IS_SMTP_REPLY(v) && smtp_is_reply_fail(v)))
	{
		AQR_SET_FLAG(aq_rcpt, AQR_FL_PERM|AQR_FL_ARF|AQR_FL_RCVD4AR|AQR_FL_STAT_NEW);
		new_status = AQR_ST_PERM;
	}
	else
	{
#if 0
		QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_rcpt_st, where=update, rcpt_id=%s, ss_ta=%s, idx=%d, unknown_state=%#x\n", rcpt_id, ta_ss_id, rcpt_idx, v));
#endif
		return sm_error_perm(SM_EM_Q_AR2Q, SM_E_UNEXPECTED);
	}

	/*
	**  Refine the coarse status for the values that have a dedicated
	**  AR status; an SMTP reply code is used as is; every other value
	**  keeps the coarse status chosen above.
	*/

	switch ((sm_ret_T) v)
	{
	  /* DNS results */
	  case DNSR_TEMP:
		new_status = SMTP_AR_TEMP;
		break;
	  case DNSR_TIMEOUT:
		new_status = SMTP_AR_TMO;
		break;
	  case DNSR_NOTFOUND:
		new_status = SMTP_AR_NOTF;
		break;
	  case DNSR_PERM:
	  case sm_error_perm(SM_EM_AR, SM_E_ALIASEXP):
		new_status = SMTP_AR_PERM;
		break;

	  /* errors generated by AR itself */
	  case sm_error_temp(SM_EM_AR, SM_E_ALIASEXP):
		new_status = SMTP_AR_ALIAS;
		break;
	  case sm_error_perm(SM_EM_AR, SM_E_ALIAS_REC):
		new_status = SMTP_AR_AL_REC;
		break;
	  case sm_error_perm(SM_EM_AR, SM_E_MXEMPTY):
		new_status = SMTP_AR_MXEMPTY;
		break;
	  case sm_error_perm(SM_EM_AR, ELOOP):
		new_status = SMTP_AR_LOOP;
		break;

	  default:
		if (IS_SMTP_REPLY(v))
			new_status = v;
		break;
	}

	/*
	**  The recipient also has to be updated in the various DBs
	**  (status and counters); see da_stat.c: qda_update_ta_stat()
	**  would have to be split to allow for updates of aq_rcpts.
	**  That is not done here for each recipient as it comes in
	**  (which may cause a lot of "small" updates) but left to the
	**  scheduler: aq_rcpt is marked as "failed" anyway, hence it is
	**  possible to go through the list and update several entries
	**  at once with code similar to qda_update_ta_stat().
	**  Possible refinement: keep track of the number of failures and
	**  link those entries to allow for "fast" access; when a certain
	**  threshold is reached, remove those recipients from AQ and
	**  update the various DBs and counters in a single "transaction".
	*/

	aq_rcpt->aqr_addr_max = 0;
	aq_rcpt->aqr_status_new = new_status;
	aq_rcpt->aqr_err_st = DA_AR_ERR;
	AQR_SET_FLAG(aq_rcpt, AQR_FL_ERRST_UPD);
	return SM_SUCCESS;
}
/*
**  QAR_OWNIDXMAP -- translate an owner index from AR into a local one
**
**	Parameters:
**		owner_idx -- owner index as returned from AR
**		roc -- owner context (must not be NULL)
**
**	Returns:
**		>0: roc_old_owners + (position of owner_idx in roc_map) + 1,
**			i.e., a 1-based index counted after the old owners
**		<0: usual sm_error code: SM_E_NOTFOUND
**
**	Last code review: 2005-03-29 23:01:13
**	Last code change:
*/
static sm_ret_T
qar_ownidxmap(rcpt_idx_T owner_idx, rcpt_owner_ctx_P roc)
{
	rcpt_idx_T pos;

	SM_ASSERT(roc != NULL);

	/* linear search through the owners added by this reply */
	pos = 0;
	while (pos < roc->roc_add_owners)
	{
		if (roc->roc_map[pos] == owner_idx)
			return pos + roc->roc_old_owners + 1;
		++pos;
	}
	return sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND);
}
/*
**  QAR_GET_ADDRS -- Get (IPv4) addresses from AR
**
**	Parameters:
**		aq_rcpt -- AQ rcpt
**		rcb -- RCB from AR
**		now -- current time
**		nr -- number of addresses announced in the RCB
**		prv -- (pointer to) return value for caller to use (output)
**			set to an error if "fatal" for communication, e.g.,
**			protocol error.
**
**	Returns:
**		usual sm_error code; SM_E_OVFLW_SC, SM_E_PR_ERR
**
**	Side Effects: allocate aqr_addrs (usually), must be free()d by caller
**		on error. may fill in aqr_addrs only partially.
**		At most SM_DNS_A_MAX addresses are stored, but all nr
**		(address, preference, TTL) triples are read from rcb.
**		If the allocation fails, the embedded aqr_addr_mf is used
**		instead, only the first address is stored, and
**		AQR_FL_MEMAR|AQR_FL_ARINCOMPL are set.
**		On success aqr_addr_max is the number of stored entries.
**
**	Last code review: 2005-03-30 17:49:10
**	Last code change:
*/
static sm_ret_T
qar_get_addrs(aq_rcpt_P aq_rcpt, sm_rcb_P rcb, time_T now, uint32_t nr, sm_ret_T *prv)
{
	sm_ret_T ret;
	uint32_t v, l, rt, i, n_A;
	size_t addrs_size;

	*prv = ret = SM_SUCCESS;

	/* the error paths print these even if a read did not set them */
	v = l = rt = 0;

	/* never keep more than SM_DNS_A_MAX addresses */
	n_A = (nr < SM_DNS_A_MAX) ? nr : SM_DNS_A_MAX;
	addrs_size = n_A * sizeof(*(aq_rcpt->aqr_addrs));

	/* catches a wrapped multiplication as well as nr == 0 */
	if (addrs_size < n_A || addrs_size < sizeof(*(aq_rcpt->aqr_addrs)))
	{
		ret = sm_error_perm(SM_EM_Q_AR2Q, SM_E_OVFLW_SC);
		goto error;
	}

	/* Later on: check size and realloc() if necessary */
	aq_rcpt->aqr_addrs = (aq_raddr_P) sm_zalloc(addrs_size);

	/* Use a fallback if memory allocation fails */
	if (NULL == aq_rcpt->aqr_addrs)
	{
		AQR_SET_FLAG(aq_rcpt, AQR_FL_MEMAR|AQR_FL_ARINCOMPL);
		aq_rcpt->aqr_addr_max = 1;
		aq_rcpt->aqr_addrs = &aq_rcpt->aqr_addr_mf;
	}
	aq_rcpt->aqr_addr_cur = 0;

	/* Even if alloc failed: read all data (or ignore it?) */
	for (i = 0; i < nr; i++)
	{
		/* address */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_IPV4)
		{
			QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_get_addrs, expected=%#x, rt=%#x, v=%d, l=%d, ret=%r, i=%d, nr=%d\n", RT_R2Q_RCPT_IPV4, rt, v, l, ret, i, nr));
			*prv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
			goto error;
		}

		/*
		**  Store only what fits: entry 0 in the fallback case,
		**  otherwise the first n_A entries.
		*/

		if ((!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) || i == 0) && i < n_A)
			aq_rcpt->aqr_addrs[i].aqra_ipv4 = v;
#if 0
		QM_LEV_DPRINTFC(QDC_A2Q, 6, (QM_DEBFP, "sev=DBG, func=qar_get_addrs, rcpt_id=%s, %d/%d, ip=%A\n", rcpt_id, i, nr, (ipv4_T) v));
#endif

		/* preference */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_PRE)
		{
			QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_get_addrs, expected=%#x, rt=%#x, v=%d, l=%d, ret=%r\n", RT_R2Q_RCPT_PRE, rt, v, l, ret));
			*prv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
			goto error;
		}
		if ((!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) || i == 0) && i < n_A)
			aq_rcpt->aqr_addrs[i].aqra_pref = v;

		/* TTL, stored as absolute expiration time */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_TTL)
		{
			QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_get_addrs, expected=%#x, rt=%#x, v=%d, l=%d, ret=%r\n", RT_R2Q_RCPT_TTL, rt, v, l, ret));
			*prv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
			goto error;
		}
		if ((!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR) || i == 0) && i < n_A)
			aq_rcpt->aqr_addrs[i].aqra_expt = now + v;
	}
	AQR_SET_FLAG(aq_rcpt, AQR_FL_RCVD4AR|AQR_FL_RDY4DLVRY);

	/*
	**  Only n_A entries have been allocated and filled in; using nr
	**  here would let readers of aqr_addrs run past the end of the
	**  array whenever AR returns more than SM_DNS_A_MAX addresses.
	*/

	if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR))
		aq_rcpt->aqr_addr_max = n_A;
	return ret;

  error:
	/* a protocol error was flagged via *prv only: return an error too */
	if (!sm_is_err(ret))
		ret = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
	return ret;
}
/*
**  QAR_ALIAS -- Decode data received from AR for alias expansion
**	recipients are written to EDB (using an EDB request list), not to AQ!
**
**	Parameters:
**		qar_ctx -- AR context
**		aq_rcpt -- AQ rcpt (the recipient that is replaced)
**		nae -- number of aliases
**		rcb -- RCB from which to read data
**		prv -- (pointer to) return value for caller to use (output)
**			set to an error if "fatal" for communication, e.g.,
**			protocol error; must be initialized by caller.
**		roc -- owner context, used to map owner references
**
**	Returns:
**		usual sm_error code
**
**	Locking:
**		aq_ctx and defedb should be locked by caller
**
**	Called by: qm_fr_ar_react()
**
**	Todo: error handling!
**		need to undo all changes done so far for any resource problem.
**		what then? how is caller supposed to deal with that? just log it
**		and try again later? theoretically cleanup should take care of it:
**		it will remove aq_rcpt after a timeout and try to write it to EDB.
**		we could try to do this "now" (maybe it makes sense to have one
**		rcb entry for this as "emergency backup" available?)
**
**	Last code review:
**	Last code change:
*/
static sm_ret_T
qar_alias(qar_ctx_P qar_ctx, aq_rcpt_P aq_rcpt, uint32_t nae, sm_rcb_P rcb, int *prv, rcpt_owner_ctx_P roc)
{
	uint32_t v, l, rt, alias_idx;
	sm_ret_T ret, rcpt_status;
	qmgr_ctx_P qmgr_ctx;
	aq_ctx_P aq_ctx;
	edb_ctx_P edb_ctx;
	aq_rcpt_P aq_rcpt_a;
	aq_ta_P aq_ta;
	uint iqdb_rcpts_done;
	uint32_t fct_state;
	rcpt_id_T rcpt_id;
	time_T now;
	ibdb_req_hd_T ibdb_req_hd;
	edb_req_hd_T edb_req_hd;

#define FST_IBDB_USED 0x0001 /* ibdb req list is in use */
#define FST_EDBRQL_USED 0x0002 /* edb req list is in use */
#define FST_EBD_WRITTEN 0x0004 /* edb has been written */
#define FST_IBBD_WRITTEN 0x0008 /* ibdb has been written */
#define FST_AQR_NEW 0x0010 /* aq_rcpt_new failed */
#define FST_EDB_RCPT_APP 0x0020 /* edb_rcpt_app failed */
#define FST_EDBC_ADD 0x0040 /* edbc_add failed */
#define FST_EDB_RCPT_RM 0x0080 /* edb_rcpt_rm_req failed */

	SM_IS_QAR_CTX(qar_ctx);
	qmgr_ctx = qar_ctx->qar_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	aq_ctx = qmgr_ctx->qmgr_aq;
	SM_IS_AQ(aq_ctx);
	edb_ctx = qmgr_ctx->qmgr_edb;
	ret = SM_SUCCESS;
	now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
	aq_rcpt_a = NULL;
	aq_ta = aq_rcpt->aqr_ss_ta;
	iqdb_rcpts_done = 0;
	fct_state = 0;

	/*
	**  The error path prints these; make sure they are defined even if
	**  an error occurs before the first read from rcb.
	*/

	v = l = rt = 0;

	IBDBREQL_INIT(&ibdb_req_hd);
	SM_SET_FLAG(fct_state, FST_IBDB_USED);
	EDBREQL_INIT(&edb_req_hd);
	SM_SET_FLAG(fct_state, FST_EDBRQL_USED);

	/* one temporary aq_rcpt per alias: decode it, queue it for EDB */
	for (alias_idx = 0; alias_idx < nae; alias_idx++)
	{
		ret = aq_rcpt_new(&aq_rcpt_a);
		if (sm_is_err(ret))
		{
			SM_SET_FLAG(fct_state, FST_AQR_NEW);
			goto error;
		}
		aq_rcpt_a->aqr_idx = aq_ta->aqt_nxt_idx + alias_idx;
		SESSTA_COPY(aq_rcpt_a->aqr_ss_ta_id, aq_rcpt->aqr_ss_ta_id);
		aq_rcpt_a->aqr_alias_idx = aq_rcpt->aqr_idx;
		aq_rcpt_a->aqr_st_time = aq_rcpt->aqr_st_time;
		aq_rcpt_a->aqr_ss_ta = aq_ta;

		/* which flags to copy?? */
		aq_rcpt_a->aqr_flags = AQR_FL_ALIAS; /* |aq_rcpt->aqr_flags */

		/* either RT_R2Q_RCPT_ST (error from AR) or RT_R2Q_RCPT_DA */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret))
		{
			*prv = ret;
			goto error;
		}
		rcpt_status = AQR_ST_NEW;
		if (RT_R2Q_RCPT_ST == rt)
		{
			/* handle error code */
			ret = qar_rcpt_st(aq_rcpt_a, v);
			QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qar_alias, alias=%d/%d, status=%r, qar_rcpt_st=%r\n", alias_idx, nae, v, ret));
			if (sm_is_err(ret))
			{
				*prv = ret;
				goto error;
			}

			/* update recipient counters in TA */
			ret = aq_upd_ta_rcpt_cnts(aq_ta, AQR_ST_NONE, aq_rcpt_a->aqr_status_new, qmgr_ctx->qmgr_lctx);
			if (sm_is_err(ret))
			{
				QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_alias, alias=%d/%d, status=%r, aq_upd_ta_rcpt_cnts=%r\n", alias_idx, nae, v, ret));
				goto error;
			}
			rcpt_status = aq_rcpt_a->aqr_status_new;

			/* write rcpt to edb, see below */
		}
		else if (RT_R2Q_RCPT_DA == rt)
			aq_rcpt_a->aqr_da_idx = v;
		else
		{
			*prv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
			goto error;
		}

		/* RT_R2Q_RCPT_PORT (optional) or RT_R2Q_OWN_REF */
		ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		if (sm_is_err(ret) || l != 4)
		{
			*prv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
			goto error;
		}
		if (RT_R2Q_RCPT_PORT == rt)
		{
			aq_rcpt_a->aqr_port = (short) v;
			ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
		}
		if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_OWN_REF)
		{
			*prv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
			goto error;
		}

		/* 0: no owner; otherwise map AR's index to the local one */
		if (v > 0)
		{
			ret = qar_ownidxmap(v, roc);
			if (sm_is_err(ret)) goto error;
			aq_rcpt_a->aqr_owner_idx = (rcpt_idx_T) ret;
		}

		/* optional: recipient flags */
		if (sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) &&
		    4 == l && RT_R2Q_RCPT_FL == rt)
		{
			ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
			if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_FL)
			{
				*prv = sm_is_err(ret) ? ret
					: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
				goto error;
			}
			if (SM_IS_FLAG(v, SMARRT_FL_VERP))
				AQR_SET_FLAG(aq_rcpt_a, AQR_FL_HAS_VERP);
		}
#if MTA_USE_TLS
		/* optional: map lookup result and configuration data */
		if (!SM_RCB_ISEOB(rcb) &&
		    sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) &&
		    4 == l && RT_R2Q_MAP_RES_CNF_RCPT == rt)
		{
			ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
			if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_MAP_RES_CNF_RCPT)
			{
				*prv = sm_is_err(ret) ? ret
					: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
				goto error;
			}
			aq_rcpt_a->aqr_maprescnf = v;

			/*
			**  NOTE(review): this updates the original aq_rcpt,
			**  not aq_rcpt_a (the debug output below also prints
			**  aq_rcpt->aqr_conf); confirm that this is intended.
			*/

			if (sm_is_temp_err(v) && !AQR_IS_FLAG(aq_rcpt, AQR_FL_TEMP))
			{
				aq_rcpt->aqr_status_new = SMTP_MAP_TEMP;
				AQR_SET_FLAG(aq_rcpt, AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_STAT_NEW);
			}
			ret = sm_rcb_get2uint32(rcb, &l, &rt);
			if (sm_is_err(ret) || rt != RT_R2Q_RHS_CNF_RCPT)
			{
				*prv = sm_is_err(ret) ? ret
					: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
				goto error;
			}
			ret = sm_rcb_getnstr(rcb, &aq_rcpt_a->aqr_conf, l);
			if (sm_is_err(ret))
			{
				*prv = ret;
				goto error;
			}
			QM_LEV_DPRINTFC(QDC_A2Q, 8, (QM_DEBFP, "func=qar_alias, conf=%x\n", aq_rcpt->aqr_conf));
		}
#endif /* MTA_USE_TLS */

		/* address of the alias */
		ret = sm_rcb_get2uint32(rcb, &l, &rt);
		if (sm_is_err(ret) || rt != RT_R2Q_RCPT_PA)
		{
			*prv = sm_is_err(ret) ? ret
				: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
			goto error;
		}
		ret = sm_rcb_getnstr(rcb, &aq_rcpt_a->aqr_pa, l);
		QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=INFO, func=qar_alias, alias=%d/%d, new_addr=%S, owner_idx=%d, port=%hd, ret=%r\n", alias_idx, nae, aq_rcpt_a->aqr_pa, aq_rcpt_a->aqr_owner_idx, aq_rcpt_a->aqr_port, ret));

		/*
		**  Do not continue without the address: ret is overwritten
		**  below, hence a failure would go unnoticed and the
		**  recipient would be handed to edb_rcpt_app() anyway.
		*/

		if (sm_is_err(ret))
		{
			*prv = ret;
			goto error;
		}

		/* addresses follow only if AR did not report an error */
		if (AQR_ST_NEW == rcpt_status)
		{
			ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
			if (sm_is_err(ret) || rt != RT_R2Q_RCPT_NAR || l != 4)
			{
				*prv = sm_is_err(ret) ? ret
					: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
				goto error;
			}
			if (v < 1)
			{
				*prv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
				goto error;
			}
			ret = qar_get_addrs(aq_rcpt_a, rcb, now, v, prv);
			if (sm_is_err(ret)) goto error;
		}

		/* append the new recipient to the EDB request list and cache */
		sm_snprintf(rcpt_id, sizeof(rcpt_id), SMTP_RCPTID_FORMAT,
			aq_rcpt_a->aqr_ss_ta_id, aq_rcpt_a->aqr_idx);
		ret = edb_rcpt_app(edb_ctx, aq_rcpt_a, &edb_req_hd, rcpt_status);
		if (sm_is_err(ret))
		{
			SM_SET_FLAG(fct_state, FST_EDB_RCPT_APP);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qar_alias, rcpt_id=%s, edb_rcpt_app=%m",
				rcpt_id, ret);
			goto error;
		}
		ret = edbc_add(qmgr_ctx->qmgr_edbc, rcpt_id, aq_rcpt_a->aqr_next_try, false);
		if (sm_is_err(ret))
		{
			SM_SET_FLAG(fct_state, FST_EDBC_ADD);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qar_alias, rcpt_id=%s, edbc_add=%m",
				rcpt_id, ret);
			goto error;
		}

		/* reuse it?? later on... */
		(void) aq_rcpt_free(aq_rcpt_a);
		aq_rcpt_a = NULL;
	}

	/* XXX remove rcpt from aq after EDB has been written?! */
	AQR_SET_FLAG(aq_rcpt, AQR_FL_REPLACED);
	sm_snprintf(rcpt_id, sizeof(rcpt_id), SMTP_RCPTID_FORMAT,
		aq_rcpt->aqr_ss_ta_id, aq_rcpt->aqr_idx);

	/* original recipient is in DEFEDB: request its removal */
	if (AQR_IS_FLAG(aq_rcpt, AQR_FL_DEFEDB))
	{
		ret = edb_rcpt_rm_req(edb_ctx, rcpt_id, &edb_req_hd);
		if (sm_is_err(ret))
		{
			/* XXX deal with error ... */
			SM_SET_FLAG(fct_state, FST_EDB_RCPT_RM);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
				SM_LOG_ERR, 0,
				"sev=ERROR, func=qar_alias, rcpt_id=%s, remove=%m"
				, rcpt_id, ret);
			goto error;
		}
	}

	/* original recipient is in IQDB: remove it, mark it done in IBDB */
	if (AQR_IS_FLAG(aq_rcpt, AQR_FL_IQDB))
	{
		ibdb_rcpt_T ibdb_rcpt;

		ret = iqdb_rcpt_rm(qmgr_ctx->qmgr_iqdb, rcpt_id, SMTP_RCPTID_SIZE, THR_LOCK_UNLOCK);
		if (sm_is_err(ret))
		{
			/* XXX */
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qar_alias, rcpt_id=%s, iqdb_rcpt_rm=%m",
				rcpt_id, ret);
			goto error;
		}
		else
		{
			/* One rcpt successfully removed from IQDB */
			++iqdb_rcpts_done;
		}
		ibdb_rcpt.ibr_ta_id = aq_rcpt->aqr_ss_ta_id;
		ibdb_rcpt.ibr_pa = aq_rcpt->aqr_pa;
		ibdb_rcpt.ibr_idx = aq_rcpt->aqr_idx;
		ret = ibdb_rcpt_status(qmgr_ctx->qmgr_ibdb, &ibdb_rcpt,
			IBDB_RCPT_DONE, IBDB_FL_NOROLL, THR_LOCK_UNLOCK);
		if (sm_is_err(ret))
		{
			/*
			**  XXX How to handle this error?
			**  Set a QMGR status flag?
			**  The system is probably out of memory.
			*/

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qar_alias, ss_ta=%s, ibdb_rcpt_app=%m",
				aq_rcpt->aqr_ss_ta_id, ret);
			goto error;
		}
		else
			QM_LEV_DPRINTFC(QDC_A2Q, 5, (QM_DEBFP, "sev=DBG, func=qar_alias, ss_ta=%s, ibdb_rcpt_app=%r\n", aq_rcpt->aqr_ss_ta_id, ret));
	}

	/*
	**  update aq_ta; does this need to be undone if an error occurs??
	**  nae new recipients replace one existing recipient, hence the
	**  counters grow by nae - 1.
	*/

	aq_ta->aqt_nxt_idx += nae;
	aq_ta->aqt_rcpts_tot += nae - 1;
	aq_ta->aqt_rcpts_left += nae - 1;
	ret = edb_ta_app(edb_ctx, aq_ta, &edb_req_hd, 0 /* XXX? */);
	if (sm_is_err(ret))
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=qar_alias, ss_ta=%s, edb_ta_app=%m",
			aq_rcpt->aqr_ss_ta_id, ret);
		goto error;
	}

	/* Removed any entries from INCEDB? */
	if (iqdb_rcpts_done > 0)
	{
		ret = qda_upd_iqdb(qmgr_ctx, iqdb_rcpts_done, aq_rcpt->aqr_ss_ta_id, aq_ta->aqt_cdb_id, &ibdb_req_hd);
		if (sm_is_err(ret))
		{
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qar_alias, ss_ta=%s, qda_upd_iqdb=%m",
				aq_rcpt->aqr_ss_ta_id, ret);
			goto error;
		}
	}

	/* write request list ... */
	ret = edb_wr_status(edb_ctx, &edb_req_hd);
	QM_LEV_DPRINTFC(QDC_A2Q, 2, (QM_DEBFP, "sev=DBG, func=qar_alias, edb_wr_status=%r\n", ret));

	/* write ibdb... if necessary */
	if (sm_is_success(ret))
	{
		SM_SET_FLAG(fct_state, FST_EBD_WRITTEN);
		if (!AQ_TA_IS_FLAG(aq_ta, AQ_TA_FL_DEFEDB))
		{
			AQ_TA_SET_FLAG(aq_ta, AQ_TA_FL_DEFEDB);
			aq_ctx->aq_d_entries++;
			QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=DBG, func=qar_alias, ss_ta=%s, ta_flags=%#x, aq_d_entries=%u, aq_entries=%u\n", aq_ta->aqt_ss_ta_id, aq_ta->aqt_flags, aq_ctx->aq_d_entries, aq_ctx->aq_entries));
			SM_ASSERT(aq_ctx->aq_d_entries <= aq_ctx->aq_entries);
		}
		SM_CLR_FLAG(fct_state, FST_EDBRQL_USED);
		ret = ibdb_wr_status(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd);
		if (sm_is_err(ret))
		{
			/*
			**  NOTE(review): the failure is only logged; the
			**  function still returns SM_SUCCESS and the ibdb
			**  request list is not cancelled here.
			*/

			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
				SM_LOG_ERR, 1,
				"sev=ERROR, func=qar_alias, ibdb_wr_status=%m",
				ret);
		}
		else
		{
			SM_CLR_FLAG(fct_state, FST_IBDB_USED);
			SM_SET_FLAG(fct_state, FST_IBBD_WRITTEN);
		}
	}
	else
	{
		/* XXX things to cancel? */
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
			SM_LOG_ERR, 1,
			"sev=ERROR, func=qar_alias, rcpt_id=%s, edb_wr_status=%m",
			rcpt_id, ret);
		goto error;
	}
	return SM_SUCCESS;

  error:
	/* CONTINUE */
	/* release the request lists that have not been written */
	if (SM_IS_FLAG(fct_state, FST_EDBRQL_USED))
	{
		(void) edb_reql_free(edb_ctx, &edb_req_hd);
		SM_CLR_FLAG(fct_state, FST_EDBRQL_USED);
	}
	if (SM_IS_FLAG(fct_state, FST_IBDB_USED))
	{
		(void) ibdb_req_cancel(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd);
		SM_CLR_FLAG(fct_state, FST_IBDB_USED);
	}

	/* alias that was being decoded when the error occurred */
	if (aq_rcpt_a != NULL)
	{
		(void) aq_rcpt_free(aq_rcpt_a);
		aq_rcpt_a = NULL;
	}
	if (sm_is_err(ret))
		QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qar_alias, rt=%#x, v=%d, l=%d, ret=%r, rv=%r\n", rt, v, l, ret, *prv));

	/* a protocol error but ret does not indicate an error -> set ret too */
	if (sm_is_err(*prv) && !sm_is_err(ret))
		ret = *prv;
	return ret;
}
#undef FST_IBDB_USED
#undef FST_EDBRQL_USED
/*
** QAR_REACT -- Decode data received from AR and act accordingly
**
** Parameters:
** qar_ctx -- AR context
**
** Returns:
** usual sm_error code
** Only returns an error if "fatal" for communication, e.g.,
** protocol error.
**
** Locking:
** qar_ctx: locked by access via task
** aq_ctx: locked if aq_rcpt is found
**
** Called by: qm_fr_ar()
**
** Last code review: 2003-10-29 16:30:16, see comments below.
** Last change: 2005-03-29 21:35:38
**
** Todo: error handling!
*/
static sm_ret_T
qm_fr_ar_react(qar_ctx_P qar_ctx)
{
uint32_t v, l, rt, tl, nae;
sm_ret_T ret, rv;
sm_rcb_P rcb;
qmgr_ctx_P qmgr_ctx;
aq_ctx_P aq_ctx;
aq_rcpt_P aq_rcpt;
aq_ta_P aq_ta;
rcpt_id_T rcpt_id;
sessta_id_T ta_ss_id;
rcpt_idx_T rcpt_idx;
int r;
time_T now;
bool notify_sched;
uint fct_state;
rcpt_owner_ctx_T roc;
#define FST_EDB_LOCKED 0x01
#define FST_AQ_RCPT_LOCKED 0x02
#define FST_RCB_OPEN_DEC 0x04
SM_IS_QAR_CTX(qar_ctx);
qmgr_ctx = qar_ctx->qar_qmgr_ctx;
SM_IS_QMGR_CTX(qmgr_ctx);
aq_ctx = qmgr_ctx->qmgr_aq;
SM_IS_AQ(aq_ctx);
ret = rv = SM_SUCCESS;
now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
notify_sched = true;
nae = 1; /* number of alias expansions */
sm_memzero(&roc, sizeof(roc));
fct_state = 0;
aq_rcpt = NULL;
/* decode rcb */
rcb = qar_ctx->qar_com.rcbcom_rdrcb;
ret = sm_rcb_open_dec(rcb);
if (sm_is_err(ret)) {
rv = ret;
goto error;
}
SM_SET_FLAG(fct_state, FST_RCB_OPEN_DEC);
/* total length of record */
ret = sm_rcb_getuint32(rcb, &tl);
if (sm_is_err(ret) || tl > QM_AR_MAX_REC_LEN || tl > sm_rcb_getlen(rcb)) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_RCB2LONG);
goto error;
}
/* protocol header: version */
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret)) {
rv = ret;
goto error;
}
if (l != 4 || rt != RT_PROT_VER || v != PROT_VER_RT) {
rv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_V_MISM);
goto error;
}
/* define protocol first in smX docs! */
/*
see smar/rcpt.c for the current protocol!
RT_R2Q_RCPT_ID, smar_rcpt->arr_id,
- RT_R2Q_RCPT_ST: error out
or
- RT_R2Q_RCPT_DA: continue
optional: RT_R2Q_RCPT_PA: 1-1 alias replacement
RT_R2Q_RCPT_NAR
loop for number of addresses
RT_R2Q_RCPT_IPV4
RT_R2Q_RCPT_PRE
RT_R2Q_RCPT_TTL
or
- RT_R2Q_RCPT_AE: alias expansion (1-n, n>1)
The protocol isn't good... too many cases to distinguish,
too much replicated code?
How about:
RT_R2Q_RCPT_NR: number of recipients
for each recipient:
either RT_R2Q_RCPT_ST: error from AR
or RT_R2Q_RCPT_DA
optional (only for #rcpts==1): RT_R2Q_RCPT_PA: 1-1 alias replacement
RT_R2Q_RCPT_NAR
loop for number of addresses
RT_R2Q_RCPT_IPV4
RT_R2Q_RCPT_PRE
RT_R2Q_RCPT_TTL
doesn't work:
no alias expansion (or 1-1) is very different from 1-n (n>1)
the latter requires:
n (number of new addresses)
rcpt_id and rcpt_pa for each new address
moreover, the original rcpt_id is needed to mark it as "replaced"
nevertheless, it might be better to send RT_R2Q_RCPT_AE before all other data
*/
/* decode data, act accordingly... */
ret = sm_rcb_get2uint32(rcb, &l, &rt);
if (sm_is_err(ret) || l != SMTP_RCPTID_SIZE || rt != RT_R2Q_RCPT_ID) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
ret = sm_rcb_getn(rcb, (uchar *) rcpt_id, l);
if (sm_is_err(ret)) {
rv = ret;
goto error;
}
/* "decode" rcpt_id: ta_id and idx */
RCPTID2SESSTAID(rcpt_id, ta_ss_id);
r = RCPTID2IDX(rcpt_id, rcpt_idx);
if (r != 1) goto error;
/*
** NOTE: edbc MUST be locked before aq if aliases are returned!
** For now edbc is always locked, it shouldn't cause any significant
** contention: edbc isn't accessed without aq being locked too, so
** since aq must be locked here, it is ok to lock edb too.
** Why is it edbc and not edb? There seems to be some
** "cross locking" (if edbc is locked, don't access edb?)
** See README.dev.
*/
r = pthread_mutex_lock(&qmgr_ctx->qmgr_edbc->edbc_mutex);
SM_LOCK_OK(r);
if (r != 0) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
SM_LOG_ERR, 1,
"sev=CRIT, func=qmgr_aq_cleanup, lock_edbc=%m",
sm_err_perm(r));
goto error;
}
SM_SET_FLAG(fct_state, FST_EDB_LOCKED);
/* lookup rcpt_id in AQ */
ret = aq_rcpt_find_ss(aq_ctx, ta_ss_id, rcpt_idx, THR_LOCK_UNLERR, &aq_rcpt);
if (sm_is_err(ret)) {
/*
** This can happen if a recipient has been removed due to
** a timeout, just ignore it, the recipient will be tried
** later on again.
*/
if (ret == sm_error_perm(SM_EM_AQ, SM_E_NOTFOUND)) {
QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=WARN, func=qm_fr_ar_react, aq_rcpt_find_ss=NOTFOUND, rcpt_id=%s, ss_ta=%s, idx=%d\n", rcpt_id, ta_ss_id, rcpt_idx));
ret = 0;
}
else
QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_rcpt_find_ss=%r, rcpt_id=%s, ss_ta=%s, idx=%d\n", ret, rcpt_id, ta_ss_id, rcpt_idx));
goto error;
}
SM_SET_FLAG(fct_state, FST_AQ_RCPT_LOCKED);
aq_ta = aq_rcpt->aqr_ss_ta;
SM_IS_AQ_TA(aq_ta);
if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_SENT2AR)) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
SM_LOG_WARN, 7,
"sev=WARN, func=qm_fr_ar_react, rcpt_id=%s, status=not_sent2ar"
, rcpt_id);
goto unlock;
}
/* aq_rcpt (aq_ctx) is locked! */
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret) || l != 4) {
rv = sm_is_err(ret) ? ret : sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
if (RT_R2Q_RCPT_ST == rt) {
QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qm_fr_ar_react, rcpt_id=%s, ss_ta=%s, idx=%d, state=%#x, addr_max=%d, where=global\n", rcpt_id, ta_ss_id, rcpt_idx, v, aq_rcpt->aqr_addr_max));
ret = qar_rcpt_st(aq_rcpt, v);
if (sm_is_err(ret)) goto error;
/* See comments in qar_rcpt_st() */
++aq_ta->aqt_rcpts_arf;
goto done; /* XXX Really? */
}
if (RT_R2Q_OWN_N == rt) {
uint n, map_idx, total_owners;
rcpt_idx_T owner_idx;
sm_str_P *owners_pa;
sm_str_P pa;
owners_pa = NULL;
pa = NULL;
roc.roc_add_owners = v;
owner_idx = roc.roc_old_owners = aq_ta->aqt_owners_n;
total_owners = roc.roc_add_owners + roc.roc_old_owners;
SM_ASSERT(total_owners >= roc.roc_add_owners);
SM_ASSERT(total_owners >= roc.roc_old_owners);
owners_pa = sm_zalloc(total_owners * sizeof(*owners_pa));
if (NULL == owners_pa) {
ret = sm_error_temp(SM_EM_Q_AR2Q, ENOMEM);
goto error;
}
roc.roc_map_size = v * sizeof(*roc.roc_map);
if (roc.roc_map_size < v || roc.roc_map_size < sizeof(*roc.roc_map)) {
ret = sm_error_perm(SM_EM_Q_AR2Q, SM_E_OVFLW_SC);
goto err_own;
}
roc.roc_map = sm_zalloc(roc.roc_map_size);
if (roc.roc_map == NULL) {
ret = sm_error_temp(SM_EM_Q_AR2Q, ENOMEM);
goto err_own;
}
QM_LEV_DPRINTFC(QDC_A2Q, 6, (QM_DEBFP, "func=qm_fr_ar_react, owners=%d\n", v));
for (n = v, map_idx = 0; n > 0; n--) {
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_OWN_IDX) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto err_own;
}
SM_ASSERT(map_idx < roc.roc_add_owners);
roc.roc_map[map_idx++] = v;
ret = sm_rcb_get2uint32(rcb, &l, &rt);
if (sm_is_err(ret) || rt != RT_R2Q_OWN_PA) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto err_own;
}
pa = NULL;
ret = sm_rcb_getnstr(rcb, &pa, l);
if (sm_is_err(ret)) goto err_own;
QM_LEV_DPRINTFC(QDC_A2Q, 6, (QM_DEBFP, "func=qm_fr_ar_react, owner_idx=%d, rcpt_idx=%d, pa=%S\n", owner_idx, roc.roc_map[map_idx - 1], pa));
SM_ASSERT(owner_idx < total_owners);
owners_pa[owner_idx] = pa;
pa = NULL;
++owner_idx;
}
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret) || l != 4) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto err_own;
}
/* copy over old data */
if (aq_ta->aqt_owners_pa != NULL) {
SM_ASSERT(aq_ta->aqt_owners_n > 0);
/* use sm_memcpy()? */
for (n = 0; n < aq_ta->aqt_owners_n; n++)
owners_pa[n] = aq_ta->aqt_owners_pa[n];
SM_FREE(aq_ta->aqt_owners_pa);
}
aq_ta->aqt_owners_pa = owners_pa;
owners_pa = NULL;
aq_ta->aqt_owners_n += roc.roc_add_owners;
err_own:
if (sm_is_err(ret)) {
if (owners_pa != NULL) {
for (n = roc.roc_old_owners; n < roc.roc_add_owners; n++)
SM_STR_FREE(owners_pa[n]);
SM_FREE(owners_pa);
}
SM_FREE_SIZE(roc.roc_map, roc.roc_map_size);
goto error;
}
}
if (RT_R2Q_RCPT_AE == rt) {
nae = v;
if (nae <= 1) {
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
SM_LOG_ERR, 3,
"sev=ERROR, func=qm_fr_ar_react, number_of_aliases=%u, status=illegal_value"
, v);
goto error;
}
}
if (1 == nae) {
sm_ret_T rcpt_state;
rcpt_state = SM_SUCCESS;
if (RT_R2Q_RCPT_ST == rt) {
QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=WARN, func=qm_fr_ar_react, rcpt_id=%s, ss_ta=%s, idx=%d, state=%#x, addr_max=%d, where=individual\n", rcpt_id, ta_ss_id, rcpt_idx, v, aq_rcpt->aqr_addr_max));
ret = qar_rcpt_st(aq_rcpt, v);
if (sm_is_err(ret)) goto error;
rcpt_state = v;
/* XXX See comments in qar_rcpt_st() */
++aq_ta->aqt_rcpts_arf;
}
else if (rt != RT_R2Q_RCPT_DA) {
rv = sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
/* SM_ASSERT(RT_R2Q_RCPT_DA == rt) */
aq_rcpt->aqr_da_idx = v;
/* RT_R2Q_RCPT_PORT (optional) or RT_R2Q_OWN_REF */
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret) || l != 4) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
if (RT_R2Q_RCPT_PORT == rt) {
aq_rcpt->aqr_port = (short) v;
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
}
if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_OWN_REF) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
if (v > 0) {
ret = qar_ownidxmap(v, &roc);
if (sm_is_err(ret)) goto error;
aq_rcpt->aqr_owner_idx = (rcpt_idx_T) ret;
}
if (sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) &&
4 == l && RT_R2Q_RCPT_FL == rt)
{
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_RCPT_FL) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
if (SM_IS_FLAG(v, SMARRT_FL_VERP))
AQR_SET_FLAG(aq_rcpt, AQR_FL_HAS_VERP);
}
#if MTA_USE_TLS
if (!SM_RCB_ISEOB(rcb) &&
sm_is_success(sm_rcb_peek2uint32(rcb, &l, &rt)) &&
4 == l && RT_R2Q_MAP_RES_CNF_RCPT == rt)
{
ret = sm_rcb_get3uint32(rcb, &l, &rt, &v);
if (sm_is_err(ret) || l != 4 || rt != RT_R2Q_MAP_RES_CNF_RCPT) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
aq_rcpt->aqr_maprescnf = v;
if (sm_is_temp_err(v) && !AQR_IS_FLAG(aq_rcpt, AQR_FL_TEMP)) {
aq_rcpt->aqr_status_new = SMTP_MAP_TEMP;
AQR_SET_FLAG(aq_rcpt, AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_STAT_NEW);
}
ret = sm_rcb_get2uint32(rcb, &l, &rt);
if (sm_is_err(ret) || rt != RT_R2Q_RHS_CNF_RCPT) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
ret = sm_rcb_getnstr(rcb, &aq_rcpt->aqr_conf, l);
if (sm_is_err(ret)) {
rv = ret;
goto error;
}
QM_LEV_DPRINTFC(QDC_A2Q, 8, (QM_DEBFP, "func=qm_fr_ar_react, conf=%x\n", aq_rcpt->aqr_conf));
}
#endif /* MTA_USE_TLS */
if (rcpt_state != SM_SUCCESS && SM_RCB_ISEOB(rcb))
goto done;
ret = sm_rcb_get2uint32(rcb, &l, &rt);
if (sm_is_err(ret)) {
rv = ret;
goto error;
}
if (RT_R2Q_RCPT_PA == rt) {
sm_str_P pa;
/* save original address, get new one, assign old one */
pa = aq_rcpt->aqr_pa;
aq_rcpt->aqr_pa = NULL;
ret = sm_rcb_getnstr(rcb, &aq_rcpt->aqr_pa, l);
QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "func=qm_fr_ar_react, new_addr=%S, ret=%r\n", aq_rcpt->aqr_pa, ret));
if (sm_is_err(ret)) {
sm_str_free(pa);
goto error;
}
aq_rcpt->aqr_orig_pa = pa;
if (rcpt_state != SM_SUCCESS)
goto done;
/* get RT_R2Q_RCPT_NAR, value is read below */
ret = sm_rcb_get2uint32(rcb, &l, &rt);
}
if (sm_is_err(ret) || rt != RT_R2Q_RCPT_NAR || l != 4) {
rv = sm_is_err(ret) ? ret
: sm_error_perm(SM_EM_Q_AR2Q, SM_E_PR_ERR);
goto error;
}
/* get value for RT_R2Q_RCPT_NAR */
ret = sm_rcb_getuint32(rcb, &v);
if (sm_is_err(ret)) {
rv = ret;
goto error;
}
if (v < 1) goto error;
ret = qar_get_addrs(aq_rcpt, rcb, now, v, &rv);
if (sm_is_err(ret)) goto error;
ret = aq_rdq_add(aq_ctx, aq_rcpt, &v, THR_NO_LOCK);
if (sm_is_err(ret)) {
/* XXX what to do on error? */
QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_rdq_add=%r\n", ret));
goto error;
}
else {
/* notify_sched = !SM_IS_FLAG(v, AQRDQ_FL_OCEXC); */
QM_LEV_DPRINTFC(QDC_A2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_ar_react, aq_rdq_add=%r, flags=%#x\n", ret, aq_rcpt->aqr_flags));
}
}
else {
/*
** Who does the proper error handling?
** - failed temporarily: mark aq_rcpt properly
** AQR_FL_TEMP|AQR_FL_ARF|AQR_FL_RCVD4AR|AQR_FL_STAT_NEW
** - failed permanently: trigger bounce?
** - other kind of errors?
*/
ret = qar_alias(qar_ctx, aq_rcpt, nae, rcb, &rv, &roc);
if (sm_is_err(ret)) goto error;
}
done:
ret = aq_waitq_rm(qmgr_ctx->qmgr_aq, aq_rcpt, AQWQ_AR, false);
if (sm_is_err(ret))
QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_waitq_rm=%r\n", ret));
if (AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF)) {
/* first rm and then add: just move it? */
ret = aq_waitq_add(qmgr_ctx->qmgr_aq, aq_rcpt, 0, AQWQ_AR, false);
if (sm_is_err(ret))
QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, aq_waitq_add=%r\n", ret));
else {
ret = qmgr_set_aq_cleanup(qmgr_ctx->qmgr_cleanup_ctx, now, true);
if (sm_is_err(ret))
QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, qmgr_set_aq_cleanup=%r\n", ret));
}
}
/* call this before decrementing the counter? See the function! */
ret = qm_ar_actsched(qmgr_ctx, aq_rcpt, ¬ify_sched);
if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_IS_BNC|AQR_FL_IS_DBNC)) {
SM_ASSERT(aq_ta->aqt_rcpts_ar > 0);
--aq_ta->aqt_rcpts_ar;
}
unlock:
if (SM_IS_FLAG(fct_state, FST_EDB_LOCKED)) {
r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex);
SM_ASSERT(0 == r);
if (0 == r)
SM_CLR_FLAG(fct_state, FST_EDB_LOCKED);
}
/* remove expanded address from AQ */
if (nae > 1) {
ret = aq_rcpt_rm(aq_ctx, aq_rcpt, 0);
if (sm_is_success(ret)) {
SM_ASSERT(aq_ta->aqt_rcpts_inaq > 0);
--aq_ta->aqt_rcpts_inaq;
aq_rcpt = NULL;
}
}
if (SM_IS_FLAG(fct_state, FST_AQ_RCPT_LOCKED)) {
/* NOTE: aq_rcpt might be NULL already! */
ret = aq_rcpt_lockop(aq_ctx, aq_rcpt, THR_UNLOCK_IT);
SM_CLR_FLAG(fct_state, FST_AQ_RCPT_LOCKED);
}
QM_LEV_DPRINTFC(QDC_A2Q, 3, (QM_DEBFP, "sev=DBG, func=qm_fr_ar_react, rcpt_id=%s, status=updated, idx=%d, state=%d, aqt_rcpts_ar=%u, ret=%r, notify_sched=%d\n", rcpt_id, rcpt_idx, aq_rcpt == NULL ? -1 : aq_rcpt->aqr_status, aq_ta->aqt_rcpts_ar, ret, notify_sched));
/*
** Activate scheduler, but how? Always or use some test?
** Note: the scheduler must also be activated when a recipient
** has been resolved because it tests the number of "outstanding"
** recipients before it schedules a transaction.
** Does this mean it should always be called? It might be nice
** to provide a simple function that tests whether the scheduler
** should be activated or some flags that tell the scheduler
** what has changed so it doesn't need to "walk" through all
** the ready queues...
*/
if (notify_sched) {
timeval_T nowt;
/* activate scheduler */
ret = evthr_timeval(qmgr_ctx->qmgr_ar_tsk->evthr_t_ctx, &nowt);
ret = evthr_new_sl(qmgr_ctx->qmgr_sched, nowt, false);
if (sm_is_err(ret)) {
/* what to do? */
sm_log_write(qmgr_ctx->qmgr_lctx,
QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
SM_LOG_ERR, 4,
"sev=ERROR, func=qm_fr_ar_react, evthr_new_sl=%m"
, ret);
}
else
QM_LEV_DPRINTFC(QDC_A2Q, 4, (QM_DEBFP, "func=qm_fr_ar_react, evthr_new_sl=%r\n", ret));
}
SM_FREE_SIZE(roc.roc_map, roc.roc_map_size);
ret = sm_rcb_close_dec(qar_ctx->qar_com.rcbcom_rdrcb);
SM_CLR_FLAG(fct_state, FST_RCB_OPEN_DEC);
(void) sm_rcb_open_rcv(qar_ctx->qar_com.rcbcom_rdrcb);
return ret;
error:
if (aq_rcpt != NULL
&& aq_rcpt->aqr_addrs != NULL
&& !AQR_IS_FLAG(aq_rcpt, AQR_FL_MEMAR)
&& aq_rcpt->aqr_addrs != &aq_rcpt->aqr_addr_mf)
{
SM_FREE(aq_rcpt->aqr_addrs);
}
if (SM_IS_FLAG(fct_state, FST_AQ_RCPT_LOCKED)) {
(void) aq_rcpt_lockop(aq_ctx, aq_rcpt, THR_UNLOCK_IT);
SM_CLR_FLAG(fct_state, FST_AQ_RCPT_LOCKED);
}
if (SM_IS_FLAG(fct_state, FST_EDB_LOCKED)) {
r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex);
SM_ASSERT(0 == r);
if (0 == r)
SM_CLR_FLAG(fct_state, FST_EDB_LOCKED);
}
if (SM_IS_FLAG(fct_state, FST_RCB_OPEN_DEC)) {
/* use rcb functions that don't check the state */
(void) sm_rcb_close_decn(qar_ctx->qar_com.rcbcom_rdrcb);
SM_CLR_FLAG(fct_state, FST_RCB_OPEN_DEC);
}
/* open rcb to receive next record */
(void) sm_rcb_open_rcvn(qar_ctx->qar_com.rcbcom_rdrcb);
SM_FREE_SIZE(roc.roc_map, roc.roc_map_size);
if (sm_is_err(rv))
QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar_react, rt=%#x, v=%d, l=%d, ret=%r, rv=%r\n", rt, v, l, ret, rv));
return rv;
}
/*
** The FST_* bits are tested/cleared in fct_state by the function above
** (presumably they are defined just before it -- not visible here);
** undefine them so they do not leak into the rest of the file.
*/
#undef FST_EDB_LOCKED
#undef FST_AQ_RCPT_LOCKED
#undef FST_RCB_OPEN_DEC
/*
**  QM_FR_AR -- AR - QMGR interface: handle a read event from AR
**
**	Receives data from the task's fd into the read RCB of the AR
**	context.  When sm_rcb_rcv() reports 0 the RCB is closed for
**	receiving and handed to qm_fr_ar_react(); on SM_IO_EOF, or when
**	the reaction fails or asks for EVTHR_DEL, the connection is shut
**	down (fd closed, status set to QSC_ST_SH_DOWN, task pointers
**	cleared).
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		EVTHR_WAITQ: sm_rcb_rcv() returned > 0, or qm_fr_ar_react()
**			returned QMGR_R_WAITQ
**		EVTHR_OK: qm_fr_ar_react() returned QMGR_R_ASYNC
**		EVTHR_DEL: connection terminated or receive error
**		otherwise: the (non-error) result of qm_fr_ar_react()
**
**	Locking:
**		qar_ctx: locked by access via task
**		qmgr_ctx->qmgr_mutex: held while qmgr_ar_tsk is cleared
**
**	Called by: qmgr_ar() for read events
**
**	Last code review: 2003-10-16
*/

sm_ret_T
qm_fr_ar(sm_evthr_task_P tsk)
{
	int sock, lockres;
	sm_ret_T res;
	qar_ctx_P qar_ctx;
	qmgr_ctx_P qmgr_ctx;

	SM_IS_EVTHR_TSK(tsk);
	qar_ctx = (qar_ctx_P) tsk->evthr_t_actx;
	SM_IS_QAR_CTX(qar_ctx);
	qmgr_ctx = qar_ctx->qar_qmgr_ctx;
	SM_IS_QMGR_CTX(qmgr_ctx);

	/* validity of the fd is checked in the caller */
	sock = tsk->evthr_t_fd;

	res = sm_rcb_rcv(sock, qar_ctx->qar_com.rcbcom_rdrcb, QSS_RC_MINSZ);
	QM_LEV_DPRINTTC(QDC_A2Q, 4, (QM_DEBFP, "sev=DBG, func=qm_fr_ar, tsk=%p, fd=%d, ret=%r, buf=%d, len=%d\n",
		tsk, sock, res,
		qar_ctx->qar_com.rcbcom_rdrcb->sm_rcb_base[0],
		sm_rcb_getlen(qar_ctx->qar_com.rcbcom_rdrcb)),
		qmgr_ctx->qmgr_ev_ctx->evthr_c_time);

	/* positive result: go back to the wait queue for the next event */
	if (res > 0)
		return EVTHR_WAITQ;

	if (0 == res)
	{
		/* result of close is not used; it is replaced right away */
		(void) sm_rcb_close_rcv(qar_ctx->qar_com.rcbcom_rdrcb);

		/* let the reaction function decode and process the RCB */
		res = qm_fr_ar_react(qar_ctx);
		QM_LEV_DPRINTFC(QDC_A2Q, 5, (QM_DEBFP, "sev=DBG, func=qm_fr_ar, qm_fr_ar_react=%r\n", res));

		if (sm_is_err(res))
			goto shutdown;	/* too harsh? */
		if (QMGR_R_WAITQ == res)
			return EVTHR_WAITQ;
		if (QMGR_R_ASYNC == res)
			return EVTHR_OK;
		if (EVTHR_DEL == res)
			goto shutdown;	/* terminate this AR connection */
		return res;
	}

	if (SM_IO_EOF != res)
	{
		/* receive failed: report it and have the task removed */
		QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar, ret=%r, errno=%d\n", res, errno));
		QM_LEV_DPRINTFC(QDC_A2Q, 0, (QM_DEBFP, "sev=ERROR, func=qm_fr_ar, fd=%d\n", sock));
		return EVTHR_DEL;
	}

	/* EOF: close the RCB, then run the common shutdown sequence */
	res = sm_rcb_close_rcv(qar_ctx->qar_com.rcbcom_rdrcb);

  shutdown:
	QM_LEV_DPRINTFC(QDC_A2Q, 1, (QM_DEBFP, "sev=DBG, func=qm_fr_ar, task=%p, status=terminate, ret=%r\n", qar_ctx->qar_com.rcbcom_tsk, res));
	close(sock);

	/* XXX see comment in qm_fr_ss */
	tsk->evthr_t_fd = INVALID_FD;	/* make it invalid */
	qar_ctx->qar_status = QSC_ST_SH_DOWN;

	lockres = pthread_mutex_lock(&qmgr_ctx->qmgr_mutex);
	SM_LOCK_OK(lockres);
	if (lockres != 0)
	{
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_COMM, QM_LMOD_FROM_SMAR,
			SM_LOG_CRIT, 1,
			"sev=CRIT, func=qm_fr_ar, lock=%d",
			lockres);
		return EVTHR_DEL;
	}
	qmgr_ctx->qmgr_ar_tsk = NULL;
	lockres = pthread_mutex_unlock(&qmgr_ctx->qmgr_mutex);
	SM_ASSERT(0 == lockres);

	/* free qar_ctx? done in qmgr_stop() */
	qar_ctx->qar_com.rcbcom_tsk = NULL;
	return EVTHR_DEL;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */