/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
* Copyright (c) 2006 Claus Assmann
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
/*
** partly based on workq.c from Programming with POSIX Threads
** by David Butenhof.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: evthr.c,v 1.127 2007/01/01 02:10:25 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/time.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/socket.h"
#include "evthr-int.h"
#define LIBEVTHR_LOG_DEFINES 1
#include "log.h"
#ifndef EVTHR_DBG_DELAY
# define EVTHR_DBG_DELAY 0
#endif
/*
** Notice:
** The timeout granularity that can be achieved seems to depend on
** the scheduling granularity. Some tests (EVTHR_DBG_DELAY=1) show that
** the time actually waited in select() might differ by up to 20ms
** from the timeout specified. Assuming that we have a scheduling
** granularity of 10ms, this doesn't seem to be unreasonable.
** Question: is this "good enough" or do we need to consider a
** different approach (see the design docs).
** select() timeout deviations:
** FreeBSD 3.2 25 ms
** OSF/1 0.5ms
*/
/*
** Todo: We should use better error codes in here, i.e.,
** include the module number in the error codes!
*/
/*
** A different version of sleep(), safe for pthreads on Solaris?
** Maybe use this conditionally (but how to test?)
** Problem: doesn't complain about errors.
** what about nanosleep() as alternative?
*/
#define sm_sleep(s) do { \
int rs; \
timeval_T st; \
\
st.tv_sec = (s); \
st.tv_usec = 0; \
do \
rs = select(0, NULL, NULL, NULL, &st); \
while (rs < 0 && errno == EINTR); \
} while (0)
/*
** EVTHR_CHG_REQ -- apply change requests
**
** Parameters:
** evthr_ctx -- evthr context
**
** Returns:
** usual sm_error code
**
** Locking:
** locks request queue
** wait queue must be locked by caller.
**
** Last code review: 2006-03-10 02:12:02
** Last code change: 2006-03-10 02:11:58
*/
static sm_ret_T
evthr_chg_req(sm_evthr_ctx_P evthr_ctx)
{
int flags, r;
uint cnt, creqs;
sm_evthr_req_P req;
sm_evthr_task_P task, task_next;
#if EVTHR_PARANOIA
sm_evthr_task_P slptsk;
#endif
SM_IS_EVTHR_CTX(evthr_ctx);
r = pthread_mutex_lock(&evthr_ctx->evthr_c_reqmut);
SM_LOCK_OK(r);
if (r != 0) {
NOTE(NOT_REACHED)
/* complain, sleep, and continue */
DPRINTF(0, (stderr, "ERROR: evthr_chg_req lock=%d\n", r));
return sm_error_perm(SM_EM_EVTHR, r);
}
#if EVTHR_PARANOIA
slptsk = NULL;
#endif
cnt = 0;
/*
** Note: we can't walk through the request list and
** change the tasks since they might not be locked.
** This causes n * m runtime complexity
** (length of wait queue times length of request queue),
** but the length of the request queue should be fairly short;
** maybe one or two entries.
**
** We can't use EVTHR_WAITQ_LOOP() since we may change the order
** of entries in the queue (for sleep tasks).
*/
task = EVTHR_WAITQ_FIRST(evthr_ctx);
while (task != EVTHR_WAITQ_END(evthr_ctx) && evthr_ctx->evthr_c_nreqs > 0)
{
SM_IS_EVTHR_TSK(task);
task_next = EVTHR_WAITQ_NEXT(task);
#if EVTHR_PARANOIA
/*
** Detects tasks that are twice in wait queue
** (directly next to each other only).
** We could use another bit in the status field
** to detect tasks that have already been inspected.
*/
SM_ASSERT(task != slptsk);
SM_ASSERT(!EVTHR_ALREADY_CHK(task));
EVTHR_CHECKED(task);
slptsk = task;
#endif /* EVTHR_PARANOIA */
for (req = EVTHR_REQ_FIRST(evthr_ctx), creqs = 0;
req != EVTHR_REQ_END(evthr_ctx);
req = EVTHR_REQ_NEXT(req), ++creqs)
{
if (task == req->evthr_r_task)
break;
SM_ASSERT(creqs <= evthr_ctx->evthr_c_tot_reqs);
}
if (req == EVTHR_REQ_END(evthr_ctx) || task != req->evthr_r_task)
{
task = task_next;
continue;
}
--evthr_ctx->evthr_c_nreqs;
if (req->evthr_r_chge == EVTHR_CHG_TIME_YES
|| timercmp(&task->evthr_t_sleep, &req->evthr_r_sleep, >))
{
DPRINTF(8, (stderr, "evthr_chg_req: change sleep time for task=%p\n", task));
task->evthr_t_sleep = req->evthr_r_sleep;
/*
** Remove task from list and insert it at right place.
** This might be done more efficiently by
** checking first whether it needs to be moved.
** Notice: we may hit the same task again, but since
** we removed the request from the list that shouldn't
** matter.
*/
EVTHR_WAITQ_DEL(evthr_ctx, task);
EVTHR_REM_FROMQ(task, EVTHR_EV_IWQ);
evthr_slpq_ins(evthr_ctx, task);
}
/* Change event request flags */
flags = req->evthr_r_rqevf;
DPRINTF(8, (stderr, "evthr_chg_req: change event flags for task=%p: old=%x, change=%x, set=%d, clr=%d\n", task, task->evthr_t_rqevf, flags, evthr_r_set(flags), evthr_r_clr(flags)));
if (evthr_r_set(flags) == evthr_r_yes(EVTHR_EV_DEL))
(void) evthr_task_del(evthr_ctx, task, THR_NO_LOCK);
else {
if (evthr_r_set(flags))
evthr_set_ev(task, evthr_r_set_ev(flags));
if (evthr_r_clr(flags))
evthr_clr_ev(task, evthr_r_clr_ev(flags));
}
EVTHR_REQ_CLR(req);
++cnt;
SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks);
task = task_next;
}
#if EVTHR_PARANOIA
EVTHR_WAITQ_LOOP(evthr_ctx, task)
{
SM_IS_EVTHR_TSK(task);
EVTHR_CLR_CHECKED(task);
}
#endif /* EVTHR_PARANOIA */
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_reqmut);
SM_ASSERT(r == 0);
return SM_SUCCESS;
}
/*
** EVTHR_FINDFD -- find the task in wait queue belonging to a file descriptor
**
** Parameters:
** evthr_ctx -- evthr context
** fd -- activated file descriptor
**
** Returns:
** pointer to task, NULL if not found
**
** Locking:
** wait queue must be locked by caller.
**
** Warnings:
** Doesn't work if there are multiple events for one task.
**
** Last code review: 2006-03-10 02:13:22
** Last code change:
*/
static sm_evthr_task_P
evthr_findfd(sm_evthr_ctx_P evthr_ctx, int fd)
{
uint cnt;
sm_evthr_task_P task;
SM_IS_EVTHR_CTX(evthr_ctx);
if (fd < 0)
return NULL;
/* Take a shortcut */
if ((uint)fd < evthr_ctx->evthr_c_maxfd) {
task = evthr_ctx->evthr_c_fd2t[fd];
if (task != NULL)
return task;
}
for (task = EVTHR_WAITQ_FIRST(evthr_ctx), cnt = 0;
task != EVTHR_WAITQ_END(evthr_ctx);
task = EVTHR_WAITQ_NEXT(task), ++cnt)
{
SM_IS_EVTHR_TSK(task);
if (fd == task->evthr_t_fd)
return task;
SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks);
}
return NULL;
}
/*
** EVTHR_WAKEUPTHR -- wakeup or start threads to deal with tasks
**
** Parameters:
** evthr_ctx -- evthr context
** tasks -- number of activated tasks
** nbtasks -- number of activated, non-blocking tasks
**
** Returns:
** >=0: number of tasks that have not been started
** <0: usual sm_error code
**
** Locking:
** none
**
** Last code review:
** Last code change:
*/
static sm_ret_T
evthr_wakeupthr(sm_evthr_ctx_P evthr_ctx, uint tasks, uint nbtasks)
{
pthread_t tid;
int status;
uint u, idl, act, started;
sm_ret_T ret;
SM_IS_EVTHR_CTX(evthr_ctx);
started = 0;
idl = evthr_ctx->evthr_c_idl;
act = evthr_ctx->evthr_c_act;
for (u = 0; u < tasks; u++) {
/*
** If any threads are idling, wake one.
*/
DPRINTF(4, (stderr, "wakeup: idl=%u, cur=%u, act=%u, max=%u, max_h=%u, tasks=%u/%u, nbtasks=%u\n",
evthr_ctx->evthr_c_idl, evthr_ctx->evthr_c_cur,
evthr_ctx->evthr_c_act, evthr_ctx->evthr_c_max_s,
evthr_ctx->evthr_c_max_h, u, tasks, nbtasks));
if (idl > 0) {
EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
status = pthread_cond_signal(&evthr_ctx->evthr_c_cv);
if (status != 0) {
ret = sm_error_perm(SM_EM_EVTHR, status);
goto error;
}
--idl;
++started;
}
else if (evthr_ctx->evthr_c_cur < evthr_ctx->evthr_c_max_s) {
/*
** If there were no idling threads, and we're allowed
** to create a new thread, do so.
*/
EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
DPRINTF(3, (stderr, "Creating new worker\n"));
status = pthread_create(&tid, NULL, evthr_worker,
(void *) evthr_ctx);
if (status != 0) {
ret = sm_error_perm(SM_EM_EVTHR, status);
goto error;
}
++evthr_ctx->evthr_c_cur;
++act;
++started;
}
else if (act < evthr_ctx->evthr_c_max_h && u >= nbtasks) {
/* possibly blocking task: create a new thread */
DPRINTF(2, (stderr,
"Creating new worker [exceeding softlimit]\n"));
status = pthread_create(&tid, NULL, evthr_worker,
(void *) evthr_ctx);
if (status != 0) {
ret = sm_error_perm(SM_EM_EVTHR, status);
goto error;
}
++evthr_ctx->evthr_c_cur;
++act;
++started;
}
}
/* values don't change... */
DPRINTF(9, (stderr, "wakeup done: idl=%d, cur=%d, max=%d\n",
evthr_ctx->evthr_c_idl, evthr_ctx->evthr_c_cur,
evthr_ctx->evthr_c_max_s));
return started;
error:
return ret;
}
/*
** EVTHR_ACCEPT -- accept() a new connection
**
** Parameters:
** evthr_ctx -- evthr context
** task -- task description
** fd -- file descriptor on which the connection is active
**
** Returns:
** usual sm_error code
**
** Question: should we leave this to the application??
** It's just another "ready for read" file descriptor...
**
** Last code review: 2006-03-10 02:25:54
** Last code change:
*/
static sm_ret_T
evthr_accept(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task, int fd)
{
int connfd;
int err0, err1;
sockoptlen_T l;
socklen_T socklen;
struct sockaddr sockaddr;
sm_evthr_nc_P nc;
sm_ret_T ret;
ret = SM_SUCCESS;
l = (sockoptlen_T) sizeof(err1);
err0 = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *) &err1, &l);
if (err0 == 0 && err1 != 0) {
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
SM_LOG_ERROR, 1,
"sev=ERROR, func=evthr_accept, fd=%d, getsockopt()=%d",
fd, err1);
}
nc = (sm_evthr_nc_P) sm_zalloc(sizeof(*nc));
if (NULL == nc) {
ret = sm_error_perm(SM_EM_EVTHR, ENOMEM);
goto error;
}
socklen = (socklen_T) sizeof(sockaddr);
connfd = accept(fd, &sockaddr, &socklen);
if (connfd < 0) {
err0 = errno;
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
SM_LOG_ERROR, err0 == EINTR ? 14 : 6,
"sev=ERROR, func=evthr_accept, fd=%d, accept()=%d",
fd, errno);
ret = sm_error_perm(SM_EM_EVTHR, err0);
goto error;
}
else if (socklen == 0 || /* sockaddr.sin_len == 0 || */
(sockaddr.sa_family != AF_INET &&
sockaddr.sa_family != AF_UNIX))
{
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
SM_LOG_ERROR, 3,
"sev=ERROR, func=evthr_accept, fd=%d, accept()=bogus_data, socklen=%d, family=%d",
fd, socklen, sockaddr.sa_family);
ret = sm_error_perm(SM_EM_EVTHR, errno == 0 ? EINVAL : errno);
goto error;
}
else {
/*
** Got a new connection, what to do with it??
** Put the task fct into run queue and let it deal with it?
** How do we tell it the new fd (connfd)?
*/
nc->evthr_a_len = socklen;
nc->evthr_a_addr = sockaddr;
nc->evthr_a_fd = connfd;
task->evthr_t_nc = nc;
}
error:
return ret;
}
#if EVTHR_DBG_DELAY
static timeval_T difft;
static timeval_T prevt;
static timeval_T ta, tb, td, ts;
static int cnt = 0;
static bool
chktd(sm_evthr_ctx_P evthr_ctx, timeval_T exp, int slt)
{
timeval_T now, dt;
++cnt;
gettimeofday(&now, NULL);
timersub(&now, &exp, &dt);
if (timercmp(&dt, &difft, >)) {
sm_evthr_task_P task, slptsk;
fprintf(stderr, "loop[%2d]: now=%ld.%06ld\n", cnt, now.tv_sec, now.tv_usec);
ERRPRINTTV(" sleep=", exp);
ERRPRINTTV(" diff=", dt);
ERRPRINTTV(" prevt=", prevt);
ERRPRINTTV(" ts=", ts);
ERRPRINTTV(" td=", td);
fprintf(stderr, " slt=%d\n", slt);
for (task = EVTHR_WAITQ_FIRST(evthr_ctx);
task != EVTHR_WAITQ_END(evthr_ctx);
task = slptsk)
{
if (!evthr_is_slp(task) ||
timercmp(&now, &task->evthr_t_sleep, <))
fprintf(stderr, "not these\n");
ERRPRINTTV(" sleept=", task->evthr_t_sleep);
if (!evthr_is_slp(task) ||
timercmp(&now, &task->evthr_t_sleep, <))
break;
slptsk = EVTHR_WAITQ_NEXT(task);
}
return false;
}
return true;
}
#define CHKDEL \
if (!chktd(evthr_ctx, task->evthr_t_sleep, slt)) \
{ \
if (ftask != task) \
ERRPRINTTV("activate 1st =", ftask->evthr_t_sleep); \
if (timeok) \
{ \
ERRPRINTTV("activate slpt=", task->evthr_t_sleep); \
ERRPRINTTV(" tb=", tb); \
ERRPRINTTV(" ta=", ta); \
ERRPRINTTV(" ts=", ts); \
ERRPRINTTV(" to=", to); \
ERRPRINTTV(" td=", td); \
timersub(&now, &tb, &td); \
ERRPRINTTV(" t0=", td); \
} \
timeok = false; \
}
#define CHKDEL1 \
timersub(&ta, &tb, &ts); \
timersub(&ts, &to, &td); \
if (td.tv_sec < 0 || td.tv_usec < 0) \
timersub(&to, &ts, &td); \
if (r == 0 && timercmp(&td, &difft, >)) \
{ \
ERRPRINTTV("tb=", tb); \
ERRPRINTTV("ta=", ta); \
ERRPRINTTV("ts=", ts); \
ERRPRINTTV("to=", to); \
ERRPRINTTV("td=", td); \
}
#endif /* EVTHR_DBG_DELAY */
#if 0
Control flow for evthr loop:
/* prepare data to wait for */
/*
do we add another layer of abstraction here
(for select/poll/...)?
or do we code this directly?
*/
/* read events */
/* write events */
/* timeout events */
/* wait for an event */
/* check the events that occurred */
/* add the corresponding tasks to the runqueue */
/* signal idle threads that there is new work */
#endif /* 0 */
/*
** EVTHR_LOG_SHE -- log error from signal handler
**
** Parameters:
** evthr_ctx -- evthr context
**
** Returns:
** usual sm_error code
**
** Last code review: 2006-03-10 02:27:21
** Last code change:
*/
static sm_ret_T
evthr_log_she(sm_evthr_ctx_P evthr_ctx)
{
SM_IS_EVTHR_CTX(evthr_ctx);
switch (evthr_ctx->evthr_sige_where) {
case EVTHR_SHE_SIGWAIT:
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL,
SM_LOG_ERROR, 1,
"sev=ERROR, func=evthr_signal, sigwait=%d",
evthr_ctx->evthr_sige_what);
break;
case EVTHR_SHE_UNKSIG:
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL,
SM_LOG_ERROR, 1,
"sev=ERROR, func=evthr_signal, sigwait=%d",
evthr_ctx->evthr_sige_what);
break;
default:
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL,
SM_LOG_ERROR, 1,
"sev=ERROR, func=evthr_signal, status=sigwait_returned_unmasked_signal, signal=%d",
evthr_ctx->evthr_sige_what);
return sm_error_perm(SM_EM_EVTHR, SM_E_UNEXPECTED);
}
return SM_SUCCESS;
}
/*
** EVTHR_LOOP -- check for events for tasks in the wait queue
** puts activated tasks in run queue and notifies worker threads
**
** Parameters:
** evthr_ctx -- evthr context
**
** Returns:
** usual sm_error code
**
** Last code review:
** Last code change:
*/
sm_ret_T
evthr_loop(sm_evthr_ctx_P evthr_ctx)
{
#if HAVE_SELECT
int r, i;
int maxfd, lastfd, fd;
timeval_T to;
fd_set rdfds, wrfds;
fd_set rds, wrs;
char buf[EVTHR_REQ_SIZE];
#endif /* HAVE_SELECT */
uint tasks, nbtasks, locks;
sm_ret_T ret;
sm_evthr_task_P task, slptsk;
timeval_T now, nto;
#if EVTHR_DBG_DELAY
int slt;
bool timeok;
sm_evthr_task_P ftask;
#endif
uint cnt;
bool canstarttask;
sm_evthr_task_P fbtask; /* first blocking task in run queue */
#define WAITQ_LOCK_F 0x01
#define RUNQ_LOCK_F 0x02
#define got_lock(l, w) (l) |= (w)
#define rel_lock(l, w) (l) &= ~(w)
#define is_locked(l, w) (((l) & (w)) != 0)
#define got_waitqlock(l) got_lock((l), WAITQ_LOCK_F)
#define rel_waitqlock(l) rel_lock((l), WAITQ_LOCK_F)
#define got_runqlock(l) got_lock((l), RUNQ_LOCK_F)
#define rel_runqlock(l) rel_lock((l), RUNQ_LOCK_F)
#define is_waitqlock(l) is_locked((l), WAITQ_LOCK_F)
#define is_runqlock(l) is_locked((l), RUNQ_LOCK_F)
SM_IS_EVTHR_CTX(evthr_ctx);
#if HAVE_SELECT
lastfd = rdpipe(evthr_ctx);
#endif
#if EVTHR_DBG_DELAY
difft.tv_sec = 0;
difft.tv_usec = 20000;
nto.tv_sec = 0;
nto.tv_usec = 0;
prevt.tv_sec = 0;
prevt.tv_usec = 0;
#endif /* EVTHR_DBG_DELAY */
/* loop until something tells us to stop */
for (;;) {
#if EVTHR_DBG_DELAY
timeok = true;
slt = 0;
#endif
#if HAVE_SELECT
FD_ZERO(&rdfds);
FD_ZERO(&wrfds);
FD_ZERO(&rds);
FD_ZERO(&wrs);
FD_SET(rdpipe(evthr_ctx), &rdfds);
FD_SET(rdpipe(evthr_ctx), &rds);
maxfd = rdpipe(evthr_ctx);
locks = 0;
to.tv_sec = 5;
to.tv_usec = 0;
r = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
SM_LOCK_OK(r);
if (r != 0) {
NOTE(NOT_REACHED)
DPRINTF(0, (stderr, "ERROR: main loop can't get runq mutex=%d\n", r));
canstarttask = true;
}
else {
canstarttask = (evthr_ctx->evthr_c_act < evthr_ctx->evthr_c_max_s);
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
SM_ASSERT(r == 0);
}
DPRINTF(9, (stderr, "main loop\n"));
r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
SM_LOCK_OK(r);
DPRINTF(9, (stderr, "main loop: got waitq lock=%d\n", r));
if (r != 0) {
NOTE(NOT_REACHED)
/* complain, sleep, and continue */
DPRINTF(0, (stderr, "ERROR: main loop didn't get waitq lock: %d\n", r));
continue;
/* break if too many errors? */
}
got_waitqlock(locks);
ret = evthr_chg_req(evthr_ctx);
if (sm_is_err(ret)) {
/* complain, sleep, and continue */
DPRINTF(0, (stderr, "ERROR: evthr_chg_req=%x\n", ret));
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
rel_waitqlock(locks);
continue;
}
/* Retrieve earliest wakeup time (wait queue is sorted!) */
if (!EVTHR_WAITQ_EMPTY(evthr_ctx)) {
task = EVTHR_WAITQ_FIRST(evthr_ctx);
SM_IS_EVTHR_TSK(task);
if (evthr_is_slp(task) &&
(canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_SL)))
{
/*
** This should be "our" function
** that deals with errors
*/
r = gettimeofday(&now, NULL);
timersub(&task->evthr_t_sleep, &now, &nto);
#if EVTHR_DBG_DELAY
prevt = nto;
#endif
if (nto.tv_sec < 0)
to.tv_sec = 0;
else
to = nto;
}
}
#if EVTHR_PARANOIA
slptsk = NULL;
#endif
cnt = 0;
EVTHR_WAITQ_LOOP(evthr_ctx, task) {
SM_IS_EVTHR_TSK(task);
#if EVTHR_PARANOIA
/*
** Detects tasks that are twice in wait queue
** (directly next to each other only).
** We could use another bit in the status field
** to detect tasks that have already been inspected.
*/
SM_ASSERT(task != slptsk);
SM_ASSERT(!EVTHR_ALREADY_CHK(task));
EVTHR_CHECKED(task);
slptsk = task;
#endif /* EVTHR_PARANOIA */
r = evthr_rqevents(task);
fd = task->evthr_t_fd;
DPRINTF(3, (stderr, "main loop: got task=%p, fd=%d, rqevents=%x, flags=%x, canstart=%d\n",
task, fd, r, task->evthr_t_flags,
canstarttask));
if ((evthr_is_rdf(r) || evthr_is_lif(r)) &&
(canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_RD)))
{
SM_ASSERT(is_valid_fd(fd));
FD_SET(fd, &rds);
FD_SET(fd, &rdfds);
SET_MAX(maxfd, fd);
}
if (evthr_is_wrf(r) &&
(canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_WR)))
{
SM_ASSERT(is_valid_fd(fd));
FD_SET(fd, &wrfds);
FD_SET(fd, &wrs);
SET_MAX(maxfd, fd);
}
/* Skip over SLeep entries */
++cnt;
SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks);
}
#if EVTHR_PARANOIA
EVTHR_WAITQ_LOOP(evthr_ctx, task) {
SM_IS_EVTHR_TSK(task);
EVTHR_CLR_CHECKED(task);
}
#endif /* EVTHR_PARANOIA */
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
rel_waitqlock(locks);
tasks = 0;
nbtasks = 0;
DPRINTF(4, (stderr,
"main loop select, maxfd=%d, tmout=%ld.%06ld\n",
maxfd + 1, to.tv_sec, to.tv_usec));
#if EVTHR_DBG_DELAY
gettimeofday(&tb, NULL);
#endif
r = select(maxfd + 1, &rdfds, &wrfds, NULL, &to);
gettimeofday(&now, NULL);
/* make sure time increases */
if (timercmp(&now, &evthr_ctx->evthr_c_time, >))
evthr_ctx->evthr_c_time = now;
#if EVTHR_DBG_DELAY
ta = evthr_ctx->evthr_c_time;
#endif
DPRINTF(4, (stderr, "main loop select=%d, errno=%d\n", r, errno));
if (r < 0) {
if (errno != EINTR) {
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
SM_LOG_ERROR, 1,
"sev=ERROR, func=evthr_loop, fd=%d, select()=%d, error=%s",
maxfd, r, strerror(errno));
sm_sleep(1);
}
continue;
}
#if EVTHR_DBG_DELAY
CHKDEL1
#endif
r = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
SM_LOCK_OK(r);
if (r != 0) {
NOTE(NOT_REACHED)
DPRINTF(0, (stderr, "sev=ERROR, main loop can't get runq mutex=%d\n", r));
continue;
}
got_runqlock(locks);
r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
SM_LOCK_OK(r);
if (r != 0) {
NOTE(NOT_REACHED)
DPRINTF(0, (stderr, "sev=ERROR, main loop can't get waitq mutex=%d\n", r));
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
rel_runqlock(locks);
continue;
}
got_waitqlock(locks);
EVTHR_CLR_WAKEUP(evthr_ctx);
/* first blocking task appended to run queue */
fbtask = NULL;
/*
* Move task from wait queue to run queue.
* Note: this deals with potentially blocking tasks and keeps track of
* the first one. If priorities are used, the priority of a potentially
* blocking task is forced to be the lowest priority (== largest value).
*
* locking: waitq and runq must be locked.
*/
#if EVTHR_PRIOS <= 1
# define EVTHR_T_INC_PRIO(evthr_ctx, task) SM_NOOP
#else
# define EVTHR_T_INC_PRIO(evthr_ctx, task) do { \
if ((evthr_ctx)->evthr_c_nprio > 1) { \
++EVTHR_T_PRIO(task); \
if (EVTHR_T_PRIO(task) >= (evthr_ctx)->evthr_c_nprio) \
EVTHR_T_PRIO_SET(task, 0); \
} \
} while (0)
#endif /* EVTHR_PRIOS <= 1 */
/* See NOTE above! */
#define WQ2RQ(evthr_ctx, task) do { \
EVTHR_WAITQ_DEL(evthr_ctx, task); \
if (EVTHRT_IS_FLAG(task, EVTHRT_FL_BLOCK)) { \
EVTHR_T_PRIO_SET(task, EVTHR_PRIOS - 1); \
EVTHR_RUNQ_APP(evthr_ctx, EVTHR_T_PRIO(task), task); \
if (NULL == fbtask) \
fbtask = task; \
} \
else if (NULL == fbtask \
|| EVTHR_T_PRIO(task) < EVTHR_PRIOS - 1) { \
++nbtasks; \
EVTHR_RUNQ_APP(evthr_ctx, EVTHR_T_PRIO(task), task); \
} \
else { \
++nbtasks; \
EVTHR_RUNQ_INS(evthr_ctx, EVTHR_T_PRIO(task), fbtask, task);\
} \
EVTHR_IS_INQ(task, EVTHR_EV_IRQ); \
++tasks; \
EVTHR_T_INC_PRIO(evthr_ctx, task); \
} while (0)
/* Check I/O activity */
/*
** todo: Abstract this by introducing macros to loop through the fds
** then it can be used for select(), poll(), ... (hopefully)
*/
i = lastfd;
do {
/* Optimization: stop when # of ready fds reached */
/*
** Should we keep the listening fd's as entries in the wait queue??
** Make it configurable? (another bit in _ev).
*/
/* note: this "if/else if" is a bit stange to decrease indentation */
if (FD_ISSET(i, &rds) && FD_ISSET(i, &rdfds) && rdpipe(evthr_ctx) == i)
{
DPRINTF(8, (stderr, "main loop pipe\n"));
r = read(rdpipe(evthr_ctx), buf, 1);
DPRINTF(4, (stderr, "main loop pipe=%d, buf=%c\n", r, buf[0]));
if (r < 0)
continue; /* complain */
else if (r == 0)
continue; /* complain */
ret = SM_SUCCESS;
switch (buf[0]) {
case EVTHR_STOP:
case EVTHR_ABRT:
goto done;
case EVTHR_CONT:
break;
case EVTHR_USR1:
case EVTHR_USR2:
r = EVTHR_WHY2IDX(buf[0]);
if (r < 0 || r >= EVTHR_MAX_SIGS)
break;
task = evthr_ctx->evthr_c_sg2t[r];
if (NULL == task)
break;
/* better check than this? */
if (!evthr_is_inwq(task))
break;
EVTHR_GOT_EV(task, EVTHR_EV_SG_Y);
WQ2RQ(evthr_ctx, task);
break;
case EVTHR_ERROR:
evthr_log_she(evthr_ctx);
break;
default:
sm_log_write(evthr_ctx->evthr_c_lctx,
EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
SM_LOG_ERROR, 1,
"sev=ERROR, func=evthr_loop, rd_pipe=%#x, status=unknown_value"
, buf[0]);
break;
}
if (sm_is_err(ret))
continue;
}
else if (FD_ISSET(i, &rds) && FD_ISSET(i, &rdfds)) {
DPRINTF(8, (stderr, "main loop read task(?)\n"));
task = evthr_findfd(evthr_ctx, i);
if (task != NULL) {
SM_IS_EVTHR_TSK(task);
evthr_clr_rdy(task);
r = evthr_rqevents(task);
DPRINTF(3, (stderr, "main loop read task=%p, ev=%x\n", task, r));
if (evthr_is_lif(r)) {
EVTHR_GOT_EV(task, EVTHR_EV_LI_Y);
ret = evthr_accept( evthr_ctx, task, i);
}
else {
EVTHR_GOT_EV(task, EVTHR_EV_RD_Y);
if (FD_ISSET(i, &wrs) && FD_ISSET(i, &wrfds))
EVTHR_GOT_EV(task, EVTHR_EV_WR_Y);
ret = SM_SUCCESS;
}
if (sm_is_success(ret)) {
WQ2RQ(evthr_ctx, task);
if (evthr_is_slp(task) &&
timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, >=))
EVTHR_GOT_EV(task, EVTHR_EV_SL_Y);
}
}
}
else if (FD_ISSET(i, &wrs) && FD_ISSET(i, &wrfds)) {
DPRINTF(8, (stderr, "main loop write task\n"));
task = evthr_findfd(evthr_ctx, i);
if (task != NULL) {
evthr_clr_rdy(task);
EVTHR_GOT_EV(task, EVTHR_EV_WR_Y);
if (evthr_is_slp(task) &&
timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, >=))
EVTHR_GOT_EV(task, EVTHR_EV_SL_Y);
SM_IS_EVTHR_TSK(task);
DPRINTF(3, (stderr, "main loop write task=%p\n", task));
WQ2RQ(evthr_ctx, task);
}
}
/* NEXT fd */
if (++i > maxfd)
i = 0;
/* while not DONE(fd) */
} while (i != lastfd);
/*
** Check timeouts activity.
** Note: if a task had I/O activity, then it is already
** removed from wait queue, hence even if a timeout occurred
** too, that event will not be noted in the event mask.
*/
#if EVTHR_DBG_DELAY
ftask = EVTHR_WAITQ_FIRST(evthr_ctx);
#endif
for (task = EVTHR_WAITQ_FIRST(evthr_ctx);
task != EVTHR_WAITQ_END(evthr_ctx);
task = slptsk)
{
SM_IS_EVTHR_TSK(task);
if (!evthr_is_slp(task) ||
timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, <))
break;
#if EVTHR_DBG_DELAY
CHKDEL
#endif
DPRINTF(3, (stderr, "main loop: add sleep task %p\n", task));
evthr_clr_rdy(task);
EVTHR_GOT_EV(task, EVTHR_EV_SL_Y);
slptsk = EVTHR_WAITQ_NEXT(task);
WQ2RQ(evthr_ctx, task);
#if EVTHR_DBG_DELAY
++slt;
#endif
}
SM_ASSERT(is_waitqlock(locks));
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
SM_ASSERT(r == 0);
rel_waitqlock(locks);
SM_ASSERT(is_runqlock(locks));
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
SM_ASSERT(r == 0);
rel_runqlock(locks);
DPRINTF(8, (stderr, "main loop tasks=%d\n", tasks));
#if EVTHR_DBG_DELAY
if (!timeok) {
gettimeofday(&now, NULL);
ERRPRINTTV("start: now=", now);
}
#endif /* EVTHR_DBG_DELAY */
if (tasks > 0) {
ret = evthr_wakeupthr(evthr_ctx, tasks, nbtasks);
DPRINTF(7, (stderr, "main loop started tasks=%d/%u\n"
, ret, tasks));
}
fbtask = NULL;
#endif /* HAVE_SELECT */
}
done:
if (is_waitqlock(locks)) {
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
rel_waitqlock(locks);
}
if (is_runqlock(locks)) {
r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
rel_runqlock(locks);
}
return SM_SUCCESS;
}
/*
** EVTHR_TIMEVAL -- return (cached) time in evthr system
**
** Parameters:
** evthr_ctx -- evthr context
** ct -- time (output)
**
** Returns:
** usual sm_error code
*/
sm_ret_T
evthr_timeval(sm_evthr_ctx_P evthr_ctx, timeval_T *ct)
{
SM_IS_EVTHR_CTX(evthr_ctx);
SM_REQUIRE(ct != NULL);
/*
** Need to lock access to the structure in case it is updated.
** Maybe a read/write lock?
*/
*ct = evthr_ctx->evthr_c_time;
return SM_SUCCESS;
}
/*
** EVTHR_TIME -- return (cached) time in evthr system
**
** Parameters:
** evthr_ctx -- evthr context
**
** Returns:
** time in seconds since the epoch
*/
time_T
evthr_time(sm_evthr_ctx_P evthr_ctx)
{
SM_IS_EVTHR_CTX(evthr_ctx);
/* See above: evthr_timeval() about locking */
return evthr_ctx->evthr_c_time.tv_sec;
}
syntax highlighted by Code2HTML, v. 0.9.1