/*
* Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
* All rights reserved.
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the sendmail distribution.
*/
#include "sm/generic.h"
SM_RCSID("@(#)$Id: evthrst.c,v 1.41 2006/12/29 01:31:22 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "evthr-int.h"
#include "log.h"
/*
**  EVTHR_INIT -- initialize event thread system
**
**	Allocates the context and its fd-to-task table, creates the
**	internal non-blocking pipe, sets up logging and signal handling,
**	initializes the mutexes and the condition variable, preallocates
**	minthr request structures and finally starts minthr worker threads.
**
**	Parameters:
**		pevthr_ctx -- (pointer to) evthr context (output);
**			set to NULL whenever the error exit is taken.
**		minthr -- minimum number of threads
**		maxthr -- maximum number of threads
**		maxfd -- maximum number of fds
**
**	Returns:
**		usual sm_error code
**
**	Notes:
**		If only the final gettimeofday() call fails, an error is
**		returned but *pevthr_ctx stays set and the worker threads
**		that have been started keep running.
**
**	Locking: none
**
**	Last code review: 2006-03-12 20:18:41
**	Last code change: 2006-10-29 17:41:05
*/

sm_ret_T
evthr_init(sm_evthr_ctx_P *pevthr_ctx, uint minthr, uint maxthr, uint maxfd)
{
	int status;
	uint u;
	sm_ret_T ret;
	pthread_t tid;
	sm_evthr_ctx_P evthr_ctx;
	int pipefd[2];
	sm_evthr_req_P req;

	NOTE(NO_COMPETING_THREADS_NOW)
	SM_REQUIRE(pevthr_ctx != NULL);
	SM_REQUIRE(maxthr >= 1);
	SM_REQUIRE(maxfd >= 1);

	/*
	**  The common error exit closes both descriptors unless they
	**  are -1, hence they must be marked invalid before the first
	**  "goto error" can be taken.
	*/

	pipefd[0] = pipefd[1] = -1;

	evthr_ctx = (sm_evthr_ctx_P) sm_zalloc(sizeof(*evthr_ctx));
	if (NULL == evthr_ctx)
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);

	/*
	**  The byte size of the fd table is stored in a uint; make sure
	**  the multiplication cannot wrap around.  (Comparing the product
	**  against its factors does not detect every wrap-around.)
	*/

	SM_ASSERT(maxfd <= UINT_MAX / sizeof(*evthr_ctx->evthr_c_fd2t));
	u = sizeof(*evthr_ctx->evthr_c_fd2t) * maxfd;
	evthr_ctx->evthr_c_fd2t = (sm_evthr_task_P *) sm_zalloc(u);
	if (NULL == evthr_ctx->evthr_c_fd2t) {
		ret = sm_error_temp(SM_EM_EVTHR, ENOMEM);
		goto error;
	}
	evthr_ctx->evthr_c_maxfd = maxfd;

	EVTHR_RUNQ_INIT(evthr_ctx);
	EVTHR_WAITQ_INIT(evthr_ctx);
	EVTHR_REQ_INIT(evthr_ctx);
	evthr_ctx->evthr_c_min = minthr;
	evthr_ctx->evthr_c_max_s = maxthr;
	evthr_ctx->evthr_c_max_h = maxthr;
#if EVTHR_PRIOS > 1
	evthr_ctx->evthr_c_nprio = EVTHR_PRIOS;
#endif

	/* should we start min thread here? */
	evthr_ctx->evthr_c_cur = 0;
	evthr_ctx->evthr_c_idl = 0;
	evthr_ctx->evthr_c_act = 0;

	/* internal pipe; both ends are switched to non-blocking mode */
	if (pipe(pipefd) < 0) {
		ret = sm_error_perm(SM_EM_EVTHR, errno);
		goto error;
	}
	ret = sm_fd_nonblock(pipefd[0], true);
	if (sm_is_err(ret))
		goto error;
	ret = sm_fd_nonblock(pipefd[1], true);
	if (sm_is_err(ret))
		goto error;
	evthr_ctx->evthr_c_pipe[0] = pipefd[0];
	evthr_ctx->evthr_c_pipe[1] = pipefd[1];

	/* set up logging */
	/* NOTE(review): the log context is not released on the error exit */
	ret = sm_log_create(NULL, &evthr_ctx->evthr_c_lctx, NULL);
	if (sm_is_err(ret))
		goto error;
	ret = sm_log_setfp_fd(evthr_ctx->evthr_c_lctx, smiolog, SMIOLOG_FILENO);
	if (sm_is_err(ret))
		goto error;
	ret = sm_log_setdebuglevel(evthr_ctx->evthr_c_lctx, 2);
	if (sm_is_err(ret))
		goto error;

	/*
	**  Initialize signal mask and start signal handler thread.
	**  Must be done before any thread is started, otherwise
	**  the signal mask is not set correctly for all threads.
	*/

	ret = evthr_signal_init(evthr_ctx);
	if (sm_is_err(ret))
		goto error;

	/*
	**  Initialize mutexes, cv.  Each failure jumps to the label that
	**  undoes exactly those objects which have been set up so far.
	*/

	status = pthread_mutex_init(&evthr_ctx->evthr_c_runqmut, SM_PTHREAD_MUTEXATTR);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto error;
	}
	status = pthread_mutex_init(&evthr_ctx->evthr_c_waitqmut, SM_PTHREAD_MUTEXATTR);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err2;
	}
	status = pthread_cond_init(&evthr_ctx->evthr_c_cv, NULL);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err1;
	}

	/* necessary "early" for evthr_req_new() and evthr_reqs_free() */
	evthr_ctx->sm_magic = SM_EVTHR_CTX_MAGIC;
	status = pthread_mutex_init(&evthr_ctx->evthr_c_reqmut, SM_PTHREAD_MUTEXATTR);
	if (status != 0) {
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err0;
	}

	/* How many request contexts should be created?? */
	for (u = 0; u < minthr; u++) {
		req = evthr_req_new(evthr_ctx);
		if (NULL == req) {
			/* ret still holds a success value: report ENOMEM */
			ret = sm_error_temp(SM_EM_EVTHR, ENOMEM);
			goto errora;
		}
	}

	/* done... */
	*pevthr_ctx = evthr_ctx;
	for (u = 0; u < minthr; u++) {
		status = pthread_create(&tid, NULL, evthr_worker, (void *)evthr_ctx);
		if (status != 0) {
			ret = sm_error_perm(SM_EM_EVTHR, status);

			/* also release the requests and the request mutex */
			goto errora;
		}
		++evthr_ctx->evthr_c_cur;
	}
	NOTE(COMPETING_THREADS_NOW)

	status = gettimeofday(&evthr_ctx->evthr_c_time, NULL);
	if (status < 0)
		ret = sm_error_temp(SM_EM_EVTHR, errno);
	return ret;

	/*
	**  Todo: we need to stop threads that have been started!
	**  If pthread_create() fails after some workers are already
	**  running, those workers still reference the context that is
	**  torn down below.
	**  maybe call evthr_stop()? however, the process will exit anyway
	*/

  errora:
	(void) evthr_reqs_free(evthr_ctx);
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_reqmut);
  err0:
	(void) pthread_cond_destroy(&evthr_ctx->evthr_c_cv);
  err1:
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_waitqmut);
  err2:
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_runqmut);
  error:
	SM_FREE(evthr_ctx->evthr_c_fd2t);
	SM_FREE_SIZE(evthr_ctx, sizeof(*evthr_ctx));
	if (pipefd[0] != -1)
		close(pipefd[0]);
	if (pipefd[1] != -1)
		close(pipefd[1]);
	*pevthr_ctx = NULL;
	return ret;
}
/*
**  EVTHR_STOP -- stop event thread system
**
**	Sets the stop flag, wakes up idle threads and waits until the
**	thread counter of the context drops to zero.  Afterwards the wait
**	queue is emptied (closing the fds of its tasks), the internal pipe
**	is closed, and the fd table, the mutexes, the condition variable,
**	the request list and the context itself are released.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code
**
**	Notes:
**		If locking the run queue, waking/waiting for the threads,
**		or unlocking the run queue fails, the function returns
**		early and the context is NOT freed.
**		Otherwise the context is freed even if one of the later
**		cleanup steps reports an error; the error of the last
**		failing step is returned.
**
**	Locking:
**		locks run queue so nothing gets started anymore
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
evthr_stop(sm_evthr_ctx_P evthr_ctx)
{
	int status;
	sm_ret_T ret, res;
	sm_evthr_task_P task, task2;

	SM_IS_EVTHR_CTX(evthr_ctx);
	ret = SM_SUCCESS;
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
	SM_LOCK_OK(status);
	DPRINTF(0, (stderr, "stop: lock=%d\n", status));
	if (status != 0) {
		NOTE(NOT_REACHED)
		return sm_error_perm(SM_EM_EVTHR, status);
	}

	/* stop threads? */
	DPRINTF(0, (stderr, "stop: cur=%d\n", evthr_ctx->evthr_c_cur));
	if (evthr_ctx->evthr_c_cur > 0) {
		/* more flags? NOW, IMMEDIATE, ... */
		EVTHR_SET_FLAG(evthr_ctx, EVTHR_FL_STOP);
		DPRINTF(0, (stderr, "stop: idle=%d\n", evthr_ctx->evthr_c_idl));

		/* only idle threads need to be woken up */
		if (evthr_ctx->evthr_c_idl > 0) {
			status = pthread_cond_broadcast(&evthr_ctx->evthr_c_cv);
			if (status != 0) {
				DPRINTF(0, (stderr, "stop: broadcast=%d\n", status));
				ret = sm_error_perm(SM_EM_EVTHR, status);
				goto errunlock;
			}
		}

		/* wait until the thread counter reaches zero */
		while (evthr_ctx->evthr_c_cur > 0) {
			/* Use pthread_cond_timedwait()? */
			status = pthread_cond_wait(&evthr_ctx->evthr_c_cv, &evthr_ctx->evthr_c_runqmut);
			if (status != 0) {
				DPRINTF(0, (stderr, "stop: condwait=%d\n", status));
				ret = sm_error_perm(SM_EM_EVTHR, status);
				goto errunlock;
			}
		}
	}

	/*
	**  Free the elements in the queues??
	**  Should we call some task-specific cleanup fct?
	**  Currently this leaks memory (and maybe accept fds); see below.
	**  However, this doesn't matter that much because the program
	**  terminates "soon".
	*/

	/* remove entries from the wait queue */
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(status);
	NOTE(NO_COMPETING_THREADS_NOW)
	if (0 == status) {
		for (task = EVTHR_WAITQ_FIRST(evthr_ctx);
		     task != EVTHR_WAITQ_END(evthr_ctx);
		     task = task2)
		{
			/* fetch the successor before the task is unlinked */
			task2 = EVTHR_WAITQ_NEXT(task);
			CLOSE_FD(task->evthr_t_fd);
			if (task->evthr_t_nc != NULL)
				CLOSE_FD(task->evthr_t_nc->evthr_a_fd);

			/* todo: free the task itself */
			EVTHR_WAITQ_DEL(evthr_ctx, task);
		}
		status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);

		/* keep going, but do not lose the failure */
		if (status != 0)
			ret = sm_error_perm(SM_EM_EVTHR, status);
	}
	else {
		NOTE(NOT_REACHED)

		/* the wait queue is left untouched; report why */
		ret = sm_error_perm(SM_EM_EVTHR, status);
	}

	(void) close(evthr_ctx->evthr_c_pipe[0]);
	(void) close(evthr_ctx->evthr_c_pipe[1]);
	SM_FREE(evthr_ctx->evthr_c_fd2t);

	status = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(0, (stderr, "stop: unlock=%d\n", status));
	if (status != 0)
		return sm_error_perm(SM_EM_EVTHR, status);

	/* destroy mutex, cv */
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_waitqmut);
	DPRINTF(0, (stderr, "stop: waitq_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(0, (stderr, "stop: runq_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);
	status = pthread_cond_destroy(&evthr_ctx->evthr_c_cv);
	DPRINTF(0, (stderr, "stop: cond_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);

	/* release the request list before its mutex is destroyed */
	res = evthr_reqs_free(evthr_ctx);
	if (sm_is_err(res))
		ret = res;
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_reqmut);
	DPRINTF(0, (stderr, "stop: req_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);

	sm_free((void *)evthr_ctx);
	return ret;

  errunlock:
	(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	return ret;
}
/*
**  EVTHR_REQ_NEW -- allocate a zeroed request structure and append it
**	to the request list of the context
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		pointer to the new request structure;
**		NULL if the allocation failed (ENOMEM)
**
**	Side Effects:
**		none on error;
**		on success the request counter of the context is incremented.
**
**	Locking:
**		does not lock req list; must be done by caller.
**
**	Last code review: 2005-03-22 23:38:13
**	Last code change: 2006-10-30 04:09:35
*/

sm_evthr_req_P
evthr_req_new(sm_evthr_ctx_P evthr_ctx)
{
	sm_evthr_req_P new_req;

	new_req = (sm_evthr_req_P) sm_zalloc(sizeof(*new_req));
	if (new_req != NULL) {
		EVTHR_REQ_APP(evthr_ctx, new_req);

		/* the counter must not wrap around */
		SM_ASSERT(evthr_ctx->evthr_c_tot_reqs < UINT_MAX);
		evthr_ctx->evthr_c_tot_reqs++;
	}
	return new_req;
}
/*
**  EVTHR_REQS_FREE -- free all requests
**
**	Removes every request from the request list of the context and
**	frees it, then reinitializes the list and resets the request
**	counter to zero.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code (this implementation always returns
**		SM_SUCCESS)
**
**	Locking:
**		does not lock req list; must be done by caller.
**
**	Last code review:
**	Last code change: 2005-03-22 23:39:41
*/

sm_ret_T
evthr_reqs_free(sm_evthr_ctx_P evthr_ctx)
{
	sm_evthr_req_P cur, succ;

	SM_IS_EVTHR_CTX(evthr_ctx);

	cur = EVTHR_REQ_FIRST(evthr_ctx);
	while (cur != EVTHR_REQ_END(evthr_ctx)) {
		/* remember the successor: cur is gone after this round */
		succ = EVTHR_REQ_NEXT(cur);
		EVTHR_REQ_DEL(evthr_ctx, cur);
		sm_free_size(cur, sizeof(*cur));
		cur = succ;
	}

	/* leave behind a valid, empty list */
	EVTHR_REQ_INIT(evthr_ctx);
	evthr_ctx->evthr_c_tot_reqs = 0;
	return SM_SUCCESS;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */