#include <9pm/u.h> #include <9pm/libc.h> #include <9pm/thread.h> #include "assert.h" static void initproc(void (*)(ulong, int, char*[]), int, char*[], uint); static void garbagethread(Thread *t); static Thread* getqbytag(Tqueue *, ulong); static void putq(Tqueue *, Thread *); static Thread* getq(Tqueue *); static void launcher(ulong, void (*)(void *), void *); static void mainlauncher(ulong, int argc, char *argv[]); static void garbageproc(Proc *); static Proc* prepproc(Newproc *); static long _times(long *t); static int notefd; static int passerpid; static long totalmalloc; static Thread* dontkill; static Tqueue rendez; static Lock rendezlock; int mainstacksize; Channel* thewaitchan; struct Pqueue pq; Proc **procp; /* Pointer to pointer to proc's Proc structure */ void threadsysfatal(char *fmt, va_list arg) { char buf[1024]; doprint(buf, buf+sizeof(buf), fmt, arg); if(argv0) threadprint(2, "%s: %s\n", argv0, buf); else threadprint(2, "%s\n", buf); threadexitsall(buf); } static int nextID(void) { static Lock l; static id; int i; lock(&l); i = ++id; unlock(&l); return i; } void threadexits(char *exitstr) { Thread *t, *new; Proc *p; Channel *c; _threaddebug(DBGTHRD|DBGKILL, "Exitthread()"); p = *procp; t = p->curthread; if (p->nthreads > 1) { t->exiting = 1; p->nthreads--; lock(&rendezlock); while ((new = getq(&p->runnable)) == nil) { _threaddebug(DBGTHRD, "Nothing left to run"); /* called with rendezlock held */ p->blocked = 1; unlock(&rendezlock); if ((new = (Thread *)rendezvous((ulong)p, 0)) != (Thread *)~0) { threadassert(!p->blocked); threadassert(new->proc == p); p->curthread = new; new->state = Running; /* switch to new thread, pass it `t' for cleanup */ new->garbage = t; longjmp(new->env, (int)t); /* No return */ } _threaddebug(DBGNOTE|DBGTHRD, "interrupted"); lock(&rendezlock); } unlock(&rendezlock); _threaddebug(DBGTHRD, "Yield atexit to %d.%d", p->pid, new->id); p->curthread = new; new->state = Running; if (new->exiting) threadexits(nil); /* switch to new thread, pass it `t' for cleanup */ new->garbage = t; longjmp(new->env, (int)t); /* no return */ } /* * thewaitchan confounds exiting the entire program, so handle it * carefully; store exposed global in local variable c. */ if ((c = thewaitchan) != nil) { Waitmsg *w; long t[4]; /* create a Waitmsg out of whole cloth; this proc was created RFNOWAIT */ w = _threadmalloc(sizeof(Waitmsg)+(exitstr==nil? 0 : strlen(exitstr)+1), 0); w->pid = p->pid; w->time[2] = _times(t); /* real */ w->time[0] = t[0]+t[2]; /* user + child user */ w->time[1] = t[1]+t[3]; /* sys + child sys */ w->msg = ""; if(exitstr != nil){ w->msg = (char*)(&w[1]); strcpy(w->msg, exitstr); } _threaddebug(DBGCHLD, "In thread %s: sending exit status %s for %d\n", p->curthread->cmdname, exitstr, p->pid); sendp(c, w); } _threaddebug(DBGPROC, "Exiting\n"); t->exiting = 1; if(exitstr == nil) p->str[0] = '\0'; else strncpy(p->str, exitstr, sizeof(p->str)); p->nthreads--; /* Clean up and exit */ longjmp(p->oldenv, DOEXIT); } static Thread * threadof(int id) { Tproc *pp; Thread *t; lock(&pq.lock); for (pp = pq.head; pp->next; pp = pp->next) for (t = pp->threads.head; t; t = t->nextt) if (t->id == id) { unlock(&pq.lock); return t; } unlock(&pq.lock); return nil; } int threadid(void) { Tproc *p; p = getproc()->threadpriv; return p->curthread->id; } static char* skip(char *p) { while(*p == ' ') p++; while(*p != ' ' && *p != 0) p++; return p; } int threadpid(int id) { Tproc *pp; Thread *t; if (id < 0) return id; if (id == 0){ pp = getproc()->threadpriv; return pp->pid; } lock(&pq.lock); for (pp = pq.head; pp->next; pp = pp->next) for (t = pp->threads.head; t; t = t->nextt) if (t->id == id) { unlock(&pq.lock); return pp->pid; } unlock(&pq.lock); return -1; } void yield(void) { Thread *new, *t; Proc *p; ulong thr; p = *procp; t = p->curthread; if (t->exiting) { _threaddebug(DBGTHRD|DBGKILL, "Exiting in yield()"); threadexits(nil); /* no return */ } if ((new = getq(&p->runnable)) == nil) { _threaddebug(DBGTHRD, "Nothing to yield for"); return; /* Nothing to yield for */ } if ((thr = setjmp(p->curthread->env))) { if (thr != ~0) garbagethread((Thread *)thr); if ((*procp)->curthread->exiting) threadexits(nil); return; /* Return from yielding */ } putq(&p->runnable, p->curthread); p->curthread->state = Runnable; _threaddebug(DBGTHRD, "Yielding to %d.%d", p->pid, new->id); p->curthread = new; new->state = Running; longjmp(new->env, ~0); /* no return */ } ulong _threadrendezvous(ulong tag, ulong value) { Proc *p; Thread *this, *that, *new; ulong v, t; p = *procp; this = p->curthread; lock(&rendezlock); _threaddebug(DBGREND, "rendezvous tag %lux", tag); /* find a thread waiting in a rendezvous on tag */ that = getqbytag(&rendez, tag); /* if a thread in same or another proc waiting, rendezvous */ if (that) { _threaddebug(DBGREND, "waiting proc is %lud.%d", that->proc->pid, that->id); threadassert(that->state == Rendezvous); /* exchange values */ v = that->value; that->value = value; /* remove from rendez-vous queue */ that->state = Runnable; if (that->proc->blocked) { threadassert(that->proc != p); that->proc->blocked = 0; _threaddebug(DBGREND, "unblocking rendezvous, tag = %lux", (ulong)that->proc); unlock(&rendezlock); /* `Non-blocking' rendez-vous */ while (rendezvous((ulong)that->proc, (ulong)that) == ~0) { _threaddebug(DBGNOTE|DBGTHRD, "interrupted"); if (this->exiting) { _threaddebug(DBGNOTE|DBGTHRD, "and committing suicide"); threadexits(nil); } } } else { putq(&that->proc->runnable, that); unlock(&rendezlock); } yield(); return v; } _threaddebug(DBGREND, "blocking"); /* Mark this thread waiting */ this->value = value; this->state = Rendezvous; this->tag = tag; putq(&rendez, this); /* Look for runnable threads */ new = getq(&p->runnable); if (new == nil) { /* No other thread runnable, rendezvous */ p->blocked = 1; _threaddebug(DBGREND, "blocking rendezvous, tag = %lux", (ulong)p); unlock(&rendezlock); while ((new = (Thread *)rendezvous((ulong)p, 0)) == (Thread *)~0) { _threaddebug(DBGNOTE|DBGTHRD, "interrupted"); if (this->exiting) { _threaddebug(DBGNOTE|DBGTHRD, "and committing suicide"); threadexits(nil); } } threadassert(!p->blocked); threadassert(new->proc == p); if (new == this) { this->state = Running; if (this->exiting) threadexits(nil); return this->value; } _threaddebug(DBGREND, "after rendezvous, new thread is %lux, id %d", new, new->id); } else { unlock(&rendezlock); } if ((t = setjmp(p->curthread->env))) { if (t != ~0) garbagethread((Thread *)t); _threaddebug(DBGREND, "unblocking"); threadassert(p->curthread->next == (Thread *)~0); this->state = Running; if (p->curthread->exiting) { _threaddebug(DBGKILL, "Exiting after rendezvous"); threadexits(nil); } return this->value; } _threaddebug(DBGREND|DBGTHRD, "Scheduling %lud.%d", new->proc->pid, new->id); p->curthread = new; new->state = Running; longjmp(new->env, ~0); /* no return */ return ~0; /* Not called */ } void threadsetname(char *name) { Thread *t = (*procp)->curthread; if (t->cmdname) free(t->cmdname); t->cmdname = strdup(name); } char *threadgetname(void) { return (*procp)->curthread->cmdname; } ulong* procdata(void) { return &(*procp)->udata; } ulong* threaddata(void) { return &(*procp)->curthread->udata; } int procrfork(void (*f)(void *), void *arg, uint stacksize, int rforkflag) { Newproc *np; Proc *p; int id; ulong *tos; p = *procp; /* Save old stack position */ if ((id = setjmp(p->curthread->env))) { _threaddebug(DBGPROC, "newproc, return\n"); return id; /* Return with pid of new proc */ } np = _threadmalloc(sizeof(Newproc), 0); threadassert(np != nil); _threaddebug(DBGPROC, "newproc, creating stack\n"); /* Create a stack */ np->stack = _threadmalloc(stacksize, 0); threadassert(np->stack != nil); memset(np->stack, 0xFE, stacksize); tos = (ulong *)(&np->stack[stacksize&(~7)]); FIX1; *--tos = (ulong)arg; *--tos = (ulong)f; FIX2; np->stacksize = stacksize; np->stackptr = tos; np->grp = p->curthread->grp; np->launcher = (long)launcher; np->rforkflag = rforkflag; _threaddebug(DBGPROC, "newproc, switch stacks\n"); /* Goto unshared stack and fire up new process */ p->arg = np; longjmp(p->oldenv, DOPROC); /* no return */ return -1; } int proccreate(void (*f)(void*), void *arg, uint stacksize) { return procrfork(f, arg, stacksize, 0); } int threadcreate(void (*f)(void *arg), void *arg, uint stacksize) { Thread *child; ulong *tos; Proc *p; p = *procp; if (stacksize < 32) { werrstr("%s", "stacksize"); return -1; } if ((child = _threadmalloc(sizeof(Thread), 0)) == nil || (child->stk = _threadmalloc(child->stksize = stacksize, 0)) == nil) { if (child) free(child); werrstr("%s", "_threadmalloc"); return -1; } memset(child->stk, 0xFE, stacksize); p->nthreads++; child->cmdname = nil; child->id = nextID(); child->proc = p; tos = (ulong *)(&child->stk[stacksize&~7]); FIX1; *--tos = (ulong)arg; *--tos = (ulong)f; FIX2; /* Insert a dummy argument on 386 */ child->env[JMPBUFPC] = ((ulong)launcher+JMPBUFDPC); /* -STACKOFF leaves room for old pc and new pc in frame */ child->env[JMPBUFSP] = (ulong)(tos - STACKOFF); child->state = Runnable; child->exiting = 0; child->nextt = nil; if (p->threads.head == nil) { p->threads.head = p->threads.tail = child; } else { p->threads.tail->nextt = child; p->threads.tail = child; } child->next = (Thread *)~0; child->garbage = nil; putq(&p->runnable, child); return child->id; } void procexecl(Channel *pidc, char *f, ...) { procexec(pidc, f, &f+1); } void procexec(Channel *pidc, char *f, char *args[]) { Proc *p; Dir *d; int n, pid; Execproc *ep; Channel *c; char *q; int s, l; /* make sure exec is likely to succeed before tossing thread state */ d = dirstat(f); if(d == nil || (d->mode & DMDIR) || access(f, AEXEC) < 0) { free(d); bad: if (pidc) sendul(pidc, ~0); return; } free(d); p = *procp; if(p->threads.head != p->curthread || p->threads.head->nextt != nil) goto bad; n = 0; while (args[n++]) ; ep = (Execproc*)procp; q = ep->data; s = sizeof(ep->data); if ((l = n*sizeof(char *)) < s) { _threaddebug(DBGPROC, "args on stack, %d pointers\n", n); ep->arg = (char **)q; q += l; s -= l; } else ep->arg = _threadmalloc(l, 0); /* will be leaked */ if ((l = strlen(f) + 1) < s) { ep->file = q; strcpy(q, f); _threaddebug(DBGPROC, "file on stack, %s\n", q); q += l; s -= l; } else ep->file = strdup(f); /* will be leaked */ ep->arg[--n] = nil; while (--n >= 0) if ((l = strlen(args[n]) + 1) < s) { ep->arg[n] = q; strcpy(q, args[n]); _threaddebug(DBGPROC, "arg on stack, %s\n", q); q += l; s -= l; } else ep->arg[n] = strdup(args[n]); /* will be leaked */ /* committed to exec-ing */ if ((pid = setjmp(p->curthread->env))) { int i; Waitmsg *w; rfork(RFFDG); close(0); close(1); for (i = 3; i < 100; i++) close(i); if(pidc != nil) send(pidc, &pid); /* wait for exec-ed child */ _threaddebug(DBGCHLD, "Proc %d waiting for exec-ed child %d\n", getpid(), pid); for(;;){ w = wait(); threadassert(w != nil); _threaddebug(DBGCHLD, "Child %d exited\n", w->pid); if(w->pid == pid) break; free(w); } if ((c = thewaitchan) != nil) { /* global is exposed */ _threaddebug(DBGCHLD, "Sending exit status for exec: real pid %d, fake pid %d, status %s\n", pid, getpid(), w->msg); sendp(c, w); }else free(w); _threaddebug(DBGPROC, "Exiting (exec)\n"); threadexits("libthread procexec"); } p->arg = ep; longjmp(p->oldenv, DOEXEC); /* No return; */ } Channel * threadwaitchan(void) { static void *arg[1]; thewaitchan = chancreate(sizeof(Waitmsg*), 0); return thewaitchan; } int threadgetgrp(void) { return (*procp)->curthread->grp; } int threadsetgrp(int ng) { int og; og = (*procp)->curthread->grp; (*procp)->curthread->grp = ng; return og; } void * _threadmalloc(long size, int z) { void *m; m = malloc(size); if (m == nil) sysfatal("Malloc of size %ld failed: %r\n", size); setmalloctag(m, getcallerpc(&size)); totalmalloc += size; if (size > 1000000) { fprint(2, "Malloc of size %ld, total %ld\n", size, totalmalloc); abort(); } if (z) memset(m, 0, size); return m; } void threadnonotes(void) { char buf[30]; int fd; sprint(buf, "#p/%d/note", passerpid); if((fd = open(buf, OWRITE)) >= 0) write(fd, "kilpasser", 9); close(fd); } static void runproc(Proc *p) { int action, pid, rforkflag; Proc *pp; long r; int id; char str[32]; r = ~0; runp: /* Leave a proc manager */ while ((action = setjmp(p->oldenv))) { Newproc *np; Execproc *ne; p = *procp; switch(action) { case DOEXEC: ne = (Execproc *)p->arg; if ((pid = rfork(RFPROC|RFMEM|RFREND|RFNOTEG|RFFDG)) < 0) { exits("doexecproc: fork: %r"); } if (pid == 0) { exec(ne->file, ne->arg); if (_threaddebuglevel & DBGPROC) { char **a; fprint(2, "%s: ", ne->file); a = ne->arg; while(*a) fprint(2, "%s, ", *a++); fprint(2, "0\n"); } exits("Can't exec"); /* No return */ } longjmp(p->curthread->env, pid); /* No return */ case DOEXIT: _threaddebug(DBGPROC, "at doexit\n"); lock(&pq.lock); if (pq.head == p) { pq.head = p->next; if (pq.tail == p) { pq.tail = nil; postnote(PNPROC, passerpid, "kilpasser"); } } else { for (pp = pq.head; pp->next; pp = pp->next) { if(pp->next == p) { pp->next = p->next; if (pq.tail == p) pq.tail = pp; break; } } } unlock(&pq.lock); strncpy(str, p->str, sizeof(str)); garbageproc(p); exits(str); case DOPROC: _threaddebug(DBGPROC, "at doproc\n"); np = (Newproc *)p->arg; pp = prepproc(np); id = pp->curthread->id; // get id before fork() to avoid race rforkflag = np->rforkflag; free(np); if ((pid = rfork(rforkflag|RFPROC|RFMEM|RFNOWAIT)) < 0) { exits("donewproc: fork: %r"); } if (pid == 0) { /* Child is the new proc; touch old proc struct no more */ p = pp; p->pid = getpid(); goto runp; /* No return */ } /* Parent, return to caller */ r = id; // better than returning pid _threaddebug(DBGPROC, "newproc, unswitch stacks\n"); break; default: /* `Can't happen' */ threadprint(2, "runproc, can't happen: %d\n", action); threadassert(0); } } if (_threaddebuglevel & DBGPROC) threadprint(2, "runproc, longjmp\n"); /* Jump into proc */ *procp = p; longjmp(p->curthread->env, r); /* No return */ } static void initproc(void (*f)(ulong, int argc, char *argv[]), int argc, char *argv[], uint stacksize) { Proc *p; Newproc *np; ulong *tos; ulong *av; int i; Execproc ex; procp = (Proc **)&ex; /* address of the execproc struct */ if (_threaddebuglevel & DBGPROC) threadprint(2, "Initproc, f = 0x%p, argc = %d, argv = 0x%p\n", f, argc, argv); /* Create a stack and fill it */ np = _threadmalloc(sizeof(Newproc), 0); threadassert(np != nil); np->stack = _threadmalloc(stacksize, 0); threadassert(np->stack != nil); memset(np->stack, 0xFE, stacksize); tos = (ulong *)(&np->stack[stacksize&(~7)]); np->stacksize = stacksize; np->rforkflag = 0; np->grp = 0; for (i = 0; i < argc; i++){ char *nargv; nargv = (char *)tos - (strlen(argv[i]) + 1); strcpy(nargv, argv[i]); argv[i] = nargv; tos = (ulong *)nargv; } /* round down to address of char* */ tos = (ulong *)((ulong)tos & ~0x3); tos -= argc + 1; /* round down to address of vlong (for the alpha): */ tos = (ulong *)((ulong)tos & ~0x7); av = tos; memmove(av, argv, (argc+1)*sizeof(char *)); FIX1; *--tos = (ulong)av; *--tos = (ulong)argc; FIX2; np->stackptr = tos; np->launcher = (long)f; p = prepproc(np); p->pid = getpid(); free(np); if (_threaddebuglevel & DBGPROC) threadprint(2, "calling runproc\n"); runproc(p); /* no return; */ } static void garbageproc(Proc *p) { Thread *t, *nextt; for (t = p->threads.head; t; t = nextt) { if (t->cmdname) free(t->cmdname); threadassert(t->stk != nil); free(t->stk); nextt = t->nextt; free(t); } free(p); } static void garbagethread(Thread *t) { Proc *p; Thread *r, *pr; p = t->proc; threadassert(*procp == p); pr = nil; for (r = p->threads.head; r; r = r->nextt) { if (r == t) break; pr = r; } threadassert (r != nil); if (pr) pr->nextt = r->nextt; else p->threads.head = r->nextt; if (p->threads.tail == r) p->threads.tail = pr; if (t->cmdname) free(t->cmdname); threadassert(t->stk != nil); free(t->stk); free(t); } static void putq(Tqueue *q, Thread *t) { lock(&q->lock); _threaddebug(DBGQUE, "Putq 0x%lux on 0x%lux, next == 0x%lux", (ulong)t, (ulong)q, (ulong)t->next); threadassert((ulong)(t->next) == (ulong)~0); t->next = nil; if (q->head == nil) { q->head = t; q->tail = t; } else { threadassert(q->tail->next == nil); q->tail->next = t; q->tail = t; } unlock(&q->lock); } static Thread * getq(Tqueue *q) { Thread *t; lock(&q->lock); if ((t = q->head)) { q->head = q->head->next; t->next = (Thread *)~0; } unlock(&q->lock); _threaddebug(DBGQUE, "Getq 0x%lux from 0x%lux", (ulong)t, (ulong)q); return t; } static Thread * getqbytag(Tqueue *q, ulong tag) { Thread *r, *pr, *w, *pw; w = pr = pw = nil; _threaddebug(DBGQUE, "Getqbytag 0x%lux", q); lock(&q->lock); for (r = q->head; r; r = r->next) { if (r->tag == tag) { w = r; pw = pr; if (r->proc == *procp) { /* Locals or blocked remotes are best */ break; } } pr = r; } if (w) { if (pw) pw->next = w->next; else q->head = w->next; if (q->tail == w){ q->tail = pw; threadassert(pw == nil || pw->next == nil); } w->next = (Thread *)~0; } unlock(&q->lock); _threaddebug(DBGQUE, "Getqbytag 0x%lux from 0x%lux", w, q); return w; } static void launcher(ulong, void (*f)(void *arg), void *arg) { Proc *p; Thread *t; p = *procp; t = p->curthread; if (t->garbage) { garbagethread(t->garbage); t->garbage = nil; } (*f)(arg); threadexits(nil); /* no return */ } static void mainlauncher(ulong, int argc, char *argv[]) { /* ulong *p; */ /* p = (ulong *)&argc; */ /* fprint(2, "p[-2..2]: %lux %lux %lux %lux %lux\n", */ /* p[-2], p[-1], p[0], p[1], p[2]); */ threadmain(argc, argv); threadexits(nil); /* no return */ } static Proc * prepproc(Newproc *np) { Proc *p; Thread *t; /* Create proc and thread structs */ p = _threadmalloc(sizeof(Proc), 1); t = _threadmalloc(sizeof(Thread), 1); if (p == nil || t == nil) { char err[32] = ""; errstr(err, 32); write(2, err, strlen(err)); write(2, "\n", 1); exits("procinit: _threadmalloc"); } t->cmdname = strdup("threadmain"); t->id = nextID(); t->grp = np->grp; /* Inherit grp id */ t->proc = p; t->state = Running; t->nextt = nil; t->next = (Thread *)~0; t->garbage = nil; t->stk = np->stack; t->stksize = np->stacksize; t->env[JMPBUFPC] = (np->launcher+JMPBUFDPC); /* -STACKOFF leaves room for old pc and new pc in frame */ t->env[JMPBUFSP] = (ulong)(np->stackptr - STACKOFF); /*fprint(2, "SP = %lux\n", t->env[JMPBUFSP]); */ p->curthread = t; p->threads.head = p->threads.tail = t; p->nthreads = 1; p->pid = 0; p->next = nil; lock(&pq.lock); if (pq.head == nil) { pq.head = pq.tail = p; } else { threadassert(pq.tail->next == nil); pq.tail->next = p; pq.tail = p; } unlock(&pq.lock); return p; } int threadprint(int fd, char *fmt, ...) { int n; va_list arg; char *buf; buf = _threadmalloc(2048, 0); if(buf == nil) return -1; va_start(arg, fmt); doprint(buf, buf+2048, fmt, arg); va_end(arg); n = write(fd, buf, strlen(buf)); free(buf); return n; }