#include <9pm/u.h>
#include <9pm/libc.h>
#include <9pm/ns.h>
#include <9pm/thread.h>
#include <9pm/threadimpl.h>

/* Queue of threads parked in _threadrendezvous, waiting for a partner on their tag. */
Tqueue _threndez;
/* Lock taken by _threadrendezvous while it examines or modifies _threndez. */
Lock _threndezlock;

/*
 * Thread-level rendezvous: exchange `value' with another thread that
 * calls this function with the same `tag'.
 *
 * If a thread is already queued on _threndez under `tag', its value is
 * taken, ours is stored in it, it is made Runnable (waking its proc via
 * the lower-level rendezvous() if that proc is marked blocked), and the
 * caller yields before returning the partner's value.
 *
 * Otherwise the caller records its value and tag, queues itself on
 * _threndez, and gives up the processor: either by switching to another
 * runnable thread of the same proc, or, when there is none, by marking
 * the proc blocked and waiting in rendezvous() on the proc's own address
 * until another proc hands over a thread to run.
 *
 * Returns the value supplied by the partner thread.  Wherever the
 * `exiting' flag is found set, threadexits(nil) is called instead
 * (presumably without returning -- confirm against threadexits).
 */
ulong
_threadrendezvous(ulong tag, ulong value)
{
	Tproc *p;
	Thread *this, *that, *new;
	ulong v, t;

	p = _threadgetproc();
	this = p->curthread;
	assert(this != nil);

	lock(&_threndezlock);
	_threaddebug(DBGREND, "rendezvous tag %lux", tag);

	/* find a thread waiting in a rendezvous on tag */
	that = _threadgetqbytag(&_threndez, tag);

	/* if a thread in same or another proc waiting, rendezvous */
	if (that) {
		_threaddebug(DBGREND, "waiting proc is %lud.%d", that->proc->pid, that->id);
		threadassert(that->state == Rendezvous);

		/* exchange values: keep the partner's value to return, leave ours for it */
		v = that->value;
		that->value = value;

		/* partner has left the rendez-vous queue; mark it ready to run */
		that->state = Runnable;

		/* sanity: our own identity must not have changed while we held the lock */
		threadassert(this != nil);
		threadassert(p->curthread == this);
		threadassert(_threadgetproc() == p);

		if (that->proc->blocked) {
			/*
			 * The partner's proc marked itself blocked (see the
			 * "No other thread runnable" path below), so it is
			 * waiting in rendezvous() on its own Tproc address.
			 * It cannot be our proc, since we are running.
			 */
			threadassert(that->proc != p);
			that->proc->blocked = 0;
			_threaddebug(DBGREND, "unblocking rendezvous, tag = %lux", (ulong)that->proc);
			unlock(&_threndezlock);

			/*
			 * `Non-blocking' rendez-vous: pass `that' to the
			 * blocked proc as the thread it should run next.
			 * A ~0 result is treated as an interruption and the
			 * call is retried, unless this thread is exiting.
			 */
			while (rendezvous((ulong)that->proc, (ulong)that) == ~0) {
				_threaddebug(DBGNOTE|DBGTHRD, "interrupted");
				if (this->exiting) {
					_threaddebug(DBGNOTE|DBGTHRD, "and committing suicide");
					threadexits(nil);
				}
			}
		} else {
			/* partner's proc is not blocked: put the partner on that proc's run queue */
			_threaddebug(DBGREND, "enqueueing");
			_threadputq(&that->proc->runnable, that);
			unlock(&_threndezlock);
		}

		threadassert(this != nil);
		threadassert(p->curthread == this);
		threadassert(_threadgetproc() == p);

		/* give other threads a chance to run before returning the partner's value */
		yield();
		return v;
	}

	_threaddebug(DBGREND, "blocking");

	/* Mark this thread waiting: publish value and tag, then join the rendez-vous queue */
	this->value = value;
	this->state = Rendezvous;
	this->tag = tag;
	_threadputq(&_threndez, this);

	/* Look for runnable threads in this proc to run while we wait */
	new = _threadgetq(&p->runnable);
	if (new == nil) {
		/* No other thread runnable, rendezvous */
		p->blocked = 1;
		_threaddebug(DBGREND, "blocking rendezvous, tag = %lux", (ulong)p);
		unlock(&_threndezlock);

		/*
		 * Wait on our own Tproc address.  The waking side passes a
		 * Thread pointer as its value (see the `that->proc->blocked'
		 * branch above), which becomes `new'.  A ~0 result is treated
		 * as an interruption and the wait is retried, unless this
		 * thread is exiting.
		 */
		while ((new = (Thread *)rendezvous((ulong)p, 0)) ==
		    (Thread *)~0) {
			_threaddebug(DBGNOTE|DBGTHRD, "interrupted");
			if (this->exiting) {
				_threaddebug(DBGNOTE|DBGTHRD, "and committing suicide");
				threadexits(nil);
			}
		}

		/* the waker clears `blocked' before handing us a thread of this proc */
		threadassert(!p->blocked);
		threadassert(new->proc == p);

		if (new == this) {
			/* we are the thread that was woken: no context switch needed */
			this->state = Running;
			if (this->exiting)
				threadexits(nil);
			/* this->value now holds what the partner stored there */
			return this->value;
		}
		_threaddebug(DBGREND, "after rendezvous, new thread is %lux, id %d", new, new->id);
	} else {
		unlock(&_threndezlock);
	}

	threadassert(new != nil);
	threadassert(new->proc != nil);
	_threaddebug(DBGREND|DBGTHRD, "Scheduling %lud.%d", new->proc->pid, new->id);

	/* hand the processor to `new'; we resume here when switched back to */
	p->curthread = new;
	new->state = Running;
	t = _threadswtch(this->env, new->env, ~0);

	/*
	 * Any result other than ~0 is treated as a Thread pointer to free.
	 * NOTE(review): presumably a thread that exited and switched to us;
	 * confirm against _threadswtch callers.
	 */
	if (t != ~0)
		_freethread((Thread *)t);

	_threaddebug(DBGREND, "unblocking");

	/* NOTE(review): ~0 in `next' looks like a "not on any queue" sentinel -- confirm */
	threadassert(p->curthread->next == (Thread *)~0);
	this->state = Running;

	if (p->curthread->exiting) {
		_threaddebug(DBGKILL, "Exiting after rendezvous");
		threadexits(nil);
	}

	/* this->value was overwritten by the partner during the exchange */
	return this->value;
}