/*
**  NGPT - Next Generation POSIX Threading
**  Copyright (c) 2001 IBM Corporation
**  Portions Copyright (c) 1999-2000 Ralf S. Engelschall
**
**  This file is part of NGPT, a non-preemptive thread scheduling
**  library which can be found at http://www.ibm.com/developer.
**
**  This library is free software; you can redistribute it and/or
**  modify it under the terms of the GNU Lesser General Public
**  License as published by the Free Software Foundation; either
**  version 2.1 of the License, or (at your option) any later version.
**
**  This library is distributed in the hope that it will be useful,
**  but WITHOUT ANY WARRANTY; without even the implied warranty of
**  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
**  Lesser General Public License for more details.
**
**  You should have received a copy of the GNU Lesser General Public
**  License along with this library; if not, write to the Free Software
**  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
**  USA.
**
**  pth_lib.c: Pth main library code
*/
                             /* ``It took me fifteen years to discover
                                  I had no talent for programming, but
                                  I couldn't give it up because by that
                                  time I was too famous.''
                                                            -- Unknown */
#include "pth_p.h"

/*
 * Implicit initialization support.
 *
 * pth_initialized is set TRUE on entry to pth_init() and cleared by
 * pth_kill(); pth_initializing stays TRUE for the duration of
 * pth_init() only; pth_shutdown_inprogress is raised by pth_kill().
 */
intern int pth_initialized = FALSE;
intern int pth_initializing = TRUE;
intern int pth_shutdown_inprogress = FALSE;

#if cpp
/* lazily initialize the library on first use of an API entry point */
#define pth_implicit_init() \
    if (!pth_initialized) \
        pth_init();
#endif

/* native-thread accounting; all three are (re)computed in pth_init() */
intern int pth_max_native_threads;                                   /*ibm*/
intern int pth_number_of_natives;                                    /*ibm*/
intern int pth_threads_per_native;                                   /*ibm*/

/* return the hexadecimal Pth library version number */
long pth_version(void)
{
    return PTH_VERSION;
}

/* special version of malloc that do mmap instead...
*/ intern void *pth_malloc(size_t sz) { void *memptr = NULL; memptr = mmap(NULL, sz, PROT_READ | PROT_WRITE | PROT_EXEC, MAP_PRIVATE | MAP_ANON, -1, 0); if (memptr == MAP_FAILED) return_errno(NULL, ENOMEM); return memptr; } #if cpp #define pth_free(mem)\ pth_free_mem(mem, sizeof(*(mem))); #define pth_free_bytes(mem, sz)\ pth_free_mem(mem, sz); #endif /* special version of free that do munmap instead... */ intern void pth_free_mem(void *memptr, size_t sz) { munmap(memptr, sz); } /* initialize the package */ int pth_init(void) { int slot = 0; char *c_ratio = NULL; /*ibm*/ char *c_numcpus = NULL; /*ibm*/ int cpu_based = 0; /*ibm*/ pth_attr_t t_attr; pth_descr_t descr = NULL; /* support for implicit initialization calls and to prevent multiple explict initialization, too */ if (pth_initialized) return_errno(FALSE, EPERM); pth_initialized = TRUE; pth_initializing = TRUE; pth_shutdown_inprogress = FALSE; /* Initialize the native thread list... */ for (slot = 0; slot < PTH_MAX_NATIVE_THREADS; slot++) memset(&pth_native_list[slot], 0x0, sizeof(struct pth_descr_st)); pth_debug1("pth_init: enter"); /* initialize the scheduler */ pth_scheduler_init(); /* the current descriptor... 
*/ if ((descr = pth_get_native_descr()) == NULL) { fprintf(stderr,"pth_init: unable to retrieve initial descriptor !?!?!?\n"); abort(); } /* spawn the scheduler thread */ t_attr = pth_attr_new(); if (t_attr == NULL) { fprintf(stderr,"pth_init: unable to allocate initial attribute !?!?!?!\n"); abort(); } pth_attr_set(t_attr, PTH_ATTR_PRIO, PTH_PRIO_MAX); pth_attr_set(t_attr, PTH_ATTR_NAME, "**SCHEDULER**"); pth_attr_set(t_attr, PTH_ATTR_JOINABLE, FALSE); pth_attr_set(t_attr, PTH_ATTR_CANCEL_STATE, PTH_CANCEL_DISABLE); pth_attr_set(t_attr, PTH_ATTR_STACK_SIZE, 64*1024); pth_attr_set(t_attr, PTH_ATTR_STACK_ADDR, NULL); descr->sched = pth_spawn(t_attr, pth_scheduler, NULL); if (descr->sched == NULL) { errno_shield { pth_attr_destroy(t_attr); pth_scheduler_kill(); } return FALSE; } descr->sched->lastrannative = gettid(); /* spawn a thread for the main program */ pth_attr_set(t_attr, PTH_ATTR_PRIO, PTH_PRIO_STD); pth_attr_set(t_attr, PTH_ATTR_NAME, "main"); pth_attr_set(t_attr, PTH_ATTR_JOINABLE, TRUE); pth_attr_set(t_attr, PTH_ATTR_CANCEL_STATE, PTH_CANCEL_ENABLE|PTH_CANCEL_DEFERRED); pth_attr_set(t_attr, PTH_ATTR_STACK_SIZE, 0 /* special */); pth_attr_set(t_attr, PTH_ATTR_STACK_ADDR, NULL); pth_main = pth_spawn(t_attr, (void *(*)(void *))(-1), NULL); if (pth_main == NULL) { errno_shield { pth_attr_destroy(t_attr); pth_scheduler_kill(); } return FALSE; } pth_attr_destroy(t_attr); /* The last and only native main runs on needs to be set here */ pth_main->lastrannative = gettid(); /*begin ibm*/ pth_threads_per_native = 4; pth_max_native_threads = 0; pth_number_of_natives = 1; /* determine the number of native threads per cpu. */ c_ratio = getenv("MAXTHREADPERCPU"); if (c_ratio != NULL) { long ratio = strtol(c_ratio, (char **)NULL, 10); if (errno != ERANGE) pth_threads_per_native = (int)ratio; } /* * See if the MAXNATIVETHREADS environment variable is set. 
* We'll use this instead of the number of cpus if this * is set since the user wants to override the default behavior * which is based on the number of CPUs in the host. */ c_numcpus = getenv("MAXNATIVETHREADS"); if (c_numcpus != NULL) { long numcpus = strtol(c_numcpus, (char **)NULL, 10); if (errno != ERANGE) pth_max_native_threads = (int)numcpus; } /* * We check to see if we've gotten an override... * If not, we'll base it off of CPU and set a * max number of threads per cpu to 1. */ if (pth_max_native_threads == 0) { pth_max_native_threads = 2; pth_threads_per_native = 1; cpu_based = 1; } pth_debug4("pth_init: Maximum # of native threads: %i, Threshold: %i, Basis: %s", pth_max_native_threads, pth_threads_per_native, (cpu_based) ? "CPU" : "ENV"); /*end ibm*/ /* * The first time we've to manually switch into the scheduler to start * threading. Because at this time the only non-scheduler thread is the * "main thread" we will come back immediately. We've to also initialize * the pth_current variable here to allow the pth_spawn_trampoline * function to find the scheduler. */ pth_set_current(descr->sched); pth_mctx_switch(&pth_main->mctx, &descr->sched->mctx); /* Create the watchdog... */ if (pth_max_native_threads > 1) pth_new_watchdog(); /* Now we're done initializing... */ pth_initializing = FALSE; /* came back, so let's go home... 
*/ pth_debug1("pth_init: leave"); return TRUE; } static int pth_exit_cb(void *); /* kill the package internals */ int pth_kill(void) { pth_event_t ev; char c = (int)1; pth_t current = pth_get_current(); if (current == NULL || current != pth_main) return_errno(FALSE, EPERM); pth_debug1("pth_kill: enter"); /* if this is not first native, bound to it and wakeup the select */ if (pth_native_list[0].tid != gettid()) { pth_main->boundnative = pth_native_list[0].tid; pth_sc(write)(pth_native_list[0].sigpipe[1], &c, sizeof(char)); ev = pth_event(PTH_EVENT_FUNC, pth_exit_cb); pth_debug2("pth_kill: ev = 0x%lx", ev); pth_wait(ev); pth_event_free(ev, PTH_FREE_THIS); } pth_thread_cleanup(pth_main); pth_scheduler_kill(); pth_initialized = FALSE; pth_tcb_free(pth_main); pth_number_of_natives = 0; pth_shutdown_inprogress = TRUE; pth_debug1("pth_kill: leave"); return TRUE; } /* scheduler control/query */ long pth_ctrl(unsigned long query, ...) { long rc; va_list ap; rc = 0; va_start(ap, query); if (query & PTH_CTRL_GETTHREADS) { if (query & PTH_CTRL_GETTHREADS_NEW) rc += pth_pqueue_elements(&pth_NQ); if (query & PTH_CTRL_GETTHREADS_READY) rc += pth_pqueue_elements(&pth_RQ); if (query & PTH_CTRL_GETTHREADS_RUNNING) { int slot = 0; pth_t t; pth_acquire_lock(&pth_native_lock); while (pth_native_list[slot].is_used) { if ((t = pth_native_list[slot].current) && (t != pth_native_list[slot].sched)) rc++; slot++; } pth_release_lock(&pth_native_lock); } if (query & PTH_CTRL_GETTHREADS_WAITING) rc += pth_pqueue_elements(&pth_WQ); if (query & PTH_CTRL_GETTHREADS_SUSPENDED) rc += pth_pqueue_elements(&pth_SQ); if (query & PTH_CTRL_GETTHREADS_DEAD) rc += pth_pqueue_elements(&pth_DQ); } else if (query & PTH_CTRL_GETAVLOAD) { float *pload = va_arg(ap, float *); *pload = pth_loadval; } else if (query & PTH_CTRL_GETPRIO) { pth_t t = va_arg(ap, pth_t); rc = t->prio; } else if (query & PTH_CTRL_GETNAME) { pth_t t = va_arg(ap, pth_t); rc = (long)t->name; } else if (query & PTH_CTRL_DUMPSTATE) { FILE *fp 
= va_arg(ap, FILE *); pth_dumpstate(fp); } else rc = -1; va_end(ap); if (rc == -1) return_errno(-1, EINVAL); return rc; } /* create a new thread of execution by spawning a cooperative thread */ static void pth_spawn_trampoline(void) { void *data; pth_debug1("pth_spawn_trampoline: enter"); /* just jump into the start routine */ data = (*pth_get_current()->start_func)(pth_get_current()->start_arg); /* and do an implicit exit of the tread with the result value */ pth_exit(data); /* no return! */ abort(); } #if cpp #define pth_active_threads() \ ((pth_pqueue_elements(&pth_NQ)) + \ (pth_pqueue_elements(&pth_RQ)) + \ (pth_pqueue_elements(&pth_WQ)) + \ (pth_pqueue_elements(&pth_SQ))) #endif pth_t pth_spawn(pth_attr_t attr, void *(*func)(void *), void *arg) { pth_t t; int create_suspended = 0; /*ibm*/ pth_pqueue_t *pth_CQ = &pth_NQ; /*ibm*/ unsigned int stacksize; void *stackaddr; pth_time_t ts; pth_t current = NULL; pth_debug1("pth_spawn: enter"); /* In certain cases, we need the current thread... */ current = pth_get_current(); /* consistency */ if (func == NULL) return_errno(NULL, EINVAL); /* support the special case of main() */ if (func == (void *(*)(void *))(-1)) func = NULL; /* allocate a new thread control block */ stacksize = (attr == PTH_ATTR_DEFAULT ? 128*1024 : attr->a_stacksize); /*ibm*/ stackaddr = (attr == PTH_ATTR_DEFAULT ? 
NULL : attr->a_stackaddr); if ((t = pth_tcb_alloc(stacksize, stackaddr)) == NULL) return NULL; /* errno is inherited */ /* configure remaining attributes */ if (attr != PTH_ATTR_DEFAULT) { /* overtake fields from the attribute structure */ t->prio = attr->a_prio; t->joinable = attr->a_joinable; t->cancelstate = attr->a_cancelstate; create_suspended = attr->a_suspendstate; /*ibm*/ pth_util_cpystrn(t->name, attr->a_name, PTH_TCB_NAMELEN); } else if (current != NULL) { /* overtake some fields from the parent thread */ t->prio = current->prio; t->joinable = current->joinable; t->cancelstate = current->cancelstate; pth_snprintf(t->name, PTH_TCB_NAMELEN, "%s.child@%d=0x%lx", current->name, (unsigned int)time(NULL), (unsigned long)current); } else { /* defaults */ t->prio = PTH_PRIO_STD; t->joinable = TRUE; t->cancelstate = PTH_CANCEL_DEFAULT; pth_snprintf(t->name, PTH_TCB_NAMELEN, "user/%x", (unsigned int)time(NULL)); } /* initialize the time points and ranges */ pth_time_set(&ts, PTH_TIME_NOW); pth_time_set(&t->spawned, &ts); pth_time_set(&t->lastran, &ts); pth_time_set(&t->running, PTH_TIME_ZERO); /* initialize events */ t->events = NULL; /* clear raised signals */ sigemptyset(&t->sigpending); t->sigpendcnt = 0; /* remember the start routine and arguments for our trampoline */ t->start_func = func; t->start_arg = arg; /* initialize join argument */ t->join_arg = NULL; /* initialize thread specific storage */ t->data_value = NULL; t->data_count = 0; /* initialize cancellaton stuff */ t->cancelreq = FALSE; t->cleanups = NULL; /* initialize mutex stuff */ pth_ring_init(&t->mutexring); /* initialize the machine context of this new thread */ if (t->stacksize > 0) { /* the "main thread" (indicated by == 0) is special! */ if (!pth_mctx_set(&t->mctx, pth_spawn_trampoline, t->stack, ((char *)t->stack+t->stacksize))) { errno_shield { pth_tcb_free(t); } return NULL; } } /* set the signal mask for this context... 
*/ pth_sc(sigprocmask)(SIG_SETMASK, NULL, &(t->mctx.sigs)); /*begin ibm*/ /* if we're creating suspended, we need to put it on the "suspend queue" * instead of the "new queue". */ if (create_suspended == TRUE) pth_CQ = &pth_SQ; /*end ibm*/ /* finally insert it into the "new queue" where the scheduler will pick it up for dispatching */ if (func != pth_scheduler) { t->state = PTH_STATE_NEW; pth_acquire_lock(&(pth_CQ->q_lock)); pth_pqueue_insert(pth_CQ, t->prio, t); /*ibm*/ pth_release_lock(&(pth_CQ->q_lock)); } /*begin ibm*/ /* * Check to see if we're allowed to create additional native * threads and we've reached the threshold... */ if ( pth_max_native_threads > 1 && (pth_active_threads() > 1) && (((pth_active_threads()) % pth_threads_per_native) == 0) && current != NULL) { // (pth_primordial_thread()->tid == gettid()) && current != NULL) { /* * We are, now check to see if we've reached the max number of natives an' * we've reached the threshold... */ if (pth_number_of_natives < pth_max_native_threads && pth_number_of_natives < pth_active_threads()) { /* * We're not yet at the maximum number of natives so it's time * to create another native thread and start scheduling on it. */ pth_debug3("pth_spawn: Active natives (%i) less than allowed natives (%i), spawning new native.", pth_number_of_natives, pth_max_native_threads); pth_new_native(); pth_debug2("pth_spawn: New native thread created, number of active native thread is %i.", pth_number_of_natives); } } /*end ibm*/ pth_debug1("pth_spawn: leave"); /* the returned thread id is just the pointer to the thread control block... */ return t; } /* returns the current thread */ pth_t pth_self(void) { return pth_get_current(); } /*begin ibm*/ /* return specified thread's stacksize */ int pth_getstacksize(pth_t t, int *size) { /* First, a quick check for a null... */ if (size == NULL || t == NULL) return_errno(FALSE, EINVAL); /* Now make sure the thread exists... 
*/ if (pth_thread_exists(t) == FALSE) return_errno(-1, ESRCH); /* Update the size... */ *size = t->stacksize; return t->stacksize; } /* return the specified thread's context */ void * pth_getcontext(pth_t t) { #if PTH_MCTX_MTH(sjlj) &&\ !PTH_MCTX_DSP(sjljlx) &&\ !PTH_MCTX_DSP(sjljisc) &&\ !PTH_MCTX_DSP(sjljw32) /* First, a quick check for a null... */ if (t == NULL) return_errno(NULL, EINVAL); /* * Check to see if it is the current thread * and if so, update the context... */ if (t == pth_get_current()) pth_sigsetjmp(t->mctx.jb); /* Now make sure the thread exists... */ else if (pth_thread_exists(t) == FALSE) return_errno(NULL, ESRCH); return (void *)(t->mctx.jb); #else return_errno(NULL, ENOSYS); #endif } /* return the current location of errno */ int * pth_geterrno(void) { if (pth_get_current() == NULL) { pth_errno_storage = 0; return &pth_errno_storage; } return &(pth_get_current()->mctx.error); } /* bind the current thread to the last native thread it ran on */ int pth_bindtonative(pth_t t) { /* If we're running M:1 mode, just return... */ if (pth_max_native_threads == 1) return TRUE; /* Can't bind a null thread... */ if (t == NULL) return_errno(FALSE, EINVAL); t->boundnative = t->lastrannative; /* * So we don't lose a native for scheduling purposes, * we reduce the active number of native threads when * we bind a user level thread to a native. 
*/ pth_number_of_natives--; return TRUE; } /*end ibm*/ /* raise a signal for a thread */ int pth_raise(pth_t t, int sig) { struct sigaction sa; if (t == NULL || t == pth_get_current() || (sig < 0 || sig > PTH_NSIG)) return_errno(FALSE, EINVAL); if (sig == 0) /* just test whether thread exists */ return pth_thread_exists(t); else { /* raise signal for thread */ if (sigaction(sig, NULL, &sa) != 0) return FALSE; if (sa.sa_handler == SIG_IGN) return TRUE; /* fine, nothing to do, sig is globally ignored */ if (!sigismember(&t->sigpending, sig)) { sigaddset(&t->sigpending, sig); t->sigpendcnt++; } pth_yield(t); return TRUE; } } /* Get the current thread for the current native. */ intern pth_t pth_get_current(void) { pth_descr_t descr = NULL; if ((descr = pth_get_native_descr()) == NULL) return_errno_noerr(NULL, ESRCH); return descr->current; } /* Set the current thread for a the current native. */ intern pth_t pth_set_current(pth_t new_current) { int slot = 0; pid_t tid = gettid(); pth_acquire_lock(&pth_native_lock); while (pth_native_list[slot].is_used) { if (pth_native_list[slot].tid == tid) { pth_native_list[slot].current = new_current; pth_release_lock(&pth_native_lock); return new_current; } slot++; } pth_release_lock(&pth_native_lock); return NULL; } /* Get the current native thread info */ intern pth_descr_t pth_get_native_descr(void) { int slot = 0; pid_t tid = gettid(); pth_acquire_lock(&pth_native_lock); while (pth_native_list[slot].is_used) { if (pth_native_list[slot].tid == tid) { pth_release_lock(&pth_native_lock); pth_assert(pth_native_list[slot].sched_index == slot); return &pth_native_list[slot]; } slot++; } pth_release_lock(&pth_native_lock); return NULL; } /* Get the primordial thread descriptor */ intern pth_descr_t pth_primordial_thread(void) { if (pth_native_list[0].is_used == FALSE) return_errno(NULL, ESRCH); return &pth_native_list[0]; } /* check whether a thread exists */ intern int pth_thread_exists(pth_t t) { if (!pth_pqueue_contains(&pth_NQ, 
t)) if (!pth_pqueue_contains(&pth_RQ, t)) if (!pth_pqueue_contains(&pth_WQ, t)) if (!pth_pqueue_contains(&pth_SQ, t)) if (!pth_pqueue_contains(&pth_DQ, t)) return_errno(FALSE, ESRCH); /* not found */ return TRUE; } /* cleanup a particular thread */ intern void pth_thread_cleanup(pth_t thread) { /* run the cleanup handlers */ if (thread->cleanups != NULL) pth_cleanup_popall(thread, TRUE); /* release still acquired mutex variables */ pth_mutex_releaseall(thread); return; } /* terminates the current thread */ static int pth_exit_cb(void *arg) { int rc; /* NOTICE: THIS FUNCTION EXECUTES FROM WITHIN THE SCHEDULER THREAD! */ rc = 0; rc += pth_pqueue_elements(&pth_NQ); rc += pth_pqueue_elements(&pth_RQ); rc += pth_pqueue_elements(&pth_WQ); rc += pth_pqueue_elements(&pth_SQ); rc += pth_pqueue_elements(&pth_DQ); if (rc == 1 /* just our main thread */) return TRUE; else return FALSE; } intern void pth_exit_wrapper(int code) { char c = (int)1; pth_kill(); if (pth_native_list[0].tid != gettid()) pth_sc(write)(pth_native_list[0].sigpipe[1], &c, sizeof(char)); _exit(code); } void pth_exit(void *value) { pth_event_t ev; pth_t current = pth_get_current(); pth_t this_sched = pth_get_native_descr()->sched; pth_debug3("pth_exit: marking thread 0x%lx \"%s\" as dead", current, current->name); /* main thread is special: wait until it is the last thread */ if (current == pth_main) { char c = (int)1; /* if this is not first native, bound to it and wakeup select */ if (pth_native_list[0].tid != gettid()) { pth_main->boundnative = pth_native_list[0].tid; pth_sc(write)(pth_native_list[0].sigpipe[1], &c, sizeof(char)); } ev = pth_event(PTH_EVENT_FUNC, pth_exit_cb); pth_wait(ev); pth_event_free(ev, PTH_FREE_THIS); } /* execute cleanups */ pth_thread_cleanup(current); /* mark the current thread as dead, so the scheduler removes us */ current->join_arg = value; current->state = PTH_STATE_DEAD; if (current != pth_main) { /* * Now we explicitly switch into the scheduler and let it * reap the 
current thread structure; we can't free it here, * or we'd be running on a stack which malloc() regards as * free memory, which would be a somewhat perilous situation. */ pth_debug3("pth_exit: switching from thread 0x%lx \"%s\" to scheduler", current, current->name); pth_mctx_switch(¤t->mctx, &this_sched->mctx); abort(); /* not reached! */ } else { /* * main thread is special: exit the _process_ * [double-cast to avoid warnings because of size] */ pth_kill(); exit((int)((long)value)); abort(); /* not reached! */ } } /* waits for the termination of the specified thread */ int pth_join(pth_t tid, void **value) { pth_event_t ev; static pth_key_t ev_key = PTH_KEY_INIT; pth_debug3("pth_join: joining thread 0x%lx \"%s\"", tid, tid == NULL ? "-ANY-" : tid->name); if (tid == pth_get_current()) return_errno(FALSE, EDEADLK); if (tid != NULL && !tid->joinable) return_errno(FALSE, EINVAL); if (pth_ctrl(PTH_CTRL_GETTHREADS) == 1) return_errno(FALSE, EDEADLK); if (tid == NULL) tid = pth_pqueue_head(&pth_DQ); if (tid == NULL || (tid != NULL && tid->state != PTH_STATE_DEAD)) { ev = pth_event(PTH_EVENT_TID|PTH_UNTIL_TID_DEAD|PTH_MODE_STATIC, &ev_key, tid); pth_wait(ev); } if (tid == NULL) tid = pth_pqueue_head(&pth_DQ); if (tid == NULL || (tid != NULL && tid->state != PTH_STATE_DEAD)) return_errno(FALSE, EIO); if (value != NULL) *value = tid->join_arg; pth_acquire_lock(&(pth_DQ.q_lock)); pth_pqueue_delete(&pth_DQ, tid); pth_tcb_free(tid); pth_release_lock(&(pth_DQ.q_lock)); return TRUE; } /* delegates control back to scheduler for context switches */ int pth_yield(pth_t to) { pth_pqueue_t *q = NULL; pth_t current = pth_get_current(); pth_debug3("pth_yield: enter from thread 0x%lx \"%s\"", current, current->name); /* a given thread has to be new or ready or we ignore the request */ if (to != NULL) { switch (to->state) { case PTH_STATE_NEW: q = &pth_NQ; break; case PTH_STATE_READY: q = &pth_RQ; break; default: q = NULL; } if (q == NULL || !pth_pqueue_contains(q, to)) 
return_errno(FALSE, EINVAL); } /* give a favored thread maximum priority in his queue */ if (to != NULL && q != NULL) { pth_acquire_lock(&(q->q_lock)); pth_pqueue_favorite(q, to); pth_acquire_lock(&(q->q_lock)); } /* switch to scheduler */ if (to != NULL) pth_debug3("pth_yield: give up control to scheduler " "in favour of thread 0x%lx \"%s\"", to, to->name); else pth_debug2("pth_yield: 0x%lx give up control to scheduler", current); pth_mctx_switch(¤t->mctx, &pth_get_native_descr()->sched->mctx); pth_debug2("pth_yield: 0x%lx got back control from scheduler", current); pth_debug3("pth_yield: leave to thread 0x%lx \"%s\"", current, current->name); return TRUE; } /* suspend a thread until its again manually resumed */ int pth_suspend(pth_t t) { pth_pqueue_t *q; if (t == NULL) return_errno(FALSE, EINVAL); if (t == pth_get_current()) return_errno(FALSE, EPERM); if (pth_get_native_descr()->sched == t) return_errno(FALSE, EPERM); switch (t->state) { case PTH_STATE_NEW: q = &pth_NQ; break; case PTH_STATE_READY: q = &pth_RQ; break; case PTH_STATE_WAITING: q = &pth_WQ; break; default: q = NULL; } if (q == NULL) return_errno(FALSE, EPERM); if (!pth_pqueue_contains(q, t)) return_errno(FALSE, ESRCH); pth_acquire_lock(&(q->q_lock)); pth_acquire_lock(&(pth_SQ.q_lock)); pth_pqueue_delete(q, t); pth_pqueue_insert(&pth_SQ, PTH_PRIO_STD, t); pth_release_lock(&(pth_SQ.q_lock)); pth_release_lock(&(q->q_lock)); pth_debug3("pth_suspend: suspending thread 0x%lx %s\n", t, t->name); return TRUE; } /* resume a previously suspended thread */ int pth_resume(pth_t t) { pth_pqueue_t *q = NULL; if (t == NULL) return_errno(FALSE, EINVAL); if (t == pth_get_current()) return_errno(FALSE, EPERM); if (pth_get_native_descr()->sched == t) return_errno(FALSE, EPERM); if (!pth_pqueue_contains(&pth_SQ, t)) return_errno(FALSE, EPERM); pth_acquire_lock(&(pth_SQ.q_lock)); pth_pqueue_delete(&pth_SQ, t); switch (t->state) { case PTH_STATE_NEW: q = &pth_NQ; break; case PTH_STATE_READY: q = &pth_RQ; break; case 
PTH_STATE_WAITING: q = &pth_WQ; break; default: q = NULL; } pth_acquire_lock(&(q->q_lock)); pth_pqueue_insert(q, PTH_PRIO_STD, t); pth_release_lock(&(q->q_lock)); pth_release_lock(&(pth_SQ.q_lock)); pth_debug3("pth_resume: resuming thread 0x%lx %s\n", t, t->name); pth_yield(t); /*ibm*/ return TRUE; } /* switch a filedescriptor's I/O mode */ int pth_fdmode(int fd, int newmode) { int fdmode; int oldmode; /* retrieve old mode (usually cheap) */ if ((fdmode = fcntl(fd, F_GETFL, NULL)) == -1) oldmode = PTH_FDMODE_ERROR; else if (fdmode & O_NONBLOCKING) oldmode = PTH_FDMODE_NONBLOCK; else oldmode = PTH_FDMODE_BLOCK; /* set new mode (usually expensive) */ if (oldmode == PTH_FDMODE_BLOCK && newmode == PTH_FDMODE_NONBLOCK) fcntl(fd, F_SETFL, (fdmode | O_NONBLOCKING)); if (oldmode == PTH_FDMODE_NONBLOCK && newmode == PTH_FDMODE_BLOCK) fcntl(fd, F_SETFL, (fdmode & ~(O_NONBLOCKING))); /* return old mode */ return oldmode; } /* wait for specific amount of time */ int pth_nap(pth_time_t naptime) { pth_time_t until; pth_event_t ev; static pth_key_t ev_key = PTH_KEY_INIT; if (pth_time_cmp(&naptime, PTH_TIME_ZERO) == 0) return_errno(FALSE, EINVAL); pth_time_set(&until, PTH_TIME_NOW); pth_time_add(&until, &naptime); ev = pth_event(PTH_EVENT_TIME|PTH_MODE_STATIC, &ev_key, until); pth_wait(ev); return TRUE; } /* runs a constructor once */ int pth_once(pth_once_t *oncectrl, void (*constructor)(void *), void *arg) { if (oncectrl == NULL || constructor == NULL) return_errno(FALSE, EINVAL); if (*oncectrl != TRUE) constructor(arg); *oncectrl = TRUE; return TRUE; }