/* * ZMailer 2.99.16+ Scheduler "threads" routines * * Copyright Matti Aarnio 1995-2003 * * * These "threads" are for book-keeping of information * regarding schedulable recipient vertices */ #include #include #include #include #include "scheduler.h" #include "prototypes.h" #include "zsyslog.h" /* #include */ /* Each vertex arrives into SOME thread, each of which belong to SOME thread-group, among which groups the transport channel programs started for any member thread can be shared among their members. */ #define MAX_HUNGER_AGE 600 /* Sign of an error ... */ extern char *proc_state_names[]; struct thread *thread_head = NULL; struct thread *thread_tail = NULL; static struct threadgroup *thrg_root = NULL; int idleprocs = 0; extern int global_wrkcnt; extern int syncstart; extern int freeze; extern int slow_shutdown; extern time_t now; extern char *procselect, *procselhost; extern time_t sched_starttime; /* From main() */ extern int mailqmode; /* 1 or 2 */ static long groupid = 0; static long threadid = 0; static void thread_vertex_shuffle __((struct thread *thr)); static struct threadgroup *create_threadgroup __((struct config_entry *cep, struct web *wc, struct web *wh, int withhost, void (*ce_fillin)__((struct threadgroup *, struct config_entry *)) )); static int thread_start_ __((struct thread *thr)); static struct threadgroup * create_threadgroup(cep, wc, wh, withhost, ce_fillin) struct config_entry *cep; struct web *wc, *wh; int withhost; void (*ce_fillin) __((struct threadgroup*, struct config_entry *)); { struct threadgroup *thgp; /* Create a thread-group and link it into group-ring */ thgp = (struct threadgroup*)malloc(sizeof(*thgp)); if (!thgp) return NULL; memset(thgp,0,sizeof(*thgp)); ++groupid; thgp->groupid = groupid; thgp->cep = cep; thgp->withhost = withhost; thgp->wchan = wc; thgp->whost = wh; ce_fillin(thgp,cep); wc->linkcnt += 1; wh->linkcnt += 1; if (thrg_root == NULL) { thrg_root = thgp; thgp->nextthg = thgp; thgp->prevthg = thgp; } else { 
thgp->nextthg = thrg_root->nextthg; thgp->prevthg = thrg_root->nextthg->prevthg; thgp->prevthg->nextthg = thgp; thgp->nextthg->prevthg = thgp; } return thgp; } void delete_threadgroup(thgp) struct threadgroup *thgp; { struct threadgroup *tgp; /* Time to say good-bye to this group, delete it. We shall not have any threads under us, nor idle processes! */ /* However we may be called with either of these values still non-zero... */ if (thgp->transporters || thgp->threads) return; if (verbose) sfprintf(sfstderr,"delete_threadgroup(%s/%d/%s)\n", thgp->wchan->name,thgp->withhost,thgp->whost->name); if (thgp->idleproc != NULL || thgp->thread != NULL) abort(); /* Deleting non-empty thread-group! */ if (thrg_root == NULL) abort(); /* No thread-group root! */ /* We are possibly the last to keep these web links */ thgp->wchan->linkcnt -= 1; unweb(L_CHANNEL,thgp->wchan); thgp->whost->linkcnt -= 1; unweb(L_HOST,thgp->whost); /* Unlink this thread-group from the ring */ tgp = thgp->nextthg; thgp->prevthg->nextthg = thgp->nextthg; tgp->prevthg = thgp->prevthg; /* are we at the ring root pointer ? */ if (thrg_root == thgp) thrg_root = tgp; if (tgp == thgp) { /* We were the only one! 
*/ thrg_root = NULL; } memset(thgp, 0x55, sizeof(*thgp)); free(thgp); } static void _thread_timechain_unlink __((struct thread *)); static void _thread_timechain_unlink(thr) struct thread *thr; { struct threadgroup *thg = thr->thgrp; /* Doubly linked linear list */ if (thr->prevtr != NULL) thr->prevtr->nexttr = thr->nexttr; if (thr->nexttr != NULL) thr->nexttr->prevtr = thr->prevtr; if (thread_head == thr) thread_head = thr->nexttr; if (thread_tail == thr) thread_tail = thr->prevtr; thr->nexttr = NULL; thr->prevtr = NULL; /* Doubly linked circullar list */ thr->prevthg->nextthg = thr->nextthg; thr->nextthg->prevthg = thr->prevthg; thg->threads -= 1; MIBMtaEntry->sc.StoredThreadsSc -= 1; if (thg->thread == thr) thg->thread = thr->nextthg; /* pick other */ if (thg->thread == thr) thg->thread = NULL; /* was the only one! */ if (thg->thrtail == thr) thg->thrtail = thr->prevthg; /* pick other */ if (thg->thrtail == thr) thg->thrtail = NULL; /* was the only one! */ thr->prevthg = NULL; thr->nextthg = NULL; } static void _thread_timechain_append __((struct thread *)); static void _thread_timechain_append(thr) struct thread *thr; { struct threadgroup *thg = thr->thgrp; /* Doubly linked linear list */ if (thread_head == NULL) { thread_head = thr; thread_tail = thr; thr->nexttr = NULL; thr->prevtr = NULL; } else { thread_tail->nexttr = thr; thr->nexttr = NULL; thr->prevtr = thread_tail; thread_tail = thr; } /* Doubly linked circullar list */ if (thg->thread == NULL) { thg->thread = thr; thg->thrtail = thr; thr->nextthg = thr; thr->prevthg = thr; } else { thr->nextthg = thg->thrtail->nextthg; thg->thrtail->nextthg = thr; thr->prevthg = thr->nextthg->prevthg; thr->nextthg->prevthg = thr; thg->thrtail = thr; } thg->threads += 1; MIBMtaEntry->sc.StoredThreadsSc += 1; } static struct thread *create_thread __((struct threadgroup *thgrp, struct vertex *vtx, struct config_entry *cep)); static struct thread * create_thread(thgrp, vtx, cep) struct threadgroup *thgrp; struct vertex 
*vtx; struct config_entry *cep; { /* Create a thread-block, link in the group pointer, and link the thread into thread-ring, plus APPEND to the thread time-chain */ struct thread *thr; thr = (struct thread *)emalloc(sizeof(*thr)); if (!thr) return NULL; memset(thr,0,sizeof(*thr)); ++threadid; thr->threadid = threadid; /* thr->attempts = 0; thr->nextthg = NULL; thr->prevthg = NULL; */ thr->thgrp = thgrp; thr->wchan = vtx->orig[L_CHANNEL]; thr->whost = vtx->orig[L_HOST]; if (thgrp->cep->flags & CFG_QUEUEONLY) { /* Start with the first retry */ if(thgrp->cep->nretries) { mytime(&now); thr->wakeup = now + thgrp->cep->retries[0]; } } thr->thvertices = vtx; thr->lastthvertex = vtx; thr->jobs = 1; vtx->thread = thr; thr->channel = strsave(vtx->orig[L_CHANNEL]->name); thr->host = strsave(vtx->orig[L_HOST ]->name); if (verbose) sfprintf(sfstderr,"create_thread(%s/%d/%s) -> %p\n", thr->channel,thgrp->withhost,thr->host,thr); _thread_timechain_append(thr); return thr; } /* * Pick next thread from the group which this process serves. * At call the proc->pthread->proc does not contain us! * * Result is proc->pthread and proc->pthread->nextfeed being * updated to new thread, and function returns 1. * If no new thread can be picked (all are active, and whatnot), * return 0. */ int pick_next_thread(proc) struct procinfo *proc; { struct thread *thr; struct thread *thr0 = proc->pthread; struct threadgroup *thg = proc->thg; int once; proc->pthread = NULL; if (thg->cep->flags & CFG_QUEUEONLY) return 0; /* We are QUEUE ONLY group, no auto-switch! */ mytime(&now); for ( thr = thg->thread, once = 1; thr && (once || (thr != thg->thread)); thr = thr->nextthg, once = 0 ) { struct vertex * vp = thr->thvertices; struct config_entry *ce = &(thr->thgrp->ce); if (thr == thr0) continue; /* No, can't be what we just were! */ if ((thr->wakeup > now) && (thr->attempts > 0)) continue; /* wakeup in future, unless first time around! 
*/ if (vp && (thr->thrkids < ce->maxkidThread) && (thr->thrkids < thr->jobs) /* FIXME: real unfed count ? */) { struct web * ho = vp->orig[L_HOST]; struct web * ch = vp->orig[L_CHANNEL]; if (thr->proc && thr->nextfeed == NULL) continue; /* Nothing more to feed! See other threads! */ if (proc->ch != ch) /* MUST have same CH data - in case we ever run with a clause where channel side is partially wild-carded.. */ continue; proc->pthread = thr; thr->thrkids += 1; if (thr->proc == NULL) { /* Randomize the order of thread vertices (or sort by spool file mtime, if in AGEORDER..) */ /* Also init thr->nextfeed */ thread_vertex_shuffle(thr); } if (thr->proc) proc->pnext = thr->proc; thr->proc = proc; if (proc->pnext) proc->pnext->pprev = proc; if (proc->ho != NULL && proc->ho != ho) { /* Get rid of the old host web */ proc->ho->kids -= 1; unweb(L_HOST,proc->ho); proc->ho = NULL; } /* Move the kid to this host web */ if (proc->ho != ho) { proc->ho = ho; proc->ho->kids += 1; } /* The channel is be the same at old and new threads */ /* Move the pickup pointer forward.. */ thg->thread = thg->thread->nextthg; if (thr->nextfeed == NULL) { thr->pending = "NoNextFeed"; return 0; } return 1; } } /* No result :-( */ return 0; } int delete_thread(thr) struct thread *thr; { /* Unlink this thread from thread-chain, and thread group. Decrement thread-group count */ struct threadgroup *thg = thr->thgrp; if (thr->thrkids || thr->jobs) return 0; if (verbose) sfprintf(sfstderr,"delete_thread(%p:%s/%s) (thg=%p) jobs=%d\n", thr,thr->channel,thr->host,thg, thr->jobs); free(thr->channel); free(thr->host); /* Unlink us from the thread time-chain */ /* ... and thread-group-ring */ _thread_timechain_unlink(thr); memset(thr, 0x55, sizeof(*thr)); free(thr); return 1; } #if 0 /* Dead code.. 
*/ static void _thread_linkfront __((struct thread *, struct vertex *, struct vertex *)); static void _thread_linkfront(thr,ap,vp) struct thread *thr; struct vertex *ap, *vp; { /* Link the VP in front of AP */ vp->previtem = ap->previtem; if (ap->previtem != NULL) ap->previtem->nextitem = vp; ap->previtem = vp; vp->nextitem = ap; if (ap == thr->thvertices) thr->thvertices = vp; vp->thread = thr; } #endif /* the _thread_linktail() links a vertex into thread */ static void _thread_linktail __((struct thread *, struct vertex *)); static void _thread_linktail(thr,vp) struct thread *thr; struct vertex *vp; { if (thr->thvertices != NULL) { thr->lastthvertex->nextitem = vp; vp->previtem = thr->lastthvertex; } else { thr->thvertices = vp; vp->previtem = NULL; } vp->nextitem = NULL; vp->thread = thr; thr->lastthvertex = vp; } void thread_linkin(vp,cep,cfgid, ce_fillin) struct vertex *vp; struct config_entry *cep; int cfgid; void (*ce_fillin) __((struct threadgroup*, struct config_entry *)); { struct threadgroup *thg; struct thread *thr; /* int matched = 0; */ int thg_once; struct web *wc = vp->orig[L_CHANNEL]; struct web *wh = vp->orig[L_HOST]; mytime(&now); if (verbose) sfprintf(sfstderr,"thread_linkin([%s/%s],%s/%d/%s,%d)\n", wc->name, wh->name, cep->channel, cep->flags & CFG_WITHHOST, cep->host, cfgid); /* char const *vp_chan = wc->name; */ /* char const *vp_host = wh->name; */ if (thrg_root == NULL) create_threadgroup(cep,wc,wh,cep->flags & CFG_WITHHOST,ce_fillin); /* * Search for matching config-entry, AND matching channel, * AND matching host (depending how the thread-group formation * is allowed to happen..) * */ for (thg = thrg_root, thg_once = 1; thg && (thg_once || thg != thrg_root); thg = thg->nextthg, thg_once = 0) { int thr_once; if (thg->cep != cep) /* Config entries don't match */ continue; if (thg->wchan != wc) /* Channels don't match */ continue; if (thg->withhost) { if (thg->whost != wh) /* Tough, must have host match! 
*/ continue; } /* The config-entry matches, we have changes to match group */ for (thr = thg->thread, thr_once = 1; thr && (thr_once || (thr != thg->thread)); thr = thr->nextthg, thr_once = 0) { #if 0 if (!thr->vertex) abort(); /* No vertices ?? */ /* no need ? (Channels within a group are identical..) */ if (wc != thr->wchan) abort(); #endif /* Nice! What about host ? */ if (wh != thr->whost) continue; /* We have matching channel, AND matching host */ /* Link the vertex into this thread! */ if (verbose) sfprintf(sfstderr,"thread_linkin() to thg=%p[%s/%d/%s]; added into existing thread [%s/%s] thr->jobs=%d\n", thg,cep->channel,thg->withhost,cep->host, wc->name,wh->name,thr->jobs+1); _thread_linktail(thr,vp); vp->thgrp = thg; thr->jobs += 1; if (thr->proc) /* Caring about the UF count while running */ thr->unfed += 1; if (thr->proc && (thr->nextfeed == NULL)) { /* It is running, but no nextfeed is set (anymore), tackle this vertex into the tail */ thr->nextfeed = vp; } /* Hookay.. Try to start it too... */ thread_start_(thr); return; } /* No matching thread, however this GROUP matches (or does it?) */ /* Add a new thread into this group */ thr = create_thread(thg,vp,cep); vp->thgrp = thg; if (verbose) sfprintf(sfstderr,"thread_linkin() to thg=%p[%s/%d/%s]; created a new thread %p [%s/%s]\n", thg,cep->channel,thg->withhost,cep->host, thr,wc->name,wh->name); /* Try to start it too */ thread_start_(thr); return; } /* Add a new group - and its thread .. */ thg = create_threadgroup(cep, wc, wh, cep->flags & CFG_WITHHOST, ce_fillin); thr = create_thread(thg,vp,cep); vp->thgrp = thg; /* Try to start it too */ thread_start_(thr); } struct web * web_findcreate(flag, s) int flag; const char *s; { struct spblk *spl; struct web *wp; spkey_t spk; /* caller has done 'strlower()' to our input.. 
*/ spk = symbol_db(s, spt_mesh[flag]->symbols); spl = sp_lookup(spk, spt_mesh[flag]); if (spl == NULL || (wp = (struct web *)spl->data) == NULL) { /* Not found, create it */ wp = (struct web *)emalloc(sizeof (struct web)); memset((void*)wp, 0, sizeof (struct web)); sp_install(spk, (void *)wp, 0, spt_mesh[flag]); wp->name = strsave(s); wp->kids = 0; wp->link = NULL; wp->linkcnt = 0; } if (spl != NULL) wp = (struct web*)spl->data; return wp; } /* * Deallocate a web entry (host or channel vertex header structure). */ void unweb(flag, wp) int flag; struct web *wp; { struct spblk *spl = NULL; spkey_t spk; if (verbose) sfprintf(sfstderr,"unweb(flag=%d wp=%p); linkcnt=%d kids=%d\n", flag,wp,wp->kids,wp->linkcnt); if (wp->linkcnt > 0) /* Yet objects holding it */ return; if (wp->kids > 0) /* too early to actually remove it */ return; spk = symbol_lookup_db((u_char *)wp->name, spt_mesh[flag]->symbols); if ((spkey_t)0 == spk) /* Not in the symbol table */ return; spl = sp_lookup(spk, spt_mesh[flag]); if (spl != NULL) /* Should always have this ... */ sp_delete(spl, spt_mesh[flag]); symbol_free_db((u_char *)wp->name, spt_mesh[flag]->symbols); free(wp->name); memset(wp, 0x55, sizeof(*wp)); free((char *)wp); } /* * unthread(vtx) -- detach this vertex from its thread */ static void unthread __((struct thread *thr, struct vertex *vtx)); static void unthread(thr, vtx) struct thread *thr; struct vertex *vtx; { if (vtx->previtem != NULL) vtx->previtem->nextitem = vtx->nextitem; if (vtx->nextitem != NULL) vtx->nextitem->previtem = vtx->previtem; if (thr) { thr->jobs -= 1; if (thr->nextfeed == vtx) thr->nextfeed = thr->nextfeed->nextitem; if (thr->thvertices == vtx) thr->thvertices = vtx->nextitem; if (thr->lastthvertex == vtx) thr->lastthvertex = vtx->previtem; } vtx->nextitem = NULL; vtx->previtem = NULL; } /* * Detach the vertex from its chains * * If here is a process, limbo it! 
*/ void web_detangle(vp, ok) struct vertex *vp; int ok; { /* If it was in processing, remove process node binding. We do this only when we have reaped the channel program. */ struct thread *thr = vp->thread; /* unthread() will also unpick the nextfeed link.. */ unthread(thr, vp); /* The thread can now be EMPTY! */ if (thr && (thr->thvertices == NULL)) delete_thread(thr); } static int vtx_mtime_cmp __((const void *, const void *)); static int vtx_mtime_cmp(ap, bp) const void *ap, *bp; { const struct vertex **a = (const struct vertex **)ap; const struct vertex **b = (const struct vertex **)bp; if ((*a)->cfp->mtime < (*b)->cfp->mtime) return -1; else if ((*a)->cfp->mtime == (*b)->cfp->mtime) return 0; else return 1; } static void thread_vertex_shuffle(thr) struct thread *thr; { register struct vertex *vp; register int n, i, ni; static u_int ur_size = 0; static struct vertex **ur_arr = NULL; /* Randomize the order of vertices in processing, OR sort them by spool-file MTIME, if the thread has AGEORDER -flag set. 
*/ /* 1) Create storage array for the vertex re-arrange */ if (ur_size == 0) { ur_size = 100; ur_arr = (struct vertex **) emalloc(ur_size * sizeof (struct vertex *)); } /* 2) Store the vertices into a re-arrange array (and count) */ for (n = 0, vp = thr->thvertices; vp != NULL; vp = vp->nextitem) { if (n >= ur_size) { ur_size *= 2; ur_arr = (struct vertex **)realloc((char *)ur_arr, ur_size * sizeof (struct vertex *)); } ur_arr[n++] = vp; } /* 3) re-arrange pointers */ if (thr->thgrp->ce.flags & CFG_AGEORDER) { /* mtime order */ if (n > 1) qsort((void*)ur_arr, n, sizeof(struct vertex *), vtx_mtime_cmp); } else /* Random order */ for (i = 0; i < n; ++i) { ni = ranny(n-1); vp = ur_arr[i]; ur_arr[i] = ur_arr[ni]; ur_arr[ni] = vp; } /* 4) Relink previtem/nextitem pointers */ for (i = 0; i < n; ++i) { if (i > 0) ur_arr[i]->previtem = ur_arr[i-1]; if (i < (n-1)) ur_arr[i]->nextitem = ur_arr[i+1]; #if 1 /* 4c) Clear wakeup timer; the feed_child() will refuse to feed us, if this one is not cleared.. */ ur_arr[i]->wakeup = 0; #endif } ur_arr[ 0]->previtem = NULL; ur_arr[n-1]->nextitem = NULL; /* 5) Finish the re-arrangement by saving the head, and tail pointers */ thr->thvertices = ur_arr[ 0]; thr->nextfeed = ur_arr[ 0]; thr->lastthvertex = ur_arr[n-1]; thr->unfed = n; } static int thread_start_(thr) struct thread *thr; { struct config_entry *ce = &(thr->thgrp->ce); if (thr->proc != NULL) { /* There is *somebody* active! Shall we start, or not ? */ if (ce->flags & CFG_WAKEUPRESTARTONLY) return 0; } return thread_start(thr, 0); } /* * thread_start() -- start the thread, if: * - if the thread is not already running * - thread-group has idle processor (feed immediately) * - if no resource limits are exceeded for starting it * * Return non-zero, if did start something. 
*/ int thread_start(thr, queueonly_too) struct thread *thr; int queueonly_too; { int rc; struct vertex *vp = thr->thvertices; struct threadgroup *thg = thr->thgrp; struct config_entry *ce = &(thr->thgrp->ce); struct web *ho; struct web *ch; queryipccheck(); if (!thr->thrkids && !thr->jobs) { /* Cleanup when no processes, nor vertices */ delete_thread(thr); return 0; } if (syncstart || (freeze && !slow_shutdown)) return 0; if (!queueonly_too && (ce->flags & CFG_QUEUEONLY)) return 0; ho = vp->orig[L_HOST]; ch = vp->orig[L_CHANNEL]; if (procselect) { thr->pending = "procsel-mismatch"; if (*procselect != '*' && strcmp(procselect,ch->name) != 0) return 0; if (*procselhost != '*' && strcmp(procselhost,ho->name) != 0) return 0; } thr->pending = NULL; if (verbose) sfprintf(sfstderr,"thread_start(thr=%s/%d/%s) (dt=%d thr=%p jobs=%d)\n", ch->name, thg->withhost, ho->name, (int)(thr->wakeup-now), thr, thr->jobs); if ((thr->thrkids >= ce->maxkidThread) || /* FIXME: real unfed count ? */ (thr->proc && (thr->thrkids >= thr->unfed))) { if (verbose) { struct procinfo * proc = thr->proc; sfprintf(sfstderr," -- already running; thrkids=%d jobs=%d procs={ %p", thr->thrkids, thr->jobs, proc); proc = proc->pnext; while (proc) { sfprintf(sfstderr, " %p", proc); proc = proc->pnext; } sfprintf(sfstderr, " }\n"); } return 0; /* Already running */ } re_pick: if (thg->idleproc) { struct procinfo *proc; /* There is at least one.. */ proc = thg->idleproc; /* Idle processor(s) exists, try to optimize by finding an idle proc with matching channel & host from previous activity. If not found, pick any with matching CHANNEL, unless must have also matching HOST... */ for (; proc && (proc->ho != ho || proc->ch != ch); proc = proc->pnext) ; if (!proc && !thg->withhost) { /* None of the previous ones matched, pick with matching CHANNEL, HOST is allowed to wild-card.. */ proc = thg->idleproc; for (; proc && (proc->ch != ch); proc = proc->pnext) ; } if (!proc) goto create_new; /* Selected one of them.. 
*/ if (proc->pprev) proc->pprev->pnext = proc->pnext; if (proc->pnext) proc->pnext->pprev = proc->pprev; if (thg->idleproc == proc) thg->idleproc = proc->pnext; proc->pnext = proc->pprev = NULL; thg->idlecnt -= 1; --idleprocs; /* Move to ACTIVE state */ MIBMtaEntry->sc.TransportAgentsActiveSc += 1; MIBMtaEntry->sc.TransportAgentsIdleSc -= 1; /* It may be that while we idled it, it died at the idle queue.. */ if (proc->pid <= 0 || proc->tofd < 0) { /* sfprintf(sfstderr, "%% thread_start(thr=%s/%d/%s) (proc=%p ->pid=%d ->tofd=%d)\n", ch->name, thg->withhost, ho->name, proc, proc->pid, proc->tofd); */ goto re_pick; } /* Thread-groups are made such that here at thread_start() we can always switch over in between threads */ if (proc->ho != NULL && proc->ho != ho) { /* Get rid of the old host web */ proc->ho->kids -= 1; unweb(L_HOST,proc->ho); proc->ho = NULL; } /* Move the kid to this host web */ if (proc->ho != ho) { proc->ho = ho; proc->ho->kids += 1; } /* In theory the CHANNEL could be different -- in practice NOT! */ proc->ch = ch; /* MULTI-TA-PER-THREAD -- only the first proc inits feed-state */ if (! thr->proc ) { /* Randomize the order of thread vertices (or sort by spool file mtime, if in AGEORDER..) */ /* Also init thr->nextfeed */ thread_vertex_shuffle(thr); } /* Its idle process, feed it! */ proc->state = CFSTATE_LARVA; proc->overfed = 1; /* A simulated state.. */ proc->pthread = thr; if (thr->proc) proc->pnext = thr->proc; if (proc->pnext) proc->pnext->pprev = proc; thr->proc = proc; thr->thrkids += 1; if (verbose) sfprintf(sfstderr, "%% thread_start(thr=%s/%d/%s) (proc=%p dt=%d thr=%p jobs=%d)\n", ch->name, thg->withhost, ho->name, thr->proc, (int)(thr->wakeup-now), thr, thr->jobs); ta_hungry(proc); return 1; } create_new: /* Check resource limits - MaxTa, MaxChan, MaxThrGrpTa */ /* If resources are exceeded, reschedule.. 
*/ vp = thr->thvertices; if (numkids >= ce->maxkids) { vp->ce_pending = SIZE_L; thr->pending = ">MaxTA"; } else if (vp->orig[L_CHANNEL]->kids >= ce->maxkidChannel) { vp->ce_pending = L_CHANNEL; thr->pending = ">MaxChannel"; } else if (thg->transporters >= ce->maxkidThreads) { vp->ce_pending = L_HOST; thr->pending = ">MaxRing"; } else if (thr->thrkids >= ce->maxkidThread) { vp->ce_pending = SIZE_L; thr->pending = ">MaxThr"; } else { vp->ce_pending = 0; thr->pending = NULL; } if (vp->ce_pending) { if (verbose) sfprintf(sfstderr,"%s: (%d %dC %dT %dTh) >= (%d %dC %dT %dTh)\n", ce->command, numkids, vp->orig[L_CHANNEL]->kids, thg->transporters, thr->thrkids, ce->maxkids, ce->maxkidChannel, ce->maxkidThreads, ce->maxkidThread); /* * Would go over limit. Rescheduling for the next * (single) interval works ok in many situation. * However when the scheduler is very busy one can * run into systemic problems with some set of messages * blocking another set of messages. The only way * around that is a roundrobin scheme, implemented * by the fifo nature of the thread scheduling. */ reschedule(vp, 0, -1); return 0; } /* Now we are ready to start a new child to run our bits */ if (! thr->proc ) { /* MULTI-TA-PER-THREAD -- first proc inits the feed-state */ /* Randomize the order of thread vertices (or sort by spool file mtime, if in AGEORDER..) */ /* Also init thr->nextfeed */ thread_vertex_shuffle(thr); } rc = start_child(thr->thvertices, thr->thvertices->orig[L_CHANNEL], thr->thvertices->orig[L_HOST]); if (thr->proc && verbose) sfprintf(sfstderr,"%% thread_start(thr=%s/%d/%s) (proc=%p dt=%d thr=%p jobs=%d)\n", ch->name, thg->withhost, ho->name, thr->proc, (int)(thr->wakeup-now), thr, thr->jobs); return rc; } /* * pick_next_vertex() -- pick next free to process vertex in this thread * * This is called *only* by feed_child(), and proc->vertex directs * then the caller of feed_child() to tune the process state. * (From STUFFING to FINISHING and possibly to IDLE.) 
* * - if (proc->pthread->nextfeed != NULL) ...nextfeed = ...nextfeed->nextitem; * - return (...nextfeed != NULL); * */ /* Return 0 for errors, 1 for success; result is at ...nextfeed */ int pick_next_vertex(proc) struct procinfo *proc; { struct thread * thr = proc->pthread; struct vertex * vtx = NULL; if (thr) vtx = thr->nextfeed; if (verbose) sfprintf(sfstderr,"pick_next_vertex(proc=%p) proc->tofd=%d, thr=%p, pvtx=%p, jobs=%d OF=%d S=%s\n", proc, proc->tofd, thr, vtx, thr ? thr->jobs : 0, proc->overfed, proc_state_names[proc->state]); if (proc->pid < 0 || proc->tofd < 0) { /* "He is dead, Jim!" */ if (verbose) sfprintf(sfstderr," ... NONE, 'He is dead, Jim!'\n"); return 0; } if (vtx) /* Pick next item */ thr->nextfeed = vtx = vtx->nextitem; return (vtx != NULL); } /* * The thread_expire2() will handle exceedingly old things * with ages in excess of expire+expire2 (seconds) in queue * even if no successfull delivery attempt has been made. * * Return the kill-count. * * Side-effect warning: * Afterwards the THR may point to nonexistent object! */ int thread_expire2(thr, timelimit, killall, msgstr) struct thread *thr; time_t timelimit; int killall; /* later uses in mind.. now dummy parameter */ const char *msgstr; /* ... likewise. */ { int killcount = 0; struct vertex *vtx = thr->thvertices; struct vertex *nextvtx; for ( ;vtx; vtx = nextvtx) { int expire_this = 0; nextvtx = vtx->nextitem; /* Time to expire ? */ if (vtx->ce_expiry > 0 && vtx->ce_expiry <= now && vtx->attempts > 0) { expire_this = 1; } if (vtx->ce_expiry2 > 0 && vtx->ce_expiry2 <= now) { expire_this = 1; } if (expire_this) { /* ... and now expire it! */ /* this MAY invalidate also the THREAD object! */ expire(vtx, -1); /* ... them all. */ ++killcount; mytime(&now); if (now > timelimit) break; } } return killcount; } /* * The thread_reschedule() updates threads time-chain to match the * new value of wakeup for the doagenda() to later use. * Return 0 for DESTROYED thread, 1 for EXISTING thread. 
*/ int thread_reschedule(thr, retrytime, index) struct thread *thr; int index; time_t retrytime; { struct vertex *vtx = thr->thvertices; struct vertex *nvtx; time_t wakeup = 0; int skew; if (verbose) sfprintf(sfstderr,"thread_reschedule() ch=%s ho=%s jobs=%d thr=%p proc=%p\n", thr->channel,thr->host,thr->jobs,thr,thr->proc); if (!thr->thrkids && !thr->jobs) { delete_thread(thr); return 0; } /* If there are multiple kids working still, DON'T reschedule! */ if ((thr->thrkids > 0) || (vtx == NULL)) return 1; /* find out when to retry */ mytime(&now); /* if we are already scheduled for the future, don't reschedule */ if (vtx->wakeup > now) { thr->wakeup = vtx->wakeup; if (verbose) sfprintf(sfstderr,"...prescheduled\n"); goto timechain_handling; } else if (vtx->wakeup < now-7200 /* more than 2h in history .. */ ) vtx->wakeup = now; if (vtx->thgrp->ce.nretries <= 0) { if (verbose) sfprintf(sfstderr,"...ce->retries = %d\n", vtx->thgrp->ce.nretries); goto timechain_handling; } if (thr->retryindex >= vtx->thgrp->ce.nretries) { if (vtx->thgrp->ce.nretries > 1) thr->retryindex = ranny(vtx->thgrp->ce.nretries-1); else thr->retryindex = 0; } /* * clamp retry time to a predictable interval so we * eventually bunch up deliveries. 
*/ if (retrytime > 100000 && retrytime < now+63) vtx->wakeup = now; #if 0 skew = vtx->wakeup % vtx->thgrp->ce.interval; if (skew <= vtx->thgrp->ce.interval / 2) skew = - (skew + (vtx->thgrp->ce.skew - 1)); else skew = skew + (vtx->thgrp->ce.skew - 1); skew = skew / vtx->thgrp->ce.skew; /* want int div truncation */ vtx->wakeup += (skew + vtx->thgrp->ce.retries[thr->retryindex] * vtx->thgrp->ce.interval); #else /* Actually we do NOT want to have synchronization of threads, as such causes simultaneous start of transporters, which causes "somewhat" spiky load behaviour */ skew = vtx->thgrp->ce.retries[thr->retryindex] * vtx->thgrp->ce.interval; if (retrytime <= 100000 && (int)retrytime > skew) skew = retrytime; vtx->wakeup += skew; #endif thr->retryindex++; /* If history, move forward by ``ce.interval'' multiple */ if (vtx->wakeup < now) vtx->wakeup += ((((now - vtx->wakeup) / vtx->thgrp->ce.interval)+1) * vtx->thgrp->ce.interval); wakeup = vtx->wakeup; if (retrytime < now+63) retrytime = wakeup; /* Reschedule ALL vertices on this thread */ for ( ;vtx; vtx = nvtx) { int expire_this = 0; nvtx = vtx->nextitem; /* Time to expire ? */ if (vtx->ce_expiry > 0 && vtx->ce_expiry <= now && vtx->attempts > 0) { expire_this = 1; } if (vtx->ce_expiry2 > 0 && vtx->ce_expiry2 <= now) { expire_this = 1; } if (expire_this) { /* ... and now expire it! */ /* this MAY invalidate also the THREAD object! */ if (thr->jobs > 1) { expire(vtx, index); } else { expire(vtx, index); thr = NULL; /* The THR-pointed object is now invalid */ } continue; } /* Didn't expire, so time to tune the wakeup ... */ if (vtx->wakeup < retrytime) vtx->wakeup = retrytime; if (wakeup > vtx->wakeup || wakeup == 0) wakeup = vtx->wakeup; } if (thr != NULL) thr->wakeup = wakeup; timechain_handling: /* In every case the rescheduling means we move this thread to the end of the thread_head chain.. 
*/ if (thr != NULL) { _thread_timechain_unlink(thr); _thread_timechain_append(thr); } return (thr != NULL); } /* * reschedule() operates WITHIN a thread, but does *not* move things! * */ void reschedule(vp, factor, index) struct vertex *vp; int factor, index; { int skew; struct thread *thr = vp->thread; struct threadgroup *thg = vp->thgrp; struct config_entry *ce = &(thg->ce); /* Hmm.. The reschedule() is called only when we have a reason to call it, doesn't it ? */ /* find out when to retry */ mytime(&now); if (verbose) sfprintf(sfstderr,"reschedule %p now %d expiry in %d attempts %d factor %d inum %d (%s/%s: %s)\n", vp, (int)now, (int)((vp->ce_expiry > 0) ? (vp->ce_expiry - now) : -999), vp->attempts, factor, (int)(vp->cfp->id), vp->orig[L_CHANNEL]->name, vp->orig[L_HOST]->name, vp->cfp->mid); /* if we are already scheduled for the future, don't reschedule */ if (vp->wakeup > now) { if (verbose) sfprintf(sfstderr,"prescheduled\n"); return; } else if (vp->wakeup < now-7200 /* more than 2h .. */ ) vp->wakeup = now; if (ce->nretries <= 0) { if (verbose) sfprintf(sfstderr,"ce->retries = %d\n", ce->nretries); return; } if (factor == -1 && vp->attempts) { if (thr->retryindex >= ce->nretries) { if (ce->nretries > 1) thr->retryindex = ranny(ce->nretries-1); else thr->retryindex = 0; } /* * clamp retry time to a predictable interval so we * eventually bunch up deliveries. */ skew = vp->wakeup % ce->interval; if (skew <= ce->interval / 2) skew = - (skew + (ce->skew - 1)); else skew = skew + (ce->skew - 1); skew = skew / ce->skew; /* want int div truncation */ vp->wakeup += (skew + ce->retries[thr->retryindex] * ce->interval); thr->retryindex++; } else if (factor < -1) { vp->wakeup = -factor; } else vp->wakeup += factor * ce->interval; /* I THINK there could be an assert that if this happens, something is WRONG.. 
*/ if (vp->attempts == 0) vp->wakeup = now; /* XX: change this to a mod expression */ if (vp->wakeup < now) vp->wakeup = ((((now - vp->wakeup) / ce->interval)+1) * ce->interval) + 10 + 2*thr->jobs; /* Makes sure that next future event is at +10+2*jobcount seconds in the future.. A kludge approach, but still.. */ if (vp->ce_expiry > 0 && vp->ce_expiry <= vp->wakeup && vp->attempts > 0) { if (verbose) sfprintf(sfstderr,"ce_expiry = %d, %d attempts\n", (int)(vp->ce_expiry), vp->attempts); /* expire() will delete this vertex in due time */ expire(vp, index); return; } } /* * With the idle_cleanup() we clean up idle processes, that have * been idle for too long (idlemax=nnnns) * * Because during its progress the thread-groups can disappear, * (as is one of its mandates) this code looks a bit peculiar.. */ int idle_cleanup() { /* global: time_t now */ struct threadgroup *thg, *nthg; int thg_once; int freecount = 0; mytime(&now); if (verbose) sfprintf(sfstderr,"idle_cleanup()\n"); if (!thrg_root) return 0; /* No thread group! */ for (thg = thrg_root, thg_once = 1; thg_once || (thg != thrg_root); thg = nthg, thg_once = 0) { nthg = thg->nextthg; if (thg->thread != NULL) { struct procinfo *p; struct thread *thr, *nthr; int thr_once; /* Clean-up faulty client -- KLUDGE :-( -- OF=0, HA > much */ for (thr = thg->thread, thr_once = 1; thr && (thr_once || (thr != thg->thread)); thr = nthr, thr_once = 0) { nthr = thr->nextthg; p = thr->proc; if (thr->thgrp != thg) /* Not of this group ? */ continue; /* Next! */ if (!p) /* No process */ continue; if ((p->cmdlen == 0) && (p->overfed == 0) && (p->tofd >= 0) && (p->hungertime != 0) && (p->hungertime + MAX_HUNGER_AGE <= now)) { /* Close the command channel, let it die itself. Rest of the cleanup happens via mux() service. */ if (verbose) sfprintf(sfstderr,"idle_cleanup() killing TA on tofd=%d pid=%d\n", p->tofd, (int)p->pid); thr->wakeup = now-1; /* reschedule immediately! */ write(p->tofd,"\n",1); /* XXXX: should this be removed ?? 
*/
		pipes_shutdown_child(p->tofd);
		p->tofd = -1;
		++freecount;

		/* Reclaim will (in due time) detect dead child,
		   and decrement child counters. */

		zsyslog((LOG_ERR, "ZMailer scheduler kludge shutdown of TA channel (info for debug only); %s/%s/%d HA=%ds",
			 thr->channel, thr->host, thr->thgrp->withhost,
			 (int)(now - p->hungertime)));
	      }
	    }
	  }

	  /* Second sweep: processes parked on the group's idle list that
	     have exceeded the configured idlemax age */
	  if (thg->idleproc) {
	    struct procinfo *p;

	    p = thg->idleproc;
	    while (p != NULL) {
	      if ((thg->cep->idlemax + p->hungertime < now) &&
		  (p->cmdlen == 0) && (p->tofd >= 0)) {

		/* It is old enough -- ancient, one might say.. */

		/* Close the command channel, let it die itself.
		   Rest of the cleanup happens via mux() service. */

		if (verbose)
		  sfprintf(sfstderr,"idle_cleanup() killing TA on tofd=%d pid=%d\n",
			   p->tofd, (int)p->pid);

		write(p->tofd,"\n",1);
		pipes_shutdown_child(p->tofd);
		p->tofd = -1;
		++freecount;
	      }
	      /* Move to the next possible idle process */
	      p = p->pnext;
	    }
	  }

	  /* If there are no threads, nor transporters, delete the thg
	     (delete_threadgroup() itself refuses non-empty groups) */
	  delete_threadgroup(thg);
	}
	return freecount;
}

static time_t oldest_age_on_thread __((struct thread *));

/* Age (in seconds, relative to global 'now') of the oldest control
   file among the thread's vertices; -1 if the thread has none
   (oo stays at now+1). */
static time_t oldest_age_on_thread(th)	/* returns the AGE in seconds.. */
	struct thread *th;
{
	register time_t oo = now+1;
	register struct vertex *vp;

	vp = th->thvertices;
	while (vp) {
	  if (vp->cfp->mtime < oo)
	    oo = vp->cfp->mtime;
	  vp = vp->nextitem;
	}
	return (now - oo);
}

/* Emit the scheduler "mailq" style report on stream 'fp'; the detail
   level is selected by the MQ2MODE_* bits in 'mqmode'.  Continues over
   the following chunk lines. */
void thread_report(fp,mqmode)
	Sfio_t *fp;
	int mqmode;
{
	struct threadgroup *thg;
	int thg_once = 1;
	int jobsum = 0, jobtotal = 0;
	int threadsum = 0;
	char timebuf[20];
	int width;
	int cnt, procs, thrkidsum;
	int rcptsum = 0;
	struct procinfo *p;
	struct thread *thr;
	/* Field separator: space for FULL mode, TAB otherwise */
	int spc = (mqmode & MQ2MODE_FULL) ? ' ' : '\t';

	mytime(&now);

#if 0
	if (thrg_root == NULL) {
	  *timebuf = 0;
	  saytime((long)(now - sched_starttime), timebuf, 1);
	  sfprintf(fp,"No threads/processes.
Uptime: %s\n",timebuf);
	  return;
	}
#endif
	/* Walk the circular thread-group ring exactly once */
	for (thg = thrg_root;
	     thg && (thg_once || thg != thrg_root);
	     thg = thg->nextthg) {

	  int thr_once;

	  thg_once = 0;

	  if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {
	    sfprintf(fp,"%s/%s/%d\n",
		     thg->cep->channel, thg->cep->host, thg->withhost);
	  }

	  /* Per-group counters: threads seen, processes seen, jobs, kids */
	  cnt = 0;
	  procs = 0;
	  jobsum = 0;
	  thrkidsum = 0;

#if 1 /* XX: zero for verifying of modified system; turn to 1 for running! */
	  /* We scan thru the local ring of threads */
	  for (thr = thg->thread, thr_once = 1;
	       thr && (thr_once || (thr != thg->thread));
	       thr = thr->nextthg, thr_once = 0)
#else
	  /* We scan there in start order from the thread_head chain! */
	  for (thr = thread_head; thr != NULL; thr = thr->nexttr)
#endif
	    {
	      if (thr->thgrp != thg)	/* Not of this group ? */
		continue;		/* Next! */

	      /* Sum recipient-group counts for the global tally */
	      {
		struct vertex *vp = thr->thvertices;
		while (vp != NULL) {
		  rcptsum += vp->ngroup;
		  vp = vp->nextitem;
		}
	      }

	      if (mqmode & MQ2MODE_FULL2) {
		width = sfprintf(fp,"%s\t%s\t",
				 /* thr->thvertices->orig[L_CHANNEL]->name */
				 thr->channel,
				 /* thr->thvertices->orig[L_HOST]->name */
				 thr->host);
	      } else if (mqmode & MQ2MODE_FULL) {
		width = sfprintf(fp," %s/%s/%d",
				 /* thr->thvertices->orig[L_CHANNEL]->name */
				 thr->channel,
				 /* thr->thvertices->orig[L_HOST]->name */
				 thr->host,
				 thr->thgrp->withhost);
		if (width < 0) break; /* error..
*/
		/* Pad the label with TABs up to column ~40 so the
		   following fields line up */
		width += 7;
		if (width < 16-1) sfprintf(fp,"\t");
		if (width < 24-1) sfprintf(fp,"\t");
		if (width < 32-1) sfprintf(fp,"\t");
		if (width < 40-1) sfprintf(fp,"\t");
		else sfprintf(fp," ");
	      }

	      jobsum += thr->jobs;

	      /* R = recipients (jobs), A = attempts */
	      if (mqmode & MQ2MODE_FULL)
		sfprintf(fp,"R=%-3d A=%-2d", thr->jobs, thr->attempts);
	      if (mqmode & MQ2MODE_FULL2)
		sfprintf(fp,"R=%d\tA=%d", thr->jobs, thr->attempts);

	      ++cnt;

	      if (thr->proc != NULL && thr->proc->pthread == thr) {

		int thrprocs = 0;
		struct procinfo *proc;

		/* Count this thread's transporter processes */
		for (proc = thr->proc; proc; proc = proc->pnext) {
		  ++procs;
		  ++thrprocs;
		  ++thrkidsum;
		}

		if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {

		  proc = thr->proc;
		  /* Flag book-keeping mismatch between recorded and
		     counted kids */
		  if (thr->thrkids != thrprocs)
		    sfprintf(fp, "%cKids=%d/%d", spc, thr->thrkids, thrprocs);

		  /* P={pids}  -- process ids */
		  sfprintf(fp, "%cP={", spc);
		  while (proc) {
		    sfprintf(fp, "%d", (int)proc->pid);
		    if (proc->pnext) sfprintf(fp, ",");
		    proc = proc->pnext;
		  }
		  sfprintf(fp, "}");

		  /* HA={..}s -- hunger ages in seconds */
		  proc = thr->proc;
		  sfprintf(fp, "%cHA={", spc);
		  while (proc) {
		    sfprintf(fp, "%d", (int)(now - proc->hungertime));
		    if (proc->pnext) sfprintf(fp, ",");
		    proc = proc->pnext;
		  }
		  sfprintf(fp, "}s");

		  /* FA={..}s -- feed ages ("never" if not fed yet) */
		  proc = thr->proc;
		  sfprintf(fp, "%cFA={", spc);
		  while (proc) {
		    if (proc->feedtime == 0)
		      sfprintf(fp, "never");
		    else
		      sfprintf(fp, "%d", (int)(now - proc->feedtime));
		    if (proc->pnext) sfprintf(fp, ",");
		    proc = proc->pnext;
		  }
		  sfprintf(fp, "}s");

		  /* OF={..} -- overfeed counters */
		  proc = thr->proc;
		  sfprintf(fp, "%cOF={", spc);
		  while (proc) {
		    sfprintf(fp, "%d", proc->overfed);
		    if (proc->pnext) sfprintf(fp, ",");
		    proc = proc->pnext;
		  }
		  sfprintf(fp, "}");

		  /* S={..} -- symbolic process states */
		  proc = thr->proc;
		  sfprintf(fp, "%cS={", spc);
		  while (proc) {
		    sfprintf(fp, "%s", proc_state_names[proc->state]);
		    if (proc->pnext) sfprintf(fp, ",");
		    proc = proc->pnext;
		  }
		  sfprintf(fp, "}");

		  /* UF -- unfed count */
		  sfprintf(fp, "%cUF=%d", spc, thr->unfed);
		}

	      } else if (thr->wakeup > now) {
		/* No live process; show wait until next wakeup */
		if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
		  sfprintf(fp,"%cW=%ds", spc, (int)(thr->wakeup - now));
		}
	      } else if (thr->pending) {
		if (mqmode & (MQ2MODE_FULL|MQ2MODE_FULL2)) {
		  sfprintf(fp,"%cpend=%s", spc, thr->pending);
		}
	      }

	      if (mqmode &
(MQ2MODE_FULL|MQ2MODE_FULL2)) {
		/* QA = age of oldest queued message on the thread */
		*timebuf = 0;
		saytime((long)oldest_age_on_thread(thr), timebuf, 1);
		sfprintf(fp, "%cQA=%s", spc, timebuf);
		if (thr->thvertices && thr->thvertices->ce_pending)
		  if (thr->thvertices->ce_pending != SIZE_L && spc == ' ')
		    sfprintf(fp, "%s",
			     (thr->thvertices->ce_pending == L_CHANNEL ?
			      " channelwait" : " threadwait"));
		sfprintf(fp, "\n");
	      }
	    }

	  /* Per-group summary line(s) */
	  if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {
	    sfprintf(fp,"\tThreads: %4d",thg->threads);
	    if (thg->threads != cnt)	/* counted vs. recorded mismatch */
	      sfprintf(fp,"/%d",cnt);
	  }

	  /* Count idle processes of the group too */
	  cnt = 0;
	  for (p = thg->idleproc; p != 0; p = p->pnext)
	    ++cnt;
	  procs += cnt;

	  if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ)) {
	    sfprintf(fp, " Msgs: %5d Procs: %3d", jobsum, thg->transporters);
	    if (thg->transporters != procs)
	      sfprintf(fp,"/%d",procs);
	    sfprintf(fp," Idle: %3d",thg->idlecnt);
	    if (thg->idlecnt != cnt)
	      sfprintf(fp, "/%d", cnt);
	    sfprintf(fp, " Plim: %3d Flim: %3d Tlim: %d\n",
		     thg->ce.maxkidThreads, thg->ce.overfeed,
		     thg->ce.maxkidThread);
	  }

	  jobtotal += jobsum;
	  threadsum += thg->threads;
	}

	/* Global summary: kid/idle/message/thread/recipient counts,
	   uptime, and MIB in/out/stored counters with cross-checks */
	if (mqmode & (MQ2MODE_FULL | MQ2MODE_QQ | MQ2MODE_SNMP)) {
	  long files;

	  *timebuf = 0;
	  saytime((long)(now - sched_starttime), timebuf, 1);
	  sfprintf(fp,"Kids: %d Idle: %2d Msgs: %3d Thrds: %3d Rcpnts: %4d Uptime: ",
		   numkids, idleprocs, global_wrkcnt, threadsum, jobtotal);
	  if (mqmode & MQ2MODE_SNMP)
	    sfprintf(fp, "%ld sec\n",(long)(now - sched_starttime));
	  else
	    sfprintf(fp, "%s\n",timebuf);

	  sfprintf(fp, "Msgs in %lu out %lu stored %ld ",
		   (u_long)MIBMtaEntry->sc.ReceivedMessagesSc,
		   (u_long)MIBMtaEntry->sc.TransmittedMessagesSc,
		   (long)MIBMtaEntry->sc.StoredMessagesSc);

	  /* Cross-check the MIB counter against actual spool files */
	  files = thread_count_files();
	  if ((long)MIBMtaEntry->sc.StoredMessagesSc != files)
	    sfprintf(fp, "(%ld) ", files);

	  sfprintf(fp, "Rcpnts in %lu out %lu stored %ld",
		   (u_long)MIBMtaEntry->sc.ReceivedRecipientsSc,
		   (u_long)MIBMtaEntry->sc.TransmittedRecipientsSc,
		   (long)MIBMtaEntry->sc.StoredRecipientsSc);
	  if (rcptsum != MIBMtaEntry->sc.StoredRecipientsSc)
	    sfprintf(fp, " (%d)", rcptsum);
	  sfprintf(fp, "\n");
	}
	sfsync(fp);
}

void
thread_detail_report(fp,mqmode,channel,host)
	Sfio_t *fp;
	int mqmode;
	char *channel, *host;
{
	/* Emit one TAB-separated detail line per recipient-group for the
	   single thread matching (channel, host); silently does nothing
	   when channel, host, or the thread is unknown.
	   NOTE(review): 'mqmode' is accepted but not consulted in the
	   visible body. */
	struct thread *th;
	spkey_t spk;
	struct spblk *spl;
	struct web *wh, *wc;
	struct vertex *vp;
	int i;
	char buf[100];

	mytime(&now);

	/* Resolve the channel name via the channel symbol/splay tables */
	spk = symbol_lookup_db((void*)channel, spt_mesh[L_CHANNEL]->symbols);
	spl = sp_lookup(spk, spt_mesh[L_CHANNEL]);
	if (spl == NULL || spl->data == NULL) {
	  /* Not found, nothing to do.. */
	  return;
	}
	wc = (struct web *)spl->data;

	/* Resolve the host name likewise */
	spk = symbol_lookup_db((void*)host, spt_mesh[L_HOST]->symbols);
	spl = sp_lookup(spk, spt_mesh[L_HOST]);
	if (spl == NULL || spl->data == NULL) {
	  /* Not found, nothing to do.. */
	  return;
	}
	wh = (struct web *)spl->data;

	/* Linear scan of the global thread chain for the (wc,wh) pair */
	for (th = thread_head; th; th = th->nexttr) {
	  if (wh == th->whost && wc == th->wchan) {
	    break;
	  }
	}

	if (th) {
	  /* Found it! */
	  for (vp = th->thvertices; vp; vp = vp->nextitem) {
	    struct ctlfile *cfp = vp->cfp;
	    for (i = 0; i < vp->ngroup; ++i) {

	      /* Spoolfile */
	      sfprintf(fp, "%s%s", cfpdirname(cfp->dirind), cfp->mid);
	      /* How manyth in a group ? */
	      sfprintf(fp, "\t%d", i);
	      /* Sender index -- or sender address */
	      sfprintf(fp, "\t%s", cfp->erroraddr);
	      /* Recipient offset */
	      sfprintf(fp,"\t%d", cfp->offset[vp->index[i]]);
	      /* Expiry stamp */
	      sfprintf(fp,"\t%ld", (long)vp->ce_expiry);
	      /* next wakeup */
	      sfprintf(fp,"\t%ld", (long)vp->wakeup);
	      /* last feed time */
	      sfprintf(fp,"\t%ld", (long)vp->lastfeed);
	      /* attempts */
	      sfprintf(fp,"\t%d\t", vp->attempts);
	      /* ce_pending */
	      if (vp->wakeup > now) {
		*buf = 0;
		saytime((long)(vp->wakeup - now), buf, 1);
		sfprintf(fp,"retry in %s", buf);
	      } else {
		switch(vp->ce_pending) {
		case SIZE_L: /* BAD!
*/
		  break;
		case L_CHANNEL:
		  sfprintf(fp,"channel");
		  break;
		default:
		  sfprintf(fp,"thread");
		  break;
		}
	      }

	      /* message - if any */
	      sfprintf(fp, "\t");
	      if (vp->message)
		sfprintf(fp,"%s", vp->message);
	      sfprintf(fp, "\n");
	    }
	  }
	}
	sfsync(fp);
}

/* Sum thr->jobs over every thread of every thread-group.
   NOTE(review): the 'if (thrg_root)' after the early return is
   redundant but harmless. */
int thread_count_recipients()
{
	struct threadgroup *thg;
	struct thread *thr;
	int thg_once;
	int jobtotal = 0;

	if (thrg_root == NULL)
	  return 0;

	if (thrg_root)
	  /* One lap around the circular group ring */
	  for (thg = thrg_root, thg_once = 1;
	       thg_once || thg != thrg_root;
	       thg = thg->nextthg, thg_once = 0) {

	    int thr_once;
	    int jobsum = 0;

	    if (thg->thread)
	      /* One lap around this group's circular thread ring */
	      for (thr = thg->thread, thr_once = 1;
		   thr_once || (thr != thg->thread);
		   thr = thr->nextthg, thr_once = 0) {
		jobsum += thr->jobs;
	      }
	    jobtotal += jobsum;
	  }
	return jobtotal;
}

/* Shared accumulator for the sp_scan() callback below */
static int thread_files_count;

static int spl_thread_cnt_files __((struct spblk *spl));

/* sp_scan() callback: counts nodes; always returns 0 to continue */
static int spl_thread_cnt_files(spl)
	struct spblk *spl;
{
	++thread_files_count;
	return 0;
}

/* Count the control files currently known to the scheduler by
   scanning the L_CTLFILE splay tree */
int thread_count_files __((void))
{
	thread_files_count = 0;
	sp_scan(spl_thread_cnt_files, NULL, spt_mesh[L_CTLFILE]);
	return thread_files_count;
}