#include <9pm/u.h>
#include <9pm/libc.h>
#include <9pm/thread.h>
#include "assert.h"
static void initproc(void (*)(ulong, int, char*[]), int, char*[], uint);
static void garbagethread(Thread *t);
static Thread* getqbytag(Tqueue *, ulong);
static void putq(Tqueue *, Thread *);
static Thread* getq(Tqueue *);
static void launcher(ulong, void (*)(void *), void *);
static void mainlauncher(ulong, int argc, char *argv[]);
static void garbageproc(Proc *);
static Proc* prepproc(Newproc *);
static long _times(long *t);
static int notefd;
static int passerpid;
static long totalmalloc;
static Thread* dontkill;
static Tqueue rendez;
static Lock rendezlock;
int mainstacksize;
Channel* thewaitchan;
struct Pqueue pq;
Proc **procp; /* Pointer to pointer to proc's Proc structure */
void
threadsysfatal(char *fmt, va_list arg)
{
char buf[1024];
doprint(buf, buf+sizeof(buf), fmt, arg);
if(argv0)
threadprint(2, "%s: %s\n", argv0, buf);
else
threadprint(2, "%s\n", buf);
threadexitsall(buf);
}
static int
nextID(void)
{
static Lock l;
static id;
int i;
lock(&l);
i = ++id;
unlock(&l);
return i;
}
void
threadexits(char *exitstr)
{
Thread *t, *new;
Proc *p;
Channel *c;
_threaddebug(DBGTHRD|DBGKILL, "Exitthread()");
p = *procp;
t = p->curthread;
if (p->nthreads > 1) {
t->exiting = 1;
p->nthreads--;
lock(&rendezlock);
while ((new = getq(&p->runnable)) == nil) {
_threaddebug(DBGTHRD, "Nothing left to run");
/* called with rendezlock held */
p->blocked = 1;
unlock(&rendezlock);
if ((new = (Thread *)rendezvous((ulong)p, 0)) != (Thread *)~0) {
threadassert(!p->blocked);
threadassert(new->proc == p);
p->curthread = new;
new->state = Running;
/* switch to new thread, pass it `t' for cleanup */
new->garbage = t;
longjmp(new->env, (int)t);
/* No return */
}
_threaddebug(DBGNOTE|DBGTHRD, "interrupted");
lock(&rendezlock);
}
unlock(&rendezlock);
_threaddebug(DBGTHRD, "Yield atexit to %d.%d", p->pid, new->id);
p->curthread = new;
new->state = Running;
if (new->exiting)
threadexits(nil);
/* switch to new thread, pass it `t' for cleanup */
new->garbage = t;
longjmp(new->env, (int)t);
/* no return */
}
/*
* thewaitchan confounds exiting the entire program, so handle it
* carefully; store exposed global in local variable c.
*/
if ((c = thewaitchan) != nil) {
Waitmsg *w;
long t[4];
/* create a Waitmsg out of whole cloth; this proc was created RFNOWAIT */
w = _threadmalloc(sizeof(Waitmsg)+(exitstr==nil? 0 : strlen(exitstr)+1), 0);
w->pid = p->pid;
w->time[2] = _times(t); /* real */
w->time[0] = t[0]+t[2]; /* user + child user */
w->time[1] = t[1]+t[3]; /* sys + child sys */
w->msg = "";
if(exitstr != nil){
w->msg = (char*)(&w[1]);
strcpy(w->msg, exitstr);
}
_threaddebug(DBGCHLD, "In thread %s: sending exit status %s for %d\n",
p->curthread->cmdname, exitstr, p->pid);
sendp(c, w);
}
_threaddebug(DBGPROC, "Exiting\n");
t->exiting = 1;
if(exitstr == nil)
p->str[0] = '\0';
else
strncpy(p->str, exitstr, sizeof(p->str));
p->nthreads--;
/* Clean up and exit */
longjmp(p->oldenv, DOEXIT);
}
static Thread *
threadof(int id)
{
Tproc *pp;
Thread *t;
lock(&pq.lock);
for (pp = pq.head; pp->next; pp = pp->next)
for (t = pp->threads.head; t; t = t->nextt)
if (t->id == id) {
unlock(&pq.lock);
return t;
}
unlock(&pq.lock);
return nil;
}
int
threadid(void)
{
Tproc *p;
p = getproc()->threadpriv;
return p->curthread->id;
}
static char*
skip(char *p)
{
while(*p == ' ')
p++;
while(*p != ' ' && *p != 0)
p++;
return p;
}
int
threadpid(int id)
{
Tproc *pp;
Thread *t;
if (id < 0)
return id;
if (id == 0){
pp = getproc()->threadpriv;
return pp->pid;
}
lock(&pq.lock);
for (pp = pq.head; pp->next; pp = pp->next)
for (t = pp->threads.head; t; t = t->nextt)
if (t->id == id) {
unlock(&pq.lock);
return pp->pid;
}
unlock(&pq.lock);
return -1;
}
void
yield(void)
{
Thread *new, *t;
Proc *p;
ulong thr;
p = *procp;
t = p->curthread;
if (t->exiting) {
_threaddebug(DBGTHRD|DBGKILL, "Exiting in yield()");
threadexits(nil); /* no return */
}
if ((new = getq(&p->runnable)) == nil) {
_threaddebug(DBGTHRD, "Nothing to yield for");
return; /* Nothing to yield for */
}
if ((thr = setjmp(p->curthread->env))) {
if (thr != ~0)
garbagethread((Thread *)thr);
if ((*procp)->curthread->exiting)
threadexits(nil);
return; /* Return from yielding */
}
putq(&p->runnable, p->curthread);
p->curthread->state = Runnable;
_threaddebug(DBGTHRD, "Yielding to %d.%d", p->pid, new->id);
p->curthread = new;
new->state = Running;
longjmp(new->env, ~0);
/* no return */
}
ulong
_threadrendezvous(ulong tag, ulong value)
{
Proc *p;
Thread *this, *that, *new;
ulong v, t;
p = *procp;
this = p->curthread;
lock(&rendezlock);
_threaddebug(DBGREND, "rendezvous tag %lux", tag);
/* find a thread waiting in a rendezvous on tag */
that = getqbytag(&rendez, tag);
/* if a thread in same or another proc waiting, rendezvous */
if (that) {
_threaddebug(DBGREND, "waiting proc is %lud.%d", that->proc->pid, that->id);
threadassert(that->state == Rendezvous);
/* exchange values */
v = that->value;
that->value = value;
/* remove from rendez-vous queue */
that->state = Runnable;
if (that->proc->blocked) {
threadassert(that->proc != p);
that->proc->blocked = 0;
_threaddebug(DBGREND, "unblocking rendezvous, tag = %lux", (ulong)that->proc);
unlock(&rendezlock);
/* `Non-blocking' rendez-vous */
while (rendezvous((ulong)that->proc, (ulong)that) == ~0) {
_threaddebug(DBGNOTE|DBGTHRD, "interrupted");
if (this->exiting) {
_threaddebug(DBGNOTE|DBGTHRD, "and committing suicide");
threadexits(nil);
}
}
} else {
putq(&that->proc->runnable, that);
unlock(&rendezlock);
}
yield();
return v;
}
_threaddebug(DBGREND, "blocking");
/* Mark this thread waiting */
this->value = value;
this->state = Rendezvous;
this->tag = tag;
putq(&rendez, this);
/* Look for runnable threads */
new = getq(&p->runnable);
if (new == nil) {
/* No other thread runnable, rendezvous */
p->blocked = 1;
_threaddebug(DBGREND, "blocking rendezvous, tag = %lux", (ulong)p);
unlock(&rendezlock);
while ((new = (Thread *)rendezvous((ulong)p, 0)) == (Thread *)~0) {
_threaddebug(DBGNOTE|DBGTHRD, "interrupted");
if (this->exiting) {
_threaddebug(DBGNOTE|DBGTHRD, "and committing suicide");
threadexits(nil);
}
}
threadassert(!p->blocked);
threadassert(new->proc == p);
if (new == this) {
this->state = Running;
if (this->exiting)
threadexits(nil);
return this->value;
}
_threaddebug(DBGREND, "after rendezvous, new thread is %lux, id %d", new, new->id);
} else {
unlock(&rendezlock);
}
if ((t = setjmp(p->curthread->env))) {
if (t != ~0)
garbagethread((Thread *)t);
_threaddebug(DBGREND, "unblocking");
threadassert(p->curthread->next == (Thread *)~0);
this->state = Running;
if (p->curthread->exiting) {
_threaddebug(DBGKILL, "Exiting after rendezvous");
threadexits(nil);
}
return this->value;
}
_threaddebug(DBGREND|DBGTHRD, "Scheduling %lud.%d", new->proc->pid, new->id);
p->curthread = new;
new->state = Running;
longjmp(new->env, ~0);
/* no return */
return ~0; /* Not called */
}
void
threadsetname(char *name)
{
Thread *t = (*procp)->curthread;
if (t->cmdname)
free(t->cmdname);
t->cmdname = strdup(name);
}
char *threadgetname(void)
{
return (*procp)->curthread->cmdname;
}
ulong*
procdata(void) {
return &(*procp)->udata;
}
ulong*
threaddata(void)
{
return &(*procp)->curthread->udata;
}
int
procrfork(void (*f)(void *), void *arg, uint stacksize, int rforkflag)
{
Newproc *np;
Proc *p;
int id;
ulong *tos;
p = *procp;
/* Save old stack position */
if ((id = setjmp(p->curthread->env))) {
_threaddebug(DBGPROC, "newproc, return\n");
return id; /* Return with pid of new proc */
}
np = _threadmalloc(sizeof(Newproc), 0);
threadassert(np != nil);
_threaddebug(DBGPROC, "newproc, creating stack\n");
/* Create a stack */
np->stack = _threadmalloc(stacksize, 0);
threadassert(np->stack != nil);
memset(np->stack, 0xFE, stacksize);
tos = (ulong *)(&np->stack[stacksize&(~7)]);
FIX1;
*--tos = (ulong)arg;
*--tos = (ulong)f;
FIX2;
np->stacksize = stacksize;
np->stackptr = tos;
np->grp = p->curthread->grp;
np->launcher = (long)launcher;
np->rforkflag = rforkflag;
_threaddebug(DBGPROC, "newproc, switch stacks\n");
/* Goto unshared stack and fire up new process */
p->arg = np;
longjmp(p->oldenv, DOPROC);
/* no return */
return -1;
}
int
proccreate(void (*f)(void*), void *arg, uint stacksize)
{
return procrfork(f, arg, stacksize, 0);
}
int
threadcreate(void (*f)(void *arg), void *arg, uint stacksize)
{
Thread *child;
ulong *tos;
Proc *p;
p = *procp;
if (stacksize < 32) {
werrstr("%s", "stacksize");
return -1;
}
if ((child = _threadmalloc(sizeof(Thread), 0)) == nil ||
(child->stk = _threadmalloc(child->stksize = stacksize, 0)) == nil) {
if (child) free(child);
werrstr("%s", "_threadmalloc");
return -1;
}
memset(child->stk, 0xFE, stacksize);
p->nthreads++;
child->cmdname = nil;
child->id = nextID();
child->proc = p;
tos = (ulong *)(&child->stk[stacksize&~7]);
FIX1;
*--tos = (ulong)arg;
*--tos = (ulong)f;
FIX2; /* Insert a dummy argument on 386 */
child->env[JMPBUFPC] = ((ulong)launcher+JMPBUFDPC);
/* -STACKOFF leaves room for old pc and new pc in frame */
child->env[JMPBUFSP] = (ulong)(tos - STACKOFF);
child->state = Runnable;
child->exiting = 0;
child->nextt = nil;
if (p->threads.head == nil) {
p->threads.head = p->threads.tail = child;
} else {
p->threads.tail->nextt = child;
p->threads.tail = child;
}
child->next = (Thread *)~0;
child->garbage = nil;
putq(&p->runnable, child);
return child->id;
}
void
procexecl(Channel *pidc, char *f, ...)
{
procexec(pidc, f, &f+1);
}
void
procexec(Channel *pidc, char *f, char *args[])
{
Proc *p;
Dir *d;
int n, pid;
Execproc *ep;
Channel *c;
char *q;
int s, l;
/* make sure exec is likely to succeed before tossing thread state */
d = dirstat(f);
if(d == nil || (d->mode & DMDIR) || access(f, AEXEC) < 0) {
free(d);
bad: if (pidc) sendul(pidc, ~0);
return;
}
free(d);
p = *procp;
if(p->threads.head != p->curthread || p->threads.head->nextt != nil)
goto bad;
n = 0;
while (args[n++])
;
ep = (Execproc*)procp;
q = ep->data;
s = sizeof(ep->data);
if ((l = n*sizeof(char *)) < s) {
_threaddebug(DBGPROC, "args on stack, %d pointers\n", n);
ep->arg = (char **)q;
q += l;
s -= l;
} else
ep->arg = _threadmalloc(l, 0); /* will be leaked */
if ((l = strlen(f) + 1) < s) {
ep->file = q;
strcpy(q, f);
_threaddebug(DBGPROC, "file on stack, %s\n", q);
q += l;
s -= l;
} else
ep->file = strdup(f); /* will be leaked */
ep->arg[--n] = nil;
while (--n >= 0)
if ((l = strlen(args[n]) + 1) < s) {
ep->arg[n] = q;
strcpy(q, args[n]);
_threaddebug(DBGPROC, "arg on stack, %s\n", q);
q += l;
s -= l;
} else
ep->arg[n] = strdup(args[n]); /* will be leaked */
/* committed to exec-ing */
if ((pid = setjmp(p->curthread->env))) {
int i;
Waitmsg *w;
rfork(RFFDG);
close(0);
close(1);
for (i = 3; i < 100; i++)
close(i);
if(pidc != nil)
send(pidc, &pid);
/* wait for exec-ed child */
_threaddebug(DBGCHLD, "Proc %d waiting for exec-ed child %d\n", getpid(), pid);
for(;;){
w = wait();
threadassert(w != nil);
_threaddebug(DBGCHLD, "Child %d exited\n", w->pid);
if(w->pid == pid)
break;
free(w);
}
if ((c = thewaitchan) != nil) { /* global is exposed */
_threaddebug(DBGCHLD, "Sending exit status for exec: real pid %d, fake pid %d, status %s\n", pid, getpid(), w->msg);
sendp(c, w);
}else
free(w);
_threaddebug(DBGPROC, "Exiting (exec)\n");
threadexits("libthread procexec");
}
p->arg = ep;
longjmp(p->oldenv, DOEXEC);
/* No return; */
}
Channel *
threadwaitchan(void)
{
static void *arg[1];
thewaitchan = chancreate(sizeof(Waitmsg*), 0);
return thewaitchan;
}
int
threadgetgrp(void)
{
return (*procp)->curthread->grp;
}
int
threadsetgrp(int ng)
{
int og;
og = (*procp)->curthread->grp;
(*procp)->curthread->grp = ng;
return og;
}
void *
_threadmalloc(long size, int z)
{
void *m;
m = malloc(size);
if (m == nil)
sysfatal("Malloc of size %ld failed: %r\n", size);
setmalloctag(m, getcallerpc(&size));
totalmalloc += size;
if (size > 1000000) {
fprint(2, "Malloc of size %ld, total %ld\n", size, totalmalloc);
abort();
}
if (z)
memset(m, 0, size);
return m;
}
void
threadnonotes(void)
{
char buf[30];
int fd;
sprint(buf, "#p/%d/note", passerpid);
if((fd = open(buf, OWRITE)) >= 0)
write(fd, "kilpasser", 9);
close(fd);
}
static void
runproc(Proc *p)
{
int action, pid, rforkflag;
Proc *pp;
long r;
int id;
char str[32];
r = ~0;
runp:
/* Leave a proc manager */
while ((action = setjmp(p->oldenv))) {
Newproc *np;
Execproc *ne;
p = *procp;
switch(action) {
case DOEXEC:
ne = (Execproc *)p->arg;
if ((pid = rfork(RFPROC|RFMEM|RFREND|RFNOTEG|RFFDG)) < 0) {
exits("doexecproc: fork: %r");
}
if (pid == 0) {
exec(ne->file, ne->arg);
if (_threaddebuglevel & DBGPROC) {
char **a;
fprint(2, "%s: ", ne->file);
a = ne->arg;
while(*a)
fprint(2, "%s, ", *a++);
fprint(2, "0\n");
}
exits("Can't exec");
/* No return */
}
longjmp(p->curthread->env, pid);
/* No return */
case DOEXIT:
_threaddebug(DBGPROC, "at doexit\n");
lock(&pq.lock);
if (pq.head == p) {
pq.head = p->next;
if (pq.tail == p) {
pq.tail = nil;
postnote(PNPROC, passerpid, "kilpasser");
}
} else {
for (pp = pq.head; pp->next; pp = pp->next) {
if(pp->next == p) {
pp->next = p->next;
if (pq.tail == p)
pq.tail = pp;
break;
}
}
}
unlock(&pq.lock);
strncpy(str, p->str, sizeof(str));
garbageproc(p);
exits(str);
case DOPROC:
_threaddebug(DBGPROC, "at doproc\n");
np = (Newproc *)p->arg;
pp = prepproc(np);
id = pp->curthread->id; // get id before fork() to avoid race
rforkflag = np->rforkflag;
free(np);
if ((pid = rfork(rforkflag|RFPROC|RFMEM|RFNOWAIT)) < 0) {
exits("donewproc: fork: %r");
}
if (pid == 0) {
/* Child is the new proc; touch old proc struct no more */
p = pp;
p->pid = getpid();
goto runp;
/* No return */
}
/* Parent, return to caller */
r = id; // better than returning pid
_threaddebug(DBGPROC, "newproc, unswitch stacks\n");
break;
default:
/* `Can't happen' */
threadprint(2, "runproc, can't happen: %d\n", action);
threadassert(0);
}
}
if (_threaddebuglevel & DBGPROC)
threadprint(2, "runproc, longjmp\n");
/* Jump into proc */
*procp = p;
longjmp(p->curthread->env, r);
/* No return */
}
static void
initproc(void (*f)(ulong, int argc, char *argv[]), int argc, char *argv[], uint stacksize)
{
Proc *p;
Newproc *np;
ulong *tos;
ulong *av;
int i;
Execproc ex;
procp = (Proc **)&ex; /* address of the execproc struct */
if (_threaddebuglevel & DBGPROC)
threadprint(2, "Initproc, f = 0x%p, argc = %d, argv = 0x%p\n",
f, argc, argv);
/* Create a stack and fill it */
np = _threadmalloc(sizeof(Newproc), 0);
threadassert(np != nil);
np->stack = _threadmalloc(stacksize, 0);
threadassert(np->stack != nil);
memset(np->stack, 0xFE, stacksize);
tos = (ulong *)(&np->stack[stacksize&(~7)]);
np->stacksize = stacksize;
np->rforkflag = 0;
np->grp = 0;
for (i = 0; i < argc; i++){
char *nargv;
nargv = (char *)tos - (strlen(argv[i]) + 1);
strcpy(nargv, argv[i]);
argv[i] = nargv;
tos = (ulong *)nargv;
}
/* round down to address of char* */
tos = (ulong *)((ulong)tos & ~0x3);
tos -= argc + 1;
/* round down to address of vlong (for the alpha): */
tos = (ulong *)((ulong)tos & ~0x7);
av = tos;
memmove(av, argv, (argc+1)*sizeof(char *));
FIX1;
*--tos = (ulong)av;
*--tos = (ulong)argc;
FIX2;
np->stackptr = tos;
np->launcher = (long)f;
p = prepproc(np);
p->pid = getpid();
free(np);
if (_threaddebuglevel & DBGPROC)
threadprint(2, "calling runproc\n");
runproc(p);
/* no return; */
}
static void
garbageproc(Proc *p)
{
Thread *t, *nextt;
for (t = p->threads.head; t; t = nextt) {
if (t->cmdname)
free(t->cmdname);
threadassert(t->stk != nil);
free(t->stk);
nextt = t->nextt;
free(t);
}
free(p);
}
static void
garbagethread(Thread *t)
{
Proc *p;
Thread *r, *pr;
p = t->proc;
threadassert(*procp == p);
pr = nil;
for (r = p->threads.head; r; r = r->nextt) {
if (r == t)
break;
pr = r;
}
threadassert (r != nil);
if (pr)
pr->nextt = r->nextt;
else
p->threads.head = r->nextt;
if (p->threads.tail == r)
p->threads.tail = pr;
if (t->cmdname)
free(t->cmdname);
threadassert(t->stk != nil);
free(t->stk);
free(t);
}
static void
putq(Tqueue *q, Thread *t)
{
lock(&q->lock);
_threaddebug(DBGQUE, "Putq 0x%lux on 0x%lux, next == 0x%lux",
(ulong)t, (ulong)q, (ulong)t->next);
threadassert((ulong)(t->next) == (ulong)~0);
t->next = nil;
if (q->head == nil) {
q->head = t;
q->tail = t;
} else {
threadassert(q->tail->next == nil);
q->tail->next = t;
q->tail = t;
}
unlock(&q->lock);
}
static Thread *
getq(Tqueue *q)
{
Thread *t;
lock(&q->lock);
if ((t = q->head)) {
q->head = q->head->next;
t->next = (Thread *)~0;
}
unlock(&q->lock);
_threaddebug(DBGQUE, "Getq 0x%lux from 0x%lux", (ulong)t, (ulong)q);
return t;
}
static Thread *
getqbytag(Tqueue *q, ulong tag)
{
Thread *r, *pr, *w, *pw;
w = pr = pw = nil;
_threaddebug(DBGQUE, "Getqbytag 0x%lux", q);
lock(&q->lock);
for (r = q->head; r; r = r->next) {
if (r->tag == tag) {
w = r;
pw = pr;
if (r->proc == *procp) {
/* Locals or blocked remotes are best */
break;
}
}
pr = r;
}
if (w) {
if (pw)
pw->next = w->next;
else
q->head = w->next;
if (q->tail == w){
q->tail = pw;
threadassert(pw == nil || pw->next == nil);
}
w->next = (Thread *)~0;
}
unlock(&q->lock);
_threaddebug(DBGQUE, "Getqbytag 0x%lux from 0x%lux", w, q);
return w;
}
static void
launcher(ulong, void (*f)(void *arg), void *arg)
{
Proc *p;
Thread *t;
p = *procp;
t = p->curthread;
if (t->garbage) {
garbagethread(t->garbage);
t->garbage = nil;
}
(*f)(arg);
threadexits(nil);
/* no return */
}
static void
mainlauncher(ulong, int argc, char *argv[])
{
/* ulong *p; */
/* p = (ulong *)&argc; */
/* fprint(2, "p[-2..2]: %lux %lux %lux %lux %lux\n", */
/* p[-2], p[-1], p[0], p[1], p[2]); */
threadmain(argc, argv);
threadexits(nil);
/* no return */
}
static Proc *
prepproc(Newproc *np)
{
Proc *p;
Thread *t;
/* Create proc and thread structs */
p = _threadmalloc(sizeof(Proc), 1);
t = _threadmalloc(sizeof(Thread), 1);
if (p == nil || t == nil) {
char err[32] = "";
errstr(err, 32);
write(2, err, strlen(err));
write(2, "\n", 1);
exits("procinit: _threadmalloc");
}
t->cmdname = strdup("threadmain");
t->id = nextID();
t->grp = np->grp; /* Inherit grp id */
t->proc = p;
t->state = Running;
t->nextt = nil;
t->next = (Thread *)~0;
t->garbage = nil;
t->stk = np->stack;
t->stksize = np->stacksize;
t->env[JMPBUFPC] = (np->launcher+JMPBUFDPC);
/* -STACKOFF leaves room for old pc and new pc in frame */
t->env[JMPBUFSP] = (ulong)(np->stackptr - STACKOFF);
/*fprint(2, "SP = %lux\n", t->env[JMPBUFSP]); */
p->curthread = t;
p->threads.head = p->threads.tail = t;
p->nthreads = 1;
p->pid = 0;
p->next = nil;
lock(&pq.lock);
if (pq.head == nil) {
pq.head = pq.tail = p;
} else {
threadassert(pq.tail->next == nil);
pq.tail->next = p;
pq.tail = p;
}
unlock(&pq.lock);
return p;
}
int
threadprint(int fd, char *fmt, ...)
{
int n;
va_list arg;
char *buf;
buf = _threadmalloc(2048, 0);
if(buf == nil)
return -1;
va_start(arg, fmt);
doprint(buf, buf+2048, fmt, arg);
va_end(arg);
n = write(fd, buf, strlen(buf));
free(buf);
return n;
}
syntax highlighted by Code2HTML, v. 0.9.1