/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 * Copyright (c) 2006 Claus Assmann
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

/*
**  partly based on workq.c from Programming with POSIX Threads
**  by David Butenhof.
*/

#include "sm/generic.h"
SM_RCSID("@(#)$Id: worker.c,v 1.48 2006/10/30 02:35:44 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/time.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/socket.h"
#include "evthr-int.h"
#include "log.h"

#if EVTHR_PRIOS > 1

/*
**  EVTHR_RUNQ_EMPTY_ALL -- are all run queues empty?
**
**	Parameters:
**		evthr_ctx -- evthr context
**		pprio -- (pointer to) priority of non-empty queue
**			>=0: priority of queue that isn't empty
**			<0: all queues empty
**
**	Returns:
**		true iff all run queues are empty
*/

static bool
evthr_runq_empty_all(sm_evthr_ctx_P evthr_ctx, int *pprio)
{
	int i;

	SM_REQUIRE(pprio != NULL);

	/* check every run queue in index order; stop at the first non-empty one */
	for (i = 0; i < evthr_ctx->evthr_c_nprio; i++)
	{
		if (!EVTHR_RUNQ_EMPTY(evthr_ctx, i))
		{
			*pprio = i;
			return false;
		}
	}
	*pprio = -1;
	return true;
}
#endif /* EVTHR_PRIOS > 1 */

/*
**  WAKEUP_SCHEDULER -- wake up the scheduler thread (unless already woken)
**
**	If the context's wakeup flag is not set, write one EVTHR_CONT byte
**	to the scheduler's pipe and set the wakeup flag on success; on a
**	short/failed write only a debug message is emitted.
**	c, status, tid must be local variables of the invoking function
**	(c and status are clobbered).
*/

#define WAKEUP_SCHEDULER(evthr_ctx, c, status, tid) do {	\
		if (!EVTHR_IS_WAKEUP(evthr_ctx))		\
		{						\
			c = EVTHR_CONT;				\
			status = write(wrpipe(evthr_ctx), (void *) &c, 1);\
			if (status != 1)			\
			{					\
				DPRINTF(0, (stderr, "[%ld] sev=ERROR, write %c failed=%d\n",	\
					(long) tid, c, status));	\
			}					\
			else					\
			{					\
				EVTHR_SET_WAKEUP(evthr_ctx);	\
				DPRINTF(8, (stderr, "func=evthr_worker, where=waitq, status=wakeup\n"));	\
			}					\
		}						\
	} while (0)

/*
**  EVTHR_WORKER -- worker thread
**
**	Several of these can be started as threads, they check the
**	runq and execute the functions stored in the tasks.
**
**	Parameters:
**		arg -- evthr context
**
**	Returns:
**		???
**
**	Todo: deal better with failures, see for example EVTHR_EV_WWQ.
*/

void *
evthr_worker(void *arg)
{
	sm_evthr_ctx_P evthr_ctx;
	timespec_T timeout;
	timeval_T now;
	int status;
	bool timedout;
	char c;
	sm_ret_T ret, r2;
	pthread_t tid;
#if EVTHR_PRIOS <= 1
	/* single priority level: "prio" degenerates to the constant 0 */
# define prio 0
#else
	int prio;
#endif

/* timeouts for waiting for work: */
#define EVTHR_WORKER_TO_N	60	/* normal (increase even further?) */
#define EVTHR_WORKER_TO_M	5	/* if more than min workers */
#define EVTHR_WORKER_TO_S	1	/* if more than soft limit */

	SM_REQUIRE(arg != NULL);
	evthr_ctx = (sm_evthr_ctx_P) arg;
	tid = pthread_self();

	/* detach: worker is never joined, its resources are reclaimed on exit */
	status = pthread_detach(tid);
	if (status != 0)
		DPRINTF(0, (stderr, "[%ld] worker detach failed=%d\n",
			(long) tid, status));

	/* wait for work */

	/*
	**  We don't need to validate the workq_t here... task don't
	**  create server threads until requests are queued (the
	**  queue has been initialized by then!) and task wait for all
	**  server threads to terminate before destroying a work queue.
	*/

	DPRINTF(5, (stderr, "[%ld] worker is starting\n", (long) tid));

	/* runqmut is held at the top of every iteration of the loop below */
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
	SM_LOCK_OK(status);
	if (status != 0)
	{
		NOTE(NOT_REACHED)
		return NULL;
	}
	while (true)
	{
		timedout = false;
		DPRINTF(7, (stderr, "[%ld] worker waiting for work\n",
			(long) tid));
		gettimeofday(&now, NULL);

		/* make sure time increases */
		if (timercmp(&now, &evthr_ctx->evthr_c_time, >))
			SM_TIMEVAL_TO_TIMESPEC(&now, &timeout);
		else
			SM_TIMEVAL_TO_TIMESPEC(&evthr_ctx->evthr_c_time,
				&timeout);

		/*
		**  Compute the absolute timeout for the idle wait below:
		**  above the soft limit (max_s): only 10ms; above the
		**  minimum: a few seconds (shrinking as the thread count
		**  grows); otherwise the full "normal" timeout.
		*/

		if (evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_max_s)
		{
			timespec_T delay;

			delay.tv_sec = 0;
			delay.tv_nsec = 10000000; /* 10ms */
			sm_timespecadd(&timeout, &delay, &timeout);
		}
		else if (evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_min)
		{
			/* evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s */
			timeout.tv_sec += EVTHR_WORKER_TO_M
					+ evthr_ctx->evthr_c_max_s
					- evthr_ctx->evthr_c_cur;
		}
		else
			timeout.tv_sec += EVTHR_WORKER_TO_N;

		while (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio)
		       && !EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP))
		{
			/*
			**  "Go idle" waiting for work. Each server will wait
			**  up to EVTHR_WORKER_TO_x seconds for work,
			**  and then give up if there are enough threads.
			**
			**  Note: if the system should stop then cv is
			**  signalled, hence the timeout can be large.
			*/

			++evthr_ctx->evthr_c_idl;
			status = pthread_cond_timedwait(&evthr_ctx->evthr_c_cv,
					&evthr_ctx->evthr_c_runqmut,
					&timeout);
			SM_ASSERT(evthr_ctx->evthr_c_idl > 0);
			--evthr_ctx->evthr_c_idl;

			/* Got work? */
			if (!EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio))
				break;

			/* If the wait timed out, stop waiting */
			if (status == ETIMEDOUT)
			{
				timedout = true;
				break;
			}
			else if (status != 0)
			{
				/*
				**  This shouldn't happen, so the work queue
				**  package should fail. Because the work queue
				**  API is asynchronous, that would add
				**  complication. Because the chances of failure
				**  are slim, I choose to avoid that
				**  complication. The server thread will return,
				**  and allow another server thread to pick up
				**  the work later. Note that, if this was the
				**  only server thread, the queue won't be
				**  serviced until a new work item is queued.
				**  That could be fixed by creating a new server
				**  here.
				*/

				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, timed_wait=%d"
					, status);
				SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
				--evthr_ctx->evthr_c_cur;
				if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
					EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
				(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
				return NULL;
			}
		}

		/*
		**  should we look for a non empty queue again or just check
		**  the one that we found? that is:
		**  if (prio > 0 && !EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio))
		*/

		if (!EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio))
		{
			sm_evthr_task_P task;
			evthr_task_F *fct;

			task = EVTHR_RUNQ_FIRST(evthr_ctx, prio);
			SM_IS_EVTHR_TSK(task);
#if SM_LOCK_TASK
			status = pthread_mutex_lock(&task->evthr_t_mutex);
			SM_LOCK_OK(status);
			if (status != 0)
			{
				NOTE(NOT_REACHED)
				DPRINTF(0, (stderr,
					"[%ld] sev=ERROR, worker, task=%p, lock=%d\n",
					(long) tid, task, status));
				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, mutex=t_mutex, lock=%d"
					, status);
				return NULL;
			}
#endif /* SM_LOCK_TASK */
			DPRINTF(7, (stderr, "[%ld] worker, task=%p, stop=%d\n",
				(long) tid, task,
				EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP)));
			fct = task->evthr_t_fct;

			/* take the task off the run queue before dropping runqmut */
			EVTHR_RUNQ_DEL(evthr_ctx, prio, task);
			EVTHR_REM_FROMQ(task, EVTHR_EV_IRQ);
			++evthr_ctx->evthr_c_act;
			status = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			if (status != 0)
			{
				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, mutex=runq_mutex, unlock=%d"
					, status);
				SM_ASSERT(evthr_ctx->evthr_c_act > 0);
				--evthr_ctx->evthr_c_act;
				return NULL;
			}
			DPRINTF(4, (stderr, "[%ld] worker task=%p, flags=%X\n",
				(long) tid, task, task->evthr_t_flags));

			/* run the task callback without holding runqmut */
			ret = fct(task);
			evthr_clr_rdy(task);

			/*
			**  free nc here? is this safe (locked?)
			**  It might be safer to copy it into a local variable
			**  and free evthr_t_nc above.
			**  However, we also pass sleep time directly...
			**  Maybe we should use also a local variable for it.
			*/

			SM_FREE(task->evthr_t_nc);
			DPRINTF(4, (stderr, "[%ld] worker task=%p, ret=%x\n",
				(long) tid, task, ret));
			status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
			SM_LOCK_OK(status);
			if (status != 0)
			{
				NOTE(NOT_REACHED)
				return NULL;
			}
			SM_ASSERT(evthr_ctx->evthr_c_act > 0);
			--evthr_ctx->evthr_c_act;

			/* check whether fct wants to change event mask */
			/* Do this only if not ASYNC? */
			if (!evthr_act_async(ret))
			{
				if (evthr_r_set(ret))
					evthr_set_ev(task, evthr_r_set_ev(ret));
				if (evthr_r_clr(ret))
					evthr_clr_ev(task, evthr_r_clr_ev(ret));
			}

			/* check what to do with task */
			if (evthr_act_waitq(ret))
			{
				/* move the task to the wait queue */
				EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);
				status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
				SM_LOCK_OK(status);
				if (status != 0)
				{
					NOTE(NOT_REACHED)
#if SM_LOCK_TASK
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER,
						EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, lock=%d"
						, status);
					(void) pthread_mutex_unlock(&task->evthr_t_mutex);
#endif
					return NULL;
				}
				EVTHR_WAITQ_APP(evthr_ctx, task);
				WAKEUP_SCHEDULER(evthr_ctx, c, status, tid);
				EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
				status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
				if (status != 0)
				{
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER,
						EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, unlock=%d"
						, status);
					SM_ASSERT(status == 0);
				}
			}
			else if (evthr_act_slpq(ret))
			{
				/* move the task to the sleep queue */
				EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);
				status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
				SM_LOCK_OK(status);
				if (status != 0)
				{
					NOTE(NOT_REACHED)
#if SM_LOCK_TASK
					(void) pthread_mutex_unlock(&task->evthr_t_mutex);
#endif
					DPRINTF(0, (stderr,
						"[%ld] sev=ERROR, lock-waitq failed=%d\n",
						(long) tid, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER,
						EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, lock=%d"
						, status);
					return NULL; /* no good! */
				}
				evthr_slpq_ins(evthr_ctx, task);
				WAKEUP_SCHEDULER(evthr_ctx, c, status, tid);
				status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
				if (status != 0)
				{
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER,
						EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, unlock=%d"
						, status);
					SM_ASSERT(status == 0);
				}
			}
			else if (evthr_act_runq(ret))
			{
				/* we have the mutex for runq */
				EVTHR_RUNQ_APP(evthr_ctx, prio, task);
			}
			else if (evthr_act_del(ret))
			{
				r2 = evthr_task_del(evthr_ctx, task,
						THR_LOCK_UNLOCK);
				if (sm_is_err(r2))
				{
					/* complain? */
					EVTHR_IS_INQ(task, EVTHR_EV_DEL);
					DPRINTF(0, (stderr,
						"[%ld] sev=ERROR, task_del failed=%x\n",
						(long) tid, r2));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER,
						EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, evthr_task_del=%d"
						, r2);
				}
			}
			if (evthr_act_term(ret))
			{
				/*
				**  Write EVTHR_STOP to the pipe; presumably
				**  the scheduler reads it and initiates
				**  system shutdown -- verify in evthr-int.h.
				*/

				c = EVTHR_STOP;
				status = write(wrpipe(evthr_ctx), (void *) &c, 1);
				if (status != 1)
				{
					r2 = errno;
					DPRINTF(0, (stderr,
						"[%ld] sev=ERROR, write %c failed=%d\n",
						(long) tid, c, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER,
						EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, write=%d, errno=%d"
						, status, r2);
				}
			}
#if SM_LOCK_TASK
			if (!evthr_act_async(ret))
			{
				status = pthread_mutex_unlock(&task->evthr_t_mutex);
				if (status != 0)
				{
					DPRINTF(0, (stderr,
						"[%ld] sev=ERROR, worker, task=%p, unlock=%d\n",
						(long) tid, task, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER,
						EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=t_mutex, unlock=%d"
						, status);
					return NULL;
				}
			}
#endif /* SM_LOCK_TASK */
		}

		/*
		**  If there are no more work requests, and the servers
		**  have been asked to quit, then shut down.
		*/

		if (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio)
		    && EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP))
		{
			DPRINTF(6, (stderr, "[%ld] worker shutting down\n",
				(long) tid));
			SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
			--evthr_ctx->evthr_c_cur;
			if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
				EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);

			/*
			**  NOTE: Just to prove that every rule has an
			**  exception, I'm using the "cv" condition for two
			**  separate predicates here. That's OK, since the
			**  case used here applies only once during the life
			**  of a work queue -- during rundown. The overhead
			**  is minimal and it's not worth creating a separate
			**  condition variable that would be waited and
			**  signaled exactly once!
			*/

			if (evthr_ctx->evthr_c_cur == 0)
				(void) pthread_cond_broadcast(&evthr_ctx->evthr_c_cv);
			(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			return NULL;
		}

		/*
		**  If there's no more work, and task waited for as long as
		**  task is allowed, then terminate this server thread.
		*/

		if (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio)
		    && timedout
		    && evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_min)
		{
			DPRINTF(5, (stderr,
				"[%ld] worker terminating due to timeout.\n",
				(long) tid));
			SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
			--evthr_ctx->evthr_c_cur;
			if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
				EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
			break;
		}
	}
	pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(4, (stderr, "[%ld] worker exiting\n", (long) tid));
	return NULL;
}