/* * Copyright (c) 2003-2006 Sendmail, Inc. and its suppliers. * All rights reserved. * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the sendmail distribution. */ #include "sm/generic.h" SM_RCSID("@(#)$Id: cleanup.c,v 1.62 2007/10/17 02:38:25 ca Exp $") #include "sm/error.h" #include "sm/assert.h" #include "sm/qmgr.h" #include "sm/qmgr-int.h" #include "sm/da.h" #include "qmgr.h" #include "log.h" /* ** Cleanup various data structures in QMGR. ** Remove items from caches etc that are "too old". ** This is necessary for many data structures if a request has been sent ** to another module (e.g., smtpc or smar) but no reply has been received ** (because the module crashed or some communication problem or ...) ** ** ToDo: error handling: how to deal with various errors? ** I/O and BDB errors: stop ** resource errors: "throttle" system and try again later? ** that requires that the actions can be "undone" or at least repeated ** later on. ** These seems to be too complicated :-( so let's stop for now... ** This should be not too bad because: ** 1. the system shouldn't run out of resources... ** 2. these cleanup functions are not supposed to be doing much (or anything ** at all) ** That means the usual "update after delivery" code path is more likely ** to cause a restart than this. */ #define QCLEANUP_IQDB 0 #define QCLEANUP_AQWAIT 1 #define QCLEANUP_EDB 2 #define QCLEANUP_LIMIT 3 struct qcleanup_ctx_S { sm_magic_T sm_magic; pthread_mutex_t qc_mutex; qmgr_ctx_P qc_qmgr_ctx; /* pointer back to main ctx */ uint qc_flags; /* things to do */ time_T qc_sweep[QCLEANUP_LIMIT]; #define qc_ibdb_sweep qc_sweep[QCLEANUP_IQDB] #define qc_aq_sweep qc_sweep[QCLEANUP_AQWAIT] #define qc_edb_sweep qc_sweep[QCLEANUP_EDB] }; /* qcleanup_ctx flags */ #define QCLEANUP_FL_NONE 0x00 #define QCLEANUP_FL_IQDB 0x01 #define QCLEANUP_FL_AQWAIT 0x02 #define QCLEANUP_FL_EDB 0x04 #define QCLEANUP_SET_FLAG(qcleanup_ctx, fl) (qcleanup_ctx)->qc_flags |= (fl) #define QCLEANUP_CLR_FLAG(qcleanup_ctx, fl) (qcleanup_ctx)->qc_flags &= ~(fl) #define QCLEANUP_IS_FLAG(qcleanup_ctx, fl) (((qcleanup_ctx)->qc_flags & (fl)) != 0) #define QCLEANUP_IS_FL(flags, fl) (((flags) & (fl)) != 0) #define SM_IS_QCLEANUP(qcleanup_ctx) SM_REQUIRE_ISA((qcleanup_ctx), SM_QCLEANUP_CTX_MAGIC) /* ** QCLEANUP_CTX_FREE -- free qcleanup_ctx ** ** Parameters: ** qcleanup_ctx -- qmgr cleanup context ** ** Returns: ** SM_SUCCESS (except for mutex destroy error) ** ** Last code review: 2004-12-08 18:15:21 ** Last code change: */ sm_ret_T qcleanup_ctx_free(qcleanup_ctx_P qcleanup_ctx) { int r; sm_ret_T ret; if (NULL == qcleanup_ctx) return SM_SUCCESS; ret = SM_SUCCESS; SM_IS_QCLEANUP(qcleanup_ctx); r = pthread_mutex_destroy(&qcleanup_ctx->qc_mutex); if (r != 0) ret = sm_error_perm(SM_EM_AQ, r); qcleanup_ctx->sm_magic = SM_MAGIC_NULL; SM_FREE_SIZE(qcleanup_ctx, sizeof(*qcleanup_ctx)); return ret; } /* ** QCLEANUP_CTX_NEW -- create a new qcleanup_ctx ** ** Parameters: ** qmgr_ctx -- QMGR context ** pqcleanup_ctx -- (pointer to) qmgr cleanup context (output) ** ** Returns: ** usual sm_error code; ENOMEM ** ** Last code review: 2004-12-08 18:16:40 ** Last code change: */ sm_ret_T qcleanup_ctx_new(qmgr_ctx_P qmgr_ctx, qcleanup_ctx_P *pqcleanup_ctx) { int r; sm_ret_T ret; time_T time_now; qcleanup_ctx_P qcleanup_ctx; SM_REQUIRE(pqcleanup_ctx != NULL); qcleanup_ctx = (qcleanup_ctx_P) sm_zalloc(sizeof(*qcleanup_ctx)); if (NULL == qcleanup_ctx) { ret = sm_error_temp(SM_EM_AQ, ENOMEM); goto error; } r = pthread_mutex_init(&qcleanup_ctx->qc_mutex, SM_PTHREAD_MUTEXATTR); if (r != 0) { ret = sm_error_perm(SM_EM_AQ, r); goto error; } QCLEANUP_SET_FLAG(qcleanup_ctx, QCLEANUP_FL_IQDB); QCLEANUP_SET_FLAG(qcleanup_ctx, QCLEANUP_FL_EDB); qcleanup_ctx->qc_qmgr_ctx = qmgr_ctx; time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx); qcleanup_ctx->qc_ibdb_sweep = time_now + 60; /* TIMEOUT: CONF? */ qcleanup_ctx->qc_edb_sweep = time_now + qmgr_ctx->qmgr_cnf.q_cnf_edb_cnf.edbcnf_chkpt_delay; qcleanup_ctx->qc_aq_sweep = TIME_T_MAX; qcleanup_ctx->sm_magic = SM_QCLEANUP_CTX_MAGIC; *pqcleanup_ctx = qcleanup_ctx; return SM_SUCCESS; error: /* complain? */ SM_FREE_SIZE(qcleanup_ctx, sizeof(*qcleanup_ctx)); return ret; } /* ** QMGR_AQ_CLEANUP -- Clean up AQ ** ** Parameters: ** qmgr_ctx -- QMGR context ** pnextrunval -- (pointer to) next run (output) ** ** Returns: ** usual sm_error code ** ** Side Effects: must abort on error (see qda_upd_ta_rcpt_stat()) ** ** Locking: locks entire edbc and aq_ctx during operation ** (in that order) returns unlocked ** ** Last code review: 2005-04-10 05:10:52; see comments ** Last code change: */ static sm_ret_T qmgr_aq_cleanup(qmgr_ctx_P qmgr_ctx, timeval_T *pnextrunval) { sm_ret_T ret, qda_upd_fl; uint err_st, n, fct_state; int delay_next_try, r; aq_ctx_P aq_ctx; aq_rcpt_P aq_rcpt, aq_rcpt_nxt, aq_rcpt_h; aq_ta_P aq_ta; time_T time_now; ibdb_req_hd_T ibdb_req_hd; edb_req_hd_T edb_req_hd; sessta_id_T da_ta_id; SM_IS_QMGR_CTX(qmgr_ctx); SM_REQUIRE(pnextrunval != NULL); aq_ctx = qmgr_ctx->qmgr_aq; SM_IS_AQ(aq_ctx); /* function state flags */ #define FST_EDBC_LCK 0x01 /* edbc is locked */ #define FST_AQ_LCK 0x02 /* aq is locked */ fct_state = 0; time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx); da_ta_id[0] = '\0'; err_st = 0; qda_upd_fl = 0; ret = SM_SUCCESS; IBDBREQL_INIT(&ibdb_req_hd); delay_next_try = 0; r = pthread_mutex_lock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_CRIT, 0, "sev=CRIT, func=qmgr_aq_cleanup, lock_edbc=%m", sm_err_perm(r)); return sm_error_perm(SM_EM_Q_CLEAN, r); } SM_SET_FLAG(fct_state, FST_EDBC_LCK); r = pthread_mutex_lock(&aq_ctx->aq_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_CRIT, 0, "sev=CRIT, func=qmgr_aq_cleanup, lock_aq=%m", sm_err_perm(r)); (void) pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex); return sm_error_temp(SM_EM_Q_CLEAN, r); } SM_SET_FLAG(fct_state, FST_AQ_LCK); EDBREQL_INIT(&edb_req_hd); /* ** This loop is "ugly" because some function "inside" can ** remove entries from the list, hence it is currently impossible ** to set the "next" entry correctly. ** the loop may start over again at the first element, hence a ** limit of entries^2 is set. */ n = aq_ctx->aq_entries * aq_ctx->aq_entries; /* overflow? */ aq_rcpt_h = NULL; for (aq_rcpt = AQR_WAITQ_FIRST(aq_ctx); aq_rcpt != AQR_WAITQ_END(aq_ctx) && aq_rcpt->aqr_expire <= time_now; aq_rcpt = aq_rcpt_nxt, --n) { SM_IS_AQ_RCPT(aq_rcpt); if (0 == n) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_ERR, 1, "sev=ERROR, func=qmgr_aq_cleanup, entries=%u, status=endless_loop" , aq_ctx->aq_entries); break; } aq_rcpt_nxt = AQR_WAITQ_NEXT(aq_rcpt); aq_ta = aq_rcpt->aqr_ss_ta; /* some checks for now because sched also does timeouts */ if (AQR_IS_FLAG(aq_rcpt, AQR_FL_TMOUT)) { /* remember this element, may start from here again */ aq_rcpt_h = aq_rcpt; continue; } AQR_SET_FLAG(aq_rcpt, AQR_FL_TMOUT|AQR_FL_STAT_NEW|AQR_FL_ERRST_UPD); /* DA or AR timeout? */ if (AQR_IS_FLAG(aq_rcpt, AQR_FL_ARF)) { if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_ERRST_UPD)) aq_rcpt->aqr_status_new = SMTP_AR_TEMP; err_st = DA_AR_ERR; QM_LEV_DPRINTFC(QDC_CLEANUP, 3, (QM_DEBFP, "sev=INFO, func=qmgr_aq_cleanup, status=AR_F, rcpt=%p\n", aq_rcpt)); } else if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAIT4UPD)) { aq_rcpt->aqr_status_new = SMTP_TMO_DA; err_st = DA_AQ_TMO_ERR; QM_LEV_DPRINTFC(QDC_CLEANUP, 3, (QM_DEBFP, "sev=INFO, func=qmgr_aq_cleanup, status=timeout_for_DA, rcpt=%p, now(%ld)-entered(%ld)=%ld\n", aq_rcpt, (long) time_now, (long) aq_rcpt->aqr_entered, (long) (time_now - aq_rcpt->aqr_entered))); } else if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_SCHED)) { aq_rcpt->aqr_status_new = SMTP_TMO_SCHED; err_st = DA_AQ_TMO_ERR; QM_LEV_DPRINTFC(QDC_CLEANUP, 3, (QM_DEBFP, "sev=INFO, func=qmgr_aq_cleanup, status=timeout, rcpt=%p, now(%ld)-entered(%ld)=%ld\n", aq_rcpt, (long) time_now, (long) aq_rcpt->aqr_entered, (long) (time_now - aq_rcpt->aqr_entered))); } else SM_ASSERT(0); aq_rcpt->aqr_err_st = err_st; if (!AQR_IS_FLAG(aq_rcpt, AQR_FL_WAIT4UPD)) AQR_DA_INIT(aq_rcpt); ++aq_ctx->aq_t_da; /* decremented in q_upd_rcpt_stat() */ QM_LEV_DPRINTFC(QDC_CLEANUP, 9, (QM_DEBFP, "sev=INFO, func=qmgr_aq_cleanup, status=%d, now=%ld, timeout=%ld, start=%ld, expired=%d\n", aq_rcpt, (long) time_now, (long) qmgr_ctx->qmgr_cnf.q_cnf_tmo_return, (long) aq_rcpt->aqr_st_time, (aq_rcpt->aqr_st_time + qmgr_ctx->qmgr_cnf.q_cnf_tmo_return < time_now))); /* Too long in queue? */ if (aq_rcpt->aqr_st_time + qmgr_ctx->qmgr_cnf.q_cnf_tmo_return < time_now) { AQR_SET_FLAG(aq_rcpt, AQR_FL_PERM|AQR_FL_DSN_TMT); /* Masks "original" error: really do this?? */ err_st = DA_DQ_TMO_ERR; QM_LEV_DPRINTFC(QDC_CLEANUP, 1, (QM_DEBFP, "func=qmgr_aq_cleanup, status=timeout, aq_rcpt=%p, now-aqr_st_time=%ld\n", aq_rcpt, (long) (time_now - aq_rcpt->aqr_st_time))); } /* ** This can call qmgr_set_aq_cleanup()! */ if (AQR_IS_FLAG(aq_rcpt, AQR_FL_WAIT4UPD)) { uint sc_id; dadb_entry_P dadb_entry; qsc_ctx_P qsc_ctx; /* ** Need to remove entire DA transaction. ** Note: this can remove other entries from ** waitq, hence aq_rcpt_nxt might be invalid ** afterwards, hence it is set to the begin of ** the list again or the next element after an ** "untouched" one (see above). ** XXX Note: if an "untouched" element is ** removed by qda_upd_ta_rcpt_stat() then this breaks! ** ** This requires that the elements are really removed, ** otherwise it's an endless loop. ** ** Could we pass a pointer to the next element to the ** function and have it changed if it is removed by ** the function? that could be pretty complicated... */ ret = SMTPC_GETCID(aq_rcpt->aqr_da_ta_id, sc_id); SM_ASSERT(1 == ret); SM_ASSERT(sc_id >= 0 && sc_id <= QM_N_SC_GLI(qmgr_ctx)); ret = qsc_ctx_find(qmgr_ctx, sc_id, &qsc_ctx); SM_ASSERT(sm_is_success(ret)); SM_IS_QSC_CTX(qsc_ctx); ret = dadb_ta_find(qsc_ctx->qsc_dadb_ctx, aq_rcpt->aqr_da_ta_id, &dadb_entry); SM_ASSERT(sm_is_success(ret)); SM_IS_DADBE(dadb_entry); DADBE_SET_FLAG(dadb_entry, DADBE_FL_TA_CL); SESSTA_COPY(da_ta_id , aq_rcpt->aqr_da_ta_id); ret = qda_upd_ta_rcpt_stat(qmgr_ctx, da_ta_id, aq_rcpt->aqr_status_new, err_st, qsc_ctx->qsc_dadb_ctx, dadb_entry, aq_ta, aq_rcpt, &edb_req_hd, &ibdb_req_hd, NULL, &delay_next_try); da_ta_id[0] = '\0'; if (NULL == aq_rcpt_h) aq_rcpt_nxt = AQR_WAITQ_FIRST(aq_ctx); else aq_rcpt_nxt = AQR_WAITQ_NEXT(aq_rcpt_h); } else { ret = qda_upd_ta_rcpt_stat(qmgr_ctx, da_ta_id, aq_rcpt->aqr_status_new, err_st, NULL, NULL, /* dadb_ctx, dadb_entry */ aq_ta, aq_rcpt, &edb_req_hd, &ibdb_req_hd, NULL, &delay_next_try); } if (sm_is_success(ret)) qda_upd_fl |= ret; else { /* what to do here?? stop on "fatal" error */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_ERR, 1, "sev=ERROR, func=qmgr_aq_cleanup, da_ta=%s, aqr_flags=%#x, qda_upd_ta_rcpt_stat=%m" , aq_rcpt->aqr_da_ta_id, aq_rcpt->aqr_flags , ret); if (QM_DA_STAT_FATAL(ret)) /* other errors? */ goto error; } } if (aq_rcpt != AQR_WAITQ_END(aq_ctx)) { timeval_T nextrunval; nextrunval.tv_sec = aq_rcpt->aqr_expire; nextrunval.tv_usec = 0; if (timercmp(&nextrunval, pnextrunval, <)) { pnextrunval->tv_sec = aq_rcpt->aqr_expire; pnextrunval->tv_usec = 0; } ret = qmgr_set_aq_cleanup(qmgr_ctx->qmgr_cleanup_ctx, aq_rcpt->aqr_expire, false); if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_CLEANUP, 1, (QM_DEBFP, "sev=INFO, func=qmgr_aq_cleanup, qmgr_set_aq_cleanup=%r\n", ret)); } r = pthread_mutex_unlock(&aq_ctx->aq_mutex); SM_ASSERT(0 == r); SM_CLR_FLAG(fct_state, FST_AQ_LCK); /* Always? Note: this also overrides ret! */ ret = edb_wr_status(qmgr_ctx->qmgr_edb, &edb_req_hd); r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_ASSERT(0 == r); SM_CLR_FLAG(fct_state, FST_EDBC_LCK); QM_LEV_DPRINTF(sm_is_err(ret) ? 0 : 3, (QM_DEBFP, "sev=INFO, func=qmgr_aq_cleanup, edb_wr_status=%r\n", ret)); /* ** If writing to DEFEDB fails we need to ** cancel the wr requests for IBDB. */ if (sm_is_success(ret)) { ret = ibdb_wr_status(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd); if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_ERR, 1, "sev=ERROR, func=qmgr_aq_cleanup, ibdb_wr_status=%m", ret); goto error; } } else { (void) ibdb_req_cancel(qmgr_ctx->qmgr_ibdb, &ibdb_req_hd); goto error; } QM_LEV_DPRINTF(sm_is_err(ret) ? 0 : 3, (QM_DEBFP, "sev=INFO, func=qmgr_aq_cleanup, ibdb_update=%r\n", ret)); /* maybe activate scheduler */ if (delay_next_try != 0 && qmgr_ctx->qmgr_sched != NULL) { timeval_T nowt, sleept, delayval; ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &nowt); delayval.tv_usec = 0; delayval.tv_sec = delay_next_try; timeradd(&nowt, &delayval, &sleept); /* Activate scheduler, but how? Always or use some test? */ ret = evthr_new_sl(qmgr_ctx->qmgr_sched, sleept, false); if (sm_is_err(ret)) QM_LEV_DPRINTFC(QDC_CLEANUP, 0, (QM_DEBFP, "sev=ERROR, func=qmgr_aq_cleanup, evthr_new_sl=%r\n", ret)); } /* HACK notify qar task */ if (QDA_ACT_SMAR(qda_upd_fl)) { sm_evthr_task_P qar_tsk; qar_ctx_P qar_ctx; qar_ctx = qmgr_ctx->qmgr_ar_ctx; SM_IS_QAR_CTX(qar_ctx); qar_tsk = qmgr_ctx->qmgr_ar_tsk; ret = evthr_en_wr(qar_tsk); QM_LEV_DPRINTFC(QDC_CLEANUP, 5, (QM_DEBFP, "func=qmgr_aq_cleanup, enable_smar_wr=%r\n", ret)); } return ret; error: /* cleanup? */ if (SM_IS_FLAG(fct_state, FST_AQ_LCK)) { r = pthread_mutex_unlock(&aq_ctx->aq_mutex); SM_ASSERT(0 == r); SM_CLR_FLAG(fct_state, FST_AQ_LCK); } if (SM_IS_FLAG(fct_state, FST_EDBC_LCK)) { r = pthread_mutex_unlock(&qmgr_ctx->qmgr_edbc->edbc_mutex); SM_ASSERT(0 == r); SM_CLR_FLAG(fct_state, FST_EDBC_LCK); } return ret; #undef FST_EDBC_LCK #undef FST_AQ_LCK } /* CONF */ #define CLEANUP_TMO_TA 7200 /* remove TA entries that are older than this */ #define CLEANUP_TMO_SE 9000 /* remove SE entries that are older than this */ #define CLEANUP_DELAY 3600 /* run task every hour */ #if IQDB_CLEANUP /* ** IQDB_CLEANUP_CB -- clean up IQDB (callback for "walk" function) ** ** Parameters: ** key -- ignored ** value -- value in IQDB ** ctx -- qmgr context ** walk_ctx -- ignored ** type -- type of entry ** ** Returns: ** none ** ** Locking: none (up to caller) ** ** Last code review: 2005-04-10 05:32:05 ** Last code change: */ static void iqdb_cleanup_cb(const char *key, const void *value, void *ctx, void *walk_ctx, uint type) { time_T time_now; qss_ta_P qss_ta; qss_sess_P qss_sess; qmgr_ctx_P qmgr_ctx; qmgr_ctx = (qmgr_ctx_P) ctx; SM_IS_QMGR_CTX(qmgr_ctx); time_now = evthr_time(qmgr_ctx->qmgr_ev_ctx); switch (type) { case IQDB_T_SESS: qss_sess = (qss_sess_P) value; SM_IS_QS_SE(qss_sess); if (qss_sess->qsses_st_time + CLEANUP_TMO_SE > time_now) break; /* expired... */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_INFO, 9, "sev=INFO, func=iqdb_cleanup_cb, qss_sess=%p, now=%7ld, entered=%7ld" , qss_sess , (long) time_now , (long) qss_sess->qsses_st_time); (void) iqdb_session_rm(qmgr_ctx->qmgr_iqdb, qss_sess->qsses_id, SMTP_STID_SIZE, THR_NO_LOCK); (void) qss_sess_free(qss_sess); break; case IQDB_T_TA: qss_ta = (qss_ta_P) value; SM_IS_QS_TA(qss_ta); if (qss_ta->qssta_st_time + CLEANUP_TMO_TA > time_now) break; if (QSS_TA_IS_FLAG(qss_ta, QSS_TA_FL_AQ)) { /* still in AQ? */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_WARN, 7, "sev=WARN, func=iqdb_cleanup_cb, qss_ta=%p, flags=%#x, status=in_AQ, now=%7ld, entered=%7ld" , qss_ta, qss_ta->qssta_flags , (long) time_now , (long) qss_ta->qssta_st_time); break; } /* expired... */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_INFO, 9, "sev=INFO, func=iqdb_cleanup_cb, qss_ta=%p, flags=%#x, now=%7ld, entered=%7ld" , qss_ta, qss_ta->qssta_flags , (long) time_now , (long) qss_ta->qssta_st_time); #if 0 qss_ta_abort(qss_ta); /* qss_ta_free() is sufficient for now */ #endif (void) qss_ta_free(qss_ta, false, 0, 0); break; #if 0 case IQDB_T_RCPT: /* will be removed when qss_ta is removed */ #endif } } #endif /* IQDB_CLEANUP */ /* ** QMGR_CLEANUP -- task to periodically clean up (IQDB, AQ, EDB) ** ** Parameters: ** tsk -- evthr task ** ** Returns: ** usual sm_error code ** ** Side Effects: terminate system on error ** ** Last code review: 2005-04-10 16:02:28 ** Last code change: */ sm_ret_T qmgr_cleanup(sm_evthr_task_P tsk) { int r; sm_ret_T ret; qcleanup_ctx_P qcleanup_ctx; qmgr_ctx_P qmgr_ctx; time_T nextrun, iqdbt, edbt, aqt; timeval_T nowtval, nextruntval, delaytval; SM_IS_EVTHR_TSK(tsk); qcleanup_ctx = (qcleanup_ctx_P) tsk->evthr_t_actx; SM_IS_QCLEANUP(qcleanup_ctx); qmgr_ctx = qcleanup_ctx->qc_qmgr_ctx; SM_IS_QMGR_CTX(qmgr_ctx); ret = evthr_timeval(qmgr_ctx->qmgr_ev_ctx, &nowtval); delaytval.tv_usec = 0; delaytval.tv_sec = CLEANUP_DELAY; /* TIMEOUT: CONF? */ timeradd(&nowtval, &delaytval, &nextruntval); r = pthread_mutex_lock(&qcleanup_ctx->qc_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_CRIT, 0, "sev=CRIT, func=qmgr_cleanup, lock=%d", r); goto error; } aqt = qcleanup_ctx->qc_aq_sweep; /* if AQ sweep is done now, then don't run it again */ if (aqt <= nowtval.tv_sec) qcleanup_ctx->qc_aq_sweep = TIME_T_MAX; /* if IQBD sweep is done now, then run it again after CLEANUP_DELAY */ iqdbt = qcleanup_ctx->qc_ibdb_sweep; if (iqdbt <= nowtval.tv_sec) qcleanup_ctx->qc_ibdb_sweep += CLEANUP_DELAY; /* if EDB sweep is done now, then run it again after delay */ edbt = qcleanup_ctx->qc_edb_sweep; if (edbt <= nowtval.tv_sec) qcleanup_ctx->qc_edb_sweep += qmgr_ctx->qmgr_cnf.q_cnf_edb_cnf.edbcnf_chkpt_delay; r = pthread_mutex_unlock(&qcleanup_ctx->qc_mutex); SM_ASSERT(0 == r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_ERR, 1, "sev=ERROR, func=qmgr_cleanup, unlock=%d", r); goto error; } nextrun = SM_MIN(aqt, iqdbt); if (nextrun <= nowtval.tv_sec) { nextrun = SM_MAX(aqt, iqdbt); if (nextrun <= nowtval.tv_sec) nextrun += CLEANUP_DELAY; } QM_LEV_DPRINTFC(QDC_CLEANUP, 1, (QM_DEBFP, "sev=INFO, func=qmgr_cleanup, nextrun=%7ld, aqt=%7ld, iqdbt=%7ld, edbt=%7ld, now=%7ld\n" , (long) nextrun, (long) aqt, (long) iqdbt, (long) edbt , (long) nowtval.tv_sec)); #if IQDB_CLEANUP IQDB cleanup has been disable temporarily as the locking of entries in IQDB is not entirely correct yet. 2006-04-17 15:57:37 /* clean up old entries in iqdb? */ r = iqdb_usage(qmgr_ctx->qmgr_iqdb); /* ** Threshold: activate IQDB cleanup only if it is exceeded ** Note: it might be useful to set this flags also in other places, ** e.g., the resource module (which is called when a threshold ** is exceeded). */ if (r > 80) /* THRESHOLD: CONF */ QCLEANUP_SET_FLAG(qcleanup_ctx, QCLEANUP_FL_IQDB); if (iqdbt <= nowtval.tv_sec && QCLEANUP_IS_FLAG(qcleanup_ctx, QCLEANUP_FL_IQDB)) { iqdb_walk(qmgr_ctx->qmgr_iqdb, iqdb_cleanup_cb, (void *) 0, THR_LOCK_UNLOCK); QCLEANUP_CLR_FLAG(qcleanup_ctx, QCLEANUP_FL_IQDB); } #endif if (aqt <= nowtval.tv_sec) { /* clean up old entries in AQ wait list */ sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_INFO, 13, "sev=INFO, func=qmgr_cleanup, invoking=qmgr_aq_cleanup"); ret = qmgr_aq_cleanup(qmgr_ctx, &nextruntval); /* set sleep time?? */ if (sm_is_err(ret)) goto error; } /* ** Clean up defedb? ** Try to do this only in "quiet" times... how to determine? ** No entries in AQ? Last write to DEFEDB some time ago? ** Nothing in wait for update list? */ if (edbt <= nowtval.tv_sec && QCLEANUP_IS_FLAG(qcleanup_ctx, QCLEANUP_FL_EDB)) { time_T nextedb; uint32_t delay; ret = edb_chkpt(qmgr_ctx->qmgr_edb); if (SM_IS_PERM_ERR(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_ERROR, 3, "sev=ERROR, func=qmgr_cleanup, edb_chkpt=%m" , ret); goto error; } delay = qmgr_ctx->qmgr_cnf.q_cnf_edb_cnf.edbcnf_chkpt_delay; QM_LEV_DPRINTFC(QDC_CLEANUP, 3, (QM_DEBFP, "sev=INFO, func=qmgr_cleanup, edb_chkpt=%r, delay=%ld\n" , ret, (long) delay)); if (delay > 0) { nextedb = nowtval.tv_sec + delay; if (nextrun > nextedb) nextrun = nextedb; } } if (nextrun < nextruntval.tv_sec) nextruntval.tv_sec = nextrun; tsk->evthr_t_sleep = nextruntval; /* other things to clean up? */ return EVTHR_SLPQ; error: /* terminate on error... */ return EVTHR_TERM; } /* ** QMGR_SET_CLEANUP -- set options for cleanup and change wakeup time ** for cleanup task if necessary and requested. ** ** Parameters: ** qcleanup_ctx -- qmgr cleanup context ** which -- which cleanup to run ** when -- when to run cleanup (0: disable) ** changewakeup -- change wakeup time for cleanup task? ** ** Returns: ** usual sm_error code; ENOMEM, et.al. (evthr_new_sl()) ** ** Side Effects: see description above ** ** Last code review: 2005-03-23 02:05:06 ** Last code change: */ sm_ret_T qmgr_set_cleanup(qcleanup_ctx_P qcleanup_ctx, time_T when, uint which, bool changewakeup) { int r; uint u; sm_ret_T ret; qmgr_ctx_P qmgr_ctx; timeval_T sleept; time_T nextrun; SM_IS_QCLEANUP(qcleanup_ctx); qmgr_ctx = qcleanup_ctx->qc_qmgr_ctx; SM_IS_QMGR_CTX(qmgr_ctx); SM_REQUIRE(which < QCLEANUP_LIMIT); r = pthread_mutex_lock(&qcleanup_ctx->qc_mutex); SM_LOCK_OK(r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_CRIT, 0, "sev=CRIT, func=qmgr_set_cleanup, lock=%d", r); ret = sm_error_temp(SM_EM_Q, r); goto error; } QM_LEV_DPRINTFC(QDC_CLEANUP, 5, (QM_DEBFP, "sev=INFO, func=qmgr_set_cleanup, now=%7ld, when=%7ld, aq_sweep=%7ld, ibdb_sweep=%7ld, edb_sweep=%7ld\n" , (long) evthr_time(qmgr_ctx->qmgr_ev_ctx) , (long) when , (long) qcleanup_ctx->qc_aq_sweep , (long) qcleanup_ctx->qc_ibdb_sweep , (long) qcleanup_ctx->qc_edb_sweep )); ret = SM_SUCCESS; if (when == (time_T) 0) { QCLEANUP_CLR_FLAG(qcleanup_ctx, QCLEANUP_FL_AQWAIT); nextrun = TIME_T_MAX; for (u = 0; u < QCLEANUP_LIMIT; u++) { if (u != which) { nextrun = SM_MIN(nextrun, qcleanup_ctx->qc_sweep[u]); } } sleept.tv_sec = nextrun; sleept.tv_usec = 0; if (changewakeup) ret = evthr_new_sl(qmgr_ctx->qmgr_tsk_cleanup, sleept, true); qcleanup_ctx->qc_aq_sweep = TIME_T_MAX; } else { QCLEANUP_SET_FLAG(qcleanup_ctx, QCLEANUP_FL_AQWAIT); nextrun = when; for (u = 0; u < QCLEANUP_LIMIT; u++) { if (u != which) { nextrun = SM_MIN(nextrun, qcleanup_ctx->qc_sweep[u]); } } sleept.tv_sec = nextrun; sleept.tv_usec = 0; if (changewakeup) ret = evthr_new_sl(qmgr_ctx->qmgr_tsk_cleanup, sleept, false); QM_LEV_DPRINTFC(QDC_CLEANUP, 5, (QM_DEBFP, "sev=INFO, func=qmgr_set_cleanup, nextrun=%7ld, ret=%r\n" , (long) nextrun, ret)); qcleanup_ctx->qc_aq_sweep = when; } if (sm_is_err(ret)) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_CRIT, 0, "sev=CRIT, func=qmgr_set_cleanup, evthr_new_sl=%m", ret); } r = pthread_mutex_unlock(&qcleanup_ctx->qc_mutex); SM_ASSERT(0 == r); if (r != 0) { sm_log_write(qmgr_ctx->qmgr_lctx, QM_LCAT_CLEANUP, QM_LMOD_CLEANUP, SM_LOG_ERR, 1, "sev=ERROR, func=qmgr_set_cleanup, unlock=%d", r); ret = sm_error_temp(SM_EM_Q, r); goto error; } return ret; error: /* cleanup? currently none needed */ return ret; } /* ** QMGR_SET_AQ_CLEANUP -- set options for AQ cleanup and change wakeup time ** for cleanup task if necessary and requested. ** ** Parameters: ** qcleanup_ctx -- qmgr cleanup context ** when -- when to run AQ cleanup (0: disable) ** changewakeup -- change wakeup time for cleanup task? ** ** Returns: ** usual sm_error code ** ** Side Effects: see qmgr_set_cleanup() ** ** Last code review: 2005-03-23 04:42:38 ** Last code change: */ sm_ret_T qmgr_set_aq_cleanup(qcleanup_ctx_P qcleanup_ctx, time_T when, bool changewakeup) { return qmgr_set_cleanup(qcleanup_ctx, when, QCLEANUP_AQWAIT, changewakeup); }