/*
 * Copyright (c) 2002-2006 Sendmail, Inc. and its suppliers.
 *	All rights reserved.
 * Copyright (c) 2006 Claus Assmann
 *
 * By using this file, you agree to the terms and conditions set
 * forth in the LICENSE file which can be found at the top level of
 * the sendmail distribution.
 */

/*
**  partly based on workq.c from Programming with POSIX Threads
**	by David Butenhof.
*/

#include "sm/generic.h"
SM_RCSID("@(#)$Id: evthr.c,v 1.127 2007/01/01 02:10:25 ca Exp $")
#include "sm/error.h"
#include "sm/assert.h"
#include "sm/memops.h"
#include "sm/time.h"
#include "sm/heap.h"
#include "sm/evthr.h"
#include "sm/io.h"
#include "sm/socket.h"
#include <unistd.h>
#include "evthr-int.h"
#define LIBEVTHR_LOG_DEFINES 1
#include "log.h"

#ifndef EVTHR_DBG_DELAY
# define EVTHR_DBG_DELAY	0
#endif

/*
**  Notice:
**  The timeout granularity that can be achieved seems to depend on
**  the scheduling granularity. Some tests (EVTHR_DBG_DELAY=1) show that
**  the time actually waited in select() might differ by up to 20ms
**  from the timeout specified. Assuming that we have a scheduling
**  granularity of 10ms, this doesn't seem to be unreasonable.
**  Question: is this "good enough" or do we need to consider a
**  different approach (see the design docs).
**  select() timeout deviations:
**	FreeBSD 3.2	25 ms
**	OSF/1		0.5ms
*/

/*
**  Todo: We should use better error codes in here, i.e.,
**  include the module number in the error codes!
*/

/*
**  A different version of sleep(), safe for pthreads on Solaris?
**  Maybe use this conditionally (but how to test?)
**  Problem: doesn't complain about errors.
**  what about nanosleep() as alternative?
*/

/*
**  SM_SLEEP -- sleep for (s) whole seconds via select() with no fds.
**	select() is restarted while it fails with EINTR; any other
**	error ends the sleep silently (see "Problem" above).
**	NOTE(review): whether st holds the remaining time after an
**	interrupted select() is system-dependent, so a restart may
**	sleep for a different total time -- confirm on target systems.
*/

#define sm_sleep(s) do {						\
		int rs;							\
		timeval_T st;						\
									\
		st.tv_sec = (s);					\
		st.tv_usec = 0;						\
		do							\
			rs = select(0, NULL, NULL, NULL, &st);		\
		while (rs < 0 && errno == EINTR);			\
	} while (0)

/*
**  EVTHR_CHG_REQ -- apply change requests
**
**	Walks the wait queue and, for every task that has an entry in
**	the request queue, applies that request:
**	- the requested sleep time is taken if the request forces it
**	  (EVTHR_CHG_TIME_YES) or if it is earlier than the task's
**	  current sleep time; the task is then re-inserted into the
**	  (sorted) wait queue via evthr_slpq_ins();
**	- the requested event flags are set/cleared, or the task is
**	  deleted if the request is EVTHR_EV_DEL.
**	Each applied request is cleared (EVTHR_REQ_CLR) and
**	evthr_c_nreqs is decremented; the walk stops early once
**	evthr_c_nreqs reaches 0.
**
**	Parameters:
**		evthr_ctx -- evthr context
**
**	Returns:
**		usual sm_error code
**		(an error only if the request mutex cannot be locked)
**
**	Locking:
**		locks request queue
**		wait queue must be locked by caller.
**
**	Last code review: 2006-03-10 02:12:02
**	Last code change: 2006-03-10 02:11:58
*/

static sm_ret_T
evthr_chg_req(sm_evthr_ctx_P evthr_ctx)
{
	int flags, r;
	uint cnt, creqs;
	sm_evthr_req_P req;
	sm_evthr_task_P task, task_next;
#if EVTHR_PARANOIA
	sm_evthr_task_P slptsk;
#endif

	SM_IS_EVTHR_CTX(evthr_ctx);
	r = pthread_mutex_lock(&evthr_ctx->evthr_c_reqmut);
	SM_LOCK_OK(r);
	if (r != 0)
	{
		NOTE(NOT_REACHED)
		/* complain, sleep, and continue */
		/* (in fact: complain and return the lock error) */
		DPRINTF(0, (stderr, "ERROR: evthr_chg_req lock=%d\n", r));
		return sm_error_perm(SM_EM_EVTHR, r);
	}
#if EVTHR_PARANOIA
	slptsk = NULL;
#endif
	cnt = 0;

	/*
	**  Note: we can't walk through the request list and
	**  change the tasks since they might not be locked.
	**  This causes n * m runtime complexity
	**  (length of wait queue times length of request queue),
	**  but the length of the request queue should be fairly short;
	**  maybe one or two entries.
	**
	**  We can't use EVTHR_WAITQ_LOOP() since we may change the order
	**  of entries in the queue (for sleep tasks).
	**  Hence the successor (task_next) is fetched BEFORE the current
	**  task may be moved or deleted.
	*/

	task = EVTHR_WAITQ_FIRST(evthr_ctx);
	while (task != EVTHR_WAITQ_END(evthr_ctx)
	       && evthr_ctx->evthr_c_nreqs > 0)
	{
		SM_IS_EVTHR_TSK(task);
		task_next = EVTHR_WAITQ_NEXT(task);
#if EVTHR_PARANOIA
		/*
		**  Detects tasks that are twice in wait queue
		**  (directly next to each other only).
		**  We could use another bit in the status field
		**  to detect tasks that have already been inspected.
		**  NOTE(review): a task moved behind task_next by
		**  evthr_slpq_ins() below can be visited again (see the
		**  "Notice" further down); if it is, the
		**  EVTHR_ALREADY_CHK assertion presumably fires --
		**  confirm whether that can happen in practice.
		*/

		SM_ASSERT(task != slptsk);
		SM_ASSERT(!EVTHR_ALREADY_CHK(task));
		EVTHR_CHECKED(task);
		slptsk = task;
#endif /* EVTHR_PARANOIA */

		/* linear search for a request that refers to this task */
		for (req = EVTHR_REQ_FIRST(evthr_ctx), creqs = 0;
		     req != EVTHR_REQ_END(evthr_ctx);
		     req = EVTHR_REQ_NEXT(req), ++creqs)
		{
			if (task == req->evthr_r_task)
				break;
			SM_ASSERT(creqs <= evthr_ctx->evthr_c_tot_reqs);
		}

		/* no request for this task: go on with the next one */
		if (req == EVTHR_REQ_END(evthr_ctx)
		    || task != req->evthr_r_task)
		{
			task = task_next;
			continue;
		}
		--evthr_ctx->evthr_c_nreqs;

		/* take the new sleep time if forced or if it is earlier */
		if (req->evthr_r_chge == EVTHR_CHG_TIME_YES ||
		    timercmp(&task->evthr_t_sleep, &req->evthr_r_sleep, >))
		{
			DPRINTF(8, (stderr, "evthr_chg_req: change sleep time for task=%p\n", task));
			task->evthr_t_sleep = req->evthr_r_sleep;

			/*
			**  Remove task from list and insert it at right place.
			**  This might be done more efficiently by
			**  checking first whether it needs to be moved.
			**  Notice: we may hit the same task again, but since
			**  we removed the request from the list that shouldn't
			**  matter.
			*/

			EVTHR_WAITQ_DEL(evthr_ctx, task);
			EVTHR_REM_FROMQ(task, EVTHR_EV_IWQ);
			evthr_slpq_ins(evthr_ctx, task);
		}

		/* Change event request flags */
		flags = req->evthr_r_rqevf;
		DPRINTF(8, (stderr, "evthr_chg_req: change event flags for task=%p: old=%x, change=%x, set=%d, clr=%d\n", task, task->evthr_t_rqevf, flags, evthr_r_set(flags), evthr_r_clr(flags)));
		if (evthr_r_set(flags) == evthr_r_yes(EVTHR_EV_DEL))
			(void) evthr_task_del(evthr_ctx, task, THR_NO_LOCK);
		else
		{
			if (evthr_r_set(flags))
				evthr_set_ev(task, evthr_r_set_ev(flags));
			if (evthr_r_clr(flags))
				evthr_clr_ev(task, evthr_r_clr_ev(flags));
		}
		EVTHR_REQ_CLR(req);
		++cnt;
		SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks);
		task = task_next;
	}

#if EVTHR_PARANOIA
	/* reset the "already inspected" marks for the next call */
	EVTHR_WAITQ_LOOP(evthr_ctx, task)
	{
		SM_IS_EVTHR_TSK(task);
		EVTHR_CLR_CHECKED(task);
	}
#endif /* EVTHR_PARANOIA */

	r = pthread_mutex_unlock(&evthr_ctx->evthr_c_reqmut);
	SM_ASSERT(r == 0);
	return SM_SUCCESS;
}

/*
**  EVTHR_FINDFD -- find the task in wait queue belonging to a file descriptor
**
**	Parameters:
**		evthr_ctx -- evthr
context ** fd -- activated file descriptor ** ** Returns: ** pointer to task, NULL if not found ** ** Locking: ** wait queue must be locked by caller. ** ** Warnings: ** Doesn't work if there are multiple events for one task. ** ** Last code review: 2006-03-10 02:13:22 ** Last code change: */ static sm_evthr_task_P evthr_findfd(sm_evthr_ctx_P evthr_ctx, int fd) { uint cnt; sm_evthr_task_P task; SM_IS_EVTHR_CTX(evthr_ctx); if (fd < 0) return NULL; /* Take a shortcut */ if ((uint)fd < evthr_ctx->evthr_c_maxfd) { task = evthr_ctx->evthr_c_fd2t[fd]; if (task != NULL) return task; } for (task = EVTHR_WAITQ_FIRST(evthr_ctx), cnt = 0; task != EVTHR_WAITQ_END(evthr_ctx); task = EVTHR_WAITQ_NEXT(task), ++cnt) { SM_IS_EVTHR_TSK(task); if (fd == task->evthr_t_fd) return task; SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks); } return NULL; } /* ** EVTHR_WAKEUPTHR -- wakeup or start threads to deal with tasks ** ** Parameters: ** evthr_ctx -- evthr context ** tasks -- number of activated tasks ** nbtasks -- number of activated, non-blocking tasks ** ** Returns: ** >=0: number of tasks that have not been started ** <0: usual sm_error code ** ** Locking: ** none ** ** Last code review: ** Last code change: */ static sm_ret_T evthr_wakeupthr(sm_evthr_ctx_P evthr_ctx, uint tasks, uint nbtasks) { pthread_t tid; int status; uint u, idl, act, started; sm_ret_T ret; SM_IS_EVTHR_CTX(evthr_ctx); started = 0; idl = evthr_ctx->evthr_c_idl; act = evthr_ctx->evthr_c_act; for (u = 0; u < tasks; u++) { /* ** If any threads are idling, wake one. 
*/ DPRINTF(4, (stderr, "wakeup: idl=%u, cur=%u, act=%u, max=%u, max_h=%u, tasks=%u/%u, nbtasks=%u\n", evthr_ctx->evthr_c_idl, evthr_ctx->evthr_c_cur, evthr_ctx->evthr_c_act, evthr_ctx->evthr_c_max_s, evthr_ctx->evthr_c_max_h, u, tasks, nbtasks)); if (idl > 0) { EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC); status = pthread_cond_signal(&evthr_ctx->evthr_c_cv); if (status != 0) { ret = sm_error_perm(SM_EM_EVTHR, status); goto error; } --idl; ++started; } else if (evthr_ctx->evthr_c_cur < evthr_ctx->evthr_c_max_s) { /* ** If there were no idling threads, and we're allowed ** to create a new thread, do so. */ EVTHR_CLR_FLAG(evthr_ctx, EVTHR_FL_SL_EXC); DPRINTF(3, (stderr, "Creating new worker\n")); status = pthread_create(&tid, NULL, evthr_worker, (void *) evthr_ctx); if (status != 0) { ret = sm_error_perm(SM_EM_EVTHR, status); goto error; } ++evthr_ctx->evthr_c_cur; ++act; ++started; } else if (act < evthr_ctx->evthr_c_max_h && u >= nbtasks) { /* possibly blocking task: create a new thread */ DPRINTF(2, (stderr, "Creating new worker [exceeding softlimit]\n")); status = pthread_create(&tid, NULL, evthr_worker, (void *) evthr_ctx); if (status != 0) { ret = sm_error_perm(SM_EM_EVTHR, status); goto error; } ++evthr_ctx->evthr_c_cur; ++act; ++started; } } /* values don't change... */ DPRINTF(9, (stderr, "wakeup done: idl=%d, cur=%d, max=%d\n", evthr_ctx->evthr_c_idl, evthr_ctx->evthr_c_cur, evthr_ctx->evthr_c_max_s)); return started; error: return ret; } /* ** EVTHR_ACCEPT -- accept() a new connection ** ** Parameters: ** evthr_ctx -- evthr context ** task -- task description ** fd -- file descriptor on which the connection is active ** ** Returns: ** usual sm_error code ** ** Question: should we leave this to the application?? ** It's just another "ready for read" file descriptor... 
** ** Last code review: 2006-03-10 02:25:54 ** Last code change: */ static sm_ret_T evthr_accept(sm_evthr_ctx_P evthr_ctx, sm_evthr_task_P task, int fd) { int connfd; int err0, err1; sockoptlen_T l; socklen_T socklen; struct sockaddr sockaddr; sm_evthr_nc_P nc; sm_ret_T ret; ret = SM_SUCCESS; l = (sockoptlen_T) sizeof(err1); err0 = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *) &err1, &l); if (err0 == 0 && err1 != 0) { sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT, SM_LOG_ERROR, 1, "sev=ERROR, func=evthr_accept, fd=%d, getsockopt()=%d", fd, err1); } nc = (sm_evthr_nc_P) sm_zalloc(sizeof(*nc)); if (NULL == nc) { ret = sm_error_perm(SM_EM_EVTHR, ENOMEM); goto error; } socklen = (socklen_T) sizeof(sockaddr); connfd = accept(fd, &sockaddr, &socklen); if (connfd < 0) { err0 = errno; sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT, SM_LOG_ERROR, err0 == EINTR ? 14 : 6, "sev=ERROR, func=evthr_accept, fd=%d, accept()=%d", fd, errno); ret = sm_error_perm(SM_EM_EVTHR, err0); goto error; } else if (socklen == 0 || /* sockaddr.sin_len == 0 || */ (sockaddr.sa_family != AF_INET && sockaddr.sa_family != AF_UNIX)) { sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT, SM_LOG_ERROR, 3, "sev=ERROR, func=evthr_accept, fd=%d, accept()=bogus_data, socklen=%d, family=%d", fd, socklen, sockaddr.sa_family); ret = sm_error_perm(SM_EM_EVTHR, errno == 0 ? EINVAL : errno); goto error; } else { /* ** Got a new connection, what to do with it?? ** Put the task fct into run queue and let it deal with it? ** How do we tell it the new fd (connfd)? 
*/ nc->evthr_a_len = socklen; nc->evthr_a_addr = sockaddr; nc->evthr_a_fd = connfd; task->evthr_t_nc = nc; } error: return ret; } #if EVTHR_DBG_DELAY static timeval_T difft; static timeval_T prevt; static timeval_T ta, tb, td, ts; static int cnt = 0; static bool chktd(sm_evthr_ctx_P evthr_ctx, timeval_T exp, int slt) { timeval_T now, dt; ++cnt; gettimeofday(&now, NULL); timersub(&now, &exp, &dt); if (timercmp(&dt, &difft, >)) { sm_evthr_task_P task, slptsk; fprintf(stderr, "loop[%2d]: now=%ld.%06ld\n", cnt, now.tv_sec, now.tv_usec); ERRPRINTTV(" sleep=", exp); ERRPRINTTV(" diff=", dt); ERRPRINTTV(" prevt=", prevt); ERRPRINTTV(" ts=", ts); ERRPRINTTV(" td=", td); fprintf(stderr, " slt=%d\n", slt); for (task = EVTHR_WAITQ_FIRST(evthr_ctx); task != EVTHR_WAITQ_END(evthr_ctx); task = slptsk) { if (!evthr_is_slp(task) || timercmp(&now, &task->evthr_t_sleep, <)) fprintf(stderr, "not these\n"); ERRPRINTTV(" sleept=", task->evthr_t_sleep); if (!evthr_is_slp(task) || timercmp(&now, &task->evthr_t_sleep, <)) break; slptsk = EVTHR_WAITQ_NEXT(task); } return false; } return true; } #define CHKDEL \ if (!chktd(evthr_ctx, task->evthr_t_sleep, slt)) \ { \ if (ftask != task) \ ERRPRINTTV("activate 1st =", ftask->evthr_t_sleep); \ if (timeok) \ { \ ERRPRINTTV("activate slpt=", task->evthr_t_sleep); \ ERRPRINTTV(" tb=", tb); \ ERRPRINTTV(" ta=", ta); \ ERRPRINTTV(" ts=", ts); \ ERRPRINTTV(" to=", to); \ ERRPRINTTV(" td=", td); \ timersub(&now, &tb, &td); \ ERRPRINTTV(" t0=", td); \ } \ timeok = false; \ } #define CHKDEL1 \ timersub(&ta, &tb, &ts); \ timersub(&ts, &to, &td); \ if (td.tv_sec < 0 || td.tv_usec < 0) \ timersub(&to, &ts, &td); \ if (r == 0 && timercmp(&td, &difft, >)) \ { \ ERRPRINTTV("tb=", tb); \ ERRPRINTTV("ta=", ta); \ ERRPRINTTV("ts=", ts); \ ERRPRINTTV("to=", to); \ ERRPRINTTV("td=", td); \ } #endif /* EVTHR_DBG_DELAY */ #if 0 Control flow for evthr loop: /* prepare data to wait for */ /* do we add another layer of abstraction here (for select/poll/...)? 
or do we code this directly? */ /* read events */ /* write events */ /* timeout events */ /* wait for an event */ /* check the events that occurred */ /* add the corresponding tasks to the runqueue */ /* signal idle threads that there is new work */ #endif /* 0 */ /* ** EVTHR_LOG_SHE -- log error from signal handler ** ** Parameters: ** evthr_ctx -- evthr context ** ** Returns: ** usual sm_error code ** ** Last code review: 2006-03-10 02:27:21 ** Last code change: */ static sm_ret_T evthr_log_she(sm_evthr_ctx_P evthr_ctx) { SM_IS_EVTHR_CTX(evthr_ctx); switch (evthr_ctx->evthr_sige_where) { case EVTHR_SHE_SIGWAIT: sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL, SM_LOG_ERROR, 1, "sev=ERROR, func=evthr_signal, sigwait=%d", evthr_ctx->evthr_sige_what); break; case EVTHR_SHE_UNKSIG: sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL, SM_LOG_ERROR, 1, "sev=ERROR, func=evthr_signal, sigwait=%d", evthr_ctx->evthr_sige_what); break; default: sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_SIGNAL, EVTHR_LMOD_SIGNAL, SM_LOG_ERROR, 1, "sev=ERROR, func=evthr_signal, status=sigwait_returned_unmasked_signal, signal=%d", evthr_ctx->evthr_sige_what); return sm_error_perm(SM_EM_EVTHR, SM_E_UNEXPECTED); } return SM_SUCCESS; } /* ** EVTHR_LOOP -- check for events for tasks in the wait queue ** puts activated tasks in run queue and notifies worker threads ** ** Parameters: ** evthr_ctx -- evthr context ** ** Returns: ** usual sm_error code ** ** Last code review: ** Last code change: */ sm_ret_T evthr_loop(sm_evthr_ctx_P evthr_ctx) { #if HAVE_SELECT int r, i; int maxfd, lastfd, fd; timeval_T to; fd_set rdfds, wrfds; fd_set rds, wrs; char buf[EVTHR_REQ_SIZE]; #endif /* HAVE_SELECT */ uint tasks, nbtasks, locks; sm_ret_T ret; sm_evthr_task_P task, slptsk; timeval_T now, nto; #if EVTHR_DBG_DELAY int slt; bool timeok; sm_evthr_task_P ftask; #endif uint cnt; bool canstarttask; sm_evthr_task_P fbtask; /* first blocking task in run queue */ 
#define WAITQ_LOCK_F 0x01 #define RUNQ_LOCK_F 0x02 #define got_lock(l, w) (l) |= (w) #define rel_lock(l, w) (l) &= ~(w) #define is_locked(l, w) (((l) & (w)) != 0) #define got_waitqlock(l) got_lock((l), WAITQ_LOCK_F) #define rel_waitqlock(l) rel_lock((l), WAITQ_LOCK_F) #define got_runqlock(l) got_lock((l), RUNQ_LOCK_F) #define rel_runqlock(l) rel_lock((l), RUNQ_LOCK_F) #define is_waitqlock(l) is_locked((l), WAITQ_LOCK_F) #define is_runqlock(l) is_locked((l), RUNQ_LOCK_F) SM_IS_EVTHR_CTX(evthr_ctx); #if HAVE_SELECT lastfd = rdpipe(evthr_ctx); #endif #if EVTHR_DBG_DELAY difft.tv_sec = 0; difft.tv_usec = 20000; nto.tv_sec = 0; nto.tv_usec = 0; prevt.tv_sec = 0; prevt.tv_usec = 0; #endif /* EVTHR_DBG_DELAY */ /* loop until something tells us to stop */ for (;;) { #if EVTHR_DBG_DELAY timeok = true; slt = 0; #endif #if HAVE_SELECT FD_ZERO(&rdfds); FD_ZERO(&wrfds); FD_ZERO(&rds); FD_ZERO(&wrs); FD_SET(rdpipe(evthr_ctx), &rdfds); FD_SET(rdpipe(evthr_ctx), &rds); maxfd = rdpipe(evthr_ctx); locks = 0; to.tv_sec = 5; to.tv_usec = 0; r = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut); SM_LOCK_OK(r); if (r != 0) { NOTE(NOT_REACHED) DPRINTF(0, (stderr, "ERROR: main loop can't get runq mutex=%d\n", r)); canstarttask = true; } else { canstarttask = (evthr_ctx->evthr_c_act < evthr_ctx->evthr_c_max_s); r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut); SM_ASSERT(r == 0); } DPRINTF(9, (stderr, "main loop\n")); r = pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut); SM_LOCK_OK(r); DPRINTF(9, (stderr, "main loop: got waitq lock=%d\n", r)); if (r != 0) { NOTE(NOT_REACHED) /* complain, sleep, and continue */ DPRINTF(0, (stderr, "ERROR: main loop didn't get waitq lock: %d\n", r)); continue; /* break if too many errors? 
*/ } got_waitqlock(locks); ret = evthr_chg_req(evthr_ctx); if (sm_is_err(ret)) { /* complain, sleep, and continue */ DPRINTF(0, (stderr, "ERROR: evthr_chg_req=%x\n", ret)); r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut); rel_waitqlock(locks); continue; } /* Retrieve earliest wakeup time (wait queue is sorted!) */ if (!EVTHR_WAITQ_EMPTY(evthr_ctx)) { task = EVTHR_WAITQ_FIRST(evthr_ctx); SM_IS_EVTHR_TSK(task); if (evthr_is_slp(task) && (canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_SL))) { /* ** This should be "our" function ** that deals with errors */ r = gettimeofday(&now, NULL); timersub(&task->evthr_t_sleep, &now, &nto); #if EVTHR_DBG_DELAY prevt = nto; #endif if (nto.tv_sec < 0) to.tv_sec = 0; else to = nto; } } #if EVTHR_PARANOIA slptsk = NULL; #endif cnt = 0; EVTHR_WAITQ_LOOP(evthr_ctx, task) { SM_IS_EVTHR_TSK(task); #if EVTHR_PARANOIA /* ** Detects tasks that are twice in wait queue ** (directly next to each other only). ** We could use another bit in the status field ** to detect tasks that have already been inspected. 
*/ SM_ASSERT(task != slptsk); SM_ASSERT(!EVTHR_ALREADY_CHK(task)); EVTHR_CHECKED(task); slptsk = task; #endif /* EVTHR_PARANOIA */ r = evthr_rqevents(task); fd = task->evthr_t_fd; DPRINTF(3, (stderr, "main loop: got task=%p, fd=%d, rqevents=%x, flags=%x, canstart=%d\n", task, fd, r, task->evthr_t_flags, canstarttask)); if ((evthr_is_rdf(r) || evthr_is_lif(r)) && (canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_RD))) { SM_ASSERT(is_valid_fd(fd)); FD_SET(fd, &rds); FD_SET(fd, &rdfds); SET_MAX(maxfd, fd); } if (evthr_is_wrf(r) && (canstarttask || !EVTHRT_IS_FLAG(task, EVTHRT_FL_BLK_WR))) { SM_ASSERT(is_valid_fd(fd)); FD_SET(fd, &wrfds); FD_SET(fd, &wrs); SET_MAX(maxfd, fd); } /* Skip over SLeep entries */ ++cnt; SM_ASSERT(cnt <= evthr_ctx->evthr_c_tasks); } #if EVTHR_PARANOIA EVTHR_WAITQ_LOOP(evthr_ctx, task) { SM_IS_EVTHR_TSK(task); EVTHR_CLR_CHECKED(task); } #endif /* EVTHR_PARANOIA */ r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut); rel_waitqlock(locks); tasks = 0; nbtasks = 0; DPRINTF(4, (stderr, "main loop select, maxfd=%d, tmout=%ld.%06ld\n", maxfd + 1, to.tv_sec, to.tv_usec)); #if EVTHR_DBG_DELAY gettimeofday(&tb, NULL); #endif r = select(maxfd + 1, &rdfds, &wrfds, NULL, &to); gettimeofday(&now, NULL); /* make sure time increases */ if (timercmp(&now, &evthr_ctx->evthr_c_time, >)) evthr_ctx->evthr_c_time = now; #if EVTHR_DBG_DELAY ta = evthr_ctx->evthr_c_time; #endif DPRINTF(4, (stderr, "main loop select=%d, errno=%d\n", r, errno)); if (r < 0) { if (errno != EINTR) { sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT, SM_LOG_ERROR, 1, "sev=ERROR, func=evthr_loop, fd=%d, select()=%d, error=%s", maxfd, r, strerror(errno)); sm_sleep(1); } continue; } #if EVTHR_DBG_DELAY CHKDEL1 #endif r = pthread_mutex_lock(&evthr_ctx->evthr_c_runqmut); SM_LOCK_OK(r); if (r != 0) { NOTE(NOT_REACHED) DPRINTF(0, (stderr, "sev=ERROR, main loop can't get runq mutex=%d\n", r)); continue; } got_runqlock(locks); r = 
pthread_mutex_lock(&evthr_ctx->evthr_c_waitqmut); SM_LOCK_OK(r); if (r != 0) { NOTE(NOT_REACHED) DPRINTF(0, (stderr, "sev=ERROR, main loop can't get waitq mutex=%d\n", r)); r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut); rel_runqlock(locks); continue; } got_waitqlock(locks); EVTHR_CLR_WAKEUP(evthr_ctx); /* first blocking task appended to run queue */ fbtask = NULL; /* * Move task from wait queue to run queue. * Note: this deals with potentially blocking tasks and keeps track of * the first one. If priorities are used, the priority of a potentially * blocking task is forced to be the lowest priority (== largest value). * * locking: waitq and runq must be locked. */ #if EVTHR_PRIOS <= 1 # define EVTHR_T_INC_PRIO(evthr_ctx, task) SM_NOOP #else # define EVTHR_T_INC_PRIO(evthr_ctx, task) do { \ if ((evthr_ctx)->evthr_c_nprio > 1) { \ ++EVTHR_T_PRIO(task); \ if (EVTHR_T_PRIO(task) >= (evthr_ctx)->evthr_c_nprio) \ EVTHR_T_PRIO_SET(task, 0); \ } \ } while (0) #endif /* EVTHR_PRIOS <= 1 */ /* See NOTE above! */ #define WQ2RQ(evthr_ctx, task) do { \ EVTHR_WAITQ_DEL(evthr_ctx, task); \ if (EVTHRT_IS_FLAG(task, EVTHRT_FL_BLOCK)) { \ EVTHR_T_PRIO_SET(task, EVTHR_PRIOS - 1); \ EVTHR_RUNQ_APP(evthr_ctx, EVTHR_T_PRIO(task), task); \ if (NULL == fbtask) \ fbtask = task; \ } \ else if (NULL == fbtask \ || EVTHR_T_PRIO(task) < EVTHR_PRIOS - 1) { \ ++nbtasks; \ EVTHR_RUNQ_APP(evthr_ctx, EVTHR_T_PRIO(task), task); \ } \ else { \ ++nbtasks; \ EVTHR_RUNQ_INS(evthr_ctx, EVTHR_T_PRIO(task), fbtask, task);\ } \ EVTHR_IS_INQ(task, EVTHR_EV_IRQ); \ ++tasks; \ EVTHR_T_INC_PRIO(evthr_ctx, task); \ } while (0) /* Check I/O activity */ /* ** todo: Abstract this by introducing macros to loop through the fds ** then it can be used for select(), poll(), ... (hopefully) */ i = lastfd; do { /* Optimization: stop when # of ready fds reached */ /* ** Should we keep the listening fd's as entries in the wait queue?? ** Make it configurable? (another bit in _ev). 
*/ /* note: this "if/else if" is a bit stange to decrease indentation */ if (FD_ISSET(i, &rds) && FD_ISSET(i, &rdfds) && rdpipe(evthr_ctx) == i) { DPRINTF(8, (stderr, "main loop pipe\n")); r = read(rdpipe(evthr_ctx), buf, 1); DPRINTF(4, (stderr, "main loop pipe=%d, buf=%c\n", r, buf[0])); if (r < 0) continue; /* complain */ else if (r == 0) continue; /* complain */ ret = SM_SUCCESS; switch (buf[0]) { case EVTHR_STOP: case EVTHR_ABRT: goto done; case EVTHR_CONT: break; case EVTHR_USR1: case EVTHR_USR2: r = EVTHR_WHY2IDX(buf[0]); if (r < 0 || r >= EVTHR_MAX_SIGS) break; task = evthr_ctx->evthr_c_sg2t[r]; if (NULL == task) break; /* better check than this? */ if (!evthr_is_inwq(task)) break; EVTHR_GOT_EV(task, EVTHR_EV_SG_Y); WQ2RQ(evthr_ctx, task); break; case EVTHR_ERROR: evthr_log_she(evthr_ctx); break; default: sm_log_write(evthr_ctx->evthr_c_lctx, EVTHR_LCAT_EVLOOP, EVTHR_LMOD_ACCEPT, SM_LOG_ERROR, 1, "sev=ERROR, func=evthr_loop, rd_pipe=%#x, status=unknown_value" , buf[0]); break; } if (sm_is_err(ret)) continue; } else if (FD_ISSET(i, &rds) && FD_ISSET(i, &rdfds)) { DPRINTF(8, (stderr, "main loop read task(?)\n")); task = evthr_findfd(evthr_ctx, i); if (task != NULL) { SM_IS_EVTHR_TSK(task); evthr_clr_rdy(task); r = evthr_rqevents(task); DPRINTF(3, (stderr, "main loop read task=%p, ev=%x\n", task, r)); if (evthr_is_lif(r)) { EVTHR_GOT_EV(task, EVTHR_EV_LI_Y); ret = evthr_accept( evthr_ctx, task, i); } else { EVTHR_GOT_EV(task, EVTHR_EV_RD_Y); if (FD_ISSET(i, &wrs) && FD_ISSET(i, &wrfds)) EVTHR_GOT_EV(task, EVTHR_EV_WR_Y); ret = SM_SUCCESS; } if (sm_is_success(ret)) { WQ2RQ(evthr_ctx, task); if (evthr_is_slp(task) && timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, >=)) EVTHR_GOT_EV(task, EVTHR_EV_SL_Y); } } } else if (FD_ISSET(i, &wrs) && FD_ISSET(i, &wrfds)) { DPRINTF(8, (stderr, "main loop write task\n")); task = evthr_findfd(evthr_ctx, i); if (task != NULL) { evthr_clr_rdy(task); EVTHR_GOT_EV(task, EVTHR_EV_WR_Y); if (evthr_is_slp(task) && 
timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, >=)) EVTHR_GOT_EV(task, EVTHR_EV_SL_Y); SM_IS_EVTHR_TSK(task); DPRINTF(3, (stderr, "main loop write task=%p\n", task)); WQ2RQ(evthr_ctx, task); } } /* NEXT fd */ if (++i > maxfd) i = 0; /* while not DONE(fd) */ } while (i != lastfd); /* ** Check timeouts activity. ** Note: if a task had I/O activity, then it is already ** removed from wait queue, hence even if a timeout occurred ** too, that event will not be noted in the event mask. */ #if EVTHR_DBG_DELAY ftask = EVTHR_WAITQ_FIRST(evthr_ctx); #endif for (task = EVTHR_WAITQ_FIRST(evthr_ctx); task != EVTHR_WAITQ_END(evthr_ctx); task = slptsk) { SM_IS_EVTHR_TSK(task); if (!evthr_is_slp(task) || timercmp(&evthr_ctx->evthr_c_time, &task->evthr_t_sleep, <)) break; #if EVTHR_DBG_DELAY CHKDEL #endif DPRINTF(3, (stderr, "main loop: add sleep task %p\n", task)); evthr_clr_rdy(task); EVTHR_GOT_EV(task, EVTHR_EV_SL_Y); slptsk = EVTHR_WAITQ_NEXT(task); WQ2RQ(evthr_ctx, task); #if EVTHR_DBG_DELAY ++slt; #endif } SM_ASSERT(is_waitqlock(locks)); r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut); SM_ASSERT(r == 0); rel_waitqlock(locks); SM_ASSERT(is_runqlock(locks)); r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut); SM_ASSERT(r == 0); rel_runqlock(locks); DPRINTF(8, (stderr, "main loop tasks=%d\n", tasks)); #if EVTHR_DBG_DELAY if (!timeok) { gettimeofday(&now, NULL); ERRPRINTTV("start: now=", now); } #endif /* EVTHR_DBG_DELAY */ if (tasks > 0) { ret = evthr_wakeupthr(evthr_ctx, tasks, nbtasks); DPRINTF(7, (stderr, "main loop started tasks=%d/%u\n" , ret, tasks)); } fbtask = NULL; #endif /* HAVE_SELECT */ } done: if (is_waitqlock(locks)) { r = pthread_mutex_unlock(&evthr_ctx->evthr_c_waitqmut); rel_waitqlock(locks); } if (is_runqlock(locks)) { r = pthread_mutex_unlock(&evthr_ctx->evthr_c_runqmut); rel_runqlock(locks); } return SM_SUCCESS; } /* ** EVTHR_TIMEVAL -- return (cached) time in evthr system ** ** Parameters: ** evthr_ctx -- evthr context ** ct -- time 
(output) ** ** Returns: ** usual sm_error code */ sm_ret_T evthr_timeval(sm_evthr_ctx_P evthr_ctx, timeval_T *ct) { SM_IS_EVTHR_CTX(evthr_ctx); SM_REQUIRE(ct != NULL); /* ** Need to lock access to the structure in case it is updated. ** Maybe a read/write lock? */ *ct = evthr_ctx->evthr_c_time; return SM_SUCCESS; } /* ** EVTHR_TIME -- return (cached) time in evthr system ** ** Parameters: ** evthr_ctx -- evthr context ** ** Returns: ** time in seconds since the epoch */ time_T evthr_time(sm_evthr_ctx_P evthr_ctx) { SM_IS_EVTHR_CTX(evthr_ctx); /* See above: evthr_timeval() about locking */ return evthr_ctx->evthr_c_time.tv_sec; }