/*
 * Copyright (c) 2002-2005 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: ibdb_commit.c,v 1.77 2007/06/24 02:52:57 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/io.h"
#include "sm/rcb.h"
#include "sm/qmgr.h"
#include "sm/qmgr-int.h"
#include "sm/fxszq.h"
#include "qmgr.h"
#include "log.h"

/*
**  QM_IBDB_COMMIT -- commit ibdb records and notify SMTPS
**	(using list of open transactions)
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**
**	Returns:
**		usual sm_error code
**
**	Called by:
**		ss_schedctaid()
**		qmgr_ibdb_commit()
**
**	Locking: qmgr_ctx->qmgr_optas must be locked by caller.
**
**	Last code review: 2005-04-11 17:36:57; see comments
**	Last code change:
*/

/*
**  fixme: Todo better error handling:
**	If commit fails, tell the open transactions about it.
**	Note: SMTPS can deal with that because it has a timeout.
**	What about the storage in IQDB? It must be free()d.
*/

static sm_ret_T
qm_ibdb_commit(qmgr_ctx_P qmgr_ctx)
{
	uint i, ntasks;
	int r;
	bool notified;
	sm_ret_T ret, commit_status;
	qss_opta_P qss_optas;
	qss_ta_P qss_ta;
	sm_rcbe_P rcbe;
	sm_evthr_task_P tasks[MAX_GEN_LI_FD];
	sessta_id_T taid;

	SM_IS_QMGR_CTX(qmgr_ctx);
	qss_optas = qmgr_ctx->qmgr_optas;
	SM_IS_QS_OPTAS(qss_optas);

	/* should be checked by caller already */
	if (FSQ_IS_EMPTY(qss_optas->qot_cur))
		return SM_SUCCESS;	/* different value? */
	rcbe = NULL;
	commit_status = SMTP_R_OK;

	/* commit ibdb, notify open transactions */
	ret = ibdb_commit(qmgr_ctx->qmgr_ibdb);
	if (sm_is_err(ret)) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_ERR, 4,
			"sev=ERROR, func=qm_ibdb_commit, ibdb_commit=%m",
			ret);

		/*
		**  XXX Set status to "TEMPFAIL" instead and notify SMTPS tasks?
		**  XXX check ret and throttle SMTPS?
		*/

		commit_status = SMTP_R_TEMP;	/* XXX not used at error: */
		goto error;
	}
	QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, going through open transactions=%u\n", qss_optas->qot_cur));

	ntasks = 0;
	while (!FSQ_IS_EMPTY(qss_optas->qot_cur)) {
		qss_ta = FSQ_GET(qss_optas->qot_max, qss_optas->qot_first,
				qss_optas->qot_cur, qss_optas->qot_tas, i);
		SM_IS_QS_TA(qss_ta);
		r = pthread_mutex_lock(&qss_ta->qssta_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			/*
			**  This is not supposed to happen. abort?
			**  Put it back and give up.
			*/

			FSQ_APPEND(qss_optas->qot_max, qss_optas->qot_last,
				qss_optas->qot_cur, qss_optas->qot_tas, qss_ta);
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_CRIT, 0,
				"sev=CRIT, func=qm_ibdb_commit, lock=%d, qss_ta=%p, status=giving_up"
				, r, qss_ta);
			goto errnotify;
		}
		QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, idx=%u\n", qss_ta, qss_ta->qssta_cdb_id, qss_optas->qot_cur));
		ret = qm_rcbe_new(qmgr_ctx, &rcbe, -1);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, qm_rcbe_new=%m",
				ret);
			goto errunl;
		}

		/* check whether we had this task before */
		notified = false;
		for (i = 0; i < ntasks && !notified; i++) {
			notified = (tasks[i] ==
				qss_ta->qssta_ssctx->qss_com.rcbcom_tsk);
		}
		if (!notified) {
			SM_ASSERT(ntasks < MAX_GEN_LI_FD);
			tasks[ntasks++] = qss_ta->qssta_ssctx->qss_com.rcbcom_tsk;
		}

		ret = qm_2ss_ctaid(qss_ta->qssta_ssctx, qss_ta, rcbe, commit_status);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, qm_2ss_ctaid=%m",
				ret);
			goto errunl;
		}
		ret = sm_rcbcom_endrep(&qss_ta->qssta_ssctx->qss_com,
				qss_ta->qssta_ssctx->qss_com.rcbcom_tsk,
				true /* HACK */, &rcbe);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, sm_rcbcom_endrep=%m"
				, qss_ta, qss_ta->qssta_cdb_id, ret);
			goto errunl;
		}
		QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, cdb=%C, OK\n", qss_ta, qss_ta->qssta_cdb_id));

		/* save TA ID for logging purposes */
		SESSTA_COPY(taid, qss_ta->qssta_id);
		QSS_TA_SET_FLAG(qss_ta, QSS_TA_FL_OPTA_RM);

		/* unlocks qssta_mutex */
		ret = qss_ta_free(qss_ta, true, QSS_TA_FREE_OP, 0);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 4,
				"sev=ERROR, func=qm_ibdb_commit, ss_ta=%s, qss_ta_free=%d",
				taid, ret);
			/* XXX Do nothing? */
		}
		else {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_INFO, 8,
				"sev=INFO, func=qm_ibdb_commit, ss_ta=%s, status=accepted",
				taid);
		}
	}
	QM_LEV_DPRINTFC(QDC_IBDB, 4, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, went through open transactions=%u\n", qss_optas->qot_cur));

	/* notify tasks to send out commit confirmations */
	for (i = 0; i < ntasks; i++) {
		QM_LEV_DPRINTFC(QDC_IBDB, 7, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, enable task=%p\n", tasks[i]));
		ret = evthr_en_wr(tasks[i]);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 3,
				"sev=ERROR, func=qm_ibdb_commit, task=%p, evthr_en_wr=%m"
				, tasks[i], ret);
		}
		else
			QM_LEV_DPRINTFC(QDC_IBDB, 6, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, enable task=%p done\n", tasks[i]));
	}
	return SM_SUCCESS;

  errunl:
	r = pthread_mutex_unlock(&qss_ta->qssta_mutex);
	if (r != 0) {
		QM_LEV_DPRINTFC(QDC_IBDB, 0, (QM_DEBFP, "sev=ERROR, func=qm_ibdb_commit, qss_ta=%p unlock=%d\n", qss_ta, r));
	}
	else
		QM_LEV_DPRINTFC(QDC_IBDB, 6, (QM_DEBFP, "sev=DBG, func=qm_ibdb_commit, qss_ta=%p, unlocked=%d\n", qss_ta, r));

  errnotify:
	/* notify tasks nevertheless */
	for (i = 0; i < ntasks; i++) {
		ret = evthr_en_wr(tasks[i]);
		if (sm_is_err(ret))
			QM_LEV_DPRINTFC(QDC_IBDB, 0, (QM_DEBFP, "sev=ERROR, func=qm_ibdb_commit, task=%p, evthr_en_wr=%r\n", tasks[i], ret));
	}

  error:
	if (rcbe != NULL)
		sm_rcbe_free(rcbe);
	return ret;
}

/*
**  QM_SS_SCHEDCTAID -- schedule reply for CTAID (close transaction)
**	append qss_ta to list of open transactions,
**	if list is full: commit existing open transactions (disk I/O!),
**	otherwise just trigger the task.
**
**	Parameters:
**		qmgr_ctx -- QMGR context
**		qss_ta -- transaction
**
**	Returns:
**		usual sm_error code
**		QSS_TA_FL_OPTA is set in qss_ta if it has been appended.
**
**	Locking: locks qmgr_optas during operation.
**
**	Called by: qm_fr_ss_react
**
**	Last code review: 2005-04-11 17:41:11; see comments
**	Last code change:
*/

sm_ret_T
qm_ss_schedctaid(qmgr_ctx_P qmgr_ctx, qss_ta_P qss_ta)
{
	int r;
	sm_ret_T ret;
	qss_opta_P qss_optas;
	timeval_T curt, sleept, delay;

	SM_IS_QMGR_CTX(qmgr_ctx);
	qss_optas = qmgr_ctx->qmgr_optas;
	SM_IS_QS_OPTAS(qss_optas);

	r = pthread_mutex_lock(&qss_optas->qot_mutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qm_ss_schedctaid, lock=%d",
			r);
		ret = sm_error_temp(SM_EM_Q_IBDB, r);
		goto error;
	}

	if (FSQ_IS_FULL(qss_optas->qot_max, qss_optas->qot_cur)) {
		ret = qm_ibdb_commit(qmgr_ctx);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 3,
				"sev=ERROR, func=qm_ss_schedctaid, qm_ibdb_commit=%m",
				ret);
			goto errunl;
		}
		else
			QM_LEV_DPRINTFC(QDC_IBDB, 3, (QM_DEBFP, "sev=DBG, func=qm_ss_schedctaid, qm_ibdb_commit=succesful\n"));
	}

	FSQ_APPEND(qss_optas->qot_max, qss_optas->qot_last, qss_optas->qot_cur,
			qss_optas->qot_tas, qss_ta);
	QSS_TA_SET_FLAG(qss_ta, QSS_TA_FL_OPTA);

	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qm_ss_schedctaid, unlock=%d",
			r);
		SM_ASSERT(r == 0);
		ret = sm_error_temp(SM_EM_Q_IBDB, r);
		goto error;
	}

	/* Trigger commit, i.e., wake up commit task. */
	ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &curt);
	SM_ASSERT(ret == SM_SUCCESS);
	delay.tv_usec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay;
	delay.tv_sec = 0;
	timeradd(&curt, &delay, &sleept);
	ret = evthr_new_sl(qmgr_ctx->qmgr_icommit, sleept, false);

	/*
	**  fixme: access to qmgr_icommit is not protected, this might be wrong,
	**	hence it is only used for debug output.
	*/

	if (timercmp(&sleept, &qmgr_ctx->qmgr_icommit->evthr_t_sleep, <)) {
		QM_LEV_DPRINTFC(QDC_IBDB, 3, (QM_DEBFP, "sev=DBG, func=qm_ss_schedctaid, old sleep=%ld.%06ld, new time=%ld.%06ld, evthr_new_sl=%r, cdb=%C, tsk=%p\n"
			, qmgr_ctx->qmgr_icommit->evthr_t_sleep.tv_sec
			, qmgr_ctx->qmgr_icommit->evthr_t_sleep.tv_usec
			, sleept.tv_sec, sleept.tv_usec, ret
			, qss_ta->qssta_cdb_id, qmgr_ctx->qmgr_icommit));
	}
	return SM_SUCCESS;

  errunl:
	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	SM_ASSERT(r == 0);
	if (r != 0 && sm_is_success(ret))
		ret = sm_error_perm(SM_EM_Q_IBDB, r);
  error:
	return ret;
}

/*
**  QMGR_IBDB_COMMIT -- task to periodically commit ibdb records
**
**	Parameters:
**		tsk -- evthr task
**
**	Returns:
**		usual sm_error code
**
**	Locking: locks qmgr_optas during operation.
**
**	Last code review: 2003-10-24 16:09:23, see comments below.
*/

sm_ret_T
qmgr_ibdb_commit(sm_evthr_task_P tsk)
{
	int r;
	sm_ret_T ret;
	qmgr_ctx_P qmgr_ctx;
	qss_opta_P qss_optas;
	timeval_T sleept, delay;

	SM_IS_EVTHR_TSK(tsk);
	qmgr_ctx = (qmgr_ctx_P) tsk->evthr_t_actx;
	SM_IS_QMGR_CTX(qmgr_ctx);
	qss_optas = qmgr_ctx->qmgr_optas;
	SM_IS_QS_OPTAS(qss_optas);

#if 0
	r = gettimeofday(&sleept, NULL);	/* fixme: check r */
#else
	ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &sleept); /* XXX check ret */
#endif
	QM_LEV_DPRINTFC(QDC_IBDB, 4, (QM_DEBFP, "time=%ld.%06ld, func=qmgr_ibdb_commit\n", sleept.tv_sec, sleept.tv_usec));

	r = pthread_mutex_lock(&qss_optas->qot_mutex);
	SM_LOCK_OK(r);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qmgr_ibdb_commit, lock=%d",
			r);
		goto error;
	}

	/* XXX just wait till someone wakes this up? */
	if (FSQ_IS_EMPTY(qss_optas->qot_cur)) {
		/*
		**  This isn't good: don't wake up unless necessary!
		**  If there's no incoming mail, this task should not run.
		**  The task is woken up by qm_ss_schedctaid(), hence it
		**  could be a really long delay here.
		*/

		delay.tv_sec = 300;
		delay.tv_usec = 0;

		/* IBDB keeps track of changes, it's ok to always call this */
		ret = ibdb_commit(qmgr_ctx->qmgr_ibdb);
		if (sm_is_err(ret)) {
			sm_log_write(qmgr_ctx->qmgr_lctx,
				QM_LCAT_SMTPS, QM_LMOD_IBDB,
				SM_LOG_ERR, 3,
				"sev=ERROR, func=qmgr_ibdb_commit, ibdb_commit=%m",
				ret);
		}
	}
	else {
		ret = qm_ibdb_commit(qmgr_ctx);
		QM_LEV_DPRINTFC(QDC_IBDB, 1, (QM_DEBFP, "time=%ld.%06ld, func=qmgr_ibdb_commit, ret=%r\n", sleept.tv_sec, sleept.tv_usec, ret));
		if (sm_is_err(ret)) {
			/* error already logged by qm_ibdb_commit() */
			goto errunl;
		}

		delay.tv_usec = qmgr_ctx->qmgr_cnf.q_cnf_ibdb_delay;
		delay.tv_sec = 0;
	}
	timeradd(&sleept, &delay, &tsk->evthr_t_sleep);

	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	if (r != 0) {
		sm_log_write(qmgr_ctx->qmgr_lctx,
			QM_LCAT_SMTPS, QM_LMOD_IBDB,
			SM_LOG_CRIT, 0,
			"sev=CRIT, func=qmgr_ibdb_commit, unlock=%d", r);
		SM_ASSERT(r == 0);
		goto error;	/* just in case assertions are turned off */
	}

	return EVTHR_SLPQ;

  errunl:
	r = pthread_mutex_unlock(&qss_optas->qot_mutex);
	SM_ASSERT(r == 0);
  error:
	delay.tv_usec = 0;
	delay.tv_sec = 1;
	timeradd(&sleept, &delay, &tsk->evthr_t_sleep);
	return EVTHR_SLPQ;
}


syntax highlighted by Code2HTML, v. 0.9.1