/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 * Copyright (c) 2006 Claus Assmann
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

/*
**  partly based on workq.c from Programming with POSIX Threads
**  by David Butenhof.
*/

#include "sm/generic.h"
SM_RCSID("@(#)$Id: worker.c,v 1.48 2006/10/30 02:35:44 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/time.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/socket.h"
#include "evthr-int.h"
#include "log.h"

#if EVTHR_PRIOS > 1
/*
**  EVTHR_RUNQ_EMPTY_ALL -- are all run queues empty?
**
**	Parameters:
**		evthr_ctx -- evthr context
**		pprio -- (pointer to) priority of non-empty queue
**			>=0: priority of queue that isn't empty
**			<0: all queues empty
**
**	Returns:
**		true iff all run queues are empty
*/

static bool
evthr_runq_empty_all(sm_evthr_ctx_P evthr_ctx, int *pprio)
{
	int i;

	SM_REQUIRE(pprio != NULL);
	for (i = 0; i < evthr_ctx->evthr_c_nprio; i++) {
		if (!EVTHR_RUNQ_EMPTY(evthr_ctx, i)) {
			*pprio = i;
			return false;
		}
	}
	*pprio = -1;
	return true;
}
#endif /* EVTHR_PRIOS > 1 */

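/*
**  WAKEUP_SCHEDULER -- wake up the scheduler (unless a wakeup is pending)
**
**	Writes a single EVTHR_CONT byte to the scheduler's pipe unless a
**	wakeup has already been signalled; on success the wakeup flag is
**	set so later callers can skip the write.
*/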
#define WAKEUP_SCHEDULER(evthr_ctx, c, status, tid) do { \
		if (!EVTHR_IS_WAKEUP(evthr_ctx)) { \
			c = EVTHR_CONT; \
			status = write(wrpipe(evthr_ctx), (void *) &c, 1);\
			if (status != 1) { \
				DPRINTF(0, (stderr, "[%ld] sev=ERROR, write %c failed=%d\n", \
					(long) tid, c, status)); \
			} \
			else { \
				EVTHR_SET_WAKEUP(evthr_ctx); \
				DPRINTF(8, (stderr, "func=evthr_worker, where=waitq, status=wakeup\n")); \
			} \
		} \
	} while (0)

/*
**  EVTHR_WORKER -- worker thread
**
**	Several of these can be started as threads; they check the
**	runq and execute the functions stored in the tasks.
**
**	Parameters:
**		arg -- evthr context
**
**	Returns:
**		NULL (the thread's exit value)
**
**  Todo: deal better with failures, see for example EVTHR_EV_WWQ.
*/
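
/*
**  A minimal sketch (not part of the library) of how worker threads
**  might be started; the real startup code lives elsewhere, and
**  "nworkers" as well as the error handling below are illustrative
**  only. The evthr context is assumed to be fully initialized first:
**
**	pthread_t tid;
**	int i, err;
**
**	for (i = 0; i < nworkers; i++) {
**		err = pthread_create(&tid, NULL, evthr_worker, evthr_ctx);
**		if (err != 0)
**			break;	(workers detach themselves at startup)
**	}
*/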

void *
evthr_worker(void *arg)
{
	sm_evthr_ctx_P evthr_ctx;
	timespec_T timeout;
	timeval_T now;
	int status;
	bool timedout;
	char c;
	sm_ret_T ret, r2;
	pthread_t tid;
#if EVTHR_PRIOS <= 1
# define prio 0
#else
	int prio;
#endif

/* timeouts for waiting for work: */
#define EVTHR_WORKER_TO_N	60	/* normal (increase even further?) */
#define EVTHR_WORKER_TO_M	5	/* if more than min workers */
#define EVTHR_WORKER_TO_S	1	/* if more than soft limit */

	SM_REQUIRE(arg != NULL);
	evthr_ctx = (sm_evthr_ctx_P) arg;
	tid = pthread_self();
	status = pthread_detach(tid);
	if (status != 0)
		DPRINTF(0, (stderr, "[%ld] worker detach failed=%d\n",
			(long) tid, status));

	/* wait for work */

	/*
	**  We don't need to validate the evthr context here... we don't
	**  create server threads until requests are queued (the
	**  queue has been initialized by then!) and we wait for all
	**  server threads to terminate before destroying a work queue.
	*/

	DPRINTF(5, (stderr, "[%ld] worker is starting\n", (long) tid));
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
	SM_LOCK_OK(status);
	if (status != 0) {
		NOTE(NOT_REACHED)
		return NULL;
	}

	while (true) {
		timedout = false;
		DPRINTF(7, (stderr, "[%ld] worker waiting for work\n", (long) tid));
		gettimeofday(&now, NULL);

		/* make sure time increases */
		if (timercmp(&now, &evthr_ctx->evthr_c_time, >))
			SM_TIMEVAL_TO_TIMESPEC(&now, &timeout);
		else
			SM_TIMEVAL_TO_TIMESPEC(&evthr_ctx->evthr_c_time,
						&timeout);

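		/*
		**  Pick an idle timeout based on the current number of
		**  workers: above the soft limit wait only about 10ms,
		**  between the minimum and the soft limit wait a few
		**  seconds (more the further we are below the soft limit),
		**  otherwise wait the full "normal" timeout.
		*/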
		if (evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_max_s) {
			timespec_T delay;

			delay.tv_sec = 0;
			delay.tv_nsec = 10000000; /* 10ms */
			sm_timespecadd(&timeout, &delay, &timeout);
		}
		else if (evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_min) {
			/* evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s */
			timeout.tv_sec += EVTHR_WORKER_TO_M +
				evthr_ctx->evthr_c_max_s - evthr_ctx->evthr_c_cur;
		}
		else
			timeout.tv_sec += EVTHR_WORKER_TO_N;

		while (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio) &&
			!EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP)) {
			/*
			**  "Go idle" waiting for work. Each server will wait
			**  up to EVTHR_WORKER_TO_x seconds for work,
			**  and then give up if there are enough threads.
			**
			**  Note: if the system should stop then cv is
			**  signalled, hence the timeout can be large.
			*/

			++evthr_ctx->evthr_c_idl;
			status = pthread_cond_timedwait(&evthr_ctx->evthr_c_cv,
					&evthr_ctx->evthr_c_runqmut, &timeout);
			SM_ASSERT(evthr_ctx->evthr_c_idl > 0);
			--evthr_ctx->evthr_c_idl;

			/* Got work? */
			if (!EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio))
				break;

			/* If the wait timed out, quit */
			if (status == ETIMEDOUT) {
				timedout = true;
				break;
			}
			else if (status != 0) {
				/*
				**  This shouldn't happen, so the work queue
				**  package should fail. Because the work queue
				**  API is asynchronous, that would add
				**  complication. Because the chances of failure
				**  are slim, I choose to avoid that
				**  complication. The server thread will return,
				**  and allow another server thread to pick up
				**  the work later. Note that, if this was the
				**  only server thread, the queue won't be
				**  serviced until a new work item is queued.
				**  That could be fixed by creating a new server
				**  here.
				*/

				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, timed_wait=%d"
					, status);
				SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
				--evthr_ctx->evthr_c_cur;
				if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
					EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
				(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
				return NULL;
			}
		}

		/*
		**  should we look for a non-empty queue again or just check
		**  the one that we found? that is:
		**  if (prio > 0 && !EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio))
		*/

		if (!EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio)) {
			sm_evthr_task_P task;
			evthr_task_F *fct;

			task = EVTHR_RUNQ_FIRST(evthr_ctx, prio);
			SM_IS_EVTHR_TSK(task);
#if SM_LOCK_TASK
			status = pthread_mutex_lock(&task->evthr_t_mutex);
			SM_LOCK_OK(status);
			if (status != 0) {
				NOTE(NOT_REACHED)
				DPRINTF(0, (stderr,
					"[%ld] sev=ERROR, worker, task=%p, lock=%d\n",
					(long) tid, task, status));
				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, mutex=t_mutex, lock=%d"
					, status);
				return NULL;
			}
#endif /* SM_LOCK_TASK */
			DPRINTF(7, (stderr, "[%ld] worker, task=%p, stop=%d\n",
				(long) tid, task,
				EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP)));
			fct = task->evthr_t_fct;
			EVTHR_RUNQ_DEL(evthr_ctx, prio, task);
			EVTHR_REM_FROMQ(task, EVTHR_EV_IRQ);
			++evthr_ctx->evthr_c_act;
			status = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			if (status != 0) {
				sm_log_write(evthr_ctx->evthr_c_lctx,
					EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
					SM_LOG_ERROR, 1,
					"sev=ERROR, func=evthr_worker, mutex=runq_mutex, unlock=%d"
					, status);
				SM_ASSERT(evthr_ctx->evthr_c_act > 0);
				--evthr_ctx->evthr_c_act;
				return NULL;
			}

			DPRINTF(4, (stderr, "[%ld] worker task=%p, flags=%X\n",
				(long) tid, task, task->evthr_t_flags));
			ret = fct(task);
			evthr_clr_rdy(task);

			/*
			**  free nc here? is this safe (locked?)
			**  It might be safer to copy it into a local variable
			**  and free evthr_t_nc above.
			**  However, we also pass sleep time directly...
			**  Maybe we should also use a local variable for it.
			*/

			SM_FREE(task->evthr_t_nc);

			DPRINTF(4, (stderr, "[%ld] worker task=%p, ret=%x\n",
				(long) tid, task, ret));
			status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
			SM_LOCK_OK(status);
			if (status != 0) {
				NOTE(NOT_REACHED)
				return NULL;
			}
			SM_ASSERT(evthr_ctx->evthr_c_act > 0);
			--evthr_ctx->evthr_c_act;

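			/*
			**  The return value of the task function encodes what
			**  to do with the task next (wait queue, sleep queue,
			**  run queue, or delete), whether to stop the whole
			**  system, and, unless the task switched to
			**  asynchronous operation, which events to set or
			**  clear in its event mask.
			*/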
			/* check whether fct wants to change event mask */
			/* Do this only if not ASYNC? */
			if (!evthr_act_async(ret)) {
				if (evthr_r_set(ret))
					evthr_set_ev(task, evthr_r_set_ev(ret));
				if (evthr_r_clr(ret))
					evthr_clr_ev(task, evthr_r_clr_ev(ret));
			}

			/* check what to do with task */
			if (evthr_act_waitq(ret)) {
				EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);
				status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
				SM_LOCK_OK(status);
				if (status != 0) {
					NOTE(NOT_REACHED)
#if SM_LOCK_TASK
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, lock=%d"
						, status);
					(void) pthread_mutex_unlock(&task->evthr_t_mutex);
#endif
					return NULL;
				}
				EVTHR_WAITQ_APP(evthr_ctx, task);
				WAKEUP_SCHEDULER(evthr_ctx, c, status, tid);
				EVTHR_IS_INQ(task, EVTHR_EV_IWQ);
				status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
				if (status != 0) {
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, unlock=%d"
						, status);
					SM_ASSERT(status == 0);
				}
			}
			else if (evthr_act_slpq(ret)) {
				EVTHR_WANTS_INQ(task, EVTHR_EV_WWQ);
				status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
				SM_LOCK_OK(status);
				if (status != 0) {
					NOTE(NOT_REACHED)
#if SM_LOCK_TASK
					(void) pthread_mutex_unlock(&task->evthr_t_mutex);
#endif
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, lock-waitq failed=%d\n",
						(long) tid, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, lock=%d"
						, status);
					return NULL;	/* no good! */
				}
				evthr_slpq_ins(evthr_ctx, task);
				WAKEUP_SCHEDULER(evthr_ctx, c, status, tid);
				status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
				if (status != 0) {
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=waitq_mutex, unlock=%d"
						, status);
					SM_ASSERT(status == 0);
				}
			}
			else if (evthr_act_runq(ret)) {
				/* we have the mutex for runq */
				EVTHR_RUNQ_APP(evthr_ctx, prio, task);
			}
			else if (evthr_act_del(ret)) {
				r2 = evthr_task_del(evthr_ctx, task, THR_LOCK_UNLOCK);
				if (sm_is_err(r2)) {
					/* complain? */
					EVTHR_IS_INQ(task, EVTHR_EV_DEL);
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, task_del failed=%x\n",
						(long) tid, r2));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, evthr_task_del=%d"
						, r2);
				}
			}
			if (evthr_act_term(ret)) {
				c = EVTHR_STOP;
				status = write(wrpipe(evthr_ctx), (void *) &c, 1);
				if (status != 1) {
					r2 = errno;
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, write %c failed=%d\n",
						(long) tid, c, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, write=%d, errno=%d"
						, status, r2);
				}
			}
#if SM_LOCK_TASK
			if (!evthr_act_async(ret)) {
				status = pthread_mutex_unlock(&task->evthr_t_mutex);
				if (status != 0) {
					DPRINTF(0, (stderr, "[%ld] sev=ERROR, worker, task=%p, unlock=%d\n",
						(long) tid, task, status));
					sm_log_write(evthr_ctx->evthr_c_lctx,
						EVTHR_LCAT_WORKER, EVTHR_LMOD_WORKER,
						SM_LOG_ERROR, 1,
						"sev=ERROR, func=evthr_worker, mutex=t_mutex, unlock=%d"
						, status);
					return NULL;
				}
			}
#endif /* SM_LOCK_TASK */
		}

		/*
		**  If there are no more work requests, and the servers
		**  have been asked to quit, then shut down.
		*/

		if (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio) &&
		    EVTHR_IS_FLAG(evthr_ctx, EVTHR_FL_STOP)) {
			DPRINTF(6, (stderr, "[%ld] worker shutting down\n", (long) tid));
			SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
			--evthr_ctx->evthr_c_cur;
			if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
				EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);

			/*
			**  NOTE: Just to prove that every rule has an
			**  exception, I'm using the "cv" condition for two
			**  separate predicates here. That's OK, since the
			**  case used here applies only once during the life
			**  of a work queue -- during rundown. The overhead
			**  is minimal and it's not worth creating a separate
			**  condition variable that would be waited and
			**  signaled exactly once!
			*/

			if (evthr_ctx->evthr_c_cur == 0)
				(void) pthread_cond_broadcast(&evthr_ctx->evthr_c_cv);
			(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
			return NULL;
		}

		/*
		**  If there's no more work, and this thread has waited as
		**  long as it is allowed to, then terminate this server
		**  thread.
		*/

		if (EVTHR_RUNQ_EMPTY_ALL(evthr_ctx, prio) && timedout &&
		    evthr_ctx->evthr_c_cur > evthr_ctx->evthr_c_min) {
			DPRINTF(5, (stderr,
				"[%ld] worker terminating due to timeout.\n",
				(long) tid));
			SM_ASSERT(evthr_ctx->evthr_c_cur > 0);
			--evthr_ctx->evthr_c_cur;
			if (evthr_ctx->evthr_c_cur <= evthr_ctx->evthr_c_max_s)
				EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC);
			break;
		}
	}

	pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(4, (stderr, "[%ld] worker exiting\n", (long) tid));
	return NULL;
}

