/* * Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: ibdb_commit.c,v 1.77 2007/06/24 02:52:57 ca Exp $") #include "sm/error.h" #include "sm/assert.h" #include "sm/memops.h" #include "sm/io.h" #include "sm/rcb.h" #include "sm/qmgr.h" #include "sm/qmgr-int.h" #include "sm/fxszq.h" #include "qmgr.h" #include "log.h" /* ** QM_IBDB_COMMIT -- commit ibdb records and notify SMTPS ** (using list of open transactions) ** ** Parameters: ** qmgr_ctx -- QMGR context ** ** Returns: ** usual sm_error code ** ** Called by: ** ss_schedctaid() ** qmgr_ibdb_commit() ** ** Locking: qmgr_ctx->qmgr_optas must be locked by caller. ** ** Last code review: 2005-04-11 17:36:57; see comments ** Last code change: */ /* ** fixme: Todo better error handling: ** If commit fails, tell the open transactions about it. ** Note: SMTPS can deal with that because it has a timeout. ** What about the storage in IQDB? It must be free()d. */ static sm_ret_T qm_ibdb_commit(qmgr_ctx_P qmgr_ctx) { uint i, ntasks; int r; bool notified; sm_ret_T ret, commit_status; qss_opta_P qss_optas; qss_ta_P qss_ta; sm_rcbe_P rcbe; sm_evthr_task_P tasks[MAX_GEN_LI_FD]; sessta_id_T taid; SM_IS_QMGR_CTX(qmgr_ctx); qss_optas = qmgr_ctx->qmgr_optas; SM_IS_QS_OPTAS(qss_optas); /* should be checked by caller already */ if (FSQ_IS_EMPTY(qss_optas->qot_cur)) return SM_SUCCESS; /* different value? */ rcbe = NULL; commit_status = SMTP_R_OK; /* commit ibdb, notify open transactions */ ret = ibdb_commit(qmgr_ctx->qmgr_ibdb); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_ibdb_commit, ibdb_commit=%m", ret); /* ** XXX Set status to "TEMPFAIL" instead and notify SMTPS tasks? ** XXX check ret and throttle SMTPS? */ commit_status = SMTP_R_TEMP; /* XXX not used at error: */ goto error; } QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, going through open transactions=%u\n", qss_optas->qot_cur)); ntasks = 0; while (!FSQ_IS_EMPTY(qss_optas->qot_cur)) { qss_ta = FSQ_GET(qss_optas->qot_max, qss_optas->qot_first, qss_optas->qot_cur, qss_optas->qot_tas, i); SM_IS_QS_TA(qss_ta); r = pthread_mutex_lock(&qss_ta->qssta_mutex); SM_LOCK_OK(r); if (r != 0) { /* ** This is not supposed to happen. abort? ** Put it back and give up. */ FSQ_APPEND(qss_optas->qot_max, qss_optas->qot_last, qss_optas->qot_cur, qss_optas->qot_tas, qss_ta); sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_CRIT, 0, "sev=CRIT, func=qm_ibdb_commit, lock=%d, qss_ta=%p, status=giving_up" , r, qss_ta); goto errnotify; } QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, idx=%u\n", qss_ta, qss_ta->qssta_cdb_id, qss_optas->qot_cur)); ret = qm_rcbe_new(qmgr_ctx, &rcbe, -1); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_ibdb_commit, qm_rcbe_new=%m", ret); goto errunl; } /* check whether we had this task before */ notified = false; for (i = 0; i < ntasks && !notified; i++) { notified = (tasks[i] == qss_ta->qssta_ssctx->qss_com.rcbcom_tsk); } if (!notified) { SM_ASSERT(ntasks < MAX_GEN_LI_FD); tasks[ntasks++] = qss_ta->qssta_ssctx->qss_com.rcbcom_tsk; } ret = qm_2ss_ctaid(qss_ta->qssta_ssctx, qss_ta, rcbe, commit_status); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_ibdb_commit, qm_2ss_ctaid=%m", ret); goto errunl; } ret = sm_rcbcom_endrep(&qss_ta->qssta_ssctx->qss_com, qss_ta->qssta_ssctx->qss_com.rcbcom_tsk, true /* HACK */, &rcbe); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, sm_rcbcom_endrep=%m" , qss_ta, qss_ta->qssta_cdb_id, ret); goto errunl; } QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, OK\n", qss_ta, qss_ta->qssta_cdb_id)); /* save TA ID for logging purposes */ SESSTA_COPY(taid, qss_ta->qssta_id); QSS_TA_SET_FLAG(qss_ta, QSS_TA_FL_OPTA_RM); /* unlocks qssta_mutex */ ret = qss_ta_free(qss_ta, true, QSS_TA_FREE_OP, 0); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 4, "sev=ERROR, func=qm_ibdb_commit, ss_ta=%s, qss_ta_free=%d", taid, ret); /* XXX Do nothing? */ } else { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_INFO, 8, "sev=INFO, func=qm_ibdb_commit, ss_ta=%s, status=accepted", taid); } } QM_LEV_DPRINTFC(QDC_IBDB, 4, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, went through open transactions=%u\n", qss_optas->qot_cur)); /* notify tasks to send out commit confirmations */ for (i = 0; i < ntasks; i++) { QM_LEV_DPRINTFC(QDC_IBDB, 7, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, enable task=%p\n", tasks[i])); ret = evthr_en_wr(tasks[i]); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 3, "sev=ERROR, func=qm_ibdb_commit, task=%p, evthr_en_wr=%m" , tasks[i], ret); } else QM_LEV_DPRINTFC(QDC_IBDB, 6, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, enable task=%p done\n", tasks[i])); } return SM_SUCCESS; errunl: r = pthread_mutex_unlock(&qss_ta->qssta_mutex); if (r != 0) { QM_LEV_DPRINTFC(QDC_IBDB, 0, (QM_DEBFP, "sev=ERROR, func=qm_ibdb_commit, qss_ta=%p unlock=%d\n", qss_ta, r)); } else QM_LEV_DPRINTFC(QDC_IBDB, 6, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, unlocked=%d\n", qss_ta, r)); errnotify: /* notify tasks nevertheless */ for (i = 0; i < ntasks; i++) { ret = evthr_en_wr(tasks[i]); if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_IBDB, 0, (QM_DEBFP, "sev=ERROR, func=qm_ibdb_commit, task=%p, evthr_en_wr=%r\n", tasks[i], ret)); } error: if (rcbe != NULL) sm_rcbe_free(rcbe); return ret; } /* ** QM_SS_SCHEDCTAID -- schedule reply for CTAID (close transaction) ** append qss_ta to list of open transactions, ** if list is full: commit existing open transactions (disk I/O!), ** otherwise just trigger the task. ** ** Parameters: ** qmgr_ctx -- QMGR context ** qss_ta -- transaction ** ** Returns: ** usual sm_error code ** QSS_TA_FL_OPTA is set in qss_ta if it has been appended. ** ** Locking: locks qmgr_optas during operation. ** ** Called by: qm_fr_ss_react ** ** Last code review: 2005-04-11 17:41:11; see comments ** Last code change: */ sm_ret_T qm_ss_schedctaid(qmgr_ctx_P qmgr_ctx, qss_ta_P qss_ta) { int r; sm_ret_T ret; qss_opta_P qss_optas; timeval_T curt, sleept, delay; SM_IS_QMGR_CTX(qmgr_ctx); qss_optas = qmgr_ctx->qmgr_optas; SM_IS_QS_OPTAS(qss_optas); r = pthread_mutex_lock(&qss_optas->qot_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_CRIT, 0, "sev=CRIT, func=qm_ss_schedctaid, lock=%d", r); ret = sm_error_temp(SM_EM_Q_IBDB, r); goto error; } if (FSQ_IS_FULL(qss_optas->qot_max, qss_optas->qot_cur)) { ret = qm_ibdb_commit(qmgr_ctx); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 3, "sev=ERROR, func=qm_ss_schedctaid, qm_ibdb_commit=%m", ret); goto errunl; } else QM_LEV_DPRINTFC(QDC_IBDB, 3, (QM_DEBFP, "sev=DBG, func=qm_ss_schedctaid, qm_ibdb_commit=succesful\n")); } FSQ_APPEND(qss_optas->qot_max, qss_optas->qot_last, qss_optas->qot_cur, qss_optas->qot_tas, qss_ta); QSS_TA_SET_FLAG(qss_ta, QSS_TA_FL_OPTA); r = pthread_mutex_unlock(&qss_optas->qot_mutex); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_CRIT, 0, "sev=CRIT, func=qm_ss_schedctaid, unlock=%d", r); SM_ASSERT(r == 0); ret = sm_error_temp(SM_EM_Q_IBDB, r); goto error; } /* Trigger commit, i.e., wake up commit task. */ ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &curt); SM_ASSERT(ret == SM_SUCCESS); delay.tv_usec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay; delay.tv_sec = 0; timeradd(&curt, &delay, &sleept); ret = evthr_new_sl(qmgr_ctx->qmgr_icommit, sleept, false); /* ** fixme: access to qmgr_icommit is not protected, this might be wrong, ** hence it is only used for debug output. */ if (timercmp(&sleept, &qmgr_ctx->qmgr_icommit->evthr_t_sleep, <)) { QM_LEV_DPRINTFC(QDC_IBDB, 3, (QM_DEBFP, "sev=DBG, func=qm_ss_schedctaid, old sleep=%ld.%06ld, new time=%ld.%06ld, evthr_new_sl=%r, cdb=%C, tsk=%p\n" , qmgr_ctx->qmgr_icommit->evthr_t_sleep.tv_sec , qmgr_ctx->qmgr_icommit->evthr_t_sleep.tv_usec , sleept.tv_sec, sleept.tv_usec, ret , qss_ta->qssta_cdb_id, qmgr_ctx->qmgr_icommit)); } return SM_SUCCESS; errunl: r = pthread_mutex_unlock(&qss_optas->qot_mutex); SM_ASSERT(r == 0); if (r != 0 && sm_is_success(ret)) ret = sm_error_perm(SM_EM_Q_IBDB, r); error: return ret; } /* ** QMGR_IBDB_COMMIT -- task to periodically commit ibdb records ** ** Parameters: ** tsk -- evthr task ** ** Returns: ** usual sm_error code ** ** Locking: locks qmgr_optas during operation. ** ** Last code review: 2003-10-24 16:09:23, see comments below. */ sm_ret_T qmgr_ibdb_commit(sm_evthr_task_P tsk) { int r; sm_ret_T ret; qmgr_ctx_P qmgr_ctx; qss_opta_P qss_optas; timeval_T sleept, delay; SM_IS_EVTHR_TSK(tsk); qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx; SM_IS_QMGR_CTX(qmgr_ctx); qss_optas = qmgr_ctx->qmgr_optas; SM_IS_QS_OPTAS(qss_optas); #if 0 r = gettimeofday(&sleept, NULL); /* fixme: check r */ #else ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &sleept); /* XXX check ret */ #endif QM_LEV_DPRINTFC(QDC_IBDB, 4, (QM_DEBFP, "time=%ld.%06ld, func=qmgr_ibdb_commit\n", sleept.tv_sec, sleept.tv_usec)); r = pthread_mutex_lock(&qss_optas->qot_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_CRIT, 0, "sev=CRIT, func=qmgr_ibdb_commit, lock=%d", r); goto error; } /* XXX just wait till someone wakes this up? */ if (FSQ_IS_EMPTY(qss_optas->qot_cur)) { /* ** This isn't good: don't wake up unless necessary! ** If there's no incoming mail, this task should not run. ** The task is woken up by qm_ss_schedctaid(), hence it ** could be a really long delay here. */ delay.tv_sec = 300; delay.tv_usec = 0; /* IBDB keeps track of changes, it's ok to always call this */ ret = ibdb_commit(qmgr_ctx->qmgr_ibdb); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_ERR, 3, "sev=ERROR, func=qmgr_ibdb_commit, ibdb_commit=%m", ret); } } else { ret = qm_ibdb_commit(qmgr_ctx); QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "time=%ld.%06ld, func=qmgr_ibdb_commit, ret=%r\n", sleept.tv_sec, sleept.tv_usec, ret)); if (sm_is_err(ret)) { /* error already logged by qm_ibdb_commit() */ goto errunl; } delay.tv_usec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay; delay.tv_sec = 0; } timeradd(&sleept, &delay, &tsk->evthr_t_sleep); r = pthread_mutex_unlock(&qss_optas->qot_mutex); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_SMTPS, QM_LMOD_IBDB, SM_LOG_CRIT, 0, "sev=CRIT, func=qmgr_ibdb_commit, unlock=%d", r); SM_ASSERT(r == 0); goto error; /* just in case assertions are turned off */ } return EVTHR_SLPQ; errunl: r = pthread_mutex_unlock(&qss_optas->qot_mutex); SM_ASSERT(r == 0); error: delay.tv_usec = 0; delay.tv_sec = 1; timeradd(&sleept, &delay, &tsk->evthr_t_sleep); return EVTHR_SLPQ; }