#include <u.h>
#include <libc.h>
#define NODEFINE
#include <9pm/u.h>
#include <9pm/libc.h>
#include <9pm/ns.h>
#include <9pm/thread.h>
#include <9pm/threadimpl.h>
static void forkpasser(void);
static void procsched(Tproc*);
int mainstacksize;
static int passerpid;
static int notefd; /* for use only by passer */
static Tproc **procp;
static Proc thesysproc;
typedef struct Mainarg Mainarg;
struct Mainarg
{
int argc;
char **argv;
};
static void
mainlauncher(void *arg)
{
Mainarg *a;
a = arg;
threadmain(a->argc, a->argv);
}
void
pm_main(int argc, char *argv[])
{
int i, sz;
char **nargv, *s;
Mainarg *a;
Tproc *p;
Tproc *mainprocp;
extern void (*pm__sysfatal)(char *fmt, va_list arg);
//_threaddebuglevel = DBGPROC;
mainprocp = nil;
procp = &mainprocp;
pm__qlockinit(_threadrendezvous);
pm__sysfatal = _threadsysfatal;
pm_quotefmtinstall(); /* for chanprint */
if(mainstacksize == 0)
mainstacksize = 512*1024;
/* Fork off the passer, set up the process groups. */
forkpasser();
/*
* On Plan 9, the system stack is not shared.
* Argv is pretty much the only pointer that a threaded
* program would see that points at unshared memory,
* so we might as well copy the arguments into shared
* memory.
*/
sz = sizeof(char*)*(argc+1);
for(i=0; i<argc; i++)
sz += strlen(argv[i])+1;
nargv = _threadmalloc(sz, 0);
s = (char*)(nargv+argc+1);
for(i=0; i<argc; i++){
strcpy(s, argv[i]);
nargv[i] = s;
s += strlen(s)+1;
}
nargv[i] = nil;
a = _threadmalloc(sizeof *a, 0);
a->argc = argc;
a->argv = nargv;
p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0);
assert(p->curthread != nil);
*procp = p;
p->sysproc = thesysproc;
_threaddebug(DBGPROC, "calling procsched");
procsched(p);
/* not reached */
}
/*
* The main process runs threadmain() but sits in a different note group
* so the library can use notes internally without disturbing other processes.
* A child process is created, the "passer", and left in the
* original group to pass interrupts, etc. on to the main group. Synchronization
* between the two processes across the fork guarantees that the original
* note group survives, in case anyone has its notepg file open. There is
* a race, though, as to whether someone using the main process's pid
* as the route to the notepg file will find the original group or the new one.
*/
static int
notehandler(void*, char *note)
{
_threaddebug(DBGNOTE, "Notepasser %d got note %s", passerpid, note);
if(pm_strcmp(note, "threadsignal") == 0){
if(_threadnonotes)
_exits("no notes");
write(notefd, note, strlen(note));
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
return 1;
}
return 0;
}
static void
kidnotifier(void *a, char *note)
{
USED(a);
/*
* YOU CANNOT PRINT IN THIS FUNCTION.
* Printing will lock the fgrp to acquire a Chan*,
* which will fail; if the proc happened to hold this
* lock already we'll deadlock.
*
* Atnotify-registered handlers get run by notehandler,
* which runs only in the passer (which doesn't hold locks).
* We only do internal thread stuff here.
*
* _threaddebug is okay, since it bypasses the 9pm library.
*/
_threaddebug(DBGNOTE, "note '%s' status '%s'", note, _threadexitsallstatus);
if(strcmp(note, "threadsignal") == 0){
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
/*
* The other reason to get signaled is threadkillgrp
* or threadkill. If a non-current thread is to die, it
* will die next time it would be scheduled. If the current
* thread is to die, it will do so once it yields. The signal
* may have interrupted an I/O operation.
*/
noted(NCONT);
}
noted(NDFLT);
}
static void
forkpasser(void)
{
char buf[40], err[32];
int fd, mainpid;
Tproc *tp;
mainpid = getpid();
rfork(RFREND);
pm_atnotify(notehandler, 1);
switch(rfork(RFPROC|RFMEM|RFNAMEG|RFCFDG)){
case -1:
threadassert(0);
case 0:
/* handle notes */
tp = _threadmalloc(sizeof(*tp), 1);
*procp = tp;
tp->sysproc.fgrp = _threadmalloc(sizeof(Fgrp), 1);
pm_incref(&tp->sysproc.fgrp->ref);
if(_threaddebuglevel){
fd = open("/dev/cons", OWRITE);
dup(fd, 2);
}
rfork(RFCNAMEG);
rendezvous(0, 0); /* wait for parent to switch note groups */
passerpid = getpid();
sprint(buf, "#p/%d/notepg", mainpid); /* notepg of main process's note group */
notefd = open(buf, OWRITE|OCEXEC);
_threaddebug(DBGNOTE, "Passer is %d", passerpid);
rendezvous(1, 0); /* tell parent we've opened notepg, can continue */
for(;;){
if(sleep(120*1000) < 0){
errstr(err, sizeof(err));
if (strcmp(err, "interrupted") != 0)
break;
}
}
_threaddebug(DBGNOTE, "Passer %d exits", passerpid);
exits("interrupted");
default:
/* put main process in a different note group */
rfork(RFNOTEG);
notify(kidnotifier);
rendezvous(0, 0); /* tell child we switched note groups */
rendezvous(1, 0); /* wait for child to open notepg */
return;
}
}
typedef struct Uchan Uchan;
struct Uchan
{
int sysfd;
Cname* syspath;
};
/*
* The general structure of procsched is system-independent,
* but the system-dependent parts are special enough that it's
* very hard to tease them out into separate functions.
*/
static void
procsched(Tproc *self)
{
char exitstr[ERRMAX];
Chan *c;
Uchan *u;
int action, i, r;
Fgrp *f;
Execproc *e;
Tproc *p;
int KNOWS_ABOUT_STRUCT_UCHAN_FROM_DEVFS_C;
_threaddebug(DBGPROC, "procsched self=%p", self);
Top:
self->pid = getpid();
self->sysproc.pid = self->pid;
r = ~0;
for(;;){
_threaddebug(DBGPROC, "procsched run", self->pid, self->curthread->id);
switch(action=_threadswtch(self->oldenv, self->curthread->env, r)){
default: /* can't happen */
threadprint(2, "runproc, can't happen: %d\n", action);
threadassert(0);
case DOEXEC: /* fork off a program, return pid */
self = _threadgetproc();
_threaddebug(DBGPROC, "doexec");
e = &self->execproc;
rfork(RFFDG|RFENVG);
f = pm_getproc()->fgrp;
if(f->maxfd > 0){
for(i=0; i<3; i++){
c = f->fd[i];
if(c==nil)
{
_threaddebug(DBGPROC, "pm fd %d is closed", i);
continue;
}
u = c->aux;
assert(u->sysfd >= 0);
while(u->sysfd != i && u->sysfd < 3)
u->sysfd = dup(u->sysfd, -1);
_threaddebug(DBGPROC, "pm fd %d => sysfd %d", i, u->sysfd);
}
for(i=0; i<3; i++){
c = f->fd[i];
if(c==nil){
close(i);
continue;
}
u = c->aux;
if(u->sysfd != i){
_threaddebug(DBGPROC, "sysfd %d => %d", u->sysfd, i);
dup(u->sysfd, i);
}
}
for(i=3; i<100; i++)
close(i);
}
switch(r = rfork(RFPROC|RFREND|RFNOTEG|RFFDG)){
case -1:
pm_oserror();
pm_sysfatal("thread doexec: rfork: %r");
case 0:
exec(e->file, e->arg);
pm_oserror();
_threaddebug(DBGPROC, "exec %s [argv0=%s] failed: %r",
e->file, e->arg[0]);
exits("exec");
default:
continue; /* return r to caller */
}
/* not reached */
abort();
case DOPROC: /* start a new Tproc running */
self = _threadgetproc();
_threaddebug(DBGPROC, "doproc");
p = self->arg;
r = p->curthread->id;
switch(rfork(p->rforkflag|RFPROC|RFMEM|RFNOWAIT)){
case -1:
pm_oserror();
pm_sysfatal("thread doproc: rfork: %r");
case 0:
/*
* procsched(p), but if we did that we would
* use unbounded amounts of stack to set up
* a linear process tree. The goto avoids growing
* the stack.
*/
self = p;
*procp = self;
goto Top;
default:
continue; /* return r to caller */
}
/* not reached */
abort();
case DOEXIT: /* destroy this proc */
self = _threadgetproc();
_threaddebug(DBGPROC, "doexit");
/* remove ourself from the list of active procs */
lock(&_threadpq.lk);
if(_threadpq.head == self){
_threadpq.head = self->next;
if(_threadpq.tail == self){
_threadpq.tail = nil;
/* no procs left, signal the passer to exit */
_threadsignalpasser();
}
}else{
for(p=_threadpq.head; p->next; p=p->next){
if(p->next == self){
p->next = self->next;
if(_threadpq.tail == self)
_threadpq.tail = p;
break;
}
}
}
unlock(&_threadpq.lk);
/*
* Clean up and exit. Remember that we're on the
* system stack segment (not a malloced stack),
* thus it is okay to free self and all associated data.
*/
utfecpy(exitstr, exitstr+sizeof exitstr, self->exitstr);
_freeproc(self);
_exits(exitstr);
}
/* not reached */
abort();
}
}
/*
* Threadtimes is the equivalent of the times() library call.
* On Plan 9, using times() burns you because we have closed
* times's file descriptor out from under it.
*/
static char*
skip(char *p)
{
while(*p == ' ')
p++;
while(*p != ' ' && *p != 0)
p++;
return p;
}
long
_threadtimes(long *t)
{
char b[200], *p;
int f;
ulong r;
memset(b, 0, sizeof(b));
f = open("/dev/cputime", OREAD|OCEXEC);
if(f < 0)
return 0;
if(read(f, b, sizeof(b)) <= 0){
close(f);
return 0;
}
p = b;
if(t)
t[0] = atol(p);
p = skip(p);
if(t)
t[1] = atol(p);
p = skip(p);
r = atol(p);
if(t){
p = skip(p);
t[2] = atol(p);
p = skip(p);
t[3] = atol(p);
}
return r;
}
void
_threadsignal(void)
{
pm_postnote(PNGROUP, getpid(), "threadsignal");
}
void
_threadsignalpasser(void)
{
pm_postnote(PNPROC, passerpid, "threadsignal");
}
void
pm_nap(void)
{
while(rendezvous(getpid(), 0) == ~0)
;
}
void
pm_wake(Proc *p)
{
rendezvous(p->pid, 0);
}
void
_procexecwait(int pid)
{
Channel *c;
Waitmsg *w, *nw;
close(0);
close(1);
_threaddebug(DBGCHLD, "Proc %d waiting for exec-ed child %d", getpid(), pid);
for(;;){
while((w = wait()) == nil){
char e[ERRMAX];
rerrstr(e, sizeof e);
if(strstr(e, "interrupted") == nil)
threadassert(0);
}
_threaddebug(DBGCHLD, "Child %d exited", w->pid);
if(w->pid == pid)
break;
free(w);
}
if ((c = _threadwaitchan) != nil) { /* global is exposed */
_threaddebug(DBGCHLD, "Sending exit status for exec: pid %d, waiter pid %d, status %s", pid, getpid(), w->msg);
/*
* w is malloced with the system malloc.
* we need to pass a pointer that is malloced with the 9pm malloc.
*/
nw = _threadmalloc(sizeof(*w)+strlen(w->msg)+1, 1);
*nw = *w;
nw->msg = (char*)&nw[1];
strcpy(nw->msg, w->msg);
sendp(c, nw);
}
free(w);
}
Proc*
pm_getproc(void)
{
if(procp && *procp)
return &(*procp)->sysproc;
return &thesysproc;
}
Tproc*
_threadgetproc(void)
{
return *procp;
}
Thread*
_threadgetthr(void)
{
return (*procp)->curthread;
}
void
_threaddebug(ulong l, char *fmt, ...)
{
va_list arg;
Tproc *p;
int n;
char buf[256];
p = _threadgetproc();
if ((l & _threaddebuglevel) == 0) return;
if(p->curthread)
n = pm_sprint(buf, "%d.%d ", p->pid, p->curthread->id);
else
n = pm_sprint(buf, "%d.nothread ", p->pid);
va_start(arg, fmt);
n = pm_doprint(buf+n, buf+sizeof(buf)-1, fmt, arg) - buf;
va_end(arg);
buf[n] = '\n';
write(2, buf, n+1);
}
void
_threadassert(char *s)
{
char buf[256];
Tproc *p;
p = _threadgetproc();
if(p && p->curthread){
pm_sprint(buf, "%d.%d ", p->pid, p->curthread->id);
write(2, buf, strlen(buf));
}
pm_snprint(buf, sizeof(buf), "libthread: %s: assertion failed\n", s);
write(2, buf, strlen(buf));
notify(nil);
abort();
}
syntax highlighted by Code2HTML, v. 0.9.1