/*
 * Copyright (c) 2004-2006 Sendmail, Inc. and its suppliers.
 *      All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: occ.c,v 1.33 2007/08/14 04:35:08 ca Exp $")
#include "sm/types.h"
#include "sm/assert.h"
#include "sm/magic.h"
#include "sm/str.h"
#include "sm/memops.h"
#include "sm/pthread.h"
#include "sm/evthr.h"
#include "sm/qmgr-int.h"
#include "sm/map.h"
#include "occ.h"

/*
**  Note: locking occ_ctx for all operations can cause lock contention.
**  In most cases it should be sufficient to lock just occ_entry.
**  "exception": the lookup strings (occ_ctx->occx_lhs etc) must be protected
**  (alternatively, local strings could be used instead, with the possible
**  drawback that an allocation may fail).
*/

/*
**  OCC_SESS_OPEN -- open a session in OCC
**
**	Parameters:
**		qmgr_ctx -- QMGR context (only qmgr_cnf is needed)
**		occ_ctx -- OCC context
**		srv_ipv4 -- IPv4 address of server (HACK)
**		pocc_entry -- pointer to OCC entry (output)
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code; ENOMEM (see occ_entry_new())
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks occ_ctx during operation if requested, returns unlocked
**
**	Last code review: 2005-03-17 02:23:27
**	Last code change:
*/

sm_ret_T
occ_sess_open(qmgr_ctx_P qmgr_ctx, occ_ctx_P occ_ctx, ipv4_T srv_ipv4, occ_entry_P *pocc_entry, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT	"occ_sess_open"
	sm_ret_T ret;
	int r;
	occ_entry_P occ_entry;
	time_T now;

	now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
	if (thr_lock_it(locktype)) {
		r = smthread_mutex_lock(&occ_ctx->occx_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}

	/* find entry for IP address, if it doesn't exist: create new one */
	ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4, &occ_entry,
			&occ_ctx->occx_mutex, THR_NO_LOCK);
	if (sm_is_err(ret)) {
		ret = occ_entry_new(occ_ctx->occx_ht, &occ_ctx->occx_fl_hd, srv_ipv4,
				&occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK);
		if (sm_is_err(ret))
			goto error;
		SM_IS_OCCE(occ_entry);
		occ_entry->occe_init_conc = qmgr_ctx->qmgr_cnf.q_cnf_init_conc_conn;
		occ_entry->occe_cur_conc = qmgr_ctx->qmgr_cnf.q_cnf_init_conc_conn;
		occ_entry->occe_max_conc = qmgr_ctx->qmgr_cnf.q_cnf_max_conc_conn;

		if (qmgr_ctx->qmgr_conf_map != NULL) {
/* tags for conf db */
#define OCITAG "oci:"	/* outgoing connections initial */
#define OCMTAG "ocm:"	/* outgoing connections max */
#define OCTOTAG "octo:"	/* outgoing connection cache timeout */

			sm_str_clr(occ_ctx->occx_lhs);
			sm_str_clr(occ_ctx->occx_tag);
			sm_str_clr(occ_ctx->occx_rhs);
			sm_inet_ipv4str(srv_ipv4, occ_ctx->occx_lhs);
			if (sm_str_scat(occ_ctx->occx_tag, OCITAG) == SM_SUCCESS
			    && sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
			                        occ_ctx->occx_lhs, occ_ctx->occx_tag,
			                        SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
			                        occ_ctx->occx_rhs) == SM_SUCCESS
			    && sm_str_getlen(occ_ctx->occx_rhs) > 0)
			{
				ulong v;

				errno = 0;
				v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0);
				if (v != ULONG_MAX && errno != ERANGE)
					occ_entry->occe_cur_conc = occ_entry->occe_init_conc =
						(uint) v;
			}

			sm_str_clr(occ_ctx->occx_tag);
			sm_str_clr(occ_ctx->occx_rhs);
			if (sm_str_scat(occ_ctx->occx_tag, OCMTAG) == SM_SUCCESS
			    && sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
			                        occ_ctx->occx_lhs, occ_ctx->occx_tag,
			                        SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
			                        occ_ctx->occx_rhs) == SM_SUCCESS
			    && sm_str_getlen(occ_ctx->occx_rhs) > 0)
			{
				ulong v;

				errno = 0;
				v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0);
				if (v != ULONG_MAX && errno != ERANGE)
					occ_entry->occe_max_conc = (uint) v;
			}

			sm_str_clr(occ_ctx->occx_tag);
			sm_str_clr(occ_ctx->occx_rhs);
			if (sm_str_scat(occ_ctx->occx_tag, OCTOTAG) == SM_SUCCESS
			    && sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
			                        occ_ctx->occx_lhs, occ_ctx->occx_tag,
			                        SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
			                        occ_ctx->occx_rhs) == SM_SUCCESS
			    && sm_str_getlen(occ_ctx->occx_rhs) > 0)
			{
				ulong v;

				errno = 0;
				v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0);
				if (v != ULONG_MAX && errno != ERANGE)
					occ_entry->occe_timeout = (uint) v;
			}
			QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_sess_open, ipv4=%A, conc_init=%u, conc_max=%u\n", (ipv4_T) srv_ipv4, occ_entry->occe_cur_conc, occ_entry->occe_max_conc));
		}
	}
	else {
		SM_ASSERT(occ_entry != NULL);
		SM_ASSERT(occ_entry->occe_srv_ipv4 == srv_ipv4);
	}

	/* increment counters etc */
	++occ_entry->occe_open_se;
	++occ_entry->occe_open_ta;
#if 0
	occ_entry->occe_last_conn = now;
#endif
	occ_entry->occe_last_upd = now;
	QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_sess_open, occ_entry=%p, ipv4=%A, open_se=%u, open_ta=%u, flags=%x\n", occ_entry, (ipv4_T) occ_entry->occe_srv_ipv4, occ_entry->occe_open_se, occ_entry->occe_open_ta, occ_entry->occe_flags));

	if (pocc_entry != NULL)
		*pocc_entry = occ_entry;
	if (thr_unl_no_err(locktype)) {
		r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
		SM_ASSERT(r == 0);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}
	return SM_SUCCESS;

  error:
	if (thr_unl_if_err(locktype)) {
		r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
		SM_ASSERT(r == 0);
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_OCC, r);
	}
	return ret;
}
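
/*
**  Usage sketch (illustration only, not part of the original code; the
**  example function name is hypothetical): a caller that does its own
**  locking, as suggested in the note at the top of this file, holds
**  occ_ctx->occx_mutex itself and passes THR_NO_LOCK.
*/
#if 0
static sm_ret_T
occ_sess_open_example(qmgr_ctx_P qmgr_ctx, occ_ctx_P occ_ctx, ipv4_T srv_ipv4)
{
	sm_ret_T ret;
	int r;
	occ_entry_P occ_entry;

	r = smthread_mutex_lock(&occ_ctx->occx_mutex);
	if (r != 0)
		return sm_error_perm(SM_EM_OCC, r);

	/* record one new session (and its first transaction) for srv_ipv4 */
	occ_entry = NULL;
	ret = occ_sess_open(qmgr_ctx, occ_ctx, srv_ipv4, &occ_entry, THR_NO_LOCK);

	r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
	if (r != 0 && sm_is_success(ret))
		ret = sm_error_perm(SM_EM_OCC, r);
	return ret;
}
#endif /* 0 */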

/*
**  OCC_SESS_CLOSE_ENTRY -- close a session in OCC
**
**	Parameters:
**		occ_ctx -- OCC context
**		srv_ipv4 -- IPv4 address of server (HACK)
**		ok -- session was ok?
**		now -- current time
**		pflags -- flags of session (output, may be NULL)
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code; SM_E_NOTFOUND, (un)lock errors
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks occ_ctx if requested
**
**	Last code review: 2005-03-14 18:54:53
**	Last code change:
*/

sm_ret_T
occ_sess_close_entry(occ_ctx_P occ_ctx, ipv4_T srv_ipv4, bool ok, time_T now, uint32_t *pflags, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT	"occ_sess_close_entry"
	sm_ret_T ret;
	int r;
	uint32_t flags;
	occ_entry_P occ_entry;

	SM_IS_OCCX(occ_ctx);
	ret = SM_SUCCESS;
	flags = 0;
	if (thr_lock_it(locktype)) {
		r = smthread_mutex_lock(&occ_ctx->occx_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}

	/* access locked via occ_ctx */
	occ_entry = NULL;
	ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4,
			&occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK);
	if (SM_SUCCESS == ret) {
		SM_IS_OCCE(occ_entry);
		if (occ_entry->occe_open_se == 0) {
			QM_LEV_DPRINTFC(QDC_OCC, 0, (QM_DEBFP, "sev=ERROR, func=occ_sess_close_entry, ipv4=%A, open_se=0\n", occ_entry->occe_srv_ipv4));
		}
		else {
			QM_LEV_DPRINTFC(QDC_OCC, 3, (QM_DEBFP, "sev=DBG, func=occ_sess_close_entry, ipv4=%A, ok=%d, open_se=%u, cur_conc=%u, flags=%x\n", occ_entry->occe_srv_ipv4, ok, occ_entry->occe_open_se, occ_entry->occe_cur_conc, occ_entry->occe_flags));

			if (ok && occ_entry->occe_open_se >= occ_entry->occe_cur_conc) {
				if (occ_entry->occe_cur_conc < occ_entry->occe_max_conc)
					++occ_entry->occe_cur_conc;
			}
			else if (!ok) {
				if (occ_entry->occe_cur_conc > 0)
					--occ_entry->occe_cur_conc;
			}

			--occ_entry->occe_open_se;
			if (occ_entry->occe_open_se < occ_entry->occe_cur_conc) {
				flags |= OCCE_FL_BLW_LIM;
				if (OCCE_IS_FLAG(occ_entry, OCCE_FL_SE_WAIT)) {
					flags |= OCCE_FL_SE_WAIT;
					OCCE_CLR_FLAG(occ_entry, OCCE_FL_SE_WAIT);
				}
			}
		}
		if (occ_entry->occe_open_ta == 0) {
			QM_LEV_DPRINTFC(QDC_OCC, 3, (QM_DEBFP, "sev=ERROR, func=occ_sess_close_entry, ipv4=%A, open_ta=0\n", occ_entry->occe_srv_ipv4));
		}
		else
			occ_entry->occe_open_ta--;
		if (occ_entry->occe_open_se == 0) {
#if !DA_OCC_RSC
			bht_rm(occ_ctx->occx_ht, (char *)&occ_entry->occe_srv_ipv4,
				sizeof(occ_entry->occe_srv_ipv4), NULL, NULL);
			occ_entry_free(&occ_ctx->occx_fl_hd, occ_entry,
					&occ_ctx->occx_mutex, THR_NO_LOCK);
#endif /* !DA_OCC_RSC */
		}
	}
	else {
		QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=ERROR, func=occ_sess_close_entry, ipv4=%A, occ_entry_find=%x\n", srv_ipv4, ret));
	}

	if ((!sm_is_err(ret) && thr_unl_no_err(locktype))
	    || (sm_is_err(ret) && thr_unl_if_err(locktype)))
	{
		r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
		SM_ASSERT(r == 0);
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_OCC, r);
	}
	if (pflags != NULL)
		*pflags = flags;
	return ret;
}
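
/*
**  Usage sketch (illustration only, not part of the original code; the
**  example function name is hypothetical and the caller is assumed to
**  hold occ_ctx->occx_mutex, hence THR_NO_LOCK): after closing a session
**  the returned flags tell the caller whether the open-session count
**  dropped below the current concurrency limit and whether a waiting
**  session should be woken up.
*/
#if 0
static sm_ret_T
occ_sess_close_example(occ_ctx_P occ_ctx, ipv4_T srv_ipv4, bool ok, time_T now)
{
	sm_ret_T ret;
	uint32_t flags;

	flags = 0;
	ret = occ_sess_close_entry(occ_ctx, srv_ipv4, ok, now, &flags,
			THR_NO_LOCK);
	if (sm_is_err(ret))
		return ret;
	if ((flags & OCCE_FL_BLW_LIM) != 0
	    && (flags & OCCE_FL_SE_WAIT) != 0)
	{
		/* below the limit and a session was waiting: wake it up here */
	}
	return ret;
}
#endif /* 0 */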

/*
**  OCC_SESS_REUSE -- reuse a session in OCC: increase number of open TAs
**
**	Parameters:
**		occ_ctx -- OCC context
**		occ_entry -- OCC entry
**		now -- current time
**		locktype -- kind of locking
**
**	Returns:
**		SM_SUCCESS except for (un)lock errors
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks occ_ctx if requested
**
**	Last code review: 2005-03-14 18:56:05
**	Last code change:
*/

sm_ret_T
occ_sess_reuse(occ_ctx_P occ_ctx, occ_entry_P occ_entry, time_T now, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT "occ_sess_reuse"
	int r;

	SM_IS_OCCX(occ_ctx);
	SM_IS_OCCE(occ_entry);
	if (thr_lock_it(locktype)) {
		r = smthread_mutex_lock(&occ_ctx->occx_mutex);
		SM_LOCK_OK(r);
		if (r != 0) {
			/* SM_ASSERT(r == 0); */
			return sm_error_perm(SM_EM_OCC, r);
		}
	}

	++occ_entry->occe_open_ta;

	if (thr_unl_no_err(locktype)) {
		r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
		SM_ASSERT(r == 0);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}
	return SM_SUCCESS;
}

/*
**  OCC_TA_CLOSE_ENTRY -- close a transaction in OCC
**
**	Parameters:
**		occ_ctx -- OCC context
**		srv_ipv4 -- IPv4 address of server (HACK)
**		now -- current time
**		pflags -- flags of session (output, may be NULL)
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code; SM_E_NOTFOUND, (un)lock errors
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks occ_ctx if requested
**
**	Last code review: 2005-03-14 18:56:53
**	Last code change:
*/

sm_ret_T
occ_ta_close_entry(occ_ctx_P occ_ctx, ipv4_T srv_ipv4, time_T now, uint32_t *pflags, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT "occ_ta_close_entry"
	int r;
	sm_ret_T ret;
	uint32_t flags;
	occ_entry_P occ_entry;

	SM_IS_OCCX(occ_ctx);
	flags = 0;
	if (thr_lock_it(locktype)) {
		r = smthread_mutex_lock(&occ_ctx->occx_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}

	occ_entry = NULL;
	ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4,
			&occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK);
	if (SM_SUCCESS == ret) {
		SM_IS_OCCE(occ_entry);
		if (occ_entry->occe_open_ta == 0)
			QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=ERROR, func=occ_ta_close_entry, entry=%p, ipv4=%A, open_ta=%u, state=before_decrement\n", occ_entry, occ_entry->occe_srv_ipv4, occ_entry->occe_open_ta));
		else {
			occ_entry->occe_open_ta--;
			if (OCCE_IS_FLAG(occ_entry, OCCE_FL_TA_WAIT)
			    && occ_entry->occe_open_ta < occ_entry->occe_cur_conc)
			{
				flags |= OCCE_FL_TA_WAIT;
				OCCE_CLR_FLAG(occ_entry, OCCE_FL_TA_WAIT);
			}
		}

		QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_ta_close_entry, occ_entry=%p, ipv4=%A, open_ta=%u, state=after_decrement, flags=%x\n", occ_entry, occ_entry->occe_srv_ipv4, occ_entry->occe_open_ta, occ_entry->occe_flags));
	}
	else {
		QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=ERROR, func=occ_ta_close_entry, ipv4=%A, occ_entry_find=%x\n", srv_ipv4, ret));
	}

	if ((!sm_is_err(ret) && thr_unl_no_err(locktype))
	    || (sm_is_err(ret) && thr_unl_if_err(locktype)))
	{
		r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
		SM_ASSERT(r == 0);
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_OCC, r);
	}
	if (pflags != NULL)
		*pflags = flags;
	return ret;
}
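
/*
**  Usage sketch (illustration only, not part of the original code; the
**  example function name is hypothetical and the caller is assumed to
**  hold occ_ctx->occx_mutex, hence THR_NO_LOCK): when an open session is
**  reused for another transaction, occ_sess_reuse() increments the open
**  TA counter and occ_ta_close_entry() decrements it again once that
**  transaction is done, reporting via flags whether a waiting TA may
**  proceed.
*/
#if 0
static sm_ret_T
occ_ta_example(occ_ctx_P occ_ctx, occ_entry_P occ_entry, time_T now)
{
	sm_ret_T ret;
	uint32_t flags;

	/* another transaction starts on the cached session */
	ret = occ_sess_reuse(occ_ctx, occ_entry, now, THR_NO_LOCK);
	if (sm_is_err(ret))
		return ret;

	/* ... transaction runs ... */

	flags = 0;
	ret = occ_ta_close_entry(occ_ctx, occ_entry->occe_srv_ipv4, now,
			&flags, THR_NO_LOCK);
	if ((flags & OCCE_FL_TA_WAIT) != 0) {
		/* a transaction was waiting for a free slot: wake it up here */
	}
	return ret;
}
#endif /* 0 */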

/*
**  OCC_CLOSE -- close an OCC
**
**	Parameters:
**		occ_ctx -- OCC context
**
**	Returns:
**		SM_SUCCESS
**
**	Side Effects: free occ
**
**	Locking: no locking, destroys OCC (and hence lock)
**
**	Last code review: 2005-03-16 05:33:13
**	Last code change:
*/

sm_ret_T
occ_close(occ_ctx_P occ_ctx)
{
#undef SMFCT
#define SMFCT "occ_close"
	if (NULL == occ_ctx)
		return SM_SUCCESS;
	SM_IS_OCCX(occ_ctx);
#if DA_OCC_RSC
	rsc_free(occ_ctx->occx_ht);
#else
	bht_destroy(occ_ctx->occx_ht, NULL, NULL);
#endif
	pthread_mutex_destroy(&occ_ctx->occx_mutex);
	SM_STR_FREE(occ_ctx->occx_lhs);
	SM_STR_FREE(occ_ctx->occx_tag);
	SM_STR_FREE(occ_ctx->occx_rhs);
#if OCC_CHECK
	occ_ctx->sm_magic = SM_MAGIC_NULL;
#endif
	sm_free_size(occ_ctx, sizeof(*occ_ctx));
	return SM_SUCCESS;
}

#if DA_OCC_RSC
/*
**  RSC_OCCE_CREATE -- "create" an OCC entry (callback function for rsc)
**
**	Parameters:
**		key -- ignored
**		len -- ignored
**		value -- OCC entry
**		ctx -- ignored
**
**	Returns:
**		value
**
**	Last code review: 2005-03-17 04:50:31
**	Last code change:
*/

static void *
rsc_occe_create(const char *key, uint len, void *value, void *ctx)
{
#undef SMFCT
#define SMFCT "rsc_occe_create"
	return value;
}

/*
**  RSC_OCCE_DELETE -- "delete" an OCC entry (callback function for rsc)
**
**	Parameters:
**		value -- OCC entry
**		ctx -- OCC context
**
**	Returns:
**		SM_SUCCESS (see occ_entry_free())
**
**	Last code review: 2005-03-16 05:38:32
**	Last code change:
*/

static sm_ret_T
rsc_occe_delete(void *value, void *ctx)
{
#undef SMFCT
#define SMFCT "rsc_occe_delete"
	occ_ctx_P occ_ctx;

	occ_ctx = (occ_ctx_P) ctx;
	SM_IS_OCCX(occ_ctx);
	return occ_entry_free(&occ_ctx->occx_fl_hd, (occ_entry_P) value,
			&occ_ctx->occx_mutex, THR_NO_LOCK);
}
#endif /* DA_OCC_RSC */

/*
**  OCC_OPEN -- open a new OCC context
**
**	Parameters:
**		pocc_ctx -- pointer to OCC context (output)
**		size -- size of OCC
**
**	Returns:
**		usual sm_error code; ENOMEM, mutex initialization errors
**
**	Side Effects: none on error
**
**	Last code review: 2005-03-16 05:42:34
**	Last code change:
*/

sm_ret_T
occ_open(occ_ctx_P *pocc_ctx, uint size)
{
#undef SMFCT
#define SMFCT "occ_open"
	int r;
	sm_ret_T ret;
	occ_ctx_P occ_ctx;

/* these are upper limits which are a bit larger than necessary */
#define SM_MAX_CONF_STR		16	/* max length of IPv4 address + tag */
#define SM_CONF_IPV4		10	/* length of IPv4 address */
#define SM_CONF_TAG		8	/* length of tag */
#define SM_CONF_RHS		14	/* length of IPv4 address + tag */

	SM_REQUIRE(pocc_ctx != NULL);
	SM_REQUIRE(size > 0);
	occ_ctx = (occ_ctx_P) sm_zalloc(sizeof(*occ_ctx));
	if (NULL == occ_ctx)
		return sm_error_temp(SM_EM_OCC, ENOMEM);

	/* use some prime number?? */
#if DA_OCC_RSC
	occ_ctx->occx_ht = rsc_new(size * 2 + 1, size * 3 + 3,
		rsc_occe_create, rsc_occe_delete, (void *) occ_ctx);
#else
	occ_ctx->occx_ht = bht_new(size, size * 2 + 1);
#endif
	if (NULL == occ_ctx->occx_ht)
		goto enomem;
	OCCFL_INIT(&occ_ctx->occx_fl_hd);
	occ_ctx->occx_lhs = sm_str_new(NULL, SM_CONF_IPV4, SM_MAX_CONF_STR);
	if (NULL == occ_ctx->occx_lhs)
		goto enomem;
	occ_ctx->occx_tag = sm_str_new(NULL, SM_CONF_TAG, SM_MAX_CONF_STR);
	if (NULL == occ_ctx->occx_tag)
		goto enomem;
	occ_ctx->occx_rhs = sm_str_new(NULL, SM_CONF_RHS, SM_MAX_CONF_STR);
	if (NULL == occ_ctx->occx_rhs)
		goto enomem;

	r = pthread_mutex_init(&occ_ctx->occx_mutex, SM_PTHREAD_MUTEXATTR);
	if (r != 0) {
		ret = sm_error_perm(SM_EM_OCC, r);
		goto error;
	}
#if 0
//	occ_ctx->occ_entries_max = size;
//	occ_ctx->occ_entries_lim = size;
#endif

#if OCC_CHECK
	occ_ctx->sm_magic = SM_OCCX_MAGIC;
#endif
	*pocc_ctx = occ_ctx;
	return SM_SUCCESS;

  enomem:
	ret = sm_error_temp(SM_EM_OCC, ENOMEM);
  error:
	if (occ_ctx != NULL)	/* just paranoia */ {
		if (occ_ctx->occx_ht != NULL) {
#if DA_OCC_RSC
			rsc_free(occ_ctx->occx_ht);
#else
			bht_destroy(occ_ctx->occx_ht, NULL, NULL);
#endif
		}
		SM_STR_FREE(occ_ctx->occx_lhs);
		SM_STR_FREE(occ_ctx->occx_tag);
		SM_STR_FREE(occ_ctx->occx_rhs);
		sm_free_size(occ_ctx, sizeof(*occ_ctx));
	}
	return ret;
}
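
/*
**  Usage sketch (illustration only, not part of the original code; the
**  example function name and the table size are hypothetical): create an
**  OCC context sized for the expected number of destination hosts and
**  destroy it again with occ_close().
*/
#if 0
static sm_ret_T
occ_lifecycle_example(void)
{
	sm_ret_T ret;
	occ_ctx_P occ_ctx;

	occ_ctx = NULL;
	ret = occ_open(&occ_ctx, 128 /* hypothetical size */);
	if (sm_is_err(ret))
		return ret;

	/* ... use occ_sess_open()/occ_sess_close_entry() etc ... */

	return occ_close(occ_ctx);
}
#endif /* 0 */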

