#include #include #define NODEFINE #include <9pm/u.h> #include <9pm/libc.h> #include <9pm/ns.h> #include <9pm/thread.h> #include <9pm/threadimpl.h> static void forkpasser(void); static void procsched(Tproc*); int mainstacksize; static int passerpid; static int notefd; /* for use only by passer */ static Tproc **procp; static Proc thesysproc; typedef struct Mainarg Mainarg; struct Mainarg { int argc; char **argv; }; static void mainlauncher(void *arg) { Mainarg *a; a = arg; threadmain(a->argc, a->argv); } void pm_main(int argc, char *argv[]) { int i, sz; char **nargv, *s; Mainarg *a; Tproc *p; Tproc *mainprocp; extern void (*pm__sysfatal)(char *fmt, va_list arg); //_threaddebuglevel = DBGPROC; mainprocp = nil; procp = &mainprocp; pm__qlockinit(_threadrendezvous); pm__sysfatal = _threadsysfatal; pm_quotefmtinstall(); /* for chanprint */ if(mainstacksize == 0) mainstacksize = 512*1024; /* Fork off the passer, set up the process groups. */ forkpasser(); /* * On Plan 9, the system stack is not shared. * Argv is pretty much the only pointer that a threaded * program would see that points at unshared memory, * so we might as well copy the arguments into shared * memory. */ sz = sizeof(char*)*(argc+1); for(i=0; iargc = argc; a->argv = nargv; p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0); assert(p->curthread != nil); *procp = p; p->sysproc = thesysproc; _threaddebug(DBGPROC, "calling procsched"); procsched(p); /* not reached */ } /* * The main process runs threadmain() but sits in a different note group * so the library can use notes internally without disturbing other processes. * A child process is created, the "passer", and left in the * original group to pass interrupts, etc. on to the main group. Synchronization * between the two processes across the fork guarantees that the original * note group survives, in case anyone has its notepg file open. There is * a race, though, as to whether someone using the main process's pid * as the route to the notepg file will find the original group or the new one. */ static int notehandler(void*, char *note) { _threaddebug(DBGNOTE, "Notepasser %d got note %s", passerpid, note); if(pm_strcmp(note, "threadsignal") == 0){ if(_threadnonotes) _exits("no notes"); write(notefd, note, strlen(note)); if(_threadexitsallstatus) _exits(_threadexitsallstatus); return 1; } return 0; } static void kidnotifier(void *a, char *note) { USED(a); /* * YOU CANNOT PRINT IN THIS FUNCTION. * Printing will lock the fgrp to acquire a Chan*, * which will fail; if the proc happened to hold this * lock already we'll deadlock. * * Atnotify-registered handlers get run by notehandler, * which runs only in the passer (which doesn't hold locks). * We only do internal thread stuff here. * * _threaddebug is okay, since it bypasses the 9pm library. */ _threaddebug(DBGNOTE, "note '%s' status '%s'", note, _threadexitsallstatus); if(strcmp(note, "threadsignal") == 0){ if(_threadexitsallstatus) _exits(_threadexitsallstatus); /* * The other reason to get signaled is threadkillgrp * or threadkill. If a non-current thread is to die, it * will die next time it would be scheduled. If the current * thread is to die, it will do so once it yields. The signal * may have interrupted an I/O operation. */ noted(NCONT); } noted(NDFLT); } static void forkpasser(void) { char buf[40], err[32]; int fd, mainpid; Tproc *tp; mainpid = getpid(); rfork(RFREND); pm_atnotify(notehandler, 1); switch(rfork(RFPROC|RFMEM|RFNAMEG|RFCFDG)){ case -1: threadassert(0); case 0: /* handle notes */ tp = _threadmalloc(sizeof(*tp), 1); *procp = tp; tp->sysproc.fgrp = _threadmalloc(sizeof(Fgrp), 1); pm_incref(&tp->sysproc.fgrp->ref); if(_threaddebuglevel){ fd = open("/dev/cons", OWRITE); dup(fd, 2); } rfork(RFCNAMEG); rendezvous(0, 0); /* wait for parent to switch note groups */ passerpid = getpid(); sprint(buf, "#p/%d/notepg", mainpid); /* notepg of main process's note group */ notefd = open(buf, OWRITE|OCEXEC); _threaddebug(DBGNOTE, "Passer is %d", passerpid); rendezvous(1, 0); /* tell parent we've opened notepg, can continue */ for(;;){ if(sleep(120*1000) < 0){ errstr(err, sizeof(err)); if (strcmp(err, "interrupted") != 0) break; } } _threaddebug(DBGNOTE, "Passer %d exits", passerpid); exits("interrupted"); default: /* put main process in a different note group */ rfork(RFNOTEG); notify(kidnotifier); rendezvous(0, 0); /* tell child we switched note groups */ rendezvous(1, 0); /* wait for child to open notepg */ return; } } typedef struct Uchan Uchan; struct Uchan { int sysfd; Cname* syspath; }; /* * The general structure of procsched is system-independent, * but the system-dependent parts are special enough that it's * very hard to tease them out into separate functions. */ static void procsched(Tproc *self) { char exitstr[ERRMAX]; Chan *c; Uchan *u; int action, i, r; Fgrp *f; Execproc *e; Tproc *p; int KNOWS_ABOUT_STRUCT_UCHAN_FROM_DEVFS_C; _threaddebug(DBGPROC, "procsched self=%p", self); Top: self->pid = getpid(); self->sysproc.pid = self->pid; r = ~0; for(;;){ _threaddebug(DBGPROC, "procsched run", self->pid, self->curthread->id); switch(action=_threadswtch(self->oldenv, self->curthread->env, r)){ default: /* can't happen */ threadprint(2, "runproc, can't happen: %d\n", action); threadassert(0); case DOEXEC: /* fork off a program, return pid */ self = _threadgetproc(); _threaddebug(DBGPROC, "doexec"); e = &self->execproc; rfork(RFFDG|RFENVG); f = pm_getproc()->fgrp; if(f->maxfd > 0){ for(i=0; i<3; i++){ c = f->fd[i]; if(c==nil) { _threaddebug(DBGPROC, "pm fd %d is closed", i); continue; } u = c->aux; assert(u->sysfd >= 0); while(u->sysfd != i && u->sysfd < 3) u->sysfd = dup(u->sysfd, -1); _threaddebug(DBGPROC, "pm fd %d => sysfd %d", i, u->sysfd); } for(i=0; i<3; i++){ c = f->fd[i]; if(c==nil){ close(i); continue; } u = c->aux; if(u->sysfd != i){ _threaddebug(DBGPROC, "sysfd %d => %d", u->sysfd, i); dup(u->sysfd, i); } } for(i=3; i<100; i++) close(i); } switch(r = rfork(RFPROC|RFREND|RFNOTEG|RFFDG)){ case -1: pm_oserror(); pm_sysfatal("thread doexec: rfork: %r"); case 0: exec(e->file, e->arg); pm_oserror(); _threaddebug(DBGPROC, "exec %s [argv0=%s] failed: %r", e->file, e->arg[0]); exits("exec"); default: continue; /* return r to caller */ } /* not reached */ abort(); case DOPROC: /* start a new Tproc running */ self = _threadgetproc(); _threaddebug(DBGPROC, "doproc"); p = self->arg; r = p->curthread->id; switch(rfork(p->rforkflag|RFPROC|RFMEM|RFNOWAIT)){ case -1: pm_oserror(); pm_sysfatal("thread doproc: rfork: %r"); case 0: /* * procsched(p), but if we did that we would * use unbounded amounts of stack to set up * a linear process tree. The goto avoids growing * the stack. */ self = p; *procp = self; goto Top; default: continue; /* return r to caller */ } /* not reached */ abort(); case DOEXIT: /* destroy this proc */ self = _threadgetproc(); _threaddebug(DBGPROC, "doexit"); /* remove ourself from the list of active procs */ lock(&_threadpq.lk); if(_threadpq.head == self){ _threadpq.head = self->next; if(_threadpq.tail == self){ _threadpq.tail = nil; /* no procs left, signal the passer to exit */ _threadsignalpasser(); } }else{ for(p=_threadpq.head; p->next; p=p->next){ if(p->next == self){ p->next = self->next; if(_threadpq.tail == self) _threadpq.tail = p; break; } } } unlock(&_threadpq.lk); /* * Clean up and exit. Remember that we're on the * system stack segment (not a malloced stack), * thus it is okay to free self and all associated data. */ utfecpy(exitstr, exitstr+sizeof exitstr, self->exitstr); _freeproc(self); _exits(exitstr); } /* not reached */ abort(); } } /* * Threadtimes is the equivalent of the times() library call. * On Plan 9, using times() burns you because we have closed * times's file descriptor out from under it. */ static char* skip(char *p) { while(*p == ' ') p++; while(*p != ' ' && *p != 0) p++; return p; } long _threadtimes(long *t) { char b[200], *p; int f; ulong r; memset(b, 0, sizeof(b)); f = open("/dev/cputime", OREAD|OCEXEC); if(f < 0) return 0; if(read(f, b, sizeof(b)) <= 0){ close(f); return 0; } p = b; if(t) t[0] = atol(p); p = skip(p); if(t) t[1] = atol(p); p = skip(p); r = atol(p); if(t){ p = skip(p); t[2] = atol(p); p = skip(p); t[3] = atol(p); } return r; } void _threadsignal(void) { pm_postnote(PNGROUP, getpid(), "threadsignal"); } void _threadsignalpasser(void) { pm_postnote(PNPROC, passerpid, "threadsignal"); } void pm_nap(void) { while(rendezvous(getpid(), 0) == ~0) ; } void pm_wake(Proc *p) { rendezvous(p->pid, 0); } void _procexecwait(int pid) { Channel *c; Waitmsg *w, *nw; close(0); close(1); _threaddebug(DBGCHLD, "Proc %d waiting for exec-ed child %d", getpid(), pid); for(;;){ while((w = wait()) == nil){ char e[ERRMAX]; rerrstr(e, sizeof e); if(strstr(e, "interrupted") == nil) threadassert(0); } _threaddebug(DBGCHLD, "Child %d exited", w->pid); if(w->pid == pid) break; free(w); } if ((c = _threadwaitchan) != nil) { /* global is exposed */ _threaddebug(DBGCHLD, "Sending exit status for exec: pid %d, waiter pid %d, status %s", pid, getpid(), w->msg); /* * w is malloced with the system malloc. * we need to pass a pointer that is malloced with the 9pm malloc. */ nw = _threadmalloc(sizeof(*w)+strlen(w->msg)+1, 1); *nw = *w; nw->msg = (char*)&nw[1]; strcpy(nw->msg, w->msg); sendp(c, nw); } free(w); } Proc* pm_getproc(void) { if(procp && *procp) return &(*procp)->sysproc; return &thesysproc; } Tproc* _threadgetproc(void) { return *procp; } Thread* _threadgetthr(void) { return (*procp)->curthread; } void _threaddebug(ulong l, char *fmt, ...) { va_list arg; Tproc *p; int n; char buf[256]; p = _threadgetproc(); if ((l & _threaddebuglevel) == 0) return; if(p->curthread) n = pm_sprint(buf, "%d.%d ", p->pid, p->curthread->id); else n = pm_sprint(buf, "%d.nothread ", p->pid); va_start(arg, fmt); n = pm_doprint(buf+n, buf+sizeof(buf)-1, fmt, arg) - buf; va_end(arg); buf[n] = '\n'; write(2, buf, n+1); } void _threadassert(char *s) { char buf[256]; Tproc *p; p = _threadgetproc(); if(p && p->curthread){ pm_sprint(buf, "%d.%d ", p->pid, p->curthread->id); write(2, buf, strlen(buf)); } pm_snprint(buf, sizeof(buf), "libthread: %s: assertion failed\n", s); write(2, buf, strlen(buf)); notify(nil); abort(); }