/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 * Copyright (c) 2006 Claus Assmann
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

/*
**  partly based on workq.c from Programming with POSIX Threads
**  by David Butenhof.
*/

#include "sm/generic.h"
SM_RCSID("@(#)$Id: evthr.c,v 1.127 2007/01/01 02:10:25 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/time.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/socket.h"
#include "evthr-int.h"
#define LIBEVTHR_LOG_DEFINES 1
#include "log.h"

#ifndef EVTHR_DBG_DELAY
# define EVTHR_DBG_DELAY 0
#endif

/*
**  Notice:
**  The timeout granularity that can be achieved seems to depend on
**  the scheduling granularity. Some tests (EVTHR_DBG_DELAY=1) show that
**  the time actually waited in select() might differ by up to 20ms
**  from the timeout specified. Assuming that we have a scheduling
**  granularity of 10ms, this doesn't seem to be unreasonable.
**  Question: is this "good enough" or do we need to consider a
**  different approach (see the design docs).
**  select() timeout deviations:
**  FreeBSD 3.2  25  ms
**  OSF/1         0.5ms
*/

/*
**  Todo: We should use better error codes in here, i.e.,
**  include the module number in the error codes!
*/

/*
**  SM_SLEEP -- wait for s seconds by calling select() without any fds
**
**  A different version of sleep(), safe for pthreads on Solaris?
**  Maybe use this conditionally (but how to test?)
**  select() is called again (with the same timeval object) as long as
**  it fails with EINTR; any other result ends the wait.
**  Problem: doesn't complain about errors.
**  What about nanosleep() as alternative?
*/

#define sm_sleep(s) do {                                        \
	timeval_T sl_tv;                                            \
	int sl_ret;                                                 \
	                                                            \
	sl_tv.tv_usec = 0;                                          \
	sl_tv.tv_sec = (s);                                         \
	for (;;) {                                                  \
		sl_ret = select(0, NULL, NULL, NULL, &sl_tv);           \
		if (sl_ret >= 0 || errno != EINTR)                      \
			break;                                              \
	}                                                           \
} while (0)

/*
**  EVTHR_CHG_REQ -- apply change requests
**
**	Walks the wait queue and, for every task that has an entry in the
**	request list, applies the requested sleep time and event flag
**	changes (or deletes the task) and then clears the request.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		SM_SUCCESS, or a permanent error built from the result of
**		pthread_mutex_lock() if the request queue can't be locked.
**
**	Locking:
**		locks request queue
**		wait queue must be locked by caller.
**
**	Last code review: 2006-03-10 02:12:02
**	Last code change: 2006-03-10 02:11:58
*/

static sm_ret_T
evthr_chg_req(sm_evthr_ctx_P evthr_ctx)
{
	int flags, r;
	uint cnt, creqs;
	sm_evthr_req_P req;
	sm_evthr_task_P task, task_next;
#if EVTHR_PARANOIA
	sm_evthr_task_P slptsk;
#endif

	SM_IS_EVTHR_CTX(evthr_ctx);
	r = pthread_mutex_lock(&evthr_ctx->evthr_c_reqmut);
	SM_LOCK_OK(r);
	if (r != 0) {
		NOTE(NOT_REACHED)

		/* complain and let the caller decide how to go on */
		DPRINTF(0, (stderr, "ERROR: evthr_chg_req lock=%d\n", r));
		return sm_error_perm(SM_EM_EVTHR, r);
	}

#if EVTHR_PARANOIA
	slptsk = NULL;
#endif
	cnt = 0;	/* number of requests applied so far */

	/*
	**  Note: we can't walk through the request list and
	**  change the tasks since they might not be locked.
	**  This causes n * m runtime complexity
	**  (length of wait queue times length of request queue),
	**  but the length of the request queue should be fairly short;
	**  maybe one or two entries.
	**
	**  We can't use EVTHR_WAITQ_LOOP() since we may change the order
	**  of entries in the queue (for sleep tasks).
	*/

	task = EVTHR_WAITQ_FIRST(evthr_ctx);
	while (task != EVTHR_WAITQ_END(evthr_ctx) && evthr_ctx->evthr_c_nreqs > 0)
	{
		SM_IS_EVTHR_TSK(task);

		/* fetch successor now: task may be moved or deleted below */
		task_next = EVTHR_WAITQ_NEXT(task);
#if EVTHR_PARANOIA
		/*
		**  Detects tasks that are twice in wait queue
		**  (directly next to each other only).
		**  We could use another bit in the status field
		**  to detect tasks that have already been inspected.
		*/

		SM_ASSERT(task != slptsk);
		SM_ASSERT(!EVTHR_ALREADY_CHK(task));
		EVTHR_CHECKED(task);
		slptsk = task;
#endif /* EVTHR_PARANOIA */

		/* look for a request that refers to this task */
		for (req = EVTHR_REQ_FIRST(evthr_ctx), creqs = 0;
		     req != EVTHR_REQ_END(evthr_ctx);
		     req = EVTHR_REQ_NEXT(req), ++creqs)
		{
			if (task == req->evthr_r_task)
				break;
			SM_ASSERT(creqs <= evthr_ctx->evthr_c_tot_reqs);
		}
		if (req == EVTHR_REQ_END(evthr_ctx) || task != req->evthr_r_task)
		{
			/* no request for this task */
			task = task_next;
			continue;
		}
		--evthr_ctx->evthr_c_nreqs;

		if (req->evthr_r_chge == EVTHR_CHG_TIME_YES
		    || timercmp(&task->evthr_t_sleep, &req->evthr_r_sleep, >))
		{
DPRINTF(8, (stderr, "evthr_chg_req: change sleep time for task=%p\n", task));
			task->evthr_t_sleep = req->evthr_r_sleep;

			/*
			**  Remove task from list and insert it at right place.
			**  This might be done more efficiently by
			**  checking first whether it needs to be moved.
			**  Notice: we may hit the same task again, but since
			**  we removed the request from the list that shouldn't
			**  matter.
			*/

#if EVTHR_PARANOIA
			/*
			**  Because the task may legitimately be visited again
			**  (see above), drop its "checked" mark; otherwise
			**  the SM_ASSERT(!EVTHR_ALREADY_CHK()) at the top of
			**  the loop would fire on that revisit.
			*/

			EVTHR_CLR_CHECKED(task);
#endif /* EVTHR_PARANOIA */
			EVTHR_WAITQ_DEL(evthr_ctx, task);
			EVTHR_REM_FROMQ(task, EVTHR_EV_IWQ);
			evthr_slpq_ins(evthr_ctx, task);
		}

		/* Change event request flags */
		flags = req->evthr_r_rqevf;
DPRINTF(8, (stderr, "evthr_chg_req: change event flags for task=%p: old=%x, change=%x, set=%d, clr=%d\n", task, task->evthr_t_rqevf, flags, evthr_r_set(flags), evthr_r_clr(flags)));
		if (evthr_r_set(flags) == evthr_r_yes(EVTHR_EV_DEL))
			(void) evthr_task_del(evthr_ctx, task, THR_NO_LOCK);
		else {
			if (evthr_r_set(flags))
				evthr_set_ev(task, evthr_r_set_ev(flags));
			if (evthr_r_clr(flags))
				evthr_clr_ev(task, evthr_r_clr_ev(flags));
		}
		EVTHR_REQ_CLR(req);

		++cnt;
		SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks);
		task = task_next;
	}
#if EVTHR_PARANOIA
	/* remove the "checked" marks set above */
	EVTHR_WAITQ_LOOP(evthr_ctx, task)
	{
		SM_IS_EVTHR_TSK(task);
		EVTHR_CLR_CHECKED(task);
	}
#endif /* EVTHR_PARANOIA */
	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_reqmut);
	SM_ASSERT(r == 0);
	return SM_SUCCESS;
}

/*
**  EVTHR_FINDFD -- find the task in wait queue belonging to a file descriptor
**
**	First consults the fd-to-task table of the context (for fds below
**	evthr_c_maxfd); if that has no entry, the wait queue is searched
**	linearly for a task whose evthr_t_fd equals fd.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		fd -- activated file descriptor
**
**	Returns:
**		pointer to task, NULL if not found (or fd is negative)
**
**	Locking:
**		wait queue must be locked by caller.
**
**	Warnings:
**		Doesn't work if there are multiple events for one task.
**
**	Last code review: 2006-03-10 02:13:22
**	Last code change:
*/

static sm_evthr_task_P
evthr_findfd(sm_evthr_ctx_P evthr_ctx, int fd)
{
	uint seen;
	sm_evthr_task_P cur;

	SM_IS_EVTHR_CTX(evthr_ctx);
	if (fd < 0)
		return NULL;

	/* Fast path: direct lookup in the fd -> task table */
	if ((uint)fd < evthr_ctx->evthr_c_maxfd) {
		cur = evthr_ctx->evthr_c_fd2t[fd];
		if (cur != NULL)
			return cur;
	}

	/* Slow path: scan the whole wait queue */
	seen = 0;
	cur = EVTHR_WAITQ_FIRST(evthr_ctx);
	while (cur != EVTHR_WAITQ_END(evthr_ctx)) {
		SM_IS_EVTHR_TSK(cur);
		if (cur->evthr_t_fd == fd)
			return cur;

		/* queue can't be longer than the number of tasks */
		SM_ASSERT(seen <= evthr_ctx->evthr_c_tasks);
		cur = EVTHR_WAITQ_NEXT(cur);
		++seen;
	}
	return NULL;
}

/*
**  EVTHR_WAKEUPTHR -- wakeup or start threads to deal with tasks
**
**	For each activated task, in this order of preference:
**	signal an idle worker, create a worker while below the soft
**	limit, or (only for the tasks beyond the first nbtasks ones)
**	create a worker while below the hard limit.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		tasks -- number of activated tasks
**		nbtasks -- number of activated, non-blocking tasks
**
**	Returns:
**		>=0: number of tasks for which a worker has been
**			signalled or created
**		<0: usual sm_error code
**
**	Locking:
**		none
**
**	Last code review:
**	Last code change:
*/

static sm_ret_T
evthr_wakeupthr(sm_evthr_ctx_P evthr_ctx, uint tasks, uint nbtasks)
{
	pthread_t worker;
	int rc;
	uint n, idle, active, nstarted;

	SM_IS_EVTHR_CTX(evthr_ctx);

	/* work on local copies of the idle/active counters */
	idle = evthr_ctx->evthr_c_idl;
	active = evthr_ctx->evthr_c_act;
	nstarted = 0;

	n = 0;
	while (n < tasks) {
		DPRINTF(4, (stderr, "wakeup: idl=%u, cur=%u, act=%u, max=%u, max_h=%u, tasks=%u/%u, nbtasks=%u\n",
			evthr_ctx->evthr_c_idl, evthr_ctx->evthr_c_cur,
			evthr_ctx->evthr_c_act, evthr_ctx->evthr_c_max_s,
			evthr_ctx->evthr_c_max_h, n, tasks, nbtasks));

		if (idle > 0) {
			/* An idle worker is available: wake it up. */
			EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
			rc = pthread_cond_signal(&evthr_ctx->evthr_c_cv);
			if (rc != 0)
				return sm_error_perm(SM_EM_EVTHR, rc);
			--idle;
			++nstarted;
		}
		else if (evthr_ctx->evthr_c_cur < evthr_ctx->evthr_c_max_s) {
			/* No idle worker, but still below the soft limit. */
			EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
			DPRINTF(3, (stderr, "Creating new worker\n"));
			rc = pthread_create(&worker, NULL, evthr_worker,
					(void *) evthr_ctx);
			if (rc != 0)
				return sm_error_perm(SM_EM_EVTHR, rc);
			++evthr_ctx->evthr_c_cur;
			++active;
			++nstarted;
		}
		else if (n >= nbtasks && active < evthr_ctx->evthr_c_max_h) {
			/*
			**  Possibly blocking task: the soft limit may be
			**  exceeded up to the hard limit.
			*/

			DPRINTF(2, (stderr,
				"Creating new worker [exceeding softlimit]\n"));
			rc = pthread_create(&worker, NULL, evthr_worker,
					(void *) evthr_ctx);
			if (rc != 0)
				return sm_error_perm(SM_EM_EVTHR, rc);
			++evthr_ctx->evthr_c_cur;
			++active;
			++nstarted;
		}
		++n;
	}

	/* values don't change... */
	DPRINTF(9, (stderr, "wakeup done: idl=%d, cur=%d, max=%d\n",
		evthr_ctx->evthr_c_idl, evthr_ctx->evthr_c_cur,
		evthr_ctx->evthr_c_max_s));

	return nstarted;
}

/*
**  EVTHR_ACCEPT -- accept() a new connection
**
**	Logs a pending socket error (SO_ERROR) if there is one, accepts
**	the connection, validates the returned address, and on success
**	stores a newly allocated connection description (address, its
**	length, and the new fd) in task->evthr_t_nc.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		task -- task description
**		fd -- file descriptor on which the connection is active
**
**	Returns:
**		usual sm_error code:
**		SM_SUCCESS: task->evthr_t_nc has been set.
**		permanent error with the accept() errno, EINVAL (bogus
**		address data), or ENOMEM; task is not changed and no
**		resources are kept in these cases.
**
**	Side Effects:
**		If the address data is bogus or memory allocation fails,
**		then the connection that has just been accepted is closed.
**
**  Question: should we leave this to the application??
**  It's just another "ready for read" file descriptor...
**
**	Last code review: 2006-03-10 02:25:54
**	Last code change:
*/

static sm_ret_T
evthr_accept(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task, int fd)
{
	int connfd;
	int err0, err1;
	sockoptlen_T  l;
	socklen_T socklen;
	struct sockaddr sockaddr;
	sm_evthr_nc_P nc;

	/* Report (but don't act on) a pending error on the socket. */
	err1 = 0;
	l = (sockoptlen_T) sizeof(err1);
	err0 = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *) &err1, &l);
	if (err0 == 0 && err1 != 0) {
		sm_log_write(evthr_ctx->evthr_c_lctx,
			EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
			SM_LOG_ERROR, 1,
			"sev=ERROR, func=evthr_accept, fd=%d, getsockopt()=%d",
			fd, err1);
	}

	/* make sure the family is defined even if accept() stores nothing */
	sockaddr.sa_family = AF_UNSPEC;
	socklen = (socklen_T) sizeof(sockaddr);
	connfd = accept(fd, &sockaddr, &socklen);
	if (connfd < 0) {
		/* save errno: logging may change it */
		err0 = errno;

		sm_log_write(evthr_ctx->evthr_c_lctx,
			EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
			SM_LOG_ERROR, err0 == EINTR ? 14 : 6,
			"sev=ERROR, func=evthr_accept, fd=%d, accept()=%d",
			fd, err0);
		return sm_error_perm(SM_EM_EVTHR, err0);
	}

	if (socklen == 0 || /* sockaddr.sin_len == 0 || */
	    (sockaddr.sa_family != AF_INET &&
	     sockaddr.sa_family != AF_UNIX))
	{
		sm_log_write(evthr_ctx->evthr_c_lctx,
			EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
			SM_LOG_ERROR, 3,
			"sev=ERROR, func=evthr_accept, fd=%d, accept()=bogus_data, socklen=%d, family=%d",
			fd, (int) socklen, (int) sockaddr.sa_family);

		/*
		**  accept() succeeded, hence errno does not describe this
		**  problem: always report EINVAL. Nobody else knows about
		**  connfd, so it must be closed here to avoid leaking it.
		*/

		(void) close(connfd);
		return sm_error_perm(SM_EM_EVTHR, EINVAL);
	}

	/*
	**  Allocate the connection description only now, so none of the
	**  failure paths above has to release it.
	*/

	nc = (sm_evthr_nc_P) sm_zalloc(sizeof(*nc));
	if (NULL == nc) {
		(void) close(connfd);
		return sm_error_perm(SM_EM_EVTHR, ENOMEM);
	}

	/*
	**  Got a new connection, what to do with it??
	**  Put the task fct into run queue and let it deal with it?
	**  How do we tell it the new fd (connfd)?
	**  For now: hand it over via the task (evthr_t_nc).
	*/

	nc->evthr_a_len = socklen;
	nc->evthr_a_addr = sockaddr;
	nc->evthr_a_fd = connfd;
	task->evthr_t_nc = nc;
	return SM_SUCCESS;
}

#if EVTHR_DBG_DELAY
/* deviations larger than this are reported; initialized in evthr_loop() */
static timeval_T difft;
/* timeout that evthr_loop() computed for the first sleeping task */
static timeval_T prevt;
/*
**  tb: time taken right before select() in evthr_loop()
**  ta: cached time of the context after select()
**  ts: ta - tb (computed by CHKDEL1)
**  td: difference between ts and the timeout "to" (computed by CHKDEL1),
**	also overwritten by CHKDEL
*/
static timeval_T ta, tb, td, ts;
/* number of calls to chktd() */
static int cnt = 0;

/*
**  CHKTD -- (debugging only) check whether a sleep task is activated late
**
**	Compares the current time with the expected wakeup time; if the
**	difference exceeds difft, timing data and the sleep times of the
**	leading expired sleep tasks in the wait queue are written to stderr.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		exp -- time at which the task was expected to wake up
**		slt -- value printed as "slt" (the caller passes the
**			number of sleep tasks activated so far)
**
**	Returns:
**		true iff the delay is not larger than difft
*/

static bool
chktd(sm_evthr_ctx_P evthr_ctx, timeval_T exp, int slt)
{
	timeval_T now, dt;

	++cnt;
	gettimeofday(&now, NULL);
	timersub(&now, &exp, &dt);
	if (timercmp(&dt, &difft, >)) {
		sm_evthr_task_P task, slptsk;

		/*
		**  tv_sec/tv_usec are not "long" on every system:
		**  cast them to match the %ld conversions.
		*/

fprintf(stderr, "loop[%2d]: now=%ld.%06ld\n", cnt, (long) now.tv_sec, (long) now.tv_usec);
ERRPRINTTV("        sleep=", exp);
ERRPRINTTV("         diff=", dt);
ERRPRINTTV("        prevt=", prevt);
ERRPRINTTV("           ts=", ts);
ERRPRINTTV("           td=", td);
fprintf(stderr, "          slt=%d\n", slt);

		/* print sleep times up to the first task that isn't due */
		for (task = EVTHR_WAITQ_FIRST(evthr_ctx);
		     task != EVTHR_WAITQ_END(evthr_ctx);
		     task = slptsk)
		{
			if (!evthr_is_slp(task) ||
			    timercmp(&now, &task->evthr_t_sleep, <))
				fprintf(stderr, "not these\n");
ERRPRINTTV("       sleept=", task->evthr_t_sleep);
			if (!evthr_is_slp(task) ||
			    timercmp(&now, &task->evthr_t_sleep, <))
				break;
			slptsk = EVTHR_WAITQ_NEXT(task);
		}
		return false;
	}
	return true;
}

/*
**  CHKDEL -- (debugging only) report a sleep task that is activated late
**
**	To be expanded inside the timeout loop of evthr_loop(): it uses the
**	caller's variables evthr_ctx, task, ftask, slt, timeok, now, and to.
**	If chktd() reports a delay, the relevant times are printed (the
**	detailed set only for the first late task of an iteration) and
**	timeok is set to false.
*/

#define CHKDEL                                              \
if (!chktd(evthr_ctx, task->evthr_t_sleep, slt))            \
{                                                           \
	if (ftask != task)                                      \
		ERRPRINTTV("activate 1st =", ftask->evthr_t_sleep); \
	if (timeok)                                             \
	{                                                       \
		ERRPRINTTV("activate slpt=", task->evthr_t_sleep);  \
		ERRPRINTTV("           tb=", tb);                   \
		ERRPRINTTV("           ta=", ta);                   \
		ERRPRINTTV("           ts=", ts);                   \
		ERRPRINTTV("           to=", to);                   \
		ERRPRINTTV("           td=", td);                   \
		timersub(&now, &tb, &td);                           \
		ERRPRINTTV("           t0=", td);                   \
	}                                                       \
	timeok = false;                                         \
}

/*
**  CHKDEL1 -- (debugging only) compare time spent in select() with timeout
**
**	To be expanded in evthr_loop() after select(): it uses the caller's
**	variables r (result of select()) and to (timeout given to select()).
**	ts = ta - tb is the time spent, td the absolute difference between
**	ts and to; if select() returned 0 and td exceeds difft, all values
**	are printed.
**	NOTE(review): select() may modify its timeout argument on some
**	systems, in which case "to" is no longer the requested timeout here.
*/

#define CHKDEL1                             \
	timersub(&ta, &tb, &ts);                \
	timersub(&ts, &to, &td);                \
	if (td.tv_sec < 0 || td.tv_usec < 0)    \
		timersub(&to, &ts, &td);            \
	if (r == 0 && timercmp(&td, &difft, >)) \
	{                                       \
		ERRPRINTTV("tb=", tb);              \
		ERRPRINTTV("ta=", ta);              \
		ERRPRINTTV("ts=", ts);              \
		ERRPRINTTV("to=", to);              \
		ERRPRINTTV("td=", td);              \
	}
#endif /* EVTHR_DBG_DELAY */

#if 0
Control flow for evthr loop:

/* prepare data to wait for */

/*
do we add another layer of abstraction here
(for select/poll/...)?
or do we code this directly?
*/

/* read events */
/* write events */
/* timeout events */

/* wait for an event */

/* check the events that occurred */
/* add the corresponding tasks to the runqueue */
/* signal idle threads that there is new work */
#endif /* 0 */

/*
**  EVTHR_LOG_SHE -- log error from signal handler
**
**	Writes a log entry based on evthr_sige_where (where the error
**	happened) and evthr_sige_what (the value to report) stored in
**	the context.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		SM_SUCCESS if evthr_sige_where is EVTHR_SHE_SIGWAIT or
**		EVTHR_SHE_UNKSIG, otherwise a permanent error
**		(SM_E_UNEXPECTED).
**
**	Last code review: 2006-03-10 02:27:21
**	Last code change:
*/

static sm_ret_T
evthr_log_she(sm_evthr_ctx_P evthr_ctx)
{
	SM_IS_EVTHR_CTX(evthr_ctx);

	switch (evthr_ctx->evthr_sige_where) {
	 case EVTHR_SHE_SIGWAIT:
	 case EVTHR_SHE_UNKSIG:
		/* both locations are logged with the same text */
		sm_log_write(evthr_ctx->evthr_c_lctx,
			EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL,
			SM_LOG_ERROR, 1,
			"sev=ERROR, func=evthr_signal, sigwait=%d",
			evthr_ctx->evthr_sige_what);
		return SM_SUCCESS;

	 default:
		break;
	}

	/* any other location is unexpected */
	sm_log_write(evthr_ctx->evthr_c_lctx,
		EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL,
		SM_LOG_ERROR, 1,
		"sev=ERROR, func=evthr_signal, status=sigwait_returned_unmasked_signal, signal=%d",
		evthr_ctx->evthr_sige_what);
	return sm_error_perm(SM_EM_EVTHR, SM_E_UNEXPECTED);
}

/*
**  EVTHR_LOOP -- check for events for tasks in the wait queue
**	puts activated tasks in run queue and notifies worker threads
**
**	Each iteration of the loop
**	- applies pending change requests (evthr_chg_req()),
**	- builds the fd sets and the timeout for select() from the
**	  wait queue,
**	- waits in select() and updates the cached time of the context,
**	- handles a control byte read from the internal pipe,
**	- moves tasks with I/O activity or an expired sleep time from
**	  the wait queue to the run queue,
**	- wakes up or creates worker threads (evthr_wakeupthr()).
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		SM_SUCCESS after EVTHR_STOP or EVTHR_ABRT has been read
**		from the pipe; there is no other way out of the loop.
**
**	Locking:
**		locks/unlocks run queue and wait queue itself
**		(run queue first if both are needed).
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
evthr_loop(sm_evthr_ctx_P evthr_ctx)
{
#if HAVE_SELECT
	int r, i;
	int maxfd, lastfd, fd;
	timeval_T to;
	fd_set rdfds, wrfds;	/* passed to select() */
	fd_set rds, wrs;	/* record which fds have been requested */
	char buf[EVTHR_REQ_SIZE];
#endif /* HAVE_SELECT */
	uint tasks, nbtasks, locks;
	sm_ret_T ret;
	sm_evthr_task_P task, slptsk;
	timeval_T now, nto;
#if EVTHR_DBG_DELAY
	int slt;
	bool timeok;
	sm_evthr_task_P ftask;
#endif
	uint cnt;
	bool canstarttask;
	sm_evthr_task_P fbtask;	/* first blocking task in run queue */

/* "locks" keeps track of which mutexes are currently held by this function */
#define WAITQ_LOCK_F	0x01
#define RUNQ_LOCK_F	0x02
#define got_lock(l, w)	(l) |= (w)
#define rel_lock(l, w)	(l) &= ~(w)
#define is_locked(l, w)	(((l) & (w)) != 0)
#define got_waitqlock(l)	got_lock((l), WAITQ_LOCK_F)
#define rel_waitqlock(l)	rel_lock((l), WAITQ_LOCK_F)
#define got_runqlock(l)		got_lock((l), RUNQ_LOCK_F)
#define rel_runqlock(l)		rel_lock((l), RUNQ_LOCK_F)
#define is_waitqlock(l)		is_locked((l), WAITQ_LOCK_F)
#define is_runqlock(l)		is_locked((l), RUNQ_LOCK_F)

	SM_IS_EVTHR_CTX(evthr_ctx);

#if HAVE_SELECT
	/* the scan of ready fds always starts at the pipe */
	lastfd = rdpipe(evthr_ctx);
#endif

#if EVTHR_DBG_DELAY
	difft.tv_sec = 0;
	difft.tv_usec = 20000;
	nto.tv_sec = 0;
	nto.tv_usec = 0;
	prevt.tv_sec = 0;
	prevt.tv_usec = 0;
#endif /* EVTHR_DBG_DELAY */

	/* loop until something tells us to stop */
	for (;;) {
#if EVTHR_DBG_DELAY
		timeok = true;
		slt = 0;
#endif
#if HAVE_SELECT
		FD_ZERO(&rdfds);
		FD_ZERO(&wrfds);
		FD_ZERO(&rds);
		FD_ZERO(&wrs);
		FD_SET(rdpipe(evthr_ctx), &rdfds);
		FD_SET(rdpipe(evthr_ctx), &rds);
		maxfd = rdpipe(evthr_ctx);

		locks = 0;

		/* default timeout if no sleep task determines it */
		to.tv_sec = 5;
		to.tv_usec = 0;

		/*
		**  Potentially blocking tasks are only waited for
		**  if the number of active threads is below the soft limit.
		*/

		r = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
		SM_LOCK_OK(r);
		if (r != 0) {
			NOTE(NOT_REACHED)
			DPRINTF(0, (stderr, "ERROR: main loop can't get runq mutex=%d\n", r));
			canstarttask = true;
		}
		else {
			canstarttask = (evthr_ctx->evthr_c_act < evthr_ctx->evthr_c_max_s);
			r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			SM_ASSERT(r == 0);
		}

		DPRINTF(9, (stderr, "main loop\n"));
		r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
		SM_LOCK_OK(r);
		DPRINTF(9, (stderr, "main loop: got waitq lock=%d\n", r));
		if (r != 0) {
			NOTE(NOT_REACHED)

			/* complain and continue */
			DPRINTF(0, (stderr, "ERROR: main loop didn't get waitq lock: %d\n", r));
			continue;
			/* break if too many errors? */
		}
		got_waitqlock(locks);

		ret = evthr_chg_req(evthr_ctx);
		if (sm_is_err(ret)) {
			/* complain and continue */
			DPRINTF(0, (stderr, "ERROR: evthr_chg_req=%x\n", ret));
			r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
			rel_waitqlock(locks);
			continue;
		}

		/* Retrieve earliest wakeup time (wait queue is sorted!) */
		if (!EVTHR_WAITQ_EMPTY(evthr_ctx)) {
			task = EVTHR_WAITQ_FIRST(evthr_ctx);
			SM_IS_EVTHR_TSK(task);
			if (evthr_is_slp(task) &&
			    (canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_SL)))
			{
				/*
				**  This should be "our" function
				**  that deals with errors
				*/

				r = gettimeofday(&now, NULL);
				timersub(&task->evthr_t_sleep, &now, &nto);
#if EVTHR_DBG_DELAY
				prevt = nto;
#endif
				/* already due: don't wait (tv_usec is 0) */
				if (nto.tv_sec < 0)
					to.tv_sec = 0;
				else
					to = nto;
			}
		}
#if EVTHR_PARANOIA
		slptsk = NULL;
#endif
		cnt = 0;

		/* collect the fds to wait for */
		EVTHR_WAITQ_LOOP(evthr_ctx, task) {
			SM_IS_EVTHR_TSK(task);
#if EVTHR_PARANOIA
			/*
			**  Detects tasks that are twice in wait queue
			**  (directly next to each other only).
			**  We could use another bit in the status field
			**  to detect tasks that have already been inspected.
			*/

			SM_ASSERT(task != slptsk);
			SM_ASSERT(!EVTHR_ALREADY_CHK(task));
			EVTHR_CHECKED(task);
			slptsk = task;
#endif /* EVTHR_PARANOIA */

			r = evthr_rqevents(task);
			fd = task->evthr_t_fd;
			DPRINTF(3, (stderr, "main loop: got task=%p, fd=%d, rqevents=%x, flags=%x, canstart=%d\n",
				task, fd, r, task->evthr_t_flags,
				canstarttask));
			if ((evthr_is_rdf(r) || evthr_is_lif(r)) &&
			    (canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_RD)))
			{
				SM_ASSERT(is_valid_fd(fd));
				FD_SET(fd, &rds);
				FD_SET(fd, &rdfds);
				SET_MAX(maxfd, fd);
			}
			if (evthr_is_wrf(r) &&
			    (canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_WR)))
			{
				SM_ASSERT(is_valid_fd(fd));
				FD_SET(fd, &wrfds);
				FD_SET(fd, &wrs);
				SET_MAX(maxfd, fd);
			}
			/* Skip over SLeep entries */

			++cnt;
			SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks);
		}
#if EVTHR_PARANOIA
		EVTHR_WAITQ_LOOP(evthr_ctx, task) {
			SM_IS_EVTHR_TSK(task);
			EVTHR_CLR_CHECKED(task);
		}
#endif /* EVTHR_PARANOIA */

		/* don't hold the wait queue lock while waiting in select() */
		r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
		rel_waitqlock(locks);

		tasks = 0;
		nbtasks = 0;

		/* tv_sec/tv_usec need not be "long": cast for %ld */
		DPRINTF(4, (stderr,
			"main loop select, maxfd=%d, tmout=%ld.%06ld\n",
			maxfd + 1, (long) to.tv_sec, (long) to.tv_usec));
#if EVTHR_DBG_DELAY
		gettimeofday(&tb, NULL);
#endif
		r = select(maxfd + 1, &rdfds, &wrfds, NULL, &to);
		gettimeofday(&now, NULL);

		/* make sure time increases */
		if (timercmp(&now, &evthr_ctx->evthr_c_time, >))
			evthr_ctx->evthr_c_time = now;
#if EVTHR_DBG_DELAY
		ta = evthr_ctx->evthr_c_time;
#endif
		DPRINTF(4, (stderr, "main loop select=%d, errno=%d\n", r, errno));
		if (r < 0) {
			if (errno != EINTR) {
				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_loop, fd=%d, select()=%d, error=%s",
					 maxfd, r, strerror(errno));
				sm_sleep(1);
			}
			continue;
		}
#if EVTHR_DBG_DELAY
		CHKDEL1
#endif

		r = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
		SM_LOCK_OK(r);
		if (r != 0) {
			NOTE(NOT_REACHED)
			DPRINTF(0, (stderr, "sev=ERROR, main loop can't get runq mutex=%d\n", r));
			continue;
		}
		got_runqlock(locks);

		r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
		SM_LOCK_OK(r);
		if (r != 0) {
			NOTE(NOT_REACHED)
			DPRINTF(0, (stderr, "sev=ERROR, main loop can't get waitq mutex=%d\n", r));
			r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			rel_runqlock(locks);
			continue;
		}
		got_waitqlock(locks);
		EVTHR_CLR_WAKEUP(evthr_ctx);

		/* first blocking task appended to run queue */
		fbtask = NULL;

/*
 *  Move task from wait queue to run queue.
 *  Note: this deals with potentially blocking tasks and keeps track of
 *  the first one. If priorities are used, the priority of a potentially
 *  blocking task is forced to be the lowest priority (== largest value).
 *
 *  locking: waitq and runq must be locked.
 */

#if EVTHR_PRIOS <= 1
# define EVTHR_T_INC_PRIO(evthr_ctx, task)	SM_NOOP
#else
# define EVTHR_T_INC_PRIO(evthr_ctx, task)	do {                  \
		if ((evthr_ctx)->evthr_c_nprio > 1) {                     \
			++EVTHR_T_PRIO(task);                                 \
			if (EVTHR_T_PRIO(task) >= (evthr_ctx)->evthr_c_nprio) \
				EVTHR_T_PRIO_SET(task, 0);                        \
		}                                                         \
	} while (0)
#endif /* EVTHR_PRIOS <= 1 */

/* See NOTE above! */
#define WQ2RQ(evthr_ctx, task)	do {                                    \
		EVTHR_WAITQ_DEL(evthr_ctx, task);                               \
		if (EVTHRT_IS_FLAG(task, EVTHRT_FL_BLOCK)) {                    \
			EVTHR_T_PRIO_SET(task, EVTHR_PRIOS - 1);                    \
			EVTHR_RUNQ_APP(evthr_ctx, EVTHR_T_PRIO(task), task);        \
			if (NULL == fbtask)                                         \
				fbtask = task;                                          \
		}                                                               \
		else if (NULL == fbtask                                         \
			|| EVTHR_T_PRIO(task) < EVTHR_PRIOS - 1) {                  \
			++nbtasks;                                                  \
			EVTHR_RUNQ_APP(evthr_ctx, EVTHR_T_PRIO(task), task);        \
		}                                                               \
		else {                                                          \
			++nbtasks;                                                  \
			EVTHR_RUNQ_INS(evthr_ctx, EVTHR_T_PRIO(task), fbtask, task);\
		}                                                               \
		EVTHR_IS_INQ(task, EVTHR_EV_IRQ);                               \
		++tasks;                                                        \
		EVTHR_T_INC_PRIO(evthr_ctx, task);                              \
	} while (0)

		/* Check I/O activity */
/*
**  todo: Abstract this by introducing macros to loop through the fds
**  then it can be used for select(), poll(), ... (hopefully)
*/
		i = lastfd;
		do {
			/* Optimization: stop when # of ready fds reached */
/*
**  Should we keep the listening fd's as entries in the wait queue??
**  Make it configurable? (another bit in _ev).
*/

			/* note: this "if/else if" is a bit strange to decrease indentation */
			if (FD_ISSET(i, &rds) && FD_ISSET(i, &rdfds) && rdpipe(evthr_ctx) == i)
			{

				DPRINTF(8, (stderr, "main loop pipe\n"));

				/* defined value for the debug output below */
				buf[0] = '\0';
				r = read(rdpipe(evthr_ctx), buf, 1);
				DPRINTF(4, (stderr, "main loop pipe=%d, buf=%c\n", r, buf[0]));

				/*
				**  Nothing has been read (error or EOF):
				**  go on with the next fd.
				**  Note: "continue" must not be used here:
				**  in a do-while loop it jumps to the loop
				**  condition without advancing i, which
				**  (as i == lastfd for the pipe) would end
				**  the scan and ignore all other ready fds.
				*/

				if (r <= 0)
					goto nextfd; /* complain */

				switch (buf[0]) {
				 case EVTHR_STOP:
				 case EVTHR_ABRT:
					goto done;
				 case EVTHR_CONT:
					break;
				 case EVTHR_USR1:
				 case EVTHR_USR2:
					r = EVTHR_WHY2IDX(buf[0]);
					if (r < 0 || r >= EVTHR_MAX_SIGS)
						break;
					task = evthr_ctx->evthr_c_sg2t[r];
					if (NULL == task)
						break;

					/* better check than this? */
					if (!evthr_is_inwq(task))
						break;
					EVTHR_GOT_EV(task, EVTHR_EV_SG_Y);
					WQ2RQ(evthr_ctx, task);
					break;

				 case EVTHR_ERROR:
					(void) evthr_log_she(evthr_ctx);
					break;

				 default:
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_loop, rd_pipe=%#x, status=unknown_value"
						, buf[0]);

					break;
				}
			}
			else if (FD_ISSET(i, &rds) && FD_ISSET(i, &rdfds)) {
				DPRINTF(8, (stderr, "main loop read task(?)\n"));
				task = evthr_findfd(evthr_ctx, i);
				if (task != NULL) {
					SM_IS_EVTHR_TSK(task);
					evthr_clr_rdy(task);
					r = evthr_rqevents(task);
					DPRINTF(3, (stderr, "main loop read task=%p, ev=%x\n", task, r));
					if (evthr_is_lif(r)) {
						EVTHR_GOT_EV(task, EVTHR_EV_LI_Y);
						ret = evthr_accept( evthr_ctx, task, i);
					}
					else {
						EVTHR_GOT_EV(task, EVTHR_EV_RD_Y);
						if (FD_ISSET(i, &wrs) && FD_ISSET(i, &wrfds))
							EVTHR_GOT_EV(task, EVTHR_EV_WR_Y);
						ret = SM_SUCCESS;
					}

					/* task stays in wait queue if accept failed */
					if (sm_is_success(ret)) {
						WQ2RQ(evthr_ctx, task);
						if (evthr_is_slp(task) &&
						    timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, >=))
							EVTHR_GOT_EV(task, EVTHR_EV_SL_Y);
					}
				}
			}
			else if (FD_ISSET(i, &wrs) && FD_ISSET(i, &wrfds)) {
				DPRINTF(8, (stderr, "main loop write task\n"));
				task = evthr_findfd(evthr_ctx, i);
				if (task != NULL) {
					evthr_clr_rdy(task);
					EVTHR_GOT_EV(task, EVTHR_EV_WR_Y);
					if (evthr_is_slp(task) &&
					    timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, >=))
						EVTHR_GOT_EV(task, EVTHR_EV_SL_Y);
					SM_IS_EVTHR_TSK(task);
					DPRINTF(3, (stderr, "main loop write task=%p\n", task));
					WQ2RQ(evthr_ctx, task);
				}
			}

 nextfd:
			/* NEXT fd */
			if (++i > maxfd)
				i = 0;

		/* while not DONE(fd) */
		} while (i != lastfd);

		/*
		**  Check timeouts activity.
		**  Note: if a task had I/O activity, then it is already
		**  removed from wait queue; the I/O code above notes
		**  an expired sleep time for such a task itself.
		*/

#if EVTHR_DBG_DELAY
		ftask = EVTHR_WAITQ_FIRST(evthr_ctx);
#endif
		for (task = EVTHR_WAITQ_FIRST(evthr_ctx);
		     task != EVTHR_WAITQ_END(evthr_ctx);
		     task = slptsk)
		{
			SM_IS_EVTHR_TSK(task);

			/* stop at the first task that is not due */
			if (!evthr_is_slp(task) ||
			    timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, <))
				break;
#if EVTHR_DBG_DELAY
			CHKDEL
#endif
			DPRINTF(3, (stderr, "main loop: add sleep task %p\n", task));
			evthr_clr_rdy(task);
			EVTHR_GOT_EV(task, EVTHR_EV_SL_Y);

			/* get successor before task leaves the wait queue */
			slptsk = EVTHR_WAITQ_NEXT(task);
			WQ2RQ(evthr_ctx, task);
#if EVTHR_DBG_DELAY
			++slt;
#endif
		}

		SM_ASSERT(is_waitqlock(locks));
		r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
		SM_ASSERT(r == 0);
		rel_waitqlock(locks);

		SM_ASSERT(is_runqlock(locks));
		r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
		SM_ASSERT(r == 0);
		rel_runqlock(locks);

		DPRINTF(8, (stderr, "main loop tasks=%d\n", tasks));

#if EVTHR_DBG_DELAY
		if (!timeok) {
			gettimeofday(&now, NULL);
			ERRPRINTTV("start:    now=", now);
		}
#endif /* EVTHR_DBG_DELAY */

		if (tasks > 0) {
			ret = evthr_wakeupthr(evthr_ctx, tasks, nbtasks);
			DPRINTF(7, (stderr, "main loop started tasks=%d/%u\n"
				, ret, tasks));
		}
		fbtask = NULL;

#endif /* HAVE_SELECT */

	}

 done:
	/* reached via goto only: release whatever is still locked */
	if (is_waitqlock(locks)) {
		r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
		rel_waitqlock(locks);
	}
	if (is_runqlock(locks)) {
		r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
		rel_runqlock(locks);
	}
	return SM_SUCCESS;
}

/*
**  EVTHR_TIMEVAL -- return (cached) time in evthr system
**
**	Copies the time stored in the context (evthr_c_time) to *ct.
**
**	Parameters:
**		evthr_ctx -- evthr context
**		ct -- time (output), must not be NULL
**
**	Returns:
**		usual sm_error code (currently always SM_SUCCESS)
**
**	Locking:
**		none; see comment in the function.
*/

sm_ret_T
evthr_timeval(sm_evthr_ctx_P evthr_ctx, timeval_T *ct)
{
	timeval_T cached;

	SM_IS_EVTHR_CTX(evthr_ctx);
	SM_REQUIRE(ct != NULL);

	/*
	**  Need to lock access to the structure in case it is updated.
	**  Maybe a read/write lock?
	*/

	cached = evthr_ctx->evthr_c_time;
	*ct = cached;
	return SM_SUCCESS;
}

/*
**  EVTHR_TIME -- return (cached) time in evthr system
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		seconds part (tv_sec) of the time cached in the context,
**		i.e., time in seconds since the epoch
**
**	Locking:
**		none; the same remark as in evthr_timeval() applies.
*/

time_T
evthr_time(sm_evthr_ctx_P evthr_ctx)
{
	time_T secs;

	SM_IS_EVTHR_CTX(evthr_ctx);

	/* See evthr_timeval() about locking */
	secs = evthr_ctx->evthr_c_time.tv_sec;
	return secs;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */