/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
* Copyright (c) 2006 Claus Assmann
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
/*
** partly based on workq.c from Programming with POSIX Threads
** by David Butenhof.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: worker.c,v 1.48 2006/10/30 02:35:44 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/time.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/socket.h"
#include "evthr-int.h"
#include "log.h"
#if EVTHR_PRIOS > 1
/*
**  EVTHR_RUNQ_EMPTY_ALL -- are all run queues empty?
**
**	Parameters:
**		evthr_ctx -- evthr context
**		pprio -- (pointer to) priority of non-empty queue
**			>=0: priority of queue that isn't empty
**			<0: all queues empty
**
**	Returns:
**		true iff all run queues are empty
*/

static int
evthr_runq_empty_all(sm_evthr_ctx_P evthr_ctx, int *pprio)
{
	int qprio;

	SM_REQUIRE(pprio != NULL);

	/* scan the queues in priority order; stop at the first non-empty one */
	qprio = 0;
	while (qprio < evthr_ctx->evthr_c_nprio) {
		if (!EVTHR_RUNQ_EMPTY(evthr_ctx, qprio)) {
			/* found queued work: report its priority */
			*pprio = qprio;
			return false;
		}
		++qprio;
	}

	/* no work queued at any priority */
	*pprio = -1;
	return true;
}
#endif /* EVTHR_PRIOS > 1 */
/*
**  WAKEUP_SCHEDULER -- notify the scheduler thread via the internal pipe
**
**	Writes a single EVTHR_CONT byte to wrpipe(evthr_ctx) so the
**	scheduler re-examines its queues; EVTHR_IS_WAKEUP/EVTHR_SET_WAKEUP
**	suppress duplicate wakeups while one is already pending.
**
**	Parameters (all must be plain lvalue variable names, the macro
**	assigns to them):
**		evthr_ctx -- evthr context
**		c -- char scratch variable; set to EVTHR_CONT
**		status -- int scratch variable; receives the write(2) result
**		tid -- thread id, used only in the debug message
**
**	Side effects: on short/failed write only a debug message is
**	emitted; the wakeup flag is NOT set, so a later caller retries.
*/
#define WAKEUP_SCHEDULER(evthr_ctx, c, status, tid) do { \
if (!EVTHR_IS_WAKEUP(evthr_ctx)) { \
c = EVTHR_CONT; \
status = write(wrpipe(evthr_ctx), (void *) &c, 1);\
if (status != 1) { \
DPRINTF(0, (stderr, "[%ld] sev=ERROR, write %c failed=%d\n", \
(long) tid, c, status)); \
} \
else { \
EVTHR_SET_WAKEUP(evthr_ctx); \
DPRINTF(8, (stderr, "func=evthr_worker, where=waitq, status=wakeup\n")); \
} \
} \
} while (0)
/*
**  EVTHR_WORKER -- worker thread
**
**	Several of these can be started as threads, they check the
**	runq and execute the functions stored in the tasks.
**
**	Parameters:
**		arg -- evthr context (sm_evthr_ctx_P)
**
**	Returns:
**		always NULL (pthread exit value)
**
**	Locking: evthr_c_runqmut is held whenever the run queues or the
**	worker counters (evthr_c_cur, evthr_c_idl, evthr_c_act) are
**	inspected or changed; it is released while the task callback runs.
**
**	Todo: deal better with failures, see for example EVTHR_EV_WWQ.
*/

void *
evthr_worker(void *arg)
{
	sm_evthr_ctx_P evthr_ctx;
	timespec_T timeout;	/* absolute deadline for cond_timedwait */
	timeval_T now;
	int status;
	bool timedout;		/* idle wait hit its deadline? */
	char c;			/* one-byte message for the scheduler pipe */
	sm_ret_T ret, r2;
	pthread_t tid;		/* own thread id, for logging only */

#if EVTHR_PRIOS <= 1
	/* single priority: queue index is the constant 0 */
# define prio 0
#else
	int prio;		/* priority of a non-empty run queue */
#endif

	/* timeouts for waiting for work: */
#define EVTHR_WORKER_TO_N 60 /* normal (increase even further?) */
#define EVTHR_WORKER_TO_M 5 /* if more than min workers */
	/* NOTE(review): EVTHR_WORKER_TO_S is not referenced in this function;
	** apparently superseded by the fixed 10ms delay below -- confirm. */
#define EVTHR_WORKER_TO_S 1 /* if more than soft limit */

	SM_REQUIRE(arg != NULL);
	evthr_ctx = (sm_evthr_ctx_P) arg;
	tid = pthread_self();

	/* detached: nobody joins workers, resources go away on thread exit */
	status = pthread_detach(tid);
	if (status != 0)
		DPRINTF(0, (stderr, "[%ld] worker detach failed=%d\n",
			(long) tid, status));

	/* wait for work */

	/*
	**  We don't need to validate the workq_t here... task don't
	**  create server threads until requests are queued (the
	**  queue has been initialized by then!) and task wait for all
	**  server threads to terminate before destroying a work queue.
	*/

	DPRINTF(5, (stderr, "[%ld] worker is starting\n", (long) tid));

	/* runqmut is held from here on, except while a task callback runs */
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
	SM_LOCK_OK(status);
	if (status != 0) {
		NOTE(NOT_REACHED)
		return NULL;
	}

	while (true) {
		timedout = false;
		DPRINTF(7, (stderr, "[%ld] worker waiting for work\n", (long) tid));
		gettimeofday(&now, NULL);

		/* make sure time increases */
		if (timercmp(&now, &evthr_ctx->evthr_c_time, >))
			SM_TIMEVAL_TO_TIMESPEC(&now, &timeout);
		else
			SM_TIMEVAL_TO_TIMESPEC(&evthr_ctx->evthr_c_time,
				&timeout);

		/*
		**  Pick the idle deadline based on how many workers exist:
		**  above the soft limit only 10ms, above the minimum a few
		**  seconds, otherwise the long "normal" timeout.
		*/

		if (evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_max_s) {
			timespec_T delay;

			delay.tv_sec = 0;
			delay.tv_nsec = 10000000; /* 10ms */
			sm_timespecadd(&timeout, &delay, &timeout);
		}
		else if (evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_min) {
			/* evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s */
			timeout.tv_sec += EVTHR_WORKER_TO_M +
				evthr_ctx->evthr_c_max_s - evthr_ctx->evthr_c_cur;
		}
		else
			timeout.tv_sec += EVTHR_WORKER_TO_N;

		/* idle loop: sleep on the condvar until work or stop request */
		while (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio) &&
		       !EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP)) {
			/*
			**  "Go idle" waiting for work. Each server will wait
			**  up to EVTHR_WORKER_TO_x seconds for work,
			**  and then give up if there are enough threads.
			**
			**  Note: if the system should stop then cv is
			**  signalled, hence the timeout can be large.
			*/

			++evthr_ctx->evthr_c_idl;
			status = pthread_cond_timedwait(&evthr_ctx->evthr_c_cv,
					&evthr_ctx->evthr_c_runqmut, &timeout);
			SM_ASSERT(evthr_ctx->evthr_c_idl > 0);
			--evthr_ctx->evthr_c_idl;

			/* Got work? */
			if (!EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio))
				break;

			/* If the wait timed out, quit the idle loop */
			if (status == ETIMEDOUT) {
				timedout = true;
				break;
			}
			else if (status != 0) {
				/*
				**  This shouldn't happen, so the work queue
				**  package should fail. Because the work queue
				**  API is asynchronous, that would add
				**  complication. Because the chances of failure
				**  are slim, I choose to avoid that
				**  complication. The server thread will return,
				**  and allow another server thread to pick up
				**  the work later. Note that, if this was the
				**  only server thread, the queue won't be
				**  serviced until a new work item is queued.
				**  That could be fixed by creating a new server
				**  here.
				*/

				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, timed_wait=%d"
					, status);
				SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
				--evthr_ctx->evthr_c_cur;
				/* back under the soft limit: clear "exceeded" flag */
				if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
					EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
				(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
				return NULL;
			}
		}

		/*
		**  should we look for a non empty queue again or just check
		**  the one that we found? that is:
		**  if (prio > 0 && !EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio))
		*/

		if (!EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio)) {
			sm_evthr_task_P task;
			evthr_task_F *fct;

			/* dequeue the first task of the non-empty queue */
			task = EVTHR_RUNQ_FIRST(evthr_ctx, prio);
			SM_IS_EVTHR_TSK(task);
#if SM_LOCK_TASK
			status = pthread_mutex_lock(&task->evthr_t_mutex);
			SM_LOCK_OK(status);
			if (status != 0) {
				NOTE(NOT_REACHED)
				DPRINTF(0, (stderr,
					"[%ld] sev=ERROR, worker, task=%p, lock=%d\n",
					(long) tid, task, status));
				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, mutex=t_mutex, lock=%d"
					, status);
				/*
				**  NOTE(review): returns with evthr_c_runqmut
				**  still held, unlike the parallel error paths
				**  above/below -- possible lock leak; confirm.
				*/
				return NULL;
			}
#endif /* SM_LOCK_TASK */
			DPRINTF(7, (stderr, "[%ld] worker, task=%p, stop=%d\n",
				(long) tid, task,
				EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP)));
			fct = task->evthr_t_fct;
			EVTHR_RUNQ_DEL(evthr_ctx, prio, task);
			/* presumably clears the task's "in run queue" marker */
			EVTHR_REM_FROMQ(task, EVTHR_EV_IRQ);
			/* one more task is actively executing */
			++evthr_ctx->evthr_c_act;

			/* drop runqmut while running the task callback */
			status = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			if (status != 0) {
				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, mutex=runq_mutex, unlock=%d"
					, status);
				SM_ASSERT(evthr_ctx->evthr_c_act > 0);
				--evthr_ctx->evthr_c_act;
				return NULL;
			}
			DPRINTF(4, (stderr, "[%ld] worker task=%p, flags=%X\n",
				(long) tid, task, task->evthr_t_flags));

			/* run the task; ret encodes requested follow-up actions */
			ret = fct(task);
			evthr_clr_rdy(task);

			/*
			**  free nc here? is this safe (locked?)
			**  It might be safer to copy it into a local variable
			**  and free evthr_t_nc above.
			**  However, we also pass sleep time directly...
			**  Maybe we should use also a local variable for it.
			*/

			SM_FREE(task->evthr_t_nc);
			DPRINTF(4, (stderr, "[%ld] worker task=%p, ret=%x\n",
				(long) tid, task, ret));

			/* re-acquire runqmut to process the task's verdict */
			status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
			SM_LOCK_OK(status);
			if (status != 0) {
				NOTE(NOT_REACHED)
				return NULL;
			}
			SM_ASSERT(evthr_ctx->evthr_c_act > 0);
			--evthr_ctx->evthr_c_act;

			/* check whether fct wants to change event mask */
			/* Do this only if not ASYNC? */
			if (!evthr_act_async(ret)) {
				if (evthr_r_set(ret))
					evthr_set_ev(task, evthr_r_set_ev(ret));
				if (evthr_r_clr(ret))
					evthr_clr_ev(task, evthr_r_clr_ev(ret));
			}

			/* check what to do with task */
			if (evthr_act_waitq(ret)) {
				/* task wants back onto the wait queue */
				EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);
				status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
				SM_LOCK_OK(status);
				if (status != 0) {
					NOTE(NOT_REACHED)
#if SM_LOCK_TASK
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, lock=%d"
						, status);
					(void) pthread_mutex_unlock(&task->evthr_t_mutex);
#endif
					/*
					**  NOTE(review): returns with
					**  evthr_c_runqmut still held --
					**  possible lock leak; confirm.
					*/
					return NULL;
				}
				EVTHR_WAITQ_APP(evthr_ctx, task);
				/* tell the scheduler the wait queue changed */
				WAKEUP_SCHEDULER(evthr_ctx, c, status, tid);
				EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
				status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
				if (status != 0) {
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, unlock=%d"
						, status);
					SM_ASSERT(status == 0);
				}
			}
			else if (evthr_act_slpq(ret)) {
				/* task wants to sleep; sleep queue shares waitqmut */
				EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);
				status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
				SM_LOCK_OK(status);
				if (status != 0) {
					NOTE(NOT_REACHED)
#if SM_LOCK_TASK
					(void) pthread_mutex_unlock(&task->evthr_t_mutex);
#endif
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, lock-waitq failed=%d\n",
						(long) tid, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, lock=%d"
						, status);
					/*
					**  NOTE(review): also returns with
					**  evthr_c_runqmut still held; confirm.
					*/
					return NULL; /* no good! */
				}
				evthr_slpq_ins(evthr_ctx, task);
				/* tell the scheduler the sleep queue changed */
				WAKEUP_SCHEDULER(evthr_ctx, c, status, tid);
				status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
				if (status != 0) {
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, unlock=%d"
						, status);
					SM_ASSERT(status == 0);
				}
			}
			else if (evthr_act_runq(ret)) {
				/* we have the mutex for runq */
				EVTHR_RUNQ_APP(evthr_ctx, prio, task);
			}
			else if (evthr_act_del(ret)) {
				/* task asked to be deleted */
				r2 = evthr_task_del(evthr_ctx, task, THR_LOCK_UNLOCK);
				if (sm_is_err(r2)) {
					/* complain? */
					EVTHR_IS_INQ(task, EVTHR_EV_DEL);
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, task_del failed=%x\n",
						(long) tid, r2));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, evthr_task_del=%d"
						, r2);
				}
			}

			/* task requested full shutdown: tell the scheduler to stop */
			if (evthr_act_term(ret)) {
				c = EVTHR_STOP;
				status = write(wrpipe(evthr_ctx), (void *) &c, 1);
				if (status != 1) {
					r2 = errno;
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, write %c failed=%d\n",
						(long) tid, c, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, write=%d, errno=%d"
						, status, r2);
				}
			}
#if SM_LOCK_TASK
			/* async tasks keep their own lock; others release it here */
			if (!evthr_act_async(ret)) {
				status = pthread_mutex_unlock(&task->evthr_t_mutex);
				if (status != 0) {
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, worker, task=%p, unlock=%d\n",
						(long) tid, task, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=t_mutex, unlock=%d"
						, status);
					return NULL;
				}
			}
#endif /* SM_LOCK_TASK */
		}

		/*
		**  If there are no more work requests, and the servers
		**  have been asked to quit, then shut down.
		*/

		if (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio) &&
		    EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP)) {
			DPRINTF(6, (stderr, "[%ld] worker shutting down\n", (long) tid));
			SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
			--evthr_ctx->evthr_c_cur;
			if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
				EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);

			/*
			**  NOTE: Just to prove that every rule has an
			**  exception, I'm using the "cv" condition for two
			**  separate predicates here. That's OK, since the
			**  case used here applies only once during the life
			**  of a work queue -- during rundown. The overhead
			**  is minimal and it's not worth creating a separate
			**  condition variable that would be waited and
			**  signaled exactly once!
			*/

			if (evthr_ctx->evthr_c_cur == 0)
				(void) pthread_cond_broadcast(&evthr_ctx->evthr_c_cv);
			(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			return NULL;
		}

		/*
		**  If there's no more work, and task waited for as long as
		**  task is allowed, then terminate this server thread.
		*/

		if (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio) && timedout &&
		    evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_min) {
			DPRINTF(5, (stderr,
				"[%ld] worker terminating due to timeout.\n",
				(long) tid));
			SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
			--evthr_ctx->evthr_c_cur;
			if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
				EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
			break;
		}
	}

	pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(4, (stderr, "[%ld] worker exiting\n", (long) tid));
	return NULL;
}
/* (removed stray "syntax highlighted by Code2HTML, v. 0.9.1" footer left over from an HTML export; it is not part of the C source) */