/*
 *	ZMailer 2.99.16+ Scheduler "threads" routines
 *
 *	Copyright Matti Aarnio <mea@nic.funet.fi> 1995-2003
 *
 *
 *	These "threads" are for book-keeping of information
 *	regarding schedulable recipient vertices
 */

#include <stdio.h>
#include <sfio.h>
#include <ctype.h>
#include <unistd.h>
#include "scheduler.h"
#include "prototypes.h"
#include "zsyslog.h"
/* #include <stdlib.h> */

/*
   Each vertex arrives into SOME thread, each of which belong to
   SOME thread-group, among which groups the transport channel
   programs started for any member thread can be shared among
   their members.
*/


#define MAX_HUNGER_AGE 600 /* Sign of an error ... */

extern char *proc_state_names[];

struct thread      *thread_head = NULL;
struct thread      *thread_tail = NULL;

static struct threadgroup *thrg_root   = NULL;
int idleprocs = 0;
extern int global_wrkcnt;
extern int syncstart;
extern int freeze;
extern int slow_shutdown;
extern time_t now;
extern char *procselect, *procselhost;
extern time_t sched_starttime; /* From main() */
extern int mailqmode;	/* 1 or 2 */

static long groupid  = 0;
static long threadid = 0;

static void  thread_vertex_shuffle __((struct thread *thr));
static struct threadgroup *create_threadgroup __((struct config_entry *cep, struct web *wc, struct web *wh, int withhost, void (*ce_fillin)__((struct threadgroup *, struct config_entry *)) ));
static int   thread_start_ __((struct thread *thr));


static struct threadgroup *
create_threadgroup(cep, wc, wh, withhost, ce_fillin)
struct config_entry *cep;
struct web *wc, *wh;
int withhost;
void (*ce_fillin) __((struct threadgroup*, struct config_entry *));
{
	struct threadgroup *thgp;

	/* Create a thread-group and link it into group-ring */

	thgp = (struct threadgroup*)malloc(sizeof(*thgp));
	if (!thgp) return NULL;
	memset(thgp,0,sizeof(*thgp));

	++groupid;
	thgp->groupid  = groupid;

	thgp->cep      = cep;
	thgp->withhost = withhost;
	thgp->wchan    = wc;
	thgp->whost    = wh;

	ce_fillin(thgp,cep);

	wc->linkcnt += 1;
	wh->linkcnt += 1;

	if (thrg_root == NULL) {
	  thrg_root     = thgp;
	  thgp->nextthg = thgp;
	  thgp->prevthg = thgp;
	} else {
	  thgp->nextthg       = thrg_root->nextthg;
	  thgp->prevthg       = thrg_root->nextthg->prevthg;
	  thgp->prevthg->nextthg = thgp;
	  thgp->nextthg->prevthg = thgp;
	}

	return thgp;
}

void
delete_threadgroup(thgp)
struct threadgroup *thgp;
{
	struct threadgroup *tgp;

	/* Time to say good-bye to this group, delete it.
	   We shall not have any threads under us, nor
	   idle processes!				  */

	/* However we may be called with either of these values
	   still non-zero... */
	if (thgp->transporters || thgp->threads) return;

if (verbose) sfprintf(sfstderr,"delete_threadgroup(%s/%d/%s)\n",
		      thgp->wchan->name,thgp->withhost,thgp->whost->name);

	if (thgp->idleproc != NULL || thgp->thread != NULL)
	  abort(); /* Deleting non-empty thread-group! */
	if (thrg_root == NULL) abort(); /* No thread-group root! */


	/* We are possibly the last to keep these web links */

	thgp->wchan->linkcnt -= 1;
	unweb(L_CHANNEL,thgp->wchan);

	thgp->whost->linkcnt -= 1;
	unweb(L_HOST,thgp->whost);

	/* Unlink this thread-group from the ring */

	tgp                    = thgp->nextthg;
	thgp->prevthg->nextthg = thgp->nextthg;
	tgp->prevthg           = thgp->prevthg;

	/* are we at the ring root pointer ? */
	if (thrg_root == thgp)
	  thrg_root = tgp;
	if (tgp == thgp) {
	  /* We were the only one! */
	  thrg_root = NULL;
	}

memset(thgp, 0x55, sizeof(*thgp));

	free(thgp);
}

static void _thread_timechain_unlink __((struct thread *));
static void _thread_timechain_unlink(thr)
struct thread *thr;
{
	struct threadgroup *thg = thr->thgrp;

	/* Doubly linked linear list */

	if (thr->prevtr != NULL)
	  thr->prevtr->nexttr = thr->nexttr;
	if (thr->nexttr != NULL)
	  thr->nexttr->prevtr = thr->prevtr;

	if (thread_head == thr)
	  thread_head = thr->nexttr;
	if (thread_tail == thr)
	  thread_tail = thr->prevtr;

	thr->nexttr = NULL;
	thr->prevtr = NULL;

	/* Doubly linked circullar list */

	thr->prevthg->nextthg = thr->nextthg;
	thr->nextthg->prevthg = thr->prevthg;

	thg->threads                      -= 1;
	MIBMtaEntry->sc.StoredThreadsSc -= 1;

	if (thg->thread == thr)
	  thg->thread = thr->nextthg;	/* pick other */
	if (thg->thread == thr)
	  thg->thread = NULL;		/* was the only one! */

	if (thg->thrtail == thr)
	  thg->thrtail = thr->prevthg;	/* pick other */
	if (thg->thrtail == thr)
	  thg->thrtail = NULL;		/* was the only one! */

	thr->prevthg = NULL;
	thr->nextthg = NULL;
}

static void _thread_timechain_append __((struct thread *));
static void _thread_timechain_append(thr)
struct thread *thr;
{
	struct threadgroup *thg = thr->thgrp;

	/* Doubly linked linear list */

	if (thread_head == NULL) {

	  thread_head = thr;
	  thread_tail = thr;
	  thr->nexttr = NULL;
	  thr->prevtr = NULL;

	} else {

	  thread_tail->nexttr = thr;
	  thr->nexttr = NULL;
	  thr->prevtr = thread_tail;
	  thread_tail = thr;

	}

	/* Doubly linked circullar list */

	if (thg->thread == NULL) {

	  thg->thread  = thr;
	  thg->thrtail = thr;
	  thr->nextthg = thr;
	  thr->prevthg = thr;

	} else {

	  thr->nextthg = thg->thrtail->nextthg;
	  thg->thrtail->nextthg = thr;
	  thr->prevthg = thr->nextthg->prevthg;
	  thr->nextthg->prevthg = thr;

	  thg->thrtail = thr;

	}

	thg->threads += 1;
	MIBMtaEntry->sc.StoredThreadsSc += 1;
}


static struct thread *create_thread __((struct threadgroup *thgrp,
					struct vertex *vtx,
					struct config_entry *cep));

static struct thread *
create_thread(thgrp, vtx, cep)
struct threadgroup *thgrp;
struct vertex *vtx;
struct config_entry *cep;
{
	/* Create a thread-block, link in the group pointer,
	   and link the thread into thread-ring, plus APPEND
	   to the thread time-chain				*/

	struct thread *thr;
	thr = (struct thread *)emalloc(sizeof(*thr));
	if (!thr) return NULL;

	memset(thr,0,sizeof(*thr));

	++threadid;
	thr->threadid = threadid;

	/* thr->attempts = 0;
	   thr->nextthg = NULL;
	   thr->prevthg = NULL; */

	thr->thgrp   = thgrp;
	thr->wchan   = vtx->orig[L_CHANNEL];
	thr->whost   = vtx->orig[L_HOST];

	if (thgrp->cep->flags & CFG_QUEUEONLY) {
		/* Start with the first retry */
		if(thgrp->cep->nretries) {
			mytime(&now);
			thr->wakeup = now + thgrp->cep->retries[0];
		}
	}

	thr->thvertices   = vtx;
	thr->lastthvertex = vtx;
	thr->jobs       = 1;

	vtx->thread     = thr;
	thr->channel    = strsave(vtx->orig[L_CHANNEL]->name);
	thr->host       = strsave(vtx->orig[L_HOST   ]->name);

	if (verbose) sfprintf(sfstderr,"create_thread(%s/%d/%s) -> %p\n",
			      thr->channel,thgrp->withhost,thr->host,thr);

	_thread_timechain_append(thr);

	return thr;
}


/*
 * Pick next thread from the group which this process serves.
 * At call the proc->pthread->proc does not contain us!
 *
 * Result is  proc->pthread  and  proc->pthread->nextfeed  being
 * updated to new thread, and function returns 1.
 * If no new thread can be picked (all are active, and whatnot),
 * return 0.
 */

int
pick_next_thread(proc)
     struct procinfo *proc;
{
	struct thread	    *thr;
	struct thread       *thr0 = proc->pthread;
	struct threadgroup  *thg  = proc->thg;
	int once;

	proc->pthread = NULL;

	if (thg->cep->flags & CFG_QUEUEONLY)
	  return 0; /* We are QUEUE ONLY group, no auto-switch! */

	mytime(&now);

	for ( thr = thg->thread, once = 1;
	      thr && (once || (thr != thg->thread));
	      thr = thr->nextthg, once = 0 ) {

	  struct vertex  * vp = thr->thvertices;
	  struct config_entry *ce = &(thr->thgrp->ce);

	  if (thr == thr0)
	    continue; /* No, can't be what we just were! */

	  if ((thr->wakeup > now) && (thr->attempts > 0))
	    continue; /* wakeup in future, unless first time around! */

	  if (vp && (thr->thrkids < ce->maxkidThread) &&
	      (thr->thrkids < thr->jobs) /* FIXME: real unfed count ? */) {

	    struct web     * ho = vp->orig[L_HOST];
	    struct web     * ch = vp->orig[L_CHANNEL];

	    if (thr->proc && thr->nextfeed == NULL)
	      continue; /* Nothing more to feed! See other threads! */

	    if (proc->ch != ch) /* MUST have same CH data - in case we ever
				   run with a clause where channel side is
				   partially wild-carded.. */
	      continue;

	    proc->pthread = thr;
	    thr->thrkids += 1;

	    if (thr->proc == NULL) {
	      /* Randomize the  order of thread vertices
		 (or sort by spool file mtime, if in AGEORDER..) */
	      /* Also init thr->nextfeed */
	      thread_vertex_shuffle(thr);
	    }

	    if (thr->proc)
	      proc->pnext = thr->proc;
	    thr->proc     = proc;
	    if (proc->pnext)
	      proc->pnext->pprev = proc;

	    if (proc->ho != NULL && proc->ho != ho) {
	      /* Get rid of the old host web */
	      proc->ho->kids -= 1;
	      unweb(L_HOST,proc->ho);
	      proc->ho = NULL;
	    }

	    /* Move the kid to this host web */
	    
	    if (proc->ho != ho) {
	      proc->ho = ho;
	      proc->ho->kids += 1;
	    }

	    /* The channel is be the same at old and new threads */

	    /* Move the pickup pointer forward.. */
	    thg->thread = thg->thread->nextthg;

	    if (thr->nextfeed == NULL) {
	      thr->pending = "NoNextFeed";
	      return 0;
	    }

	    return 1;
	  }
	}
	/* No result :-( */
	return 0;
}


int
delete_thread(thr)
     struct thread *thr;
{
	/* Unlink this thread from thread-chain, and thread
	   group.  Decrement thread-group count		    */

	struct threadgroup *thg = thr->thgrp;

	if (thr->thrkids || thr->jobs) return 0;

	if (verbose)
	  sfprintf(sfstderr,"delete_thread(%p:%s/%s) (thg=%p) jobs=%d\n",
		   thr,thr->channel,thr->host,thg, thr->jobs);

	free(thr->channel);
	free(thr->host);

	/* Unlink us from the thread time-chain */
	/* ... and thread-group-ring */
	_thread_timechain_unlink(thr);

memset(thr, 0x55, sizeof(*thr));

	free(thr);
	return 1;
}

#if 0 /* Dead code.. */
static void _thread_linkfront __((struct thread *, struct vertex *, struct vertex *));
static void _thread_linkfront(thr,ap,vp)
struct thread *thr;
struct vertex *ap, *vp;
{
	/* Link the VP in front of AP */
	vp->previtem = ap->previtem;
	if (ap->previtem != NULL)
	  ap->previtem->nextitem = vp;
	ap->previtem = vp;
	vp->nextitem = ap;
	if (ap == thr->thvertices)
	  thr->thvertices = vp;
	vp->thread = thr;
}
#endif

/* the  _thread_linktail()  links a vertex into thread */
static void _thread_linktail __((struct thread *, struct vertex *));

static void _thread_linktail(thr,vp)
struct thread *thr;
struct vertex *vp;
{
	if (thr->thvertices != NULL) {
	  thr->lastthvertex->nextitem = vp;
	  vp->previtem    = thr->lastthvertex;
	} else {
	  thr->thvertices = vp;
	  vp->previtem    = NULL;
	}
	vp->nextitem      = NULL;
	vp->thread        = thr;
	thr->lastthvertex = vp;
}

void thread_linkin(vp,cep,cfgid, ce_fillin)
struct vertex *vp;
struct config_entry *cep;
int cfgid;
void (*ce_fillin) __((struct threadgroup*, struct config_entry *));
{
	struct threadgroup *thg;
	struct thread *thr;

	/* int matched = 0; */
	int thg_once;

	struct web *wc = vp->orig[L_CHANNEL];
	struct web *wh = vp->orig[L_HOST];

	mytime(&now);

	if (verbose)
	  sfprintf(sfstderr,"thread_linkin([%s/%s],%s/%d/%s,%d)\n",
		   wc->name, wh->name, cep->channel,
		   cep->flags & CFG_WITHHOST, cep->host, cfgid);

	/* char const *vp_chan = wc->name; */
	/* char const *vp_host = wh->name; */

	if (thrg_root == NULL)
	  create_threadgroup(cep,wc,wh,cep->flags & CFG_WITHHOST,ce_fillin);

	/*
	 *  Search for matching config-entry, AND matching channel,
	 *  AND matching host (depending how the thread-group formation
	 *  is allowed to happen..)
	 *
	 */
	for (thg = thrg_root, thg_once = 1;
	     thg && (thg_once || thg != thrg_root);
	     thg = thg->nextthg, thg_once = 0) {

	  int thr_once;

	  if (thg->cep != cep)	/* Config entries don't match */
	    continue;

	  if (thg->wchan != wc)	/* Channels don't match */
	    continue;

	  if (thg->withhost) {
	    if (thg->whost != wh) /* Tough, must have host match! */
	      continue;
	  }
	  
	  /* The config-entry matches, we have changes to match group */

	  for (thr = thg->thread, thr_once = 1;
	       thr && (thr_once || (thr != thg->thread));
	       thr = thr->nextthg, thr_once = 0) {

#if 0
	    if (!thr->vertex) abort();	/* No vertices ?? */

	    /* no need ? (Channels within a group are identical..) */
	    if (wc != thr->wchan)  abort();
#endif
	    /* Nice! What about host ? */
	    if (wh != thr->whost)  continue;
	    
	    /* We have matching channel, AND matching host */

	    /* Link the vertex into this thread! */

	    if (verbose)
	      sfprintf(sfstderr,"thread_linkin() to thg=%p[%s/%d/%s]; added into existing thread [%s/%s] thr->jobs=%d\n",
		       thg,cep->channel,thg->withhost,cep->host,
		       wc->name,wh->name,thr->jobs+1);

	    _thread_linktail(thr,vp);
	    vp->thgrp   = thg;
	    thr->jobs  += 1;

	    if (thr->proc)     /* Caring about the UF count while running */
	      thr->unfed += 1;

	    if (thr->proc && (thr->nextfeed == NULL)) {
	      /* It is running, but no nextfeed is set (anymore),
		 tackle this vertex into the tail */

	      thr->nextfeed = vp;
	    }

	    /* Hookay..  Try to start it too... */
	    thread_start_(thr);

	    return;
	  }

	  /* No matching thread, however this GROUP matches (or does it?) */

	  /* Add a new thread into this group */
	  thr = create_thread(thg,vp,cep);
	  vp->thgrp = thg;

	  if (verbose)
	    sfprintf(sfstderr,"thread_linkin() to thg=%p[%s/%d/%s]; created a new thread %p [%s/%s]\n",
		     thg,cep->channel,thg->withhost,cep->host,
		     thr,wc->name,wh->name);

	  /* Try to start it too */
	  thread_start_(thr);

	  return;
	}

	/* Add a new group - and its thread .. */
	thg = create_threadgroup(cep, wc, wh, cep->flags & CFG_WITHHOST, ce_fillin);
	thr = create_thread(thg,vp,cep);
	vp->thgrp = thg;

	/* Try to start it too */
	thread_start_(thr);
}

struct web *
web_findcreate(flag, s)
int flag;
const char *s;
{
	struct spblk *spl;
	struct web *wp;
	spkey_t spk;

	/* caller has done 'strlower()' to our input.. */

	spk = symbol_db(s, spt_mesh[flag]->symbols);
	spl = sp_lookup(spk, spt_mesh[flag]);
	if (spl == NULL || (wp = (struct web *)spl->data) == NULL) {
	  /* Not found, create it */
	  wp = (struct web *)emalloc(sizeof (struct web));
	  memset((void*)wp, 0, sizeof (struct web));
	  sp_install(spk, (void *)wp, 0, spt_mesh[flag]);
	  wp->name     = strsave(s);
	  wp->kids     = 0;
	  wp->link     = NULL;
	  wp->linkcnt  = 0;
	}
	if (spl != NULL)
	  wp = (struct web*)spl->data;

	return wp;
}


/*
 * Deallocate a web entry (host or channel vertex header structure).
 */

void
unweb(flag, wp)
int flag;
	struct web *wp;
{
	struct spblk *spl = NULL;
	spkey_t spk;

if (verbose)
  sfprintf(sfstderr,"unweb(flag=%d wp=%p); linkcnt=%d kids=%d\n",
	   flag,wp,wp->kids,wp->linkcnt);

	if (wp->linkcnt > 0)	/* Yet objects holding it */
	  return;
	if (wp->kids > 0)	/* too early to actually remove it */
	  return;

	spk = symbol_lookup_db((u_char *)wp->name, spt_mesh[flag]->symbols);
	if ((spkey_t)0 == spk)	/* Not in the symbol table */
	  return;
	spl = sp_lookup(spk, spt_mesh[flag]);
	if (spl != NULL)	/* Should always have this ... */
	  sp_delete(spl, spt_mesh[flag]);
	symbol_free_db((u_char *)wp->name, spt_mesh[flag]->symbols);
	free(wp->name);

memset(wp, 0x55, sizeof(*wp));

	free((char *)wp);
}


/*
 * unthread(vtx) -- detach this vertex from its thread
 */
static void unthread __((struct thread *thr, struct vertex *vtx));
static void unthread(thr, vtx)
     struct thread *thr;
     struct vertex *vtx;
{
	if (vtx->previtem != NULL)
	  vtx->previtem->nextitem = vtx->nextitem;
	if (vtx->nextitem != NULL)
	  vtx->nextitem->previtem = vtx->previtem;

	if (thr) {
	  thr->jobs                        -= 1;

	  if (thr->nextfeed     == vtx)
	    thr->nextfeed       = thr->nextfeed->nextitem;
	  if (thr->thvertices   == vtx)
	    thr->thvertices     = vtx->nextitem;
	  if (thr->lastthvertex == vtx)
	    thr->lastthvertex   = vtx->previtem;
	}

	vtx->nextitem = NULL;
	vtx->previtem = NULL;
}

/*
 * Detach the vertex from its chains
 *
 * If here is a process, limbo it!
 */
void
web_detangle(vp, ok)
	struct vertex *vp;
	int ok;
{
	/* If it was in processing, remove process node binding.
	   We do this only when we have reaped the channel program. */

	struct thread *thr = vp->thread;

	/* unthread() will also unpick the  nextfeed link.. */

	unthread(thr, vp);

	/* The thread can now be EMPTY! */

	if (thr && (thr->thvertices == NULL))
	  delete_thread(thr);
}

static int vtx_mtime_cmp __((const void *, const void *));
static int vtx_mtime_cmp(ap, bp)
     const void *ap, *bp;
{
	const struct vertex **a = (const struct vertex **)ap;
	const struct vertex **b = (const struct vertex **)bp;

	if ((*a)->cfp->mtime < (*b)->cfp->mtime)
	  return -1;
	else if ((*a)->cfp->mtime == (*b)->cfp->mtime)
	  return 0;
	else
	  return 1;
}


static void
thread_vertex_shuffle(thr)
struct thread *thr;
{
	register struct vertex *vp;
	register int n, i, ni;
	static u_int           ur_size = 0;
	static struct vertex **ur_arr  = NULL;

	/* Randomize the order of vertices in processing, OR
	   sort them by spool-file MTIME, if the thread has
	   AGEORDER -flag set. */

	/* 1) Create storage array for the vertex re-arrange */
	if (ur_size == 0) {
	  ur_size = 100;
	  ur_arr = (struct vertex **)
	    emalloc(ur_size * sizeof (struct vertex *));
	}
	/* 2) Store the vertices into a re-arrange array (and count) */
	for (n = 0, vp = thr->thvertices; vp != NULL; vp = vp->nextitem) {
	  if (n >= ur_size) {
	    ur_size *= 2;
	    ur_arr = (struct vertex **)realloc((char *)ur_arr,
					       ur_size *
					       sizeof (struct vertex *));
	  }
	  ur_arr[n++] = vp;
	}

	/* 3) re-arrange pointers */
	if (thr->thgrp->ce.flags & CFG_AGEORDER) {
	  /* mtime order */
	  if (n > 1)
	    qsort((void*)ur_arr, n, sizeof(struct vertex *), vtx_mtime_cmp);
	} else
	  /* Random order */
	  for (i = 0; i < n; ++i) {
	    ni = ranny(n-1);
	    vp = ur_arr[i];
	    ur_arr[i] = ur_arr[ni];
	    ur_arr[ni] = vp;
	  }
	/* 4) Relink previtem/nextitem pointers */
	for (i = 0; i < n; ++i) {
	  if (i > 0)
	    ur_arr[i]->previtem = ur_arr[i-1];
	  if (i < (n-1))
	    ur_arr[i]->nextitem = ur_arr[i+1];
#if 1
	  /* 4c) Clear wakeup timer; the feed_child() will refuse
	     to feed us, if this one is not cleared.. */
	  ur_arr[i]->wakeup = 0;
#endif
	}
	ur_arr[  0]->previtem = NULL;
	ur_arr[n-1]->nextitem = NULL;
	/* 5) Finish the re-arrangement by saving the head,
	      and tail pointers */
	thr->thvertices   = ur_arr[  0];
	thr->nextfeed     = ur_arr[  0];
	thr->lastthvertex = ur_arr[n-1];
	thr->unfed = n;
}

static int
thread_start_(thr)
     struct thread *thr;
{
	struct config_entry *ce = &(thr->thgrp->ce);

	if (thr->proc != NULL) {
	  /* There is *somebody* active!  Shall we start, or not ? */
	  if (ce->flags & CFG_WAKEUPRESTARTONLY)
	    return 0;
	}

	return thread_start(thr, 0);
}


/*
 * thread_start() -- start the thread, if:
 *   - if the thread is not already running
 *   - thread-group has idle processor (feed immediately)
 *   - if no resource limits are exceeded for starting it
 *
 * Return non-zero, if did start something.
 */

int
thread_start(thr, queueonly_too)
     struct thread *thr;
     int queueonly_too;
{
	int rc;
	struct vertex      *vp  = thr->thvertices;
	struct threadgroup *thg = thr->thgrp;
	struct config_entry *ce = &(thr->thgrp->ce);
	struct web         *ho;
	struct web         *ch;

	queryipccheck();

	if (!thr->thrkids && !thr->jobs) {
	  /* Cleanup when no processes, nor vertices */
	  delete_thread(thr);
	  return 0;
	}

	if (syncstart || (freeze && !slow_shutdown)) return 0;
	if (!queueonly_too && (ce->flags & CFG_QUEUEONLY)) return 0;

	ho = vp->orig[L_HOST];
	ch = vp->orig[L_CHANNEL];

	if (procselect) {
	  thr->pending = "procsel-mismatch";
	  if (*procselect != '*' &&
	      strcmp(procselect,ch->name) != 0)
	    return 0;
	  if (*procselhost != '*' &&
	      strcmp(procselhost,ho->name) != 0)
	    return 0;
	}
	thr->pending = NULL;

	if (verbose)
	  sfprintf(sfstderr,"thread_start(thr=%s/%d/%s) (dt=%d thr=%p jobs=%d)\n",
		   ch->name, thg->withhost, ho->name, (int)(thr->wakeup-now),
		   thr, thr->jobs);

	if ((thr->thrkids >= ce->maxkidThread) ||
	    /* FIXME: real unfed count ? */
	    (thr->proc && (thr->thrkids >= thr->unfed))) {
	  if (verbose) {
	    struct procinfo * proc = thr->proc;
	    sfprintf(sfstderr," -- already running; thrkids=%d jobs=%d procs={ %p",
		     thr->thrkids, thr->jobs, proc);
	    proc = proc->pnext;
	    while (proc) {
	      sfprintf(sfstderr, " %p", proc);
	      proc = proc->pnext;
	    }
	    sfprintf(sfstderr, " }\n");
	  }
	  return 0; /* Already running */
	}

      re_pick:
	if (thg->idleproc) {
	  struct procinfo *proc;

	  /* There is at least one.. */
	  proc = thg->idleproc;

	  /* Idle processor(s) exists, try to optimize by finding
	     an idle proc with matching channel & host from previous
	     activity. If not found, pick any with matching CHANNEL,
	     unless must have also matching HOST... */

	  for (; proc && (proc->ho != ho || proc->ch != ch); proc = proc->pnext) ;
	  if (!proc && !thg->withhost) {
	    /* None of the previous ones matched, pick with matching CHANNEL,
	       HOST is allowed to wild-card.. */
	    proc = thg->idleproc;
	    for (; proc && (proc->ch != ch); proc = proc->pnext) ;
	  }
	  if (!proc)
	    goto create_new;

	  /* Selected one of them.. */

	  if (proc->pprev) proc->pprev->pnext = proc->pnext;
	  if (proc->pnext) proc->pnext->pprev = proc->pprev;
	  if (thg->idleproc == proc) thg->idleproc = proc->pnext;

	  proc->pnext = proc->pprev   = NULL;

	  thg->idlecnt -= 1;

	  --idleprocs;

	  /* Move to ACTIVE state */
	  MIBMtaEntry->sc.TransportAgentsActiveSc += 1;
	  MIBMtaEntry->sc.TransportAgentsIdleSc   -= 1;

	  
	  /* It may be that while we idled it, it died at the idle queue.. */
	  if (proc->pid <= 0 || proc->tofd < 0) {

	    /* sfprintf(sfstderr,
	       "%% thread_start(thr=%s/%d/%s) (proc=%p ->pid=%d ->tofd=%d)\n",
	       ch->name, thg->withhost, ho->name, proc,
	       proc->pid, proc->tofd); */

	    goto re_pick;
	  }

	  /* Thread-groups are made such that here at thread_start() we
	     can always switch over in between threads */

	  if (proc->ho != NULL && proc->ho != ho) {
	    /* Get rid of the old host web */
	    proc->ho->kids -= 1;
	    unweb(L_HOST,proc->ho);
	    proc->ho = NULL;
	  }

	  /* Move the kid to this host web */
	    
	  if (proc->ho != ho) {
	    proc->ho = ho;
	    proc->ho->kids += 1;
	  }

	  /* In theory the CHANNEL could be different -- in practice NOT! */
	  proc->ch = ch;

	  /* MULTI-TA-PER-THREAD -- only the first proc inits feed-state */
	  if (! thr->proc ) {

	    /* Randomize the order of thread vertices
	       (or sort by spool file mtime, if in AGEORDER..) */
	    /* Also init thr->nextfeed */
	    thread_vertex_shuffle(thr);
	  }

	  /* Its idle process, feed it! */

	  proc->state   = CFSTATE_LARVA;
	  proc->overfed = 1; /* A simulated state.. */

	  proc->pthread = thr;

	  if (thr->proc)   proc->pnext = thr->proc;
	  if (proc->pnext) proc->pnext->pprev = proc;

	  thr->proc     = proc;
	  thr->thrkids += 1;

	  if (verbose)
	    sfprintf(sfstderr, "%% thread_start(thr=%s/%d/%s) (proc=%p dt=%d thr=%p jobs=%d)\n",
		     ch->name, thg->withhost, ho->name, thr->proc,
		     (int)(thr->wakeup-now), thr, thr->jobs);

	  ta_hungry(proc);

	  return 1;
	}

 create_new:

	/* Check resource limits - MaxTa, MaxChan, MaxThrGrpTa */
	/* If resources are exceeded, reschedule.. */

	vp = thr->thvertices;

	if (numkids >= ce->maxkids) {
	  vp->ce_pending = SIZE_L;
	  thr->pending = ">MaxTA";
	} else if (vp->orig[L_CHANNEL]->kids >= ce->maxkidChannel) {
	  vp->ce_pending = L_CHANNEL;
	  thr->pending = ">MaxChannel";
	} else if (thg->transporters >= ce->maxkidThreads) {
	  vp->ce_pending = L_HOST;
	  thr->pending = ">MaxRing";
	} else if (thr->thrkids >= ce->maxkidThread) {
	  vp->ce_pending = SIZE_L;
	  thr->pending = ">MaxThr";
	} else {
	  vp->ce_pending = 0;
	  thr->pending = NULL;
	}

	if (vp->ce_pending) {
	  if (verbose)
	    sfprintf(sfstderr,"%s: (%d %dC %dT %dTh) >= (%d %dC %dT %dTh)\n",
		     ce->command,
		     numkids,
		     vp->orig[L_CHANNEL]->kids,
		     thg->transporters,
		     thr->thrkids,
		     ce->maxkids,
		     ce->maxkidChannel,
		     ce->maxkidThreads,
		     ce->maxkidThread);
	  /*
	   * Would go over limit.  Rescheduling for the next
	   * (single) interval works ok in many situation.
	   * However when the scheduler is very busy one can
	   * run into systemic problems with some set of messages
	   * blocking another set of messages.  The only way
	   * around that is a roundrobin scheme, implemented
	   * by the fifo nature of the thread scheduling.
	   */
	  reschedule(vp, 0, -1);
	  return 0;
	}

	/* Now we are ready to start a new child to run our bits */
	
	if (! thr->proc ) {
	  /* MULTI-TA-PER-THREAD -- first proc inits the feed-state */

	  /* Randomize the order of thread vertices
	     (or sort by spool file mtime, if in AGEORDER..) */
	  /* Also init thr->nextfeed */
	  thread_vertex_shuffle(thr);
	}

	rc = start_child(thr->thvertices,
			 thr->thvertices->orig[L_CHANNEL],
			 thr->thvertices->orig[L_HOST]);

	if (thr->proc && verbose)
	  sfprintf(sfstderr,"%% thread_start(thr=%s/%d/%s) (proc=%p dt=%d thr=%p jobs=%d)\n",
		   ch->name, thg->withhost, ho->name, thr->proc,
		   (int)(thr->wakeup-now), thr, thr->jobs);


	return rc;
}


/*
 * pick_next_vertex() -- pick next free to process vertex in this thread
 *
 * This is called *only* by  feed_child(), and  proc->vertex  directs
 * then the caller of feed_child() to tune the process state.
 * (From STUFFING to FINISHING and possibly to IDLE.)
 * 
 * - if (proc->pthread->nextfeed != NULL) ...nextfeed = ...nextfeed->nextitem;
 * - return (...nextfeed != NULL);
 *
 */

/* Return 0 for errors, 1 for success; result is at  ...nextfeed */

int
pick_next_vertex(proc)
     struct procinfo *proc;
{
	struct thread * thr = proc->pthread;
	struct vertex * vtx = NULL;

	if (thr) vtx = thr->nextfeed;

	if (verbose)
	  sfprintf(sfstderr,"pick_next_vertex(proc=%p) proc->tofd=%d, thr=%p, pvtx=%p, jobs=%d OF=%d S=%s\n",
		   proc, proc->tofd, thr, vtx, thr ? thr->jobs : 0,
		   proc->overfed, proc_state_names[proc->state]);

	if (proc->pid < 0 || proc->tofd < 0) {	/* "He is dead, Jim!"	*/
	  if (verbose) sfprintf(sfstderr," ... NONE, 'He is dead, Jim!'\n");
	  return 0;
	}

	if (vtx) /* Pick next item */
	  thr->nextfeed = vtx = vtx->nextitem;

	return (vtx != NULL);
}

/*
 *  The  thread_expire2()  will handle exceedingly old things
 *  with ages in excess of expire+expire2 (seconds) in queue
 *  even if no successfull delivery attempt has been made.
 *
 *  Return the kill-count.
 *
 *  Side-effect warning: 
 *         Afterwards the THR may point to nonexistent object!
 */


int
thread_expire2(thr, timelimit, killall, msgstr)
     struct thread *thr;
     time_t timelimit;
     int killall;	  /* later uses in mind.. now dummy parameter */
     const char *msgstr;  /* ... likewise. */
{
	int killcount = 0;
	struct vertex       *vtx  = thr->thvertices;
	struct vertex	*nextvtx;

	for ( ;vtx; vtx = nextvtx) {
	  int expire_this = 0;

	  nextvtx = vtx->nextitem;

	  /* Time to expire ? */
	  if (vtx->ce_expiry > 0 && vtx->ce_expiry <= now &&
	      vtx->attempts  > 0) {
	    expire_this = 1;
	  }
	  if (vtx->ce_expiry2 > 0 && vtx->ce_expiry2 <= now) {
	    expire_this = 1;
	  }
	  if (expire_this) {
	    /* ... and now expire it! */
	    /* this MAY invalidate also the THREAD object! */

	    expire(vtx, -1); /* ... them all. */
	    ++killcount;

	    mytime(&now);
	    if (now > timelimit) break;
	  }
	}

	return killcount;
}


/*
 * The  thread_reschedule()  updates threads time-chain to match the
 * new value of wakeup for the  doagenda()  to later use.
 * Return 0 for DESTROYED thread, 1 for EXISTING thread.
 */

int
thread_reschedule(thr, retrytime, index)
     struct thread *thr;
     int index;
     time_t retrytime;
{
	struct vertex *vtx = thr->thvertices;
	struct vertex *nvtx;
	time_t wakeup = 0;
	int skew;

	if (verbose)
	  sfprintf(sfstderr,"thread_reschedule() ch=%s ho=%s jobs=%d thr=%p proc=%p\n",
		   thr->channel,thr->host,thr->jobs,thr,thr->proc);


	if (!thr->thrkids && !thr->jobs) {
	  delete_thread(thr);
	  return 0;
	}

	/* If there are multiple kids working still, DON'T reschedule! */
	if ((thr->thrkids > 0)  ||  (vtx == NULL)) return 1;

	/* find out when to retry */
	mytime(&now);

	/* if we are already scheduled for the future, don't reschedule */
	if (vtx->wakeup > now) {
	  thr->wakeup = vtx->wakeup;
	  if (verbose)
	    sfprintf(sfstderr,"...prescheduled\n");
	  goto timechain_handling;
	} else if (vtx->wakeup < now-7200 /* more than 2h in history .. */ )
	  vtx->wakeup = now;

	if (vtx->thgrp->ce.nretries <= 0) {
	  if (verbose)
	    sfprintf(sfstderr,"...ce->retries = %d\n", vtx->thgrp->ce.nretries);
	  goto timechain_handling;
	}

	if (thr->retryindex >= vtx->thgrp->ce.nretries) {
	  if (vtx->thgrp->ce.nretries > 1)
	    thr->retryindex = ranny(vtx->thgrp->ce.nretries-1);
	  else
	    thr->retryindex = 0;
	}

	/*
	 * clamp retry time to a predictable interval so we
	 * eventually bunch up deliveries.
	 */
	if (retrytime > 100000 && retrytime < now+63)
	  vtx->wakeup = now;
#if 0
	skew = vtx->wakeup % vtx->thgrp->ce.interval;
	if (skew <= vtx->thgrp->ce.interval / 2)
	  skew = - (skew + (vtx->thgrp->ce.skew - 1));
	else
	  skew = skew + (vtx->thgrp->ce.skew - 1);
	skew = skew / vtx->thgrp->ce.skew; /* want int div truncation */
	
	vtx->wakeup += (skew +
			vtx->thgrp->ce.retries[thr->retryindex] * vtx->thgrp->ce.interval);
#else
	/* Actually we do NOT want to have synchronization of threads,
	   as such causes simultaneous start of transporters, which
	   causes "somewhat" spiky load behaviour */

	skew = vtx->thgrp->ce.retries[thr->retryindex] * vtx->thgrp->ce.interval;
	if (retrytime <= 100000 &&
	    (int)retrytime > skew)
	  skew = retrytime;
	vtx->wakeup += skew;
#endif
	thr->retryindex++;

	/* If history, move forward by ``ce.interval'' multiple */
	if (vtx->wakeup < now)
	  vtx->wakeup += ((((now - vtx->wakeup) / vtx->thgrp->ce.interval)+1)
			  * vtx->thgrp->ce.interval);

	wakeup = vtx->wakeup;

	if (retrytime < now+63)
	  retrytime = wakeup;

	/* Reschedule ALL vertices on this thread */
	for ( ;vtx; vtx = nvtx) {
	  int expire_this = 0;

	  nvtx = vtx->nextitem;

	  /* Time to expire ? */
	  if (vtx->ce_expiry > 0 && vtx->ce_expiry <= now &&
	      vtx->attempts  > 0) {
	    expire_this = 1;
	  }
	  if (vtx->ce_expiry2 > 0 && vtx->ce_expiry2 <= now) {
	    expire_this = 1;
	  }
	  if (expire_this) {
	    /* ... and now expire it! */
	    /* this MAY invalidate also the THREAD object! */

	    if (thr->jobs > 1) {
	      expire(vtx, index);
	    } else {
	      expire(vtx, index);
	      thr = NULL; /* The THR-pointed object is now invalid */
	    }
	    continue;
	  }

	  /* Didn't expire, so time to tune the wakeup ... */

	  if (vtx->wakeup < retrytime)
	    vtx->wakeup = retrytime;
	  if (wakeup > vtx->wakeup || wakeup == 0)
	    wakeup = vtx->wakeup;
	}

	if (thr != NULL)
	  thr->wakeup = wakeup;

 timechain_handling:
	/* In every case the rescheduling means we move this thread
	   to the end of the thread_head chain.. */

	if (thr != NULL) {
	  _thread_timechain_unlink(thr);
	  _thread_timechain_append(thr);
	}

	return (thr != NULL);
}


/*
 * reschedule() operates WITHIN a thread, but does *not* move things!
 *
 */

void
reschedule(vp, factor, index)
	struct vertex *vp;
	int factor, index;
{
	int skew;
	struct thread *thr = vp->thread;
	struct threadgroup *thg = vp->thgrp;
	struct config_entry *ce = &(thg->ce);

	/* Hmm.. The reschedule() is called only when we have a reason
	   to call it, doesn't it ?  */

	/* find out when to retry */
	mytime(&now);

	if (verbose)
	  sfprintf(sfstderr,"reschedule %p now %d expiry in %d attempts %d factor %d inum %d (%s/%s: %s)\n",
		   vp, (int)now,
		   (int)((vp->ce_expiry > 0) ? (vp->ce_expiry - now) : -999),
		   vp->attempts,
		   factor, (int)(vp->cfp->id),
		   vp->orig[L_CHANNEL]->name,
		   vp->orig[L_HOST]->name,
		   vp->cfp->mid);
	/* if we are already scheduled for the future, don't reschedule */
	if (vp->wakeup > now) {
	  if (verbose)
	    sfprintf(sfstderr,"prescheduled\n");
	  return;
	} else if (vp->wakeup < now-7200 /* more than 2h .. */ )
	  vp->wakeup = now;

	if (ce->nretries <= 0) {
	  if (verbose)
	    sfprintf(sfstderr,"ce->retries = %d\n", ce->nretries);
	  return;
	}
	if (factor == -1 && vp->attempts) {
	  if (thr->retryindex >= ce->nretries) {
	    if (ce->nretries > 1)
	      thr->retryindex = ranny(ce->nretries-1);
	    else
	      thr->retryindex = 0;
	  }

	  /*
	   * clamp retry time to a predictable interval so we
	   * eventually bunch up deliveries.
	   */
	  skew = vp->wakeup % ce->interval;
	  if (skew <= ce->interval / 2)
	    skew = - (skew + (ce->skew - 1));
	  else
	    skew = skew + (ce->skew - 1);
	  skew = skew / ce->skew; /* want int div truncation */

	  vp->wakeup += (skew +
			 ce->retries[thr->retryindex] * ce->interval);
	  thr->retryindex++;
	} else if (factor < -1) {
	  vp->wakeup = -factor;
	} else
	  vp->wakeup += factor * ce->interval;

	/* I THINK there could be an assert that if this happens,
	   something is WRONG.. */
	if (vp->attempts == 0)
	  vp->wakeup = now;

	/* XX: change this to a mod expression */
	if (vp->wakeup < now)
	  vp->wakeup = ((((now - vp->wakeup) / ce->interval)+1)
			* ce->interval) + 10 + 2*thr->jobs;

	/* Makes sure that next future event is at +10+2*jobcount seconds
	   in the future..  A kludge approach, but still.. */

	if (vp->ce_expiry > 0
	    && vp->ce_expiry <= vp->wakeup
	    && vp->attempts > 0) {
	  if (verbose)
	    sfprintf(sfstderr,"ce_expiry = %d, %d attempts\n",
		     (int)(vp->ce_expiry), vp->attempts);

	  /* expire() will delete this vertex in due time */
	  expire(vp, index);

	  return;
	}
}


/*
 * With the  idle_cleanup()  we clean up idle processes, that have
 * been idle for too long (idlemax=nnnns)
 *
 * Because during its progress the thread-groups can disappear,
 * (as is one of its mandates) this code looks a bit peculiar..
 */
int
idle_cleanup()
{
	/* global: time_t now */
	struct threadgroup *thg, *nthg;
	int thg_once;
	int freecount = 0;

	mytime(&now);

	if (verbose) sfprintf(sfstderr,"idle_cleanup()\n");

	if (!thrg_root) return 0; /* No thread group! */

	for (thg = thrg_root, thg_once = 1;
	     thg_once || (thg != thrg_root);
	     thg = nthg, thg_once = 0) {

	  nthg = thg->nextthg;

	  if (thg->thread != NULL) {
	    struct procinfo *p;
	    struct thread *thr, *nthr;
	    int thr_once;
	    
	    /* Clean-up faulty client  --  KLUDGE :-(  --  OF=0, HA > much */

	    for (thr = thg->thread, thr_once = 1;
		 thr && (thr_once || (thr != thg->thread));
		 thr = nthr, thr_once = 0) {

	      nthr = thr->nextthg;

	      p = thr->proc;
	      if (thr->thgrp != thg) /* Not of this group ? */
		continue; /* Next! */
	      if (!p) /* No process */
		continue;
	      if ((p->cmdlen == 0) && (p->overfed == 0) && (p->tofd >= 0) &&
		  (p->hungertime != 0) && (p->hungertime + MAX_HUNGER_AGE <= now)) {

		/* Close the command channel, let it die itself.
		   Rest of the cleanup happens via mux() service. */
		if (verbose)
		  sfprintf(sfstderr,"idle_cleanup() killing TA on tofd=%d pid=%d\n",
			 p->tofd, (int)p->pid);

		thr->wakeup = now-1; /* reschedule immediately! */

		write(p->tofd,"\n",1); /* XXXX: should this be removed ?? */

		pipes_shutdown_child(p->tofd);
		p->tofd = -1;

		++freecount;

		/* Reclaim will (in due time) detect dead child, and
		   decrement child counters. */

		zsyslog((LOG_ERR, "ZMailer scheduler kludge shutdown of TA channel (info for debug only); %s/%s/%d HA=%ds",
			 thr->channel, thr->host, thr->thgrp->withhost,
			 (int)(now - p->hungertime)));
	      }
	    }
	  }

	  if (thg->idleproc) {
	    struct procinfo *p;
	    
	    p  =  thg->idleproc;

	    while (p != NULL) {
	      if ((thg->cep->idlemax + p->hungertime < now) &&
		  (p->cmdlen == 0) && (p->tofd >= 0)) {
		/* It is old enough -- ancient, one might say.. */

		/* Close the command channel, let it die itself.
		   Rest of the cleanup happens via mux() service. */
		if (verbose)
		  sfprintf(sfstderr,"idle_cleanup() killing TA on tofd=%d pid=%d\n",
			   p->tofd, (int)p->pid);
		write(p->tofd,"\n",1);
		pipes_shutdown_child(p->tofd);
		p->tofd       = -1;
		++freecount;
	      }

	      /* Move to the next possible idle process */
	      p = p->pnext;
	    }
	  }
	  /* If there are no threads, nor transporters, delete the thg */
	  delete_threadgroup(thg);
	}
	return freecount;
}

static time_t oldest_age_on_thread __((struct thread *));
static time_t oldest_age_on_thread(th) /* returns the AGE in seconds.. */
struct thread *th;
{
	register time_t oo = now+1;
	register struct vertex *vp;

	vp = th->thvertices;
	while (vp) {
	  if (vp->cfp->mtime < oo)
	    oo = vp->cfp->mtime;
	  vp = vp->nextitem;
	}
	return (now - oo);
}

void thread_report(fp,mqmode)
     Sfio_t *fp;
     int mqmode;
{
	struct threadgroup *thg;
	int thg_once = 1;
	int jobsum = 0, jobtotal = 0;
	int threadsum = 0;
	char timebuf[20];

	int width;
	int cnt, procs, thrkidsum;
	int rcptsum = 0;
	struct procinfo *p;
	struct thread *thr;

	int spc = (mqmode & MQ2MODE_FULL) ? ' ' : '\t';

	mytime(&now);

#if 0
	if (thrg_root == NULL) {
	  *timebuf = 0;
	  saytime((long)(now - sched_starttime), timebuf, 1);
	  sfprintf(fp,"No threads/processes.  Uptime: %s\n",timebuf);
	  return;
	}
#endif

	for (thg = thrg_root;
	     thg && (thg_once || thg != thrg_root);
	     thg = thg->nextthg) {

	  int thr_once;

	  thg_once = 0;
	  if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {
	    sfprintf(fp,"%s/%s/%d\n",
		     thg->cep->channel, thg->cep->host, thg->withhost);
	  }

	  cnt   = 0;
	  procs = 0;
	  jobsum = 0;
	  thrkidsum = 0;

#if 1 /* XX: zero for verifying of modified system; turn to 1 for running! */

	  /* We scan thru the local ring of threads */

	  for (thr = thg->thread, thr_once = 1;
	       thr && (thr_once || (thr != thg->thread));
	       thr = thr->nextthg, thr_once = 0)
#else
	  /* We scan there in start order from the  thread_head
	     chain! */

	  for (thr = thread_head;
	       thr != NULL;
	       thr = thr->nexttr)
#endif
	  {
	    if (thr->thgrp != thg) /* Not of this group ? */
	      continue; /* Next! */

	    {
	      struct vertex *vp = thr->thvertices;
	      while (vp != NULL) {
		rcptsum += vp->ngroup;
		vp = vp->nextitem;
	      }
	    }

	    if (mqmode & MQ2MODE_FULL2) {
	      width = sfprintf(fp,"%s\t%s\t",
			       /* thr->thvertices->orig[L_CHANNEL]->name */
			       thr->channel,
			       /* thr->thvertices->orig[L_HOST]->name */
			       thr->host);
	    } else if (mqmode & MQ2MODE_FULL) {
	      width = sfprintf(fp,"    %s/%s/%d",
			       /* thr->thvertices->orig[L_CHANNEL]->name */
			       thr->channel,
			       /* thr->thvertices->orig[L_HOST]->name */
			       thr->host,
			       thr->thgrp->withhost);
	      if (width < 0) break; /* error.. */
	      width += 7;
	      if (width < 16-1)
		sfprintf(fp,"\t");
	      if (width < 24-1)
		sfprintf(fp,"\t");
	      if (width < 32-1)
		sfprintf(fp,"\t");
	      if (width < 40-1)
		sfprintf(fp,"\t");
	      else
		sfprintf(fp," ");
	    }

	    jobsum += thr->jobs;

	    if (mqmode & MQ2MODE_FULL)
	      sfprintf(fp,"R=%-3d A=%-2d", thr->jobs, thr->attempts);
	    if (mqmode & MQ2MODE_FULL2)
	      sfprintf(fp,"R=%d\tA=%d", thr->jobs, thr->attempts);

	    ++cnt;
	    if (thr->proc != NULL &&
		thr->proc->pthread == thr) {

	      int thrprocs = 0;
	      struct procinfo *proc;

	      for (proc = thr->proc; proc; proc = proc->pnext) {
		++procs;
		++thrprocs;
		++thrkidsum;
	      }

	      if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
		proc = thr->proc;

		if (thr->thrkids != thrprocs)
		  sfprintf(fp, "%cKids=%d/%d", spc, thr->thrkids, thrprocs);

		sfprintf(fp, "%cP={", spc);
		while (proc) {
		  sfprintf(fp, "%d", (int)proc->pid);
		  if (proc->pnext) sfprintf(fp, ",");
		  proc = proc->pnext;
		}
		sfprintf(fp, "}");

		proc = thr->proc;
		sfprintf(fp, "%cHA={", spc);
		while (proc) {
		  sfprintf(fp, "%d", (int)(now - proc->hungertime));
		  if (proc->pnext) sfprintf(fp, ",");
		  proc = proc->pnext;
		}
		sfprintf(fp, "}s");

		proc = thr->proc;
		sfprintf(fp, "%cFA={", spc);
		while (proc) {
		  if (proc->feedtime == 0)
		    sfprintf(fp, "never");
		  else
		    sfprintf(fp, "%d", (int)(now - proc->feedtime));
		  if (proc->pnext) sfprintf(fp, ",");
		  proc = proc->pnext;
		}
		sfprintf(fp, "}s");

		proc = thr->proc;
		sfprintf(fp, "%cOF={", spc);
		while (proc) {
		  sfprintf(fp, "%d", proc->overfed);
		  if (proc->pnext) sfprintf(fp, ",");
		  proc = proc->pnext;
		}
		sfprintf(fp, "}");

		proc = thr->proc;
		sfprintf(fp, "%cS={", spc);
		while (proc) {
		  sfprintf(fp, "%s", proc_state_names[proc->state]);
		  if (proc->pnext) sfprintf(fp, ",");
		  proc = proc->pnext;
		}
		sfprintf(fp, "}");

		sfprintf(fp, "%cUF=%d", spc, thr->unfed);
	      }

	    } else if (thr->wakeup > now) {
	      if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
		sfprintf(fp,"%cW=%ds", spc, (int)(thr->wakeup - now));
	      }
	    } else if (thr->pending) {
	      if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
		sfprintf(fp,"%cpend=%s", spc, thr->pending);
	      }
	    }

	    if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
	      *timebuf = 0;
	      saytime((long)oldest_age_on_thread(thr), timebuf, 1);
	      sfprintf(fp, "%cQA=%s", spc, timebuf);

	      if (thr->thvertices && thr->thvertices->ce_pending)
		if (thr->thvertices->ce_pending != SIZE_L && spc == ' ')
		  sfprintf(fp, "%s",
			   (thr->thvertices->ce_pending ==
			    L_CHANNEL ? " channelwait" : " threadwait"));
	      sfprintf(fp, "\n");
	    }

	  }

	  if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {

	    sfprintf(fp,"\tThreads: %4d",thg->threads);

	    if (thg->threads != cnt)
	      sfprintf(fp,"/%d",cnt);

	  }

	  cnt = 0;
	  for (p = thg->idleproc; p != 0; p = p->pnext) ++cnt;
	  procs += cnt;

	  if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {

	    sfprintf(fp, " Msgs: %5d Procs: %3d", jobsum, thg->transporters);

	    if (thg->transporters != procs)
	      sfprintf(fp,"/%d",procs);

	    sfprintf(fp," Idle: %3d",thg->idlecnt);
	    if (thg->idlecnt != cnt)
	      sfprintf(fp, "/%d", cnt);

	    sfprintf(fp, " Plim: %3d Flim: %3d Tlim: %d\n",
		     thg->ce.maxkidThreads, thg->ce.overfeed, thg->ce.maxkidThread);
	  }

	  jobtotal  += jobsum;
	  threadsum += thg->threads;
	}

	if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ | MQ2MODE_SNMP)) {
	  long files;
	  *timebuf = 0;
	  saytime((long)(now - sched_starttime), timebuf, 1);
	  sfprintf(fp,"Kids: %d  Idle: %2d  Msgs: %3d  Thrds: %3d  Rcpnts: %4d  Uptime: ",
		   numkids, idleprocs, global_wrkcnt, threadsum, jobtotal);
	  if (mqmode & MQ2MODE_SNMP)
	    sfprintf(fp, "%ld sec\n",(long)(now - sched_starttime));
	  else
	    sfprintf(fp, "%s\n",timebuf);

	  sfprintf(fp, "Msgs in %lu out %lu stored %ld ",
		   (u_long)MIBMtaEntry->sc.ReceivedMessagesSc,
		   (u_long)MIBMtaEntry->sc.TransmittedMessagesSc,
		   (long)MIBMtaEntry->sc.StoredMessagesSc);

	  files = thread_count_files();
	  if ((long)MIBMtaEntry->sc.StoredMessagesSc != files)
	    sfprintf(fp, "(%ld) ", files);

	  sfprintf(fp, "Rcpnts in %lu out %lu stored %ld",
		   (u_long)MIBMtaEntry->sc.ReceivedRecipientsSc,
		   (u_long)MIBMtaEntry->sc.TransmittedRecipientsSc,
		   (long)MIBMtaEntry->sc.StoredRecipientsSc);

	  if (rcptsum != MIBMtaEntry->sc.StoredRecipientsSc)
	    sfprintf(fp, " (%d)", rcptsum);

	  sfprintf(fp, "\n");
	}
	sfsync(fp);
}


void thread_detail_report(fp,mqmode,channel,host)
     Sfio_t *fp;
     int mqmode;
     char *channel, *host;
{
	struct thread *th;
	spkey_t spk;
	struct spblk *spl;
	struct web *wh, *wc;
	struct vertex *vp;
	int i;
	char buf[100];

	mytime(&now);

	spk = symbol_lookup_db((void*)channel, spt_mesh[L_CHANNEL]->symbols);
	spl = sp_lookup(spk, spt_mesh[L_CHANNEL]);
	if (spl == NULL || spl->data == NULL) {
	  /* Not found, nothing to do.. */
	  return;
	}
	wc = (struct web *)spl->data;

	spk = symbol_lookup_db((void*)host, spt_mesh[L_HOST]->symbols);
	spl = sp_lookup(spk, spt_mesh[L_HOST]);
	if (spl == NULL || spl->data == NULL) {
	  /* Not found, nothing to do.. */
	  return;
	}
	wh = (struct web *)spl->data;

	for (th = thread_head; th; th = th->nexttr) {
	  if (wh == th->whost && wc == th->wchan) {
	    break;
	  }
	}
	if (th) {
	  /* Found it! */
	  for (vp = th->thvertices; vp; vp = vp->nextitem) {
	    struct ctlfile *cfp = vp->cfp;
	    for (i = 0; i < vp->ngroup; ++i) {
	      /* Spoolfile */
	      sfprintf(fp, "%s%s", cfpdirname(cfp->dirind), cfp->mid);

	      /* How manyth in a group ? */
	      sfprintf(fp, "\t%d", i);
	      /* Sender index -- or sender address */
	      sfprintf(fp, "\t%s", cfp->erroraddr);
	      /* Recipient offset */
	      sfprintf(fp,"\t%d", cfp->offset[vp->index[i]]);
	      /* Expiry stamp */
	      sfprintf(fp,"\t%ld", (long)vp->ce_expiry);
	      /* next wakeup */
	      sfprintf(fp,"\t%ld", (long)vp->wakeup);
	      /* last feed time */
	      sfprintf(fp,"\t%ld", (long)vp->lastfeed);
	      /* attempts */
	      sfprintf(fp,"\t%d\t", vp->attempts);
	      /* ce_pending */
	      if (vp->wakeup > now) {
		*buf = 0;
		saytime((long)(vp->wakeup - now), buf, 1);
		sfprintf(fp,"retry in %s", buf);
	      } else {
		switch(vp->ce_pending) {
		case SIZE_L: /* BAD! */
		  break;
		case L_CHANNEL:
		  sfprintf(fp,"channel");
		  break;
		default:
		  sfprintf(fp,"thread");
		  break;
		}
	      }
	      /* message - if any */
	      sfprintf(fp, "\t");
	      if (vp->message)
		sfprintf(fp,"%s", vp->message);
	      sfprintf(fp, "\n");
	    }
	  }
	}

	sfsync(fp);
}


int thread_count_recipients()
{
	struct threadgroup *thg;
	struct thread *thr;
	int thg_once;
	int jobtotal = 0;

	if (thrg_root == NULL)
	  return 0;

	if (thrg_root)
	  for (thg = thrg_root, thg_once = 1;
	       thg_once || thg != thrg_root;
	       thg = thg->nextthg, thg_once = 0) {

	    int thr_once;
	    int jobsum = 0;

	    if (thg->thread)
	      for (thr = thg->thread, thr_once = 1;
		   thr_once || (thr != thg->thread);
		   thr = thr->nextthg, thr_once = 0) {

		jobsum += thr->jobs;

	      }
	    jobtotal += jobsum;
	  }
	return jobtotal;
}


static int thread_files_count;

static int spl_thread_cnt_files __((struct spblk *spl));
static int spl_thread_cnt_files(spl)
struct spblk *spl;
{
	++thread_files_count;
	return 0;
}


int thread_count_files __((void))
{
	thread_files_count = 0;
	sp_scan(spl_thread_cnt_files, NULL, spt_mesh[L_CTLFILE]);
	return  thread_files_count;
}


syntax highlighted by Code2HTML, v. 0.9.1