/*
 *	Copyright 1988 by Rayan S. Zachariassen, all rights reserved.
 *	This will be free software, but only when it is finished.
 */
/*
 *	Lots of modifications (new guts, more or less..) by
 *	Matti Aarnio <mea@nic.funet.fi>  (copyright) 1992-2003
 */

/*
 * ZMailer transport scheduler.
 */

#include <stdio.h>
#include <sfio.h>

#include <sys/param.h>
#include "hostenv.h"
#include <ctype.h>
#include <errno.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "mail.h"
#include <string.h>
#include "ta.h"
#include "sysexits.h"
#include "libc.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef HAVE_DIRENT_H
# include <dirent.h>
#else /* not HAVE_DIRENT_H */
# define dirent direct
# ifdef HAVE_SYS_NDIR_H
#  include <sys/ndir.h>
# endif /* HAVE_SYS_NDIR_H */
# ifdef HAVE_SYS_DIR_H
#  include <sys/dir.h>
# endif /* HAVE_SYS_DIR_H */
# ifdef HAVE_NDIR_H
#  include <ndir.h>
# endif /* HAVE_NDIR_H */
#endif /* HAVE_DIRENT_H */

#include "zmsignal.h"
#ifdef HAVE_MMAP
#include <sys/mman.h>
#endif

#include "scheduler.h"
#include "prototypes.h"
#include "zsyslog.h"
#include "libz.h"
#include <grp.h>

extern int optind;
extern char *optarg;

#ifndef	MAXNAMLEN /* POSIX.1 ... */
#define MAXNAMLEN NAME_MAX
#endif

#ifndef	_IOLBF
# define _IOLBF 0200
#endif	/* !_IOLBF */

#ifdef	HONEYBUM/* not really SVID, just this stupid honeywell compiler */
# define MAX_ENTRIES 3000
#else	/* sane pcc */
# define MAX_ENTRIES 10000
#endif	/* honeywell pcc */

struct sptree *spt_mesh[SIZE_L];

#ifdef	MALLOC_TRACE
struct conshell *envarlist = NULL;
#endif	/* MALLOC_TRACE */

#define TRANSPORTMAXNOFILES 32 /* Number of files a transporter may
				  need open -- or any of its children.. */
int	transportmaxnofiles = TRANSPORTMAXNOFILES; /* Default value */
const char * progname;
extern const char * postoffice;
const char * rendezvous;
const char * pidfile = PID_SCHEDULER;
const char * mailshare;
const char * logfn;
const char * statusfn;
Sfio_t     * statuslog = NULL;
int	slow_shutdown = 0;
extern int readsockcnt; /* from transport.c: mux() */
 
/* Flags shared between the signal handlers and the main loop. */
static int mustexit = 0;	/* set by sig_exit() (and on slow shutdown);
				   terminates the main loop */
static int gotalarm = 0;	/* cleared by main loop before mux();
				   presumably set by sig_alarm() -- not
				   visible here, verify */
static int dumpq    = 0;	/* set by sig_iot(); main loop calls
				   qprint(-1) and clears it */
static int canexit  = 0;	/* while non-zero, sig_exit() calls die()
				   directly instead of setting mustexit */
static int rereadcf = 0;	/* set by sig_readcf(); main loop calls
				   rereadconfig() and clears it */
static int dlyverbose = 0;	/* '-W' parks the verbose level here; it is
				   copied back to 'verbose' after startup */
time_t	sched_starttime;	/* filled in with mytime() early in main() */
int	do_syslog = 0;		/* set when ZENV SYSLOGFLG contains 'c'/'C' */
int	verbose = 0;		/* incremented by each '-v' option */
int	querysocket = -1;	/* fd of TCP socket to listen for queries */
int	msgwriteasync = 0;
int	D_alloc = 0;
int	hungry_childs = 0;
int	global_wrkcnt = 0;	/* decremented in free_cfp_memory() */
int	mailq_Q_mode = 0;	/* set by the '-Q' option */
int	syncstart = 0;		/* while set, the thread subsystem shall start
				   no childs! */
int	freeze = 0;		/* For debugging, complete disable of child
				   running.. */
int	hashlevels = 0;		/* How many levels of hashes are supported
				   at the transport directory ?  (To speed
				   up the directory processing, file opens,
				   etc.. ) */
char * procselect = NULL;	/* Non-null defines  channel/host specifier
				   that is ALLOWED TO RUN, specifying this
				   (by means of '-p chan/host' -option) will
				   prevent running anything else, and also
				   prevent running error processing, or
				   job-specifier deletions. */
char *  procselhost = NULL;	/* Just spliced out 'host'-part of the above */
extern int forkrate_limit;	/* How many forks per second ? */
int	mailqmode = 1;		/* ZMailer v1.0 mode on mailq */
char *  mailqsock;
char *  notifysock;

int    interim_report_interval = 5*60; /* 5 minutes */

/* vtxprep() bookkeeping counters -- their exact use is outside this
   part of the file; only vtxprep_skip_lock is reset (in main()) here. */
static int vtxprep_skip;
static int vtxprep_skip_any;
static int vtxprep_skip_lock;
/* Main loop deadlines: each activity runs once 'now' reaches the value. */
static time_t next_dirscan;		/* next dirqueuescan() */
static time_t next_idlecleanup;		/* next idle_cleanup() */
static time_t next_interim_report_time;	/* next interim_report_run() */
static time_t next_expiry2_scan;	/* next doexpiry2() */
/* Index of pre-scheduler queue entries; dq_insert() keys it by 'ino'. */
static struct sptree *dirscan_mesh;

/* Limits for one dirqueuescan() pass; both are settable with '-E n,secs'. */
static int newents_limit = 40000;	/* max new entries per pass */
static int newents_timelimit = 5;	/* max seconds per pass */

extern int default_full_content; /* on conf.c */

#include "memtypes.h"
extern memtypes stickymem;

static struct ctlfile *schedule __((int fd, const char *file, long ino, const int));
static struct ctlfile *vtxprep __((struct ctlfile *, const char *, const int));
static int  vtxmatch __((struct vertex *, struct config_entry *));
static void link_in __((int flag, struct vertex *vp, const char *s));
static int  lockverify __((struct ctlfile *, const char *, const int));
static int  globmatch   __((const char *, const char*));
static void vtxdo   __((struct vertex *, struct config_entry *, const char *));

extern void  cfp_mksubdirs __((const char *, const char*));

static RETSIGTYPE sig_exit   __((int sig));
static RETSIGTYPE sig_quit   __((int sig));
static RETSIGTYPE sig_alarm  __((int sig));
static RETSIGTYPE sig_iot    __((int sig));
static RETSIGTYPE sig_readcf __((int sig));

extern char *strerror __((int err));

extern time_t mytime          __((time_t *));
static void   init_timeserver __((void));

#ifdef HAVE_SELECT /* Well, not exactly kosher assumption.. */
/* extern int gettimeofday __((struct timeval *,)); */
#else
extern time_t time __((time_t *));
#endif

/*
 *  loginitsched(sig) -- (re)open the scheduler log files.
 *
 *  Reopens  logfn  on sfstdout (and dups it onto sfstderr), and
 *  reopens the status log  statusfn  into  statuslog,  when those
 *  are in use.  Installed as the SIGHUP handler, and also called
 *  directly with SIGHUP as argument; 'sig' itself is not used.
 *
 *  Returns 0 on success, -1 if either file could not be reopened.
 *  On the -1 paths the SIGHUP handler is NOT re-installed.
 */
static int loginitsched __((int));
static int loginitsched(sig)
int sig;
{
	int flags;

	if (logfn != NULL) {
	  sfsync(sfstdout);
	  sfseek(sfstdout, 0, 0);
	  sfsync(sfstderr);
	  sfseek(sfstderr, 0, 0);
	  if (sfopen(sfstdout, logfn, "a") != sfstdout
	      || dup2(sffileno(sfstdout), sffileno(sfstderr)) < 0) {	/* sigh */
	    /* XX: stderr might be closed at this point... */
	    sfprintf(sfstderr, "%s: cannot open log: %s, errno=%d\n", progname, logfn, errno);
	    return -1;
	  }
#if	defined(F_SETFL) && defined(O_APPEND)
	  flags = fcntl(sffileno(sfstdout), F_GETFL, 0);
	  /* Don't feed a failed F_GETFL result (-1) back to F_SETFL */
	  if (flags >= 0)
	    fcntl(sffileno(sfstdout), F_SETFL, flags | O_APPEND);
#endif	/* F_SETFL */
#if defined(F_SETFD)
	  fcntl(sffileno(sfstdout), F_SETFD, 1); /* close-on-exec */
#endif
	  sfset(sfstdout, SF_LINE, 1);
	  sfset(sfstderr, SF_LINE, 1);
	}
	if (statusfn != NULL) {
	  /* The old stream object is gone after sfclose(); the stream
	     returned by sfopen(NULL,...) is a NEW one and must be stored.
	     (Comparing it against the stale pointer only worked when the
	     allocator happened to hand back the same address.)
	     statuslog may be NULL here after an earlier failed reopen;
	     then we simply retry the open. */
	  if (statuslog != NULL)
	    sfclose(statuslog);
	  statuslog = sfopen(NULL, statusfn, "a");
	  if (statuslog == NULL) {
	    sfprintf(sfstderr,"%s: cannot open statuslog: %s, errno=%d\n", progname, statusfn, errno);
	    return -1;
	  }
	  sfset(statuslog, SF_WHOLE, 1);
#if defined(F_SETFD)
	  fcntl(sffileno(statuslog), F_SETFD, 1); /* close-on-exec */
#endif
	}
	/* Re-arm ourselves for the next SIGHUP */
	SIGNAL_HANDLE(SIGHUP, (RETSIGTYPE(*)__((int))) loginitsched);
	return 0;
}

/*
 *  timed_log_reinit() -- self-timed log re-open.
 *
 *  Re-opens the log files (via loginitsched()) whenever more than
 *  25 seconds have passed since the previous re-open, so that no
 *  log file is held open longer than about 30 seconds in one go.
 *  That lets a log rotator first rename the files, wait 30 seconds,
 *  and then process them further -- e.g. compress them.
 *
 *  After the re-open, if the log is a regular file, the free and
 *  used space figures of its filesystem are stored into the MIB.
 */

void timed_log_reinit()
{
  static time_t next_log_reinit;
  struct stat logstat;
  long space;

  mytime(&now);

  /* Not yet time for it ? */
  if (!(next_log_reinit < now))
    return;

  next_log_reinit = now + 25;
  loginitsched(SIGHUP);

  /* Space accounting only makes sense for a real log file */
  if (logfn == NULL)
    return;
  if (lstat(logfn, &logstat) != 0)
    return;
  if (!S_ISREG(logstat.st_mode))
    return;

  space = free_fd_statfs(sffileno(sfstdout));
  MIBMtaEntry->sys.LogFreeSpace = space;

  space = used_fd_statfs(sffileno(sfstdout));
  MIBMtaEntry->sys.LogUsedSpace = space;
}



/* One entry of the pre-scheduler queue; created by dq_insert(). */
struct dirstatname {
	struct stat st;		/* lstat() result of the file at insert time */
	long ino;		/* key used in dirscan_mesh */
	time_t	not_before;	/* insert time + requested delay */
	char name[1]; /* Allocate enough size */
};
/* The pre-scheduler queue: a normal vector and a delayed ("*2") vector
   of dirstatname pointers, both grown by doubling in dq_insert(). */
struct dirqueue {
	int	wrksum;		/* entries counted over both vectors */
	int	sorted;		/* cleared when the normal vector gets
				   a new entry */
	int	wrkcount;	/* entries in use in stats[] */
	int	wrkspace;	/* allocated slots in stats[] */
	int	wrkcount2; /* Back-pushed material a *2 -queue */
	int	wrkspace2;	/* allocated slots in stats2[] */
	struct dirstatname **stats;
	struct dirstatname **stats2;
};

static int dirqueuescan __((const char *dir,struct dirqueue *dq, int subdirs));
int syncweb __((struct dirqueue *dq));

/* Default only; main() recomputes this from the file descriptor limit. */
int global_maxkids = 1000;
/* Current time as last fetched with mytime(&now). */
time_t now;

/*
 *  vfp_open(cfp) -- open the per-message verbose log for appending.
 *
 *  Returns a line-buffered, append-mode sfio stream on  cfp->vfpfn,
 *  or NULL when no such file is named, the file can not be opened,
 *  or the stream can not be created.  The caller closes the stream
 *  with sfclose().
 */
Sfio_t * vfp_open(cfp)
struct ctlfile *cfp;
{
	Sfio_t *vfp;
	int fd;

	if (!cfp->vfpfn) return NULL;

	/* Open the vfp *ONLY* if the logging file exists,
	   and can be written to.  If the file does not
	   exist, no logging shall happen!  (Hence no O_CREAT.) */

	/* Open with the effective uid recorded in the control file */
	SETEUID(cfp->uid);
	fd = open(cfp->vfpfn, O_WRONLY|O_APPEND, 0);
	SETEUID(0);
	if (fd < 0) return NULL; /* Can't open it! */

	vfp = sfnew(NULL, NULL, 0, fd, SF_WRITE|SF_APPENDWR|SF_LINE);
	if (!vfp) {
	  /* No stream took ownership of the fd, don't leak it */
	  close(fd);
	  return NULL; /* Failure to open */
	}

	sfseek(vfp, (Sfoff_t)0, SEEK_END);
	return vfp;
}


static void cfp_free __((struct ctlfile *cfp, struct spblk *spl));
static void cfp_free0 __((struct ctlfile *cfp));

/*
 *  cfp_free0(cfp) -- drop a control file from scheduler memory.
 *
 *  Notes the deletion into the message's verbose log (when one can
 *  be opened), and then releases every vertex hanging off the control
 *  file.  Nothing is unlinked from the disk by the vertex releases.
 */
static void cfp_free0(cfp)
struct ctlfile *cfp;
{
	struct vertex *cur, *following;
	Sfio_t *logfp;

	/* Tell the per-message log what is going on */

	if (cfp->vfpfn) {
	  logfp = vfp_open(cfp);
	  if (logfp) {
	    sfprintf(logfp,
		     "ordered deletion of task file from scheduler memory (%s)\n",
		     cfp->mid);
	    sfclose(logfp);
	  }
	}

	cur = cfp->head;

	if (cur == NULL) {
	  /* No vertices at all -- this should *not* be happening..
	     Throw it all away directly. */
	  sfprintf(sfstderr,
		   "%s: SHOULD NOT HAPPEN: cfp->head == NULL; spoolid: %s\n",
		   progname, cfp->spoolid);
	  unctlfile(cfp, 1);
	  return;
	}

	while (cur != NULL) {
	  /* Pick up the successor first, the vertex content
	     WILL be trashed by the unvertex() call below. */
	  following = cur->next[L_CTLFILE];

	  MIBMtaEntry->sc.StoredRecipientsSc   -= cur->ngroup;
	  cur->ngroup = 0;
	  unvertex(cur,1,1); /* Don't unlink()! Just free()! */

	  cur = following;
	}
	/* The CFP itself is released by the last vertex's unvertex() */
}

/*
 *  cfp_free(cfp, spl) -- remove a control file from the L_CTLFILE
 *  mesh and from memory.  'spl' is the mesh node of the control
 *  file when the caller has it at hand; with NULL the node is
 *  looked up by cfp->id (if that is non-zero).
 */
static void cfp_free(cfp, spl)
struct ctlfile *cfp;
struct spblk   *spl;
{
	struct spblk *node = spl;

	/* First out of the  spt_mesh[]  */

	if (node == NULL) {
	  if (cfp->id)
	    node = sp_lookup((u_long)(cfp->id), spt_mesh[L_CTLFILE]);
	}
	if (node != NULL) {
	  sp_delete(node, spt_mesh[L_CTLFILE]);
	}

	/* Then out of the memory */

	cfp_free0(cfp);
}


/* sp_scan() callback: release the control file stored in a mesh node.
   Always returns 0. */
static int ctl_free __((struct spblk *spl));
static int ctl_free(spl)
struct spblk *spl;
{
	struct ctlfile *cfp;

	cfp = (struct ctlfile *)spl->data;
	cfp_free(cfp, NULL);

	return 0;
}


/*
 *  free_cfp_memory(cfp) -- give back every piece of memory that belongs
 *  to the control file, and the control file structure itself.  Any of
 *  the member strings may be absent (never allocated, or released
 *  earlier -- cfp->contents, for example), so each one is tested first.
 *  The stored-message counters are decremented as well.
 */
void free_cfp_memory(cfp)
struct ctlfile *cfp;
{
	if (cfp->contents != NULL) {
	  free(cfp->contents);
	}
	if (cfp->vfpfn != NULL) {
	  free(cfp->vfpfn);
	}
	if (cfp->spoolid != NULL) {
	  free(cfp->spoolid);
	}
	if (cfp->mid != NULL) {
	  free(cfp->mid);
	}
	if (cfp->erroraddr != NULL) {
	  free(cfp->erroraddr);
	}
	if (cfp->logident != NULL) {
	  free(cfp->logident);
	}
	if (cfp->envid != NULL) {
	  free(cfp->envid);
	}

	/* memset(cfp, 0x55, sizeof(*cfp)); */

	free((char *)cfp);

	/* One message less in store */
	global_wrkcnt -= 1;
	MIBMtaEntry->sc.StoredMessagesSc -= 1;
}


/* The one pre-scheduler queue; dq_insert() and in_dirscanqueue() fall
   back to 'dirq' when they are called with a NULL queue. */
struct dirqueue dirqb = { 0, };
struct dirqueue *dirq = &dirqb;

/* Set in main(): start of argv[0], and one past the NUL of the last
   argument string. */
char *ArgvSave;
const char *EOArgvSave;

extern int main __((int, const char **));

/* Head of the configuration entries from readconfig()/rereadconfig(). */
static struct config_entry *cehead = NULL;


/* #define GLIBC_MALLOC_DEBUG__ */
#ifdef GLIBC_MALLOC_DEBUG__ /* memory allocation debugging with GLIBC */

#include <malloc.h> /* GLIBC malloc.h ! */

/* Global variables used to hold the underlying hook values.  */
static void *(*old_malloc_hook) (size_t, const void * );
static void *(*old_realloc_hook) (void *, size_t, const void *);
static void (*old_free_hook) (void*, const void *);
static void *(*old_memalign_hook) (size_t, size_t, const void *);

/* Prototypes for our hooks.  */
static void *my_malloc_hook  (size_t, const void*);
static void *my_realloc_hook (void *,size_t, const void*);
static void  my_free_hook    (void*, const void*);
static void *my_memalign_hook  (size_t, size_t, const void*);
     
/*
 * Tracing replacement for glibc's __malloc_hook: performs the real
 * malloc() with the previous hooks installed, reports the call on
 * sfstderr, and then re-installs the tracing hooks.
 */
static void *
my_malloc_hook (size_t size, const void *CALLER)
{
  void *result;
  /* Restore all old hooks */
  __malloc_hook = old_malloc_hook;
  __free_hook   = old_free_hook;
  /* Call recursively */
  result = malloc (size);
  /* Save underlying hooks */
  old_malloc_hook = __malloc_hook;
  old_free_hook   = __free_hook;
  /* The printing might call `malloc' itself, so it is done while
     the old hooks are still in place. */
  sfprintf(sfstderr,"# malloc (%u) returns %p @%p\n",
	   (unsigned int) size, result, CALLER);
  /* Restore our own hooks */
  __malloc_hook = my_malloc_hook;
  __free_hook = my_free_hook;
  return result;
}

/*
 * Tracing replacement for glibc's __realloc_hook: performs the real
 * realloc() with the previous hooks installed, reports the call on
 * sfstderr, and then re-installs the tracing hooks.
 */
static void *
my_realloc_hook (void *ptr, size_t size, const void *CALLER)
{
  void *result;
  /* Restore all old hooks */
  __realloc_hook = old_realloc_hook;
  __malloc_hook = old_malloc_hook;
  __free_hook = old_free_hook;
  /* Call recursively */
  result = realloc (ptr, size);
  /* Save underlying hooks */
  old_realloc_hook = __realloc_hook;
  old_malloc_hook  = __malloc_hook;
  old_free_hook    = __free_hook;
  /* The printing might call `malloc' itself, so it is done while
     the old hooks are still in place. */
  sfprintf(sfstderr,"# realloc (%p,%u) returns %p @%p\n", ptr, (unsigned int) size, result, CALLER);
  /* Restore our own hooks */
  __realloc_hook = my_realloc_hook;
  __malloc_hook  = my_malloc_hook;
  __free_hook    = my_free_hook;
  return result;
}

/*
 * Tracing replacement for glibc's __memalign_hook: performs the real
 * memalign() with the previous hooks installed, reports the call on
 * sfstderr, and then re-installs the tracing hooks.
 */
static void *
my_memalign_hook (size_t align, size_t size, const void *CALLER)
{
  void *result;
  /* Restore all old hooks */
  __memalign_hook = old_memalign_hook;
  __malloc_hook = old_malloc_hook;
  __free_hook = old_free_hook;
  /* Call recursively */
  result = memalign (align, size);
  /* Save underlying hooks */
  old_memalign_hook = __memalign_hook;
  old_malloc_hook  = __malloc_hook;
  old_free_hook    = __free_hook;
  /* The printing might call `malloc' itself, so it is done while
     the old hooks are still in place. */
  sfprintf(sfstderr,"# memalign (%u,%u) returns %p @%p\n",
	   (unsigned)align, (unsigned)size, result, CALLER);
  /* Restore our own hooks */
  __memalign_hook = my_memalign_hook;
  __malloc_hook  = my_malloc_hook;
  __free_hook    = my_free_hook;
  return result;
}
     
/*
 * Tracing replacement for glibc's __free_hook: performs the real
 * free() with the previous hooks installed, reports the call on
 * sfstderr, and then re-installs the tracing hooks.
 */
static void
my_free_hook (void *ptr, const void *CALLER)
{
  /* Restore all old hooks */
  __malloc_hook = old_malloc_hook;
  __free_hook = old_free_hook;
  /* Call recursively */
  free (ptr);
  /* Save underlying hooks */
  old_malloc_hook = __malloc_hook;
  old_free_hook = __free_hook;
  /* The printing might call `free' itself, so it is done while
     the old hooks are still in place. */
  sfprintf(sfstderr,"# freed pointer %p @%p\n", ptr, CALLER);
  /* Restore our own hooks */
  __malloc_hook = my_malloc_hook;
  __free_hook = my_free_hook;
}
#endif

int
main(argc, argv)
	int argc;
	const char *argv[];
{
	struct ctlfile *cfp;
	const char *config, *cp;
	int i, daemonflg, c, errflg, version, fd;
	long offout, offerr;

	const char *t, *syslogflg;

#ifdef GLIBC_MALLOC_DEBUG__ /* memory allocation debugging with GLIBC */
	old_malloc_hook = __malloc_hook;
	__malloc_hook = my_malloc_hook;
	old_memalign_hook = __memalign_hook;
	__memalign_hook = my_memalign_hook;
	old_realloc_hook = __realloc_hook;
	__realloc_hook = my_realloc_hook;
	old_free_hook = __free_hook;
	__free_hook = my_free_hook;
#endif

#ifdef HAVE_SETGROUPS
	/* We null supplementary groups list entirely */
	setgroups(0, NULL);
#endif

	freeze = 0;
	/* setlinebuf(stderr);  -- no need for this ? */

	ArgvSave   = (char *) argv[0];
	EOArgvSave = argv[argc-1] + strlen(argv[argc-1]) + 1;

	mytime(&sched_starttime);

	memset(&dirqb,0,sizeof(dirqb));
	dirscan_mesh = sp_init();

	if ((progname = strrchr(argv[0], '/')) == NULL)
		progname = argv[0];
	else
		++progname;

	stickymem = MEM_MALLOC;

	resources_maximize_nofiles();

	/* The theory is, that scheduler needs circa 30 fd's for its own uses,
	   and it will use all others on child-process communication fifos. */
	/* Of these, 20 are for MQ2 sockets */

	global_maxkids = resources_query_nofiles()-30;

	/* Probe how many FDs each TA needs! */

	{
	  int to[2], from[2];
	  pipes_create(to,from);

	  close(to[0]); close(to[1]);

	  if (to[0] != from[1]) {

	    /* Two pipes! */
	    close(from[0]); close(from[1]);

	    global_maxkids >>= 1; /* Half the MAXKIDS count */

	  } /* else
	       Socketpair or equivalent bidirectional one!
	       Only one FD per child is used!               */
	}

	postoffice = rendezvous = logfn = statusfn = config = NULL;
	daemonflg = 1;
	dlyverbose = 0;
	syncstart = 0;
	verbose = errflg = version = 0;
	for (;;) {
		c = getopt(argc, (char*const*)argv,
			   "divE:f:Fl:HL:M:nN:p:P:q:QR:SVWZ:");
		if (c == EOF)
		  break;
		switch (c) {
		case 'f':	/* override default config file */
			config = optarg;
			break;
		case 'E':
		        sscanf(optarg, "%d,%d", &newents_limit, &newents_timelimit);

			if (newents_limit < 10)      newents_limit = 10;
			if (newents_timelimit < 2)   newents_timelimit = 2;
			if (newents_timelimit > 15)  newents_timelimit = 15;
			break;
		case 'F':
			freeze = 1;
			break;
		case 'l':
			statusfn = optarg;
			statuslog = sfopen(NULL, statusfn, "a");
			if (!statuslog) {
			  perror("Can't open statistics log file (-l)");
			  exit(1);
			}
			sfset(statuslog, SF_LINE, 1);
#if defined(F_SETFD)
			fcntl(sffileno(statuslog), F_SETFD, 1);
			/* close-on-exec */
#endif
			break;
		case 'H':
			if (hashlevels < 2)
				++hashlevels;
			break;
		case 'L':	/* override default log file */
			logfn = optarg;
			break;
		case 'M':
			mailqmode = atoi(optarg);
			if (mailqmode < 1 || mailqmode > 2) {
			  sfprintf(sfstderr,"scheduler: -M parameter is either 1, or 2\n");
			  exit(EX_USAGE);
			}
			break;
		case 'n':
			default_full_content = !default_full_content;
			break;
		case 'N':
			if ((transportmaxnofiles = atoi(optarg)) < 10)
				transportmaxnofiles = TRANSPORTMAXNOFILES;
			break;
		case 'p':
			procselect  = optarg;
			procselhost = strchr(procselect,'/');
			if (procselhost)
			  *procselhost++ = 0;
			else {
			  sfprintf(sfstderr,"scheduler: -p parameter is of form: channel/host\n");
			  exit(64);
			}
			break;
		case 'P':	/* override default postoffice */
			postoffice = optarg;
			break;
		case 'Q':
			mailq_Q_mode = 1;
			break;
		case 'q':	/* override default mail queue rendezvous */
			rendezvous = optarg;
			break;
		case 'R':	/* How many new childs per second to be spawned ? */
			forkrate_limit = atoi(optarg);
			if (forkrate_limit < 1)
			  forkrate_limit = 1;
			break;
		case 'v':	/* be verbose and synchronous */
			++verbose;
			daemonflg = 0;
			break;
		case 'W':
			dlyverbose = verbose;
			verbose = 0;
			break;
		case 'd':	/* daemon again */
			daemonflg = 1;
			break;
		case 'i':	/* interactive */
			daemonflg = 0;
			break;
		case 'V':
			version = 1;
			daemonflg = 0;
			break;
		case 'S':
			syncstart = 1;
			break;
		case 'Z':
			if (readzenv(optarg) == 0)
			  ++errflg;
			break;
		case '?':
		default:
			errflg++;
			break;
		}
	}

	if (errflg) {
	  sfprintf(sfstderr,
		   "Usage: %s [-dHisvV -M (1|2) -f configfile -L logfile -P postoffice -Q rendezvous -Z zenvfile]\n",
		   progname);
	  exit(128+errflg);
	}

	syslogflg = getzenv("SYSLOGFLG");
	if (syslogflg == NULL)
	  syslogflg = "";
	t = syslogflg;
	for ( ; *t ; ++t ) {
	  if (*t == 'c' || *t == 'C')
	    break;
	}
	do_syslog = (*t != '\0');


	cp = getzenv("SCHEDULERDIRHASH");
	if (cp){
	  if ((cp[0] == '1' || cp[0] == '2') &&
	      cp[1] == 0)
	    hashlevels = cp[0] - '0';
	}

	mailshare = getzenv("MAILSHARE");
	if (mailshare == NULL)
	  mailshare = MAILSHARE;
	cp = getzenv("LOGDIR");
	if (cp != NULL)
	  qlogdir = cp;
	if (daemonflg && logfn == NULL) {
	  logfn = emalloc(2 + (u_int)(strlen(qlogdir) + strlen(progname)));
	  sprintf((char*)logfn, "%s/%s", qlogdir, progname);
	}


	/* If we are a daemon, or doing verbose foobar, attach the SHM MIB now */
	if (daemonflg || verbose) {

	  int r = Z_SHM_MIB_Attach (1);

	  if (r < 0) {
	    /* Error processing -- magic set of constants: */
	    switch (r) {
	    case -1:
	      /* fprintf(stderr, "No ZENV variable: SNMPSHAREDFILE\n"); */
	      break;
	    case -2:
	      perror("Failed to open for exclusively creating of the SHMSHAREDFILE");
	      break;
	    case -3:
	      perror("Failure during creation fill of SGMSHAREDFILE");
	      break;
	    case -4:
	      perror("Failed to open the SHMSHAREDFILE at all");
	      break;
	    case -5:
	      perror("The SHMSHAREDFILE isn't of proper size! ");
	      break;
	    case -6:
	      perror("Failed to mmap() of SHMSHAREDFILE into memory");
	      break;
	    case -7:
	      fprintf(stderr, "The SHMSHAREDFILE  has magic value mismatch!\n");
	      break;
	    default:
	      break;
	    }
	    /* return; NO giving up! */
	  }
	}


	if (logfn != NULL) {
	  /* loginit is a signal handler, so can't pass log */
	  if (loginitsched(SIGHUP) < 0) /* do setlinebuf() there */
	    die(1, "log initialization failure");
	  /* close and reopen log files */
	  SIGNAL_HANDLE(SIGHUP, (RETSIGTYPE(*)__((int))) loginitsched);
	} else {
	  SIGNAL_IGNORE(SIGHUP); /* no surprises please */
	  sfset(sfstdout, SF_LINE, 1);
	}
#ifdef USE_SIGREAPER
# ifdef SIGCLD
	SIGNAL_HANDLE(SIGCLD,  sig_chld);
# else
	SIGNAL_HANDLE(SIGCHLD, sig_chld);
# endif
#else
# ifdef SIGCLD
	SIGNAL_HANDLE(SIGCLD,SIG_IGN);		/* Auto-reap the kids.. */
# else
	SIGNAL_HANDLE(SIGCHLD,SIG_IGN);
# endif
#endif

#ifdef	SIGUSR1
	SIGNAL_HANDLE(SIGUSR1, sig_readcf);
#endif	/* SIGUSR1 */
	if (verbose || version) {
	  prversion("scheduler");
	  if (version)
	    exit(0);
	  sfputc(sfstderr, '\n');
	}
	offout = sftell(sfstdout);
	offerr = sftell(sfstderr);
	if (config == NULL) {
	  config = emalloc(3 + (u_int)(strlen(mailshare)
				       + strlen(progname)
				       + strlen(qcf_suffix)));
	  sprintf((char*)config, "%s/%s.%s", mailshare, progname, qcf_suffix);
	}
	cehead = readconfig(config);
	if (cehead == NULL) {
	  cp = emalloc(strlen(config)+50);
	  sprintf((char*)cp, "null control file, probably errors in it: %s", config);
	  die(1, cp);
	  /* NOTREACHED */
	}



	if (postoffice == NULL && (postoffice = getzenv("POSTOFFICE")) == NULL)
	  postoffice = POSTOFFICE;

	if (chdir(postoffice) < 0 || chdir(TRANSPORTDIR) < 0)
	  sfprintf(sfstderr, "%s: cannot chdir to %s/%s.\n",
		   progname, postoffice, TRANSPORTDIR);

	if (rendezvous == NULL && (rendezvous = getzenv("RENDEZVOUS")) == NULL)
	  rendezvous = qoutputfile;
	if (daemonflg) {
	  /* X: check if another daemon is running already */
	  if (!verbose
	      && (offout < sftell(sfstdout) || offerr < sftell(sfstderr))) {
	    sfprintf(sfstderr, "%ld %ld %ld %ld\n", offout, sftell(sfstdout),
		     offerr, sftell(sfstderr));
	    sfprintf(sfstderr, "%s: daemon not started.\n", progname);
	    die(1, "too many scheduler daemons");
	    /* NOTREACHED */
	  }
	  detach();		/* leave worldy matters behind */
	  mytime(&now);
	  sfprintf(sfstdout, "%s: scheduler daemon (%s)\n\tpid %d started at %s\n",
		   progname, Version, (int)getpid(), (char *)rfc822date(&now));
	}


	/* Now we are either interactive, or daemon, lets attach monitoring
	   memory block.. and fill it in.  */

	MIBMtaEntry->sys.SchedulerMasterPID        = getpid();
	MIBMtaEntry->sys.SchedulerMasterStartTime  = time(NULL);
	MIBMtaEntry->sys.SchedulerMasterStarts    += 1;

	/* Zero the gauges at our startup.. */
	MIBMtaEntry->sc.StoredMessagesSc		= 0;
	MIBMtaEntry->sc.StoredThreadsSc		= 0;
	MIBMtaEntry->sc.StoredVerticesSc		= 0;
	MIBMtaEntry->sc.StoredRecipientsSc		= 0;
	MIBMtaEntry->sc.StoredVolumeSc		= 0;
	MIBMtaEntry->sc.TransportAgentProcessesSc	= 0;
	MIBMtaEntry->sc.TransportAgentsActiveSc	= 0;
	MIBMtaEntry->sc.TransportAgentsIdleSc		= 0;


	/* Actually we want this to act as daemon,
	   even when not in daemon mode.. */
	if (killprevious(SIGTERM, pidfile) != 0) {
	  sfprintf(sfstdout, "%s: Can't write my pid to a file ?? Out of diskspace ??\n",progname);
	  die(1,"Can't write scheduler pid to a file!?");
	  /* NOTREACHED */
	}

	for (i = 0; i < SIZE_L; ++i) {
	  spt_mesh[i] = sp_init();
	  spt_mesh[i]->symbols = sp_init();
	}
	zopenlog("scheduler", LOG_PID, LOG_MAIL);

	mustexit = gotalarm = dumpq = rereadcf = 0;
	canexit = 0;
	SIGNAL_IGNORE(SIGPIPE);
	SIGNAL_HANDLE(SIGALRM, sig_alarm);	/* process agenda */
	SIGNAL_HANDLE(SIGUSR2, sig_iot);	/* dump queue info */

	SIGNAL_HANDLE(SIGTERM, sig_exit);	/* split */
	SIGNAL_HANDLE(SIGQUIT, sig_quit);	/* Slow shutdown */

	/* call it to create the timeserver -- if possible */
	init_timeserver(); /* Will take around 3-4 secs.. */


	if (optind < argc) {
	  /* process the specified control files only */
	  for (; optind < argc; ++optind) {
	    long ino = atol(argv[optind]);
	    if ((fd = eopen(argv[optind], O_RDWR, 0)) < 0)
	      continue;
	    /* the close(fd) is done in vtxprep */
	    cfp = schedule(fd, argv[optind], ino, 0);
	    if (cfp == NULL) {
	      if (verbose)
		sfprintf(sfstderr, "Nothing scheduled for %s!\n",
			 argv[optind]);
	    } else
	      eunlink(argv[optind], "sch-argv");
	  }
	  doagenda();
	  killpidfile(pidfile);
	  exit(0);
	}

	queryipcinit();

	dirqueuescan(".", dirq, 1);

	vtxprep_skip_lock = 0;
	syncweb(dirq);

	canexit = 1;
#ifdef	MALLOC_TRACE
	mal_leaktrace(1);
#endif	/* MALLOC_TRACE */

	/* If we do a sync-start, we are synchronous.. */

	if (syncstart) {
	  int startcount;
	  dirqueuescan(".", dirq, 1);
	  startcount = dirq->wrksum;
	  while (dirq->wrksum > 0 && !mustexit) {
	    if (syncweb(dirq) < 0)
	      break;
	    queryipccheck();
	  }
	  sfprintf(sfstderr,"Synchronous startup completed, messages: %d (%d skipped) recipients: %d\n",
		   global_wrkcnt, startcount - global_wrkcnt, thread_count_recipients());
	  sfprintf(sfstderr,"***********************************************************************\n");
	  syncstart = 0;
	}

	if (dlyverbose) verbose = dlyverbose;

	do {
	  time_t timeout;

	  mytime(&now);

	  canexit = 0;

	  /* SAH Hmm... next_dirscan is NOT reset unless we find < 150
	   * new files in dirqueuescan - meaning this will be true.
	   * So a 'busy' system will potentially be running dirqueuescan()
	   * EVERY pass thru the loop... This could be bad
	   */

	  if (now >= next_dirscan) {
	    /* Directory scan time for new jobs ..    */
	    /* Do it recursively every now and then,
	       so that if we forget some jobs, they will
	       become relearned soon enough.          */
	    int wrk;
	    i = dirqueuescan(".", dirq, (now >= next_idlecleanup));
	    mytime(&now);
	    wrk = dirq->wrksum;
	    wrk >>= 5; /* Divide by 32 -- just presume 32 msgs/sec.. */
	    if (wrk > 10) wrk = 10; /* But limit to 10... */
	    next_dirscan = now + sweepinterval + wrk; /* 10 .. 20 second
							 sweep interval */
	  }

	  if (now >= next_idlecleanup) {
	    idle_cleanup();

	    if (next_idlecleanup == 0)  next_idlecleanup = now;

	    /* At regular intervals... */
	    next_idlecleanup += idle_sweepinterval;
	  }

	  if (now >= next_expiry2_scan) {
	    if (next_expiry2_scan == 0)  next_expiry2_scan = now;
	    i = doexpiry2();
	    if (i == 0) 
	      /* At regular intervals... */
	      next_expiry2_scan += expiry2_sweepinterval;
	  }

	  if (now >= next_interim_report_time) {
	    if (next_interim_report_time == 0) next_interim_report_time = now;
	    /* At regular intervals... */
	    next_interim_report_time += interim_report_interval;
	    interim_report_run();
	  }

	  /* See when to timeout from mux() */
	  timeout = next_dirscan;

	  /* If we still have things in pre-scheduler queue... */
	  if (dirq->wrksum > 0 &&
	      dirq->wrkcount > 0 &&
	      dirq->stats[ dirq->wrkcount-1 ]->not_before <= now)
	    timeout = now;

	  /* Submit possible new jobs (unless frozen) */
	  if (!freeze && !syncstart && doagenda() != 0)
	    timeout = now;	/* we still have some jobs avail for start */

	  gotalarm = 0;
	  canexit = 1;

	  /* mux on the stdout of children */
	  for (;;) {
	    if (dumpq) {
	      qprint(-1);
	      dumpq = 0;
	    }
	    if (rereadcf) {
	      cehead = rereadconfig(cehead, config);
	      rereadcf = 0;
	    }
	    mytime(&now);

	    /* Submit one item from pre-scheduler-queue into the scheduler */
	    if (dirq->wrksum > 1 ) {
	      /* If more things in queue, submit them up to 200 pieces, or
		 until spent two seconds at it! */
	      time_t now0 = now + 2;
	      for (i=0; i < 200 && now < now0; ++i) {
		if (syncweb(dirq) < 0)
		  break; /* Out of queue! */
		mytime(&now);
	      }
	    }
	    if (dirq->wrksum > 0)
	      syncweb(dirq);

	    if (slow_shutdown) {
	      timeout = now+2;
	      shutdown_kids();  /* If there are any to shut down.. */
	    }
	    if (!(mux(timeout) == 0 && !gotalarm &&
		  dirq->wrksum == 0 && !mustexit && timeout > now))
	      break;
	    if (readsockcnt == 0 && slow_shutdown) {
	      /* No more childs to read, hop hop die away! */
	      mustexit = 1;
	      break;
	    }
	  }
	} while (!mustexit);

	/* Doing nicely we would kill the childs here, but we are not
	   such a nice people -- we just discard incore data, and crash out.. */

	sp_scan(ctl_free, NULL, spt_mesh[L_CTLFILE]);

#ifdef	MALLOC_TRACE
	mal_dumpleaktrace(stderr);
#endif	/* MALLOC_TRACE */

	killpidfile(pidfile);

	if (mustexit)
		die(0, "signal");
	return 0;
}

/*
 * Signal handler (main() installs it for SIGTERM): closes the query
 * socket, then either terminates at once through die() -- when the
 * main loop has set 'canexit' -- or sets 'mustexit' for the main loop
 * to act on.
 */
static RETSIGTYPE sig_exit(sig)
int sig;
{
	
	if (querysocket >= 0) {		/* give up mailq socket asap */
		close(querysocket);
		querysocket = -1;
	}
	if (canexit)
		die(0, "signal");
	mustexit = 1;
}

/*
 * Signal handler (main() installs it for SIGQUIT): requests the slow
 * shutdown, and freezes the starting of new jobs in the main loop.
 */
static RETSIGTYPE sig_quit(sig)
int sig;
{
	slow_shutdown = 1;
	freeze = 1;
}


#ifdef SIGUSR2
/*
 * SIGUSR2 handler: asks the main loop to dump the queue (it calls
 * qprint(-1) when 'dumpq' is set), and re-installs itself.
 */
static RETSIGTYPE sig_iot(sig)
int sig;
{
	dumpq = 1;
	SIGNAL_HANDLE(SIGUSR2, sig_iot);
}
#endif

#ifdef SIGUSR1
/*
 * SIGUSR1 handler: asks the main loop to re-read the configuration
 * (it calls rereadconfig() when 'rereadcf' is set), and re-installs
 * itself.
 */
static RETSIGTYPE sig_readcf(sig)
int sig;
{
	rereadcf = 1;
	SIGNAL_HANDLE(SIGUSR1, sig_readcf);
}
#endif

/*
 * Absorb any new files that showed up in our directory.
 */

int
dq_insert(DQ, ino, file, delay)
	void *DQ;
	long ino;
	const char *file;
	int delay;
{
	struct stat stbuf;
	struct dirstatname *dsn;
	struct dirqueue *dq = DQ;

	if (!ino) return 1; /* Well, actually it isn't, but we "makebelieve" */

	mytime(&now);

	if (dq == NULL)
	  dq = dirq;

	if (lstat(file,&stbuf) != 0 ||
	    !S_ISREG(stbuf.st_mode)) {
	  /* Not a regular file.. Let it be there for the manager
	     to wonder.. */
	  return -1;
	}

	/* 
	   if (DQ == NULL)
	   sfprintf(sfstderr,"dq_insert(NULL,ino=%ld, file='%s', delay=%d)\n",
	   ino,file,delay);
	 */

	/* Is it already in the database ? */
	if (sp_lookup((u_long)ino,dirscan_mesh) != NULL) {
	  sfprintf(sfstderr,"scheduler: tried to dq_insert(ino=%ld, file='%s') already in queue\n",ino, file);
	  return 1; /* It is! */
	}

	/* Now store the entry */
	dsn = (struct dirstatname*)emalloc(sizeof(*dsn)+strlen(file)+1);
	memcpy(&(dsn->st),&stbuf,sizeof(stbuf));
	dsn->ino = ino;
	dsn->not_before = now + delay;
	strcpy(dsn->name,file);

	sp_install(ino, (void *)dsn, 0, dirscan_mesh);

	/* Put the new entry into the normal queue, unless there
	   is stuff at the delayed queue, OR the delay is positive! */

	if (delay <= 0 && dq->wrkcount2 == 0) {
	  /* Into the normal queue */
	  if (dq->wrkspace <= dq->wrkcount) {
	    /* Increase the space */
	    dq->wrkspace = dq->wrkspace ? dq->wrkspace << 1 : 8;
	    /* malloc(size) = realloc(NULL,size) */
	    dq->stats = (struct dirstatname**)erealloc(dq->stats,
						       sizeof(void*) *
						       dq->wrkspace);
	  }
	  dq->stats[dq->wrkcount] = dsn;
	  dq->wrkcount += 1;
	  dq->wrksum   += 1;
	  dq->sorted    = 0;
	} else {
	  /* Into the DELAYED QUEUE */
	  if (dq->wrkspace2 <= dq->wrkcount2) {
	    /* Increase the space */
	    dq->wrkspace2 = dq->wrkspace2 ? dq->wrkspace2 << 1 : 8;
	    /* malloc(size) = realloc(NULL,size) */
	    dq->stats2 = (struct dirstatname**)erealloc(dq->stats2,
							sizeof(void*) *
							dq->wrkspace2);
	  }
	  dq->stats2[dq->wrkcount2] = dsn;
	  dq->wrkcount2 += 1;
	  dq->wrksum    += 1;
	}
	++MIBMtaEntry->sc.ReceivedMessagesSc;
	return 0;
}

int in_dirscanqueue(DQ,ino)
	void *DQ;
	long ino;
{
	struct dirqueue *dq = DQ;

	if (dq == NULL)
	  dq = dirq;

	/* Return 1, if can find the "ino" in the queue */

	if (dq->wrksum == 0 || dirscan_mesh == NULL) return 0;
	if (sp_lookup((u_long)ino,dirscan_mesh) != NULL) return 1;
	return 0;
}

/*
 * dq_ctimecompare() -- comparison function over an array of
 * 'struct dirstatname *', ordering by st_ctime.
 *
 * NOTE: the parameters are declared in (b,a) order on purpose; the
 * result is the sign of  second.ctime - first.ctime,  so the newest
 * entry sorts first, and the oldest entry LAST.
 *
 * The times are compared, not subtracted: a time_t difference squeezed
 * into an 'int' can come out with a wrong sign when the two times are
 * far enough apart.
 *
 * (An earlier, disabled, variant also pushed entries whose not_before
 *  time is still in the future towards one end.)
 */
static int dq_ctimecompare __((const void *, const void *));
static int dq_ctimecompare(b,a) /* we want oldest entry LAST */
const void *a; const void *b;
{
	const struct dirstatname **A = (const struct dirstatname **)a;
	const struct dirstatname **B = (const struct dirstatname **)b;
	time_t ta = (*A)->st.st_ctime;
	time_t tb = (*B)->st.st_ctime;

	if (ta < tb) return -1;
	if (ta > tb) return  1;
	return 0;
}

/*
 * Build the path of "name" inside "dir" into buf[bufsize]:
 * just "name" when dir is ".", otherwise "dir/name".
 * Returns 0 on success, -1 when the result (with its NUL) would not fit.
 */
static int dirqueuescan_path(buf, bufsize, dir, name)
	char *buf;
	int bufsize;
	const char *dir;
	const char *name;
{
	if (dir[0] == '.' && dir[1] == 0) {
	  if (strlen(name) >= (size_t)bufsize)
	    return -1;
	  strcpy(buf, name);
	} else {
	  if (strlen(dir) + 1 + strlen(name) >= (size_t)bufsize)
	    return -1;
	  sprintf(buf, "%s/%s", dir, name);
	}
	return 0;
}

/*
 * dirqueuescan() -- read directory "dir" and feed job files into the
 * pre-schedule queue "dq".
 *
 *   dir      directory to read; "." is the top level and (re)arms the
 *            static scan deadline
 *   dq       queue handed to in_dirscanqueue() / dq_insert()
 *   subdirs  when non-zero, entries named with a single letter 'A'..'Z'
 *            that are directories are scanned recursively
 *
 * Files whose name starts with a digit are candidates; the number at
 * the start of the name (atol()) is used as the key.  Candidates that
 * are already in the queue, or already present in spt_mesh[L_CTLFILE],
 * are skipped.
 *
 * Unless "syncstart" is set the scan stops when the deadline
 * (newents_timelimit seconds after the top-level call) has passed, or
 * when more than newents_limit new entries have been collected.
 *
 * Returns the number of entries newly inserted (0 when opendir() fails
 * or "dir" is NULL).
 */
static int dirqueuescan(dir, dq, subdirs)
	const char *dir;
	struct dirqueue *dq;
	int subdirs;
{
	DIR *dirp;
	struct dirent *dp;
	struct stat stbuf;
	char file[MAXNAMLEN+1];
	int newents = 0;
	static time_t timelimit;
	time_t then;

	if (dir == NULL)
	  return 0;	/* nothing sensible to open */

	mytime( &now );
	then = now;

	if (dir[0] == '.' && dir[1] == 0) {
	  timelimit = now + newents_timelimit;
	}

#if 0
	static time_t modtime = 0;

	/* Any changes lately ? */
	if (estat(dir, &stbuf) < 0)
	  return -1;	/* Could not stat.. */
	if (stbuf.st_mtime == modtime)
	  return 0;	/* any changes lately ? */
	modtime = stbuf.st_mtime;
#endif

	if (verbose && dir[0] == '.') {
	  sfprintf(sfstdout, "dirqueuescan(dir='%s') ",dir);
	  sfsync(sfstdout);
	}

	/* Some changes lately, open the dir and read it */

	dirp = opendir(dir);
	if (!dirp) {
	  sfprintf(sfstderr,"A 'this can never fail' opendir('%s') failed!; errno=%d (%s)\n",dir,errno,strerror(errno));
	  return 0;
	}

	for (dp = readdir(dirp); dp != NULL; dp = readdir(dirp)) {
	  /* Scan filenames into memory */

	  /* Here we are presuming the time retrieval is essentially
	     _free_ in form of shared memory segment.. */
	  mytime( &now );
	  if (now >  then) {
	    /* At most once per second: serve the query channel */
	    then = now;
	    queryipccheck();
	  }

	  if (!syncstart && now > timelimit)
	    break; /* At most  newents_timelimit  seconds in this scanner... */

	  if (!syncstart && newents > newents_limit)
	    break; /* At most NNN per one go */

	  if (subdirs &&
	      dp->d_name[0] >= 'A' &&
	      dp->d_name[0] <= 'Z' &&
	      dp->d_name[1] ==  0 ) {
	    /* We do this recursively.. */
	    if (dirqueuescan_path(file, (int)sizeof(file),
				  dir, dp->d_name) != 0) {
	      /* Would overflow file[]; nesting this deep is not
		 something we can handle, leave it alone. */
	      if (verbose)
		sfprintf(sfstderr, "dirqueuescan: path too long, skipped: '%s/%s'\n",
			 dir, dp->d_name);
	      continue;
	    }

	    if (lstat(file,&stbuf) != 0 ||
		!S_ISDIR(stbuf.st_mode)) {
	      /* Not a directory.. Let it be there for the manager
		 to wonder.. */
	      continue;
	    }
	    /* Recurse into levels below.. */
	    newents += dirqueuescan(file, dq, subdirs);
	    continue;
	  } /* End of directories of names "A" .. "Z" */

	  if (dp->d_name[0] >= '0' &&
	      dp->d_name[0] <= '9') {
	    /* A file whose name STARTS with a number (digit) */

	    long ino = atol(dp->d_name);
	    if (in_dirscanqueue(dq,ino))
	      /* Already in pre-schedule-queue */
	      continue;

	    if (dirqueuescan_path(file, (int)sizeof(file),
				  dir, dp->d_name) != 0) {
	      /* Would overflow file[] -- skip instead of smashing
		 the stack. */
	      if (verbose)
		sfprintf(sfstderr, "dirqueuescan: path too long, skipped: '%s/%s'\n",
			 dir, dp->d_name);
	      continue;
	    }

	    /* We may have this file in processing state...  */
	    {
	      struct spblk *spl;
	      spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]);
	      if (spl != NULL) {
		/* Already in processing, don't touch.. */
		/*printf("File: %s active (not previously locked)\n",file);*/
		++vtxprep_skip_any;
		continue;
	      }
	    }

	    /* A non-zero return from dq_insert() is not counted */
	    if (dq_insert(dq, ino, file, -1))
	      continue;

	    ++newents;
	  } /* ... end of "filename starts with a [0-9]" */
	}

#ifdef	BUGGY_CLOSEDIR
	/*
	 * Major serious bug time here;  some closedir()'s
	 * free dirp before referring to dirp->dd_fd. GRRR.
	 * XX: remove this when bug is eradicated from System V's.
	 */
	close(dirp->dd_fd);
#endif
	closedir(dirp);

	if (verbose && dir[0] == '.')
	  sfprintf(sfstdout,"wrksum=%d new=%d\n",dq->wrksum,newents);

	return newents;
}

/*
 * syncweb() -- take at most ONE entry out of the pre-schedule queue
 * "dq" and hand it over to schedule().
 *
 * When the primary array (stats[]) is empty, the delayed array
 * (stats2[]) is promoted to be the primary one.  The primary array is
 * kept sorted with dq_ctimecompare() and is drained from its END;
 * entries whose "not_before" time is still in the future are left
 * where they are.
 *
 * Returns:
 *   -1  nothing could be taken: the queue is empty, or every entry in
 *       the primary array is still waiting for its "not_before" time
 *    0  one entry was consumed, but it was not scheduled (already in
 *       spt_mesh[L_CTLFILE], open failed, or schedule() returned NULL)
 *    1  one entry was consumed and scheduled
 */
int syncweb(dq)
	struct dirqueue *dq;
{
	char *file;
	struct dirstatname *dqstats;
	struct spblk *spl;
	int wrkcnt = 0;
	int wrkidx;
	long ino;

	/* Any work to do ? */
	if (dq->wrksum == 0) return -1;

	/* Be responsive, check query channel */
	queryipccheck();

	if (dq->wrkcount == 0 && dq->wrkcount2 > 0) {
	  /* Ok, the primary queue is empty, move the delayed
	     queue into the primary one. */
	  if (dq->stats != NULL) free(dq->stats);
	  dq->stats = dq->stats2;
	  dq->stats2 = NULL;
	  dq->wrkcount  = dq->wrkcount2;
	  dq->wrkcount2 = 0;
	  dq->wrkspace  = dq->wrkspace2;
	  dq->wrkspace2 = 0;
	  dq->sorted = 0;
	}

	if (!dq->sorted && dq->wrkcount > 1) {
	  /* Sort the dq->stats[] per file ctime -- LATEST on slot 0.
	     (we drain this queue from the END) */
	  mytime(&now);
	  qsort((void*)dq->stats,
		dq->wrkcount,
		sizeof(void*),
		dq_ctimecompare );
	  dq->sorted = 1;
	}

	mytime(&now);

	/* Pick the entry nearest to the tail that is allowed to run now;
	   'not_before' things are left behind. */
	for (wrkidx = dq->wrkcount - 1; wrkidx >= 0; --wrkidx) {
	  if (dq->stats[wrkidx]->not_before <= now)
	    break;
	}

	if (wrkidx < 0) {
	  /* Nothing is runnable yet (or the primary array is empty while
	     wrksum claims otherwise).  Indexing stats[-1] would be out
	     of bounds, so report "no work" and leave the queue alone. */
	  return -1;
	}

	/* Ok some, decrement the counts; wrkcount is now the index of
	   the last live slot + 1 after removal */
	dq->wrkcount -= 1;
	dq->wrksum   -= 1;

	dqstats = dq->stats[wrkidx];
	file  =   dqstats->name;
	ino   =   dqstats->ino;

	if (dq->wrkcount > wrkidx) {
	  /* Skipped some ! Compact the array !
	     Source and destination overlap, hence memmove(). */
	  memmove( &dq->stats[wrkidx], &dq->stats[wrkidx+1],
		   sizeof(dq->stats[0]) * (dq->wrkcount - wrkidx));
	}
	dq->stats[dq->wrkcount] = NULL;

	/* Deletion from the  dirscan_mesh  should ALWAYS succeed.. */
	spl = sp_lookup((u_long)ino, dirscan_mesh);
	if (spl != NULL)
	  sp_delete(spl,dirscan_mesh);

	/* Sometimes we may get files already in processing
	   into our pre-schedule queue */

	/* Already in processing ? */
	spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]);
	if (spl == NULL) {
	  /* Not yet in processing! */
	  int fd = eopen(file, O_RDWR, 0);

	  /* On open failure we do NOT unlink the file:
	     errno == EMFILE  --> out of FDs!  URGH! */
	  if (fd >= 0) {
	    /* Ok, schedule! */
	    if (schedule(fd, file, ino, 0) != NULL) {
	      /* Success, increment counters */
	      ++wrkcnt;
	    }
	  }
	}

	/* Free the pre-schedule queue entry */
	free(dqstats);

	return wrkcnt;
}


static int sync_cfps __((struct ctlfile *,struct ctlfile *,struct procinfo*));

/*
 * sync_cfps() -- prune the vertex chain of "oldcfp" so that it holds
 * nothing that the freshly re-read "newcfp" does not have.
 *
 * Both files are walked along their  vp->next[L_CTLFILE]  chains,
 * starting at  cfp->head.  Vertices are considered "the same" when
 * their  orig[L_CHANNEL]  and  orig[L_HOST]  pointers are equal
 * (VTXMATCH).
 *
 *  - matching pair:  recipient indexes of the OLD vertex that do not
 *    appear in the NEW vertex are dropped; an OLD vertex left with no
 *    recipients is released with unvertex()
 *  - OLD vertices in front of the one matching the current NEW vertex
 *    are released
 *  - a NEW vertex with no counterpart in OLD is skipped (not supported
 *    scenario)
 *  - OLD vertices left over when the NEW chain ends are released
 *
 * MIBMtaEntry->sc.StoredRecipientsSc is decremented by the number of
 * recipients dropped.  Finally  rcpnts_failed  is copied and  haderror
 * is OR-ed from newcfp to oldcfp.
 *
 * "proc" is not used.  Always returns 0.  On return  oldcfp->head  may
 * be NULL, the caller must check for that.
 */
static int sync_cfps(oldcfp, newcfp, proc)
     struct ctlfile *oldcfp, *newcfp;
     struct procinfo *proc;
{
	struct vertex *ovp,  *nvp;
	struct vertex *novp, *nnvp;
	struct vertex *vp;
	int ovp_count = 0;
	int i, j, id;

	if (verbose) {
	  sfprintf(sfstdout,"sync_cfps() OLDVTXES =");
	  for (ovp = oldcfp->head; ovp; ovp = ovp->next[L_CTLFILE])
	    sfprintf(sfstdout," %p{%p %p}", ovp, ovp->orig[L_CHANNEL], ovp->orig[L_HOST]);
	  sfprintf(sfstdout,"\n");
	  sfprintf(sfstdout,"sync_cfps() NEWVTXES =");
	  for (nvp = newcfp->head; nvp; nvp = nvp->next[L_CTLFILE])
	    sfprintf(sfstdout," %p{%p %p}", nvp, nvp->orig[L_CHANNEL], nvp->orig[L_HOST]);
	  sfprintf(sfstdout,"\n");
	}

	ovp = oldcfp->head;
	nvp = newcfp->head;

#define VTXMATCH(ovp,nvp) (((ovp)->orig[L_CHANNEL] == (nvp)->orig[L_CHANNEL]) && ((ovp)->orig[L_HOST] == (nvp)->orig[L_HOST]))

	while (ovp != NULL && nvp != NULL) {
	  ++ovp_count;

	  /* Always prepare for removal of the ovp object..
	     Pick the next-ovp pointer now */
	  novp  = ovp->next[L_CTLFILE];
	  nnvp  = nvp->next[L_CTLFILE];

	  /* Does this exist also on NVP chain ? */

	  if (VTXMATCH(ovp, nvp)) {
	    /* Verify/adjust OVP so that OVP and NVP have same
	       address indexes in them. */
	    if (verbose) {
	      sfprintf(sfstdout, " OLD ngroup=%d {", ovp->ngroup);
	      for(i = 0; i < ovp->ngroup; ++i)
		sfprintf(sfstdout, " %d", ovp->index[i]);
	      sfprintf(sfstdout, " }\n");
	      sfprintf(sfstdout, " NEW ngroup=%d {", nvp->ngroup);
	      for(i = 0; i < nvp->ngroup; ++i)
		sfprintf(sfstdout, " %d", nvp->index[i]);
	      sfprintf(sfstdout, " }\n");
	    }
	    /* Walk OLD indexes backwards so that closing a gap does
	       not disturb the slots still to be visited */
	    for (i = ovp->ngroup -1; i >= 0; --i) {
	      id = ovp->index[i];
	      for (j = nvp->ngroup -1; j >= 0; --j) {
		if (nvp->index[j] == id)
		  goto next_i;
	      }
	      /* ovp index elt not found in new set! */
	      for (j = i+1; j < ovp->ngroup; ++j)
		ovp->index[j-1] = ovp->index[j];
	      ovp->ngroup -= 1;
	      MIBMtaEntry->sc.StoredRecipientsSc -= 1;
	    next_i:;
	    }
	    if (ovp->ngroup <= 0)
	      unvertex(ovp,-1,1); /* Don't unlink()! free() *just* ovp! */

	    ovp = novp;
	    nvp = nnvp;
	    continue;
	  }

	  /* Uugh... Does not match :-(
	     Look further down the OLD chain for the NEW vertex. */

	  vp = ovp;
	  while (vp && !VTXMATCH(vp,nvp))
	    vp = vp->next[L_CTLFILE];

	  if (vp == NULL) {
	    /* New not in old at all ??? */
	    if (verbose)
	      sfprintf(sfstdout," -- NEW vertex %p NOT in OLD set!\n", nvp);
	    /* Ok, skip it, and retry with OLD vs. new NEW */
	    nvp = nnvp;
	    continue;
	  }

	  /* All OVP instances before matching NVP
	     are to be removed from OVP chains */
	  i = 0;
	  while (ovp && ovp != vp) {
	    novp = ovp->next[L_CTLFILE];

	    /* Account for the recipients of the vertex being REMOVED
	       (ovp), not those of the surviving match (vp). */
	    MIBMtaEntry->sc.StoredRecipientsSc -= ovp->ngroup;
	    ovp->ngroup = 0;
	    unvertex(ovp,-1,1); /* Don't unlink()! free() *just* ovp! */

	    ++i;
	    ++ovp_count;
	    ovp = novp;
	  }
	  if (verbose)
	    sfprintf(sfstdout," -- Deleted %d OLD vertices\n", i);

	  /* NOTE(review): ovp now is the vertex matching nvp, and both
	     chains are stepped past that pair below WITHOUT comparing
	     their index sets the way the VTXMATCH branch above does.
	     Confirm whether that is intended. */
	  novp = NULL;
	  if (ovp)
	    novp = ovp->next[L_CTLFILE];

	  ovp = novp;
	  nvp = nnvp;
	}

	/* Ok,  'ovp' might be non-NULL while 'nvp' is already NULL */
	while (ovp) {
	  novp = ovp->next[L_CTLFILE];

	  MIBMtaEntry->sc.StoredRecipientsSc -= ovp->ngroup;
	  ovp->ngroup = 0;
	  unvertex(ovp,-1,1); /* Don't unlink()! free() *just* ovp! */
	  /* Leaves CFP unharmed */

	  ovp = novp;
	}

	if (verbose)
	  sfprintf(sfstdout," -- Synced %d OLD vertices\n", ovp_count);

	oldcfp->rcpnts_failed = newcfp->rcpnts_failed;
	oldcfp->haderror |= newcfp->haderror;

	/* Now  oldcfp->head  might be NULL -> bad news.. */

	return 0;
}


/*
 * resync_file() -- re-read control file "file" and bring the in-core
 * state of that job in line with what is on the disk.
 *
 *   proc  the process on whose behalf the resync is done; only used
 *         for the log line (pid, overfed, ho->name)
 *   file  path of the control file; the number at the start of its
 *         last path component is the key into spt_mesh[L_CTLFILE]
 *
 * Nothing is done when the file is not in spt_mesh[L_CTLFILE].
 * Otherwise the old ctlfile is taken out of the mesh, the file is read
 * again with schedule(..., 1), and sync_cfps() prunes the old vertex
 * chain against the new one.  The old ctlfile is then put back into
 * the mesh, or thrown away with cfp_free() when
 *   - it has been resynced more than MAXRESYNCS times,
 *   - the file can not be opened or re-read, or
 *   - no vertices are left after the sync.
 */
void resync_file(proc, file)
	struct procinfo *proc;
	const char *file;
{
	struct spblk *spl;
	long ino;
	const char *s;
	int fd;
	struct ctlfile *oldcfp, *newcfp;

	queryipccheck();

	/* Key is the number in the last path component; it is a long,
	   so parse it with atol() like everywhere else in this file */
	s = strrchr(file,'/');
	if (s) ++s; else s = file;
	ino = atol(s);

	/* Sometimes we may get reports of files
	   already deleted from processing.. */

	/* Already in processing ? */
	spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]);
	if (spl == NULL) {
#if 0
	  if (!in_dirscanqueue(NULL,ino)) {
	    sfprintf(sfstdout,"Resyncing file \"%s\" (ino=%ld)", file, ino);
	    sfprintf(sfstdout," .. not in processing database\n");
	  }
#endif
	  /* Not (anymore) in processing! */
	  return;
	}

	oldcfp = (struct ctlfile *)(spl->data);
	oldcfp->id = 0; /* Don't scramble spt_mesh[] later below */

	sp_delete(spl, spt_mesh[L_CTLFILE]);
	spl = NULL;

	++ oldcfp->resynccount;

	sfprintf(sfstdout, "Resyncing file \"%s\" (ino=%d pid=%d of=%d ho='%s') reqcnt=%d global-work-count=%ld\n",
		 file, (int) ino, proc->pid, proc->overfed,
		 (proc->ho ? proc->ho->name : "<NULL>"),
		 oldcfp->resynccount, global_wrkcnt);
	/* sfprintf(sfstdout, " .. in processing db\n"); */

	if (oldcfp->resynccount > MAXRESYNCS) {
	  /* Sigh.. Throw everything away :-( */
	  oldcfp->id = ino;
	  cfp_free(oldcfp, NULL);
	  sfprintf(sfstdout, " ... too many Resync attempts this way, throwing  it away...\n");
	  return;
	}

	/* cfp_free()->unvertex()->unctlfile() will do reinsertion */
	/* dq_insert(NULL,ino,file,31); */

	/* Now read it back! */
	fd = eopen(file, O_RDWR, 0);
	if (fd < 0) {
	  /* ???? */
	  sfprintf(sfstdout," .. FILE OPEN FAILED!\n");

	  /* Delete it from memory */
	  cfp_free(oldcfp, NULL);
	  return;
	}


	newcfp = schedule(fd, file, ino, 1); /* rereading */

	if (newcfp != NULL) {
	  /* ????  What ever, it succeeds, or it fails, all will be well */

	  sync_cfps(oldcfp, newcfp, proc);
	  newcfp->id = 0; /* Don't scramble spt_mesh[] below */

	  spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]);
	  if (spl)
	    sp_delete(spl, spt_mesh[L_CTLFILE]);
	  spl = NULL; /* the block is gone, never hand it out again */

	  oldcfp->id = ino;
	  sp_install(oldcfp->id, (void *)oldcfp, 0, spt_mesh[L_CTLFILE]);

	  /* Delete it from memory */
	  cfp_free0(newcfp);

	  if (oldcfp->head == NULL) {
	    /* Pass NULL like the other cfp_free() calls here: the only
	       spblk pointer we ever held has been sp_delete()d above */
	    cfp_free(oldcfp, NULL);
	    sfprintf(sfstdout," .. LOST in resync ?!  (wrkcnt %ld)\n", global_wrkcnt);
	  } else
	    sfprintf(sfstdout," .. resynced!  (wrkcnt %ld)\n", global_wrkcnt);

	} else {

	  /* Sigh.. Throw everything away :-( */
	  oldcfp->id = ino;
	  cfp_free(oldcfp, NULL);

	  sfprintf(sfstdout," .. NOT resynced!  (wrkcnt %ld)\n", global_wrkcnt);
	}
}



/*
 * receive_notify() -- read notification datagrams from "fd" and put
 * the announced job files into the pre-schedule queue "dirq".
 *
 * At most 10 receive attempts are made per call.  The only message
 * understood is
 *
 *	"NEW name"   "NEW X/name"   "NEW X/Y/name"
 *
 * where X and Y are single letters 'A'..'Z' and "name" starts with a
 * digit '1'..'9' and contains no '/'.  Anything else is ignored.
 * The number at the start of "name" is used as the key: files already
 * present in spt_mesh[L_CTLFILE] are reported and skipped, the others
 * go to dq_insert().
 *
 * The function returns when recvfrom() yields 0, or fails with
 * anything but EINTR.
 */
void receive_notify(fd)
	int	fd;
{
	char buf[1000], *s;
	int i, ok, cnt, depth;
	u_long ino;

	/* 10 receives at the time AT MOST */
	for (cnt=10;cnt;--cnt) {

	  /* Leave room for the terminating NUL */
	  i = recvfrom(fd, buf, sizeof(buf)-1, 0, NULL, NULL);
	  if (i < 0 && errno == EINTR) continue;
	  /* errno is meaningful only for i < 0; a zero return must not
	     be judged by whatever stale value errno happens to hold */
	  if (i <= 0) return;

	  buf[i] = 0;

	  if (strncmp(buf,"NEW ",4) != 0) continue;

	  /* Accept a plain file name, optionally preceded by one or two
	     single-letter directory levels */
	  s = buf+4;
	  ok = 0;
	  for (depth = 0; depth < 3; ++depth) {
	    if ('1' <= s[0] && s[0] <= '9' && strchr(s, '/') == NULL) {
	      ok = 1;
	      break;
	    }
	    if (depth < 2 && 'A' <= s[0] && s[0] <= 'Z' && s[1] == '/') {
	      s += 2;
	      continue;
	    }
	    break;
	  }

#if 0
	  sfprintf(sfstderr,"SCH-NOTIFY: len=%d '%s' file='%s', ino=%ld ok=%d\n",
		   i, buf, buf+4, atol(s), ok);
	  zsyslog((LOG_INFO, "Got notify: '%s'", buf));
#endif

	  if (!ok) continue;

	  ino = atol(s);

#if 0
	  if (lstat(file,&stbuf) != 0) continue;
	  if (!S_ISFILE(stbuf.st_mode)) continue;

	  if (in_dirscanqueue(dirq,stbuf.st_ino)) continue;
#endif

	  /* We may have this file in processing state...  */
	  {
	    struct spblk *spl;
	    spl = sp_lookup(ino, spt_mesh[L_CTLFILE]);
	    if (spl != NULL) {
	      /* Already in processing, don't touch.. */
	      printf("File: '%s' active (not previously locked)\n", buf+4);
	      continue;
	    }
	  }

	  dq_insert(dirq, ino, buf+4, -1);
	}
} 




/*
 * schedule() -- bring one control file into the scheduler.
 *
 * The file (already opened as "fd") is read with slurp() and analyzed
 * with vtxprep().  Unless we are re-reading ("reread" non-zero), the
 * resulting ctlfile is installed into spt_mesh[L_CTLFILE] under its id
 * and each of its vertices is given to vtxdo() for scheduling.
 *
 * Returns the ctlfile, or NULL when
 *   - vtxprep() gave nothing: the file is unlinked, unless vtxprep()
 *     flagged a skip, in which case the skip is only accounted in
 *     vtxprep_skip_any
 *   - the ctlfile has no vertices: it is given to unctlfile()
 *
 * The in-core copy of the file contents is released before returning.
 */
static struct ctlfile *schedule(fd, file, ino, reread)
	int fd;
	const char *file;
	long ino;
	const int reread;
{
	struct ctlfile *job;
	struct vertex *vtx;

	/* read and process the control file */
	job = vtxprep(slurp(fd, ino), file, reread);

	if (job == NULL) {
	  if (vtxprep_skip) {
	    /* Skipped, leave the file alone */
	    vtxprep_skip_any += vtxprep_skip;
	  } else {
	    eunlink(file,"sch-sch-done");	/* everything here has been processed */
	    if (verbose)
	      sfprintf(sfstdout,"completed, unlink %s\n",file);
	  }
	  return NULL;
	}

	if (job->head == NULL) {
	  /* Nothing to deliver: delete the file.
	     (decrements those counters above!) */
	  unctlfile(job, 0);
	  return NULL;
	}

	if (!reread) {
	  sp_install(job->id, (void *)job, 0, spt_mesh[L_CTLFILE]);

	  /* Put every vertex into the schedules */
	  vtx = job->head;
	  while (vtx != NULL) {
	    vtxdo(vtx, cehead, file);
	    vtx = vtx->next[L_CTLFILE];
	  }
	}

	/* Now we have no more need for the contents in core */
	if (job->contents != NULL) {
	  free(job->contents);
	  job->contents = NULL;
	}

	return job;
}

/*
 * slurp() gets in the job-descriptor file, and does initial
 *         parsing on it.
 *
 *   fd   open descriptor of the control file (negative -> NULL return)
 *   ino  value stored into cfp->id
 *
 * The whole (regular) file is read into one malloc'ed, NUL-terminated
 * buffer.  Each newline is overwritten with a NUL, and the offset of
 * every line start is recorded into cfp->offset[] (cfp->nlines
 * entries).  Inside a _CF_MSGHEADERS / _CF_MIMESTRUCT entry newlines
 * are left intact up to the first empty line.  A _CF_BODYOFFSET line
 * sets cfp->msgbodyoffset.
 *
 * On success global_wrkcnt and MIBMtaEntry->sc.StoredMessagesSc are
 * incremented and the new ctlfile is returned with cfp->fd still open
 * (closing it is the job of vtxprep()).
 *
 * On ANY failure the descriptor is closed, everything allocated here
 * is released, and NULL is returned.
 */

struct ctlfile *
slurp(fd, ino)
	int fd;
	long ino;
{
	register char *s;
	register int i;
	char *contents;
	int *offset, *noffset;
	int offsetspace;
	struct stat stbuf;
	struct ctlfile *cfp, *ncfp;

	if (fd < 0)
	  return NULL;
	if (efstat(fd, &stbuf) < 0) {
	  close(fd);
	  return NULL;
	}
	if (!S_ISREG(stbuf.st_mode)) {
	  close(fd);
	  return NULL;	/* XX: give error */
	}
	contents = emalloc((int)stbuf.st_size + 1);
	if (contents == NULL) {
	  close(fd);
	  return NULL;
	}
	if (eread(fd, contents, stbuf.st_size) != stbuf.st_size) { /* slurp */
	  close(fd);
	  free(contents); /* Failed to read ?!?! */
	  return NULL;
	}
	contents[stbuf.st_size] = 0;

	cfp = (struct ctlfile *)emalloc(sizeof(struct ctlfile));
	if (cfp == NULL) {
	  close(fd);
	  free(contents);
	  return NULL;
	}
	/* Every field not set below starts out as zero */
	memset((void*)cfp, 0, sizeof(struct ctlfile));

	cfp->fd       = fd;
	cfp->dirind   = -1; /* Not known -- or top-level */
	cfp->uid      = stbuf.st_uid;
	cfp->envctime = stbuf.st_ctime;
	cfp->contents = contents;

	/* go through the file and mark it off */
	i = 0;
	offsetspace = 100;
	offset = (int*)emalloc(sizeof(int)*offsetspace);
	if (offset == NULL) {
	  close(fd);
	  free(contents);
	  free(cfp);
	  return NULL;
	}
	offset[i++] = 0L;
	for (s = contents; s - contents < stbuf.st_size; ++s) {
	  if (*s != '\n')
	    continue;

	  *s++ = '\0';
	  if (s - contents >= stbuf.st_size)
	    continue;	/* that was the last line */

	  if (i >= offsetspace-1) {
	    /* Grow through a temporary so that the old block is not
	       lost should the reallocation fail (assuming erealloc()
	       follows realloc() semantics on failure) */
	    noffset = (int*)erealloc(offset,sizeof(int)*(offsetspace+20));
	    if (noffset == NULL) {
	      close(fd);
	      free(offset);
	      free(contents);
	      free(cfp);
	      return NULL;
	    }
	    offset = noffset;
	    offsetspace += 20;
	  }
	  offset[i++] = s - contents;

	  if (*s == _CF_MSGHEADERS ||
	      *s == _CF_MIMESTRUCT) {
	    /* find a \n\n combination
	       (reading s[1] at the very end hits the NUL we stored
	       at contents[st_size]) */
	    while (!(*s == '\n' && *(s+1) == '\n')) {
	      if (s-contents < stbuf.st_size)
		++s;
	      else
		break;	/* XX: header ran off file */
	    }
	  } else if (*s == _CF_BODYOFFSET) {
	    cfp->msgbodyoffset = atoi(s+2);
	  }
	  /* All other entry types are picked up later from the
	     offset[] table, nothing to do for them here. */
	}
	cfp->nlines = i;

	/* closing fd must be done in vtxprep(), so we can unlock stuff easy */

	/* Make room for the offset table at the tail of the ctlfile */
	ncfp = (struct ctlfile *)erealloc((void*)cfp,
					  (u_int) (sizeof(struct ctlfile) +
						   i * (sizeof offset[0])));
	if (ncfp == NULL) {
	  /* realloc() failed.. the old block (assuming realloc()
	     semantics) is still ours to release, and so is the fd */
	  close(fd);
	  free(cfp);
	  free(contents);
	  free(offset);
	  return NULL;
	}
	cfp = ncfp;

	/* copy over the offsets */
	memcpy(&(cfp->offset[0]), offset, sizeof(offset[0]) * i);

	cfp->id = ino;
	/* cfp->mid = NULL; */

	/* INC there in every case! */
	++global_wrkcnt;
	++MIBMtaEntry->sc.StoredMessagesSc;

	free(offset);	/* release the block */
	return cfp;
}

/*
 * One entry per recipient line of a control file; vtxprep() fills an
 * array of these and sorts it with bcfcn() so that recipients with the
 * same channel/host text become neighbours.  All "offset" members are
 * byte offsets into the control file contents.
 */
struct offsort {
	int	offset;		/* start of the recipient text (past the
				   tag bytes and the PID/delay slots);
				   this is the sort key for bcfcn() */
	int	myidx;		/* index of the line in cfp->offset[] */
	int	headeroffset;	/* offset of the message-headers entry
				   that applies, -1 when none seen */
	int	drptoffset;	/* offset of the recipient's NOTARY
				   entry, -1 when none seen */
	int	delayslot;	/* offset of the delay data slot, 0 when
				   the line has no such slot */
	int	notifyflg;	/* NOT_* flag bits; NOT_FAILURE unless a
				   NOTIFY= parameter says otherwise */
	char	*sender;	/* sender address most recently seen
				   before this recipient line */
	/* char	*dsnrecipient; */
	time_t	wakeup;		/* 0, or a time before which the entry
				   is not to be started -- set when the
				   line is locked by a live process */
};

/* ``bcfcn'' is the comparison routine given to qsort() for arrays of
   ``struct offsort''.  As qsort() offers no way to pass context to the
   comparator, ``bcp'' must point to the control file bytes before the
   sort is started.
 */

static char *bcp;

static int bcfcn __((const void *, const void *));

/* Order two offsort entries by the text found at their offsets */
static int bcfcn(a, b)
     const void *a, *b;
{
	const char *lhs = bcp + ((const struct offsort *)a)->offset;
	const char *rhs = bcp + ((const struct offsort *)b)->offset;

	return strcmp(lhs, rhs);
}


/*
 * lockverify() -- check whether the process holding a lock on a
 * control file entry is still around.
 *
 *   cfp      control file, used only for the diagnostic message
 *   cp       points at the lock tag; the PID slot of
 *            _CFTAG_RCPTPIDSIZE bytes follows right after it
 *   verbflg  non-zero: report a live lock on sfstderr
 *
 * Returns 1 when the lock can be regarded as stale:
 *   - old-style entry (no space/digit after the tag)
 *   - the PID slot does not hold a number, or the number is not a
 *     valid process id (<= 0)
 *   - kill() on the PID fails (no such process, or other error)
 * Returns 0 when the locking PID does exist.
 */
static int
lockverify(cfp,cp,verbflg)	/* Return 1 when lock process does not exist */
	struct ctlfile *cfp;	/* Call only when the lock is marked active! */
	const char *cp;
	const int verbflg;
{
	char	lockbuf[1+_CFTAG_RCPTPIDSIZE];
	int	lockpid;
	int	sig = 0;	/* signal 0: existence test only */

#if 0
# ifdef SIGCONT	/* OSF/1 acts differently if we use SIGNAL 0 :-( */
	sig = SIGCONT;
# endif
#endif

	++cp;
	if (!(*cp == ' ' ||
	      (*cp >= '0' && *cp <= '9'))) return 1; /* Old-style entry */
	memcpy(lockbuf,cp,_CFTAG_RCPTPIDSIZE);
	lockbuf[sizeof(lockbuf)-1] = 0;
	if (sscanf(lockbuf,"%d",&lockpid) != 1) return 1; /* Bad value ? */

	/* kill() with pid 0, -1 or any negative value addresses process
	   GROUPS (0 being our own, which always "exists"); such a value
	   can never name the one process that wrote the lock. */
	if (lockpid <= 0) return 1;

	if (kill(lockpid,sig) != 0) return 1; /* PID does not exist, or
					       other error.. */
	if (verbflg)
	  sfprintf(sfstderr,"lockverify: Lock with PID=%d is active on %s:%s\n",
		   lockpid, cfp->mid, cp+_CFTAG_RCPTPIDSIZE);
	return 0;	/* Locking PID does exist.. */
}


/*
 *  The  vtxprep() does deeper analysis on jobs described at the file.
 *  It verifies possible locks (if they are still valid), and gathers
 *  all of the information regarding senders and recipients.
 *
 *  All "recipient"-lines are sorted to ease searching of vertices with
 *  identical channel, and host definitions.  If there are more than one
 *  recipient with given (channel, host)-tuple, all such recipients are
 *  wrapped into same vertex node with its respective ``recipient group''.
 *
 */
static struct ctlfile *vtxprep(cfp, file, rereading)
	struct ctlfile *cfp;
	const char *file;
	const int rereading;
{
	register int i, opcnt;
	register int *lp;
	int svn;
	int c_echannel, c_ehost, c_l_echannel, c_l_ehost;
	char *cp, *channel, *host, *l_channel, *l_host;
	char *echannel, *ehost, *l_echannel, *l_ehost, mfpath[100], flagstr[2];
	char *latest_sender = NULL;
	char *senderchannel = NULL;
	struct vertex *vp, *pvp, **pvpp, *head;
	struct stat stbuf;
	struct offsort *offarr;
	int offspc, mypid;
	int prevrcpt = -1;
	int is_turnme = 0;
	time_t wakeuptime;
	long format = 0;

	char fpath[128], path[128], path2[128];

	if (cfp == NULL)
	  return NULL;

	mytime(&now);

	strcpy(fpath, file);

	channel = host = NULL;

	/* copy offsets into local array */
	offspc = 16;
	offarr = (struct offsort *)emalloc(offspc * sizeof(struct offsort));

	if (!offarr) {
	  /* malloc() failure.. */
	  sfprintf(sfstderr,"malloc failed, discarding job: '%s'\n", file);
	  cfp_free(cfp, NULL);
	  return NULL;
	}


	mypid = getpid();
	opcnt = 0;
	svn = 0;
	vtxprep_skip = 0;
	lp = &cfp->offset[0];
	for (i = 0; i < cfp->nlines; ++i, ++lp) {
	  cp = cfp->contents + *lp + 1;
	  wakeuptime = 0;
	  if (*cp == _CFTAG_LOCK) {
	    /*
	     * This can happen when we restart the scheduler, and
	     * some previous transporter is still running.
	     * (and DEFINITELY during resync! when we simply ignore locks)
	     */
	    if (!lockverify(cfp, cp, !rereading)) {
#if 0
	      long ino = 0;
	      /*
	       * IMO we are better off by forgetting for a while that
	       * this spool-file exists at all.  Thus very least we
	       * won't errorneously delete it.
	       */
	      close(cfp->fd);	/* Was opened on  schedule() */
	      if (cfp->vfpfn != NULL) {
		Sfio_t *vfp = vfp_open(cfp);
		if (vfp) {
		  sfprintf(vfp,
			   "scheduler: Skipped a job-file because it is held locked by PID=%6.6s\n",cp+1);
		  sfclose(vfp);
		}
	      }
	      cfp_free(cfp, NULL);
	      ++vtxprep_skip;
	      ++vtxprep_skip_lock;
	      free(offarr);

	      /* XXX: Should we  dq_insert()  this back in ?
		      Or do we let the occasional recursive
		      scanner to check things deeply ? */
	      cp = strrchr(file,'/');
	      if (cp) ino = atol(cp+1); else ino = atol(file);
	      dq_insert(NULL, ino, file, 32);

	      return NULL;
#else
	      /* We can't simply forget this, we must do something
		 smarter -- We use approach of marking the vertex
		 non-startable until one hour from now. */
	      wakeuptime = now + 3600; /* 1 hour */
#endif
	    } else {
	      if (*cp != _CFTAG_NORMAL) {
		*cp = _CFTAG_NORMAL; /* unlock it */
		lockaddr(cfp->fd, NULL, (long) (cp - cfp->contents),
			 _CFTAG_LOCK, _CFTAG_NORMAL, cfp->mid,
			 cp+_CFTAG_RCPTPIDSIZE, mypid);
	      }
	    }
	  }
	  /* Calculate summary info */
	  if (cp[-1] == _CF_RECIPIENT) {
	    ++cfp->rcpnts_total;
	    if (*cp == _CFTAG_NOTOK) {
#if 0
	      ++cfp->rcpnts_failed;
	      ++MIBMtaEntry->sc.ReceivedRecipientsSc,
	      ++MIBMtaEntry->sc.TransmittedRecipientsSc;
#endif
	      prevrcpt = -1;
	    } else if (*cp != _CFTAG_OK) {
	      ++cfp->rcpnts_work;
	    } else { /* _CFTAG_OK */
#if 0
	      ++MIBMtaEntry->sc.ReceivedRecipientsSc,
	      ++MIBMtaEntry->sc.TransmittedRecipientsSc;
#endif
	    }
	  }
	  if (*cp == _CFTAG_NORMAL ||
	      (rereading && *cp == _CFTAG_LOCK) ||
	      *cp == '\n' /* This appears for msg-header entries.. */ ) {
	    ++cp;
	    switch (cp[-2]) {
	    case _CF_FORMAT:
	      sscanf(cp,"%li",&format);
	      if (format & (~_CF_FORMAT_KNOWN_SET)) {

		sfprintf(sfstderr,"%s: ** FILE: '%s' has unknown/unsupported format set: 0x%08lx !\n",
			 progname, file, format);

		cfp_free(cfp, NULL);
		free(offarr);
		return NULL;
	      }
	      cfp->format = format;
	      break;
	    case _CF_SENDER:
	      while (*cp == ' ') ++cp;
	      senderchannel = cp;
	      while (*cp != 0 && *cp != ' ') ++cp; 
	      if (*cp == ' ') *cp++ = 0;
	      while (*cp == ' ') ++cp;
	      /* Scan over the sender 'host' */
	      cp = skip821address(cp);
	      while (*cp == ' ') ++cp;
	      /* Scan over the sender 'user' */
	      latest_sender = cp;
	      cp = skip821address(cp);
	      *cp = 0;
	      if (cfp->erroraddr)  free(cfp->erroraddr);
	      cfp->erroraddr = strsave(latest_sender);
	      if (strcmp(senderchannel,"error")==0) {
		cfp->iserrmesg = 1;
		cfp->erroraddr[0] = 0; /* Make it '<>' */
	      } break;
	    case _CF_RECIPIENT:
	      if (opcnt >= offspc-1) {
		offspc *= 2;
		offarr = (struct offsort *)erealloc(offarr,
						    sizeof(struct offsort) *
						    offspc);
		if (!offarr) {
		  /* realloc() failure.. */
		  sfprintf(sfstderr,"realloc() failed, discarding job: '%s'\n", file);
		  cfp_free(cfp, NULL);
		  return NULL;
		}
	      }
	      offarr[opcnt].offset = *lp + 2;
	      strlower(cp);
	      if ((format & _CF_FORMAT_TA_PID) || *cp == ' ' ||
		  (*cp >= '0' && *cp <= '9')) {
		/* New PID locking scheme.. */
		offarr[opcnt].offset += _CFTAG_RCPTPIDSIZE;
		cp += _CFTAG_RCPTPIDSIZE;
	      }
	      if ((format & _CF_FORMAT_DELAY1) || *cp == ' ' ||
		  (*cp >= '0' && *cp <= '9')) {
		/* Newer DELAY data slot - _CFTAG_RCPTDELAYSIZE bytes */
		offarr[opcnt].delayslot = offarr[opcnt].offset;
		offarr[opcnt].offset += _CFTAG_RCPTDELAYSIZE;
		cp += _CFTAG_RCPTDELAYSIZE;
	      } else
		offarr[opcnt].delayslot = 0;
	      offarr[opcnt].wakeup = wakeuptime;
	      offarr[opcnt].myidx = i;
	      offarr[opcnt].headeroffset = -1;
	      offarr[opcnt].drptoffset = -1;
	      offarr[opcnt].sender = latest_sender;
	      offarr[opcnt].notifyflg = NOT_FAILURE;
	      prevrcpt = opcnt;
	      ++opcnt;

	      break;
	    case _CF_RCPTNOTARY:
	      /* IETF-NOTARY-DRPT+NOTIFY DATA! */
	      if (prevrcpt >= 0) {
		offarr[prevrcpt].drptoffset = *lp + 2;
		/* Lets parse the input, we want NOTIFY= flags */
		while (*cp != 0) {
		  while (*cp == ' ' || *cp == '\t') ++cp;
		  if (CISTREQN("NOTIFY=",cp,7)) {
		    offarr[prevrcpt].notifyflg = 0;
		    cp += 7;
		    while (*cp) {
		      if (CISTREQN(cp,"NEVER",5)) {
			cp += 5;
			offarr[prevrcpt].notifyflg |= NOT_NEVER;
		      } else if (CISTREQN(cp,"DELAY",5)) {
			cp += 5;
			offarr[prevrcpt].notifyflg |= NOT_DELAY;
		      } else if (CISTREQN(cp,"FAILURE",7)) {
			cp += 7;
			offarr[prevrcpt].notifyflg |= NOT_FAILURE;
		      } else if (CISTREQN(cp,"SUCCESS",7)) {
			cp += 7;
			offarr[prevrcpt].notifyflg |= NOT_SUCCESS;
		      } else if (CISTREQN(cp,"TRACE",  5)) {
			cp += 5;
			offarr[prevrcpt].notifyflg |= NOT_TRACE;
		      } else {
			break; /* Burp ?? */
		      }
		      if (*cp == ',') ++cp;
		    }
		  } else if (CISTREQN("BY=",cp,7)) {
		    /* FIXME: Parse the BY= parameter! */
		    while (*cp && *cp != ' ' && *cp != '\t') ++cp;
		  } else {
		    while (*cp && *cp != ' ' && *cp != '\t') ++cp;
		  }
		}
	      }
	      prevrcpt = -1; /* Add ONCE! */
	      break;
	    case _CF_MSGHEADERS:
	      for (/* we count up.. */; svn < opcnt; ++svn)
		offarr[svn].headeroffset = *lp + 2;
	      break;
	    case _CF_DSNRETMODE:
	      if (cfp->dsnretmode) free(cfp->dsnretmode);
	      cfp->dsnretmode = strsave(cp);
	      break;
	    case _CF_DSNENVID:
	      if (cfp->envid) free(cfp->envid); /* shouldn't happen.. */
	      cfp->envid = strsave(cp);
	      break;
	    case _CF_DIAGNOSTIC:
	      cfp->haderror = 1;
	      break;
	    case _CF_MESSAGEID:
	      if (cfp->mid != NULL) free(cfp->mid); /* shouldn't happen.. */
	      cfp->mid = strsave(cp);
	      break;
	    case _CF_BODYOFFSET:
	      cfp->msgbodyoffset = atoi(cp);
	      break;
	    case _CF_LOGIDENT:
	      if (cfp->logident) free(cfp->logident); /* shouldn't happen..*/
	      cfp->logident = strsave(cp);
	      break;
	    case _CF_ERRORADDR:
	      if (cfp->erroraddr) free(cfp->erroraddr); /* could happen */
	      cfp->erroraddr = strsave(cp);
	      break;
	    case _CF_OBSOLETES:
	      deletemsg(cp, cfp);
	      break;
	    case _CF_VERBOSE:
	      cfp->vfpfn = strsave(cp);
	      break;
	    case _CF_TURNME:
	      sfprintf(sfstdout,"%s ETRN: %s\n", timestring(), cp);
	      strlower(cp);
	      turnme(cp);
	      is_turnme = 1;
	      break;
	    }
	  }
	}
	close(cfp->fd);	/* closes the fd opened in schedule() */

	if (fpath[0] >= 'A') {
	  /* Prefixed with a subdir path */
	  int hash = 0;
	  char *s = fpath;

	  while (*s >= 'A' && *s <= 'Z') {
	    hash <<= 8;
	    hash |= (*s) & 0xff;
	    ++s;
	    if (*s != '/') break;
	    ++s;
	  }
	  cfp->dirind = hash;

	} else { /* Not at subdirs */

	  if (cfp->mid && hashlevels > 0) {
	    /* We have a desire to use subdirs, now the magic of hashing
	       into subdirs...  inode number ? */

	    int hash, rc;

	    if (hashlevels > 1) {
	      hash = atol(cfp->mid) % (26*26);
	      hash = (('A' + hash / 26) << 8) | ('A' + hash % 26);
	    } else {
	      hash = (atol(cfp->mid) % 26) + 'A';
	    }
	    cfp->dirind = hash;
	    /* Ok, we have the hash values, now move the file
	       to match with our hashes.. */
	    sprintf(path, "%s%s", cfpdirname(hash), fpath);
	    
	    while ((rc = rename(fpath, path)) != 0) {
	      if (errno != EINTR) break;
	    }
	    if (rc != 0) {
	      /* Failed, why ? */
	      if (errno != ENOENT) {
		/* For any other than 'no such (target) directory' */
		cfp->dirind = -1;
	      } else {
		cfp_mksubdirs("",cfpdirname(hash));
		while ((rc = rename(fpath, path)) != 0) {
		  if (errno != EINTR) break;
		}
		if (rc != 0)
		  cfp->dirind = -1; /* Failed for any reason */
		else
		  strcpy(fpath, path);
	      }
	    } else
	      strcpy(fpath, path);

	    if (cfp->dirind >= 0) {
	      /* Successfully renamed the transport file to a subdir,
		 now do the same to the queue directory! */
	      sprintf(path,  "../%s/%s", QUEUEDIR, cfp->mid);
	      sprintf(path2, "../%s/%s%s",
		      QUEUEDIR, cfpdirname(cfp->dirind), cfp->mid);

	      while ((rc = rename(path, path2)) != 0) {
		if (errno != EINTR) break;
	      }
	      if (rc != 0) {
		if (errno == ENOENT) {
		  /* No dirs ?? */
		  cfp_mksubdirs(QUEUEDIR,cfpdirname(hash));
		  while ((rc = rename(path, path2)) != 0) {
		    if (errno != EINTR) break;
		  }
		}
	      }
	      /* If failed, it will be reported below */
	    }
	  }
	}

	if (cfp->mid != NULL) {
	  if (cfp->dirind >= 0)
	    sprintf(mfpath, "../%s/%s%s",
		    QUEUEDIR, cfpdirname(cfp->dirind), cfp->mid);
	  else
	    sprintf(mfpath, "../%s/%s", QUEUEDIR, cfp->mid);
	}
	if (cfp->mid == NULL || cfp->logident == NULL ||
	    estat(mfpath, &stbuf) < 0) {
	  if (!is_turnme) {
	    Sfio_t *vfp = vfp_open(cfp);
	    if (vfp) {
	      sfprintf(vfp, "aborted due to missing information\n");
	      sfclose(vfp);
	    }
	  }
	  cfp_free(cfp, NULL);
	  free(offarr);
	  return NULL;
	}
	cfp->mtime = stbuf.st_mtime; /* instead of ctime, we use mtime
					at the queue-dir accesses, this
					way we can move the spool to
					another machine, and run things
					in there, and still have same
					expiration times.. */

	/* Reuse the buffer ...
	   Generate same spoolid string that all other subsystems also
	   report to syslog. */
	taspoolid(path2, cfp->mtime, cfp->id);
	cfp->spoolid = strsave(path2);
	if (!cfp->spoolid) {
	  /* malloc() failure.. */
	  sfprintf(sfstderr,"malloc() failed, discarding job: '%s'\n", file);
	  cfp_free(cfp, NULL);
	  free(offarr);
	  return NULL;
	}

	cfp->fd = -1;
	/* sort it to get channels and channel/hosts together */
	bcp = cfp->contents;
	if (opcnt > 1)
	  qsort((char *)offarr, opcnt, sizeof (struct offsort), bcfcn);
	/*
	 * Loop through them; whenever either channel or host changes,
	 * make a new vertex. All such vertices must be linked together.
	 */
	strcpy(flagstr," ");
	l_channel = l_echannel = l_host = l_ehost = flagstr;
	c_echannel = c_ehost = c_l_echannel = c_l_ehost = 0;
	svn = 0;
	pvp = NULL;
	head = NULL;
	pvpp = &head;
	for (i = 0; i < opcnt; ++i) {
	  channel = bcp + offarr[i].offset /* - 2 */;
	  while (*channel == ' ') ++channel; /* Skip possible space(s) */
	  host = echannel = skip821address(channel);
	  c_l_echannel = *host; if (c_l_echannel) *host++ = '\0';
	  while (*host == ' ') ++host; /* Skip space(s) */
#if 1
	  /* [mea]   channel ((mx.target)(mx.target mx.target)) rest.. */
	  cp = host;
	  if (*cp == '(') {
	    while(*cp == '(')
	      ++cp;
	    while(*cp && *cp != ' ' && *cp != '\t' && *cp != ')')
	      ++cp;
	    if (*cp)
	      ehost = cp;
	    else
	      continue;		/* error! */
	    /* Ok, found ehost, now parsing past parenthesis.. */
	    cp = host;
	    while(*host == '(')  ++host;
	    if (*cp == '(') {
	      ++cp;
	      while(*cp == '(') {
		/* Find ending ')', and skip it */
		while(*cp && *cp != ')') ++cp;
		if (*cp == ')') ++cp;
	      }
	      /* Ok, scanned past all inner parenthesis, now ASSUME
		 next one is ending outer parenthesis, and skip it */
	      ++cp;
	    }
	    if (*cp != ' ' && *cp != '\t')
	      continue;		/* Not proper separator.. error! */
	  } else
#endif
	    {
	      ehost = skip821address(host);
	      strlower(host);
	      if (ehost == NULL || ehost == host) /* error! */
		continue;
	    }

	  c_ehost = *ehost; *ehost = '\0';
	  /* compare with the last ones */
	  if (strcmp(channel, l_channel) || strcmp(host, l_host)) {
	    /* wrap and tie the old vertex node */
	    if (i != svn) {
	      u_int alloc_size = (u_int) (sizeof (struct vertex) +
					  (i - svn - 1) * sizeof (int));
	      vp = (struct vertex *)emalloc(alloc_size);
	      if (!vp) {
		/* malloc() failure.. */
		sfprintf(sfstderr,"malloc() failed, discarding job: '%s'\n", file);
		cfp_free(cfp, NULL);
		free(offarr);
		return NULL;
	      }
	      MIBMtaEntry->sc.StoredVerticesSc += 1;

	      memset((char*)vp, 0, alloc_size);
	      vp->cfp             = cfp;
	      vp->next[L_CTLFILE] = NULL;
	      vp->prev[L_CTLFILE] = pvp;
#if 0
	      vp->thread          = NULL;
	      vp->web[L_CTLFILE]  = NULL;
	      vp->message         = NULL;
	      vp->retryindex      = 0;
	      vp->nextitem        = NULL;
	      vp->previtem        = NULL;
	      vp->proc            = NULL;
	      vp->attempts        = 0;
	      vp->notary          = NULL;
#endif
	      vp->ngroup       = i - svn;
	      MIBMtaEntry->sc.StoredRecipientsSc   += (i - svn);
	      MIBMtaEntry->sc.ReceivedRecipientsSc += (i - svn);

	      /* vp->sender       = strsave(offarr[svn].sender); */
	      vp->wakeup       = offarr[svn].wakeup;
	      vp->headeroffset = offarr[svn].headeroffset; /*They are similar*/
	      vp->drptoffset   = offarr[svn].drptoffset;
	      vp->notaryflg    = offarr[svn].notifyflg;
	      while (svn < i) {
		vp->index[i-svn-1] = offarr[svn].myidx;
		++svn;
	      }
	      *pvpp = vp;
	      pvpp = &vp->next[L_CTLFILE];
	      pvp = vp;
	      link_in(L_HOST,    vp, l_host);
	      link_in(L_CHANNEL, vp, l_channel);
	    }
	    /* create a new vertex node */
	    svn = i;
	  }
	  /* stick the current 'r'ecipient  line into the current vertex */
	  /* restore the characters */
	  *l_echannel  = c_l_echannel;
	  *l_ehost     = c_l_ehost;
	  l_echannel   = echannel;
	  l_ehost      = ehost;
	  c_l_echannel = c_echannel;
	  c_l_ehost    = c_ehost;
	  l_channel    = channel;
	  l_host       = host;
	} /* for( .. i < opcnt .. ) */

	/* wrap and tie the old vertex node (this is a copy of code above) */
	if (i != svn) {
	  u_int alloc_size = (u_int) (sizeof (struct vertex) +
				      (i - svn - 1) * sizeof (int));
	  vp = (struct vertex *)emalloc(alloc_size);
	  if (!vp) {
	    /* malloc() failure.. */
	    sfprintf(sfstderr,"malloc() failed, discarding job: '%s'\n", file);
	    cfp_free(cfp, NULL);
	    free(offarr);
	    return NULL;
	  }
	  MIBMtaEntry->sc.StoredVerticesSc += 1;

	  memset((void*)vp, 0, alloc_size);
	  vp->cfp = cfp;
	  vp->next[L_CTLFILE] = NULL;
	  vp->prev[L_CTLFILE] = pvp;
#if 0
	  vp->message = NULL;
	  vp->retryindex = 0;
	  vp->nextitem = NULL;
	  vp->previtem = NULL;
	  vp->proc = NULL;
	  vp->attempts     = 0;
	  vp->notary       = NULL;
#endif
	  vp->ngroup = i - svn;
	  MIBMtaEntry->sc.StoredRecipientsSc   += (i - svn);
	  MIBMtaEntry->sc.ReceivedRecipientsSc += (i - svn);

	  /* vp->sender = strsave(offarr[snv].sender); */
	  vp->wakeup       = offarr[svn].wakeup;
	  vp->headeroffset = offarr[svn].headeroffset; /* Just any of them will do */
	  vp->drptoffset = offarr[svn].drptoffset;
	  vp->notaryflg  = offarr[svn].notifyflg;
	  while (svn < i) {
	    vp->index[i-svn-1] = offarr[svn].myidx;
	    ++svn;
	  }
	  *pvpp = vp;
	  pvpp = &vp->next[L_CTLFILE];
	  pvp = vp;
	  link_in(L_HOST, vp, host);
	  link_in(L_CHANNEL, vp, channel);
	}

	*l_echannel = c_l_echannel;
	*l_ehost    = c_l_ehost;

	/*
	   for (vp = head; vp != NULL; vp = vp->next[L_CTLFILE]) {
	     sfprintf(sfstdout,"--\n");
	     for (i = 0; i < vp->ngroup; ++i)
	       sfprintf(sfstdout,"\t%s\n", cfp->contents+cfp->offset[vp->index[i]]);
	   }
	*/

	cfp->head = head;
	free(offarr);
	if (verbose) {
	  int completed = cfp->rcpnts_total - cfp->rcpnts_work -
			  cfp->rcpnts_failed;
	  sfprintf(sfstdout,"vtxprep: msg %s rcptns total %d work %d failed %d done %d\n  vertices:",
		 cfp->mid, cfp->rcpnts_total, cfp->rcpnts_work,
		 cfp->rcpnts_failed, completed );
	  vp = cfp->head;
	  for (vp = cfp->head; vp; vp = vp->next[L_CTLFILE]) {
	    sfprintf(sfstdout," %p", vp);
	  }
	  sfprintf(sfstdout,"\n");
	}

	{
	  Sfio_t *vfp = vfp_open(cfp);
	  if (vfp) {
	    if (cfp->mid) sfprintf(vfp, "scheduler processing %s\n", cfp->mid);
	    sfclose(vfp);
	  }
	}

	return cfp;
}

/*
 *  vtxmatch() is a helper of vtxdo(): it decides whether one scheduler
 *  configuration entry (tp) applies to a vertex (vp).
 *
 *  - An entry whose channel pattern is exactly "*" is the defaults
 *    entry and never matches.
 *  - Otherwise the channel pattern must glob-match the name of the
 *    vertex's channel web.
 *  - If the entry has a host pattern that is neither empty nor exactly
 *    "*", it must also glob-match the name of the vertex's host web.
 *
 *  Returns 1 when the entry applies, 0 when it does not.
 */

static int vtxmatch(vp, tp)
	struct vertex *vp;
	struct config_entry *tp;
{
	if (verbose>1)
	  sfprintf(sfstdout,"ch? %s %s\n", vp->orig[L_CHANNEL]->name, tp->channel);

	/* Never match the defaults entry! */
	if (tp->channel[0] == '*' && tp->channel[1] == '\0')
	  return 0;

	/* if the channel doesn't match, there's no hope! */
	if (!globmatch(tp->channel, vp->orig[L_CHANNEL]->name))
	  return 0;

	/* A missing, empty, or lone-"*" host pattern accepts any host */
	if (tp->host != NULL && tp->host[0] != '\0' &&
	    !(tp->host[0] == '*' && tp->host[1] == '\0')) {
	  if (!globmatch(tp->host, vp->orig[L_HOST]->name))
	    return 0;
	}

	/* tp->host is allowed to be NULL (see the test above), so never
	   hand it to "%s" unguarded. */
	if (verbose>1)
	  sfprintf(sfstdout,"host %s %s\n", vp->orig[L_HOST]->name,
		   tp->host != NULL ? tp->host : "(null)");

	return 1;
}

static void ce_fillin __((struct threadgroup *, struct config_entry *));

/*
 * Fill in the configuration entry embedded in a thread group (thg->ce)
 * from the matched scheduler configuration entry (cep).
 *
 * defaultconfigentry() is called first (presumably it seeds the entry
 * from the defaults -- verify against its definition); after that each
 * setting that the matched entry actually specifies replaces the seeded
 * value, and finally anything still unset gets a built-in fallback.
 */
static void ce_fillin(thg,cep)
	struct threadgroup *thg;
	struct config_entry *cep;
{
	struct config_entry *dst = &(thg->ce);

	defaultconfigentry( dst,cep );

/* Take a numeric setting from the matched entry unless it is -1 there */
#define CE_TAKE(field) \
	do { if (cep->field != -1) dst->field = cep->field; } while (0)

	CE_TAKE(interval);
	CE_TAKE(idlemax);
	CE_TAKE(expiry);
	CE_TAKE(expiry2);
	dst->expiryform = cep->expiryform;	/* always copied */
	CE_TAKE(uid);
	CE_TAKE(gid);
	CE_TAKE(maxkids);
	CE_TAKE(maxkidChannel);
	CE_TAKE(maxkidThreads);

#undef CE_TAKE

	/* Retry table and its length travel together */
	if (cep->nretries > 0) {
	  dst->retries  = cep->retries;
	  dst->nretries = cep->nretries;
	}

	/* Likewise the command and its argument vector */
	if (cep->command != NULL) {
	  dst->argv    = cep->argv;
	  dst->command = cep->command;
	}

	dst->flags |= cep->flags; /* XX: Grumble.. additive only.. */
	dst->host   = cep->host;

	if (cep->skew > 0)
	  dst->skew = cep->skew;

	/* Built-in fallbacks for whatever is still unset (-1) */
	if (dst->interval == -1)
	  dst->interval = 3600;
	if (dst->idlemax == -1)
	  dst->idlemax = 3 * dst->interval;
	if (dst->maxkids == -1)
	  dst->maxkids = global_maxkids;
	if (dst->maxkidChannel == -1)
	  dst->maxkidChannel = global_maxkids;
	if (dst->maxkidThreads == -1)
	  dst->maxkidThreads = global_maxkids;
}


/*
 *  vtxdo() walks the scheduler configuration entries starting at
 *  `cehdr' looking for the first one that vtxmatch() accepts for the
 *  vertex `vp', and fills in the vertex from it:
 *
 *   - ce_expiry    = entry expiry + control file mtime, or 0 when the
 *                    entry has no positive expiry;
 *   - ce_expiry2   = entry expiry2 + ce_expiry when both are positive,
 *                    else 0; bumped to ce_expiry + 24h if it would end
 *                    up below ce_expiry;
 *   - nextrprttime = now + the entry's first report time, or
 *                    now + global_report_interval if that is zero.
 *
 *  Finally thread_linkin() is called to place the vertex into the
 *  scheduling queues (threads).
 *
 *  `path' is used only for the diagnostic printed when no entry
 *  matches; it may be NULL (vtxredo() passes NULL).
 */

static void vtxdo(vp, cehdr, path)
	struct vertex *vp;
	struct config_entry *cehdr;
	const char *path;
{
	struct config_entry *tp;
	int cnt = 0;	/* 1-based ordinal of the entry being tried */

	/*
	 * go through scheduler control file entries and
	 * fill in the blanks in the vertex specification
	 */
	for (tp = cehdr; tp != NULL; tp = tp->next) {
	  ++cnt;
	  if (vtxmatch(vp, tp))
	    break;	/* tp points to the selected config entry */
	}
	if (tp == NULL) {
	  /* Ran off the end of the list: nothing matched.
	     `path' may be NULL, so it must not go to "%s" as such. */
	  sfprintf(sfstderr, "%s: no pattern matched %s/%s address %s%s\n",
		   progname, vp->orig[L_CHANNEL]->name,vp->orig[L_HOST]->name,
		   path ? "file=":"", path ? path : "");
	  /* XX: memory leak here? */
	  return;
	}
	if (verbose)
	  sfprintf(sfstdout, "Matched %dth config entry with: %s/%s\n", cnt,
		   vp->orig[L_CHANNEL]->name, vp->orig[L_HOST]->name);

	/* set default values */
	if (tp->expiry > 0)
	  vp->ce_expiry = tp->expiry + vp->cfp->mtime;
	else
	  vp->ce_expiry = 0;

	if (tp->expiry2 > 0 && tp->expiry > 0)
	  vp->ce_expiry2 = tp->expiry2 + vp->ce_expiry;
	else
	  vp->ce_expiry2 = /* vp->ce_expiry */ 0;

	/* The second expiry must never precede the first one */
	if (vp->ce_expiry2 < vp->ce_expiry)
	  vp->ce_expiry2 = vp->ce_expiry + 24*3600;

	if (tp->reporttimes[0])
	  vp->nextrprttime = now + tp->reporttimes[0];
	else
	  vp->nextrprttime = now + global_report_interval;

	thread_linkin(vp,tp,cnt,ce_fillin);

	if (verbose>1)
	  vtxprint(vp);
}


/*
 * Re-run vtxdo() against the `rrcf_head' configuration list for every
 * vertex of the control file stored in the given tree node.
 * Always returns 0.  (The signature suggests use as a tree-walk
 * callback -- confirm against the caller.)
 */
int vtxredo(spl)
        struct spblk *spl;
{
        /* assert cfp != NULL */
	struct ctlfile *cfp = (struct ctlfile *)spl->data;
	struct vertex *vtx = cfp->head;

	while (vtx != NULL) {
	  vtxdo(vtx, rrcf_head, NULL);
	  vtx = vtx->next[L_CTLFILE];
	}

	return 0;
}


/*
 * Shell-GLOB-style matching.
 *
 * Returns 1 when `string' matches `pattern' in its entirety, else 0.
 *
 * Pattern syntax:
 *   *         any run of characters, the empty run included
 *   ?         exactly one character
 *   [abc]     one character that is a member of the set (no ranges)
 *   [^abc]    one character that is NOT a member of the set
 *   {a,b,c}   one of the comma separated alternatives; everything after
 *             the '{' is treated as the alternative list, so this form
 *             works only at the END of a pattern:  '*.{fii,foo,faa}'
 *   \c        the character c, taken literally
 *   other     matches itself
 */
static int globmatch(pattern, string)
	register const char	*pattern;
	register const char	*string;
{
	while (1) {
	  switch (*pattern) {
	  case '{':
	    {
	      const char *p = pattern+1;
	      const char *s = string;

	      /* This matches at the END of the pattern:  '*.{fii,foo,faa}' */

	      for ( ; *p != 0 && *p != '}'; ++p) {
		if (*p == ',') {
		  /* End of an alternative that matched this far */
		  if (*s == '\0')
		    return 1; /* We have MATCH! */
		  /* String has more to it; retry with next alternative */
		  s = string;
		  continue;
		}
		if (*s != *p) {
		  /* Not the same .. */
		  s = string;
		  /* Ok, perhaps next pattern segment ? */
		  while (*p != '\0' && *p != '}' && *p != ',')
		    ++p;
		  if (*p != ',')
		    return 0; /* No next pattern ?
				 We definitely have no match! */
		  continue;
		}
		if (*s != 0)
		  ++s;
	      }
	      /* Ran out of alternatives; the last one matched only
		 if the string was consumed in full. */
	      return (*s == 0);
	    }
	    /* NOTREACHED */
	  case '*':
	    ++pattern;
	    if (*pattern == 0) {
	      /* pattern ended with '*', we can accept any string trail.. */
	      return 1;
	    }
	    /* 'Common case' optimization: when the rest of the pattern
	       holds no special characters it is a plain literal, and the
	       match succeeds exactly when the string ends with it. */
	    {
	      const char *p = pattern;
	      int special = 0, c;
	      while ((c = *p++) != 0) {
		/* Scan for special chars in pattern.. */
		if (c == '*' || c == '[' || c == '{' || c == '\\' || c == '?'){
		  special = 1; /* Found! */
		  break;
		}
	      }
	      if (!special) { /* No specials, match from end of string */
		int len  = strlen(string);
		int plen = strlen(pattern);
		if (plen > len) return 0; /* Tough.. pattern longer than string */
		return (memcmp(string + len - plen, pattern, plen) == 0);
	      }
	    }
	    /* General case: try the rest of the pattern at every
	       position of the string, the terminating NUL included. */
	    do {
	      if (globmatch(pattern, string))
		return 1;
	    } while (*string++ != '\0');
	    return 0;
	  case '\\':
	    /* Escaped character: it must match literally, and then BOTH
	       pointers move past it -- otherwise the escaped character
	       would be looked at again as a possible special. */
	    ++pattern;
	    if (*pattern == 0 ||
		*pattern != *string)
	      return 0;
	    ++pattern;
	    ++string;
	    break;
	  case '[':
	    if (*string == '\0')
	      return 0;
	    if (*(pattern+1) == '^') {
	      /* Negated set: fail if the character IS in the set */
	      ++pattern;
	      while ((*++pattern != ']')
		     && (*pattern != *string))
		if (*pattern == '\0')
		  return 0;	/* unterminated set */
	      if (*pattern != ']')
		return 0;
	      ++pattern;	/* step over the closing ']' */
	      string++;
	      break;
	    }
	    /* Plain set: look for the character among the members */
	    while ((*++pattern != ']') && (*pattern != *string))
	      if (*pattern == '\0')
		return 0;	/* unterminated set */
	    if (*pattern == ']')
	      return 0;		/* not a member */
	    /* Skip the rest of the set, closing ']' included */
	    while (*pattern++ != ']')
	      if (*pattern == '\0')
		return 0;
	    string++;
	    break;
	  case '?':
	    ++pattern;
	    if (*string++ == '\0')
	      return 0;
	    break;
	  case '\0':
	    return (*string == '\0');
	  default:
	    if (*pattern++ != *string++)
	      return 0;
	  }
	}
}

/*
 * Append the address group described by vp to the chain of vertices
 * hanging off a web entry in the Tholian Web (err, our matrix).
 *
 * `flag' (L_HOST or L_CHANNEL) picks the namespace, `s' is the name
 * looked up -- and created when missing -- with web_findcreate().
 * Host and channel names merely need to live in separate spaces;
 * what the names are does not matter here.
 */
static void link_in(flag, vp, s)
	int flag;
	struct vertex *vp;
	const char *s;
{
	struct web *wp = web_findcreate(flag,s);
	struct vertex *tail;

	wp->linkcnt += 1;

	/* The new vertex becomes the last element of the chain */
	tail = wp->lastlink;
	vp->orig[flag] = wp;
	vp->prev[flag] = tail;
	vp->next[flag] = NULL;

	if (tail != NULL)
	  tail->next[flag] = vp;
	wp->lastlink = vp;

	/* First vertex on this web entry?  Then it is also the head. */
	if (wp->link == NULL)
	  wp->link = vp;
}

/*
 * Time-string to be prepended to logged messages.
 *
 * Refreshes the global `now' through mytime() and formats it in local
 * time as YYYYMMDDhhmmss.  The result lives in a static buffer which
 * the next call overwrites.
 */
char *
timestring()
{
	static char timebuf[40];
	struct tm *lt;
	int year, month;

	mytime(&now);
	lt = localtime(&now);

	year  = lt->tm_year + 1900;	/* tm_year counts from 1900 */
	month = lt->tm_mon + 1;		/* tm_mon is zero based     */

	sprintf(timebuf,"%d%02d%02d%02d%02d%02d",
		year, month, lt->tm_mday,
		lt->tm_hour, lt->tm_min, lt->tm_sec);

	return timebuf;
}

/* Non-zero selects the alarm()-driven clock in mytime(); nothing in
   this part of the file assigns it. */
static int running_on_alarm;
/* Wall-clock time recorded by the most recent sig_alarm() run */
static time_t alarm_time;

/*
 * SIGALRM handler: store the current time into alarm_time, install
 * itself again as the SIGALRM handler, and request the next alarm
 * one second from now.
 */
static RETSIGTYPE sig_alarm(sig)
int sig;
{
	time(&alarm_time);
	SIGNAL_HANDLE(SIGALRM, sig_alarm);
	alarm(1);
}

/* Pid of the forked time-server child; 0 when there is none */
static int    timeserver_pid = 0;

/*
 * Placeholder for a sanity check of the active time source.
 * mytime() calls this once every 10000 invocations.  Both branches
 * are still empty, so at present it does nothing.
 */
static void timer_health_check __((void))
{
  /* XXX: TODO! TIMER HEALTH CHECK */
  if (timeserver_pid > 0) {
    /* time-server child in use: nothing is checked yet */
  }
  if (running_on_alarm) {
    /* alarm() driven clock in use: nothing is checked yet */
  }
}


#if defined(HAVE_MMAP)

/* Shared memory through which the time-server child publishes the
   current time; NULL when no such segment is in use. */
struct timeserver *timeserver_segment = NULL;

/*
 * Start the time-server: a forked child that keeps writing the current
 * time into a memory segment shared with this process, so that mytime()
 * can read the time from memory.
 *
 * The segment is the `ts' member of the shared MIB block when that block
 * is attached; otherwise a MAPSIZE byte shared mapping is created (backed
 * by a temporary file, or anonymous, depending on the platform).
 *
 * The parent waits 3 seconds and verifies that the child has stored a
 * time different from the one sampled before the wait; if not, the child
 * is killed, the time-server variables are cleared, and alarm(1) is
 * called instead.
 *
 * The child never returns from here; it leaves via _exit(0) once its
 * parent can no longer be signalled.
 */
static void init_timeserver()
{
	int ppid;

	if ( Z_SHM_MIB_is_attached() ) {

	  timeserver_segment = & MIBMtaEntry->ts;

	} else { /* SHARED MIB MEMORY BLOCK IS NOT ATTACHED */

#define MAPSIZE 16*1024 /* Just in case other methods didn't work.. e.g.
			   that MIB-block.. */

#if !defined(MAP_ANONYMOUS) || defined(__linux__)
	  /* must have a file ? (SunOS 4.1, et.al.) */
	  int fd = -1;
	  char blk[1024];
	  int i;
	  Sfio_t *fp = sftmp(0); /* Create the backing file fairly reliably. */

	  if (fp)
	    fd = sffileno(fp);
	  if (fd < 0) {
	    perror("init_timeserver() didn't create tempfile ??");
	    return; /* Brr! */ 
	  }
	  
	  /* Write MAPSIZE bytes of zeroes into the file before it is
	     mapped. */
	  memset(blk,0,sizeof(blk));
	  for (i=0; i < MAPSIZE; i += sizeof(blk))
	    sfwrite(fp, blk, sizeof(blk));
	  sfsync(fp);
	  sfseek(fp, 0, 0);

#ifndef MAP_FILE
# define MAP_FILE 0 /* SunOS 4.1 does not have this */
#endif
	  timeserver_segment = (void*)mmap(NULL, MAPSIZE, PROT_READ|PROT_WRITE,
					   MAP_FILE|MAP_SHARED, fd, 0);
	  /* The stream is closed right after the mmap() call, before
	     the result of the mapping is examined below. */
	  sfclose(fp);
	
#else /* MAP_ANONYMOUS and not Linux */

#ifndef MAP_VARIABLE
# define MAP_VARIABLE 0 /* Some system have MAP_ANONYMOUS, but
			   no MAP_VARIABLE.. */
#endif
	  /* This MAP_ANONYMOUS does work at DEC OSF/1 ... */
	  timeserver_segment = (void*)mmap(NULL, MAPSIZE, PROT_READ|PROT_WRITE,
					   MAP_VARIABLE|MAP_ANONYMOUS|MAP_SHARED,
					   -1, 0);
#endif
	  /* mmap() reports failure as (void*)-1; NULL is rejected too */
	  if (-1L == (long)timeserver_segment
	      || timeserver_segment == NULL) {
	    perror("mmap() of timeserver segment gave");
	    timeserver_segment = NULL;
	    return; /* Brr.. */
	  }

	} /* Have global shared server segment, or not ... */

	ppid = fork();
	if (ppid > 0) {
	  /* ========  PARENT: ppid holds the child's pid here ======= */
	  time_t now2;
	  timeserver_pid = ppid;
	  timeserver_segment->pid = ppid;

	  /* Sample the time, give the child 3 seconds, then see whether
	     the time in the segment has moved away from the sample. */
	  time(&now);
	  sleep(3);

#ifdef HAVE_SELECT
	  now2 = timeserver_segment->tv.tv_sec;
#else
	  now2 = timeserver_segment->time_sec;
#endif
	  if (now2 == 0 || now2 == now) {
	    /* HUH ?! No incrementation in 3 seconds ?? */
	    /* Start ticking in alarm() mode! */
	    /* NOTE(review): running_on_alarm is not set here and no
	       SIGALRM handler is installed in this function --
	       confirm that the caller takes care of both. */
	    kill (ppid, SIGTERM);
	    timeserver_pid = 0;
	    timeserver_segment = NULL;
	    alarm(1);
	  }

	  return;
	}

	if (ppid < 0) return; /* Error ?? brr.. */


	/* ========  TIME SERVER CHILD ======= */


	{

	  /* Ticks left until the next spool free-space refresh */
	  int space_check_count = 0;

	  /* NOTE(review): MIBMtaEntry is written here also when the
	     shared MIB block was found not attached above -- confirm
	     that it points at usable memory in that case. */
	  MIBMtaEntry->sc.schedulerTimeserverStartTime = time(NULL);
	  MIBMtaEntry->sc.schedulerTimeserverStarts ++;


#ifdef HAVE_SETPROCTITLE
	  setproctitle("[TimeServer]");
#else
	  strncpy(ArgvSave,"[TimeServer]", EOArgvSave - ArgvSave);
#endif

	  /* Make sure the saved argv area ends with a NUL byte */
	  if (ArgvSave < EOArgvSave)
	    ArgvSave[EOArgvSave-ArgvSave-1] = 0;

	  ppid = getppid(); /* who is our parent ? */

#ifdef HAVE_SELECT
	  gettimeofday(&timeserver_segment->tv, NULL);
#else
	  time(&timeserver_segment->time_sec);
#endif

	  /* Tick loop: publish the time, nap, check on the parent */
	  for(;;) {
#ifdef HAVE_SELECT
	    struct timeval tv;
	    int rc;	/* receives the select() result; not read below */

	    tv.tv_sec = 0;
	    tv.tv_usec = 300000;

	    gettimeofday(&timeserver_segment->tv, NULL);

	    /* select() with no descriptors: a 0.3 second nap */
	    rc = select(0,NULL,NULL,NULL,&tv);
#else
	    time(&timeserver_segment->time_sec);
	    sleep(1);
#endif
	    /* Is the parent still alive ?? */
	    if (kill(ppid, 0) != 0)
	      break; /* No ?? Out! */

	    /* Now check and fill in the filesystem free space
	       gauges */
	    if (--space_check_count  < 0) {
	      long spc = Z_SHM_FileSysFreeSpace();

	      MIBMtaEntry->sys.SpoolFreeSpace = spc;
	      /* Log FS space is monitored by the self-timed
		 log reinit code. */

#ifdef HAVE_SELECT
	      space_check_count = 100; /* ticks about 3 times a second */
#else
	      space_check_count =  30; /* ticks once a second.. */
#endif
	    }

	  }

	} /* time-server child block */

	_exit(0);
}
#else
/* Without mmap() there is no time-server; mytime() uses its other
   time sources. */
static void init_timeserver()
{
  /* Do nothing.. */
}

#endif

/*
 * Scheduler's replacement for time(2): returns the current time in
 * seconds and, when `timep' is not NULL, stores it there as well.
 *
 * Sources, in order of preference:
 *   1. the time-server segment, when a time-server child is running;
 *   2. the value recorded by the SIGALRM handler, when running_on_alarm;
 *   3. gettimeofday(), or time() where gettimeofday() is unavailable.
 *
 * Once every 10000 calls timer_health_check() is run first.
 */
time_t mytime(timep)
time_t *timep;
{
	static int healthcheck = 10000;

	healthcheck -= 1;
	if (healthcheck <= 0) {
	  healthcheck = 10000;
	  timer_health_check();
	}

#if defined(HAVE_MMAP)
	if (timeserver_pid != 0) {
	  /* Time-server child is ticking: read the shared segment */
	  time_t shared;
#ifdef HAVE_SELECT
	  shared = timeserver_segment->tv.tv_sec;
#else
	  shared = timeserver_segment->time_sec;
#endif
	  if (timep != NULL)
	    *timep = shared;
	  return shared;
	}
#endif

	if (running_on_alarm != 0) {
	  /* Clock kept by the SIGALRM handler */
	  if (timep != NULL)
	    *timep = alarm_time;
	  return alarm_time;
	}

#ifdef HAVE_GETTIMEOFDAY
	{
	  struct timeval tnow;

	  gettimeofday(&tnow,NULL);
	  if (timep != NULL)
	    *timep = tnow.tv_sec;
	  return tnow.tv_sec;
	}
#else
	return time(timep); /* The classical old version.. */
#endif
}

/*
 * Turn a directory hash value into a relative directory prefix.
 *
 *   hash < 0     ->  ""       (no subdirectory)
 *   hash < 256   ->  "X/"     X being the character with code `hash'
 *   otherwise    ->  "X/Y/"   X from bits 8..15, Y from bits 0..7
 *
 * For non-negative hashes the result is built in a static buffer that
 * the next call overwrites.
 */
const char *
cfpdirname(hash)
int hash;
{
	static char dirbuf[8];
	int upper, lower;

	if (hash < 0)
	  return "";

	if (hash >= 256) {
	  /* Two levels: high byte first, then low byte */
	  upper = (hash >> 8) & 0xff;
	  lower = hash & 0xff;
	  sprintf(dirbuf, "%c/%c/", upper, lower);
	  return dirbuf;
	}

	/* Single level */
	sprintf(dirbuf, "%c/", hash);
	return dirbuf;
}


/*
 * We make one, or at most two subdirs.
 *
 * The directory created is "../<topdir>/<newpath>", or just <newpath>
 * when `topdir' is the empty string; one trailing '/' is ignored.
 * If the first mkdir() fails, the parent directory is tried and then
 * the directory itself once more.  Directories are made with mode
 * 02755 under umask 022; the caller's umask is restored afterwards.
 *
 * Nothing is reported to the caller.  A name too long for the local
 * path buffer is silently ignored (nothing is created).
 */

void
cfp_mksubdirs(topdir,newpath)
const char *topdir, *newpath;
{
	char path[256];
	int omask;
	char *s;

	/* Compose the name, refusing anything that would overrun path[] */
	if (*topdir != 0) {
	  /* "../" + topdir + "/" + newpath + NUL */
	  if (strlen(topdir) + strlen(newpath) + 5 > sizeof(path))
	    return;
	  sprintf(path, "../%s/%s", topdir, newpath);
	} else {
	  if (strlen(newpath) + 1 > sizeof(path))
	    return;
	  strcpy(path, newpath);
	}

	/* Trailing slash elimination; the name may hold no '/' at all */
	s = strrchr(path,'/');
	if (s != NULL && s[1] == 0)
	  *s = 0;

	omask = umask(022);

	if (mkdir(path,02755) != 0) {
	  /* Possibly the parent is missing: cut the name at its last
	     '/', make the parent, put the '/' back, and try again. */
	  s = strrchr(path,'/');
	  if (s != NULL) {
	    *s = 0;
	    mkdir(path,02755);
	    *s = '/';
	  }
	  mkdir(path,02755);
	}

	umask(omask);
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */