/*
* Copyright (c) 2004-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: occ.c,v 1.33 2007/08/14 04:35:08 ca Exp $")
#include "sm/types.h"
#include "sm/assert.h"
#include "sm/magic.h"
#include "sm/str.h"
#include "sm/memops.h"
#include "sm/pthread.h"
#include "sm/evthr.h"
#include "sm/qmgr-int.h"
#include "sm/map.h"
#include "occ.h"
/*
** Note: locking occ_ctx for all operations can cause lock contention.
** In most cases it should be sufficient to lock just the occ_entry.
** Exception: the lookup strings (occ_ctx->occx_lhs etc) must be protected
** (alternatively, local strings could be used instead, with the possible
** drawback that an allocation may fail).
*/
/*
** OCC_SESS_OPEN -- open a session in OCC
**
** Parameters:
** qmgr_ctx -- QMGR context (only qmgr_cnf is needed)
** occ_ctx -- OCC context
** srv_ipv4 -- IPv4 address of server (HACK)
** pocc_entry -- pointer to OCC entry (output)
** locktype -- kind of locking
**
** Returns:
** usual sm_error code; ENOMEM (see occ_entry_new())
**
** Side Effects: none on error (except if unlock fails)
**
** Locking: locks occ_ctx during operation if requested, returns unlocked
**
** Last code review: 2005-03-17 02:23:27
** Last code change:
*/
sm_ret_T
occ_sess_open(qmgr_ctx_P qmgr_ctx, occ_ctx_P occ_ctx, ipv4_T srv_ipv4, occ_entry_P *pocc_entry, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT "occ_sess_open"
sm_ret_T ret;
int r;
occ_entry_P occ_entry;
time_T now;
now = evthr_time(qmgr_ctx->qmgr_ev_ctx);
if (thr_lock_it(locktype)) {
r = smthread_mutex_lock(&occ_ctx->occx_mutex);
SM_LOCK_OK(r);
if (r != 0)
return sm_error_perm(SM_EM_OCC, r);
}
/* find entry for IP address, if it doesn't exist: create new one */
ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4, &occ_entry,
&occ_ctx->occx_mutex, THR_NO_LOCK);
if (sm_is_err(ret)) {
ret = occ_entry_new(occ_ctx->occx_ht, &occ_ctx->occx_fl_hd, srv_ipv4,
&occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK);
if (sm_is_err(ret))
goto error;
SM_IS_OCCE(occ_entry);
occ_entry->occe_init_conc = qmgr_ctx->qmgr_cnf.q_cnf_init_conc_conn;
occ_entry->occe_cur_conc = qmgr_ctx->qmgr_cnf.q_cnf_init_conc_conn;
occ_entry->occe_max_conc = qmgr_ctx->qmgr_cnf.q_cnf_max_conc_conn;
if (qmgr_ctx->qmgr_conf_map != NULL) {
/* tags for conf db */
#define OCITAG "oci:" /* outgoing connections initial */
#define OCMTAG "ocm:" /* outgoing connections max */
#define OCTOTAG "octo:" /* outgoing connection cache timeout */
sm_str_clr(occ_ctx->occx_lhs);
sm_str_clr(occ_ctx->occx_tag);
sm_str_clr(occ_ctx->occx_rhs);
sm_inet_ipv4str(srv_ipv4, occ_ctx->occx_lhs);
if (sm_str_scat(occ_ctx->occx_tag, OCITAG) == SM_SUCCESS
&& sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
occ_ctx->occx_lhs, occ_ctx->occx_tag,
SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
occ_ctx->occx_rhs) == SM_SUCCESS
&& sm_str_getlen(occ_ctx->occx_rhs) > 0)
{
ulong v;
errno = 0;
v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0);
if (v != ULONG_MAX && errno != ERANGE)
occ_entry->occe_cur_conc = occ_entry->occe_init_conc =
(uint) v;
}
sm_str_clr(occ_ctx->occx_tag);
sm_str_clr(occ_ctx->occx_rhs);
if (sm_str_scat(occ_ctx->occx_tag, OCMTAG) == SM_SUCCESS
&& sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
occ_ctx->occx_lhs, occ_ctx->occx_tag,
SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
occ_ctx->occx_rhs) == SM_SUCCESS
&& sm_str_getlen(occ_ctx->occx_rhs) > 0)
{
ulong v;
errno = 0;
v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0);
if (v != ULONG_MAX && errno != ERANGE)
occ_entry->occe_max_conc = (uint) v;
}
sm_str_clr(occ_ctx->occx_tag);
sm_str_clr(occ_ctx->occx_rhs);
if (sm_str_scat(occ_ctx->occx_tag, OCTOTAG) == SM_SUCCESS
&& sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map,
occ_ctx->occx_lhs, occ_ctx->occx_tag,
SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG,
occ_ctx->occx_rhs) == SM_SUCCESS
&& sm_str_getlen(occ_ctx->occx_rhs) > 0)
{
ulong v;
errno = 0;
v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0);
if (v != ULONG_MAX && errno != ERANGE)
occ_entry->occe_timeout = (uint) v;
}
QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_sess_open, ipv4=%A, conc_init=%u, conc_max=%u\n", (ipv4_T) srv_ipv4, occ_entry->occe_cur_conc, occ_entry->occe_max_conc));
}
}
else {
SM_ASSERT(occ_entry != NULL);
SM_ASSERT(occ_entry->occe_srv_ipv4 == srv_ipv4);
}
/* increment counters etc */
++occ_entry->occe_open_se;
++occ_entry->occe_open_ta;
#if 0
occ_entry->occe_last_conn = now;
#endif
occ_entry->occe_last_upd = now;
QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_sess_open, occ_entry=%p, ipv4=%A, open_se=%u, open_ta=%u, occ_entry=%p, flags=%x\n", occ_entry, (ipv4_T) occ_entry->occe_srv_ipv4, occ_entry->occe_open_se, occ_entry->occe_open_ta, occ_entry, occ_entry->occe_flags));
if (pocc_entry != NULL)
*pocc_entry = occ_entry;
if (thr_unl_no_err(locktype)) {
r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
SM_ASSERT(r == 0);
if (r != 0)
return sm_error_perm(SM_EM_OCC, r);
}
return SM_SUCCESS;
error:
if (thr_unl_if_err(locktype)) {
r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
SM_ASSERT(r == 0);
if (r != 0 && sm_is_success(ret))
ret = sm_error_perm(SM_EM_OCC, r);
}
return ret;
}
/*
**  OCC_SESS_CLOSE_ENTRY -- close a session in OCC
**
**	Decrements the counters for open sessions and open transactions of
**	the entry for the server address and adapts the current concurrency
**	limit depending on whether the session was ok.
**
**	Parameters:
**		occ_ctx -- OCC context
**		srv_ipv4 -- IPv4 address of server (HACK)
**		ok -- session was ok?
**		now -- current time (currently unused)
**		pflags -- flags of session (output, may be NULL)
**			OCCE_FL_BLW_LIM: number of open sessions is now below
**				the current concurrency limit
**			OCCE_FL_SE_WAIT: was set in the entry (and has been
**				cleared there)
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code; SM_E_NOTFOUND, (un)lock errors
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks occ_ctx if requested
**
**	Last code review: 2005-03-14 18:54:53
**	Last code change:
*/

sm_ret_T
occ_sess_close_entry(occ_ctx_P occ_ctx, ipv4_T srv_ipv4, bool ok, time_T now, uint32_t *pflags, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT	"occ_sess_close_entry"
	sm_ret_T ret;
	int r;
	uint32_t flags;
	occ_entry_P occ_entry;

	SM_IS_OCCX(occ_ctx);
	flags = 0;
	if (thr_lock_it(locktype)) {
		r = smthread_mutex_lock(&occ_ctx->occx_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}

	/* access locked via occ_ctx */
	occ_entry = NULL;
	ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4,
			&occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK);
	if (SM_SUCCESS == ret) {
		SM_IS_OCCE(occ_entry);
		if (occ_entry->occe_open_se == 0) {
			/* counter mismatch: nothing to decrement */
			QM_LEV_DPRINTFC(QDC_OCC, 0, (QM_DEBFP,
				"sev=ERROR, func=occ_sess_close_entry, ipv4=%A, open_se=0\n",
				occ_entry->occe_srv_ipv4));
		}
		else {
			QM_LEV_DPRINTFC(QDC_OCC, 3, (QM_DEBFP,
				"sev=DBG, func=occ_sess_close_entry, ipv4=%A, ok=%d, open_se=%u, cur_conc=%u, flags=%x\n",
				occ_entry->occe_srv_ipv4, ok,
				occ_entry->occe_open_se,
				occ_entry->occe_cur_conc,
				occ_entry->occe_flags));

			/*
			**  Adapt the current limit: raise it (up to the
			**  maximum) if the session was ok and the limit had
			**  been reached, lower it (down to 0) if the session
			**  was not ok.
			*/

			if (ok && occ_entry->occe_open_se >= occ_entry->occe_cur_conc) {
				if (occ_entry->occe_cur_conc < occ_entry->occe_max_conc)
					++occ_entry->occe_cur_conc;
			}
			else if (!ok) {
				if (occ_entry->occe_cur_conc > 0)
					--occ_entry->occe_cur_conc;
			}

			--occ_entry->occe_open_se;
			if (occ_entry->occe_open_se < occ_entry->occe_cur_conc) {
				/* tell caller; hand over "session waiting" flag */
				flags |= OCCE_FL_BLW_LIM;
				if (OCCE_IS_FLAG(occ_entry, OCCE_FL_SE_WAIT)) {
					flags |= OCCE_FL_SE_WAIT;
					OCCE_CLR_FLAG(occ_entry, OCCE_FL_SE_WAIT);
				}
			}
		}

		if (occ_entry->occe_open_ta == 0) {
			QM_LEV_DPRINTFC(QDC_OCC, 3, (QM_DEBFP,
				"sev=ERROR, func=occ_sess_close_entry, ipv4=%A, open_ta=0\n",
				occ_entry->occe_srv_ipv4));
		}
		else
			occ_entry->occe_open_ta--;

		/* no open session left: entry is not needed anymore */
		if (occ_entry->occe_open_se == 0) {
#if !DA_OCC_RSC
			bht_rm(occ_ctx->occx_ht, (char *)&occ_entry->occe_srv_ipv4,
				sizeof(occ_entry->occe_srv_ipv4), NULL, NULL);
			occ_entry_free(&occ_ctx->occx_fl_hd, occ_entry,
				&occ_ctx->occx_mutex, THR_NO_LOCK);
#endif /* !DA_OCC_RSC */
		}
	}
	else {
		/*
		**  No entry has been found, i.e., occ_entry must not be
		**  dereferenced here: log the address that was looked up.
		*/

		QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP,
			"sev=ERROR, func=occ_sess_close_entry, ipv4=%A, occ_entry_find=%x\n",
			srv_ipv4, ret));
	}

	if ((!sm_is_err(ret) && thr_unl_no_err(locktype))
	    || (sm_is_err(ret) && thr_unl_if_err(locktype)))
	{
		r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
		SM_ASSERT(r == 0);
		if (r != 0 && sm_is_success(ret))
			ret = sm_error_perm(SM_EM_OCC, r);
	}
	if (pflags != NULL)
		*pflags = flags;
	return ret;
}
/*
**  OCC_SESS_REUSE -- reuse a session in OCC: increase number of open TAs
**
**	Parameters:
**		occ_ctx -- OCC context
**		occ_entry -- OCC entry whose transaction counter is incremented
**		now -- current time (not used by this function)
**		locktype -- kind of locking
**
**	Returns:
**		SM_SUCCESS except for (un)lock errors
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks occ_ctx if requested
**
**	Last code review: 2005-03-14 18:56:05
**	Last code change:
*/

sm_ret_T
occ_sess_reuse(occ_ctx_P occ_ctx, occ_entry_P occ_entry, time_T now, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT	"occ_sess_reuse"
	int r;

	SM_IS_OCCX(occ_ctx);
	SM_IS_OCCE(occ_entry);

	if (thr_lock_it(locktype)) {
		r = smthread_mutex_lock(&occ_ctx->occx_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}

	/* one more transaction is using this session */
	occ_entry->occe_open_ta++;

	/* done unless the lock has to be released */
	if (!thr_unl_no_err(locktype))
		return SM_SUCCESS;

	r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
	SM_ASSERT(r == 0);
	return (0 == r) ? SM_SUCCESS : sm_error_perm(SM_EM_OCC, r);
}
/*
**  OCC_TA_CLOSE_ENTRY -- close a transaction in OCC
**
**	Decrements the counter for open transactions of the entry for the
**	server address.
**
**	Parameters:
**		occ_ctx -- OCC context
**		srv_ipv4 -- IPv4 address of server (HACK)
**		now -- current time (currently unused)
**		pflags -- flags of session (output, may be NULL)
**			OCCE_FL_TA_WAIT: was set in the entry (and has been
**				cleared there) and the number of open
**				transactions is below the current limit
**		locktype -- kind of locking
**
**	Returns:
**		SM_SUCCESS except for (un)lock errors
**		NOTE(review): if no entry exists for srv_ipv4 this is only
**		logged, the result of occ_entry_find() is not returned
**		(SM_SUCCESS is); callers may rely on that, hence it is kept.
**
**	Side Effects: none on error (except if unlock fails)
**
**	Locking: locks occ_ctx if requested
**
**	Last code review: 2005-03-14 18:56:53
**	Last code change:
*/

sm_ret_T
occ_ta_close_entry(occ_ctx_P occ_ctx, ipv4_T srv_ipv4, time_T now, uint32_t *pflags, thr_lock_T locktype)
{
#undef SMFCT
#define SMFCT	"occ_ta_close_entry"
	int r;
	sm_ret_T ret;
	uint32_t flags;
	occ_entry_P occ_entry;

	SM_IS_OCCX(occ_ctx);
	flags = 0;
	if (thr_lock_it(locktype)) {
		r = smthread_mutex_lock(&occ_ctx->occx_mutex);
		SM_LOCK_OK(r);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}

	occ_entry = NULL;
	ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4,
			&occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK);
	if (SM_SUCCESS == ret) {
		SM_IS_OCCE(occ_entry);
		if (occ_entry->occe_open_ta == 0) {
			/* counter mismatch: nothing to decrement */
			QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP,
				"sev=ERROR, func=occ_ta_close_entry, entry=%p, ipv4=%A, open_ta=%d, state=before_decrement\n",
				occ_entry, occ_entry->occe_srv_ipv4,
				occ_entry->occe_open_ta));
		}
		else {
			occ_entry->occe_open_ta--;

			/* hand over "transaction waiting" flag to caller */
			if (OCCE_IS_FLAG(occ_entry, OCCE_FL_TA_WAIT)
			    && occ_entry->occe_open_ta < occ_entry->occe_cur_conc)
			{
				flags |= OCCE_FL_TA_WAIT;
				OCCE_CLR_FLAG(occ_entry, OCCE_FL_TA_WAIT);
			}
		}
		QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP,
			"sev=INFO, func=occ_ta_close_entry, occ_entry=%p, ipv4=%A, open_ta=%d, state=after_decrement, occ_entry=%p, flags=%x\n",
			occ_entry, occ_entry->occe_srv_ipv4,
			occ_entry->occe_open_ta, occ_entry,
			occ_entry->occe_flags));
	}
	else {
		/*
		**  No entry has been found, i.e., occ_entry must not be
		**  dereferenced here: log the address that was looked up.
		*/

		QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP,
			"sev=ERROR, func=occ_ta_close_entry, ipv4=%A, occ_entry_find=%x\n",
			srv_ipv4, ret));
	}

	if (thr_unl_no_err(locktype)) {
		r = smthread_mutex_unlock(&occ_ctx->occx_mutex);
		SM_ASSERT(r == 0);
		if (r != 0)
			return sm_error_perm(SM_EM_OCC, r);
	}
	if (pflags != NULL)
		*pflags = flags;
	return SM_SUCCESS;
}
/*
**  OCC_CLOSE -- close an OCC
**
**	Releases the hash table/cache, the mutex, the lookup strings, and
**	the context itself.  A NULL context is accepted and ignored.
**
**	Parameters:
**		occ_ctx -- OCC context (may be NULL)
**
**	Returns:
**		SM_SUCCESS
**
**	Side Effects: free occ
**
**	Locking: no locking, destroys OCC (and hence lock)
**
**	Last code review: 2005-03-16 05:33:13
**	Last code change:
*/

sm_ret_T
occ_close(occ_ctx_P occ_ctx)
{
#undef SMFCT
#define SMFCT	"occ_close"
	if (occ_ctx != NULL) {
		SM_IS_OCCX(occ_ctx);

		/* entries first: the table is the owner of them */
#if !DA_OCC_RSC
		bht_destroy(occ_ctx->occx_ht, NULL, NULL);
#else
		rsc_free(occ_ctx->occx_ht);
#endif
		pthread_mutex_destroy(&occ_ctx->occx_mutex);

		/* strings used for conf map lookups */
		SM_STR_FREE(occ_ctx->occx_lhs);
		SM_STR_FREE(occ_ctx->occx_tag);
		SM_STR_FREE(occ_ctx->occx_rhs);

#if OCC_CHECK
		/* invalidate context before it is released */
		occ_ctx->sm_magic = SM_MAGIC_NULL;
#endif
		sm_free_size(occ_ctx, sizeof(*occ_ctx));
	}
	return SM_SUCCESS;
}
#if DA_OCC_RSC
/*
**  RSC_OCCE_CREATE -- "create" an OCC entry (callback function for rsc)
**
**	Nothing is created here: the value handed in is returned as is.
**
**	Parameters:
**		key -- ignored
**		len -- ignored
**		value -- OCC entry
**		ctx -- ignored
**
**	Returns:
**		value
**
**	Last code review: 2005-03-17 04:50:31
**	Last code change:
*/

static void *
rsc_occe_create(const char *key, uint len, void *value, void *ctx)
{
#undef SMFCT
#define SMFCT	"rsc_occe_create"
	/* only required by the callback signature */
	(void) key;
	(void) len;
	(void) ctx;

	return value;
}
/*
** RSC_OCCE_DELETE -- "delete" an OCC entry (callback function for rsc)
**
** Parameters:
** value -- OCC entry
** ctx -- OCC context
**
** Returns:
** SM_SUCCESS (see occ_entry_free())
**
** Last code review: 2005-03-16 05:38:32
** Last code change:
*/
static sm_ret_T
rsc_occe_delete(void *value, void *ctx)
{
#undef SMFCT
#define SMFCT "rsc_occe_delete"
occ_ctx_P occ_ctx;
occ_ctx = (occ_ctx_P) ctx;
SM_IS_OCCX(occ_ctx);
return occ_entry_free(&occ_ctx->occx_fl_hd, (occ_entry_P) value,
&occ_ctx->occx_mutex, THR_NO_LOCK);
}
#endif /* DA_OCC_RSC */
/*
**  OCC_OPEN -- open a new OCC context
**
**	Allocates the context, its hash table/cache, the strings used for
**	conf map lookups, and initializes the mutex.
**
**	Parameters:
**		pocc_ctx -- pointer to OCC context (output, set on success only)
**		size -- size of OCC (must be greater than 0)
**
**	Returns:
**		usual sm_error code; ENOMEM, error from pthread_mutex_init()
**
**	Side Effects: none on error
**
**	Last code review: 2005-03-16 05:42:34
**	Last code change:
*/

sm_ret_T
occ_open(occ_ctx_P *pocc_ctx, uint size)
{
#undef SMFCT
#define SMFCT	"occ_open"
	occ_ctx_P occ_ctx;
	sm_ret_T ret;
	int r;

	/*
	**  Sizes for the lookup strings; these are upper limits which are
	**  a bit larger than necessary.  NOTE(review): the second and third
	**  argument of sm_str_new() are presumably initial and maximum
	**  size -- confirm.
	*/

#define SM_MAX_CONF_STR	16	/* limit for each of the lookup strings */
#define SM_CONF_IPV4	10	/* size for IPv4 address (lhs) */
#define SM_CONF_TAG	8	/* size for tag */
#define SM_CONF_RHS	14	/* size for lookup result (rhs) */

	SM_REQUIRE(pocc_ctx != NULL);
	SM_REQUIRE(size > 0);

	occ_ctx = (occ_ctx_P) sm_zalloc(sizeof(*occ_ctx));
	if (NULL == occ_ctx)
		return sm_error_temp(SM_EM_OCC, ENOMEM);

	/* use some prime number?? */
#if DA_OCC_RSC
	occ_ctx->occx_ht = rsc_new(size * 2 + 1, size * 3 + 3,
				rsc_occe_create, rsc_occe_delete,
				(void *) occ_ctx);
#else
	occ_ctx->occx_ht = bht_new(size, size * 2 + 1);
#endif
	if (NULL == occ_ctx->occx_ht)
		goto nomem;
	OCCFL_INIT(&occ_ctx->occx_fl_hd);

	/* strings for conf map lookups: key, tag, result */
	occ_ctx->occx_lhs = sm_str_new(NULL, SM_CONF_IPV4, SM_MAX_CONF_STR);
	if (NULL == occ_ctx->occx_lhs)
		goto nomem;
	occ_ctx->occx_tag = sm_str_new(NULL, SM_CONF_TAG, SM_MAX_CONF_STR);
	if (NULL == occ_ctx->occx_tag)
		goto nomem;
	occ_ctx->occx_rhs = sm_str_new(NULL, SM_CONF_RHS, SM_MAX_CONF_STR);
	if (NULL == occ_ctx->occx_rhs)
		goto nomem;

	r = pthread_mutex_init(&occ_ctx->occx_mutex, SM_PTHREAD_MUTEXATTR);
	if (0 == r) {
		/* everything is set up: hand the context to the caller */
#if OCC_CHECK
		occ_ctx->sm_magic = SM_OCCX_MAGIC;
#endif
		*pocc_ctx = occ_ctx;
		return SM_SUCCESS;
	}
	ret = sm_error_perm(SM_EM_OCC, r);
	goto cleanup;

  nomem:
	ret = sm_error_temp(SM_EM_OCC, ENOMEM);

  cleanup:
	/* occ_ctx is not NULL here; release whatever has been allocated */
	if (occ_ctx->occx_ht != NULL) {
#if DA_OCC_RSC
		rsc_free(occ_ctx->occx_ht);
#else
		bht_destroy(occ_ctx->occx_ht, NULL, NULL);
#endif
	}
	SM_STR_FREE(occ_ctx->occx_lhs);
	SM_STR_FREE(occ_ctx->occx_tag);
	SM_STR_FREE(occ_ctx->occx_rhs);
	sm_free_size(occ_ctx, sizeof(*occ_ctx));
	return ret;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */