/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: tasks.c,v 1.82 2007/01/14 00:04:10 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/signal.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "evthr-int.h"
#include "log.h"
/*
**  EVTHR_WAKEUP -- notify the main loop ("scheduler") through its pipe
**
**	If no wakeup is pending (EVTHR_IS_WAKEUP() is false), a single
**	EVTHR_CONT byte is written to wrpipe(evthr_ctx) and, if that
**	succeeds, the wakeup flag is set.  If a wakeup is already pending
**	nothing is written.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code; write() errno (EIO if errno is 0),
**		or the pthread error code if (un)locking failed
**
**	Locking:
**		locks waitq
**
**	Last code review: 2005-03-23 00:14:51; see below
**	Last code change:
*/
static sm_ret_T
evthr_wakeup(sm_evthr_ctx_P evthr_ctx)
{
	int r;
	char token;
	sm_ret_T result;

	SM_IS_EVTHR_CTX(evthr_ctx);
	result = SM_SUCCESS;

	r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(r);
	if (r != 0) {
		NOTE(NOT_REACHED)
		return sm_error_perm(SM_EM_EVTHR, r);
	}

	if (!EVTHR_IS_WAKEUP(evthr_ctx)) {
		token = EVTHR_CONT;
		r = write(wrpipe(evthr_ctx), (void *) &token, 1);
		if (r == 1) {
			/* byte is in the pipe: remember that a wakeup is pending */
			EVTHR_SET_WAKEUP(evthr_ctx);
			DPRINTF(8, (stderr, "func=evthr_wakeup, status=wakeup\n"));
		}
		else {
			/*
			**  What to do on error?  Let's hope the scheduler
			**  is started anyway (otherwise the system could
			**  theoretically deadlock).
			*/

			result = sm_error_perm(SM_EM_EVTHR, (errno != 0) ? errno : EIO);
			DPRINTF(0, (stderr, "sev=ERROR, func=evthr_wakeup, write=%c, failed=%d\n", token, r));
			sm_log_write(evthr_ctx->evthr_c_lctx,
				EVTHR_LCAT_MISC, EVTHR_LMOD_MISC, SM_LOG_CRIT, 0,
				"sev=CRIT, func=evthr_wakeup, write=%d, error=%m", r, result);
		}
	}

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
	SM_ASSERT(r == 0);
	if (r != 0)
		result = sm_error_perm(SM_EM_EVTHR, r);
	return result;
}
/*
**  EVTHR_ADD_REQ -- add (or update) a change request for a task
**
**	Walks the request list and stops at the first entry that is
**	either unused (no task attached) or already belongs to this
**	task; if neither exists a new entry is obtained from
**	evthr_req_new().  The request counter is incremented whenever
**	an unused or new entry is taken, not when an entry is updated.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		task -- task to change
**		rqflags -- new request flags (0: leave flags of entry as is)
**		slpt -- new sleep time (only read if chg_time is not
**			EVTHR_CHG_TIME_NO)
**		chg_time -- NO/YES/LESS ((don't) change timeout or shorten it)
**
**	Returns:
**		usual sm_error code; ENOMEM
**
**	Side Effects: none on error
**
**	Locking:
**		locks evthr_c_reqmut
**
**	Last code review: 2005-03-23 00:08:51
**	Last code change:
*/
static sm_ret_T
evthr_add_req(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task, int rqflags, timeval_P slpt, int chg_time)
{
	int r;
	bool have_req;
	sm_evthr_req_P rq;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_IS_EVTHR_TSK(task);

	r = pthread_mutex_lock(&evthr_ctx->evthr_c_reqmut);
	SM_LOCK_OK(r);
	if (r != 0) {
		NOTE(NOT_REACHED)
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_add_req, lock=%d\n", r));
		return sm_error_perm(SM_EM_EVTHR, r);
	}

	/* look for an unused entry or one that already refers to this task */
	have_req = false;
	rq = EVTHR_REQ_FIRST(evthr_ctx);
	while (!have_req && rq != EVTHR_REQ_END(evthr_ctx)) {
		if (rq->evthr_r_task == NULL) {
			/* unused entry: take it over */
			++evthr_ctx->evthr_c_nreqs;
			have_req = true;
		}
		else if (rq->evthr_r_task == task) {
			/* question: overwrite or | ? */
			DPRINTF(5, (stderr, "func=evthr_add_req, overwrite existing request: %x -> %x\n", rq->evthr_r_rqevf, rqflags));
			have_req = true;
		}
		else {
			rq = EVTHR_REQ_NEXT(rq);
		}
	}

	/* nothing usable in the list: get a fresh entry */
	if (!have_req) {
		rq = evthr_req_new(evthr_ctx);
		if (rq != NULL) {
			++evthr_ctx->evthr_c_nreqs;
			have_req = true;
		}
	}

	if (!have_req) {
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_add_req, change request for task=%lx failed\n", (long) task));
	}
	else {
		rq->evthr_r_task = task;
		if (rqflags != 0)
			rq->evthr_r_rqevf = rqflags;
		if (chg_time != EVTHR_CHG_TIME_NO) {
			rq->evthr_r_sleep = *slpt;
			rq->evthr_r_chge = chg_time;
		}
	}

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_reqmut);
	if (r != 0) {
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_add_req, unlock=%d\n", r));
	}
	SM_ASSERT(r == 0);

	if (have_req)
		return SM_SUCCESS;
	return sm_error_perm(SM_EM_EVTHR, ENOMEM);
}
/*
**  EVTHR_WAITQ_APP -- append task to waitq, wake up main loop ("scheduler")
**
**	Marks the task as wanting to be in the wait queue, appends it
**	while holding the waitq mutex and, unless a wakeup is already
**	pending, writes an EVTHR_CONT byte to the scheduler pipe.
**	If SM_LOCK_TASK is set the task mutex is unlocked after the
**	task has been appended.
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code (pthread error if (un)locking waitq
**		failed).  A failed write to the pipe is only logged; it
**		does not change the return value.
**
**	Locking:
**		locks waitq
**
**	Todo: deal better with failures, see for example EVTHR_EV_WWQ.
*/
sm_ret_T
evthr_waitq_app(sm_evthr_task_P task)
{
	int r;
	sm_evthr_ctx_P evthr_ctx;
#if EVTHR_PARANOIA > 1
	/* only needed for the duplicate check below */
	sm_evthr_task_P task_next;
#endif

	SM_IS_EVTHR_TSK(task);
#if EVTHR_PARANOIA
	/* HACK! Make sure the task is not in any queue; should use a macro */
	SM_REQUIRE(task->evthr_t_state == 0);
#endif
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);
	EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);

	r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(r);
	if (r != 0) {
		NOTE(NOT_REACHED)
		goto error;
	}

#if EVTHR_PARANOIA > 1
	/* the task must not be in the wait queue already */
	EVTHR_WAITQ_LOOP(evthr_ctx, task_next)
		SM_ASSERT(task != task_next);
#endif
	EVTHR_WAITQ_APP(evthr_ctx, task);

#if SM_LOCK_TASK
	r = pthread_mutex_unlock(&task->evthr_t_mutex);
	if (r != 0) {
		DPRINTF(0, (stderr, "sev=ERROR, func=evthr_waitq_app, tsk=0x%p, unlock=%d\n",
			task, r));
	}
	SM_ASSERT(r == 0);
#endif /* SM_LOCK_TASK */

	/* call evthr_wakeup()?? would require a variant that doesn't lock waitq */
	if (!EVTHR_IS_WAKEUP(evthr_ctx)) {
		char c;

		c = EVTHR_CONT;
		r = write(wrpipe(evthr_ctx), (void *) &c, 1);
		if (r != 1)
			sm_log_write(evthr_ctx->evthr_c_lctx,
				EVTHR_LCAT_MISC, EVTHR_LMOD_MISC, SM_LOG_CRIT, 0,
				"sev=CRIT, func=evthr_waitq_app, write=%d, error=%d", r, errno);
		else {
			EVTHR_SET_WAKEUP(evthr_ctx);
			DPRINTF(8, (stderr, "func=evthr_waitq_app, status=wakeup\n"));
		}
	}
	EVTHR_IS_INQ(task, EVTHR_EV_IWQ);

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
	SM_ASSERT(r == 0);
	if (r != 0)
		goto error;
	return SM_SUCCESS;

  error:
	return sm_error_perm(SM_EM_EVTHR, r); /* no good! */
}
/*
**  EVTHR_EN_WR -- enable WR check for task
**
**	Queues a change request with the EVTHR_EV_WR flag (sleep time
**	untouched) and, if that worked, wakes up the scheduler.
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code
**
**	Last code review:
**	Last code change:
*/
sm_ret_T
evthr_en_wr(sm_evthr_task_P task)
{
	sm_evthr_ctx_P evthr_ctx;
	sm_ret_T res;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);

	res = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_WR), NULL, EVTHR_CHG_TIME_NO);

	/* only bother the scheduler if the request was actually queued */
	if (!sm_is_err(res))
		res = evthr_wakeup(evthr_ctx);
	return res;
}
/*
**  EVTHR_A2DEL -- ask to delete task
**
**	Queues a change request with the EVTHR_EV_DEL flag (sleep time
**	untouched) and, if that worked, wakes up the scheduler.
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code
**
**	Last code review:
**	Last code change: 2005-09-23 20:42:46
*/
sm_ret_T
evthr_a2del(sm_evthr_task_P task)
{
	sm_evthr_ctx_P evthr_ctx;
	sm_ret_T res;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);

	res = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_DEL), NULL, EVTHR_CHG_TIME_NO);

	/* only bother the scheduler if the request was actually queued */
	if (!sm_is_err(res))
		res = evthr_wakeup(evthr_ctx);
	return res;
}
/*
**  EVTHR_NEW_SL -- change SL timeout for task
**
**	Queues a change request with the EVTHR_EV_SL flag and the given
**	sleep time and, if that worked, wakes up the scheduler.
**
**	Parameters:
**		task -- evthr task context
**		slpt -- sleep time (absolute time; tv_sec must be > 3600)
**		change -- true: always change timeout (EVTHR_CHG_TIME_YES);
**			false: only shorten it (EVTHR_CHG_TIME_LESS)
**
**	Returns:
**		usual sm_error code; ENOMEM, et.al.
**
**	Last code review: 2005-03-23 02:00:37
**	Last code change:
*/
sm_ret_T
evthr_new_sl(sm_evthr_task_P task, timeval_T slpt, bool change)
{
	int how;
	sm_evthr_ctx_P evthr_ctx;
	sm_ret_T res;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);

	/* try to catch relative sleep times, e.g., 60s instead of 1081548659 */
	SM_REQUIRE(slpt.tv_sec > 3600);

	if (change)
		how = EVTHR_CHG_TIME_YES;
	else
		how = EVTHR_CHG_TIME_LESS;

	res = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_SL), &slpt, how);

	/* only bother the scheduler if the request was actually queued */
	if (!sm_is_err(res))
		res = evthr_wakeup(evthr_ctx);
	return res;
}
/*
**  EVTHR_SLPQ_INS -- insert a task into the sleep (wait) queue
**
**	The task is inserted in front of the first queue entry that is
**	either not a sleeping task or has a later sleep time than the
**	new task; if there is no such entry the task is appended.
**	In both cases the task is marked as being in the wait queue.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		task -- task context (with valid sleep time)
**
**	Returns:
**		usual sm_error code (always SM_SUCCESS)
**
**	Locking:
**		waitq must be locked by caller.
*/
sm_ret_T
evthr_slpq_ins(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task)
{
	sm_evthr_task_P cur;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_IS_EVTHR_TSK(task);

	EVTHR_WAITQ_LOOP(evthr_ctx, cur) {
		bool goes_before;

		/* non-sleeping entries never precede the new task */
		if (!evthr_is_slp(cur))
			goes_before = true;
		else
			goes_before = timercmp(&cur->evthr_t_sleep, &task->evthr_t_sleep, >);

		if (goes_before) {
			EVTHR_WAITQ_INS(evthr_ctx, cur, task);
			EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
			return SM_SUCCESS;
		}
	}

	/* no insertion point found: the task goes to the end of the queue */
	EVTHR_WAITQ_APP(evthr_ctx, task);
	EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
	return SM_SUCCESS;
}
/*
**  EVTHR_TASK_NEW -- add a new task to the event thread system
**
**	Allocates a task, fills it in, and puts it into the wait queue
**	(sorted by sleep time if ev requests sleeping, appended
**	otherwise).  Signal tasks are registered in the signal table;
**	non-sleeping tasks with a valid file descriptor are registered
**	in the fd table.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		ptask -- pointer to task context (return parameter);
**			written on success only, left untouched on error
**		ev -- type of event to watch for
**		fd -- filedescriptor or signal to watch
**			(signal iff evthr_is_sgf(ev))
**		sleept -- pointer to sleep time (absolute time), may be NULL
**		fct -- function to call
**		taskctx -- argument for function to call
**
**	Returns:
**		usual sm_error code; ENOMEM, EINVAL (signal out of range),
**		pthread error codes
**
**	Side Effects: on error the partially built task is released
**		again and its signal/fd table entries are cleared.
**
**	Locking:
**		locks waitq
*/
sm_ret_T
evthr_task_new(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P *ptask, int ev, int fd, timeval_T *sleept, evthr_task_F *fct, void *taskctx)
{
	int r;
	int sigidx;
	sm_ret_T ret;
	sm_evthr_task_P task;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_REQUIRE(ptask != NULL);
	task = (sm_evthr_task_P) sm_zalloc(sizeof(*task));
	if (task == NULL)
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);

	sigidx = -1;	/* no entry in the signal table (yet) */
	evthr_rqevents(task) = ev;

	/* signal or fd? */
	if (evthr_is_sgf(ev)) {
		task->evthr_t_sig = fd;
		task->evthr_t_fd = -1;
		r = EVTHR_SIG2IDX(fd);
		if (r < 0 || r >= EVTHR_MAX_SIGS) {
			/* task mutex does not exist yet: skip its cleanup */
			ret = sm_error_perm(SM_EM_EVTHR, EINVAL);
			goto error;
		}
		sigidx = r;
		evthr_ctx->evthr_c_sg2t[sigidx] = task;
	}
	else {
		task->evthr_t_fd = fd;
		task->evthr_t_sig = -1;
	}
	if (sleept != NULL)
		task->evthr_t_sleep = *sleept;
	task->evthr_t_fct = fct;
	task->evthr_t_actx = taskctx;
	task->evthr_t_ctx = evthr_ctx;
	EVTHR_T_PRIO_SET(task, 0);

#if SM_LOCK_TASK
	r = pthread_mutex_init(&task->evthr_t_mutex, SM_PTHREAD_MUTEXATTR);
	if (r != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, r);
		goto error;
	}
#endif /* SM_LOCK_TASK */

	r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(r);
	if (r != 0) {
		NOTE(NOT_REACHED)
		ret = sm_error_perm(SM_EM_EVTHR, r);
		goto err2;
	}

	/* set early on to avoid problem in next function calls */
	task->sm_magic = SM_EVTHR_TASK_MAGIC;
	if (evthr_is_slpf(ev)) {
		/* return code ignored; see above */
		(void) evthr_slpq_ins(evthr_ctx, task);
	}
	else {
		EVTHR_WAITQ_APP(evthr_ctx, task);

		/*
		**  Use the fd stored in the task, not the raw argument:
		**  for signal tasks the argument is a signal number and
		**  must not end up in the fd table.
		*/

		if (task->evthr_t_fd >= 0
		    && (uint) task->evthr_t_fd < evthr_ctx->evthr_c_maxfd)
			evthr_ctx->evthr_c_fd2t[task->evthr_t_fd] = task;
		EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
	}
	evthr_ctx->evthr_c_tasks++;
	DPRINTF(4, (stderr, "func=evthr_task_new, tasks=%d\n", evthr_ctx->evthr_c_tasks));

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
	SM_ASSERT(r == 0);
	if (r != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, r);
		goto err1;
	}

	/*
	**  How to inform the main loop ("scheduler") about the change?
	**  Should it just "catch" it during the next iteration?
	**  Or just send a notice via the pipe?
	**  Probably the latter, See: evthr_wakeup()
	*/

	*ptask = task;
	return SM_SUCCESS;

	/*
	**  Error cleanup: undo in reverse order.  All of this works on
	**  the local task; *ptask has not been assigned at this point.
	*/

  err1:
	EVTHR_WAITQ_DEL(evthr_ctx, task);
	if (task->evthr_t_fd >= 0
	    && (uint) task->evthr_t_fd < evthr_ctx->evthr_c_maxfd)
		evthr_ctx->evthr_c_fd2t[task->evthr_t_fd] = NULL;
	evthr_ctx->evthr_c_tasks--;
  err2:
#if SM_LOCK_TASK
	(void) pthread_mutex_destroy(&task->evthr_t_mutex);
#endif
  error:
	/* do not leave a pointer to the released task in the signal table */
	if (sigidx >= 0)
		evthr_ctx->evthr_c_sg2t[sigidx] = NULL;
	SM_FREE(task);
	return ret;
}
/*
**  EVTHR_TASK_DEL -- remove a task from the event thread system
**
**	Clears the task's entries in the fd and signal tables, takes
**	the task out of the wait queue (if it is in there), decrements
**	the task counter and releases the task structure.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		task -- task context (must not be used after a return
**			that got past the locking step)
**		locktype -- kind of locking
**
**	Returns:
**		usual sm_error code
**
**	Side Effects: none if locking waitq fails (the task is left
**		fully registered); if only the unlock fails the task
**		is nevertheless removed and released.
**
**	Locking:
**		waitq is locked/unlocked as requested by locktype
*/
sm_ret_T
evthr_task_del(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task, thr_lock_T locktype)
{
	int r;
	sm_ret_T ret;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_IS_EVTHR_TSK(task);

	/* This should check whether task is "in use" */
	ret = SM_SUCCESS;

	/*
	**  Do we need a mutex per task? -> SM_LOCK_TASK
	**  Just killing off tasks isn't a good idea, we must make
	**  sure they are not in use anymore.
	**  How? Requirement for the application itself?
	*/

	/*
	**  Get the lock before touching any shared table so that a
	**  locking failure leaves the task completely registered.
	*/

	if (thr_lock_it(locktype)) {
		r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
		SM_LOCK_OK(r);
		if (r != 0) {
			NOTE(NOT_REACHED)
			/* nothing has been modified yet */
			return sm_error_perm(SM_EM_EVTHR, r);
		}
	}

	/*
	**  Check only whether fd is "valid", not whether _ev actually
	**  is I/O related because it could be that the task temporarily
	**  just sleeps.
	*/

	r = task->evthr_t_fd;
	if (r >= 0 && (uint)r < evthr_ctx->evthr_c_maxfd)
		evthr_ctx->evthr_c_fd2t[r] = NULL;
	r = EVTHR_SIG2IDX(task->evthr_t_sig);
	if (r >= 0 && r < EVTHR_MAX_SIGS)
		evthr_ctx->evthr_c_sg2t[r] = NULL;

	/* where is the task? */
	if (evthr_is_inwq(task)) {
		EVTHR_WAITQ_DEL(evthr_ctx, task);
		EVTHR_REM_FROMQ(task, EVTHR_EV_IWQ);
	}
	evthr_ctx->evthr_c_tasks--;
	DPRINTF(4, (stderr, "func=evthr_task_del, tasks=%d\n", evthr_ctx->evthr_c_tasks));

	if (thr_unl_always(locktype)) {
		r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
		SM_ASSERT(r == 0);
		if (r != 0)
			ret = sm_error_perm(SM_EM_EVTHR, r);
	}

	/* invalidate the magic so stale pointers are caught by the checks */
	task->sm_magic = SM_MAGIC_NULL;
	sm_free_size(task, sizeof(*task));

	/* how about substructures, esp. the user data? */
	return ret;
}
/*
**  EVTHR_WAKEUP_TASK -- wake up a task
**
**	Gets a time value from evthr_timeval() (presumably the current
**	time -- see that function), queues a change request that
**	shortens the task's sleep time to it (EVTHR_CHG_TIME_LESS),
**	and wakes up the scheduler.
**
**	Parameters:
**		task -- evthr task context
**
**	Returns:
**		usual sm_error code (from evthr_timeval(), evthr_add_req(),
**		or evthr_wakeup())
**
**	Side Effects: no request is queued if evthr_timeval() fails.
**
**	Last code review:
**	Last code change:
*/
sm_ret_T
evthr_wakeup_task(sm_evthr_task_P task)
{
	sm_ret_T ret;
	sm_evthr_ctx_P evthr_ctx;
	timeval_T slpt;

	SM_IS_EVTHR_TSK(task);
	evthr_ctx = task->evthr_t_ctx;
	SM_IS_EVTHR_CTX(evthr_ctx);

	/* without a valid time slpt must not be handed to evthr_add_req() */
	ret = evthr_timeval(evthr_ctx, &slpt);
	if (sm_is_err(ret))
		return ret;

	ret = evthr_add_req(evthr_ctx, task, evthr_r_yes(EVTHR_EV_SL), &slpt,
			EVTHR_CHG_TIME_LESS);
	DPRINTF(8, (stderr, "func=evthr_wakeup_task, task=%p, add_res=%r\n",
		task, ret));
	if (sm_is_err(ret))
		return ret;
	return evthr_wakeup(evthr_ctx);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */