/*
* Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: ibdb_commit.c,v 1.77 2007/06/24 02:52:57 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/fxszq.h"
#include "qmgr.h"
#include "log.h"
/*
** QM_IBDB_COMMIT -- commit ibdb records and notify SMTPS
** (using list of open transactions)
**
** Parameters:
** qmgr_ctx -- QMGR context
**
** Returns:
** usual sm_error code
**
** Called by:
** qm_ss_schedctaid()
** qmgr_ibdb_commit()
**
** Locking: qmgr_ctx->qmgr_optas must be locked by caller.
**
** Last code review: 2005-04-11 17:36:57; see comments
** Last code change:
*/
/*
** fixme: Todo better error handling:
** If commit fails, tell the open transactions about it.
** Note: SMTPS can deal with that because it has a timeout.
** What about the storage in IQDB? It must be free()d.
*/
static sm_ret_T
qm_ibdb_commit(qmgr_ctx_P qmgr_ctx)
{
	/*
	** Commit IBDB, then walk the list of open transactions: for each
	** one build a "close transaction" reply for its SMTPS context,
	** hand the reply over, and free the transaction.  Finally enable
	** writing on every SMTPS task that got at least one reply.
	**
	** Returns SM_SUCCESS if the list was empty or all transactions
	** were processed; otherwise the error that stopped processing
	** (the result of the "notify nevertheless" step never replaces
	** that error).
	*/

	uint i, ntasks;
	int r;
	bool notified;
	sm_ret_T ret, res, commit_status;
	qss_opta_P qss_optas;
	qss_ta_P qss_ta;
	sm_rcbe_P rcbe;
	sm_evthr_task_P tasks[MAX_GEN_LI_FD];
	sessta_id_T taid;

	SM_IS_QMGR_CTX(qmgr_ctx);
	qss_optas = qmgr_ctx->qmgr_optas;
	SM_IS_QS_OPTAS(qss_optas);

	/* should be checked by caller already */
	if (FSQ_IS_EMPTY(qss_optas->qot_cur))
		return SM_SUCCESS; /* different value? */

	rcbe = NULL;
	ntasks = 0;
	commit_status = SMTP_R_OK;

	/* commit ibdb, notify open transactions */
	ret = ibdb_commit(qmgr_ctx->qmgr_ibdb);
	if (sm_is_err(ret)) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_ERR, 4,
			"sev=ERROR, func=qm_ibdb_commit, ibdb_commit=%m",
			ret);

		/*
		** XXX Set status to "TEMPFAIL" instead and notify SMTPS tasks?
		** XXX check ret and throttle SMTPS?
		*/

		commit_status = SMTP_R_TEMP; /* XXX not used at error: */
		goto error;
	}
	QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, going through open transactions=%u\n", qss_optas->qot_cur));

	while (!FSQ_IS_EMPTY(qss_optas->qot_cur)) {
		/* take the next open transaction off the queue */
		qss_ta = FSQ_GET(qss_optas->qot_max, qss_optas->qot_first,
				qss_optas->qot_cur, qss_optas->qot_tas, i);
		SM_IS_QS_TA(qss_ta);
		r = pthread_mutex_lock(&qss_ta->qssta_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			/*
			** This is not supposed to happen. abort?
			** Put it back and give up.
			*/

			FSQ_APPEND(qss_optas->qot_max, qss_optas->qot_last,
				qss_optas->qot_cur, qss_optas->qot_tas, qss_ta);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_CRIT, 0,
				"sev=CRIT, func=qm_ibdb_commit, lock=%d, qss_ta=%p, status=giving_up"
				, r, qss_ta);

			/*
			** ret still holds a non-error value at this point;
			** turn the lock failure into an error for the caller.
			*/

			ret = sm_error_temp(SM_EM_Q_IBDB, r);
			goto errnotify;
		}
		QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, idx=%u\n", qss_ta, qss_ta->qssta_cdb_id, qss_optas->qot_cur));

		/*
		** NOTE(review): on the errunl paths below the transaction
		** has already been removed from the queue and is not put
		** back; see the fixme about error handling for this function.
		*/

		ret = qm_rcbe_new(qmgr_ctx, &rcbe, -1);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, qm_rcbe_new=%m",
				ret);
			goto errunl;
		}

		/* check whether we had this task before */
		notified = false;
		for (i = 0; i < ntasks && !notified; i++) {
			notified = (tasks[i] ==
				qss_ta->qssta_ssctx->qss_com.rcbcom_tsk);
		}
		if (!notified) {
			SM_ASSERT(ntasks < MAX_GEN_LI_FD);
			tasks[ntasks++] = qss_ta->qssta_ssctx->qss_com.rcbcom_tsk;
		}

		ret = qm_2ss_ctaid(qss_ta->qssta_ssctx, qss_ta, rcbe, commit_status);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, qm_2ss_ctaid=%m",
				ret);
			goto errunl;
		}

		ret = sm_rcbcom_endrep(&qss_ta->qssta_ssctx->qss_com,
				qss_ta->qssta_ssctx->qss_com.rcbcom_tsk,
				true /* HACK */, &rcbe);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, sm_rcbcom_endrep=%m"
				, qss_ta, qss_ta->qssta_cdb_id, ret);
			goto errunl;
		}
		QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, OK\n", qss_ta, qss_ta->qssta_cdb_id));

		/* save TA ID for logging purposes */
		SESSTA_COPY(taid, qss_ta->qssta_id);
		QSS_TA_SET_FLAG(qss_ta, QSS_TA_FL_OPTA_RM);

		/* unlocks qssta_mutex */
		ret = qss_ta_free(qss_ta, true, QSS_TA_FREE_OP, 0);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, ss_ta=%s, qss_ta_free=%d",
				taid, ret);
			/* XXX Do nothing? */
		}
		else {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_INFO, 8,
				"sev=INFO, func=qm_ibdb_commit, ss_ta=%s, status=accepted",
				taid);
		}
	}
	QM_LEV_DPRINTFC(QDC_IBDB, 4, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, went through open transactions=%u\n", qss_optas->qot_cur));

	/* notify tasks to send out commit confirmations */
	for (i = 0; i < ntasks; i++) {
		QM_LEV_DPRINTFC(QDC_IBDB, 7, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, enable task=%p\n", tasks[i]));
		ret = evthr_en_wr(tasks[i]);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 3,
				"sev=ERROR, func=qm_ibdb_commit, task=%p, evthr_en_wr=%m"
				, tasks[i], ret);
		}
		else {
			QM_LEV_DPRINTFC(QDC_IBDB, 6, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, enable task=%p done\n", tasks[i]));
		}
	}
	return SM_SUCCESS;

  errunl:
	r = pthread_mutex_unlock(&qss_ta->qssta_mutex);
	if (r != 0) {
		QM_LEV_DPRINTFC(QDC_IBDB, 0, (QM_DEBFP, "sev=ERROR, func=qm_ibdb_commit, qss_ta=%p unlock=%d\n", qss_ta, r));
	}
	else {
		QM_LEV_DPRINTFC(QDC_IBDB, 6, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, unlocked=%d\n", qss_ta, r));
	}

  errnotify:
	/*
	** notify tasks nevertheless; use a separate result variable so
	** the error that brought us here is what the caller gets back.
	*/

	for (i = 0; i < ntasks; i++) {
		res = evthr_en_wr(tasks[i]);
		if (sm_is_err(res)) {
			QM_LEV_DPRINTFC(QDC_IBDB, 0, (QM_DEBFP, "sev=ERROR, func=qm_ibdb_commit, task=%p, evthr_en_wr=%r\n", tasks[i], res));
		}
	}

  error:
	/* release a reply buffer that was allocated but not handed over */
	if (rcbe != NULL)
		sm_rcbe_free(rcbe);
	return ret;
}
/*
** QM_SS_SCHEDCTAID -- schedule reply for CTAID (close transaction)
** append qss_ta to list of open transactions,
** if list is full: commit existing open transactions (disk I/O!),
** otherwise just trigger the task.
**
** Parameters:
** qmgr_ctx -- QMGR context
** qss_ta -- transaction
**
** Returns:
** usual sm_error code
** QSS_TA_FL_OPTA is set in qss_ta if it has been appended.
**
** Locking: locks qmgr_optas during operation.
**
** Called by: qm_fr_ss_react
**
** Last code review: 2005-04-11 17:41:11; see comments
** Last code change:
*/
sm_ret_T
qm_ss_schedctaid(qmgr_ctx_P qmgr_ctx, qss_ta_P qss_ta)
{
	int r;
	sm_ret_T ret;
	qss_opta_P qss_optas;
	timeval_T curt, sleept, delay;

	SM_IS_QMGR_CTX(qmgr_ctx);
	qss_optas = qmgr_ctx->qmgr_optas;
	SM_IS_QS_OPTAS(qss_optas);

	r = pthread_mutex_lock(&qss_optas->qot_mutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qm_ss_schedctaid, lock=%d",
			r);
		ret = sm_error_temp(SM_EM_Q_IBDB, r);
		goto error;
	}

	/* no room for another entry: commit the pending ones first */
	if (FSQ_IS_FULL(qss_optas->qot_max, qss_optas->qot_cur)) {
		ret = qm_ibdb_commit(qmgr_ctx);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 3,
				"sev=ERROR, func=qm_ss_schedctaid, qm_ibdb_commit=%m",
				ret);
			goto errunl;
		}
		else {
			QM_LEV_DPRINTFC(QDC_IBDB, 3, (QM_DEBFP, "sev=DBG, func=qm_ss_schedctaid, qm_ibdb_commit=succesful\n"));
		}
	}

	/* append the transaction and mark it as being in the list */
	FSQ_APPEND(qss_optas->qot_max, qss_optas->qot_last, qss_optas->qot_cur,
		qss_optas->qot_tas, qss_ta);
	QSS_TA_SET_FLAG(qss_ta, QSS_TA_FL_OPTA);

	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qm_ss_schedctaid, unlock=%d",
			r);
		SM_ASSERT(r == 0);
		ret = sm_error_temp(SM_EM_Q_IBDB, r);
		goto error;
	}

	/* Trigger commit, i.e., wake up commit task. */
	ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &curt);
	SM_ASSERT(ret == SM_SUCCESS);

	/*
	** The configured delay is in microseconds. Split it into seconds
	** and microseconds: timeradd() carries at most one second, so an
	** unnormalized tv_usec (>= 1000000) could yield an invalid result.
	*/

	delay.tv_sec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay / 1000000;
	delay.tv_usec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay % 1000000;
	timeradd(&curt, &delay, &sleept);

	/* the result is only shown in the debug output below */
	ret = evthr_new_sl(qmgr_ctx->qmgr_icommit, sleept, false);

	/*
	** fixme: access to qmgr_icommit is not protected, this might be wrong,
	** hence it is only used for debug output.
	*/

	if (timercmp(&sleept, &qmgr_ctx->qmgr_icommit->evthr_t_sleep, <)) {
		QM_LEV_DPRINTFC(QDC_IBDB, 3, (QM_DEBFP, "sev=DBG, func=qm_ss_schedctaid, old sleep=%ld.%06ld, new time=%ld.%06ld, evthr_new_sl=%r, cdb=%C, tsk=%p\n"
			, qmgr_ctx->qmgr_icommit->evthr_t_sleep.tv_sec
			, qmgr_ctx->qmgr_icommit->evthr_t_sleep.tv_usec
			, sleept.tv_sec, sleept.tv_usec, ret
			, qss_ta->qssta_cdb_id, qmgr_ctx->qmgr_icommit));
	}
	return SM_SUCCESS;

  errunl:
	/* release the list lock; keep the error that brought us here */
	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	SM_ASSERT(r == 0);
	if (r != 0 && sm_is_success(ret))
		ret = sm_error_perm(SM_EM_Q_IBDB, r);
  error:
	return ret;
}
/*
** QMGR_IBDB_COMMIT -- task to periodically commit ibdb records
**
** Parameters:
** tsk -- evthr task
**
** Returns:
** usual sm_error code
**
** Locking: locks qmgr_optas during operation.
**
** Last code review: 2003-10-24 16:09:23, see comments below.
*/
sm_ret_T
qmgr_ibdb_commit(sm_evthr_task_P tsk)
{
	int r;
	sm_ret_T ret;
	qmgr_ctx_P qmgr_ctx;
	qss_opta_P qss_optas;
	timeval_T sleept, delay;

	SM_IS_EVTHR_TSK(tsk);
	qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	qss_optas = qmgr_ctx->qmgr_optas;
	SM_IS_QS_OPTAS(qss_optas);

	/* current time; base for the next wakeup time on every path */
#if 0
	r = gettimeofday(&sleept, NULL); /* fixme: check r */
#else
	ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &sleept); /* XXX check ret */
#endif
	QM_LEV_DPRINTFC(QDC_IBDB, 4, (QM_DEBFP, "time=%ld.%06ld, func=qmgr_ibdb_commit\n", sleept.tv_sec, sleept.tv_usec));

	r = pthread_mutex_lock(&qss_optas->qot_mutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qmgr_ibdb_commit, lock=%d",
			r);
		goto error;
	}

	/* XXX just wait till someone wakes this up? */
	if (FSQ_IS_EMPTY(qss_optas->qot_cur)) {
		/*
		** This isn't good: don't wake up unless necessary!
		** If there's no incoming mail, this task should not run.
		** The task is woken up by qm_ss_schedctaid(), hence it
		** could be a really long delay here.
		*/

		delay.tv_sec = 300;
		delay.tv_usec = 0;

		/* IBDB keeps track of changes, it's ok to always call this */
		ret = ibdb_commit(qmgr_ctx->qmgr_ibdb);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 3,
				"sev=ERROR, func=qmgr_ibdb_commit, ibdb_commit=%m",
				ret);
		}
	}
	else {
		ret = qm_ibdb_commit(qmgr_ctx);
		QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "time=%ld.%06ld, func=qmgr_ibdb_commit, ret=%r\n", sleept.tv_sec, sleept.tv_usec, ret));
		if (sm_is_err(ret)) {
			/* error already logged by qm_ibdb_commit() */
			goto errunl;
		}

		/*
		** The configured delay is in microseconds. Split it into
		** seconds and microseconds: timeradd() carries at most one
		** second, so an unnormalized tv_usec (>= 1000000) could
		** yield an invalid wakeup time.
		*/

		delay.tv_sec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay / 1000000;
		delay.tv_usec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay % 1000000;
	}

	/* next wakeup: now + delay */
	timeradd(&sleept, &delay, &tsk->evthr_t_sleep);
	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qmgr_ibdb_commit, unlock=%d", r);
		SM_ASSERT(r == 0);
		goto error; /* just in case assertions are turned off */
	}
	return EVTHR_SLPQ;

  errunl:
	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	SM_ASSERT(r == 0);
  error:
	/* something failed: try again in one second */
	delay.tv_usec = 0;
	delay.tv_sec = 1;
	timeradd(&sleept, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */