#include <u.h>
#include <libc.h>
#define NODEFINE
#include <9pm/u.h>
#include <9pm/libc.h>
#include <9pm/ns.h>
#include <9pm/thread.h>
#include <9pm/threadimpl.h>

/* Forward declarations for helpers defined later in this file. */
static void forkpasser(void);
static void procsched(Tproc*);
/* Stack size for the main thread; pm_main substitutes 512*1024 when this is 0. */
int mainstacksize;
/* Pid of the note-passer process; the passer records it itself in forkpasser. */
static int passerpid;
static int notefd;	/* for use only by passer */
/*
 * Location of the current process's Tproc pointer.  pm_main aims this
 * at one of its own local variables.
 * NOTE(review): presumably this relies on the system stack not being
 * shared between procs, so that each proc sees its own Tproc -- confirm.
 */
static Tproc **procp;
/*
 * Proc returned by pm_getproc when no Tproc has been installed;
 * pm_main also copies it into the main Tproc's sysproc.
 */
static Proc thesysproc;

/*
 * Argument bundle built by pm_main and unpacked by mainlauncher,
 * so that threadmain receives its argc/argv through a single pointer.
 */
typedef struct Mainarg Mainarg;
struct Mainarg
{
	int argc;	/* argument count forwarded to threadmain */
	char **argv;	/* argument vector forwarded to threadmain */
};

/*
 * Entry function of the main thread: unpack the Mainarg that
 * pm_main prepared and pass its contents to threadmain.
 */
static void
mainlauncher(void *arg)
{
	Mainarg *ma = arg;

	threadmain(ma->argc, ma->argv);
}

/*
 * Program entry point for the thread library.
 *
 * Installs the library hooks, forks the note passer, copies the
 * argument vector into memory obtained from _threadmalloc, creates
 * the main Tproc running mainlauncher, and enters procsched.
 * Control is not expected to come back from procsched.
 */
void
pm_main(int argc, char *argv[])
{
	int n, nbytes;
	char **shargv, *dst;
	Mainarg *marg;
	Tproc *mainproc;
	Tproc *slot;
	extern void (*pm__sysfatal)(char *fmt, va_list arg);

	/* To trace proc scheduling, set _threaddebuglevel = DBGPROC here. */

	/* procp refers to a local of this function; it starts out empty. */
	slot = nil;
	procp = &slot;

	pm__qlockinit(_threadrendezvous);
	pm__sysfatal = _threadsysfatal;
	pm_quotefmtinstall();	/* for chanprint */
	if(mainstacksize == 0)
		mainstacksize = 512*1024;

	/* Fork off the passer, set up the process groups. */
	forkpasser();

	/*
	 * On Plan 9, the system stack is not shared.
	 * Argv is pretty much the only pointer that a threaded
	 * program would see that points at unshared memory,
	 * so we might as well copy the arguments into shared
	 * memory.
	 *
	 * Layout of the single allocation: argc+1 pointers,
	 * immediately followed by the string bytes.
	 */
	nbytes = sizeof(char*)*(argc+1);
	n = 0;
	while(n < argc){
		nbytes += strlen(argv[n])+1;
		n++;
	}
	shargv = _threadmalloc(nbytes, 0);
	dst = (char*)(shargv+argc+1);
	n = 0;
	while(n < argc){
		strcpy(dst, argv[n]);
		shargv[n] = dst;
		dst += strlen(dst)+1;
		n++;
	}
	shargv[n] = nil;

	marg = _threadmalloc(sizeof *marg, 0);
	marg->argc = argc;
	marg->argv = shargv;

	mainproc = _newproc(mainlauncher, marg, mainstacksize, "threadmain", 0, 0);
	assert(mainproc->curthread != nil);
	*procp = mainproc;
	mainproc->sysproc = thesysproc;
	_threaddebug(DBGPROC, "calling procsched");
	procsched(mainproc);
	/* not reached */
}

/*
 * The main process runs threadmain() but sits in a different note group
 * so the library can use notes internally without disturbing other processes.
 * A child process is created, the "passer", and left in the
 * original group to pass interrupts, etc. on to the main group.  Synchronization
 * between the two processes across the fork guarantees that the original
 * note group survives, in case anyone has its notepg file open.  There is
 * a race, though, as to whether someone using the main process's pid
 * as the route to the notepg file will find the original group or the new one.
 */
/*
 * Note handler that forkpasser registers with pm_atnotify.
 *
 * Any note other than "threadsignal" yields 0.  For "threadsignal"
 * the handler exits if _threadnonotes is set, otherwise writes the
 * note text to notefd, exits with _threadexitsallstatus when that is
 * set, and finally returns 1.
 */
static int
notehandler(void*, char *note)
{
	_threaddebug(DBGNOTE, "Notepasser %d got note %s", passerpid, note);
	if(pm_strcmp(note, "threadsignal") != 0)
		return 0;

	if(_threadnonotes)
		_exits("no notes");
	write(notefd, note, strlen(note));
	if(_threadexitsallstatus)
		_exits(_threadexitsallstatus);
	return 1;
}

/*
 * Note handler installed with notify() by the main process in forkpasser.
 *
 * "threadsignal" either terminates the process (when
 * _threadexitsallstatus is set) or is acknowledged with NCONT;
 * every other note gets NDFLT.
 */
static void
kidnotifier(void *a, char *note)
{
	USED(a);

	/*
	 * YOU CANNOT PRINT IN THIS FUNCTION.
	 * Printing will lock the fgrp to acquire a Chan*,
	 * which will fail; if the proc happened to hold this
	 * lock already we'll deadlock.
	 *
	 * Atnotify-registered handlers get run by notehandler,
	 * which runs only in the passer (which doesn't hold locks).
	 * We only do internal thread stuff here.
	 *
	 * _threaddebug is okay, since it bypasses the 9pm library.
	 */
	_threaddebug(DBGNOTE, "note '%s' status '%s'", note, _threadexitsallstatus);

	if(strcmp(note, "threadsignal") != 0){
		/* not one of ours */
		noted(NDFLT);
		return;
	}

	if(_threadexitsallstatus)
		_exits(_threadexitsallstatus);
	/*
	 * The other reason to get signaled is threadkillgrp
	 * or threadkill.  If a non-current thread is to die, it
	 * will die next time it would be scheduled.  If the current
	 * thread is to die, it will do so once it yields.  The signal
	 * may have interrupted an I/O operation.
	 */
	noted(NCONT);
	noted(NDFLT);
}

/*
 * Create the note-passer process and move the caller into a new
 * note group.
 *
 * The child (the passer) installs a private Tproc and Fgrp, waits for
 * the parent to switch note groups, opens the notepg file found under
 * the parent's pid, and then sleeps in a loop until sleep fails with
 * an error other than "interrupted".  The parent installs kidnotifier
 * and returns once the child reports that notepg is open.
 * The two rendezvous calls (tags 0 and 1) order these steps; do not
 * reorder any statement here without rechecking both sides.
 */
static void
forkpasser(void)
{
	char buf[40], err[32];
	int fd, mainpid;
	Tproc *tp;

	/* Taken before the fork so the child can name the parent's /proc entry. */
	mainpid = getpid();
	rfork(RFREND);
	/* Registered before the fork, so the registration is in place when the child starts. */
	pm_atnotify(notehandler, 1);
	switch(rfork(RFPROC|RFMEM|RFNAMEG|RFCFDG)){
	case -1:
		/* threadassert aborts, so control does not fall into case 0. */
		threadassert(0);
	case 0:
		/* handle notes */
		/* Give the passer its own zeroed Tproc and Fgrp. */
		tp = _threadmalloc(sizeof(*tp), 1);
		*procp = tp;
		tp->sysproc.fgrp = _threadmalloc(sizeof(Fgrp), 1);
		pm_incref(&tp->sysproc.fgrp->ref);
		/* Only when debugging: open the console and dup it onto fd 2 for _threaddebug output. */
		if(_threaddebuglevel){
			fd = open("/dev/cons", OWRITE);
			dup(fd, 2);
		}
		rfork(RFCNAMEG);
		rendezvous(0, 0);	/* wait for parent to switch note groups */
		passerpid = getpid();
		sprint(buf, "#p/%d/notepg", mainpid);	/* notepg of main process's note group */
		notefd = open(buf, OWRITE|OCEXEC);
		_threaddebug(DBGNOTE, "Passer is %d", passerpid);
		rendezvous(1, 0);	/* tell parent we've opened notepg, can continue */
		/* Keep sleeping; only a sleep failure other than "interrupted" ends the loop. */
		for(;;){
			if(sleep(120*1000) < 0){
				errstr(err, sizeof(err));
				if (strcmp(err, "interrupted") != 0)
					break;
			}
		}
		_threaddebug(DBGNOTE, "Passer %d exits", passerpid);
		exits("interrupted");

	default:
		/* put main process in a different note group */
		rfork(RFNOTEG);
		notify(kidnotifier);
		rendezvous(0, 0);	/* tell child we switched note groups */
		rendezvous(1, 0);	/* wait for child to open notepg */
		return;
	}
}

/*
 * Local view of the data that procsched reads through Chan.aux.
 * NOTE(review): procsched's KNOWS_ABOUT_STRUCT_UCHAN_FROM_DEVFS_C marker
 * suggests this must mirror a definition in devfs.c -- keep them in sync.
 */
typedef struct Uchan	Uchan;
struct Uchan
{
	int	sysfd;	/* system file descriptor; procsched asserts it is >= 0 */
	Cname*	syspath;	/* not used by the code visible in this file */
};
/*
 * The general structure of procsched is system-independent,
 * but the system-dependent parts are special enough that it's
 * very hard to tease them out into separate functions.
 *
 * procsched runs the scheduling loop of the Tproc self: it switches
 * into the proc's current thread with _threadswtch and, whenever
 * control comes back, carries out the requested action (DOEXEC,
 * DOPROC or DOEXIT).  r is passed to the next _threadswtch call
 * ("return r to caller" below).  The function never returns;
 * DOEXIT ends the process with _exits.
 */
static void
procsched(Tproc *self)
{
	char exitstr[ERRMAX];
	Chan *c;
	Uchan *u;
	int action, i, r;
	Fgrp *f;
	Execproc *e;
	Tproc *p;

	/* Never used as a value; the name itself is the message to maintainers. */
	int KNOWS_ABOUT_STRUCT_UCHAN_FROM_DEVFS_C;

	_threaddebug(DBGPROC, "procsched self=%p", self);

	/* Re-entered via goto by every freshly forked proc (see DOPROC). */
Top:
	self->pid = getpid();
	self->sysproc.pid = self->pid;
	r = ~0;
	for(;;){
		/*
		 * NOTE(review): the format string has no verbs for the two
		 * extra arguments; _threaddebug already prefixes pid.threadid.
		 */
		_threaddebug(DBGPROC, "procsched run", self->pid, self->curthread->id);
		switch(action=_threadswtch(self->oldenv, self->curthread->env, r)){

		default:		/* can't happen */
			threadprint(2, "runproc, can't happen: %d\n", action);
			threadassert(0);

		case DOEXEC:	/* fork off a program, return pid */
			self = _threadgetproc();
			_threaddebug(DBGPROC, "doexec");
			e = &self->execproc;
			rfork(RFFDG|RFENVG);
			f = pm_getproc()->fgrp;
			if(f->maxfd > 0){
				/*
				 * Pass 1: for each open pm fd 0-2, dup its system fd
				 * until it is either already in slot i or lies at 3
				 * or above, so that pass 2 cannot overwrite it.
				 */
				for(i=0; i<3; i++){
					c = f->fd[i];
					if(c==nil)
{
_threaddebug(DBGPROC, "pm fd %d is closed", i);
						continue;
}
					u = c->aux;
					assert(u->sysfd >= 0);
					while(u->sysfd != i && u->sysfd < 3)
						u->sysfd = dup(u->sysfd, -1);
_threaddebug(DBGPROC, "pm fd %d => sysfd %d", i, u->sysfd);
				}
				/*
				 * Pass 2: make system fds 0-2 match pm fds 0-2,
				 * closing the slots whose pm fd is closed.
				 */
				for(i=0; i<3; i++){
					c = f->fd[i];
					if(c==nil){
						close(i);
						continue;
					}
					u = c->aux;
					if(u->sysfd != i){
_threaddebug(DBGPROC, "sysfd %d => %d", u->sysfd, i);
						dup(u->sysfd, i);
					}
				}
				/* Close system fds 3 through 99. */
				for(i=3; i<100; i++)
					close(i);
			}
			/* r is the rfork result: the child's pid in the parent. */
			switch(r = rfork(RFPROC|RFREND|RFNOTEG|RFFDG)){
			case -1:
				pm_oserror();
				pm_sysfatal("thread doexec: rfork: %r");
			case 0:
				exec(e->file, e->arg);
				/* Only reached when exec failed. */
				pm_oserror();
				_threaddebug(DBGPROC, "exec %s [argv0=%s] failed: %r",
						e->file, e->arg[0]);
				exits("exec");
			default:
				continue;		/* return r to caller */
			}
			/* not reached */
			abort();

		case DOPROC:	/* start a new Tproc running */
			self = _threadgetproc();
			_threaddebug(DBGPROC, "doproc");
			p = self->arg;
			/* The creator gets back the id of the new proc's current thread. */
			r = p->curthread->id;
			switch(rfork(p->rforkflag|RFPROC|RFMEM|RFNOWAIT)){
			case -1:
				pm_oserror();
				pm_sysfatal("thread doproc: rfork: %r");
			case 0:
				/*
				 * procsched(p), but if we did that we would
				 * use unbounded amounts of stack to set up
				 * a linear process tree.  The goto avoids growing
				 * the stack.
				 */
				self = p;
				*procp = self;
				goto Top;
			default:
				continue;		/* return r to caller */
			}
			/* not reached */
			abort();

		case DOEXIT:	/* destroy this proc */
			self = _threadgetproc();
			_threaddebug(DBGPROC, "doexit");

			/* remove ourself from the list of active procs */
			lock(&_threadpq.lk);
			if(_threadpq.head == self){
				_threadpq.head = self->next;
				if(_threadpq.tail == self){
					_threadpq.tail = nil;
					/* no procs left, signal the passer to exit */
					_threadsignalpasser();
				}
			}else{
				/* Find our predecessor and unlink; fix tail if we were last. */
				for(p=_threadpq.head; p->next; p=p->next){
					if(p->next == self){
						p->next = self->next;
						if(_threadpq.tail == self)
							_threadpq.tail = p;
						break;
					}
				}
			}
			unlock(&_threadpq.lk);

			/*
			 * Clean up and exit.  Remember that we're on the
			 * system stack segment (not a malloced stack),
			 * thus it is okay to free self and all associated data.
			 * The exit string is copied out first because it lives in self.
			 */
			utfecpy(exitstr, exitstr+sizeof exitstr, self->exitstr);
			_freeproc(self);
			_exits(exitstr);
		}
		/* not reached */
		abort();
	}
}

/*
 * Threadtimes is the equivalent of the times() library call.
 * On Plan 9, using times() burns you because we have closed
 * times's file descriptor out from under it.
 */
/*
 * Step over one blank-delimited field: first any leading spaces,
 * then every character up to the next space or the terminating NUL.
 * Returns a pointer to that space or NUL.
 */
static char*
skip(char *p)
{
	for(; *p == ' '; p++)
		;
	for(; *p != 0 && *p != ' '; p++)
		;
	return p;
}

/*
 * Read /dev/cputime and parse its blank-separated numeric fields.
 *
 * When t is non-nil, t[0], t[1], t[2] and t[3] receive the first,
 * second, fourth and fifth fields respectively.  The third field is
 * the return value.  Returns 0 if the file cannot be opened or read.
 * NOTE(review): per cons(3) the fields are presumably user, system,
 * real, child user and child system times -- confirm.
 *
 * The descriptor is closed on every path, and the read leaves room
 * for a terminating NUL so atol/skip never run past the buffer.
 */
long
_threadtimes(long *t)
{
	char b[200], *p;
	int f;
	long n;
	ulong r;

	f = open("/dev/cputime", OREAD|OCEXEC);
	if(f < 0)
		return 0;
	n = read(f, b, sizeof(b)-1);
	close(f);	/* done with the file whether or not the read worked */
	if(n <= 0)
		return 0;
	b[n] = 0;

	p = b;
	if(t)
		t[0] = atol(p);
	p = skip(p);
	if(t)
		t[1] = atol(p);
	p = skip(p);
	r = atol(p);
	if(t){
		p = skip(p);
		t[2] = atol(p);
		p = skip(p);
		t[3] = atol(p);
	}
	return r;
}

/*
 * Post the note "threadsignal" with PNGROUP, addressed by the
 * calling process's own pid.
 */
void
_threadsignal(void)
{
	int self;

	self = getpid();
	pm_postnote(PNGROUP, self, "threadsignal");
}

/*
 * Post the note "threadsignal" with PNPROC to the process whose
 * pid is stored in passerpid.
 */
void
_threadsignalpasser(void)
{
	int target;

	target = passerpid;
	pm_postnote(PNPROC, target, "threadsignal");
}

/*
 * Block in rendezvous using the caller's own pid as the tag,
 * retrying for as long as rendezvous returns ~0.
 */
void
pm_nap(void)
{
	for(;;){
		if(rendezvous(getpid(), 0) != ~0)
			break;
	}
}

/*
 * Rendezvous once on the tag p->pid, the same tag pm_nap
 * waits on for its own pid.
 */
void
pm_wake(Proc *p)
{
	int tag;

	tag = p->pid;
	rendezvous(tag, 0);
}

/*
 * Wait for the exec-ed child with the given pid to exit.
 *
 * Closes fds 0 and 1, then collects wait messages until one for pid
 * arrives; messages for other children are freed and ignored.  A nil
 * result from wait is tolerated only when the error string contains
 * "interrupted".  If _threadwaitchan is set, a copy of the message,
 * allocated with _threadmalloc, is sent on it.
 */
void
_procexecwait(int pid)
{
	Channel *waitc;
	Waitmsg *msg, *copy;
	char err[ERRMAX];

	close(0);
	close(1);
	_threaddebug(DBGCHLD, "Proc %d waiting for exec-ed child %d", getpid(), pid);
	for(;;){
		msg = wait();
		if(msg == nil){
			/* only an interrupted wait may be retried */
			rerrstr(err, sizeof err);
			if(strstr(err, "interrupted") == nil)
				threadassert(0);
			continue;
		}
		_threaddebug(DBGCHLD, "Child %d exited", msg->pid);
		if(msg->pid == pid)
			break;
		free(msg);
	}

	waitc = _threadwaitchan;	/* global is exposed */
	if(waitc != nil){
		_threaddebug(DBGCHLD, "Sending exit status for exec: pid %d, waiter pid %d, status %s", pid, getpid(), msg->msg);
		/*
		 * msg is malloced with the system malloc.
		 * we need to pass a pointer that is malloced with the 9pm malloc.
		 * The message text is stored right behind the copied struct.
		 */
		copy = _threadmalloc(sizeof(*msg)+strlen(msg->msg)+1, 1);
		*copy = *msg;
		copy->msg = (char*)&copy[1];
		strcpy(copy->msg, msg->msg);
		sendp(waitc, copy);
	}
	free(msg);
}

Proc*
pm_getproc(void)
{
	if(procp && *procp)
		return &(*procp)->sysproc;
	return &thesysproc;
}

/*
 * Return the Tproc currently stored through procp.
 * No nil check is made on procp itself.
 */
Tproc*
_threadgetproc(void)
{
	Tproc *cur;

	cur = *procp;
	return cur;
}

Thread*
_threadgetthr(void)
{
	return (*procp)->curthread;
}


/*
 * Write one debugging line to fd 2 if any bit of l is set in
 * _threaddebuglevel.
 *
 * The line is prefixed with "pid.threadid ", "pid.nothread " when the
 * proc has no current thread, or "noproc " when no Tproc is installed
 * yet (pm_main leaves *procp nil until the main proc has been
 * created, so the proc pointer must be checked before use).
 * The message is formatted with pm_doprint and written with a raw
 * write, not through the print library.
 */
void
_threaddebug(ulong l, char *fmt, ...)
{
	va_list arg;
	Tproc *p;
	int n;
	char buf[256];

	/* Cheap test first: nothing below matters when the level is off. */
	if((l & _threaddebuglevel) == 0)
		return;

	p = _threadgetproc();
	if(p == nil)
		n = pm_sprint(buf, "noproc ");
	else if(p->curthread)
		n = pm_sprint(buf, "%d.%d ", p->pid, p->curthread->id);
	else
		n = pm_sprint(buf, "%d.nothread ", p->pid);

	/* Stop one byte short of the end so the newline always fits. */
	va_start(arg, fmt);
	n = pm_doprint(buf+n, buf+sizeof(buf)-1, fmt, arg) - buf;
	va_end(arg);
	buf[n] = '\n';
	write(2, buf, n+1);
}

/*
 * Report a failed assertion on fd 2 and abort.
 *
 * If a Tproc with a current thread is installed, a "pid.threadid "
 * prefix is written first.  The note handler is removed with
 * notify(nil) before abort is called.
 */
void
_threadassert(char *s)
{
	char msg[256];
	Tproc *cur;

	cur = _threadgetproc();
	if(cur != nil && cur->curthread != nil){
		pm_sprint(msg, "%d.%d ", cur->pid, cur->curthread->id);
		write(2, msg, strlen(msg));
	}
	pm_snprint(msg, sizeof(msg), "libthread: %s: assertion failed\n", s);
	write(2, msg, strlen(msg));
	notify(nil);
	abort();
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */