/* * Copyright (c) 2004-2006 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: occ.c,v 1.33 2007/08/14 04:35:08 ca Exp $") #include "sm/types.h" #include "sm/assert.h" #include "sm/magic.h" #include "sm/str.h" #include "sm/memops.h" #include "sm/pthread.h" #include "sm/evthr.h" #include "sm/qmgr-int.h" #include "sm/map.h" #include "occ.h" /* ** Note: locking occ_ctx for all operations can cause locking contentions. ** In most cases it should be sufficient to lock just occ_entry. ** "exception": the lookup strings (occ_ctx->occx_lhs etc) must be protected ** (alternatively local strings can be used again with the possible drawback ** that an allocation may fail). */ /* ** OCC_SESS_OPEN -- open a session in OCC ** ** Parameters: ** qmgr_ctx -- QMGR context (only qmgr_cnf is needed) ** occ_ctx -- OCC context ** srv_ipv4 -- IPv4 address of server (HACK) ** pocc_entry -- pointer to OCC entry (output) ** locktype -- kind of locking ** ** Returns: ** usual sm_error code; ENOMEM (see occ_entry_new()) ** ** Side Effects: none on error (except if unlock fails) ** ** Locking: locks occ_ctx during operation if requested, returns unlocked ** ** Last code review: 2005-03-17 02:23:27 ** Last code change: */ sm_ret_T occ_sess_open(qmgr_ctx_P qmgr_ctx, occ_ctx_P occ_ctx, ipv4_T srv_ipv4, occ_entry_P *pocc_entry, thr_lock_T locktype) { #undef SMFCT #define SMFCT "occ_sess_open" sm_ret_T ret; int r; occ_entry_P occ_entry; time_T now; now = evthr_time(qmgr_ctx->qmgr_ev_ctx); if (thr_lock_it(locktype)) { r = smthread_mutex_lock(&occ_ctx->occx_mutex); SM_LOCK_OK(r); if (r != 0) return sm_error_perm(SM_EM_OCC, r); } /* find entry for IP address, if it doesn't exist: create new one */ ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4, &occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK); if (sm_is_err(ret)) { ret = occ_entry_new(occ_ctx->occx_ht, &occ_ctx->occx_fl_hd, srv_ipv4, &occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK); if (sm_is_err(ret)) goto error; SM_IS_OCCE(occ_entry); occ_entry->occe_init_conc = qmgr_ctx->qmgr_cnf.q_cnf_init_conc_conn; occ_entry->occe_cur_conc = qmgr_ctx->qmgr_cnf.q_cnf_init_conc_conn; occ_entry->occe_max_conc = qmgr_ctx->qmgr_cnf.q_cnf_max_conc_conn; if (qmgr_ctx->qmgr_conf_map != NULL) { /* tags for conf db */ #define OCITAG "oci:" /* outgoing connections initial */ #define OCMTAG "ocm:" /* outgoing connections max */ #define OCTOTAG "octo:" /* outgoing connection cache timeout */ sm_str_clr(occ_ctx->occx_lhs); sm_str_clr(occ_ctx->occx_tag); sm_str_clr(occ_ctx->occx_rhs); sm_inet_ipv4str(srv_ipv4, occ_ctx->occx_lhs); if (sm_str_scat(occ_ctx->occx_tag, OCITAG) == SM_SUCCESS && sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map, occ_ctx->occx_lhs, occ_ctx->occx_tag, SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG, occ_ctx->occx_rhs) == SM_SUCCESS && sm_str_getlen(occ_ctx->occx_rhs) > 0) { ulong v; errno = 0; v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0); if (v != ULONG_MAX && errno != ERANGE) occ_entry->occe_cur_conc = occ_entry->occe_init_conc = (uint) v; } sm_str_clr(occ_ctx->occx_tag); sm_str_clr(occ_ctx->occx_rhs); if (sm_str_scat(occ_ctx->occx_tag, OCMTAG) == SM_SUCCESS && sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map, occ_ctx->occx_lhs, occ_ctx->occx_tag, SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG, occ_ctx->occx_rhs) == SM_SUCCESS && sm_str_getlen(occ_ctx->occx_rhs) > 0) { ulong v; errno = 0; v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0); if (v != ULONG_MAX && errno != ERANGE) occ_entry->occe_max_conc = (uint) v; } sm_str_clr(occ_ctx->occx_tag); sm_str_clr(occ_ctx->occx_rhs); if (sm_str_scat(occ_ctx->occx_tag, OCTOTAG) == SM_SUCCESS && sm_map_lookup_ip(qmgr_ctx->qmgr_conf_map, occ_ctx->occx_lhs, occ_ctx->occx_tag, SMMAP_LFL_SUBNETS|SMMAP_LFL_TAG, occ_ctx->occx_rhs) == SM_SUCCESS && sm_str_getlen(occ_ctx->occx_rhs) > 0) { ulong v; errno = 0; v = strtoul((char *)sm_str_getdata(occ_ctx->occx_rhs), NULL, 0); if (v != ULONG_MAX && errno != ERANGE) occ_entry->occe_timeout = (uint) v; } QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_sess_open, ipv4=%A, conc_init=%u, conc_max=%u\n", (ipv4_T) srv_ipv4, occ_entry->occe_cur_conc, occ_entry->occe_max_conc)); } } else { SM_ASSERT(occ_entry != NULL); SM_ASSERT(occ_entry->occe_srv_ipv4 == srv_ipv4); } /* increment counters etc */ ++occ_entry->occe_open_se; ++occ_entry->occe_open_ta; #if 0 occ_entry->occe_last_conn = now; #endif occ_entry->occe_last_upd = now; QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_sess_open, occ_entry=%p, ipv4=%A, open_se=%u, open_ta=%u, occ_entry=%p, flags=%x\n", occ_entry, (ipv4_T) occ_entry->occe_srv_ipv4, occ_entry->occe_open_se, occ_entry->occe_open_ta, occ_entry, occ_entry->occe_flags)); if (pocc_entry != NULL) *pocc_entry = occ_entry; if (thr_unl_no_err(locktype)) { r = smthread_mutex_unlock(&occ_ctx->occx_mutex); SM_ASSERT(r == 0); if (r != 0) return sm_error_perm(SM_EM_OCC, r); } return SM_SUCCESS; error: if (thr_unl_if_err(locktype)) { r = smthread_mutex_unlock(&occ_ctx->occx_mutex); SM_ASSERT(r == 0); if (r != 0 && sm_is_success(ret)) ret = sm_error_perm(SM_EM_OCC, r); } return ret; } /* ** OCC_SESS_CLOSE_ENTRY -- close a session in OCC ** ** Parameters: ** occ_ctx -- OCC context ** srv_ipv4 -- IPv4 address of server (HACK) ** ok -- session was ok? ** now -- current time ** pflags -- flags of session (output, may be NULL) ** locktype -- kind of locking ** ** Returns: ** usual sm_error code; SM_E_NOTFOUND, (un)lock errors ** ** Side Effects: none on error (except if unlock fails) ** ** Locking: locks occ_ctx if requested ** ** Last code review: 2005-03-14 18:54:53 ** Last code change: */ sm_ret_T occ_sess_close_entry(occ_ctx_P occ_ctx, ipv4_T srv_ipv4, bool ok, time_T now, uint32_t *pflags, thr_lock_T locktype) { #undef SMFCT #define SMFCT "occ_sess_close_entry" sm_ret_T ret; int r; uint32_t flags; occ_entry_P occ_entry; SM_IS_OCCX(occ_ctx); ret = SM_SUCCESS; flags = 0; if (thr_lock_it(locktype)) { r = smthread_mutex_lock(&occ_ctx->occx_mutex); SM_LOCK_OK(r); if (r != 0) return sm_error_perm(SM_EM_OCC, r); } /* access locked via occ_ctx */ occ_entry = NULL; ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4, &occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK); if (SM_SUCCESS == ret) { SM_IS_OCCE(occ_entry); if (occ_entry->occe_open_se == 0) { QM_LEV_DPRINTFC(QDC_OCC, 0, (QM_DEBFP, "sev=ERROR, func=occ_sess_close_entry, ipv4=%A, open_se=0\n", occ_entry->occe_srv_ipv4)); } else { QM_LEV_DPRINTFC(QDC_OCC, 3, (QM_DEBFP, "sev=DBG, func=occ_sess_close_entry, ipv4=%A, ok=%d, open_se=%u, cur_conc=%u, flags=%x\n", occ_entry->occe_srv_ipv4, ok, occ_entry->occe_open_se, occ_entry->occe_cur_conc, occ_entry->occe_flags)); if (ok && occ_entry->occe_open_se >= occ_entry->occe_cur_conc) { if (occ_entry->occe_cur_conc < occ_entry->occe_max_conc) ++occ_entry->occe_cur_conc; } else if (!ok) { if (occ_entry->occe_cur_conc > 0) --occ_entry->occe_cur_conc; } --occ_entry->occe_open_se; if (occ_entry->occe_open_se < occ_entry->occe_cur_conc) { flags |= OCCE_FL_BLW_LIM; if (OCCE_IS_FLAG(occ_entry, OCCE_FL_SE_WAIT)) { flags |= OCCE_FL_SE_WAIT; OCCE_CLR_FLAG(occ_entry, OCCE_FL_SE_WAIT); } } } if (occ_entry->occe_open_ta == 0) { QM_LEV_DPRINTFC(QDC_OCC, 3, (QM_DEBFP, "sev=ERROR, func=occ_sess_close_entry, ipv4=%A, open_ta=0\n", occ_entry->occe_srv_ipv4)); } else occ_entry->occe_open_ta--; if (occ_entry->occe_open_se == 0) { #if !DA_OCC_RSC bht_rm(occ_ctx->occx_ht, (char *)&occ_entry->occe_srv_ipv4, sizeof(occ_entry->occe_srv_ipv4), NULL, NULL); occ_entry_free(&occ_ctx->occx_fl_hd, occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK); #endif /* !DA_OCC_RSC */ } } else { QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=ERROR, func=occ_sess_close_entry, ipv4=%A, occ_entry_find=%x\n", occ_entry->occe_srv_ipv4, ret)); } if ((!sm_is_err(ret) && thr_unl_no_err(locktype)) || (sm_is_err(ret) && thr_unl_if_err(locktype))) { r = smthread_mutex_unlock(&occ_ctx->occx_mutex); SM_ASSERT(r == 0); if (r != 0 && sm_is_success(ret)) ret = sm_error_perm(SM_EM_OCC, r); } if (pflags != NULL) *pflags = flags; return ret; } /* ** OCC_SESS_REUSE -- reuse a session in OCC: increase number of open TAs ** ** Parameters: ** occ_ctx -- OCC context ** occ_entry -- OCC entry ** now -- current time ** locktype -- kind of locking ** ** Returns: ** SM_SUCCESS except for (un)lock errors ** ** Side Effects: none on error (except if unlock fails) ** ** Locking: locks occ_ctx if requested ** ** Last code review: 2005-03-14 18:56:05 ** Last code change: */ sm_ret_T occ_sess_reuse(occ_ctx_P occ_ctx, occ_entry_P occ_entry, time_T now, thr_lock_T locktype) { #undef SMFCT #define SMFCT "occ_sess_reuse" int r; SM_IS_OCCX(occ_ctx); SM_IS_OCCE(occ_entry); if (thr_lock_it(locktype)) { r = smthread_mutex_lock(&occ_ctx->occx_mutex); SM_LOCK_OK(r); if (r != 0) { /* SM_ASSERT(r == 0); */ return sm_error_perm(SM_EM_OCC, r); } } ++occ_entry->occe_open_ta; if (thr_unl_no_err(locktype)) { r = smthread_mutex_unlock(&occ_ctx->occx_mutex); SM_ASSERT(r == 0); if (r != 0) return sm_error_perm(SM_EM_OCC, r); } return SM_SUCCESS; } /* ** OCC_TA_CLOSE_ENTRY -- close a transaction in OCC ** ** Parameters: ** occ_ctx -- OCC context ** srv_ipv4 -- IPv4 address of server (HACK) ** now -- current time ** pflags -- flags of session (output, may be NULL) ** locktype -- kind of locking ** ** Returns: ** usual sm_error code; SM_E_NOTFOUND, (un)lock errors ** ** Side Effects: none on error (except if unlock fails) ** ** Locking: locks occ_ctx if requested ** ** Last code review: 2005-03-14 18:56:53 ** Last code change: */ sm_ret_T occ_ta_close_entry(occ_ctx_P occ_ctx, ipv4_T srv_ipv4, time_T now, uint32_t *pflags, thr_lock_T locktype) { #undef SMFCT #define SMFCT "occ_ta_close_entry" int r; sm_ret_T ret; uint32_t flags; occ_entry_P occ_entry; SM_IS_OCCX(occ_ctx); flags = 0; if (thr_lock_it(locktype)) { r = smthread_mutex_lock(&occ_ctx->occx_mutex); SM_LOCK_OK(r); if (r != 0) return sm_error_perm(SM_EM_OCC, r); } occ_entry = NULL; ret = occ_entry_find(occ_ctx->occx_ht, srv_ipv4, &occ_entry, &occ_ctx->occx_mutex, THR_NO_LOCK); if (SM_SUCCESS == ret) { SM_IS_OCCE(occ_entry); if (occ_entry->occe_open_ta == 0) QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=ERROR, func=occ_ta_close_entry, entry=%p, ipv4=%A, open_ta=%d, state=before_decrement\n", occ_entry, occ_entry->occe_srv_ipv4, occ_entry->occe_open_ta)); else { occ_entry->occe_open_ta--; if (OCCE_IS_FLAG(occ_entry, OCCE_FL_TA_WAIT) && occ_entry->occe_open_ta < occ_entry->occe_cur_conc) { flags |= OCCE_FL_TA_WAIT; OCCE_CLR_FLAG(occ_entry, OCCE_FL_TA_WAIT); } } QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=INFO, func=occ_ta_close_entry, occ_entry=%p, ipv4=%A, open_ta=%d, state=after_decrement, occ_entry=%p, flags=%x\n", occ_entry, occ_entry->occe_srv_ipv4, occ_entry->occe_open_ta, occ_entry, occ_entry->occe_flags)); } else { QM_LEV_DPRINTFC(QDC_OCC, 4, (QM_DEBFP, "sev=ERROR, func=occ_ta_close_entry, ipv4=%A, occ_entry_find=%x\n", occ_entry->occe_srv_ipv4, ret)); } if (thr_unl_no_err(locktype)) { r = smthread_mutex_unlock(&occ_ctx->occx_mutex); SM_ASSERT(r == 0); if (r != 0) return sm_error_perm(SM_EM_OCC, r); } if (pflags != NULL) *pflags = flags; return SM_SUCCESS; } /* ** OCC_CLOSE -- close an OCC ** ** Parameters: ** occ_ctx -- OCC context ** ** Returns: ** SM_SUCCESS ** ** Side Effects: free occ ** ** Locking: no locking, destroys OCC (and hence lock) ** ** Last code review: 2005-03-16 05:33:13 ** Last code change: */ sm_ret_T occ_close(occ_ctx_P occ_ctx) { #undef SMFCT #define SMFCT "occ_close" if (NULL == occ_ctx) return SM_SUCCESS; SM_IS_OCCX(occ_ctx); #if DA_OCC_RSC rsc_free(occ_ctx->occx_ht); #else bht_destroy(occ_ctx->occx_ht, NULL, NULL); #endif pthread_mutex_destroy(&occ_ctx->occx_mutex); SM_STR_FREE(occ_ctx->occx_lhs); SM_STR_FREE(occ_ctx->occx_tag); SM_STR_FREE(occ_ctx->occx_rhs); #if OCC_CHECK occ_ctx->sm_magic = SM_MAGIC_NULL; #endif sm_free_size(occ_ctx, sizeof(*occ_ctx)); return SM_SUCCESS; } #if DA_OCC_RSC /* ** RSC_OCCE_CREATE -- "create" an OCC entry (callback function for rsc) ** ** Parameters: ** key -- ignored ** len -- ignored ** value -- OCC entry ** ctx -- ignored ** ** Returns: ** value ** ** Last code review: 2005-03-17 04:50:31 ** Last code change: */ static void * rsc_occe_create(const char *key, uint len, void *value, void *ctx) { #undef SMFCT #define SMFCT "rsc_occe_create" return value; } /* ** RSC_OCCE_DELETE -- "delete" an OCC entry (callback function for rsc) ** ** Parameters: ** value -- OCC entry ** ctx -- OCC context ** ** Returns: ** SM_SUCCESS (see occ_entry_free()) ** ** Last code review: 2005-03-16 05:38:32 ** Last code change: */ static sm_ret_T rsc_occe_delete(void *value, void *ctx) { #undef SMFCT #define SMFCT "rsc_occe_delete" occ_ctx_P occ_ctx; occ_ctx = (occ_ctx_P) ctx; SM_IS_OCCX(occ_ctx); return occ_entry_free(&occ_ctx->occx_fl_hd, (occ_entry_P) value, &occ_ctx->occx_mutex, THR_NO_LOCK); } #endif /* DA_OCC_RSC */ /* ** OCC_OPEN -- open a new OCC context ** ** Parameters: ** pocc_ctx -- pointer to OCC context (output) ** size -- size of OCC ** ** Returns: ** usual sm_error code; ENOMEM, ** ** Side Effects: none on error ** ** Last code review: 2005-03-16 05:42:34 ** Last code change: */ sm_ret_T occ_open(occ_ctx_P *pocc_ctx, uint size) { #undef SMFCT #define SMFCT "occ_open" int r; sm_ret_T ret; occ_ctx_P occ_ctx; /* these are upper limits which are a bit larger than necessary */ #define SM_MAX_CONF_STR 16 /* max length of IPv4 address + tag */ #define SM_CONF_IPV4 10 /* length of IPv4 address */ #define SM_CONF_TAG 8 /* length of tag */ #define SM_CONF_RHS 14 /* length of IPv4 address + tag */ SM_REQUIRE(pocc_ctx != NULL); SM_REQUIRE(size > 0); occ_ctx = (occ_ctx_P) sm_zalloc(sizeof(*occ_ctx)); if (NULL == occ_ctx) return sm_error_temp(SM_EM_OCC, ENOMEM); /* use some prime number?? */ #if DA_OCC_RSC occ_ctx->occx_ht = rsc_new(size * 2 + 1, size * 3 + 3, rsc_occe_create, rsc_occe_delete, (void *) occ_ctx); #else occ_ctx->occx_ht = bht_new(size, size * 2 + 1); #endif if (NULL == occ_ctx->occx_ht) goto enomem; OCCFL_INIT(&occ_ctx->occx_fl_hd); occ_ctx->occx_lhs = sm_str_new(NULL, SM_CONF_IPV4, SM_MAX_CONF_STR); if (NULL == occ_ctx->occx_lhs) goto enomem; occ_ctx->occx_tag = sm_str_new(NULL, SM_CONF_TAG, SM_MAX_CONF_STR); if (NULL == occ_ctx->occx_tag) goto enomem; occ_ctx->occx_rhs = sm_str_new(NULL, SM_CONF_RHS, SM_MAX_CONF_STR); if (NULL == occ_ctx->occx_rhs) goto enomem; r = pthread_mutex_init(&occ_ctx->occx_mutex, SM_PTHREAD_MUTEXATTR); if (r != 0) { ret = sm_error_perm(SM_EM_OCC, r); goto error; } #if 0 // occ_ctx->occ_entries_max = size; // occ_ctx->occ_entries_lim = size; #endif #if OCC_CHECK occ_ctx->sm_magic = SM_OCCX_MAGIC; #endif *pocc_ctx = occ_ctx; return SM_SUCCESS; enomem: ret = sm_error_temp(SM_EM_OCC, ENOMEM); error: if (occ_ctx != NULL) /* just paranoia */ { if (occ_ctx->occx_ht != NULL) { #if DA_OCC_RSC rsc_free(occ_ctx->occx_ht); #else bht_destroy(occ_ctx->occx_ht, NULL, NULL); #endif } SM_STR_FREE(occ_ctx->occx_lhs); SM_STR_FREE(occ_ctx->occx_tag); SM_STR_FREE(occ_ctx->occx_rhs); sm_free_size(occ_ctx, sizeof(*occ_ctx)); } return ret; }