/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: evthrst.c,v 1.41 2006/12/29 01:31:22 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "evthr-int.h"
#include "log.h"

/*
**  EVTHR_INIT -- initialize event thread system
**
**	Allocates an evthr context and its fd-to-task table, creates the
**	internal (non-blocking) pipe, sets up logging and signal handling,
**	initializes the mutexes and the condition variable, preallocates
**	minthr request structures and finally starts minthr worker threads.
**
**	Parameters:
**		pevthr_ctx -- (pointer to) evthr context (output);
**			set to NULL whenever the setup itself fails
**		minthr -- minimum number of threads (started here)
**		maxthr -- maximum number of threads (must be >= 1)
**		maxfd -- maximum number of fds (must be >= 1)
**
**	Returns:
**		usual sm_error code
**		Note: if only the final gettimeofday() fails, an error is
**		returned but *pevthr_ctx stays set and the workers keep
**		running.
**
**	Locking: none
**
**	Last code review: 2006-03-12 20:18:41
**	Last code change: 2006-10-29 17:41:05
*/

sm_ret_T
evthr_init(sm_evthr_ctx_P *pevthr_ctx, uint minthr, uint maxthr, uint maxfd)
{
	int status;
	uint u;
	sm_ret_T ret;
	pthread_t tid;
	sm_evthr_ctx_P evthr_ctx;
	int pipefd[2];
	sm_evthr_req_P req;

	NOTE(NO_COMPETING_THREADS_NOW)
	SM_REQUIRE(pevthr_ctx != NULL);
	SM_REQUIRE(maxthr >= 1);
	SM_REQUIRE(maxfd >= 1);

	evthr_ctx = (sm_evthr_ctx_P) sm_zalloc(sizeof(*evthr_ctx));
	if (NULL == evthr_ctx) {
		/* same output contract as all other error exits */
		*pevthr_ctx = NULL;
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);
	}

	/* must be set before the first "goto error" (checked during cleanup) */
	pipefd[0] = pipefd[1] = -1;

	/* size of the fd -> task table; asserts catch a wrapped product */
	u = sizeof(*evthr_ctx->evthr_c_fd2t) * maxfd;
	SM_ASSERT(u >= sizeof(*evthr_ctx->evthr_c_fd2t));
	SM_ASSERT(u >= maxfd);
	evthr_ctx->evthr_c_fd2t = (sm_evthr_task_P *) sm_zalloc(u);
	if (NULL == evthr_ctx->evthr_c_fd2t) {
		ret = sm_error_temp(SM_EM_EVTHR, ENOMEM);
		goto error;
	}
	evthr_ctx->evthr_c_maxfd = maxfd;

	EVTHR_RUNQ_INIT(evthr_ctx);
	EVTHR_WAITQ_INIT(evthr_ctx);
	EVTHR_REQ_INIT(evthr_ctx);
	evthr_ctx->evthr_c_min = minthr;
	evthr_ctx->evthr_c_max_s = maxthr;
	evthr_ctx->evthr_c_max_h = maxthr;
#if EVTHR_PRIOS > 1
	evthr_ctx->evthr_c_nprio = EVTHR_PRIOS;
#endif

	/* should we start min thread here? */
	evthr_ctx->evthr_c_cur = 0;
	evthr_ctx->evthr_c_idl = 0;
	evthr_ctx->evthr_c_act = 0;

	/* internal pipe; both ends are switched to non-blocking mode */
	if (pipe(pipefd) < 0) {
		ret = sm_error_perm(SM_EM_EVTHR, errno);
		goto error;
	}
	ret = sm_fd_nonblock(pipefd[0], true);
	if (sm_is_err(ret))
		goto error;
	ret = sm_fd_nonblock(pipefd[1], true);
	if (sm_is_err(ret))
		goto error;
	evthr_ctx->evthr_c_pipe[0] = pipefd[0];
	evthr_ctx->evthr_c_pipe[1] = pipefd[1];

	/*
	**  set up logging
	**  NOTE(review): the log context is not released on the error
	**  paths below; no matching destroy call is visible in this file.
	*/

	ret = sm_log_create(NULL, &evthr_ctx->evthr_c_lctx, NULL);
	if (sm_is_err(ret))
		goto error;
	ret = sm_log_setfp_fd(evthr_ctx->evthr_c_lctx, smiolog, SMIOLOG_FILENO);
	if (sm_is_err(ret))
		goto error;
	ret = sm_log_setdebuglevel(evthr_ctx->evthr_c_lctx, 2);
	if (sm_is_err(ret))
		goto error;

	/*
	**  Initialize signal mask and start signal handler thread.
	**  Must be done before any thread is started, otherwise
	**  the signal mask is not set correctly for all threads.
	*/

	ret = evthr_signal_init(evthr_ctx);
	if (sm_is_err(ret))
		goto error;

	/*
	**  initialize mutexes, cv
	**  Each failure jumps to the label that destroys exactly those
	**  objects which have been initialized so far.
	*/

	status = pthread_mutex_init(&evthr_ctx->evthr_c_runqmut, SM_PTHREAD_MUTEXATTR);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto error;
	}
	status = pthread_mutex_init(&evthr_ctx->evthr_c_waitqmut, SM_PTHREAD_MUTEXATTR);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err2;
	}
	status = pthread_cond_init(&evthr_ctx->evthr_c_cv, NULL);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err1;
	}

	/* necessary "early" for evthr_req_new() and evthr_reqs_free() */
	evthr_ctx->sm_magic = SM_EVTHR_CTX_MAGIC;
	status = pthread_mutex_init(&evthr_ctx->evthr_c_reqmut, SM_PTHREAD_MUTEXATTR);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err0;
	}

	/* How many request contexts should be created?? */
	for (u = 0; u < minthr; u++) {
		req = evthr_req_new(evthr_ctx);
		if (NULL == req) {
			/*
			**  ret still holds the (successful) result of an
			**  earlier call: set the error explicitly, otherwise
			**  the caller gets "success" and a NULL context.
			*/

			ret = sm_error_temp(SM_EM_EVTHR, ENOMEM);
			goto errora;
		}
	}

	/* done... */
	*pevthr_ctx = evthr_ctx;

	for (u = 0; u < minthr; u++) {
		status = pthread_create(&tid, NULL, evthr_worker, (void *)evthr_ctx);
		if (status != 0) {
			ret = sm_error_perm(SM_EM_EVTHR, status);

			/* requests and reqmut exist: release them too */
			goto errora;
		}
		++evthr_ctx->evthr_c_cur;
	}
	NOTE(COMPETING_THREADS_NOW)

	/* ret is the (non-error) result of evthr_signal_init() here */
	status = gettimeofday(&evthr_ctx->evthr_c_time, NULL);
	if (status < 0)
		ret = sm_error_temp(SM_EM_EVTHR, errno);
	return ret;

	/* Todo: we need to stop threads that have been started! */
  errora:
	(void) evthr_reqs_free(evthr_ctx);
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_reqmut);
	/* maybe call evthr_stop()? however, the process will exit anyway */
  err0:
	(void) pthread_cond_destroy(&evthr_ctx->evthr_c_cv);
  err1:
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_waitqmut);
  err2:
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_runqmut);
  error:

	SM_FREE(evthr_ctx->evthr_c_fd2t);
	SM_FREE_SIZE(evthr_ctx, sizeof(*evthr_ctx));
	if (pipefd[0] != -1)
		close(pipefd[0]);
	if (pipefd[1] != -1)
		close(pipefd[1]);
	*pevthr_ctx = NULL;
	return ret;
}

/*
**  EVTHR_STOP -- stop event thread system
**
**	Sets the stop flag, wakes up idle workers and waits on the
**	condition variable until the count of current threads is 0.
**	Afterwards the fds of the tasks in the wait queue and the internal
**	pipe are closed, the fd table and the request list are released,
**	mutexes and cv are destroyed, and the context itself is freed.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code
**		If broadcasting/waiting on the cv or unlocking the run
**		queue fails, the function returns early and the context
**		is NOT freed.
**
**	Locking:
**		locks run queue so nothing gets started anymore
**		locks wait queue while its entries are removed
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
evthr_stop(sm_evthr_ctx_P evthr_ctx)
{
	int status;
	sm_ret_T ret, res;
	sm_evthr_task_P task, task2;

	SM_IS_EVTHR_CTX(evthr_ctx);

	ret = SM_SUCCESS;
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
	SM_LOCK_OK(status);
	DPRINTF(0, (stderr, "stop: lock=%d\n", status));
	if (status != 0) {
		NOTE(NOT_REACHED)
		return sm_error_perm(SM_EM_EVTHR, status);
	}

	/* stop threads? */
	DPRINTF(0, (stderr, "stop: cur=%d\n", evthr_ctx->evthr_c_cur));
	if (evthr_ctx->evthr_c_cur > 0) {
		/* more flags? NOW, IMMEDIATE, ... */
		EVTHR_SET_FLAG(evthr_ctx, EVTHR_FL_STOP);
		DPRINTF(0, (stderr, "stop: idle=%d\n", evthr_ctx->evthr_c_idl));

		/* wake up idle workers so they can notice the stop flag */
		if (evthr_ctx->evthr_c_idl > 0) {
			status = pthread_cond_broadcast(&evthr_ctx->evthr_c_cv);
			if (status != 0) {
				DPRINTF(0, (stderr, "stop: broadcast=%d\n", status));
				ret = sm_error_perm(SM_EM_EVTHR, status);
				goto errunlock;
			}
		}

		/*
		**  Wait until all workers are gone.
		**  NOTE(review): presumably a terminating worker decrements
		**  evthr_c_cur and signals the cv; not visible in this file.
		*/

		while (evthr_ctx->evthr_c_cur > 0) {
			/* Use pthread_cond_timedwait()? */
			status = pthread_cond_wait(&evthr_ctx->evthr_c_cv, &evthr_ctx->evthr_c_runqmut);
			if (status != 0) {
				DPRINTF(0, (stderr, "stop: condwait=%d\n", status));
				ret = sm_error_perm(SM_EM_EVTHR, status);
				goto errunlock;
			}
		}
	}

	/*
	**  Free the elements in the queues??
	**  Should we call some task-specific cleanup fct?
	**  Currently this leaks memory (and maybe accept fds); see below.
	**  However, this doesn't matter that much because the program
	**  terminates "soon".
	*/

	/* remove entries from the wait queue */
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(status);
	NOTE(NO_COMPETING_THREADS_NOW)
	if (0 == status) {
		for (task = EVTHR_WAITQ_FIRST(evthr_ctx);
		     task != EVTHR_WAITQ_END(evthr_ctx);
		     task = task2)
		{
			/* fetch successor first: task is unlinked below */
			task2 = EVTHR_WAITQ_NEXT(task);
			CLOSE_FD(task->evthr_t_fd);
			if (task->evthr_t_nc != NULL)
				CLOSE_FD(task->evthr_t_nc->evthr_a_fd);

			/* todo: free the task itself */

			EVTHR_WAITQ_DEL(evthr_ctx, task);
		}
		status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);

		/* do not drop an unlock failure silently */
		if (status != 0)
			ret = sm_error_perm(SM_EM_EVTHR, status);
	}
	else {
		NOTE(NOT_REACHED)
	}

	(void) close(evthr_ctx->evthr_c_pipe[0]);
	(void) close(evthr_ctx->evthr_c_pipe[1]);

	SM_FREE(evthr_ctx->evthr_c_fd2t);

	status = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(0, (stderr, "stop: unlock=%d\n", status));
	if (status != 0)
		return sm_error_perm(SM_EM_EVTHR, status);

	/* destroy mutex, cv; keep going on failure, report it via ret */
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_waitqmut);
	DPRINTF(0, (stderr, "stop: waitq_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(0, (stderr, "stop: runq_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);
	status = pthread_cond_destroy(&evthr_ctx->evthr_c_cv);
	DPRINTF(0, (stderr, "stop: cond_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);

	/*
	**  Release the request list (not locked: all workers are gone).
	**  The result is an sm_ret_T, hence it is kept apart from the
	**  pthread status and only propagated if it is an error.
	*/

	res = evthr_reqs_free(evthr_ctx);
	if (sm_is_err(res))
		ret = res;
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_reqmut);
	DPRINTF(0, (stderr, "stop: req_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);

	sm_free((void *)evthr_ctx);
	return ret;

  errunlock:
	(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	return ret;
}

/*
**  EVTHR_REQ_NEW -- create a new request structure and append it to the list
**
**	The structure is allocated zero-filled, appended to the request
**	list of the context, and the total request counter is incremented.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		pointer to request structure; NULL on failure (ENOMEM)
**
**	Side Effects: none on error
**
**	Locking:
**		does not lock req list; must be done by caller.
**
**	Last code review: 2005-03-22 23:38:13
**	Last code change: 2006-10-30 04:09:35
*/

sm_evthr_req_P
evthr_req_new(sm_evthr_ctx_P evthr_ctx)
{
	sm_evthr_req_P new_req = (sm_evthr_req_P) sm_zalloc(sizeof(*new_req));

	if (new_req != NULL) {
		/* link it in, then account for it */
		EVTHR_REQ_APP(evthr_ctx, new_req);
		SM_ASSERT(evthr_ctx->evthr_c_tot_reqs < UINT_MAX);
		evthr_ctx->evthr_c_tot_reqs++;
	}

	/* NULL if the allocation failed; context untouched in that case */
	return new_req;
}

/*
**  EVTHR_REQS_FREE -- free all requests
**
**	Unlinks and frees every entry of the request list, then
**	re-initializes the (now empty) list and resets the request counter.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code (currently always SM_SUCCESS)
**
**	Locking:
**		does not lock req list; must be done by caller.
**
**	Last code review:
**	Last code change: 2005-03-22 23:39:41
*/

sm_ret_T
evthr_reqs_free(sm_evthr_ctx_P evthr_ctx)
{
	sm_evthr_req_P cur, succ;

	SM_IS_EVTHR_CTX(evthr_ctx);

	cur = EVTHR_REQ_FIRST(evthr_ctx);
	while (cur != EVTHR_REQ_END(evthr_ctx)) {
		/* remember the successor before the entry goes away */
		succ = EVTHR_REQ_NEXT(cur);
		EVTHR_REQ_DEL(evthr_ctx, cur);
		sm_free_size(cur, sizeof(*cur));
		cur = succ;
	}

	/* leave a well-defined empty list behind */
	EVTHR_REQ_INIT(evthr_ctx);
	evthr_ctx->evthr_c_tot_reqs = 0;
	return SM_SUCCESS;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */