/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: tasks.c,v 1.82 2007/01/14 00:04:10 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/signal.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "evthr-int.h"
#include "log.h"

/*
**  EVTHR_WAKEUP -- wakeup main loop ("scheduler")
**
**	Writes one EVTHR_CONT byte to the write end of the context's pipe
**	unless the wakeup flag is already set; sets the flag on success.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code; write() errno
**		(EIO if write() failed without setting errno)
**
**	Locking:
**		locks waitq
**
**	Last code review: 2005-03-23 00:14:51; see below
**	Last code change:
*/

static sm_ret_T
evthr_wakeup(sm_evthr_ctx_P evthr_ctx)
{
	int r;
	char c;
	sm_ret_T ret;

	SM_IS_EVTHR_CTX(evthr_ctx);
	ret = SM_SUCCESS;
	r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		NOTE(NOT_REACHED)
		return sm_error_perm(SM_EM_EVTHR, r);
	}

	if (!EVTHR_IS_WAKEUP(evthr_ctx))
	{
		c = EVTHR_CONT;
		r = write(wrpipe(evthr_ctx), (void *) &c, 1);
		if (r != 1)
		{
			/*
			**  What to do on error? Let's hope scheduler is
			**  started anyway (otherwise the system could
			**  theoretically deadlock).
			*/

			ret = sm_error_perm(SM_EM_EVTHR,
					(errno == 0) ? EIO : errno);
			DPRINTF(0, (stderr, "sev=ERROR, func=evthr_wakeup, write=%c, failed=%d\n", c, r));
			sm_log_write(evthr_ctx->evthr_c_lctx,
				EVTHR_LCAT_MISC, EVTHR_LMOD_MISC,
				SM_LOG_CRIT, 0,
				"sev=CRIT, func=evthr_wakeup, write=%d, error=%m",
				r, ret);
		}
		else
		{
			EVTHR_SET_WAKEUP(evthr_ctx);
			DPRINTF(8, (stderr, "func=evthr_wakeup, status=wakeup\n"));
		}
	}

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
	SM_ASSERT(r == 0);
	if (r != 0)
		ret = sm_error_perm(SM_EM_EVTHR, r);
	return ret;
}

/*
**  EVTHR_ADD_REQ -- add a change request for a task
**
**	Reuses a free request slot (one whose task is NULL) or the slot
**	that already refers to this task; otherwise allocates a new
**	request via evthr_req_new().
**
**	Parameters:
**		evthr_ctx -- evthr context
**		task -- task to change
**		rqflags -- new request flags (ignored if 0)
**		slpt -- new sleep time (only dereferenced if chg_time
**			is not EVTHR_CHG_TIME_NO)
**		chg_time -- NO/YES/LESS ((don't) change timeout or shorten it)
**
**	Returns:
**		usual sm_error code; ENOMEM
**
**	Side Effects: none on error
**
**	Locking:
**		locks evthr_c_reqmut
**
**	Last code review: 2005-03-23 00:08:51
**	Last code change:
*/

static sm_ret_T
evthr_add_req(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task, int rqflags, timeval_P slpt, int chg_time)
{
	int r;
	bool found;
	sm_evthr_req_P req;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_IS_EVTHR_TSK(task);
	r = pthread_mutex_lock(&evthr_ctx->evthr_c_reqmut);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		NOTE(NOT_REACHED)
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_add_req, lock=%d\n", r));
		return sm_error_perm(SM_EM_EVTHR, r);
	}

	found = false;
	for (req = EVTHR_REQ_FIRST(evthr_ctx);
	     req != EVTHR_REQ_END(evthr_ctx);
	     req = EVTHR_REQ_NEXT(req))
	{
		if (req->evthr_r_task == NULL)
		{
			/* free slot: take it over */
			++evthr_ctx->evthr_c_nreqs;
			found = true;
			break;
		}
		if (task == req->evthr_r_task)
		{
			/* question: overwrite or | ? */
			DPRINTF(5, (stderr, "func=evthr_add_req, overwrite existing request: %x -> %x\n",
				req->evthr_r_rqevf, rqflags));
			found = true;
			break;
		}
	}

	if (!found)
	{
		req = evthr_req_new(evthr_ctx);
		found = req != NULL;
		if (found)
			++evthr_ctx->evthr_c_nreqs;
	}

	if (found)
	{
		req->evthr_r_task = task;
		if (rqflags != 0)
			req->evthr_r_rqevf = rqflags;
		if (chg_time != EVTHR_CHG_TIME_NO)
		{
			req->evthr_r_sleep = *slpt;
			req->evthr_r_chge = chg_time;
		}
	}
	else
	{
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_add_req, change request for task=%lx failed\n", (long) task));
	}

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_reqmut);
	if (r != 0)
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_add_req, unlock=%d\n", r));
	SM_ASSERT(r == 0);
	return found ? SM_SUCCESS : sm_error_perm(SM_EM_EVTHR, ENOMEM);
}

/*
**  EVTHR_WAITQ_APP -- append task to waitq, wake up main loop ("scheduler")
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code
**
**	Locking:
**		locks waitq
**
**	Todo: deal better with failures, see for example EVTHR_EV_WWQ.
*/

sm_ret_T
evthr_waitq_app(sm_evthr_task_P task)
{
	int r;
	sm_evthr_ctx_P evthr_ctx;
#if EVTHR_PARANOIA
	sm_evthr_task_P task_next;
#endif

	SM_IS_EVTHR_TSK(task);
#if EVTHR_PARANOIA
	/* HACK! Make sure the task is not in any queue; should use a macro */
	SM_REQUIRE(task->evthr_t_state == 0);
#endif
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);
	EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);
	r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		NOTE(NOT_REACHED)
		goto error;
	}

#if EVTHR_PARANOIA > 1
	EVTHR_WAITQ_LOOP(evthr_ctx, task_next)
		SM_ASSERT(task != task_next);
#endif
	EVTHR_WAITQ_APP(evthr_ctx, task);
#if SM_LOCK_TASK
	r = pthread_mutex_unlock(&task->evthr_t_mutex);
	if (r != 0)
	{
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_waitq_app, tsk=0x%p, unlock=%d\n", task, r));
	}
	SM_ASSERT(r == 0);
#endif /* SM_LOCK_TASK */

	/* call evthr_wakeup()?? would require a variant that doesn't lock waitq */
	if (!EVTHR_IS_WAKEUP(evthr_ctx))
	{
		char c;

		c = EVTHR_CONT;
		r = write(wrpipe(evthr_ctx), (void *) &c, 1);
		if (r != 1)
			sm_log_write(evthr_ctx->evthr_c_lctx,
				EVTHR_LCAT_MISC, EVTHR_LMOD_MISC,
				SM_LOG_CRIT, 0,
				"sev=CRIT, func=evthr_waitq_app, write=%d, error=%d",
				r, errno);
		else
		{
			EVTHR_SET_WAKEUP(evthr_ctx);
			DPRINTF(8, (stderr, "func=evthr_waitq_app, status=wakeup\n"));
		}
	}
	EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
	SM_ASSERT(r == 0);
	if (r != 0)
		goto error;
	return SM_SUCCESS;

  error:
	return sm_error_perm(SM_EM_EVTHR, r);	/* no good! */
}

/*
**  EVTHR_EN_WR -- enable WR check for task
**
**	Queues an EVTHR_EV_WR request for the task and wakes up the
**	scheduler.
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
evthr_en_wr(sm_evthr_task_P task)
{
	sm_ret_T ret;
	sm_evthr_ctx_P evthr_ctx;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);
	ret = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_WR), NULL,
			EVTHR_CHG_TIME_NO);
	if (sm_is_err(ret))
		return ret;
	return evthr_wakeup(evthr_ctx);
}

/*
**  EVTHR_A2DEL -- ask to delete task
**
**	Queues an EVTHR_EV_DEL request for the task and wakes up the
**	scheduler.
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code
**
**	Last code review:
**	Last code change: 2005-09-23 20:42:46
*/

sm_ret_T
evthr_a2del(sm_evthr_task_P task)
{
	sm_ret_T ret;
	sm_evthr_ctx_P evthr_ctx;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);
	ret = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_DEL), NULL,
			EVTHR_CHG_TIME_NO);
	if (sm_is_err(ret))
		return ret;
	return evthr_wakeup(evthr_ctx);
}

/*
**  EVTHR_NEW_SL -- change SL timeout for task
**
**	Parameters:
**		task -- evthr task context
**		slpt -- sleep time (absolute time)
**		change -- always change timeout or only shorten it?
**
**	Returns:
**		usual sm_error code; ENOMEM, et.al.
**
**	Last code review: 2005-03-23 02:00:37
**	Last code change:
*/

sm_ret_T
evthr_new_sl(sm_evthr_task_P task, timeval_T slpt, bool change)
{
	sm_ret_T ret;
	sm_evthr_ctx_P evthr_ctx;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);

	/* try to catch relative sleep times, e.g., 60s instead of 1081548659 */
	SM_REQUIRE(slpt.tv_sec > 3600);

	ret = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_SL), &slpt,
			change ? EVTHR_CHG_TIME_YES : EVTHR_CHG_TIME_LESS);
	if (sm_is_err(ret))
		return ret;
	return evthr_wakeup(evthr_ctx);
}

/*
**  EVTHR_SLPQ_INS -- insert a task into the sleep (wait) queue
**
**	The task is inserted in front of the first queue entry that is
**	either not sleeping or has a later sleep time; if there is no
**	such entry the task is appended.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		task -- task context (with valid sleep time)
**
**	Returns:
**		usual sm_error code (always SM_SUCCESS)
**
**	Locking:
**		waitq must be locked by caller.
*/

sm_ret_T
evthr_slpq_ins(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task)
{
	sm_evthr_task_P task_next;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_IS_EVTHR_TSK(task);
	EVTHR_WAITQ_LOOP(evthr_ctx, task_next)
	{
		if (!evthr_is_slp(task_next) ||
		    timercmp(&task_next->evthr_t_sleep,
				&task->evthr_t_sleep, >)
		   )
		{
			EVTHR_WAITQ_INS(evthr_ctx, task_next, task);
			EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
			return SM_SUCCESS;
		}
	}
	EVTHR_WAITQ_APP(evthr_ctx, task);
	EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
	return SM_SUCCESS;
}

/*
**  EVTHR_TASK_NEW -- add a new task to the event thread system
**
**	Parameters:
**		evthr_ctx -- evthr context
**		ptask -- pointer to task context (return parameter);
**			only written on success
**		ev -- type of event to watch for
**		fd -- filedescriptor or signal to watch
**		sleept -- pointer to sleep time (absolute time); may be NULL
**		fct -- function to call
**		taskctx -- argument for function to call
**
**	Returns:
**		usual sm_error code; ENOMEM, EINVAL (signal out of range),
**		pthread_mutex_*() error codes
**
**	Side Effects:
**		on error the newly allocated task is released again and
**		the table/queue entries made for it are undone.
**
**	Locking:
**		locks waitq
*/

sm_ret_T
evthr_task_new(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P *ptask, int ev, int fd, timeval_T *sleept, evthr_task_F *fct, void *taskctx)
{
	int r;
	sm_ret_T ret;
	sm_evthr_task_P task;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_REQUIRE(ptask != NULL);
	task = (sm_evthr_task_P) sm_zalloc(sizeof(*task));
	if (task == NULL)
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);
	evthr_rqevents(task) = ev;

	/* signal or fd? */
	if (evthr_is_sgf(ev))
	{
		task->evthr_t_sig = fd;
		r = EVTHR_SIG2IDX(fd);
		if (r < 0 || r >= EVTHR_MAX_SIGS)
		{
			/* task mutex does not exist yet: skip its destruction */
			ret = sm_error_perm(SM_EM_EVTHR, EINVAL);
			goto error;
		}
		evthr_ctx->evthr_c_sg2t[r] = task;
		task->evthr_t_fd = -1;
	}
	else
	{
		task->evthr_t_fd = fd;
		task->evthr_t_sig = -1;
	}
	if (sleept != NULL)
		task->evthr_t_sleep = *sleept;
	task->evthr_t_fct = fct;
	task->evthr_t_actx = taskctx;
	task->evthr_t_ctx = evthr_ctx;
	EVTHR_T_PRIO_SET(task, 0);
#if SM_LOCK_TASK
	r = pthread_mutex_init(&task->evthr_t_mutex, SM_PTHREAD_MUTEXATTR);
	if (r != 0)
	{
		ret = sm_error_perm(SM_EM_EVTHR, r);
		goto error;
	}
#endif /* SM_LOCK_TASK */

	r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		NOTE(NOT_REACHED)
		ret = sm_error_perm(SM_EM_EVTHR, r);
		goto err2;
	}

	/* set early on to avoid problem in next function calls */
	task->sm_magic = SM_EVTHR_TASK_MAGIC;

	if (evthr_is_slpf(ev))
	{
		/* return code ignored; see above */
		(void) evthr_slpq_ins(evthr_ctx, task);
	}
	else
	{
		EVTHR_WAITQ_APP(evthr_ctx, task);
		if (fd >= 0 && (uint)fd < evthr_ctx->evthr_c_maxfd)
			evthr_ctx->evthr_c_fd2t[fd] = task;
		EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
	}
	evthr_ctx->evthr_c_tasks++;
	DPRINTF(4, (stderr, "func=evthr_task_new, tasks=%d\n",
		evthr_ctx->evthr_c_tasks));

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
	SM_ASSERT(r == 0);
	if (r != 0)
	{
		ret = sm_error_perm(SM_EM_EVTHR, r);
		goto err1;
	}

	/*
	**  How to inform the main loop ("scheduler") about the change?
	**  Should it just "catch" it during the next iteration?
	**  Or just send a notice via the pipe?
	**  Probably the latter, See: evthr_wakeup()
	*/

	*ptask = task;
	return SM_SUCCESS;

	/*
	**  Error handling: always operate on the local task; *ptask has
	**  not been assigned at this point and belongs to the caller.
	*/

  err1:
	EVTHR_WAITQ_DEL(evthr_ctx, task);
	if (fd >= 0 && (uint)fd < evthr_ctx->evthr_c_maxfd &&
	    evthr_ctx->evthr_c_fd2t[fd] == task)
		evthr_ctx->evthr_c_fd2t[fd] = NULL;
	evthr_ctx->evthr_c_tasks--;
  err2:
#if SM_LOCK_TASK
	(void) pthread_mutex_destroy(&task->evthr_t_mutex);
#endif
  error:
	/* do not leave a pointer to the released task in the signal table */
	if (evthr_is_sgf(ev))
	{
		r = EVTHR_SIG2IDX(fd);
		if (r >= 0 && r < EVTHR_MAX_SIGS &&
		    evthr_ctx->evthr_c_sg2t[r] == task)
			evthr_ctx->evthr_c_sg2t[r] = NULL;
	}
	task->sm_magic = SM_MAGIC_NULL;
	SM_FREE(task);
	return ret;
}

/*
**  EVTHR_TASK_DEL -- remove a task from the event thread system
**
**	Parameters:
**		evthr_ctx -- evthr context
**		task -- task context
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code
**
**	Side Effects:
**		the fd/signal table entries of the task are cleared before
**		the waitq lock is requested, i.e., also if locking fails.
**
**	Locking:
**		waitq is locked/unlocked as requested by locktype
*/

sm_ret_T
evthr_task_del(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task, thr_lock_T locktype)
{
	int r;
	sm_ret_T ret;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_IS_EVTHR_TSK(task);

	/* This should check whether task is "in use" */
	ret = SM_SUCCESS;

	/*
	**  Do we need a mutex per task? -> SM_LOCK_TASK
	**  Just killing off tasks isn't a good idea, we must make
	**  sure they are not in use anymore.
	**  How? Requirement for the application itself?
	*/

	/*
	**  Check only whether fd is "valid", not whether _ev actually
	**  is I/O related because it could be that the task temporarily
	**  just sleeps.
	*/

	r = task->evthr_t_fd;
	if (r >= 0 && (uint)r < evthr_ctx->evthr_c_maxfd)
		evthr_ctx->evthr_c_fd2t[r] = NULL;
	r = EVTHR_SIG2IDX(task->evthr_t_sig);
	if (r >= 0 && r < EVTHR_MAX_SIGS)
		evthr_ctx->evthr_c_sg2t[r] = NULL;

	if (thr_lock_it(locktype))
	{
		r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
		SM_LOCK_OK(r);
		if (r != 0)
		{
			NOTE(NOT_REACHED)
			ret = sm_error_perm(SM_EM_EVTHR, r);
			goto error;
		}
	}

	/* where is the task? */
	if (evthr_is_inwq(task))
	{
		EVTHR_WAITQ_DEL(evthr_ctx, task);
		EVTHR_REM_FROMQ(task, EVTHR_EV_IWQ);
	}
	evthr_ctx->evthr_c_tasks--;
	DPRINTF(4, (stderr, "func=evthr_task_del, tasks=%d\n",
		evthr_ctx->evthr_c_tasks));

	if (thr_unl_always(locktype))
	{
		r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
		SM_ASSERT(r == 0);
		if (r != 0)
			ret = sm_error_perm(SM_EM_EVTHR, r);
	}

	task->sm_magic = SM_MAGIC_NULL;
	sm_free_size(task, sizeof(*task));

	/* how about substructures, esp. the user data? */
	return ret;

  error:
	/* cleanup?? */
	return ret;
}

/*
**  EVTHR_WAKEUP_TASK -- wake up a task
**
**	Obtains a time value from evthr_timeval(), queues it as an
**	"only shorten" (EVTHR_CHG_TIME_LESS) sleep time request for the
**	task, and wakes up the scheduler.
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
evthr_wakeup_task(sm_evthr_task_P task)
{
	sm_ret_T ret;
	sm_evthr_ctx_P evthr_ctx;
	timeval_T slpt;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);

	/* without a valid time slpt must not be queued as sleep time */
	ret = evthr_timeval(evthr_ctx, &slpt);
	if (sm_is_err(ret))
		return ret;

	ret = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_SL), &slpt,
			EVTHR_CHG_TIME_LESS);
	DPRINTF(8, (stderr, "func=evthr_wakeup_task, task=%p, add_res=%r\n", task, ret));
	if (sm_is_err(ret))
		return ret;
	return evthr_wakeup(evthr_ctx);
}