/*
** NGPT - Next Generation POSIX Threading
** Copyright (c) 2001 IBM Corporation
** Portions Copyright (c) 1999-2000 Ralf S. Engelschall
**
** This file is part of NGPT, a non-preemptive thread scheduling
** library which can be found at http://www.ibm.com/developer.
**
** This library is free software; you can redistribute it and/or
** modify it under the terms of the GNU Lesser General Public
** License as published by the Free Software Foundation; either
** version 2.1 of the License, or (at your option) any later version.
**
** This library is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
** Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public
** License along with this library; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
** USA.
**
** pth_sched.c: Pth thread scheduler, the real heart of Pth
*/
                             /* ``Recursive, adj.; see Recursive.'' -- Unknown */
#include "pth_p.h"

intern struct pth_descr_st pth_native_list[PTH_MAX_NATIVE_THREADS]; /* complete list of native thread descriptors */
intern pth_t        pth_main;       /* the main thread                       */
intern pth_pqueue_t pth_NQ;         /* queue of new threads                  */
intern pth_pqueue_t pth_RQ;         /* queue of threads ready to run         */
intern pth_pqueue_t pth_WQ;         /* queue of threads waiting for an event */
intern pth_pqueue_t pth_SQ;         /* queue of suspended threads            */
intern pth_pqueue_t pth_DQ;         /* queue of terminated threads           */
intern float        pth_loadval;    /* average scheduler load value          */

static pth_time_t   pth_loadticknext;
static pth_time_t   pth_loadtickgap = PTH_TIME(1,0);

intern sigset_t     pth_sigblock;
intern pth_qlock_t  pth_native_lock;
intern pth_qlock_t  pth_sig_lock;
intern pth_qlock_t  pth_sched_lock;

intern void pth_allthread_mask(void)
{
    sigset_t tmp;
    pth_t t;
    int slot = 0;

    /* calculate the global signal mask based on all threads' sigmasks */
    sigfillset(&tmp);
    pth_lock_all();
    for (t = pth_pqueue_head(&pth_NQ); t != NULL;
         t = pth_pqueue_walk(&pth_NQ, t, PTH_WALK_NEXT)) {
        sigandset(&tmp, &tmp, &t->mctx.sigs);
    }
    for (t = pth_pqueue_head(&pth_RQ); t != NULL;
         t = pth_pqueue_walk(&pth_RQ, t, PTH_WALK_NEXT)) {
        sigandset(&tmp, &tmp, &t->mctx.sigs);
    }
    for (t = pth_pqueue_head(&pth_WQ); t != NULL;
         t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT)) {
        sigandset(&tmp, &tmp, &t->mctx.sigs);
    }
    /* ???? */
    for (t = pth_pqueue_head(&pth_SQ); t != NULL;
         t = pth_pqueue_walk(&pth_SQ, t, PTH_WALK_NEXT)) {
        sigandset(&tmp, &tmp, &t->mctx.sigs);
    }
    pth_release_all();

    pth_acquire_lock(&pth_native_lock);
    while (pth_native_list[slot].is_used) {
        if ((t = pth_native_list[slot].current) && (t != pth_native_list[slot].sched))
            sigandset(&tmp, &tmp, &t->mctx.sigs);
        slot++;
    }
    pth_release_lock(&pth_native_lock);

    /* store the combined sigmask of all threads */
    pth_acquire_lock(&pth_sig_lock);
    memcpy((void *)&pth_sigblock, &tmp, sizeof(sigset_t));
    pth_release_lock(&pth_sig_lock);
    return;
}
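/*
 * Explanatory note (added comment, not from the original source): because
 * the mask above starts from sigfillset() and is then AND-ed (sigandset)
 * against every thread's blocked set, a signal ends up in pth_sigblock only
 * if *every* thread blocks it.  For example, if thread A blocks
 * {SIGUSR1, SIGUSR2} and thread B blocks only {SIGUSR2}, the resulting
 * global mask contains just SIGUSR2; SIGUSR1 must stay deliverable because
 * thread B still expects to receive it.
 */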
/* initialize the scheduler ingredients */
intern void pth_scheduler_init(void)
{
    pth_descr_t descr = NULL;

    /* no natives yet... */
    pth_number_of_natives = 0;

    /* initialize pth_native_lock */
    pth_native_lock.lock  = 0;
    pth_native_lock.owner = 0;
    pth_native_lock.count = 0;

    if ((descr = pth_alloc_native(FALSE, FALSE)) == NULL) {
        fprintf(stderr, "**Pth** INIT: Cannot allocate initial native thread descriptor: %s\n",
                strerror(errno));
        abort();
    }

    /* fill in the first native slot... */
    descr->pid         = getpid();
    descr->tid         = gettid();
    descr->stacksize   = 0;
    descr->true_stack  = NULL;
    descr->stack       = NULL;
    descr->sched_index = 0;
    descr->is_bound    = 1;

    /* create the internal signal pipe */
    if (pipe(descr->sigpipe) == -1) {
        fprintf(stderr, "**Pth** INIT: Cannot create internal pipe: %s\n",
                strerror(errno));
        abort();
    }
    pth_fdmode(descr->sigpipe[0], PTH_FDMODE_NONBLOCK);
    pth_fdmode(descr->sigpipe[1], PTH_FDMODE_NONBLOCK);

    /* initialize the thread queues */
    pth_pqueue_init(&pth_NQ);
    pth_pqueue_init(&pth_RQ);
    pth_pqueue_init(&pth_WQ);
    pth_pqueue_init(&pth_SQ);
    pth_pqueue_init(&pth_DQ);

    /* initialize load support */
    pth_loadval = 1.0;
    pth_time_set(&pth_loadticknext, PTH_TIME_NOW);

    sigemptyset(&pth_sigblock);

    /* initialize pth_sig_lock */
    pth_sig_lock.lock  = 0;
    pth_sig_lock.owner = 0;
    pth_sig_lock.count = 0;
    return;
}

/* drop all threads (except for the currently active one) */
intern void pth_scheduler_drop(void)
{
    pth_t t;

    /* lock all the queues... */
    pth_lock_all();

    /* clear the new queue */
    while ((t = pth_pqueue_delmax(&pth_NQ)) != NULL)
        pth_tcb_free(t);
    /* clear the ready queue */
    while ((t = pth_pqueue_delmax(&pth_RQ)) != NULL)
        pth_tcb_free(t);
    /* clear the waiting queue */
    while ((t = pth_pqueue_delmax(&pth_WQ)) != NULL)
        pth_tcb_free(t);
    /* clear the suspend queue */
    while ((t = pth_pqueue_delmax(&pth_SQ)) != NULL)
        pth_tcb_free(t);
    /* clear the dead queue */
    while ((t = pth_pqueue_delmax(&pth_DQ)) != NULL)
        pth_tcb_free(t);

    /* release all the queues... */
    pth_release_all();

    /* now re-initialize the queues */
    pth_pqueue_init(&pth_RQ);
    pth_pqueue_init(&pth_WQ);
    pth_pqueue_init(&pth_SQ);
    pth_pqueue_init(&pth_DQ);
    pth_pqueue_init(&pth_NQ);
    return;
}

/* kill the scheduler ingredients */
intern void pth_scheduler_kill(void)
{
    /* drop all threads */
    pth_scheduler_drop();

    /* now kill any native threads we created... */
    pth_drop_natives();
    return;
}

/*
 * Update the average scheduler load.
 *
 * This is called on every context switch, but we have to adjust the
 * average load value only once per second.  When we are called more than
 * once per second we handle this by performing the calculation once and
 * doing NOPs until the next tick is over.  When the scheduler waited for
 * more than one second (or a thread's CPU burst lasted for more than one
 * second) we simulate the missing calculations.  That is no problem
 * because we can assume that the number of ready threads did not change
 * dramatically in the meantime (or more context switches would have
 * occurred and we would have been given more chances to operate).  The
 * actual average load is calculated through an exponential average
 * formula.
 */
#define pth_scheduler_load(now) \
    if (pth_time_cmp((now), &pth_loadticknext) >= 0) { \
        pth_time_t ttmp; \
        int numready; \
        numready = pth_pqueue_elements(&pth_RQ); \
        pth_time_set(&ttmp, (now)); \
        do { \
            pth_loadval = (numready*0.25) + (pth_loadval*0.75); \
            pth_time_sub(&ttmp, &pth_loadtickgap); \
        } while (pth_time_cmp(&ttmp, &pth_loadticknext) >= 0); \
        pth_time_set(&pth_loadticknext, (now)); \
        pth_time_add(&pth_loadticknext, &pth_loadtickgap); \
    }
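/*
 * Worked example (added for clarity, not part of the original source):
 * with pth_loadval starting at 1.0 and a constant 4 threads in the ready
 * queue, successive one-second ticks give
 *     load = 4*0.25 + load*0.75:  1.75 -> 2.3125 -> 2.734375 -> ...
 * i.e. the exponential average converges towards 4.  If the scheduler was
 * not invoked for N seconds, the do/while above simply applies the same
 * update N times to catch up on the missed ticks.
 */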
/* the heart of this library: the thread scheduler */
intern void *pth_scheduler(void *dummy)
{
    sigset_t sigs;
    pth_time_t running;
    pth_time_t snapshot;
    struct sigaction sa;
    sigset_t ss;
    int sig;
    pth_t t;
    pth_descr_t descr = NULL;
    volatile pth_t current;
    volatile pth_t this_sched = NULL;

    /*
     * bootstrapping
     */

    /* find the pth_t for this scheduler... */
    if ((descr = pth_get_native_descr()) == NULL) {
        fprintf(stderr, "pth_scheduler: unable to find scheduler for tid %u. Aborting...\n",
                (unsigned int)gettid());
        abort();
    }
    this_sched = descr->sched;

    pth_debug1("pth_scheduler: bootstrapping");

    /* mark this thread as a special scheduler thread */
    this_sched->state = PTH_STATE_SCHEDULER;

    /* block all signals in the scheduler thread */
    sigfillset(&sigs);
    pth_sc(sigprocmask)(SIG_SETMASK, &sigs, NULL);

    /* initialize the snapshot time for bootstrapping the loop */
    pth_time_set(&snapshot, PTH_TIME_NOW);

    /*
     * endless scheduler loop
     */
    for (;;) {
        pth_debug1("pth_scheduler: running...");
        //pth_acquire_lock(&pth_sched_lock);

        /*
         * Move threads from new queue to ready queue and give
         * them maximum priority so they start immediately
         */
        pth_acquire_lock(&(pth_NQ.q_lock));
        pth_acquire_lock(&(pth_RQ.q_lock));
        while ((t = pth_pqueue_tail(&pth_NQ)) != NULL) {
            pth_pqueue_delete(&pth_NQ, t);
            t->state = PTH_STATE_READY;
            pth_pqueue_insert(&pth_RQ, pth_pqueue_favorite_prio(&pth_RQ), t);
            pth_debug3("pth_scheduler: new thread(0x%lx) \"%s\" moved to top of ready queue",
                       t, t->name);
        }
        pth_release_lock(&(pth_RQ.q_lock));
        pth_release_lock(&(pth_NQ.q_lock));

        /*
         * Update average scheduler load
         */
        pth_scheduler_load(&snapshot);
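        /*
         * Explanatory note (added comment, not from the original source):
         * the selection loop below distinguishes three cases.  A thread
         * with boundnative == 0 is unbound and may run on any native
         * thread; a thread whose boundnative equals this native's kernel
         * thread id (this_sched->lastrannative) is bound to us and is also
         * runnable here; a thread bound to some *other* native is pushed
         * back onto the ready queue with favored priority so its own
         * native picks it up on its next pass through the scheduler.
         */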
        /*ibm begin*/
        while (TRUE) {
            /*
             * Find next thread in ready queue
             */
            pth_acquire_lock(&(pth_RQ.q_lock));
            current = pth_set_current(pth_pqueue_delmax(&pth_RQ));
            if (current == NULL) {
                pth_set_current(this_sched);
                pth_debug1("pth_scheduler: no threads ready to run on this native thread, sleeping...\n");
                pth_release_lock(&(pth_RQ.q_lock));
                pth_native_yield();
                pth_debug1("pth_scheduler: awake again, looking for work...");
                break;
            }

            /*
             * See if the thread is unbound...
             * Break out and schedule if so...
             */
            if (current->boundnative == 0) {
                pth_release_lock(&(pth_RQ.q_lock));
                break;
            }

            /*
             * See if the thread is bound to this native thread...
             * Break out and schedule if so...
             */
            if (current->boundnative == this_sched->lastrannative) {
                pth_release_lock(&(pth_RQ.q_lock));
                break;
            }

            /*
             * The thread is bound to a different native thread...
             * We, therefore, need to put it back in the ready queue but
             * we'll give it favored status...
             */
            pth_pqueue_insert(&pth_RQ, pth_pqueue_favorite_prio(&pth_RQ), current);
            pth_release_lock(&(pth_RQ.q_lock));

            /* only at final exit time will pth_main be bound to the first native */
            if ((current == pth_main) && (current->boundnative == pth_native_list[0].tid)) {
                char c = (int)1;
                pth_sc(write)(pth_native_list[0].sigpipe[1], &c, sizeof(char));
                current = NULL;
                break;
            }
        }
        if (current == NULL)
            goto event_wait;
        /*ibm end*/

        pth_debug5("pth_scheduler: thread(0x%lx) \"%s\" selected (prio=%d, qprio=%d)",
                   current, current->name, current->prio, current->q_prio);

        /*
         * Raise additional thread-specific signals
         * (they are delivered when we switch the context)
         *
         * Situation is ('#' = signal pending):
         *     process pending (descr->sigpending):       ----####
         *     thread pending (pth_current->sigpending):  --##--##
         * Result has to be:
         *     process new pending:                       --######
         */
        if (current->sigpendcnt > 0) {
            sigpending(&descr->sigpending);
            for (sig = 1; sig < PTH_NSIG; sig++)
                if (sigismember(&current->sigpending, sig))
                    if (!sigismember(&descr->sigpending, sig))
                        tkill(gettid(), sig);
        }

        /*
         * Set running start time for new thread
         * and perform a context switch to it
         */
        pth_debug3("pth_scheduler: switching to thread 0x%lx (\"%s\")",
                   (unsigned long)current, current->name);

        /* update thread times */
        pth_time_set(&current->lastran, PTH_TIME_NOW);

        /* update scheduler times */
        pth_time_set(&running, &current->lastran);
        pth_time_sub(&running, &snapshot);
        pth_time_add(&this_sched->running, &running);

        /* update the native thread we're running on this time... ibm */
        current->lastrannative = gettid(); /*ibm*/

        /*
         * ** ENTERING THREAD **
         * - by switching the machine context
         */
        pth_mctx_switch(&this_sched->mctx, &current->mctx);

        /* update scheduler times */
        pth_time_set(&snapshot, PTH_TIME_NOW);
        pth_debug3("pth_scheduler: came back from thread 0x%lx (\"%s\")",
                   (unsigned long)current, current->name);

        /*
         * Calculate and update the time the previous thread was running
         */
        pth_time_set(&running, &snapshot);
        pth_time_sub(&running, &current->lastran);
        pth_time_add(&current->running, &running);
        pth_debug4("pth_scheduler: thread (0x%lx) \"%s\" ran %.6f",
                   current, current->name, pth_time_t2d(&running));

        /*
         * Remove still pending thread-specific signals
         * (they are re-delivered next time)
         *
         * Situation is ('#' = signal pending):
         *     thread old pending (pth_current->sigpending):  --##--##
         *     process old pending (descr->sigpending):       ----####
         *     process still pending (sigstillpending):       ---#-#-#
         * Result has to be:
         *     process new pending:                           -----#-#
         *     thread new pending (pth_current->sigpending):  ---#---#
         */
        if (current->sigpendcnt > 0) {
            sigset_t sigstillpending;
            sigpending(&sigstillpending);
            for (sig = 1; sig < PTH_NSIG; sig++) {
                if (sigismember(&current->sigpending, sig)) {
                    if (!sigismember(&sigstillpending, sig)) {
                        /* thread (and perhaps also process) signal delivered */
                        sigdelset(&current->sigpending, sig);
                        current->sigpendcnt--;
                    }
                    else if (!sigismember(&descr->sigpending, sig)) {
                        /* thread signal not delivered */
                        pth_util_sigdelete(sig);
                    }
                }
            }
        }
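        /*
         * Explanatory note (added, not from the original source): the stack
         * guard checked below is presumably a magic word (0xDEAD) placed at
         * the boundary of the thread's stack when the TCB/stack was set up;
         * if the thread overran its stack the word gets clobbered, which is
         * what the comparison detects on the next return to the scheduler.
         */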
        /*
         * Check for stack overflow
         */
        if (current->stackguard != NULL) {
            if (*current->stackguard != 0xDEAD) {
                pth_debug3("pth_scheduler: stack overflow detected for thread 0x%lx (\"%s\")",
                           (unsigned long)current, current->name);
                /*
                 * if the application doesn't catch SIGSEGVs, we terminate
                 * manually with a SIGSEGV now, but output a reasonable message.
                 */
                if (sigaction(SIGSEGV, NULL, &sa) == 0) {
                    if (sa.sa_handler == SIG_DFL) {
                        fprintf(stderr, "**NGPT** STACK OVERFLOW: tid = %d, thread pth_t=0x%lx, name=\"%s\"\n",
                                (int)gettid(), (unsigned long)current, current->name);
                        /* we kill the main thread, passing the SEGV along... */
                        tkill(pth_primordial_thread()->tid, SIGSEGV);
                        sigfillset(&ss);
                        sigdelset(&ss, SIGSEGV);
                        sigsuspend(&ss);
                        abort();
                    }
                }
                /*
                 * else we terminate the thread only and send us a SIGSEGV
                 * which allows the application to handle the situation...
                 */
                current->join_arg = (void *)0xDEAD;
                current->state = PTH_STATE_DEAD;
                tkill(gettid(), SIGSEGV);
            }
        }

        /*
         * When previous thread is now marked as dead, kick it out
         */
        if (current->state == PTH_STATE_DEAD) {
            pth_debug3("pth_scheduler: marking thread (0x%lx) \"%s\" as dead",
                       current, current->name);
            if (!current->joinable) {
                pth_debug2("pth_scheduler: thread 0x%lx not joinable, reaping...", current);
                pth_tcb_free(current);
            }
            else {
                pth_debug2("pth_scheduler: thread 0x%lx joinable, moving to DEAD queue...", current);
                pth_acquire_lock(&(pth_DQ.q_lock));
                pth_pqueue_insert(&pth_DQ, PTH_PRIO_STD, current);
                pth_release_lock(&(pth_DQ.q_lock));
            }
            current = NULL;
        }

        /*
         * When thread wants to wait for an event
         * move it to waiting queue now
         */
        if (current != NULL && current->state == PTH_STATE_WAITING) {
            pth_debug3("pth_scheduler: moving thread (0x%lx) \"%s\" to waiting queue",
                       current, current->name);
            pth_acquire_lock(&(pth_WQ.q_lock));
            pth_pqueue_insert(&pth_WQ, current->prio, current);
            pth_release_lock(&(pth_WQ.q_lock));
            current = NULL;
        }

        /*
         * migrate old threads in ready queue into higher
         * priorities to avoid starvation and insert last running
         * thread back into this queue, too.
         */
        pth_acquire_lock(&(pth_RQ.q_lock));
        pth_pqueue_increase(&pth_RQ);
        if (current != NULL)
            pth_pqueue_insert(&pth_RQ, current->prio, current);
        pth_release_lock(&(pth_RQ.q_lock));

        /* set the scheduler thread as current */
        pth_set_current(this_sched);

        /*
         * Manage the events in the waiting queue, i.e. decide whether their
         * events occurred and move them to the ready queue.  But wait only
         * if we have no new or ready threads anyway.
         */
        event_wait:
        //pth_release_lock(&pth_sched_lock);
        pth_time_set(&snapshot, PTH_TIME_NOW);
        if (pth_pqueue_elements(&pth_RQ) == 0 && pth_pqueue_elements(&pth_NQ) == 0)
            pth_sched_eventmanager(&snapshot, FALSE /* wait */);
        else
            pth_sched_eventmanager(&snapshot, TRUE /* poll */);
    }

    /* NOTREACHED */
    return NULL;
}
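/*
 * Illustrative usage sketch (added comment, not part of the original
 * source): the scheduler above is never called directly by applications.
 * A typical Pth/NGPT program only uses the public API, e.g.
 *
 *     #include <pth.h>
 *     void *worker(void *arg) { pth_yield(NULL); return NULL; }
 *     int main(void) {
 *         pth_init();
 *         pth_t tid = pth_spawn(PTH_ATTR_DEFAULT, worker, NULL);
 *         pth_join(tid, NULL);
 *         return 0;
 *     }
 *
 * pth_spawn() places the new thread on pth_NQ; the scheduler loop then
 * promotes it to pth_RQ and context-switches into it, and pth_join()
 * eventually finds it on pth_DQ once it terminates.
 */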
/*
 * Look whether some events already occurred and move
 * corresponding threads from waiting queue back to ready queue.
 */
intern void pth_sched_eventmanager(pth_time_t *now, int dopoll)
{
    pth_event_t nexttimer_ev;
    pth_time_t nexttimer_value;
    pth_event_t evh;
    pth_event_t ev;
    pth_t t;
    pth_t tlast;
    int this_occurred;
    int any_occurred;
    fd_set rfds;
    fd_set wfds;
    fd_set efds;
    struct timeval delay;
    struct timeval *pdelay;
    sigset_t oss;
    struct sigaction sa;
    struct sigaction osa[1+PTH_NSIG];
    char minibuf[128];
    int loop_repeat;
    int fdmax;
    int n;
    int rc;
    int sig;
    int tready;
    pth_descr_t descr = NULL;
    sigset_t sigs_we_block;

    pth_debug2("pth_sched_eventmanager: enter in %s mode", dopoll ? "polling" : "waiting");

    /* get the thread descriptor for the running thread... */
    if ((descr = pth_get_native_descr()) == NULL) {
        pth_debug1("pth_sched_eventmanager: no scheduler found, assuming shutdown, exiting.");
        exit(0);
    }

    /* entry point for internal looping in event handling */
    loop_entry:
    loop_repeat = FALSE;

    /* initialize fd sets */
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    fdmax = -1;

    /* initialize signal status */
    sigpending(&descr->sigpending);
    sigfillset(&descr->sigblock);
    sigemptyset(&descr->sigcatch);
    sigemptyset(&descr->sigraised);

    /* initialize next timer */
    pth_time_set(&nexttimer_value, PTH_TIME_ZERO);
    descr->nexttimer_thread = NULL;
    nexttimer_ev = NULL;

    if (descr == pth_primordial_thread()) {
        pth_acquire_lock(&pth_sig_lock);
        memcpy((void *)&sigs_we_block, &pth_sigblock, sizeof(sigset_t));
        pth_release_lock(&pth_sig_lock);
    }

    /* for all threads in the waiting queue... */
    any_occurred = FALSE;
    pth_acquire_lock(&(pth_WQ.q_lock));
    for (t = pth_pqueue_head(&pth_WQ); t != NULL;
         t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT)) {

        /* cancellation support */
        if (t->cancelreq == TRUE)
            any_occurred = TRUE;

        /* ... and all their events... */
        if (t->events == NULL)
            continue;
        /* ...check whether events occurred */
        ev = evh = t->events;
        do {
            if (!ev->ev_occurred) {
                this_occurred = FALSE;

                /* Filedescriptor I/O */
                if (ev->ev_type == PTH_EVENT_FD) {
                    /* filedescriptors are checked later all at once.
                       Here we only assemble them in the fd sets */
                    if (ev->ev_goal & PTH_UNTIL_FD_READABLE)
                        FD_SET(ev->ev_args.FD.fd, &rfds);
                    if (ev->ev_goal & PTH_UNTIL_FD_WRITEABLE)
                        FD_SET(ev->ev_args.FD.fd, &wfds);
                    if (ev->ev_goal & PTH_UNTIL_FD_EXCEPTION)
                        FD_SET(ev->ev_args.FD.fd, &efds);
                    if (fdmax < ev->ev_args.FD.fd)
                        fdmax = ev->ev_args.FD.fd;
                }
                /* Filedescriptor Set Select I/O */
                else if (ev->ev_type == PTH_EVENT_SELECT) {
                    /* filedescriptors are checked later all at once.
                       Here we only merge the fd sets. */
                    pth_util_fds_merge(ev->ev_args.SELECT.nfd,
                                       ev->ev_args.SELECT.rfds, &rfds,
                                       ev->ev_args.SELECT.wfds, &wfds,
                                       ev->ev_args.SELECT.efds, &efds);
                    if (fdmax < ev->ev_args.SELECT.nfd-1)
                        fdmax = ev->ev_args.SELECT.nfd-1;
                }
                /* Signal Set */
                else if (ev->ev_type == PTH_EVENT_SIGS) {
                    for (sig = 1; sig < PTH_NSIG; sig++) {
                        if (sigismember(ev->ev_args.SIGS.sigs, sig)) {
                            /* thread signal handling */
                            if (sigismember(&t->sigpending, sig)) {
                                if (ev->ev_args.SIGS.sig != NULL)
                                    *(ev->ev_args.SIGS.sig) = sig;
                                sigdelset(&t->sigpending, sig);
                                t->sigpendcnt--;
                                this_occurred = TRUE;
                            }
                            /* process signal handling */
                            if (sigismember(&descr->sigpending, sig)) {
                                if (ev->ev_args.SIGS.sig != NULL)
                                    *(ev->ev_args.SIGS.sig) = sig;
                                pth_util_sigdelete(sig);
                                sigdelset(&descr->sigpending, sig);
                                this_occurred = TRUE;
                            }
                            else {
                                if (descr == pth_primordial_thread()) {
                                    sigdelset(&sigs_we_block, sig);
                                    sigaddset(&descr->sigcatch, sig);
                                }
                            }
                        }
                    }
                }
                /* Timer */
                else if (ev->ev_type == PTH_EVENT_TIME) {
                    if (pth_time_cmp(&(ev->ev_args.TIME.tv), now) < 0)
                        this_occurred = TRUE;
                    else {
                        /* remember the timer which will elapse next */
                        if (ev->ev_flags != TRUE) {
                            if ((descr->nexttimer_thread == NULL && nexttimer_ev == NULL) ||
                                pth_time_cmp(&(ev->ev_args.TIME.tv), &nexttimer_value) < 0) {
                                descr->nexttimer_thread = t;
                                nexttimer_ev = ev;
                                nexttimer_ev->ev_flags = TRUE;
                                pth_time_set(&nexttimer_value, &(ev->ev_args.TIME.tv));
                            }
                        }
                    }
                }
                /* Message Port Arrivals */
                else if (ev->ev_type == PTH_EVENT_MSG) {
                    if (pth_ring_elements(&(ev->ev_args.MSG.mp->mp_queue)) > 0)
                        this_occurred = TRUE;
                }
                /* Mutex Release */
                else if (ev->ev_type == PTH_EVENT_MUTEX) {
                    if (!(ev->ev_args.MUTEX.mutex->mx_state & PTH_MUTEX_LOCKED))
                        this_occurred = TRUE;
                }
                /* Condition Variable Signal */
                else if (ev->ev_type == PTH_EVENT_COND) {
                    if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) {
                        if (ev->ev_args.COND.cond->cn_state & PTH_COND_BROADCAST)
                            this_occurred = TRUE;
                        else {
                            if (!(ev->ev_args.COND.cond->cn_state & PTH_COND_HANDLED)) {
                                ev->ev_args.COND.cond->cn_state |= PTH_COND_HANDLED;
                                this_occurred = TRUE;
                            }
                        }
                    }
                }
                /* Thread Termination */
                else if (ev->ev_type == PTH_EVENT_TID) {
                    if (   (   ev->ev_args.TID.tid == NULL
                            && pth_pqueue_elements(&pth_DQ) > 0)
                        || (   ev->ev_args.TID.tid != NULL
                            && ev->ev_args.TID.tid->state == ev->ev_goal))
                        this_occurred = TRUE;
                }
                /* Custom Event Function */
                else if (ev->ev_type == PTH_EVENT_FUNC) {
                    if (ev->ev_args.FUNC.func(ev->ev_args.FUNC.arg))
                        this_occurred = TRUE;
                    else {
                        pth_time_t tv;
                        pth_time_set(&tv, now);
                        pth_time_add(&tv, &(ev->ev_args.FUNC.tv));
                        if ((descr->nexttimer_thread == NULL && nexttimer_ev == NULL) ||
                            pth_time_cmp(&tv, &nexttimer_value) < 0) {
                            descr->nexttimer_thread = t;
                            nexttimer_ev = ev;
                            pth_time_set(&nexttimer_value, &tv);
                        }
                    }
                }

                /* tag event if it has occurred */
                if (this_occurred) {
                    pth_debug2("pth_sched_eventmanager: [non-I/O] event occurred for thread \"%s\"", t->name);
                    ev->ev_occurred = TRUE;
                    any_occurred = TRUE;
                }
            }
        } while ((ev = ev->ev_next) != evh);
    }
    pth_release_lock(&(pth_WQ.q_lock));
    if (any_occurred)
        dopoll = TRUE;
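    /*
     * Illustrative note (added, not from the original source): a timer
     * event typically originates from the public API, e.g.
     *
     *     pth_event_t ev = pth_event(PTH_EVENT_TIME, pth_timeout(2, 0));
     *     pth_wait(ev);
     *
     * which parks the calling thread on pth_WQ with a PTH_EVENT_TIME event
     * two seconds in the future.  The pass above records the earliest such
     * timer in nexttimer_value, and the decision below turns it into the
     * select() timeout so the scheduler wakes up just in time to mark the
     * event as occurred.
     */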
    /* now decide how to poll for fd I/O and timers */
    if (dopoll) {
        /* do a polling with immediate timeout,
           i.e. check the fd sets only without blocking */
        pth_time_set(&delay, PTH_TIME_ZERO);
        pdelay = &delay;
        if (nexttimer_ev != NULL)
            nexttimer_ev->ev_flags = FALSE;
    }
    else if (nexttimer_ev != NULL) {
        /* do a polling with a timeout set to the next timer,
           i.e. wait for the fd sets or the next timer */
        pth_time_set(&delay, &nexttimer_value);
        pth_time_sub(&delay, now);
        pdelay = &delay;
    }
    else {
        /* do a polling without a timeout,
           i.e. wait for the fd sets only with blocking */
        pdelay = NULL;
    }

    /* clear pipe and let select() wait for the read-part of the pipe */
    while (pth_sc(read)(descr->sigpipe[0], minibuf, sizeof(minibuf)) > 0)
        ;
    FD_SET(descr->sigpipe[0], &rfds);
    if (fdmax < descr->sigpipe[0])
        fdmax = descr->sigpipe[0];

    /* replace signal actions for signals we have to catch for events */
    if (descr == pth_primordial_thread()) {
        for (sig = 1; sig < PTH_NSIG; sig++) {
            if (sigismember(&descr->sigcatch, sig)) {
                sa.sa_handler = pth_sched_eventmanager_sighandler;
                sigfillset(&sa.sa_mask);
                sa.sa_flags = 0;
                sigaction(sig, &sa, &osa[sig]);
            }
        }
    }

    /* allow some signals to be delivered: either to our catching handler
       or directly to the configured handler for signals not caught by events */
    if (descr == pth_primordial_thread())
        pth_sc(sigprocmask)(SIG_SETMASK, &sigs_we_block, &oss);

    /* now do the polling for filedescriptor I/O and timers
       WHEN THE SCHEDULER SLEEPS AT ALL, THEN HERE!! */
    rc = -1;
    descr->is_bound = 0;
    if (!(dopoll && fdmax == -1)) {
        while ((rc = pth_sc(select)(fdmax+1, &rfds, &wfds, &efds, pdelay)) < 0 && errno == EINTR)
            ;
        if (descr->nexttimer_thread == NULL)
            nexttimer_ev = NULL;
    }
    descr->is_bound = 1;

    /* restore signal mask and actions and handle signals */
    if (descr == pth_primordial_thread()) {
        pth_sc(sigprocmask)(SIG_SETMASK, &oss, NULL);
        for (sig = 1; sig < PTH_NSIG; sig++) {
            if (sigismember(&descr->sigcatch, sig))
                sigaction(sig, &osa[sig], NULL);
        }
    }

    /* if the timer elapsed, handle it */
    if (!dopoll && rc == 0 && nexttimer_ev != NULL) {
        pth_debug2("pth_sched_eventmanager: nexttimer_ev = 0x%lx", nexttimer_ev);
        if (nexttimer_ev->ev_type == PTH_EVENT_FUNC) {
            /* it was an implicit timer event for a function event,
               so repeat the event handling for rechecking the function */
            loop_repeat = TRUE;
        }
        else {
            /* it was an explicit timer event, standing on its own */
            pth_debug3("pth_sched_eventmanager: [timeout] event occurred for thread 0x%lx \"%s\"",
                       descr->nexttimer_thread, descr->nexttimer_thread->name);
            nexttimer_ev->ev_flags = FALSE;
            nexttimer_ev->ev_occurred = TRUE;
        }
    }

    /* if the internal signal pipe was used, adjust the select() results */
    if (!dopoll && rc > 0 && FD_ISSET(descr->sigpipe[0], &rfds)) {
        FD_CLR(descr->sigpipe[0], &rfds);
        rc--;
    }

    /* if an error occurred, avoid confusion in the cleanup loop */
    if (rc <= 0) {
        FD_ZERO(&rfds);
        FD_ZERO(&wfds);
        FD_ZERO(&efds);
    }
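    /*
     * Explanatory note (added, not from the original source): the internal
     * sigpipe is the wakeup mechanism for the select() above.  Other native
     * threads (see the fan-out over pth_native_list below) and the signal
     * handler pth_sched_eventmanager_sighandler() write a single byte to
     * sigpipe[1]; since sigpipe[0] is always part of rfds, a blocked
     * select() returns immediately and the scheduler re-examines its queues
     * instead of sleeping through the new work.
     */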
    /* now comes the final cleanup loop where we have to do two jobs:
       first we have to do the late handling of the fd I/O events and
       additionally, if a thread has at least one occurred event, we move
       it from the waiting queue to the ready queue */

    /* for all threads in the waiting queue... */
    pth_acquire_lock(&(pth_WQ.q_lock));
    t = pth_pqueue_head(&pth_WQ);
    while (t != NULL) {
        /* do the late handling of the fd I/O and signal
           events in the waiting event ring */
        any_occurred = FALSE;
        if (t->events != NULL) {
            ev = evh = t->events;
            do {
                /*
                 * Late handling for still not occurred events
                 */
                if (!ev->ev_occurred) {
                    /* Filedescriptor I/O */
                    if (ev->ev_type == PTH_EVENT_FD) {
                        if (   (   ev->ev_goal & PTH_UNTIL_FD_READABLE
                                && FD_ISSET(ev->ev_args.FD.fd, &rfds))
                            || (   ev->ev_goal & PTH_UNTIL_FD_WRITEABLE
                                && FD_ISSET(ev->ev_args.FD.fd, &wfds))
                            || (   ev->ev_goal & PTH_UNTIL_FD_EXCEPTION
                                && FD_ISSET(ev->ev_args.FD.fd, &efds))) {
                            pth_debug3("pth_sched_eventmanager: "
                                       "[I/O] event occurred for thread 0x%lx \"%s\"", t, t->name);
                            ev->ev_occurred = TRUE;
                        }
                    }
                    /* Filedescriptor Set I/O */
                    else if (ev->ev_type == PTH_EVENT_SELECT) {
                        if (pth_util_fds_test(ev->ev_args.SELECT.nfd,
                                              ev->ev_args.SELECT.rfds, &rfds,
                                              ev->ev_args.SELECT.wfds, &wfds,
                                              ev->ev_args.SELECT.efds, &efds)) {
                            n = pth_util_fds_select(ev->ev_args.SELECT.nfd,
                                                    ev->ev_args.SELECT.rfds, &rfds,
                                                    ev->ev_args.SELECT.wfds, &wfds,
                                                    ev->ev_args.SELECT.efds, &efds);
                            if (ev->ev_args.SELECT.n != NULL)
                                *(ev->ev_args.SELECT.n) = n;
                            ev->ev_occurred = TRUE;
                            pth_debug3("pth_sched_eventmanager: "
                                       "[I/O] event occurred for thread 0x%lx \"%s\"", t, t->name);
                        }
                    }
                    /* Signal Set */
                    else if (ev->ev_type == PTH_EVENT_SIGS) {
                        for (sig = 1; sig < PTH_NSIG; sig++) {
                            if (sigismember(ev->ev_args.SIGS.sigs, sig)) {
                                if (sigismember(&descr->sigraised, sig)) {
                                    if (ev->ev_args.SIGS.sig != NULL)
                                        *(ev->ev_args.SIGS.sig) = sig;
                                    pth_debug3("pth_sched_eventmanager: "
                                               "[signal] event occurred for thread 0x%lx \"%s\"", t, t->name);
                                    sigdelset(&descr->sigraised, sig);
                                    ev->ev_occurred = TRUE;
                                }
                            }
                        }
                    }
                }
                /*
                 * post-processing for already occurred events
                 */
                else {
                    /* Condition Variable Signal */
                    if (ev->ev_type == PTH_EVENT_COND) {
                        /* clean signal */
                        if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) {
                            ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_SIGNALED);
                            ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_BROADCAST);
                            ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_HANDLED);
                        }
                    }
                }

                /* local to global mapping */
                if (ev->ev_occurred)
                    any_occurred = TRUE;
            } while ((ev = ev->ev_next) != evh);
        }

        /* cancellation support */
        if (t->cancelreq == TRUE) {
            pth_debug3("pth_sched_eventmanager: cancellation request pending for thread 0x%lx \"%s\"",
                       t, t->name);
            any_occurred = TRUE;
        }

        /* walk to next thread in waiting queue */
        tlast = t;
        t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT);

        /*
         * move the last thread to the ready queue if any events occurred
         * for it.  We insert it with a slightly increased queue priority to
         * give it a better chance to get scheduled immediately; else the
         * last running thread might immediately get the CPU again, which is
         * usually not what we want, because we often use pth_yield() calls
         * to give others a chance.
         */
        if (any_occurred) {
            pth_pqueue_delete(&pth_WQ, tlast);
            tlast->state = PTH_STATE_READY;
            pth_acquire_lock(&(pth_RQ.q_lock));
            pth_pqueue_insert(&pth_RQ, tlast->prio+1, tlast);
            pth_release_lock(&(pth_RQ.q_lock));
            pth_debug3("pth_sched_eventmanager: thread (0x%lx) \"%s\" moved from waiting "
                       "to ready queue", tlast, tlast->name);
        }
    }
    pth_release_lock(&(pth_WQ.q_lock));

    /* make sure enough native threads are running to handle all the threads on the RQ and NQ */
    tready = pth_pqueue_elements(&pth_RQ) + pth_pqueue_elements(&pth_NQ) + pth_pqueue_elements(&pth_DQ);
    for (n = 0; pth_native_list[n].is_used && tready > 1; n++) {
        char c = (int)1;
        pth_descr_t ds = &pth_native_list[n];
        if (!ds->is_bound) {
            pth_sc(write)(ds->sigpipe[1], &c, sizeof(char));
            tready--;
        }
    }

    /* perhaps we have to internally loop... */
    if (loop_repeat) {
        pth_time_set(now, PTH_TIME_NOW);
        goto loop_entry;
    }

    pth_debug1("pth_sched_eventmanager: leaving");
    return;
}

intern void pth_sched_eventmanager_sighandler(int sig)
{
    char c;
    pth_descr_t descr = NULL;

    pth_debug2("pth_sched_eventmanager_sighandler: caught signal %d", sig);

    /* get the thread descriptor for the running thread... */
    if ((descr = pth_get_native_descr()) == NULL) {
        fprintf(stderr, "pth_sched_eventmanager_sighandler: no scheduler found !?!?!\n");
        abort();
    }

    /* remember raised signal */
    sigaddset(&descr->sigraised, sig);

    /* write signal to signal pipe in order to awake the select() */
    c = (int)sig;
    pth_sc(write)(pth_native_list[0].sigpipe[1], &c, sizeof(char));
    return;
}