/*
* ZMailer 2.99.16+ Scheduler "threads" routines
*
* Copyright Matti Aarnio <mea@nic.funet.fi> 1995-2003
*
*
* These "threads" are for book-keeping of information
* regarding schedulable recipient vertices
*/
#include <stdio.h>
#include <sfio.h>
#include <ctype.h>
#include <unistd.h>
#include "scheduler.h"
#include "prototypes.h"
#include "zsyslog.h"
/* #include <stdlib.h> */
/*
Each vertex arrives into SOME thread, each of which belong to
SOME thread-group, among which groups the transport channel
programs started for any member thread can be shared among
their members.
*/
#define MAX_HUNGER_AGE 600 /* Sign of an error ... */
extern char *proc_state_names[];
struct thread *thread_head = NULL;
struct thread *thread_tail = NULL;
static struct threadgroup *thrg_root = NULL;
int idleprocs = 0;
extern int global_wrkcnt;
extern int syncstart;
extern int freeze;
extern int slow_shutdown;
extern time_t now;
extern char *procselect, *procselhost;
extern time_t sched_starttime; /* From main() */
extern int mailqmode; /* 1 or 2 */
static long groupid = 0;
static long threadid = 0;
static void thread_vertex_shuffle __((struct thread *thr));
static struct threadgroup *create_threadgroup __((struct config_entry *cep, struct web *wc, struct web *wh, int withhost, void (*ce_fillin)__((struct threadgroup *, struct config_entry *)) ));
static int thread_start_ __((struct thread *thr));
static struct threadgroup *
create_threadgroup(cep, wc, wh, withhost, ce_fillin)
struct config_entry *cep;
struct web *wc, *wh;
int withhost;
void (*ce_fillin) __((struct threadgroup*, struct config_entry *));
{
struct threadgroup *thgp;
/* Create a thread-group and link it into group-ring */
thgp = (struct threadgroup*)malloc(sizeof(*thgp));
if (!thgp) return NULL;
memset(thgp,0,sizeof(*thgp));
++groupid;
thgp->groupid = groupid;
thgp->cep = cep;
thgp->withhost = withhost;
thgp->wchan = wc;
thgp->whost = wh;
ce_fillin(thgp,cep);
wc->linkcnt += 1;
wh->linkcnt += 1;
if (thrg_root == NULL) {
thrg_root = thgp;
thgp->nextthg = thgp;
thgp->prevthg = thgp;
} else {
thgp->nextthg = thrg_root->nextthg;
thgp->prevthg = thrg_root->nextthg->prevthg;
thgp->prevthg->nextthg = thgp;
thgp->nextthg->prevthg = thgp;
}
return thgp;
}
void
delete_threadgroup(thgp)
struct threadgroup *thgp;
{
struct threadgroup *tgp;
/* Time to say good-bye to this group, delete it.
We shall not have any threads under us, nor
idle processes! */
/* However we may be called with either of these values
still non-zero... */
if (thgp->transporters || thgp->threads) return;
if (verbose) sfprintf(sfstderr,"delete_threadgroup(%s/%d/%s)\n",
thgp->wchan->name,thgp->withhost,thgp->whost->name);
if (thgp->idleproc != NULL || thgp->thread != NULL)
abort(); /* Deleting non-empty thread-group! */
if (thrg_root == NULL) abort(); /* No thread-group root! */
/* We are possibly the last to keep these web links */
thgp->wchan->linkcnt -= 1;
unweb(L_CHANNEL,thgp->wchan);
thgp->whost->linkcnt -= 1;
unweb(L_HOST,thgp->whost);
/* Unlink this thread-group from the ring */
tgp = thgp->nextthg;
thgp->prevthg->nextthg = thgp->nextthg;
tgp->prevthg = thgp->prevthg;
/* are we at the ring root pointer ? */
if (thrg_root == thgp)
thrg_root = tgp;
if (tgp == thgp) {
/* We were the only one! */
thrg_root = NULL;
}
memset(thgp, 0x55, sizeof(*thgp));
free(thgp);
}
static void _thread_timechain_unlink __((struct thread *));
static void _thread_timechain_unlink(thr)
struct thread *thr;
{
struct threadgroup *thg = thr->thgrp;
/* Doubly linked linear list */
if (thr->prevtr != NULL)
thr->prevtr->nexttr = thr->nexttr;
if (thr->nexttr != NULL)
thr->nexttr->prevtr = thr->prevtr;
if (thread_head == thr)
thread_head = thr->nexttr;
if (thread_tail == thr)
thread_tail = thr->prevtr;
thr->nexttr = NULL;
thr->prevtr = NULL;
/* Doubly linked circullar list */
thr->prevthg->nextthg = thr->nextthg;
thr->nextthg->prevthg = thr->prevthg;
thg->threads -= 1;
MIBMtaEntry->sc.StoredThreadsSc -= 1;
if (thg->thread == thr)
thg->thread = thr->nextthg; /* pick other */
if (thg->thread == thr)
thg->thread = NULL; /* was the only one! */
if (thg->thrtail == thr)
thg->thrtail = thr->prevthg; /* pick other */
if (thg->thrtail == thr)
thg->thrtail = NULL; /* was the only one! */
thr->prevthg = NULL;
thr->nextthg = NULL;
}
static void _thread_timechain_append __((struct thread *));
static void _thread_timechain_append(thr)
struct thread *thr;
{
struct threadgroup *thg = thr->thgrp;
/* Doubly linked linear list */
if (thread_head == NULL) {
thread_head = thr;
thread_tail = thr;
thr->nexttr = NULL;
thr->prevtr = NULL;
} else {
thread_tail->nexttr = thr;
thr->nexttr = NULL;
thr->prevtr = thread_tail;
thread_tail = thr;
}
/* Doubly linked circullar list */
if (thg->thread == NULL) {
thg->thread = thr;
thg->thrtail = thr;
thr->nextthg = thr;
thr->prevthg = thr;
} else {
thr->nextthg = thg->thrtail->nextthg;
thg->thrtail->nextthg = thr;
thr->prevthg = thr->nextthg->prevthg;
thr->nextthg->prevthg = thr;
thg->thrtail = thr;
}
thg->threads += 1;
MIBMtaEntry->sc.StoredThreadsSc += 1;
}
static struct thread *create_thread __((struct threadgroup *thgrp,
struct vertex *vtx,
struct config_entry *cep));
static struct thread *
create_thread(thgrp, vtx, cep)
struct threadgroup *thgrp;
struct vertex *vtx;
struct config_entry *cep;
{
/* Create a thread-block, link in the group pointer,
and link the thread into thread-ring, plus APPEND
to the thread time-chain */
struct thread *thr;
thr = (struct thread *)emalloc(sizeof(*thr));
if (!thr) return NULL;
memset(thr,0,sizeof(*thr));
++threadid;
thr->threadid = threadid;
/* thr->attempts = 0;
thr->nextthg = NULL;
thr->prevthg = NULL; */
thr->thgrp = thgrp;
thr->wchan = vtx->orig[L_CHANNEL];
thr->whost = vtx->orig[L_HOST];
if (thgrp->cep->flags & CFG_QUEUEONLY) {
/* Start with the first retry */
if(thgrp->cep->nretries) {
mytime(&now);
thr->wakeup = now + thgrp->cep->retries[0];
}
}
thr->thvertices = vtx;
thr->lastthvertex = vtx;
thr->jobs = 1;
vtx->thread = thr;
thr->channel = strsave(vtx->orig[L_CHANNEL]->name);
thr->host = strsave(vtx->orig[L_HOST ]->name);
if (verbose) sfprintf(sfstderr,"create_thread(%s/%d/%s) -> %p\n",
thr->channel,thgrp->withhost,thr->host,thr);
_thread_timechain_append(thr);
return thr;
}
/*
* Pick next thread from the group which this process serves.
* At call the proc->pthread->proc does not contain us!
*
* Result is proc->pthread and proc->pthread->nextfeed being
* updated to new thread, and function returns 1.
* If no new thread can be picked (all are active, and whatnot),
* return 0.
*/
int
pick_next_thread(proc)
struct procinfo *proc;
{
struct thread *thr;
struct thread *thr0 = proc->pthread;
struct threadgroup *thg = proc->thg;
int once;
proc->pthread = NULL;
if (thg->cep->flags & CFG_QUEUEONLY)
return 0; /* We are QUEUE ONLY group, no auto-switch! */
mytime(&now);
for ( thr = thg->thread, once = 1;
thr && (once || (thr != thg->thread));
thr = thr->nextthg, once = 0 ) {
struct vertex * vp = thr->thvertices;
struct config_entry *ce = &(thr->thgrp->ce);
if (thr == thr0)
continue; /* No, can't be what we just were! */
if ((thr->wakeup > now) && (thr->attempts > 0))
continue; /* wakeup in future, unless first time around! */
if (vp && (thr->thrkids < ce->maxkidThread) &&
(thr->thrkids < thr->jobs) /* FIXME: real unfed count ? */) {
struct web * ho = vp->orig[L_HOST];
struct web * ch = vp->orig[L_CHANNEL];
if (thr->proc && thr->nextfeed == NULL)
continue; /* Nothing more to feed! See other threads! */
if (proc->ch != ch) /* MUST have same CH data - in case we ever
run with a clause where channel side is
partially wild-carded.. */
continue;
proc->pthread = thr;
thr->thrkids += 1;
if (thr->proc == NULL) {
/* Randomize the order of thread vertices
(or sort by spool file mtime, if in AGEORDER..) */
/* Also init thr->nextfeed */
thread_vertex_shuffle(thr);
}
if (thr->proc)
proc->pnext = thr->proc;
thr->proc = proc;
if (proc->pnext)
proc->pnext->pprev = proc;
if (proc->ho != NULL && proc->ho != ho) {
/* Get rid of the old host web */
proc->ho->kids -= 1;
unweb(L_HOST,proc->ho);
proc->ho = NULL;
}
/* Move the kid to this host web */
if (proc->ho != ho) {
proc->ho = ho;
proc->ho->kids += 1;
}
/* The channel is be the same at old and new threads */
/* Move the pickup pointer forward.. */
thg->thread = thg->thread->nextthg;
if (thr->nextfeed == NULL) {
thr->pending = "NoNextFeed";
return 0;
}
return 1;
}
}
/* No result :-( */
return 0;
}
int
delete_thread(thr)
struct thread *thr;
{
/* Unlink this thread from thread-chain, and thread
group. Decrement thread-group count */
struct threadgroup *thg = thr->thgrp;
if (thr->thrkids || thr->jobs) return 0;
if (verbose)
sfprintf(sfstderr,"delete_thread(%p:%s/%s) (thg=%p) jobs=%d\n",
thr,thr->channel,thr->host,thg, thr->jobs);
free(thr->channel);
free(thr->host);
/* Unlink us from the thread time-chain */
/* ... and thread-group-ring */
_thread_timechain_unlink(thr);
memset(thr, 0x55, sizeof(*thr));
free(thr);
return 1;
}
#if 0 /* Dead code.. */
static void _thread_linkfront __((struct thread *, struct vertex *, struct vertex *));
static void _thread_linkfront(thr,ap,vp)
struct thread *thr;
struct vertex *ap, *vp;
{
/* Link the VP in front of AP */
vp->previtem = ap->previtem;
if (ap->previtem != NULL)
ap->previtem->nextitem = vp;
ap->previtem = vp;
vp->nextitem = ap;
if (ap == thr->thvertices)
thr->thvertices = vp;
vp->thread = thr;
}
#endif
/* the _thread_linktail() links a vertex into thread */
static void _thread_linktail __((struct thread *, struct vertex *));
static void _thread_linktail(thr,vp)
struct thread *thr;
struct vertex *vp;
{
if (thr->thvertices != NULL) {
thr->lastthvertex->nextitem = vp;
vp->previtem = thr->lastthvertex;
} else {
thr->thvertices = vp;
vp->previtem = NULL;
}
vp->nextitem = NULL;
vp->thread = thr;
thr->lastthvertex = vp;
}
void thread_linkin(vp,cep,cfgid, ce_fillin)
struct vertex *vp;
struct config_entry *cep;
int cfgid;
void (*ce_fillin) __((struct threadgroup*, struct config_entry *));
{
struct threadgroup *thg;
struct thread *thr;
/* int matched = 0; */
int thg_once;
struct web *wc = vp->orig[L_CHANNEL];
struct web *wh = vp->orig[L_HOST];
mytime(&now);
if (verbose)
sfprintf(sfstderr,"thread_linkin([%s/%s],%s/%d/%s,%d)\n",
wc->name, wh->name, cep->channel,
cep->flags & CFG_WITHHOST, cep->host, cfgid);
/* char const *vp_chan = wc->name; */
/* char const *vp_host = wh->name; */
if (thrg_root == NULL)
create_threadgroup(cep,wc,wh,cep->flags & CFG_WITHHOST,ce_fillin);
/*
* Search for matching config-entry, AND matching channel,
* AND matching host (depending how the thread-group formation
* is allowed to happen..)
*
*/
for (thg = thrg_root, thg_once = 1;
thg && (thg_once || thg != thrg_root);
thg = thg->nextthg, thg_once = 0) {
int thr_once;
if (thg->cep != cep) /* Config entries don't match */
continue;
if (thg->wchan != wc) /* Channels don't match */
continue;
if (thg->withhost) {
if (thg->whost != wh) /* Tough, must have host match! */
continue;
}
/* The config-entry matches, we have changes to match group */
for (thr = thg->thread, thr_once = 1;
thr && (thr_once || (thr != thg->thread));
thr = thr->nextthg, thr_once = 0) {
#if 0
if (!thr->vertex) abort(); /* No vertices ?? */
/* no need ? (Channels within a group are identical..) */
if (wc != thr->wchan) abort();
#endif
/* Nice! What about host ? */
if (wh != thr->whost) continue;
/* We have matching channel, AND matching host */
/* Link the vertex into this thread! */
if (verbose)
sfprintf(sfstderr,"thread_linkin() to thg=%p[%s/%d/%s]; added into existing thread [%s/%s] thr->jobs=%d\n",
thg,cep->channel,thg->withhost,cep->host,
wc->name,wh->name,thr->jobs+1);
_thread_linktail(thr,vp);
vp->thgrp = thg;
thr->jobs += 1;
if (thr->proc) /* Caring about the UF count while running */
thr->unfed += 1;
if (thr->proc && (thr->nextfeed == NULL)) {
/* It is running, but no nextfeed is set (anymore),
tackle this vertex into the tail */
thr->nextfeed = vp;
}
/* Hookay.. Try to start it too... */
thread_start_(thr);
return;
}
/* No matching thread, however this GROUP matches (or does it?) */
/* Add a new thread into this group */
thr = create_thread(thg,vp,cep);
vp->thgrp = thg;
if (verbose)
sfprintf(sfstderr,"thread_linkin() to thg=%p[%s/%d/%s]; created a new thread %p [%s/%s]\n",
thg,cep->channel,thg->withhost,cep->host,
thr,wc->name,wh->name);
/* Try to start it too */
thread_start_(thr);
return;
}
/* Add a new group - and its thread .. */
thg = create_threadgroup(cep, wc, wh, cep->flags & CFG_WITHHOST, ce_fillin);
thr = create_thread(thg,vp,cep);
vp->thgrp = thg;
/* Try to start it too */
thread_start_(thr);
}
struct web *
web_findcreate(flag, s)
int flag;
const char *s;
{
struct spblk *spl;
struct web *wp;
spkey_t spk;
/* caller has done 'strlower()' to our input.. */
spk = symbol_db(s, spt_mesh[flag]->symbols);
spl = sp_lookup(spk, spt_mesh[flag]);
if (spl == NULL || (wp = (struct web *)spl->data) == NULL) {
/* Not found, create it */
wp = (struct web *)emalloc(sizeof (struct web));
memset((void*)wp, 0, sizeof (struct web));
sp_install(spk, (void *)wp, 0, spt_mesh[flag]);
wp->name = strsave(s);
wp->kids = 0;
wp->link = NULL;
wp->linkcnt = 0;
}
if (spl != NULL)
wp = (struct web*)spl->data;
return wp;
}
/*
* Deallocate a web entry (host or channel vertex header structure).
*/
void
unweb(flag, wp)
int flag;
struct web *wp;
{
struct spblk *spl = NULL;
spkey_t spk;
if (verbose)
sfprintf(sfstderr,"unweb(flag=%d wp=%p); linkcnt=%d kids=%d\n",
flag,wp,wp->kids,wp->linkcnt);
if (wp->linkcnt > 0) /* Yet objects holding it */
return;
if (wp->kids > 0) /* too early to actually remove it */
return;
spk = symbol_lookup_db((u_char *)wp->name, spt_mesh[flag]->symbols);
if ((spkey_t)0 == spk) /* Not in the symbol table */
return;
spl = sp_lookup(spk, spt_mesh[flag]);
if (spl != NULL) /* Should always have this ... */
sp_delete(spl, spt_mesh[flag]);
symbol_free_db((u_char *)wp->name, spt_mesh[flag]->symbols);
free(wp->name);
memset(wp, 0x55, sizeof(*wp));
free((char *)wp);
}
/*
* unthread(vtx) -- detach this vertex from its thread
*/
static void unthread __((struct thread *thr, struct vertex *vtx));
static void unthread(thr, vtx)
struct thread *thr;
struct vertex *vtx;
{
if (vtx->previtem != NULL)
vtx->previtem->nextitem = vtx->nextitem;
if (vtx->nextitem != NULL)
vtx->nextitem->previtem = vtx->previtem;
if (thr) {
thr->jobs -= 1;
if (thr->nextfeed == vtx)
thr->nextfeed = thr->nextfeed->nextitem;
if (thr->thvertices == vtx)
thr->thvertices = vtx->nextitem;
if (thr->lastthvertex == vtx)
thr->lastthvertex = vtx->previtem;
}
vtx->nextitem = NULL;
vtx->previtem = NULL;
}
/*
* Detach the vertex from its chains
*
* If here is a process, limbo it!
*/
void
web_detangle(vp, ok)
struct vertex *vp;
int ok;
{
/* If it was in processing, remove process node binding.
We do this only when we have reaped the channel program. */
struct thread *thr = vp->thread;
/* unthread() will also unpick the nextfeed link.. */
unthread(thr, vp);
/* The thread can now be EMPTY! */
if (thr && (thr->thvertices == NULL))
delete_thread(thr);
}
static int vtx_mtime_cmp __((const void *, const void *));
static int vtx_mtime_cmp(ap, bp)
const void *ap, *bp;
{
const struct vertex **a = (const struct vertex **)ap;
const struct vertex **b = (const struct vertex **)bp;
if ((*a)->cfp->mtime < (*b)->cfp->mtime)
return -1;
else if ((*a)->cfp->mtime == (*b)->cfp->mtime)
return 0;
else
return 1;
}
static void
thread_vertex_shuffle(thr)
struct thread *thr;
{
register struct vertex *vp;
register int n, i, ni;
static u_int ur_size = 0;
static struct vertex **ur_arr = NULL;
/* Randomize the order of vertices in processing, OR
sort them by spool-file MTIME, if the thread has
AGEORDER -flag set. */
/* 1) Create storage array for the vertex re-arrange */
if (ur_size == 0) {
ur_size = 100;
ur_arr = (struct vertex **)
emalloc(ur_size * sizeof (struct vertex *));
}
/* 2) Store the vertices into a re-arrange array (and count) */
for (n = 0, vp = thr->thvertices; vp != NULL; vp = vp->nextitem) {
if (n >= ur_size) {
ur_size *= 2;
ur_arr = (struct vertex **)realloc((char *)ur_arr,
ur_size *
sizeof (struct vertex *));
}
ur_arr[n++] = vp;
}
/* 3) re-arrange pointers */
if (thr->thgrp->ce.flags & CFG_AGEORDER) {
/* mtime order */
if (n > 1)
qsort((void*)ur_arr, n, sizeof(struct vertex *), vtx_mtime_cmp);
} else
/* Random order */
for (i = 0; i < n; ++i) {
ni = ranny(n-1);
vp = ur_arr[i];
ur_arr[i] = ur_arr[ni];
ur_arr[ni] = vp;
}
/* 4) Relink previtem/nextitem pointers */
for (i = 0; i < n; ++i) {
if (i > 0)
ur_arr[i]->previtem = ur_arr[i-1];
if (i < (n-1))
ur_arr[i]->nextitem = ur_arr[i+1];
#if 1
/* 4c) Clear wakeup timer; the feed_child() will refuse
to feed us, if this one is not cleared.. */
ur_arr[i]->wakeup = 0;
#endif
}
ur_arr[ 0]->previtem = NULL;
ur_arr[n-1]->nextitem = NULL;
/* 5) Finish the re-arrangement by saving the head,
and tail pointers */
thr->thvertices = ur_arr[ 0];
thr->nextfeed = ur_arr[ 0];
thr->lastthvertex = ur_arr[n-1];
thr->unfed = n;
}
static int
thread_start_(thr)
struct thread *thr;
{
struct config_entry *ce = &(thr->thgrp->ce);
if (thr->proc != NULL) {
/* There is *somebody* active! Shall we start, or not ? */
if (ce->flags & CFG_WAKEUPRESTARTONLY)
return 0;
}
return thread_start(thr, 0);
}
/*
* thread_start() -- start the thread, if:
* - if the thread is not already running
* - thread-group has idle processor (feed immediately)
* - if no resource limits are exceeded for starting it
*
* Return non-zero, if did start something.
*/
int
thread_start(thr, queueonly_too)
struct thread *thr;
int queueonly_too;
{
int rc;
struct vertex *vp = thr->thvertices;
struct threadgroup *thg = thr->thgrp;
struct config_entry *ce = &(thr->thgrp->ce);
struct web *ho;
struct web *ch;
queryipccheck();
if (!thr->thrkids && !thr->jobs) {
/* Cleanup when no processes, nor vertices */
delete_thread(thr);
return 0;
}
if (syncstart || (freeze && !slow_shutdown)) return 0;
if (!queueonly_too && (ce->flags & CFG_QUEUEONLY)) return 0;
ho = vp->orig[L_HOST];
ch = vp->orig[L_CHANNEL];
if (procselect) {
thr->pending = "procsel-mismatch";
if (*procselect != '*' &&
strcmp(procselect,ch->name) != 0)
return 0;
if (*procselhost != '*' &&
strcmp(procselhost,ho->name) != 0)
return 0;
}
thr->pending = NULL;
if (verbose)
sfprintf(sfstderr,"thread_start(thr=%s/%d/%s) (dt=%d thr=%p jobs=%d)\n",
ch->name, thg->withhost, ho->name, (int)(thr->wakeup-now),
thr, thr->jobs);
if ((thr->thrkids >= ce->maxkidThread) ||
/* FIXME: real unfed count ? */
(thr->proc && (thr->thrkids >= thr->unfed))) {
if (verbose) {
struct procinfo * proc = thr->proc;
sfprintf(sfstderr," -- already running; thrkids=%d jobs=%d procs={ %p",
thr->thrkids, thr->jobs, proc);
proc = proc->pnext;
while (proc) {
sfprintf(sfstderr, " %p", proc);
proc = proc->pnext;
}
sfprintf(sfstderr, " }\n");
}
return 0; /* Already running */
}
re_pick:
if (thg->idleproc) {
struct procinfo *proc;
/* There is at least one.. */
proc = thg->idleproc;
/* Idle processor(s) exists, try to optimize by finding
an idle proc with matching channel & host from previous
activity. If not found, pick any with matching CHANNEL,
unless must have also matching HOST... */
for (; proc && (proc->ho != ho || proc->ch != ch); proc = proc->pnext) ;
if (!proc && !thg->withhost) {
/* None of the previous ones matched, pick with matching CHANNEL,
HOST is allowed to wild-card.. */
proc = thg->idleproc;
for (; proc && (proc->ch != ch); proc = proc->pnext) ;
}
if (!proc)
goto create_new;
/* Selected one of them.. */
if (proc->pprev) proc->pprev->pnext = proc->pnext;
if (proc->pnext) proc->pnext->pprev = proc->pprev;
if (thg->idleproc == proc) thg->idleproc = proc->pnext;
proc->pnext = proc->pprev = NULL;
thg->idlecnt -= 1;
--idleprocs;
/* Move to ACTIVE state */
MIBMtaEntry->sc.TransportAgentsActiveSc += 1;
MIBMtaEntry->sc.TransportAgentsIdleSc -= 1;
/* It may be that while we idled it, it died at the idle queue.. */
if (proc->pid <= 0 || proc->tofd < 0) {
/* sfprintf(sfstderr,
"%% thread_start(thr=%s/%d/%s) (proc=%p ->pid=%d ->tofd=%d)\n",
ch->name, thg->withhost, ho->name, proc,
proc->pid, proc->tofd); */
goto re_pick;
}
/* Thread-groups are made such that here at thread_start() we
can always switch over in between threads */
if (proc->ho != NULL && proc->ho != ho) {
/* Get rid of the old host web */
proc->ho->kids -= 1;
unweb(L_HOST,proc->ho);
proc->ho = NULL;
}
/* Move the kid to this host web */
if (proc->ho != ho) {
proc->ho = ho;
proc->ho->kids += 1;
}
/* In theory the CHANNEL could be different -- in practice NOT! */
proc->ch = ch;
/* MULTI-TA-PER-THREAD -- only the first proc inits feed-state */
if (! thr->proc ) {
/* Randomize the order of thread vertices
(or sort by spool file mtime, if in AGEORDER..) */
/* Also init thr->nextfeed */
thread_vertex_shuffle(thr);
}
/* Its idle process, feed it! */
proc->state = CFSTATE_LARVA;
proc->overfed = 1; /* A simulated state.. */
proc->pthread = thr;
if (thr->proc) proc->pnext = thr->proc;
if (proc->pnext) proc->pnext->pprev = proc;
thr->proc = proc;
thr->thrkids += 1;
if (verbose)
sfprintf(sfstderr, "%% thread_start(thr=%s/%d/%s) (proc=%p dt=%d thr=%p jobs=%d)\n",
ch->name, thg->withhost, ho->name, thr->proc,
(int)(thr->wakeup-now), thr, thr->jobs);
ta_hungry(proc);
return 1;
}
create_new:
/* Check resource limits - MaxTa, MaxChan, MaxThrGrpTa */
/* If resources are exceeded, reschedule.. */
vp = thr->thvertices;
if (numkids >= ce->maxkids) {
vp->ce_pending = SIZE_L;
thr->pending = ">MaxTA";
} else if (vp->orig[L_CHANNEL]->kids >= ce->maxkidChannel) {
vp->ce_pending = L_CHANNEL;
thr->pending = ">MaxChannel";
} else if (thg->transporters >= ce->maxkidThreads) {
vp->ce_pending = L_HOST;
thr->pending = ">MaxRing";
} else if (thr->thrkids >= ce->maxkidThread) {
vp->ce_pending = SIZE_L;
thr->pending = ">MaxThr";
} else {
vp->ce_pending = 0;
thr->pending = NULL;
}
if (vp->ce_pending) {
if (verbose)
sfprintf(sfstderr,"%s: (%d %dC %dT %dTh) >= (%d %dC %dT %dTh)\n",
ce->command,
numkids,
vp->orig[L_CHANNEL]->kids,
thg->transporters,
thr->thrkids,
ce->maxkids,
ce->maxkidChannel,
ce->maxkidThreads,
ce->maxkidThread);
/*
* Would go over limit. Rescheduling for the next
* (single) interval works ok in many situation.
* However when the scheduler is very busy one can
* run into systemic problems with some set of messages
* blocking another set of messages. The only way
* around that is a roundrobin scheme, implemented
* by the fifo nature of the thread scheduling.
*/
reschedule(vp, 0, -1);
return 0;
}
/* Now we are ready to start a new child to run our bits */
if (! thr->proc ) {
/* MULTI-TA-PER-THREAD -- first proc inits the feed-state */
/* Randomize the order of thread vertices
(or sort by spool file mtime, if in AGEORDER..) */
/* Also init thr->nextfeed */
thread_vertex_shuffle(thr);
}
rc = start_child(thr->thvertices,
thr->thvertices->orig[L_CHANNEL],
thr->thvertices->orig[L_HOST]);
if (thr->proc && verbose)
sfprintf(sfstderr,"%% thread_start(thr=%s/%d/%s) (proc=%p dt=%d thr=%p jobs=%d)\n",
ch->name, thg->withhost, ho->name, thr->proc,
(int)(thr->wakeup-now), thr, thr->jobs);
return rc;
}
/*
* pick_next_vertex() -- pick next free to process vertex in this thread
*
* This is called *only* by feed_child(), and proc->vertex directs
* then the caller of feed_child() to tune the process state.
* (From STUFFING to FINISHING and possibly to IDLE.)
*
* - if (proc->pthread->nextfeed != NULL) ...nextfeed = ...nextfeed->nextitem;
* - return (...nextfeed != NULL);
*
*/
/* Return 0 for errors, 1 for success; result is at ...nextfeed */
int
pick_next_vertex(proc)
struct procinfo *proc;
{
struct thread * thr = proc->pthread;
struct vertex * vtx = NULL;
if (thr) vtx = thr->nextfeed;
if (verbose)
sfprintf(sfstderr,"pick_next_vertex(proc=%p) proc->tofd=%d, thr=%p, pvtx=%p, jobs=%d OF=%d S=%s\n",
proc, proc->tofd, thr, vtx, thr ? thr->jobs : 0,
proc->overfed, proc_state_names[proc->state]);
if (proc->pid < 0 || proc->tofd < 0) { /* "He is dead, Jim!" */
if (verbose) sfprintf(sfstderr," ... NONE, 'He is dead, Jim!'\n");
return 0;
}
if (vtx) /* Pick next item */
thr->nextfeed = vtx = vtx->nextitem;
return (vtx != NULL);
}
/*
* The thread_expire2() will handle exceedingly old things
* with ages in excess of expire+expire2 (seconds) in queue
* even if no successfull delivery attempt has been made.
*
* Return the kill-count.
*
* Side-effect warning:
* Afterwards the THR may point to nonexistent object!
*/
int
thread_expire2(thr, timelimit, killall, msgstr)
struct thread *thr;
time_t timelimit;
int killall; /* later uses in mind.. now dummy parameter */
const char *msgstr; /* ... likewise. */
{
int killcount = 0;
struct vertex *vtx = thr->thvertices;
struct vertex *nextvtx;
for ( ;vtx; vtx = nextvtx) {
int expire_this = 0;
nextvtx = vtx->nextitem;
/* Time to expire ? */
if (vtx->ce_expiry > 0 && vtx->ce_expiry <= now &&
vtx->attempts > 0) {
expire_this = 1;
}
if (vtx->ce_expiry2 > 0 && vtx->ce_expiry2 <= now) {
expire_this = 1;
}
if (expire_this) {
/* ... and now expire it! */
/* this MAY invalidate also the THREAD object! */
expire(vtx, -1); /* ... them all. */
++killcount;
mytime(&now);
if (now > timelimit) break;
}
}
return killcount;
}
/*
* The thread_reschedule() updates threads time-chain to match the
* new value of wakeup for the doagenda() to later use.
* Return 0 for DESTROYED thread, 1 for EXISTING thread.
*/
int
thread_reschedule(thr, retrytime, index)
struct thread *thr;
int index;
time_t retrytime;
{
struct vertex *vtx = thr->thvertices;
struct vertex *nvtx;
time_t wakeup = 0;
int skew;
if (verbose)
sfprintf(sfstderr,"thread_reschedule() ch=%s ho=%s jobs=%d thr=%p proc=%p\n",
thr->channel,thr->host,thr->jobs,thr,thr->proc);
if (!thr->thrkids && !thr->jobs) {
delete_thread(thr);
return 0;
}
/* If there are multiple kids working still, DON'T reschedule! */
if ((thr->thrkids > 0) || (vtx == NULL)) return 1;
/* find out when to retry */
mytime(&now);
/* if we are already scheduled for the future, don't reschedule */
if (vtx->wakeup > now) {
thr->wakeup = vtx->wakeup;
if (verbose)
sfprintf(sfstderr,"...prescheduled\n");
goto timechain_handling;
} else if (vtx->wakeup < now-7200 /* more than 2h in history .. */ )
vtx->wakeup = now;
if (vtx->thgrp->ce.nretries <= 0) {
if (verbose)
sfprintf(sfstderr,"...ce->retries = %d\n", vtx->thgrp->ce.nretries);
goto timechain_handling;
}
if (thr->retryindex >= vtx->thgrp->ce.nretries) {
if (vtx->thgrp->ce.nretries > 1)
thr->retryindex = ranny(vtx->thgrp->ce.nretries-1);
else
thr->retryindex = 0;
}
/*
* clamp retry time to a predictable interval so we
* eventually bunch up deliveries.
*/
if (retrytime > 100000 && retrytime < now+63)
vtx->wakeup = now;
#if 0
skew = vtx->wakeup % vtx->thgrp->ce.interval;
if (skew <= vtx->thgrp->ce.interval / 2)
skew = - (skew + (vtx->thgrp->ce.skew - 1));
else
skew = skew + (vtx->thgrp->ce.skew - 1);
skew = skew / vtx->thgrp->ce.skew; /* want int div truncation */
vtx->wakeup += (skew +
vtx->thgrp->ce.retries[thr->retryindex] * vtx->thgrp->ce.interval);
#else
/* Actually we do NOT want to have synchronization of threads,
as such causes simultaneous start of transporters, which
causes "somewhat" spiky load behaviour */
skew = vtx->thgrp->ce.retries[thr->retryindex] * vtx->thgrp->ce.interval;
if (retrytime <= 100000 &&
(int)retrytime > skew)
skew = retrytime;
vtx->wakeup += skew;
#endif
thr->retryindex++;
/* If history, move forward by ``ce.interval'' multiple */
if (vtx->wakeup < now)
vtx->wakeup += ((((now - vtx->wakeup) / vtx->thgrp->ce.interval)+1)
* vtx->thgrp->ce.interval);
wakeup = vtx->wakeup;
if (retrytime < now+63)
retrytime = wakeup;
/* Reschedule ALL vertices on this thread */
for ( ;vtx; vtx = nvtx) {
int expire_this = 0;
nvtx = vtx->nextitem;
/* Time to expire ? */
if (vtx->ce_expiry > 0 && vtx->ce_expiry <= now &&
vtx->attempts > 0) {
expire_this = 1;
}
if (vtx->ce_expiry2 > 0 && vtx->ce_expiry2 <= now) {
expire_this = 1;
}
if (expire_this) {
/* ... and now expire it! */
/* this MAY invalidate also the THREAD object! */
if (thr->jobs > 1) {
expire(vtx, index);
} else {
expire(vtx, index);
thr = NULL; /* The THR-pointed object is now invalid */
}
continue;
}
/* Didn't expire, so time to tune the wakeup ... */
if (vtx->wakeup < retrytime)
vtx->wakeup = retrytime;
if (wakeup > vtx->wakeup || wakeup == 0)
wakeup = vtx->wakeup;
}
if (thr != NULL)
thr->wakeup = wakeup;
timechain_handling:
/* In every case the rescheduling means we move this thread
to the end of the thread_head chain.. */
if (thr != NULL) {
_thread_timechain_unlink(thr);
_thread_timechain_append(thr);
}
return (thr != NULL);
}
/*
* reschedule() operates WITHIN a thread, but does *not* move things!
*
*/
void
reschedule(vp, factor, index)
struct vertex *vp;
int factor, index;
{
int skew;
struct thread *thr = vp->thread;
struct threadgroup *thg = vp->thgrp;
struct config_entry *ce = &(thg->ce);
/* Hmm.. The reschedule() is called only when we have a reason
to call it, doesn't it ? */
/* find out when to retry */
mytime(&now);
if (verbose)
sfprintf(sfstderr,"reschedule %p now %d expiry in %d attempts %d factor %d inum %d (%s/%s: %s)\n",
vp, (int)now,
(int)((vp->ce_expiry > 0) ? (vp->ce_expiry - now) : -999),
vp->attempts,
factor, (int)(vp->cfp->id),
vp->orig[L_CHANNEL]->name,
vp->orig[L_HOST]->name,
vp->cfp->mid);
/* if we are already scheduled for the future, don't reschedule */
if (vp->wakeup > now) {
if (verbose)
sfprintf(sfstderr,"prescheduled\n");
return;
} else if (vp->wakeup < now-7200 /* more than 2h .. */ )
vp->wakeup = now;
if (ce->nretries <= 0) {
if (verbose)
sfprintf(sfstderr,"ce->retries = %d\n", ce->nretries);
return;
}
if (factor == -1 && vp->attempts) {
if (thr->retryindex >= ce->nretries) {
if (ce->nretries > 1)
thr->retryindex = ranny(ce->nretries-1);
else
thr->retryindex = 0;
}
/*
* clamp retry time to a predictable interval so we
* eventually bunch up deliveries.
*/
skew = vp->wakeup % ce->interval;
if (skew <= ce->interval / 2)
skew = - (skew + (ce->skew - 1));
else
skew = skew + (ce->skew - 1);
skew = skew / ce->skew; /* want int div truncation */
vp->wakeup += (skew +
ce->retries[thr->retryindex] * ce->interval);
thr->retryindex++;
} else if (factor < -1) {
vp->wakeup = -factor;
} else
vp->wakeup += factor * ce->interval;
/* I THINK there could be an assert that if this happens,
something is WRONG.. */
if (vp->attempts == 0)
vp->wakeup = now;
/* XX: change this to a mod expression */
if (vp->wakeup < now)
vp->wakeup = ((((now - vp->wakeup) / ce->interval)+1)
* ce->interval) + 10 + 2*thr->jobs;
/* Makes sure that next future event is at +10+2*jobcount seconds
in the future.. A kludge approach, but still.. */
if (vp->ce_expiry > 0
&& vp->ce_expiry <= vp->wakeup
&& vp->attempts > 0) {
if (verbose)
sfprintf(sfstderr,"ce_expiry = %d, %d attempts\n",
(int)(vp->ce_expiry), vp->attempts);
/* expire() will delete this vertex in due time */
expire(vp, index);
return;
}
}
/*
* With the idle_cleanup() we clean up idle processes, that have
* been idle for too long (idlemax=nnnns)
*
* Because during its progress the thread-groups can disappear,
* (as is one of its mandates) this code looks a bit peculiar..
*/
int
idle_cleanup()
{
/* global: time_t now */
struct threadgroup *thg, *nthg;
int thg_once;
int freecount = 0;
mytime(&now);
if (verbose) sfprintf(sfstderr,"idle_cleanup()\n");
if (!thrg_root) return 0; /* No thread group! */
for (thg = thrg_root, thg_once = 1;
thg_once || (thg != thrg_root);
thg = nthg, thg_once = 0) {
nthg = thg->nextthg;
if (thg->thread != NULL) {
struct procinfo *p;
struct thread *thr, *nthr;
int thr_once;
/* Clean-up faulty client -- KLUDGE :-( -- OF=0, HA > much */
for (thr = thg->thread, thr_once = 1;
thr && (thr_once || (thr != thg->thread));
thr = nthr, thr_once = 0) {
nthr = thr->nextthg;
p = thr->proc;
if (thr->thgrp != thg) /* Not of this group ? */
continue; /* Next! */
if (!p) /* No process */
continue;
if ((p->cmdlen == 0) && (p->overfed == 0) && (p->tofd >= 0) &&
(p->hungertime != 0) && (p->hungertime + MAX_HUNGER_AGE <= now)) {
/* Close the command channel, let it die itself.
Rest of the cleanup happens via mux() service. */
if (verbose)
sfprintf(sfstderr,"idle_cleanup() killing TA on tofd=%d pid=%d\n",
p->tofd, (int)p->pid);
thr->wakeup = now-1; /* reschedule immediately! */
write(p->tofd,"\n",1); /* XXXX: should this be removed ?? */
pipes_shutdown_child(p->tofd);
p->tofd = -1;
++freecount;
/* Reclaim will (in due time) detect dead child, and
decrement child counters. */
zsyslog((LOG_ERR, "ZMailer scheduler kludge shutdown of TA channel (info for debug only); %s/%s/%d HA=%ds",
thr->channel, thr->host, thr->thgrp->withhost,
(int)(now - p->hungertime)));
}
}
}
if (thg->idleproc) {
struct procinfo *p;
p = thg->idleproc;
while (p != NULL) {
if ((thg->cep->idlemax + p->hungertime < now) &&
(p->cmdlen == 0) && (p->tofd >= 0)) {
/* It is old enough -- ancient, one might say.. */
/* Close the command channel, let it die itself.
Rest of the cleanup happens via mux() service. */
if (verbose)
sfprintf(sfstderr,"idle_cleanup() killing TA on tofd=%d pid=%d\n",
p->tofd, (int)p->pid);
write(p->tofd,"\n",1);
pipes_shutdown_child(p->tofd);
p->tofd = -1;
++freecount;
}
/* Move to the next possible idle process */
p = p->pnext;
}
}
/* If there are no threads, nor transporters, delete the thg */
delete_threadgroup(thg);
}
return freecount;
}
static time_t oldest_age_on_thread __((struct thread *));
static time_t oldest_age_on_thread(th) /* returns the AGE in seconds.. */
struct thread *th;
{
register time_t oo = now+1;
register struct vertex *vp;
vp = th->thvertices;
while (vp) {
if (vp->cfp->mtime < oo)
oo = vp->cfp->mtime;
vp = vp->nextitem;
}
return (now - oo);
}
void thread_report(fp,mqmode)
Sfio_t *fp;
int mqmode;
{
struct threadgroup *thg;
int thg_once = 1;
int jobsum = 0, jobtotal = 0;
int threadsum = 0;
char timebuf[20];
int width;
int cnt, procs, thrkidsum;
int rcptsum = 0;
struct procinfo *p;
struct thread *thr;
int spc = (mqmode & MQ2MODE_FULL) ? ' ' : '\t';
mytime(&now);
#if 0
if (thrg_root == NULL) {
*timebuf = 0;
saytime((long)(now - sched_starttime), timebuf, 1);
sfprintf(fp,"No threads/processes. Uptime: %s\n",timebuf);
return;
}
#endif
for (thg = thrg_root;
thg && (thg_once || thg != thrg_root);
thg = thg->nextthg) {
int thr_once;
thg_once = 0;
if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {
sfprintf(fp,"%s/%s/%d\n",
thg->cep->channel, thg->cep->host, thg->withhost);
}
cnt = 0;
procs = 0;
jobsum = 0;
thrkidsum = 0;
#if 1 /* XX: zero for verifying of modified system; turn to 1 for running! */
/* We scan thru the local ring of threads */
for (thr = thg->thread, thr_once = 1;
thr && (thr_once || (thr != thg->thread));
thr = thr->nextthg, thr_once = 0)
#else
/* We scan there in start order from the thread_head
chain! */
for (thr = thread_head;
thr != NULL;
thr = thr->nexttr)
#endif
{
if (thr->thgrp != thg) /* Not of this group ? */
continue; /* Next! */
{
struct vertex *vp = thr->thvertices;
while (vp != NULL) {
rcptsum += vp->ngroup;
vp = vp->nextitem;
}
}
if (mqmode & MQ2MODE_FULL2) {
width = sfprintf(fp,"%s\t%s\t",
/* thr->thvertices->orig[L_CHANNEL]->name */
thr->channel,
/* thr->thvertices->orig[L_HOST]->name */
thr->host);
} else if (mqmode & MQ2MODE_FULL) {
width = sfprintf(fp," %s/%s/%d",
/* thr->thvertices->orig[L_CHANNEL]->name */
thr->channel,
/* thr->thvertices->orig[L_HOST]->name */
thr->host,
thr->thgrp->withhost);
if (width < 0) break; /* error.. */
width += 7;
if (width < 16-1)
sfprintf(fp,"\t");
if (width < 24-1)
sfprintf(fp,"\t");
if (width < 32-1)
sfprintf(fp,"\t");
if (width < 40-1)
sfprintf(fp,"\t");
else
sfprintf(fp," ");
}
jobsum += thr->jobs;
if (mqmode & MQ2MODE_FULL)
sfprintf(fp,"R=%-3d A=%-2d", thr->jobs, thr->attempts);
if (mqmode & MQ2MODE_FULL2)
sfprintf(fp,"R=%d\tA=%d", thr->jobs, thr->attempts);
++cnt;
if (thr->proc != NULL &&
thr->proc->pthread == thr) {
int thrprocs = 0;
struct procinfo *proc;
for (proc = thr->proc; proc; proc = proc->pnext) {
++procs;
++thrprocs;
++thrkidsum;
}
if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
proc = thr->proc;
if (thr->thrkids != thrprocs)
sfprintf(fp, "%cKids=%d/%d", spc, thr->thrkids, thrprocs);
sfprintf(fp, "%cP={", spc);
while (proc) {
sfprintf(fp, "%d", (int)proc->pid);
if (proc->pnext) sfprintf(fp, ",");
proc = proc->pnext;
}
sfprintf(fp, "}");
proc = thr->proc;
sfprintf(fp, "%cHA={", spc);
while (proc) {
sfprintf(fp, "%d", (int)(now - proc->hungertime));
if (proc->pnext) sfprintf(fp, ",");
proc = proc->pnext;
}
sfprintf(fp, "}s");
proc = thr->proc;
sfprintf(fp, "%cFA={", spc);
while (proc) {
if (proc->feedtime == 0)
sfprintf(fp, "never");
else
sfprintf(fp, "%d", (int)(now - proc->feedtime));
if (proc->pnext) sfprintf(fp, ",");
proc = proc->pnext;
}
sfprintf(fp, "}s");
proc = thr->proc;
sfprintf(fp, "%cOF={", spc);
while (proc) {
sfprintf(fp, "%d", proc->overfed);
if (proc->pnext) sfprintf(fp, ",");
proc = proc->pnext;
}
sfprintf(fp, "}");
proc = thr->proc;
sfprintf(fp, "%cS={", spc);
while (proc) {
sfprintf(fp, "%s", proc_state_names[proc->state]);
if (proc->pnext) sfprintf(fp, ",");
proc = proc->pnext;
}
sfprintf(fp, "}");
sfprintf(fp, "%cUF=%d", spc, thr->unfed);
}
} else if (thr->wakeup > now) {
if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
sfprintf(fp,"%cW=%ds", spc, (int)(thr->wakeup - now));
}
} else if (thr->pending) {
if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
sfprintf(fp,"%cpend=%s", spc, thr->pending);
}
}
if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
*timebuf = 0;
saytime((long)oldest_age_on_thread(thr), timebuf, 1);
sfprintf(fp, "%cQA=%s", spc, timebuf);
if (thr->thvertices && thr->thvertices->ce_pending)
if (thr->thvertices->ce_pending != SIZE_L && spc == ' ')
sfprintf(fp, "%s",
(thr->thvertices->ce_pending ==
L_CHANNEL ? " channelwait" : " threadwait"));
sfprintf(fp, "\n");
}
}
if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {
sfprintf(fp,"\tThreads: %4d",thg->threads);
if (thg->threads != cnt)
sfprintf(fp,"/%d",cnt);
}
cnt = 0;
for (p = thg->idleproc; p != 0; p = p->pnext) ++cnt;
procs += cnt;
if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {
sfprintf(fp, " Msgs: %5d Procs: %3d", jobsum, thg->transporters);
if (thg->transporters != procs)
sfprintf(fp,"/%d",procs);
sfprintf(fp," Idle: %3d",thg->idlecnt);
if (thg->idlecnt != cnt)
sfprintf(fp, "/%d", cnt);
sfprintf(fp, " Plim: %3d Flim: %3d Tlim: %d\n",
thg->ce.maxkidThreads, thg->ce.overfeed, thg->ce.maxkidThread);
}
jobtotal += jobsum;
threadsum += thg->threads;
}
if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ | MQ2MODE_SNMP)) {
long files;
*timebuf = 0;
saytime((long)(now - sched_starttime), timebuf, 1);
sfprintf(fp,"Kids: %d Idle: %2d Msgs: %3d Thrds: %3d Rcpnts: %4d Uptime: ",
numkids, idleprocs, global_wrkcnt, threadsum, jobtotal);
if (mqmode & MQ2MODE_SNMP)
sfprintf(fp, "%ld sec\n",(long)(now - sched_starttime));
else
sfprintf(fp, "%s\n",timebuf);
sfprintf(fp, "Msgs in %lu out %lu stored %ld ",
(u_long)MIBMtaEntry->sc.ReceivedMessagesSc,
(u_long)MIBMtaEntry->sc.TransmittedMessagesSc,
(long)MIBMtaEntry->sc.StoredMessagesSc);
files = thread_count_files();
if ((long)MIBMtaEntry->sc.StoredMessagesSc != files)
sfprintf(fp, "(%ld) ", files);
sfprintf(fp, "Rcpnts in %lu out %lu stored %ld",
(u_long)MIBMtaEntry->sc.ReceivedRecipientsSc,
(u_long)MIBMtaEntry->sc.TransmittedRecipientsSc,
(long)MIBMtaEntry->sc.StoredRecipientsSc);
if (rcptsum != MIBMtaEntry->sc.StoredRecipientsSc)
sfprintf(fp, " (%d)", rcptsum);
sfprintf(fp, "\n");
}
sfsync(fp);
}
void thread_detail_report(fp,mqmode,channel,host)
Sfio_t *fp;
int mqmode;
char *channel, *host;
{
struct thread *th;
spkey_t spk;
struct spblk *spl;
struct web *wh, *wc;
struct vertex *vp;
int i;
char buf[100];
mytime(&now);
spk = symbol_lookup_db((void*)channel, spt_mesh[L_CHANNEL]->symbols);
spl = sp_lookup(spk, spt_mesh[L_CHANNEL]);
if (spl == NULL || spl->data == NULL) {
/* Not found, nothing to do.. */
return;
}
wc = (struct web *)spl->data;
spk = symbol_lookup_db((void*)host, spt_mesh[L_HOST]->symbols);
spl = sp_lookup(spk, spt_mesh[L_HOST]);
if (spl == NULL || spl->data == NULL) {
/* Not found, nothing to do.. */
return;
}
wh = (struct web *)spl->data;
for (th = thread_head; th; th = th->nexttr) {
if (wh == th->whost && wc == th->wchan) {
break;
}
}
if (th) {
/* Found it! */
for (vp = th->thvertices; vp; vp = vp->nextitem) {
struct ctlfile *cfp = vp->cfp;
for (i = 0; i < vp->ngroup; ++i) {
/* Spoolfile */
sfprintf(fp, "%s%s", cfpdirname(cfp->dirind), cfp->mid);
/* How manyth in a group ? */
sfprintf(fp, "\t%d", i);
/* Sender index -- or sender address */
sfprintf(fp, "\t%s", cfp->erroraddr);
/* Recipient offset */
sfprintf(fp,"\t%d", cfp->offset[vp->index[i]]);
/* Expiry stamp */
sfprintf(fp,"\t%ld", (long)vp->ce_expiry);
/* next wakeup */
sfprintf(fp,"\t%ld", (long)vp->wakeup);
/* last feed time */
sfprintf(fp,"\t%ld", (long)vp->lastfeed);
/* attempts */
sfprintf(fp,"\t%d\t", vp->attempts);
/* ce_pending */
if (vp->wakeup > now) {
*buf = 0;
saytime((long)(vp->wakeup - now), buf, 1);
sfprintf(fp,"retry in %s", buf);
} else {
switch(vp->ce_pending) {
case SIZE_L: /* BAD! */
break;
case L_CHANNEL:
sfprintf(fp,"channel");
break;
default:
sfprintf(fp,"thread");
break;
}
}
/* message - if any */
sfprintf(fp, "\t");
if (vp->message)
sfprintf(fp,"%s", vp->message);
sfprintf(fp, "\n");
}
}
}
sfsync(fp);
}
int thread_count_recipients()
{
struct threadgroup *thg;
struct thread *thr;
int thg_once;
int jobtotal = 0;
if (thrg_root == NULL)
return 0;
if (thrg_root)
for (thg = thrg_root, thg_once = 1;
thg_once || thg != thrg_root;
thg = thg->nextthg, thg_once = 0) {
int thr_once;
int jobsum = 0;
if (thg->thread)
for (thr = thg->thread, thr_once = 1;
thr_once || (thr != thg->thread);
thr = thr->nextthg, thr_once = 0) {
jobsum += thr->jobs;
}
jobtotal += jobsum;
}
return jobtotal;
}
static int thread_files_count;
static int spl_thread_cnt_files __((struct spblk *spl));
static int spl_thread_cnt_files(spl)
struct spblk *spl;
{
++thread_files_count;
return 0;
}
int thread_count_files __((void))
{
thread_files_count = 0;
sp_scan(spl_thread_cnt_files, NULL, spt_mesh[L_CTLFILE]);
return thread_files_count;
}
syntax highlighted by Code2HTML, v. 0.9.1