#include <9pm/u.h>
#include <9pm/libc.h>
#include <9pm/ns.h>
#include <9pm/thread.h>
#include <9pm/threadimpl.h>
/*
 * Queue of threads currently parked in a rendezvous;
 * _threadrendezvous() enqueues the caller here and looks
 * partners up by tag with _threadgetqbytag().
 */
Tqueue _threndez;
/*
 * Lock taken by _threadrendezvous() around every search of,
 * and insertion into, _threndez.
 */
Lock _threndezlock;
/*
 * _threadrendezvous: exchange `value' with another thread that
 * rendezvouses on the same `tag'.
 *
 * Partner already waiting (found on _threndez):
 *	the two values are swapped, the partner is marked Runnable and
 *	is either appended to its proc's runnable queue or, when that
 *	proc is flagged `blocked', handed to it through the system
 *	rendezvous() keyed on the proc's address.  The caller then
 *	yields and returns the partner's value.
 *
 * No partner yet:
 *	the caller records value and tag, marks itself Rendezvous and
 *	joins _threndez.  If its proc has another runnable thread it
 *	switches to it; otherwise the proc flags itself `blocked' and
 *	waits in rendezvous() until some other proc delivers a thread
 *	to run (possibly the caller itself).  When the caller resumes
 *	it returns this->value, which the partner overwrote.
 *
 * The function does not return if the thread is found to be
 * `exiting' at one of the checkpoints below; it calls
 * threadexits(nil) instead.
 */
ulong
_threadrendezvous(ulong tag, ulong value)
{
	Tproc *p;
	Thread *this, *that, *new;
	ulong v, t;

	p = _threadgetproc();
	this = p->curthread;
	assert(this != nil);

	/* _threndezlock protects the queue of waiting threads */
	lock(&_threndezlock);
	_threaddebug(DBGREND, "rendezvous tag %lux", tag);

	/* find a thread waiting in a rendezvous on tag */
	that = _threadgetqbytag(&_threndez, tag);

	/* if a thread in same or another proc waiting, rendezvous */
	if (that) {
		_threaddebug(DBGREND, "waiting proc is %lud.%d", that->proc->pid, that->id);
		threadassert(that->state == Rendezvous);

		/* exchange values */
		v = that->value;
		that->value = value;

		/* remove from rendez-vous queue */
		that->state = Runnable;
		threadassert(this != nil);
		threadassert(p->curthread == this);
		threadassert(_threadgetproc() == p);

		if (that->proc->blocked) {
			/*
			 * The partner's proc is sitting in rendezvous()
			 * with nothing to run; it cannot be our own proc,
			 * since we are running in it.
			 */
			threadassert(that->proc != p);
			that->proc->blocked = 0;
			_threaddebug(DBGREND, "unblocking rendezvous, tag = %lux", (ulong)that->proc);
			/* drop the lock before entering the system rendezvous */
			unlock(&_threndezlock);

			/*
			 * `Non-blocking' rendez-vous: pass `that' to its proc
			 * as the thread to run.  A result of ~0 is treated as
			 * an interruption and the call is retried unless this
			 * thread has been told to exit.
			 */
			while (rendezvous((ulong)that->proc, (ulong)that) == ~0) {
				_threaddebug(DBGNOTE|DBGTHRD, "interrupted");
				if (this->exiting) {
					_threaddebug(DBGNOTE|DBGTHRD, "and committing suicide");
					threadexits(nil);
				}
			}
		} else {
			/* partner's proc is running: just queue the partner */
			_threaddebug(DBGREND, "enqueueing");
			_threadputq(&that->proc->runnable, that);
			unlock(&_threndezlock);
		}

		threadassert(this != nil);
		threadassert(p->curthread == this);
		threadassert(_threadgetproc() == p);
		yield();
		return v;
	}

	_threaddebug(DBGREND, "blocking");

	/* Mark this thread waiting */
	this->value = value;
	this->state = Rendezvous;
	this->tag = tag;
	_threadputq(&_threndez, this);

	/* Look for runnable threads */
	new = _threadgetq(&p->runnable);
	if (new == nil) {
		/* No other thread runnable, rendezvous */
		p->blocked = 1;
		_threaddebug(DBGREND, "blocking rendezvous, tag = %lux", (ulong)p);
		unlock(&_threndezlock);

		/*
		 * Wait for another proc to hand over the next thread to
		 * run; ~0 is treated as an interruption and retried.
		 */
		while ((new = (Thread *)rendezvous((ulong)p, 0)) == (Thread *)~0) {
			_threaddebug(DBGNOTE|DBGTHRD, "interrupted");
			if (this->exiting) {
				_threaddebug(DBGNOTE|DBGTHRD, "and committing suicide");
				threadexits(nil);
			}
		}
		/* the waker clears `blocked' before calling rendezvous() */
		threadassert(!p->blocked);
		threadassert(new->proc == p);

		if (new == this) {
			/* we were the thread woken: no context switch needed */
			this->state = Running;
			if (this->exiting)
				threadexits(nil);
			return this->value;
		}
		/* cast the pointer to match %lux, as the other debug calls do */
		_threaddebug(DBGREND, "after rendezvous, new thread is %lux, id %d", (ulong)new, new->id);
	} else {
		unlock(&_threndezlock);
	}

	threadassert(new != nil);
	threadassert(new->proc != nil);
	_threaddebug(DBGREND|DBGTHRD, "Scheduling %lud.%d", new->proc->pid, new->id);

	/* switch to the chosen thread; we resume here when rescheduled */
	p->curthread = new;
	new->state = Running;
	t = _threadswtch(this->env, new->env, ~0);
	/*
	 * Any value other than ~0 coming back from the switch is a
	 * Thread pointer to be freed (presumably a thread that exited
	 * and could not free itself -- confirm against _threadswtch).
	 */
	if (t != ~0)
		_freethread((Thread *)t);

	_threaddebug(DBGREND, "unblocking");
	threadassert(p->curthread->next == (Thread *)~0);
	this->state = Running;
	if (p->curthread->exiting) {
		_threaddebug(DBGKILL, "Exiting after rendezvous");
		threadexits(nil);
	}
	/* our partner stored its value here during the exchange */
	return this->value;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */