/*
 *	Copyright 1988 by Rayan S. Zachariassen, all rights reserved.
 *	This will be free software, but only when it is finished.
 */
/*
 *	Lots of modifications (new guts, more or less..) by
 *	Matti Aarnio <mea@nic.funet.fi>  (copyright) 1992-2003
 */


#include "hostenv.h"
#include <sfio.h>
#include <sys/param.h>
#include "mail.h"
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <sys/stat.h>
#include <errno.h>
#include <sys/file.h>
#include "zmsignal.h"
/* #include <stdlib.h> */
#include <unistd.h>

#include "zsyslog.h"
#include <sysexits.h>
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/socket.h>
#include <sys/un.h>
#endif

#include "scheduler.h"
#include "prototypes.h"

#include "ta.h"
#include "libz.h"

#include "libc.h"

/* Scheduler settings defined elsewhere in the scheduler: */
extern int forkrate_limit;	/* start_child() refuses further forks once this
				   many have been started within one time() tick */
extern int freeze;		/* non-zero: start_child() starts nothing ("Frozen") */
extern int mailqmode;		/* 1: mailq queries served by a forked child,
				   otherwise (2): MAILQv2 via mq2_register() */

/* Number of cpids[] slots and the fd scan limit; filled lazily from
   resources_query_nofiles() by stashprocess()/runcommand(). */
static int  scheduler_nofiles = -1; /* Will be filled below */
/* Forward declarations of file-local helpers. */
static int  runcommand   __((char * const argv[], char * const env[], struct vertex *, struct web *, struct web*));
static void stashprocess __((int, int, int, struct web*, struct web*, struct vertex *, char * const argv[]));
static void reclaim      __((int, int));
static void waitandclose __((int));
static void readfrom     __((int));

/* These two are *not* exactly proper ones, but at least at most
   systems it links properly without wider exportation of related
   data structure types.  */

extern int syncweb __((void *));
extern void *dirq;


extern FILE *vfp_open __((struct ctlfile *));

#ifdef HAVE_SYS_WAIT_H /* POSIX.1 compatible */
# include <sys/wait.h>
#else /* Not POSIX.1 compatible, lets fake it.. */
extern int wait();
#endif

#ifndef WEXITSTATUS
# define WEXITSTATUS(s) (((s) >> 8) & 0377)
#endif
#ifndef WSIGNALSTATUS
# define WSIGNALSTATUS(s) ((s) & 0177)
#endif

/* Table of transport agent processes, indexed by the fd the scheduler
   reads the child's reports from; allocated with scheduler_nofiles
   entries on first use in stashprocess(). */
struct procinfo *cpids = NULL;

/* NOTE(review): not referenced in this part of the file. */
#define	MAXFILESPERTRANSPORT	1000

int	numkids;		/* live transport agent processes */
int	readsockcnt;		/* Count how many childs to read there are
				   (slots with pid != 0, counted by mux());
				   this for the SLOW Shutdown */

int	notifysocket = -1;	/* fd of UDP/AF_UNIX socket to listen for notifies */


static void cmdbufalloc __((int, char **, int *));

/*
 * cmdbufalloc() -- guarantee that *bufp has room for newlen characters
 * plus a terminating NUL.
 *
 * *spcp records the usable size of *bufp (the NUL slot not included).
 * A NULL *bufp gets a fresh buffer of exactly newlen+1 bytes; an
 * existing buffer is only ever grown, never shrunk.
 */
static void
cmdbufalloc(newlen, bufp, spcp)
     int newlen;
     char **bufp;
     int *spcp;
{
  if (*bufp != NULL && newlen <= *spcp)
    return;				/* already large enough */

  if (*bufp == NULL)
    *bufp = emalloc(newlen+1);
  else
    *bufp = erealloc(*bufp, newlen+1);
  *spcp = newlen;
}

/* errno comes from <errno.h> (usually as a macro); declaring it here
   ourselves is undefined behaviour per the C standard. */
extern int slow_shutdown;	/* non-zero: the feed code sends bare "\n"
				   lines instead of jobs / "#idle" */



/*
 * flush_child() -- push the pending command bytes in proc->cmdbuf
 * (proc->cmdlen of them) down the child's command channel proc->tofd.
 *
 * Returns:
 *   -1  the channel is unusable (already closed, or write() failed
 *       hard); the channel is shut down, cmdlen cleared and
 *       proc->state set to CFSTATE_ERROR,
 *    1  part of the buffer is still pending (write would block, was
 *       interrupted, or wrote nothing); retry later,
 *    0  everything has been written.
 */
static int  flush_child     __((struct procinfo *cpidp));

static int
flush_child(proc)
     struct procinfo *proc;
{

	/* pid < 0 (presumably an already reaped child) that still has an
	   open command channel: close the channel. */
	if (proc->pid < 0 && proc->tofd >= 0) {
	  pipes_shutdown_child(proc->tofd);
	  proc->tofd = -1;

	  if (verbose)
	    sfprintf(sfstderr,
		     "%% shutdown of proc=%p pid=%d flush_child() pid<0 && tofd>=0\n",
		     proc, proc->pid);

	}
	if (proc->tofd < 0) {
	  proc->cmdlen = 0;
	  proc->state = CFSTATE_ERROR;

	  if (verbose)
	    sfprintf(sfstderr,
		     "%% disconnect of flush_child(proc=%p pid=%d) pid<0 && tofd<0 BAD?? \n",
		     proc, proc->pid);

	  return -1;
	}

	if (proc->cmdlen == 0 && proc->tofd >= 0)
	  return 0; /* All done! */

	/* We have some leftovers from previous feed..
	   .. feed them now.  */

	while (proc->tofd >= 0 && proc->cmdlen > 0) {

	  int rc = write(proc->tofd, proc->cmdbuf, proc->cmdlen);

	  if (rc < 0 && (errno != EAGAIN && errno != EINTR &&
			 errno != EWOULDBLOCK)) {
	    int e = errno;
	    /* Some real failure :-( */
	    pipes_shutdown_child(proc->tofd);
	    proc->tofd = -1;
	    proc->cmdlen = 0;
	    proc->state = CFSTATE_ERROR;

	    if (verbose)
	      sfprintf(sfstderr,
		       "%% shutdown of proc=%p pid=%d flush_child() write() errno=%d\n",
		       proc, proc->pid, e);

	    return -1;
	  }
	  if (rc > 0) {
	    /* Shift the unsent tail to the buffer start.  Source and
	       destination overlap, so this must be memmove(), not memcpy(). */
	    memmove(proc->cmdbuf, proc->cmdbuf + rc, proc->cmdlen - rc);
	    proc->feedtime = now;
	  } else
	    break; /* Zero or negative.. let the writing
		      proceed later... */
	  proc->cmdlen -= rc;
	}
	if (proc->cmdlen) return 1; /* Incomplete write */
	return 0;
}


/*
 * feed_child() -- queue one job line for the child and try to send it.
 *
 * The job is the vertex at proc->pthread->nextfeed.  The line appended
 * to proc->cmdbuf is "<cfpdirname(dirind)><mid>\n" when the agent's
 * command line already carries the host (proc->thg->withhost), and
 * "<cfpdirname(dirind)><mid>\t<host>\n" otherwise.  During slow_shutdown
 * it is a bare "\n".  Afterwards the next vertex is picked
 * (pick_next_vertex()) and the buffer is pushed out with flush_child().
 *
 * Returns -1 when nothing can be fed (no thread, no next vertex, no
 * process or no command channel).  Otherwise it returns flush_child()'s
 * result: -1 on write failure, 1 when a partial write is still pending,
 * 0 when everything was sent.
 */

static int  feed_child      __((struct procinfo *cpidp));

static int
feed_child(proc)
     struct procinfo *proc;
{
	struct vertex *vtx;
	int cmdlen;

	/* Scratch buffer for formatting one job line; reused across calls. */
	static char *cmdbuf = NULL;
	static int cmdbufspc = 0;

	if (proc->pthread == NULL) {
	  proc->state = CFSTATE_ERROR;
	  pipes_shutdown_child(proc->tofd);
	  proc->tofd = -1;

	  if (verbose)
	    sfprintf(sfstderr,
		     "%% shutdown of proc=%p pid=%d feed_child() proc->pthread == NULL\n",
		     proc, proc->pid);

	  return -1; /* BUG if called without next THREAD.. */
	}
	if (proc->pthread->nextfeed == NULL) {

	  if (verbose)
	    sfprintf(sfstderr,
		     "%% feed_child(proc=%p pid=%d) proc->pthread->nextfeed == NULL\n",
		     proc, proc->pid);

	  return -1; /* Might be called without next thing to process.. */
	}
	if (proc->pid <= 0 || proc->tofd < 0) {
	  proc->state = CFSTATE_ERROR;

	  if (verbose)
	    sfprintf(sfstderr,
		     "%% feed_child(proc=%p pid=%d tofd=%d) pid<0 || tofd<0 BAD??\n",
		     proc, proc->pid, proc->tofd);

	  return -1; /* No process../No write channel.. */
	}

	vtx = proc->pthread->nextfeed;

	mytime(&now);
#if 0
	if (vtx->lastfeed + 10 >= now)
	  return -1; /* Force at least 10 seconds in between feeds! */
#endif
	vtx->lastfeed = now;

	/* Format the job line into the scratch buffer. */
	if (slow_shutdown) {

	  cmdlen = 1;
	  cmdbufalloc(cmdlen, & cmdbuf, & cmdbufspc);
	  strcpy(cmdbuf,"\n");

	} else {

	  const char *d = cfpdirname(vtx->cfp->dirind);
	  if (proc->thg->withhost) { /* cmd-line was with host */
	    cmdlen = 1 + strlen(d) + strlen(vtx->cfp->mid);
	    cmdbufalloc(cmdlen, & cmdbuf, & cmdbufspc);
	    sprintf(cmdbuf, "%s%s\n", d, vtx->cfp->mid);
	  } else {
	    cmdlen = 2+strlen(d)+strlen(vtx->cfp->mid)+strlen(proc->ho->name);
	    cmdbufalloc(cmdlen, & cmdbuf, & cmdbufspc);
	    sprintf(cmdbuf, "%s%s\t%s\n", d, vtx->cfp->mid, proc->ho->name);
	  }

	}

	/* Grow the per-process output buffer if needed; the memcpy below
	   also copies the scratch buffer's terminating NUL. */
	if ((proc->cmdlen + cmdlen) >= proc->cmdspc)
	  cmdbufalloc(proc->cmdlen + cmdlen + 1, &proc->cmdbuf, &proc->cmdspc);

	/* Ok, copy it there.. */
	memcpy(proc->cmdbuf + proc->cmdlen, cmdbuf, cmdlen+1);
	proc->cmdlen += cmdlen;

	if (verbose) {
	  sfprintf(sfstdout,
		   "feed: tofd=%d, chan='%s', proc=%p, vtx=%p thr=%p ",
		   proc->tofd, proc->ch->name, proc, vtx, vtx->thread);
	}

	/* Per-message verbose log (vfpfn set): record what was fed. */
	if (vtx->cfp->vfpfn != NULL) {
	  Sfio_t *vfp = vfp_open(vtx->cfp);
	  int i;
	  if (vfp) {
	    sfprintf(vfp, "Feeding to child; ce.argv = \"");
	    for (i = 0; vtx->thgrp->ce.argv[i] != NULL; ++i) {
	      if (i == 0)
		sfprintf(vfp, "'%s'", vtx->thgrp->ce.argv[i]);
	      else
		sfprintf(vfp, " '%s'", vtx->thgrp->ce.argv[i]);
	    }
	    sfprintf(vfp, "\" chan = '%s' cmd: %s", proc->ch->name, cmdbuf);
	    sfclose(vfp);
	  }
	}

	vtx->ce_pending  = 0; /* and clear the pending..       */
	/* DON'T double-count attempts on vertices!
	   This gets counted when some diagnostics is received for
	   this vertex. */
	/* vtx->attempts   += 1; */
	
	/* It was fed (to buffer): one more line outstanding at the
	   child, one fewer unfed vertex in the thread. */
	proc->overfed        += 1;
	proc->pthread->unfed -= 1;

	if (verbose)
	  sfprintf(sfstdout,"len=%d buf=%s", cmdlen, cmdbuf);

	mytime(&proc->feedtime);


	/* Advance proc->pthread->nextfeed before sending. */
	pick_next_vertex(proc);

	return flush_child(proc);
}


/* Printable names of the child-feed states, indexed by proc->state
   (used in ta_hungry()'s verbose output).  NOTE(review): the order
   presumably has to match the CFSTATE_* values (ERROR=0, LARVA=1,
   STUFFING=2, FINISHING=3, IDLE=4) -- confirm against scheduler.h. */
char *proc_state_names[] = {
  "ERROR", "LARVA", "STUFF", "FINISH", "IDLE"
};

void
ta_hungry(proc)
     struct procinfo *proc;
{
	/* This is an "actor model" behaviour,
	   where actor tells, when it needs a new
	   job to be fed to it. */

	struct thread *thr0;
	int i;

	mytime(&proc->hungertime);

	if (verbose)
	  sfprintf(sfstdout,"%% ta_hungry(%p) OF=%d S=%s tofd=%d\n",
		   proc, proc->overfed, proc_state_names[proc->state],
		   proc->tofd);

	/* If ``proc->tofd'' is negative, we can't feed anyway.. */

	if (proc->tofd < 0) {

	  --proc->overfed;

	  if (proc->overfed == 0 && proc->pthread)
	    /* If we have a thread here still, reschedule it... */
	    if (! thread_reschedule(proc->pthread,0,-1))
	      proc->pthread = NULL;

	  if (proc->pthread)
	    if (delete_thread(proc->pthread))
	      proc->pthread = NULL;

	  return;
	}

	/* We have valid child-feed state */

	proc->overfed -= 1;

	switch(proc->state) {
	case CFSTATE_LARVA: /* "1" */

	  /* Thread selected already along with its first vertex,
	     which is pointer by  proc->pthread->nextfeed  */

	  /* However if we have multiple parallel processes
	     in single thread, we may end up driving a new
	     process straight from LARVA to IDLE... */

	  proc->overfed = 0; /* Should not need setting ... */

	  if (proc->pthread->nextfeed != NULL &&
	      feed_child(proc) < 0)
	    /* We got some error :-/  D'uh! */
	    goto feed_error_handler;

	  proc->state = CFSTATE_STUFFING;
	  /* Not YET drained.. */
	  if (proc->pthread->nextfeed)
	    return;

	case CFSTATE_STUFFING: /* "2" */

	  if (proc->overfed > 0) return;

	  if (proc->pthread && proc->pthread->nextfeed) {

	    while ((proc->state == CFSTATE_STUFFING) &&
		   proc->pthread && proc->pthread->nextfeed) {

	      /* As long as:
		 - we have next vertex to feed
		 - there is no command buffer backlog
		 - state stays in STUFFING
	      */

	      i = feed_child(proc);
	      if (i < 0)
		goto feed_error_handler; /* Outch! */

	      if (proc->tofd >= 0 && proc->cmdlen != 0)
		break; /* Incomplete feed -- stop feeding here */

	      if (proc->overfed > proc->thg->ce.overfeed)
		break; /* Or over limit ...       */

	    }
	    /* As long as we have things to feed (or have fed anything!),
	       we stay in the STUFFING state.  This was if we later get
	       new workitems,  thread_linkin() can simply put them into
	       the 'nextfeed' pointer, and we feed them right away. */
	    return;
	  }

	  /* Didn't have anything to feed :-( */

	  proc->state = CFSTATE_FINISHING;
	  /* FALL THRU! */

	case CFSTATE_FINISHING: /* "3" */

	  /* "retryat" may have kicked us into this state also.. */

	  if (proc->overfed > 0) return;

	  /* Well, this thread was done, so long!
	     This TA may have other things to poke at! */

	  thr0 = proc->pthread;

	  /* proc->pthread = NULL; */
	  /* ^^^  NO KILL YET! pick_next_thread() needs this! */

	  /* Disconnect the previous thread from the proc. */

	  thr0->thrkids -= 1;

	  if (thr0->proc == proc) /* Thread Process Chain Leader */
	      thr0->proc = proc->pnext;

	  /* Disconnect from process chain */
	  if (proc->pnext) proc->pnext->pprev = proc->pprev;
	  if (proc->pprev) proc->pprev->pnext = proc->pnext;
	  proc->pnext = proc->pprev = NULL;

	  /* Next: either the thread changes, or
	     the process moves into IDLE state. */

	  if (pick_next_thread(proc)) {

	    /* We have WORK !  We are reconnected to the new thread! */
	    /* Picked a new thread, which isn't same as THR0! */

	    /* Picked something, reschedule the old thread,
	       and if it got destroyed (by expiry) loose it... */
	    if (thr0 && !thread_reschedule(thr0, 0, -1))
	      thr0 = NULL;
	    if (thr0) delete_thread(thr0); /* possibly kill it if no more
					      things in it active! */
	    /* thr0 = NULL; -- this is dead variable.. */

	    if (verbose)
	      sfprintf(sfstdout, "%% pick_next_thread(proc=%p) gave thread %p\n",
		       proc, proc->pthread);

	    /* DON'T double-count attempts on threads!
	       This gets counted when some diagnostic is received
	       for this thing.. */
	    /* proc->pthread->attempts += 1; */

	    if (feed_child(proc))
	      /* non-zero return means things went wrong somehow.. */
	      goto feed_error_handler;

	    proc->state = CFSTATE_STUFFING;
	    return;
	  }

	  /* proc->pthread == NULL  now */
	  /* thr0->thrkids has been decremented */
	  /* No work in sight, queue up '#idle\n' string. */

	  if (thr0) {

	    /* Previously we were disconnected from the thread,
	     now join back so that IDLE will disconnect us... */

	    proc->pthread = thr0;
	    thr0->thrkids += 1;
	    proc->pnext   = thr0->proc;
	    if (proc->pnext) proc->pnext->pprev = proc;
	    thr0->proc = proc;
	  }

	  if ((proc->cmdlen + 7) >= proc->cmdspc) {
	    cmdbufalloc(proc->cmdlen+7, &proc->cmdbuf, &proc->cmdspc);
	  }
	  if (slow_shutdown) {
	    proc->cmdbuf[ proc->cmdlen ] = '\n';
	    proc->cmdlen += 1;
	  } else {
	    memcpy(proc->cmdbuf + proc->cmdlen, "#idle\n", 6);
	    proc->cmdlen += 6;
	  }
	  proc->state = CFSTATE_IDLE;
	  proc->overfed += 1;
	  flush_child(proc);  /* May flip the state to CFSTATE_ERROR */
	  return;

	case CFSTATE_IDLE: /* "4" */
	  /* The process has arrived into IDLE pool! */

	  if (proc->overfed > 0) abort();
	  /* We have seen at least once that this value went NEGATIVE! */

	  if (verbose)
	    sfprintf(sfstdout, " ... IDLE THE PROCESS %p (of=%d).\n",
		     proc, proc->overfed);

	  thr0 = proc->pthread;
	  if (thr0) {

	    proc->pthread = NULL;

	    if (thr0->proc == proc) /* Was chain head */
	      thr0->proc = proc->pnext;

	    thr0->thrkids -= 1;

	  }

	  /* Unlink me from the active chain */
	  if (proc->pnext) proc->pnext->pprev = proc->pprev;
	  if (proc->pprev) proc->pprev->pnext = proc->pnext;

	  proc->pnext = proc->thg->idleproc;
	  if (proc->pnext) proc->pnext->pprev = proc;
	  proc->pprev = NULL;
	  proc->thg->idleproc = proc;

	  proc->thg->idlecnt += 1;

	  ++idleprocs;
	  MIBMtaEntry->sc.TransportAgentsActiveSc -= 1;
	  MIBMtaEntry->sc.TransportAgentsIdleSc   += 1;


	  /* Failed to pick anything, reschedule the old thread.
	     Possibly killed the thread.. */
	  if (thr0) thread_reschedule(thr0, 0, -1);
	  return;

	default: /* CFSTATE_ERROR: "0" */
	  /* Some error was encountered at  feed_child()  at some
	     point, we do nothing! */

	  /* Some (real?) failure :-( */

	feed_error_handler:

	  if (verbose)
	    sfprintf(sfstdout,"%% ta_hungry(%p) OF=%d S=%s tofd=%d\n",
		     proc, proc->overfed, proc_state_names[proc->state],
		     proc->tofd);

	  proc->state = CFSTATE_ERROR;

	  /* We shut-down the child feed pipe */
	  pipes_shutdown_child(proc->tofd);
	  proc->tofd = -1;
	  break;
	}
	return;
}

/*
 * start_child() -- build argv[], and do other inits for fork()ing
 *                  and execve()ing a new transport program for us.
 *
 * The command comes from vhead->thgrp->ce.argv[]:
 *   - a word equal to replhost / replchannel is replaced by the host /
 *     channel name,
 *   - inside other words containing '$', ${host}, ${channel} and
 *     ${ZENVVAR} are expanded (an unset ZENV variable expands to nothing),
 *   - leading NAME=value words are passed as the child's environment,
 *   - a relative program name gets ${MAILBIN}/<qdefaultdir>/ prepended.
 * TZ, PATH and ZCONFIG are appended to the environment when set.
 *
 * Returns 0, with vhead->thread->pending set to the reason, when no
 * child is started (freeze, fork rate limit, no command defined).
 * Otherwise it returns runcommand()'s result.  Expansions that would
 * not fit the local buffers abort the scheduler before anything
 * overflows.
 */

int
start_child(vhead, chwp, howp)
	struct vertex *vhead;
	struct web *chwp, *howp;
{
#define MAXARGC 40
	char * av[1+MAXARGC];
	char * ev[1+MAXARGC];
	char *os, *cp, *ocp, *s;
	char buf[MAXPATHLEN*4];
	char *bufend = buf + sizeof(buf);
	char buf2[MAXPATHLEN];

	int	 i, avi, evi;
	static time_t prev_time = 0;
	static int startcnt = 0; /* How many childs per second (time_t tick..) ? */
	time_t this_time;


	if (freeze) {
	  vhead->thread->pending = "Frozen";
	  return 0;
	}

	if (verbose)
	  sfprintf(sfstdout,"transport(vhead=%p,chan=%s,host=%s)\n",
		   vhead,chwp->name,howp->name);

	++startcnt;
	this_time = mytime(NULL);
	if (this_time != prev_time) {
	  startcnt = 0;
	  prev_time = this_time;
	} else if (startcnt > forkrate_limit) {
	  if (verbose)
	    sfprintf(sfstdout," ... too many forks per second!\n");
	  vhead->thread->pending = "ForkRateLimit";
	  return 0;
	}

	if (vhead->thgrp->ce.argv == NULL) {
	  sfprintf(sfstderr, "No command defined for %s/%s!\n",
		  chwp->name, howp->name);
	  vhead->thread->pending = "ConfBUG:NoCmdDefined!";
	  return 0;
	}

	/*
	 * Replace the $host and $channel strings in the command line.
	 * (also any ${ZENV} variable)
	 *
	 * Expanded words are packed one after another into buf[]; every
	 * store is bounds-checked BEFORE it is done.
	 */
	os = buf;
	avi = evi = 0;
	for (i = 0; vhead->thgrp->ce.argv[i] != NULL; ++i) {
	  char *arg = vhead->thgrp->ce.argv[i];

	  if (strcmp(arg, replhost) == 0) {
	    av[avi] = howp->name;
	  } else if (strcmp(arg, replchannel) == 0) {
	    av[avi] = chwp->name;
	  } else if (strchr(arg, '$') != NULL) {
	    if (os >= bufend)		/* no room even for the NUL */
	      goto argv_overflow;
	    s = os;
	    for (cp = arg; *cp != '\0'; ++cp) {
	      if (*cp == '$' && *(cp+1) == '{') {
		const char *t;
		size_t tlen;

		cp += 2;
		ocp = cp;
		while (*cp != '\0' && *cp != '}')
		  ++cp;
		if (*cp != '}') {
		  /* Unterminated "${..." -- the rest of the word is dropped */
		  --cp;
		  continue;
		}
		*cp = '\0';	/* temporarily terminate the variable name */
		if (strcmp(ocp,"host")==0)
		  t = howp->name;
		else if (strcmp(ocp,"channel")==0)
		  t = chwp->name;
		else
		  t = getzenv(ocp);
		*cp = '}';
		if (t == NULL)
		  continue;	/* unset variable: expands to nothing */
		tlen = strlen(t);
		if (tlen >= (size_t)(bufend - s))
		  goto argv_overflow;
		memcpy(s, t, tlen);
		s += tlen;
	      } else {
		if (s >= bufend - 1)
		  goto argv_overflow;
		*s++ = *cp;
	      }
	    }
	    *s = '\0';
	    av[avi] = os;
	    os = s + 1;
	  } else
	    av[avi] = arg;

	  if (avi == 0 && strchr(av[0],'=') != NULL) {
	    /* Leading NAME=value words become environment entries.
	       Keep 3 slots (plus the NULL) free for TZ/PATH/ZCONFIG
	       added below; surplus assignments are dropped. */
	    if (evi < MAXARGC - 3)
	      ev[evi++] = av[0];
	  } else if (avi == 0 && av[0][0] != '/') {
	    /* Must add ${MAILBIN}/ta/ to be the prefix.. */

	    static const char *mailbin = NULL;

	    if (!mailbin) mailbin = getzenv("MAILBIN");
	    if (!mailbin) mailbin = MAILBIN;

	    /* Check the fit BEFORE formatting: "%s/%s/%s" + NUL */
	    if (strlen(mailbin) + strlen(qdefaultdir) + strlen(av[0]) + 3
		> sizeof(buf2)) {
	      sfprintf(sfstderr,"BUFFER OVERFLOW IN ARGV[0] CONSTRUCTION!\n");
	      sfsync(sfstderr);
	      abort();
	    }
	    sprintf(buf2,"%s/%s/%s", mailbin, qdefaultdir, av[0]);
	    av[avi++] = buf2;
	  } else
	    ++avi;
	  if (avi >= MAXARGC) avi = MAXARGC;
	}
	av[avi] = NULL;
	{
	  const char *t;
	  if ((t = getenv("TZ")))
	    ev[evi++] = (char*) t-3; /* Pass the TZ      */
	  if ((t = getzenv("PATH")))
	    ev[evi++] = (char*) t-5; /* Pass the PATH    */
	  if ((t = getzenv("ZCONFIG")))
	    ev[evi++] = (char*) t-8; /* Pass the ZCONFIG */
	}
	ev[evi] = NULL;

	/* fork off the appropriate command with the appropriate stdin */
	if (verbose) {
	  sfprintf(sfstdout,"${");
	  for (i = 0; ev[i] != NULL; ++i)
	    sfprintf(sfstdout," %s", ev[i]);
	  sfprintf(sfstdout," }");
	  for (i = 0; av[i] != NULL; ++i)
	    sfprintf(sfstdout," %s", av[i]);
	  sfprintf(sfstdout,"\n");
	}
	return runcommand(av, ev, vhead, chwp, howp);

 argv_overflow:
	sfprintf(sfstderr,"BUFFER OVERFLOW IN ARGV[] SUBSTITUTIONS!\n");
	sfsync(sfstderr);
	abort();
	return 0; /* NOTREACHED */
}

/*
 * runcommand() -- fork() and execve() one transport agent.
 *
 * argv[0] is the program path and env[] the complete environment.
 * The child does the following:
 *   - installs the agent pipes (pipes_to_child_fds()),
 *   - closes every other descriptor above stderr,
 *   - applies ce.priority: values >= 80 set an absolute priority of
 *     priority-100 via setpriority(); other non-zero values are a
 *     nice() increment,
 *   - limits its descriptors, switches to ce.gid/ce.uid and execs.
 * If the scheduler runs as root and dropping privileges fails, the
 * child exits instead of running the agent with root powers.
 * The parent records the child with stashprocess().
 *
 * Returns 1 when the child was started, 0 when creating the pipes or
 * the fork failed (a fork failure also sets vhead->thread->pending).
 */
static int runcommand(argv, env, vhead, chwp, howp)
	char * const argv[];
	char * const env[];
	struct vertex *vhead;
	struct web *chwp, *howp;
{
	int	i, pid, to[2], from[2], uid, gid, prio;
	char	*cmd;
	static int pipesize = 0;


	uid = vhead->thgrp->ce.uid;
	gid = vhead->thgrp->ce.gid;
	cmd = argv[0];
	prio= vhead->thgrp->ce.priority;

	if (pipes_create(to,from) < 0) return 0;
	if (pipesize == 0)
	  pipesize = resources_query_pipesize(to[0]);

	if (verbose)
	  sfprintf(sfstderr, "to %d/%d from %d/%d\n",
		  to[0],to[1],from[0],from[1]);

	pid = fork();
	if (pid == 0) {	/* child */
	  int wasroot = (geteuid() == 0);
	  int gidrc, uidrc;

	  pipes_to_child_fds(to,from);

	  /* keep current stderr for child stderr */
	  /* close all other open filedescriptors */

	  /* ... if detach() did its job, there shouldn't be any! */
	  /* ... no, the 'querysock' is there somewhere!   */
	  if (scheduler_nofiles < 1)
	    scheduler_nofiles = resources_query_nofiles();
	  for (i = 3; i < scheduler_nofiles; ++i)
	    close(i);

#if defined(HAVE_SETPRIORITY) && defined(HAVE_SYS_RESOURCE_H)
	  if (prio >= 80) { /* MAGIC LIMIT VALUE FOR ABSOLUTE SET! */
	    /* Absolute priority is encoded as priority+100 */
	    setpriority(PRIO_PROCESS, 0, prio - 100);
	  } else
#endif
	    if (prio != 0) {
	      nice(prio);
	    }

	  resources_limit_nofiles(transportmaxnofiles);
	  gidrc = setgid(gid);	/* Do GID setup while still UID 0..   */
	  uidrc = setuid(uid);	/* Now discard all excessive powers.. */
	  if (wasroot && (gidrc < 0 || uidrc < 0)) {
	    /* Never run a transport agent with root powers by accident */
	    sfprintf(sfstderr, "Dropping privileges (uid=%d gid=%d) for %s failed!\n",
		     uid, gid, cmd);
	    _exit(1);
	  }
	  execve(cmd, argv, env);
	  sfprintf(sfstderr, "Exec of %s failed!\n", cmd);
	  _exit(1);
	} else if (pid < 0) {	/* fork failed - yell and forget it */
	  close(to[0]); close(to[1]);
	  close(from[0]); close(from[1]);
	  sfprintf(sfstderr, "Fork failed!\n");
	  vhead->thread->pending = "System:ForkFailure!";
	  return 0;
	}

	/* parent */

	pipes_close_parent(to,from);

	/* save from[0] away as a descriptor to watch */
	stashprocess(pid, from[0], to[1], chwp, howp, vhead, argv);
	/* We wait for the child to report "#hungry", then we feed it.. */
	return 1;
}


/*
 * stashprocess() -- record a freshly forked transport agent in the
 * cpids[] table.  The slot index is fromfd, the fd we read the child's
 * reports from.
 *
 * The slot is reset and put into CFSTATE_LARVA with overfed = 1.  It is
 * linked in as the head of vhead's thread process chain, and counted in
 * the channel/host webs, the thread group, the thread and the MIB
 * counters.  Both channel fds are set non-blocking.  A space-joined copy
 * of argv[] is kept in proc->cmdline for diagnostics.
 *
 * NOTE(review): cpids[] has scheduler_nofiles entries; fromfd is assumed
 * to be below that (no check here).
 */
static void stashprocess(pid, fromfd, tofd, chwp, howp, vhead, argv)
	int pid, fromfd, tofd;
	struct web *chwp, *howp;
	struct vertex *vhead;
	char * const argv[];
{
	int i, l, j;
	struct procinfo *proc;

	/* Allocate the process table on first use */
	if (cpids == NULL) {
	  if (scheduler_nofiles < 1)
	    scheduler_nofiles = resources_query_nofiles();
	  i = scheduler_nofiles;
	  cpids = (struct procinfo *)
	    emalloc((unsigned)(i * sizeof (struct procinfo)));
	  memset(cpids, 0, sizeof(struct procinfo) * i);
	}
	proc = &cpids[fromfd];

	/* Free these buffers in case they exist from last use.. */
	if (proc->cmdbuf)  free(proc->cmdbuf);
	if (proc->cmdline) free(proc->cmdline);

	memset(proc,0,sizeof(struct procinfo));
#if 0 /* the memset() does this more efficiently.. */
	proc->pnext   = NULL;
	proc->pprev   = NULL;
	proc->cmdlen = 0;
	proc->reaped = 0;
	proc->carryover = NULL;
#endif

	/* One outstanding "#hungry" report is expected from the new child */
	proc->overfed = 1;
	proc->state   = CFSTATE_LARVA;

	proc->pid     = pid;

	proc->ch      = chwp;
	chwp->kids   += 1;

	proc->ho      = howp;
	howp->kids   += 1;

	proc->pthread            = vhead->thread;

	proc->thg           = vhead->thread->thgrp;
	proc->thg->transporters += 1;
	proc->pthread->thrkids  += 1;

	++numkids;
	MIBMtaEntry->sc.TransportAgentProcessesSc += 1;
	MIBMtaEntry->sc.TransportAgentForksSc     += 1;
	MIBMtaEntry->sc.TransportAgentsActiveSc   += 1;


	/* Push onto the head of the thread's process chain */
	proc->tofd          = tofd;
	proc->pnext         = proc->pthread->proc;
	proc->pthread->proc = proc;
	if (proc->pnext) proc->pnext->pprev = proc;

	mytime(&proc->hungertime); /* Actually it is not yet 'hungry' as
				      per reporting so, but we store the
				      time-stamp anyway */

	cmdbufalloc(200, &proc->cmdbuf,  &proc->cmdspc);
	cmdbufalloc(200, &proc->cmdline, &proc->cmdlspc);

	/* fromfd == tofd is possible (one bidirectional channel) */
	fd_nonblockingmode(fromfd);
	if (fromfd != tofd)
	  fd_nonblockingmode(tofd);

	/* Construct a faximille of the argv[] in a single string.
	   This is entirely for debug porposes in some rare cases
	   where transport subprocess returns EX_SOFTWARE, and we
	   send out LOG_EMERG alerts thru syslog.  */
	proc->cmdline[0] = 0;
	l = 0;
	for (i = 0; argv[i] != NULL; ++i) {
	  /* The separating space fits: the previous cmdbufalloc()
	     reserved one extra character beyond the text. */
	  if (i > 0)
	    proc->cmdline[l++] = ' ';
	  j = strlen(argv[i]);
	  cmdbufalloc(l+j+1, &proc->cmdline, &proc->cmdlspc);
	  memcpy(proc->cmdline+l, argv[i], j);
	  l += j;
	}
	proc->cmdline[l] = '\0';

	if (verbose)
	  sfprintf(sfstderr,
		   "stashprocess(%d, %d, %d, %s, %s, '%s' proc=%p)\n",
		   pid, fromfd, tofd, chwp ? chwp->name : "nil",
		   howp ? howp->name : "nil", proc->cmdline, proc);
}

/*
 * shutdown_kids() -- tell every live transport agent to go away.
 * Each one gets an empty-line pair on its command channel, the channel
 * is shut down, and (for pid > 1) the process receives SIGQUIT.
 */
void
shutdown_kids()
{
	int slot;

	if (cpids == NULL)
	  return;		/* no process table yet -- nothing to do */

	for (slot = 0; slot < scheduler_nofiles; ++slot) {
	  struct procinfo *kid = &cpids[slot];

	  if (kid->pid <= 0 || kid->tofd < 0)
	    continue;		/* unused slot, or no command channel */

	  /* Death-marker first, then close the command channel */
	  write(kid->tofd,"\n\n",2);
	  pipes_shutdown_child(kid->tofd);
	  kid->tofd = -1;

	  /* Signals may happen... */
	  if (kid->pid > 1)
	    kill(kid->pid, SIGQUIT);
	}
}

/* 
 *  Reclaim the process slot -- this process is dead now.
 */
static void reclaim(fromfd, tofd)
	int fromfd, tofd;
{
	struct procinfo *proc = &cpids[fromfd];

	if (verbose)
	  sfprintf(sfstderr,"reclaim(%d,%d) pid=%d, reaped=%d, chan=%s, host=%s\n",
		   fromfd, tofd, (int)proc->pid, proc->reaped,
		   proc->ch->name, proc->ho->name);

	if (proc->reaped &&
	    ((WIFEXITED(proc->waitstat) && WEXITSTATUS(proc->waitstat)) ||
	     WIFSIGNALED(proc->waitstat))) {

	  /* Child sig-faulted, or had non-zero exit status! */

	  sfprintf(sfstderr,
		   "reclaim(%d,%d) pid=%d, reaped=%d, chan=%s, host=%s ",
		   fromfd, tofd, (int)proc->pid, proc->reaped,
		   proc->ch->name, proc->ho->name);

	  if (WIFEXITED(proc->waitstat))
	    sfprintf(sfstderr,"EXIT=%d\n", WEXITSTATUS(proc->waitstat));
	  else if (WIFSIGNALED(proc->waitstat))
	    sfprintf(sfstderr,"SIGNAL=%d\n", WTERMSIG(proc->waitstat));
	  else
	    sfprintf(sfstderr,"WAITSTAT=0x%04x\n", proc->waitstat);
	}

	proc->pid = 0;
	proc->reaped = 0;
	if (proc->carryover != NULL) {
	  sfprintf(sfstderr, "%s: HELP! Lost %d bytes: '%s'\n",
		  progname, (int)strlen(proc->carryover), proc->carryover);
	  free(proc->carryover);
	  proc->carryover = NULL;
	}
	if (proc->ch) {
	  proc->ch->kids -= 1;
	  unweb(L_CHANNEL, proc->ch);
	  proc->ch = NULL;
	}
	if (proc->ho) {
	  proc->ho->kids -= 1;
	  unweb(L_HOST, proc->ho);
	  proc->ho = NULL;
	}
	if (tofd >= 0)
	  pipes_shutdown_child(tofd);
	close(fromfd);

	/* Reschedule the vertices that are left
	   (that were not reported on).		*/

	/* ... but only if we were not in IDLE chain! */
	if (proc->pthread) {

	  /* Remove this entry from the chains */
	  if (proc->pthread->proc == proc)
	    proc->pthread->proc = proc->pnext;

	  /* Disjoin the thread from the proc */
	  proc->pthread->thrkids -= 1;

	  /* Conditionally reschedule the thread */
	  thread_reschedule(proc->pthread,0,-1);

	  proc->pthread = NULL;

	} else {

	  /* Maybe we were in idle chain! */
	  struct procinfo *p;

	  p  =  proc->thg->idleproc;
	  while (p && p != proc) p = p->pnext;
	  if (p == proc) {
	    proc->thg->idlecnt -= 1;
	    --idleprocs;

	    /* Virtually move to ACTIVE state for count off below */
	    MIBMtaEntry->sc.TransportAgentsActiveSc += 1;
	    MIBMtaEntry->sc.TransportAgentsIdleSc   -= 1;

	    /* Remove this entry from the chains */
	    if (proc == proc->thg->idleproc)
	      proc->thg->idleproc = proc->pnext;
	  }
	}

	/* Remove this entry from the chains */
	if (proc->pnext) proc->pnext->pprev = proc->pprev;
	if (proc->pprev) proc->pprev->pnext = proc->pnext;

	/* If e.g. RESCHEDULE has not destroyed this thread-group.. */
	if (proc->thg) {
	  proc->thg->transporters -= 1;
	  /* It may go down to zero and be deleted.. */
	  delete_threadgroup(proc->thg);
	  proc->thg    = NULL;
	}

	MIBMtaEntry->sc.TransportAgentsActiveSc   -= 1;
	MIBMtaEntry->sc.TransportAgentProcessesSc -= 1;
	--numkids;
	proc->thg    = NULL;
}

static void waitandclose(fd)
	int	fd;
{
	/* This is called when
	   - fd return 0 (EOF)
	   - fd returns -1, and errno != {EAGAIN|EWOULDBLOCK|EINTR}
	 */
	reclaim(fd, cpids[fd].tofd);
}

#ifdef	HAVE_SELECT

/* select()-based multiplexing of the transport agent channels, the
   mailq query socket and the notify socket. */

#ifdef _AIX /* The select.h  defines NFDBITS, etc.. */
# include <sys/types.h>
# include <sys/select.h>
#endif

#ifndef	NFDBITS
/*
 * This stuff taken from the 4.3bsd /usr/include/sys/types.h, but on the
 * assumption we are dealing with pre-4.3bsd select().
 *
 * NOTE(review): "sizeof fd_mask" below is not valid C for a type name
 * (it needs parentheses); it only works because NFDBITS is not used in
 * this part of the file -- confirm before relying on it.
 */

typedef long	fd_mask;

#ifndef	NBBY
#define	NBBY	8
#endif	/* NBBY */
#define	NFDBITS		((sizeof fd_mask) * NBBY)

/* SunOS 3.x and 4.x>2 BSD already defines this in /usr/include/sys/types.h */
#ifdef	notdef
typedef	struct fd_set { fd_mask	fds_bits[1]; } fd_set;
#endif	/* notdef */

/* NOTE(review): these pointer-style definitions are replaced by the
   unconditional definitions that follow (which take the fd_set
   variable itself); they look like dead legacy code. */
#ifndef	_Z_FD_SET
#define	_Z_FD_SET(n, p)   ((p)->fds_bits[0] |= (1 << (n)))
#define	_Z_FD_CLR(n, p)   ((p)->fds_bits[0] &= ~(1 << (n)))
#define	_Z_FD_ISSET(n, p) ((p)->fds_bits[0] & (1 << (n)))
#define _Z_FD_ZERO(p)	  memset((char *)(p), 0, sizeof(*(p)))
#endif	/* !_Z_FD_SET */
#endif	/* !NFDBITS */

/*
 * _Z_FD_SET(fd, set), _Z_FD_CLR(fd, set), _Z_FD_ZERO(set) and
 * _Z_FD_ISSET(fd, set) take the fd_set VARIABLE, not its address.
 * With FD_SET available they wrap the system macros; otherwise the
 * set is treated as a plain integer bit mask.
 */
#ifdef FD_SET
#define _Z_FD_SET(sock,var) FD_SET(sock,&var)
#define _Z_FD_CLR(sock,var) FD_CLR(sock,&var)
#define _Z_FD_ZERO(var) FD_ZERO(&var)
#define _Z_FD_ISSET(i,var) FD_ISSET(i,&var)
#else
#define _Z_FD_SET(sock,var) var |= (1 << sock)
#define _Z_FD_CLR(sock,var) var &= ~(1 << sock)
#define _Z_FD_ZERO(var) var = 0
#define _Z_FD_ISSET(i,var) ((var & (1 << i)) != 0)
#endif


/* Non-zero while mux() is inside select() and its dispatch loop;
   mux() refuses to recurse while it is set. */
int in_select = 0;

/*
 * mux() -- one round of the scheduler's select() loop.
 *
 * Waits until 'timeout' (an absolute time compared with 'now') for any
 * of the following:
 *   - reports from transport agents (the read fd of every cpids[] slot
 *     with pid != 0),
 *   - writability of agent command channels with pending bytes,
 *   - the mailq query socket and the notify socket,
 *   - MAILQv2 descriptors (mailqmode == 2; used only to cut the wait
 *     short).
 * It then dispatches queryipccheck(), receive_notify(), readfrom() and
 * flush_child() for the ready descriptors.
 *
 * Returns 1 on a plain timeout, -1 when there was nothing to wait on,
 * and 0 otherwise (work done, EINTR/EAGAIN, or a recursive call).
 * EINVAL/EBADF or any other select() failure aborts the scheduler.
 */
int
mux(timeout)
time_t timeout;
{
	int	i, n, maxf;
	fd_set	rdmask;
	fd_set	wrmask;
	struct timeval tv;
	struct procinfo *proc = cpids;

	timed_log_reinit();

	if (in_select) {
	  sfprintf(sfstderr,"**** recursed into mux()! ***\n");
	  return 0;
	}

	queryipccheck();

	tv.tv_sec = timeout - now; /* Timeout in seconds */
	if (timeout < now)
	  tv.tv_sec = 0;
	tv.tv_usec = 0;

	maxf = -1;
	_Z_FD_ZERO(rdmask);
	_Z_FD_ZERO(wrmask);
	readsockcnt = 0;
	if (cpids != NULL)
	  for (proc = cpids,i = 0; i < scheduler_nofiles ; ++i,++proc)
	    if (proc->pid != 0) {
	      /* Something can be read ? */
	      _Z_FD_SET(i, rdmask);
	      if (maxf < i)
		maxf = i;

	      ++readsockcnt;
	      /* Something to write ? */
	      if (proc->cmdlen > 0 && proc->tofd >= 0) {
		_Z_FD_SET(proc->tofd, wrmask);
		if (maxf < proc->tofd)
		  maxf = proc->tofd;
	      }

	    }

	if (querysocket >= 0) {
	  _Z_FD_SET(querysocket, rdmask);
	  if (maxf < querysocket)
	    maxf = querysocket;
	}
	if (notifysocket >= 0) {
	  _Z_FD_SET(notifysocket, rdmask);
	  if (maxf < notifysocket)
	    maxf = notifysocket;
	}

	/* Although we don't react on the results of these MQ2 fd's,
	   getting them to break timeouts is important for MAILQv2
	   responsiveness. ! */

	if (mailqmode == 2)
	  maxf = mq2add_to_mask(&rdmask, &wrmask, maxf);

	if (maxf < 0)
	  return -1;

	++maxf;

	in_select = 1;

	n = select(maxf, &rdmask, &wrmask, NULL, &tv);

	if (n < 0) {
	  /* Capture errno at once: timed_log_reinit() and the
	     diagnostics below may clobber it. */
	  int err = errno;

	  timed_log_reinit();

	  in_select = 0;
	  if (err == EINTR || err == EAGAIN)
	    return 0;
	  if (err == EINVAL || err == EBADF) {
	    sfprintf(sfstderr, "** select() returned errno=%d\n", err);
	    for (i = 0; i < maxf; ++i) {
	      if (_Z_FD_ISSET(i,rdmask)  &&  fcntl(i,F_GETFL,0) < 0)
		sfprintf(sfstderr,"** Invalid fd on a select() rdmask: %d\n",i);
	      if (_Z_FD_ISSET(i,wrmask)  &&  fcntl(i,F_GETFL,0) < 0)
		sfprintf(sfstderr,"** Invalid fd on a select() wrmask: %d\n",i);
	    }
	    sfsync(sfstderr);
	    abort(); /* mux() select() error EINVAL or EBADF !? */
	  }
	  errno = err;	/* let perror() describe the select() failure */
	  perror("select() returned unknown error ");
	  fflush(stderr);
	  sfsync(sfstderr);
	  abort(); /* Select with unknown error */
	} else if (n == 0) {
	  /* -- just a timeout -- fast or long */
	  timed_log_reinit();
	  in_select = 0;
	  return 1;
	} else {

	  /* In case we really should react.. */
	  if (querysocket >= 0 &&
	      _Z_FD_ISSET(querysocket, rdmask))
	    queryipccheck();

	  if (notifysocket >= 0 &&
	      _Z_FD_ISSET(notifysocket, rdmask))
	    receive_notify(notifysocket);

	  if (cpids != NULL) {

	    for (proc = cpids, i = 0; i < maxf; ++i, ++proc) {

	      timed_log_reinit();

	      if (proc->pid < 0 ||
		  (proc->pid > 0 && _Z_FD_ISSET(i, rdmask))) {
		_Z_FD_CLR(i, rdmask);
		/* do non-blocking reads from this fd */
		readfrom(i);
	      }

	      /* In case we have non-completed 'feeds', try feeding them */
	      if (proc->pid > 0    &&  proc->tofd >= 0   &&
		  proc->cmdlen > 0 &&  _Z_FD_ISSET(proc->tofd, wrmask))
		flush_child(proc);

	      /* Because this loop might take a while ... */
	      queryipccheck();

	      syncweb(dirq);

	    }
	  }

	  in_select = 0;
	}
	return 0;
}

/* queryipccheck() may be called as often as you wish; it does its work
   at most once per 'now' second unless MAILQv2 is active. */
static time_t lastqueryipccheck;	/* 'now' of the last real check */
static time_t qipcretry;		/* when > 0: time at which to retry
					   queryipcinit() */

/*
 * queryipccheck() -- non-blocking poll (zero select() timeout) of the
 * mailq query socket, the notify socket and, when mailqmode == 2, the
 * MAILQv2 descriptors.
 *
 * It returns at once if it already ran during this 'now' second and
 * MAILQv2 is not active.  It retries queryipcinit() once qipcretry has
 * passed.  A readable notify socket is handed to receive_notify().  A
 * new query connection is handled in one of two ways:
 *   - mailqmode == 1: served by a forked child running qprint(),
 *   - otherwise: handed to mq2_register().
 */
void
queryipccheck()
{
	int	n;
	fd_set	rdmask;
	fd_set	wrmask;
	struct timeval tv;
	int maxfd;

	timed_log_reinit(); /* internal: mytime(&now); */

	if (!mq2_active() && (lastqueryipccheck == now)) return;
	lastqueryipccheck = now;

	if (qipcretry > 0 && qipcretry <= now) {
	  qipcretry = 0;
	  queryipcinit();
	  /*
	   * If qipcretry is set here, the value will be ignored, but
	   * that's ok since sweepretry is active by now
	   */
	}

	if (querysocket >= 0 || notifysocket >= 0) {

	  maxfd = querysocket;

	  /* Zero timeout: poll only, never block */
	  tv.tv_sec = 0;
	  tv.tv_usec = 0;

	  _Z_FD_ZERO(rdmask);
	  _Z_FD_ZERO(wrmask);
	  if (querysocket >= 0)
	    _Z_FD_SET(querysocket, rdmask);
	  if (notifysocket >= 0) {
	    _Z_FD_SET(notifysocket, rdmask);
	    if (notifysocket > maxfd)
	      maxfd = notifysocket;
	  }

	  if (mailqmode == 2) {
	    maxfd = mq2add_to_mask(&rdmask, &wrmask, maxfd);
	    n = select(maxfd+1, &rdmask, &wrmask, NULL, &tv);
	    if (n > 0)
	      mq2_areinsets(&rdmask, &wrmask);
	  } else 
	    n = select(maxfd+1, &rdmask, &wrmask, NULL, &tv);

	  /* select() failures (n < 0) are silently ignored here */
	  if (n > 0 && notifysocket >= 0 &&
	      _Z_FD_ISSET(notifysocket, rdmask)) {
	    receive_notify(notifysocket);
	  }

	  if (n > 0 && querysocket >= 0 &&
	      _Z_FD_ISSET(querysocket, rdmask)) {
	    Usockaddr raddr;
	    int raddrlen = sizeof(raddr);

	    /* n is reused for the accepted connection fd */
	    n = accept(querysocket, (struct sockaddr *)&raddr, &raddrlen);
	    if (n >= 0) {
	      if (mailqmode == 1) {
		int pid;

		MIBMtaEntry->sc.MQ1sockConnects ++;
		MIBMtaEntry->sc.MQ1sockParallel ++;

		pid = fork();
		if (pid == 0) {
		  /* Child: answer the query, then exit */
#if defined(F_SETFD)
		  fcntl(n, F_SETFD, 1); /* close-on-exec */
#endif
#ifdef USE_TCPWRAPPER
#ifdef HAVE_TCPD_H /* TCP-Wrapper code */
		  if (wantconn(n, "mailq") == 0) {
		    char *msg = "500 TCP-WRAPPER refusing 'mailq' query from your whereabouts\r\n";
		    int   len = strlen(msg);
		    write(n,msg,len);
		    MIBMtaEntry->sc.MQ1sockParallel --;
		    MIBMtaEntry->sc.MQ1sockTcpWrapRej ++;
		    _exit(0);
		  }
#endif
#endif
		  qprint(n);
		  close(n);
		  MIBMtaEntry->sc.MQ1sockParallel --;
		  /* Silence memory debuggers about this child's
		     activities by doing exec() on the process.. */
		  /* execl("/bin/false","false",NULL); */
		  _exit(0); /* _exit() should be silent, too.. */
		}
		/* Parent: undo the parallel count if the fork failed,
		   and drop our copy of the connection in any case. */
		if (pid < 0)
		  MIBMtaEntry->sc.MQ1sockParallel --;
		close(n);
	      } else {
		/* mailqmode == 2 */

		MIBMtaEntry->sc.MQ2sockConnects ++;

#if 0  /* NOT IN MAILQ-V2 MODE ! */
#ifdef USE_TCPWRAPPER
#ifdef HAVE_TCPD_H /* TCP-Wrapper code */
		if (wantconn(n, "mailq") == 0) {
		  char *msg = "500 TCP-WRAPPER refusing 'mailq' query from your whereabouts\r\n";
		  int   len = strlen(msg);
		  write(n,msg,len);
		  MIBMtaEntry->sc.MQ2sockTcpWrapRej ++;
		  close(n);
		}
		else
#endif
#endif
#endif
		  mq2_register(n, &raddr);
	      }
	    }
	  }
	}
}

void
queryipcinit()
{
	/*
	 * queryipcinit -- open the scheduler's notify and mailq query sockets.
	 *
	 * Each socket is set up only if it is not open yet, so this may be
	 * called repeatedly.  Each setup runs inside a one-pass `while' loop
	 * so that `break' abandons it on the first error (after perror() and
	 * closing any half-made socket).
	 *
	 * notifysocket: address from $SCHEDULERNOTIFY, default
	 *   "UNIX:${POSTOFFICE}/.scheduler.notify".  "UNIX:path" and "/path"
	 *   give an AF_UNIX datagram socket bound under umask 0577.
	 *   "UDP:..." and bare specs are parsed as mode 1, but no UDP code
	 *   exists, so they leave notifysocket closed.
	 *
	 * querysocket: address from mailqsock.  "UNIX:path" and "/path" give
	 *   an AF_UNIX stream socket bound under umask 0111.  "TCP:spec", a
	 *   bare spec, or no mailqsock at all give a TCP listener.  For TCP,
	 *   the text before an optional '@' is a port number or service name.
	 *   An absent or empty one means service "mailq"; if that is not in
	 *   the services database, port 174 is used.  The text after '@' is
	 *   currently not used: the bind address comes from zgetbindaddr(),
	 *   or is the wildcard address when that returns non-zero.  With INET6
	 *   an IPv6 socket is tried first, falling back to IPv4 if socket()
	 *   fails.
	 *
	 * The sockets are set non-blocking and close-on-exec; query sockets
	 * get listen(..., 5).  If either socket is still closed afterwards,
	 * qipcretry is set to now+5 so queryipccheck() calls us again; on
	 * full success qipcretry is cleared.
	 */
	int modecode = 1; /* Modes: 1=inet (TCP/UDP), 2=UNIX, 0=none */
	char *modedata = NULL;

	while (notifysocket < 0) {

	  if (!notifysock) {
	    notifysock = (char *)getzenv("SCHEDULERNOTIFY");
	    if (notifysock) {
	      notifysock = zenvexpand(notifysock);
	    } else {
	      notifysock = strsave("UNIX:${POSTOFFICE}/.scheduler.notify");
	      notifysock = zenvexpand(notifysock);
	    }
	  }

	  if (notifysock) {
	    if (cistrncmp(notifysock,"UNIX:",5)==0) {
	      modedata = (char *)notifysock+5;
	      modecode = 2;
	    } else if (*notifysock == '/') {
	      /* If it begins with '/', it is AF_UNIX socket */
	      modedata = (char *)notifysock;
	      modecode = 2;
	    } else if (cistrncmp(notifysock,"UDP:",4)==0) {
	      modedata = (char *)notifysock+4;
	      modecode = 1;
	    } else {
	      /* The default mode is UDP/IP socket */
	      modedata = (char *)notifysock;
	      modecode = 1;
	    }
	  } else
	    modecode = 0;

	  /* NOTE: only AF_UNIX (modecode 2) is implemented for the notify
	     socket.  A UDP spec leaves notifysocket closed, and the retry
	     timer at the end of this function keeps re-trying it. */

#ifdef  AF_UNIX
	  if (modecode == 2) {
	    struct sockaddr_un sad;
	    int on = 1, oldumask;

	    memset(&sad, 0, sizeof(sad));
	    sad.sun_family = AF_UNIX;
	    strncpy(sad.sun_path, modedata, sizeof(sad.sun_path));
	    sad.sun_path[ sizeof(sad.sun_path)-1 ] = 0;

	    if ((notifysocket = socket(PF_UNIX, SOCK_DGRAM, 0)) < 0) {
	      perror("notifysocket: socket(PF_UNIX)");
	      break;
	    }

	    setsockopt(notifysocket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on));

	    /* In case that one already exists.. */
	    unlink(sad.sun_path);

	    /* Restrictive umask while the socket file is created. */
	    oldumask = umask(0577);

	    if (bind(notifysocket, (struct sockaddr *)&sad, sizeof sad) < 0) {
	      perror("bind:UNIX notify socket");
	      umask(oldumask);
	      close(notifysocket);
	      notifysocket = -1;
	      break;
	    }
	    umask(oldumask);

	    fcntl(notifysocket, F_SETFL,
		  fcntl(notifysocket, F_GETFL, 0)|O_NONBLOCK);

#if defined(F_SETFD)
	    fcntl(notifysocket, F_SETFD, 1); /* close-on-exec */
#endif
	  }
#endif /* AF_UNIX */
	  break;
	}

	while (querysocket < 0) {

	  modedata = NULL;
	  modecode = 0;
	  if (mailqsock) {
	    if (cistrncmp(mailqsock,"UNIX:",5)==0) {
	      modedata = (char *)mailqsock+5;
	      modecode = 2;
	    } else if (*mailqsock == '/') {
	      /* If it begins with '/', it is AF_UNIX socket */
	      modedata = (char *)mailqsock;
	      modecode = 2;
	    } else if (cistrncmp(mailqsock,"TCP:",4)==0) {
	      modedata = (char *)mailqsock+4;
	      modecode = 1;
	    } else {
	      /* The default mode is TCP/IP socket */
	      modedata = (char *)mailqsock;
	      modecode = 1;
	    }
	  } else {
	    /* The default mode is TCP/IP socket (modedata stays NULL) */
	    modedata = (char *)mailqsock;
	    modecode = 1;
	  }

#ifdef  AF_UNIX
	  if (modecode == 2) {
	    struct sockaddr_un sad;
	    int on = 1, oldumask;

	    memset(&sad, 0, sizeof(sad));
	    sad.sun_family = AF_UNIX;
	    strncpy(sad.sun_path, modedata, sizeof(sad.sun_path));
	    sad.sun_path[ sizeof(sad.sun_path)-1 ] = 0;

	    if ((querysocket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
	      perror("querysocket: socket(PF_UNIX)");
	      break;
	    }

	    setsockopt(querysocket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on));

	    /* In case that one already exists.. */
	    unlink(sad.sun_path);

	    oldumask = umask(0111);

	    if (bind(querysocket, (struct sockaddr *)&sad, sizeof sad) < 0) {
	      perror("bind:UNIX mailq socket");
	      umask(oldumask);
	      close(querysocket);
	      querysocket = -1;
	      break;
	    }
	    umask(oldumask);

	    fd_nonblockingmode(querysocket);

#if defined(F_SETFD)
	    fcntl(querysocket, F_SETFD, 1); /* close-on-exec */
#endif

	    if (listen(querysocket, 5) < 0) {
	      perror("listen:UNIX mailq socket");
	      close(querysocket);
	      querysocket = -1;
	      break;
	    }
	  }
#endif
#ifdef	AF_INET
	  if (modecode == 1) {
	    struct servent *serv;
	    Usockaddr ua;
	    int on = 1;
	    int port = 174;
	    char *modedata2 = NULL;

	    /* Temporarily cut "service@addr" at the '@'; restored below. */
	    if (modedata) {
	      modedata2 = strchr(modedata,'@');
	      if (modedata2) *modedata2++ = 0;
	    }

	    if (!modedata || !*modedata || sscanf(modedata,"%d",&port) != 1) {
	      /* Not a number: look up a service name.  An empty spec
		 ("TCP:" or "@addr") means the default "mailq" service,
		 exactly like having no spec at all. */
	      serv = getservbyname((modedata && *modedata) ? modedata : "mailq",
				   "tcp");
	      if (serv == NULL) {
		sfprintf(sfstderr, "No 'mailq' tcp service defined!\n");
		port = 174; /* magic knowledge */
	      } else
		port = ntohs(serv->s_port);
	    }
	    if (modedata2)  modedata2[-1] = '@';

	    memset(&ua, 0, sizeof(ua));
	    if (zgetbindaddr(NULL, AF_INET, &ua))
		ua.v4.sin_addr.s_addr = htonl(INADDR_ANY);
	    ua.v4.sin_port        = htons(port);
	    ua.v4.sin_family      = AF_INET;
	    querysocket = -1;
#ifdef INET6
	    querysocket = socket(PF_INET6, SOCK_STREAM, 0);
#endif
	    if (querysocket < 0)
	      querysocket = socket(PF_INET, SOCK_STREAM, 0);
#ifdef INET6
	    else {
	      memset(&ua, 0, sizeof(ua));
	      if (zgetbindaddr(NULL, AF_INET6, &ua)) {
		/* No configured IPv6 bind address: use the wildcard
		   (in6addr_any is all-zero), discarding anything a failed
		   lookup may have left in `ua'.  The IPv4 field must not
		   be used here -- it overlays sin6_flowinfo. */
		memset(&ua, 0, sizeof(ua));
	      }
	      ua.v6.sin6_port   = htons(port);
	      ua.v6.sin6_family = AF_INET6;
	    }
#endif
	    if (querysocket < 0) {
#ifdef INET6
	      perror("querysocket: socket(PF_INET6/PF_INET)");
#else
	      perror("querysocket: socket(PF_INET)");
#endif
	      break;
	    }
	    setsockopt(querysocket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on));

#ifdef INET6
	    if (ua.v6.sin6_family == AF_INET6) {
	      if (bind(querysocket, (struct sockaddr *)&ua, sizeof ua.v6) < 0) {
		perror("bind:TCP6 mailq socket");
		close(querysocket);
		querysocket = -1;
		break;
	      }
	    } else
#endif
	      if (bind(querysocket, (struct sockaddr *)&ua, sizeof ua.v4) < 0) {
		perror("bind:TCP4 mailq socket");
		close(querysocket);
		querysocket = -1;
		break;
	      }

	    fcntl(querysocket, F_SETFL,
		  fcntl(querysocket, F_GETFL, 0)|O_NONBLOCK);


#if defined(F_SETFD)
	    fcntl(querysocket, F_SETFD, 1); /* close-on-exec */
#endif

	    if (listen(querysocket, 5) < 0) {
	      perror("listen:TCP mailq socket");
	      close(querysocket);
	      querysocket = -1;
	      break;
	    }
	  }
#endif	/* AF_INET */
	  break;
	}

	if (querysocket >= 0 && notifysocket >= 0)
	  qipcretry = 0; /* Successful init done. */
	else {
	  mytime(&now); 
	  qipcretry = now + 5; /* Will do retry soon.. */

	}
}

#else	/* !HAVE_SELECT */

int
mux(timeout)
time_t timeout;
{
	int	slot;

	/*
	 * Fallback multiplexer for systems without select(): there is no
	 * way to wait on many descriptors at once, so simply call
	 * readfrom() on every cpids[] slot whose pid is non-zero, counting
	 * them in readsockcnt, then nap for a second if the caller's
	 * deadline has not been reached yet.  Always returns 1.
	 */
	readsockcnt = 0;
	if (cpids != NULL) {
	  slot = 0;
	  while (slot < scheduler_nofiles) {
	    if (cpids[slot].pid != 0) {
	      readfrom(slot);
	      readsockcnt++;
	    }
	    slot++;
	  }
	}

	mytime(&now);
	if (now < timeout)
	  sleep(1);
	return 1;
}

void
queryipccheck()
{
	/* Without select() there is no query or notify socket to poll,
	   so this is deliberately a no-op. */
}

void
queryipcinit()
{
	/* Without select() the IPC sockets cannot be serviced, so there
	   is nothing to set up here. */
}
#endif	/* HAVE_SELECT */

/*
 * readfrom -- collect status reports from the transport child on fd.
 *
 * cpids[fd] describes the child.  Everything read() returns is split at
 * '\n'.  Each complete line (newline replaced by NUL) is passed to
 * update(fd, line).  A trailing partial line is saved in
 * cpids[fd].carryover (as long as the slot's pid is still non-zero) and
 * is prepended on the next call.
 *
 * read() returning 0, or failing with anything other than EWOULDBLOCK,
 * EAGAIN or EINTR, is treated as the child going away: its tofd is shut
 * down with pipes_shutdown_child() and waitandclose(fd) is called.
 */
static void readfrom(fd)
	int fd;
{
	int	n, e, bufsize = 2048;
	char	*cp, *eobuf, *buf;
	struct procinfo *proc = &cpids[fd];

	cp = buf = (char *)emalloc(bufsize);

	if (proc->carryover != NULL) {
	  int carrylen = strlen(proc->carryover);
	  /*
	   * A carry-over can be as long as a buffer that was grown on an
	   * earlier call.  Keep carrylen < bufsize, because:
	   *  - the text plus its NUL must fit;
	   *  - read() below must be offered at least one byte.  A
	   *    zero-length read returns 0, which would look like EOF and
	   *    close the child.
	   */
	  if (carrylen >= bufsize) {
	    while (carrylen >= bufsize)
	      bufsize += 1024;
	    buf = erealloc(buf,bufsize);
	  }
	  memcpy(buf, proc->carryover, carrylen + 1);
	  cp = buf + carrylen;
	  free(proc->carryover);
	  proc->carryover = NULL;
	}

	/* Note that if we get an alarm() call, the read will return -1, TG */
	errno = 0;
	while ((n = read(fd, cp, bufsize - (cp - buf))) > 0) {

	  if (verbose)
	    sfprintf(sfstderr, "read from %d returns %d\n", fd, n);

	  eobuf = cp + n;

	  /* Hand every complete line to update(), then shift the rest of
	     the data to the front of buf and rescan from there. */
	  for (cp = buf; cp < eobuf;) {
	    if (*cp == '\n') {
	      int rlen = eobuf - (cp+1);	/* bytes after the newline */
	      *cp = '\0';
	      if (verbose)
		sfprintf(sfstderr, "%p %d fd=%d processed: %s\n",
			 proc, (int)proc->pid, fd, buf);
	      update(fd,buf);
	      /* Source and destination overlap whenever the remainder is
		 longer than the line just consumed: memmove, not memcpy. */
	      memmove(buf, cp+1, rlen);
	      cp = buf;
	      eobuf = buf + rlen;
	    } else
	      ++cp;
	  }

	  if (cp == (buf + bufsize)) {
	    /*
	     * The buffer is full and holds no newline: a very long status
	     * report line (this does happen).  Double the buffer so that
	     * the next read() has room, which also keeps cp < buf+bufsize.
	     */
	    int oldsize = bufsize;
	    bufsize <<= 1;
	    buf = erealloc(buf,bufsize);
	    cp = buf + oldsize;
	    *cp = '\0';
	  }

	}
	e = errno;

	if (verbose) {
	  if (!(e == EAGAIN || e == EWOULDBLOCK))
	    sfprintf(sfstderr,
		    "read from %d returns %d, errno=%d\n", fd, n, e);
	}
	if (n == 0 || (n < 0 && !(e == EWOULDBLOCK ||
				  e == EAGAIN || e == EINTR))) {
	  /* EOF or hard error: the child is gone or going. */
	  if (proc->tofd >= 0)
	    pipes_shutdown_child(proc->tofd);
	  proc->tofd = -1;
	  waitandclose(fd);
	}

	/*
	 * If n < 0 we were interrupted or the read would block; either way
	 * return to the caller.  Any bytes after the last newline seen
	 * (buf .. cp) must be preserved as carry-over, otherwise part of a
	 * report is lost.
	 */
	if (proc->pid != 0) {
	  proc->carryover = emalloc(cp - buf + 1);
	  memcpy(proc->carryover, buf, cp - buf);
	  proc->carryover[cp - buf] = '\0';
	} else if (cp > buf) {
	  /* buf is not NUL-terminated here.  cp < buf + bufsize always
	     holds at this point (see above), so terminate it before it is
	     printed with %s.  Report the errno saved from read(), not one
	     that waitandclose() may have overwritten. */
	  *cp = '\0';
	  sfprintf(sfstderr,
		   "HELP! Lost %ld bytes (n=%d/%d): '%s'\n",
		   (long)(cp - buf), n, e, buf);
	}
	free(buf);
}

#if defined(USE_BINMKDIR) || defined(USE_BINRMDIR)

/*
 * From Ross Ridge's Xenix port:
 * - A nasty problem occurs with scheduler if rmdir (and mkdir also, I
 *   think) is implemented as system("/bin/rmdir ...").  When system()
 *   calls wait() it can reap the scheduler's children without it knowing.
 *   I fixed this problem by writing a replacement system() function for
 *   the scheduler.
 */

/*
 * system -- run `name' with "$SHELL -c" (default /bin/sh) and wait for it.
 *
 * Returns:
 *  - -1 if fork() fails, or wait() fails with something other than EINTR;
 *  - the exit status if the command's low status byte is zero (normal exit);
 *  - 1 otherwise (e.g. the command was killed by a signal).
 *
 * Any other child reaped while waiting is looked up in cpids[], and its
 * slot is marked as reaped by negating its pid, so the scheduler does
 * not lose track of it.
 *
 * With USE_SIGREAPER the SIGCHLD handler does all reaping, so the
 * command's status is not available here.  0 is returned.
 * NOTE(review): the old code fell off the end of the function (undefined
 * value) in that configuration.
 */
int
system(name)
	char *name;
{
	char *sh;
	int st, r;
	int pid;
	int i;

	pid = fork();
	switch(pid) {
	case -1:
		return -1;
	case 0:
		sh = getenv("SHELL");
		if (sh == NULL) {
		  sh = "/bin/sh";
		}
		/* The variadic terminator must be a null char pointer. */
		execl(sh, sh, "-c", name, (char *)NULL);
		_exit(1);
	default:
		break;
	}

#ifndef USE_SIGREAPER
	for (;;) {
	  r = wait(&st);
	  if (r == -1) {
	    if (errno != EINTR)
	      return -1;
	    continue;		/* interrupted: just wait again */
	  }
	  if (r == pid)
	    break;		/* our command finished */

	  /* Someone else's child (a scheduler transport): record that it
	     has been reaped so the scheduler can notice it later. */
	  if (cpids != NULL) {
	    for (i = 0; i < scheduler_nofiles; i++) {
	      if (cpids[i].pid == r) {
		cpids[i].pid = -r;
		break;
	      }
	    }
	  }
	}

	if ((st & 0x00ff) == 0) {
	  return st >> 8;
	}
	return 1;
#else
	return 0;
#endif
}

#endif


#ifdef USE_SIGREAPER
/*
 * sig_chld -- SIGCHLD/SIGCLD handler: catch each child-process death and
 * reap it.
 *
 * For every child that exited or was killed by a signal, the matching
 * cpids[] slot is marked: pid negated, reaped = 1, and waitstat set to
 * the wait status.  A transporter exiting with EX_SOFTWARE is logged at
 * LOG_EMERG.  The handler re-installs itself before returning.
 *
 * errno is saved and restored.  The last waitpid() of the loop may fail
 * (e.g. with ECHILD), and the interrupted code may be about to inspect
 * errno; readfrom(), for instance, decides between "retry" and "close
 * the child" based on EINTR/EAGAIN.
 *
 * NOTE(review): sfprintf() and zsyslog() are not async-signal-safe.
 */
RETSIGTYPE sig_chld(signum)
int signum;
{
	int pid;
	int ok;
	int i;
	int statloc;
	int saved_errno = errno;

	for (;;) {

#ifdef	HAVE_WAITPID
	  pid = waitpid(-1, &statloc, WNOHANG);
#else
#ifdef  HAVE_WAIT4
	  pid = wait4(-1, &statloc, WNOHANG, NULL);
#else
#ifdef  HAVE_WAIT3
	  pid = wait3(&statloc, WNOHANG, NULL);
#else
	  pid = wait(&statloc);
#endif
#endif
#endif
	  if (pid <= 0) break;

	  /* Only EXIT and SIGxx deaths are accepted; decided per child. */
	  ok = (WIFEXITED(statloc) || WIFSIGNALED(statloc));

	  if (verbose) {
	    sfprintf(sfstderr,"sig_chld() pid=%d, ok=%d, stat=0x%x ",
		    pid,ok,statloc);
	    if (WIFEXITED  (statloc))
	      sfprintf(sfstderr,"EXIT=%d\n", WEXITSTATUS(statloc));
	    if (WIFSIGNALED(statloc))
	      sfprintf(sfstderr,"SIGNAL=%d\n", WTERMSIG(statloc));
	  }

	  if (ok && cpids != NULL) {
	    for (i = scheduler_nofiles-1; i >= 0; --i) {
	      if (cpids[i].pid == pid) {
		cpids[i].pid = -pid; /* Mark it as reaped.. */
		cpids[i].reaped = 1;
		cpids[i].waitstat = statloc;
		if (WSIGNALSTATUS(statloc) == 0 &&
		    WEXITSTATUS(statloc)   == EX_SOFTWARE) {
		  zsyslog((LOG_EMERG, "Transporter process %d exited with EX_SOFTWARE!", pid));
		  sfprintf(sfstderr, "Transporter process %d exited with EX_SOFTWARE; cmdline='%s'\n", pid, cpids[i].cmdline);
		}
		break;
	      }
	      if (cpids[i].pid == -pid) {
		sfprintf(sfstdout," .. already reaped ??\n");
		cpids[i].pid = -pid; /* Mark it as reaped.. */
		cpids[i].reaped = 1;
		cpids[i].waitstat = statloc;
		break;
	      }
	    }
	  }

#if !defined(HAVE_WAITPID) && !defined(HAVE_WAIT4) && !defined(HAVE_WAIT3)
	  /* Plain wait() blocks.  Looping would stall inside the handler
	     until every remaining child has exited, so reap only one child
	     per signal. */
	  break;
#endif
	}

	/* re-instantiate the signal handler.. */
#ifdef SIGCLD
	SIGNAL_HANDLE(SIGCLD,  sig_chld);
#else
	SIGNAL_HANDLE(SIGCHLD, sig_chld);
#endif

	errno = saved_errno;
}
#endif /* USE_SIGREAPER */


/* syntax highlighted by Code2HTML, v. 0.9.1 */