/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

#include "sm/generic.h"
SM_RCSID("@(#)$Id: evthrst.c,v 1.41 2006/12/29 01:31:22 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "evthr-int.h"
#include "log.h"

/*
**  EVTHR_INIT -- initialize event thread system
**
**	Allocates the context and its fd-to-task table (maxfd entries),
**	creates a non-blocking pipe, sets up logging, initializes signal
**	handling, the mutexes and the condition variable, pre-allocates
**	minthr request structures and finally starts minthr worker threads.
**
**	Parameters:
**		pevthr_ctx -- (pointer to) evthr context (output)
**			set to NULL on every failure detected before the
**			worker threads are running
**		minthr -- minimum number of threads
**		maxthr -- maximum number of threads (must be >= 1)
**		maxfd -- maximum number of fds (must be >= 1)
**
**	Returns:
**		usual sm_error code
**
**	Locking: none
**
**	Last code review: 2006-03-12 20:18:41
**	Last code change: 2006-10-29 17:41:05
*/

sm_ret_T
evthr_init(sm_evthr_ctx_P *pevthr_ctx, uint minthr, uint maxthr, uint maxfd)
{
	int status;
	uint u;
	sm_ret_T ret;
	pthread_t tid;
	sm_evthr_ctx_P evthr_ctx;
	int pipefd[2];
	sm_evthr_req_P req;

	NOTE(NO_COMPETING_THREADS_NOW)
	SM_REQUIRE(pevthr_ctx != NULL);
	SM_REQUIRE(maxthr >= 1);
	SM_REQUIRE(maxfd >= 1);

	/*
	**  The cleanup code at "error" inspects pipefd[], hence it must
	**  have defined values before the first "goto error".
	*/

	pipefd[0] = pipefd[1] = -1;

	evthr_ctx = (sm_evthr_ctx_P) sm_zalloc(sizeof(*evthr_ctx));
	if (NULL == evthr_ctx)
	{
		/* same output contract as all other failure paths */
		*pevthr_ctx = NULL;
		return sm_error_temp(SM_EM_EVTHR, ENOMEM);
	}

	/* make sure the table size fits into u before multiplying */
	SM_ASSERT(maxfd <= UINT_MAX / sizeof(*evthr_ctx->evthr_c_fd2t));
	u = (uint) (sizeof(*evthr_ctx->evthr_c_fd2t) * maxfd);
	evthr_ctx->evthr_c_fd2t = (sm_evthr_task_P *) sm_zalloc(u);
	if (NULL == evthr_ctx->evthr_c_fd2t)
	{
		ret = sm_error_temp(SM_EM_EVTHR, ENOMEM);
		goto error;
	}
	evthr_ctx->evthr_c_maxfd = maxfd;

	EVTHR_RUNQ_INIT(evthr_ctx);
	EVTHR_WAITQ_INIT(evthr_ctx);
	EVTHR_REQ_INIT(evthr_ctx);
	evthr_ctx->evthr_c_min = minthr;
	evthr_ctx->evthr_c_max_s = maxthr;
	evthr_ctx->evthr_c_max_h = maxthr;
#if EVTHR_PRIOS > 1
	evthr_ctx->evthr_c_nprio = EVTHR_PRIOS;
#endif

	/* should we start min thread here? */
	evthr_ctx->evthr_c_cur = 0;
	evthr_ctx->evthr_c_idl = 0;
	evthr_ctx->evthr_c_act = 0;

	if (pipe(pipefd) < 0)
	{
		ret = sm_error_perm(SM_EM_EVTHR, errno);
		goto error;
	}
	ret = sm_fd_nonblock(pipefd[0], true);
	if (sm_is_err(ret))
		goto error;
	ret = sm_fd_nonblock(pipefd[1], true);
	if (sm_is_err(ret))
		goto error;
	evthr_ctx->evthr_c_pipe[0] = pipefd[0];
	evthr_ctx->evthr_c_pipe[1] = pipefd[1];

	/* set up logging */
	ret = sm_log_create(NULL, &evthr_ctx->evthr_c_lctx, NULL);
	if (sm_is_err(ret))
		goto error;
	ret = sm_log_setfp_fd(evthr_ctx->evthr_c_lctx, smiolog,
			SMIOLOG_FILENO);
	if (sm_is_err(ret))
		goto error;
	ret = sm_log_setdebuglevel(evthr_ctx->evthr_c_lctx, 2);
	if (sm_is_err(ret))
		goto error;

	/*
	**  Initialize signal mask and start signal handler thread.
	**  Must be done before any thread is started, otherwise
	**  the signal mask is not set correctly for all threads.
	*/

	ret = evthr_signal_init(evthr_ctx);
	if (sm_is_err(ret))
		goto error;

	/* initialize mutexes, cv */
	status = pthread_mutex_init(&evthr_ctx->evthr_c_runqmut,
				SM_PTHREAD_MUTEXATTR);
	if (status != 0)
	{
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto error;
	}
	status = pthread_mutex_init(&evthr_ctx->evthr_c_waitqmut,
				SM_PTHREAD_MUTEXATTR);
	if (status != 0)
	{
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err2;
	}
	status = pthread_cond_init(&evthr_ctx->evthr_c_cv, NULL);
	if (status != 0)
	{
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err1;
	}

	/* necessary "early" for evthr_req_new() and evthr_reqs_free() */
	evthr_ctx->sm_magic = SM_EVTHR_CTX_MAGIC;

	status = pthread_mutex_init(&evthr_ctx->evthr_c_reqmut,
				SM_PTHREAD_MUTEXATTR);
	if (status != 0)
	{
		ret = sm_error_perm(SM_EM_EVTHR, status);
		goto err0;
	}

	/* How many request contexts should be created?? */
	for (u = 0; u < minthr; u++)
	{
		req = evthr_req_new(evthr_ctx);
		if (NULL == req)
		{
			/*
			**  ret still holds the (successful) result of an
			**  earlier call; report the failure explicitly.
			*/

			ret = sm_error_temp(SM_EM_EVTHR, ENOMEM);
			goto errora;
		}
	}

	/* done... */
	*pevthr_ctx = evthr_ctx;

	for (u = 0; u < minthr; u++)
	{
		status = pthread_create(&tid, NULL, evthr_worker,
					(void *)evthr_ctx);
		if (status != 0)
		{
			ret = sm_error_perm(SM_EM_EVTHR, status);

			/* requests and reqmut exist here: release them too */
			goto errora;
		}
		++evthr_ctx->evthr_c_cur;
	}
	NOTE(COMPETING_THREADS_NOW)

	status = gettimeofday(&evthr_ctx->evthr_c_time, NULL);
	if (status < 0)
		ret = sm_error_temp(SM_EM_EVTHR, errno);
	return ret;

	/*
	**  Todo: we need to stop threads that have been started!
	**  The cleanup below releases the context even if some workers
	**  are already running (pthread_create() failed part way).
	**  maybe call evthr_stop()? however, the process will exit anyway
	*/

  errora:
	(void) evthr_reqs_free(evthr_ctx);
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_reqmut);
  err0:
	(void) pthread_cond_destroy(&evthr_ctx->evthr_c_cv);
  err1:
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_waitqmut);
  err2:
	(void) pthread_mutex_destroy(&evthr_ctx->evthr_c_runqmut);
  error:
	SM_FREE(evthr_ctx->evthr_c_fd2t);
	SM_FREE_SIZE(evthr_ctx, sizeof(*evthr_ctx));
	if (pipefd[0] != -1)
		close(pipefd[0]);
	if (pipefd[1] != -1)
		close(pipefd[1]);
	*pevthr_ctx = NULL;
	return ret;
}

/*
**  EVTHR_STOP -- stop event thread system
**
**	Sets the stop flag, wakes up idle threads, waits until no worker
**	thread is left, closes the fds of all tasks in the wait queue,
**	closes the pipe and releases the context.
**
**	Parameters:
**		evthr_ctx -- evthr context
**			(freed unless the function returns early because
**			a condition variable or mutex operation failed)
**
**	Returns:
**		usual sm_error code
**
**	Locking:
**		locks run queue so nothing gets started anymore
**
**	Last code review:
**	Last code change:
*/

sm_ret_T
evthr_stop(sm_evthr_ctx_P evthr_ctx)
{
	int status;
	sm_ret_T ret;
	sm_evthr_task_P task, task2;

	SM_IS_EVTHR_CTX(evthr_ctx);
	ret = SM_SUCCESS;
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut);
	SM_LOCK_OK(status);
	DPRINTF(0, (stderr, "stop: lock=%d\n", status));
	if (status != 0)
	{
		NOTE(NOT_REACHED)
		return sm_error_perm(SM_EM_EVTHR, status);
	}

	/* stop threads? */
	DPRINTF(0, (stderr, "stop: cur=%d\n", evthr_ctx->evthr_c_cur));
	if (evthr_ctx->evthr_c_cur > 0)
	{
		/* more flags? NOW, IMMEDIATE, ... */
		EVTHR_SET_FLAG(evthr_ctx, EVTHR_FL_STOP);
		DPRINTF(0, (stderr, "stop: idle=%d\n", evthr_ctx->evthr_c_idl));
		if (evthr_ctx->evthr_c_idl > 0)
		{
			status = pthread_cond_broadcast(&evthr_ctx->evthr_c_cv);
			if (status != 0)
			{
				DPRINTF(0, (stderr, "stop: broadcast=%d\n", status));
				ret = sm_error_perm(SM_EM_EVTHR, status);
				goto errunlock;
			}
		}

		/* wait (on the run queue mutex) until no thread is left */
		while (evthr_ctx->evthr_c_cur > 0)
		{
			/* Use pthread_cond_timedwait()? */
			status = pthread_cond_wait(&evthr_ctx->evthr_c_cv,
						&evthr_ctx->evthr_c_runqmut);
			if (status != 0)
			{
				DPRINTF(0, (stderr, "stop: condwait=%d\n", status));
				ret = sm_error_perm(SM_EM_EVTHR, status);
				goto errunlock;
			}
		}
	}

	/*
	**  Free the elements in the queues??
	**  Should we call some task-specific cleanup fct?
	**  Currently this leaks memory (and maybe accept fds); see below.
	**  However, this doesn't matter that much because the program
	**  terminates "soon".
	*/

	/* remove entries from the wait queue */
	status = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut);
	SM_LOCK_OK(status);
	NOTE(NO_COMPETING_THREADS_NOW)
	if (0 == status)
	{
		for (task = EVTHR_WAITQ_FIRST(evthr_ctx);
		     task != EVTHR_WAITQ_END(evthr_ctx);
		     task = task2)
		{
			/* fetch successor first: task is unlinked below */
			task2 = EVTHR_WAITQ_NEXT(task);
			CLOSE_FD(task->evthr_t_fd);
			if (task->evthr_t_nc != NULL)
				CLOSE_FD(task->evthr_t_nc->evthr_a_fd);

			/* todo: free the task itself */
			EVTHR_WAITQ_DEL(evthr_ctx, task);
		}
		status = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut);
		if (status != 0)
			ret = sm_error_perm(SM_EM_EVTHR, status);
	}
	else
	{
		NOTE(NOT_REACHED)
	}

	(void) close(evthr_ctx->evthr_c_pipe[0]);
	(void) close(evthr_ctx->evthr_c_pipe[1]);
	SM_FREE(evthr_ctx->evthr_c_fd2t);

	status = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(0, (stderr, "stop: unlock=%d\n", status));
	if (status != 0)
		return sm_error_perm(SM_EM_EVTHR, status);

	/* destroy mutex, cv */
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_waitqmut);
	DPRINTF(0, (stderr, "stop: waitq_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_runqmut);
	DPRINTF(0, (stderr, "stop: runq_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);
	status = pthread_cond_destroy(&evthr_ctx->evthr_c_cv);
	DPRINTF(0, (stderr, "stop: cond_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);

	(void) evthr_reqs_free(evthr_ctx);
	status = pthread_mutex_destroy(&evthr_ctx->evthr_c_reqmut);
	DPRINTF(0, (stderr, "stop: req_mutex_destroy=%d\n", status));
	if (status != 0)
		ret = sm_error_perm(SM_EM_EVTHR, status);

	sm_free((void *)evthr_ctx);
	return ret;

  errunlock:
	(void) pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut);
	return ret;
}

/*
**  EVTHR_REQ_NEW -- create a new request structure and append it to the list
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		pointer to request structure; NULL on failure (ENOMEM)
**
**	Side Effects: none on error
**		on success the request counter of the context is incremented
**
**	Locking:
**		does not lock req list; must be done by caller.
**
**	Last code review: 2005-03-22 23:38:13
**	Last code change: 2006-10-30 04:09:35
*/

sm_evthr_req_P
evthr_req_new(sm_evthr_ctx_P evthr_ctx)
{
	sm_evthr_req_P req;

	req = (sm_evthr_req_P) sm_zalloc(sizeof(*req));
	if (NULL == req)
		return NULL;
	EVTHR_REQ_APP(evthr_ctx, req);
	SM_ASSERT(evthr_ctx->evthr_c_tot_reqs < UINT_MAX);
	++evthr_ctx->evthr_c_tot_reqs;
	return req;
}

/*
**  EVTHR_REQS_FREE -- free all requests
**
**	Removes every request from the list of the context, frees it,
**	then reinitializes the list and resets the request counter.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code (currently always SM_SUCCESS)
**
**	Locking:
**		does not lock req list; must be done by caller.
**
**	Last code review:
**	Last code change: 2005-03-22 23:39:41
*/

sm_ret_T
evthr_reqs_free(sm_evthr_ctx_P evthr_ctx)
{
	sm_evthr_req_P req, req_next;

	SM_IS_EVTHR_CTX(evthr_ctx);
	for (req = EVTHR_REQ_FIRST(evthr_ctx);
	     req != EVTHR_REQ_END(evthr_ctx);
	     req = req_next)
	{
		/* fetch successor before req is unlinked and freed */
		req_next = EVTHR_REQ_NEXT(req);
		EVTHR_REQ_DEL(evthr_ctx, req);
		sm_free_size(req, sizeof(*req));
	}
	EVTHR_REQ_INIT(evthr_ctx);
	evthr_ctx->evthr_c_tot_reqs = 0;
	return SM_SUCCESS;
}