/* * Copyright 1988 by Rayan S. Zachariassen, all rights reserved. * This will be free software, but only when it is finished. */ /* * Lots of modifications (new guts, more or less..) by * Matti Aarnio (copyright) 1992-2003 */ /* * ZMailer transport scheduler. */ #include #include #include #include "hostenv.h" #include #include #include #include #include "mail.h" #include #include "ta.h" #include "sysexits.h" #include "libc.h" #ifdef HAVE_UNISTD_H #include #endif #ifdef HAVE_DIRENT_H # include #else /* not HAVE_DIRENT_H */ # define dirent direct # ifdef HAVE_SYS_NDIR_H # include # endif /* HAVE_SYS_NDIR_H */ # ifdef HAVE_SYS_DIR_H # include # endif /* HAVE_SYS_DIR_H */ # ifdef HAVE_NDIR_H # include # endif /* HAVE_NDIR_H */ #endif /* HAVE_DIRENT_H */ #include "zmsignal.h" #ifdef HAVE_MMAP #include #endif #include "scheduler.h" #include "prototypes.h" #include "zsyslog.h" #include "libz.h" #include extern int optind; extern char *optarg; #ifndef MAXNAMLEN /* POSIX.1 ... */ #define MAXNAMLEN NAME_MAX #endif #ifndef _IOLBF # define _IOLBF 0200 #endif /* !_IOLBF */ #ifdef HONEYBUM/* not really SVID, just this stupid honeywell compiler */ # define MAX_ENTRIES 3000 #else /* sane pcc */ # define MAX_ENTRIES 10000 #endif /* honeywell pcc */ struct sptree *spt_mesh[SIZE_L]; #ifdef MALLOC_TRACE struct conshell *envarlist = NULL; #endif /* MALLOC_TRACE */ #define TRANSPORTMAXNOFILES 32 /* Number of files a transporter may need open -- or any of its children.. */ int transportmaxnofiles = TRANSPORTMAXNOFILES; /* Default value */ const char * progname; extern const char * postoffice; const char * rendezvous; const char * pidfile = PID_SCHEDULER; const char * mailshare; const char * logfn; const char * statusfn; Sfio_t * statuslog = NULL; int slow_shutdown = 0; extern int readsockcnt; /* from transport.c: mux() */ static int mustexit = 0; static int gotalarm = 0; static int dumpq = 0; static int canexit = 0; static int rereadcf = 0; static int dlyverbose = 0; time_t sched_starttime; int do_syslog = 0; int verbose = 0; int querysocket = -1; /* fd of TCP socket to listen for queries */ int msgwriteasync = 0; int D_alloc = 0; int hungry_childs = 0; int global_wrkcnt = 0; int mailq_Q_mode = 0; int syncstart = 0; /* while set, the thread subsystem shall start no childs! */ int freeze = 0; /* For debugging, complete disable of child running.. */ int hashlevels = 0; /* How many levels of hashes are supported at the transport directory ? (To speed up the directory processing, file opens, etc.. ) */ char * procselect = NULL; /* Non-null defines channel/host specifier that is ALLOWED TO RUN, specifying this (by means of '-P chan/host' -option) will prevent running anything else, and also prevent running error processing, or job-specifier deletions. */ char * procselhost = NULL; /* Just spliced out 'host'-part of the above */ extern int forkrate_limit; /* How many forks per second ? */ int mailqmode = 1; /* ZMailer v1.0 mode on mailq */ char * mailqsock; char * notifysock; int interim_report_interval = 5*60; /* 5 minutes */ static int vtxprep_skip; static int vtxprep_skip_any; static int vtxprep_skip_lock; static time_t next_dirscan; static time_t next_idlecleanup; static time_t next_interim_report_time; static time_t next_expiry2_scan; static struct sptree *dirscan_mesh; static int newents_limit = 40000; static int newents_timelimit = 5; extern int default_full_content; /* on conf.c */ #include "memtypes.h" extern memtypes stickymem; static struct ctlfile *schedule __((int fd, const char *file, long ino, const int)); static struct ctlfile *vtxprep __((struct ctlfile *, const char *, const int)); static int vtxmatch __((struct vertex *, struct config_entry *)); static void link_in __((int flag, struct vertex *vp, const char *s)); static int lockverify __((struct ctlfile *, const char *, const int)); static int globmatch __((const char *, const char*)); static void vtxdo __((struct vertex *, struct config_entry *, const char *)); extern void cfp_mksubdirs __((const char *, const char*)); static RETSIGTYPE sig_exit __((int sig)); static RETSIGTYPE sig_quit __((int sig)); static RETSIGTYPE sig_alarm __((int sig)); static RETSIGTYPE sig_iot __((int sig)); static RETSIGTYPE sig_readcf __((int sig)); extern char *strerror __((int err)); extern time_t mytime __((time_t *)); static void init_timeserver __((void)); #ifdef HAVE_SELECT /* Well, not exactly kosher assumption.. */ /* extern int gettimeofday __((struct timeval *,)); */ #else extern time_t time __((time_t *)); #endif static int loginitsched __((int)); static int loginitsched(sig) int sig; { int flags; if (logfn != NULL) { sfsync(sfstdout); sfseek(sfstdout, 0, 0); sfsync(sfstderr); sfseek(sfstderr, 0, 0); if (sfopen(sfstdout, logfn, "a") != sfstdout || dup2(sffileno(sfstdout), sffileno(sfstderr)) < 0) { /* sigh */ /* XX: stderr might be closed at this point... */ sfprintf(sfstderr, "%s: cannot open log: %s, errno=%d\n", progname, logfn, errno); return -1; } #if defined(F_SETFL) && defined(O_APPEND) flags = fcntl(sffileno(sfstdout), F_GETFL, 0); flags |= O_APPEND; fcntl(sffileno(sfstdout), F_SETFL, flags); #endif /* F_SETFL */ #if defined(F_SETFD) fcntl(sffileno(sfstdout), F_SETFD, 1); /* close-on-exec */ #endif sfset(sfstdout, SF_LINE, 1); sfset(sfstderr, SF_LINE, 1); } if (statusfn != NULL && statuslog != NULL) { sfclose(statuslog); if (sfopen(NULL, statusfn, "a") != statuslog) { sfprintf(sfstderr,"%s: cannot open statuslog: %s, errno=%d\n", progname, statusfn, errno); return -1; } sfset(statuslog, SF_WHOLE, 1); #if defined(F_SETFD) fcntl(sffileno(statuslog), F_SETFD, 1); /* close-on-exec */ #endif } SIGNAL_HANDLE(SIGHUP, (RETSIGTYPE(*)__((int))) loginitsched); return 0; } /* * A self-timed log re-init caller to make sure, that * no log file is kept open for more than 30 seconds * in one go. This allows (within 30 seconds) the files * to be rotates at first to new names, then after that * 30 second delay, to be further processed - e.g. compressed. * */ void timed_log_reinit() { static time_t next_log_reinit; mytime(&now); if (next_log_reinit < now) { struct stat stbuf; next_log_reinit = now + 25; loginitsched(SIGHUP); if (logfn != NULL && lstat(logfn,&stbuf) == 0 && S_ISREG(stbuf.st_mode)) { long fsspace; fsspace = free_fd_statfs(sffileno(sfstdout)); MIBMtaEntry->sys.LogFreeSpace = fsspace; fsspace = used_fd_statfs(sffileno(sfstdout)); MIBMtaEntry->sys.LogUsedSpace = fsspace; } } } struct dirstatname { struct stat st; long ino; time_t not_before; char name[1]; /* Allocate enough size */ }; struct dirqueue { int wrksum; int sorted; int wrkcount; int wrkspace; int wrkcount2; /* Back-pushed material a *2 -queue */ int wrkspace2; struct dirstatname **stats; struct dirstatname **stats2; }; static int dirqueuescan __((const char *dir,struct dirqueue *dq, int subdirs)); int syncweb __((struct dirqueue *dq)); int global_maxkids = 1000; time_t now; Sfio_t * vfp_open(cfp) struct ctlfile *cfp; { Sfio_t *vfp; int fd; if (!cfp->vfpfn) return NULL; /* Open the vfp *ONLY* if the logging file exists, and can be written to. If the file does not exist, no logging shall happen! */ SETEUID(cfp->uid); fd = open(cfp->vfpfn, O_WRONLY|O_APPEND, 0); SETEUID(0); if (fd < 0) return NULL; /* Can't open it! */ vfp = sfnew(NULL, NULL, 0, fd, SF_WRITE|SF_APPENDWR|SF_LINE); if (!vfp) return NULL; /* Failure to open */ sfseek(vfp, (Sfoff_t)0, SEEK_END); return vfp; } static void cfp_free __((struct ctlfile *cfp, struct spblk *spl)); static void cfp_free0 __((struct ctlfile *cfp)); static void cfp_free0(cfp) struct ctlfile *cfp; { struct vertex *vp, *nvp; /* Delete from memory */ if (cfp->vfpfn) { Sfio_t *vfp = vfp_open(cfp); if (vfp) { sfprintf(vfp, "ordered deletion of task file from scheduler memory (%s)\n", cfp->mid); sfclose(vfp); } } /* Throw all way, if no vertices.. */ if (!cfp->head) { /* This should *not* be happening.. */ sfprintf(sfstderr, "%s: SHOULD NOT HAPPEN: cfp->head == NULL; spoolid: %s\n", progname, cfp->spoolid); unctlfile(cfp, 1); return; } for (vp = cfp->head; vp != NULL; vp = nvp) { /* Stash the next pointer, because the VP content WILL become trashed at the end of the loop. */ nvp = vp->next[L_CTLFILE]; MIBMtaEntry->sc.StoredRecipientsSc -= vp->ngroup; vp->ngroup = 0; unvertex(vp,1,1); /* Don't unlink()! Just free()! */ } /* CFP freeup happens at last vertex's unvertex() */ } static void cfp_free(cfp, spl) struct ctlfile *cfp; struct spblk *spl; { /* Delete from the spt_mesh[] */ if (spl == NULL && cfp->id) spl = sp_lookup((u_long)(cfp->id), spt_mesh[L_CTLFILE]); if (spl != NULL) sp_delete(spl, spt_mesh[L_CTLFILE]); /* And from the memory */ cfp_free0(cfp); } static int ctl_free __((struct spblk *spl)); static int ctl_free(spl) struct spblk *spl; { cfp_free((struct ctlfile *)spl->data, NULL); return 0; } /* * free_cfp_memory(cfp) -- release all of the memory associated with the * control file -- some of it is conditionally allocated, or perhaps * previously released.. (cfp->contents, for example) */ void free_cfp_memory(cfp) struct ctlfile *cfp; { if (cfp->contents) free(cfp->contents); if (cfp->vfpfn) free(cfp->vfpfn); if (cfp->spoolid) free(cfp->spoolid); if (cfp->mid) free(cfp->mid); if (cfp->erroraddr) free(cfp->erroraddr); if (cfp->logident) free(cfp->logident); if (cfp->envid) free(cfp->envid); /* memset(cfp, 0x55, sizeof(*cfp)); */ free((char *)cfp); --global_wrkcnt; --MIBMtaEntry->sc.StoredMessagesSc; } struct dirqueue dirqb = { 0, }; struct dirqueue *dirq = &dirqb; char *ArgvSave; const char *EOArgvSave; extern int main __((int, const char **)); static struct config_entry *cehead = NULL; /* #define GLIBC_MALLOC_DEBUG__ */ #ifdef GLIBC_MALLOC_DEBUG__ /* memory allocation debugging with GLIBC */ #include /* GLIBC malloc.h ! */ /* Global variables used to hold underlaying hook values. */ static void *(*old_malloc_hook) (size_t, const void * ); static void *(*old_realloc_hook) (void *, size_t, const void *); static void (*old_free_hook) (void*, const void *); static void *(*old_memalign_hook) (size_t, size_t, const void *); /* Prototypes for our hooks. */ static void *my_malloc_hook (size_t, const void*); static void *my_realloc_hook (void *,size_t, const void*); static void my_free_hook (void*, const void*); static void *my_memalign_hook (size_t, size_t, const void*); static void * my_malloc_hook (size_t size, const void *CALLER) { void *result; /* Restore all old hooks */ __malloc_hook = old_malloc_hook; __free_hook = old_free_hook; /* Call recursively */ result = malloc (size); /* Save underlaying hooks */ old_malloc_hook = __malloc_hook; old_free_hook = __free_hook; /* `printf' might call `malloc', so protect it too. */ sfprintf(sfstderr,"# malloc (%u) returns %p @%p\n", (unsigned int) size, result, CALLER); /* Restore our own hooks */ __malloc_hook = my_malloc_hook; __free_hook = my_free_hook; return result; } static void * my_realloc_hook (void *ptr, size_t size, const void *CALLER) { void *result; /* Restore all old hooks */ __realloc_hook = old_realloc_hook; __malloc_hook = old_malloc_hook; __free_hook = old_free_hook; /* Call recursively */ result = realloc (ptr, size); /* Save underlaying hooks */ old_realloc_hook = __realloc_hook; old_malloc_hook = __malloc_hook; old_free_hook = __free_hook; /* `printf' might call `malloc', so protect it too. */ sfprintf(sfstderr,"# realloc (%p,%u) returns %p @%p\n", ptr, (unsigned int) size, result, CALLER); /* Restore our own hooks */ __realloc_hook = my_realloc_hook; __malloc_hook = my_malloc_hook; __free_hook = my_free_hook; return result; } static void * my_memalign_hook (size_t align, size_t size, const void *CALLER) { void *result; /* Restore all old hooks */ __memalign_hook = old_memalign_hook; __malloc_hook = old_malloc_hook; __free_hook = old_free_hook; /* Call recursively */ result = memalign (align, size); /* Save underlaying hooks */ old_memalign_hook = __memalign_hook; old_malloc_hook = __malloc_hook; old_free_hook = __free_hook; /* `printf' might call `malloc', so protect it too. */ sfprintf(sfstderr,"# memalign (%u,%u) returns %p @%p\n", (unsigned)align, (unsigned)size, result, CALLER); /* Restore our own hooks */ __memalign_hook = my_memalign_hook; __malloc_hook = my_malloc_hook; __free_hook = my_free_hook; return result; } static void my_free_hook (void *ptr, const void *CALLER) { /* Restore all old hooks */ __malloc_hook = old_malloc_hook; __free_hook = old_free_hook; /* Call recursively */ free (ptr); /* Save underlaying hooks */ old_malloc_hook = __malloc_hook; old_free_hook = __free_hook; /* `printf' might call `free', so protect it too. */ sfprintf(sfstderr,"# freed pointer %p @%p\n", ptr, CALLER); /* Restore our own hooks */ __malloc_hook = my_malloc_hook; __free_hook = my_free_hook; } #endif int main(argc, argv) int argc; const char *argv[]; { struct ctlfile *cfp; const char *config, *cp; int i, daemonflg, c, errflg, version, fd; long offout, offerr; const char *t, *syslogflg; #ifdef GLIBC_MALLOC_DEBUG__ /* memory allocation debugging with GLIBC */ old_malloc_hook = __malloc_hook; __malloc_hook = my_malloc_hook; old_memalign_hook = __memalign_hook; __memalign_hook = my_memalign_hook; old_realloc_hook = __realloc_hook; __realloc_hook = my_realloc_hook; old_free_hook = __free_hook; __free_hook = my_free_hook; #endif #ifdef HAVE_SETGROUPS /* We null supplementary groups list entirely */ setgroups(0, NULL); #endif freeze = 0; /* setlinebuf(stderr); -- no need for this ? */ ArgvSave = (char *) argv[0]; EOArgvSave = argv[argc-1] + strlen(argv[argc-1]) + 1; mytime(&sched_starttime); memset(&dirqb,0,sizeof(dirqb)); dirscan_mesh = sp_init(); if ((progname = strrchr(argv[0], '/')) == NULL) progname = argv[0]; else ++progname; stickymem = MEM_MALLOC; resources_maximize_nofiles(); /* The theory is, that scheduler needs circa 30 fd's for its own uses, and it will use all others on child-process communication fifos. */ /* Of these, 20 are for MQ2 sockets */ global_maxkids = resources_query_nofiles()-30; /* Probe how many FDs each TA needs! */ { int to[2], from[2]; pipes_create(to,from); close(to[0]); close(to[1]); if (to[0] != from[1]) { /* Two pipes! */ close(from[0]); close(from[1]); global_maxkids >>= 1; /* Half the MAXKIDS count */ } /* else Socketpair or equivalent bidirectional one! Only one FD per child is used! */ } postoffice = rendezvous = logfn = statusfn = config = NULL; daemonflg = 1; dlyverbose = 0; syncstart = 0; verbose = errflg = version = 0; for (;;) { c = getopt(argc, (char*const*)argv, "divE:f:Fl:HL:M:nN:p:P:q:QR:SVWZ:"); if (c == EOF) break; switch (c) { case 'f': /* override default config file */ config = optarg; break; case 'E': sscanf(optarg, "%d,%d", &newents_limit, &newents_timelimit); if (newents_limit < 10) newents_limit = 10; if (newents_timelimit < 2) newents_timelimit = 2; if (newents_timelimit > 15) newents_timelimit = 15; break; case 'F': freeze = 1; break; case 'l': statusfn = optarg; statuslog = sfopen(NULL, statusfn, "a"); if (!statuslog) { perror("Can't open statistics log file (-l)"); exit(1); } sfset(statuslog, SF_LINE, 1); #if defined(F_SETFD) fcntl(sffileno(statuslog), F_SETFD, 1); /* close-on-exec */ #endif break; case 'H': if (hashlevels < 2) ++hashlevels; break; case 'L': /* override default log file */ logfn = optarg; break; case 'M': mailqmode = atoi(optarg); if (mailqmode < 1 || mailqmode > 2) { sfprintf(sfstderr,"scheduler: -M parameter is either 1, or 2\n"); exit(EX_USAGE); } break; case 'n': default_full_content = !default_full_content; break; case 'N': if ((transportmaxnofiles = atoi(optarg)) < 10) transportmaxnofiles = TRANSPORTMAXNOFILES; break; case 'p': procselect = optarg; procselhost = strchr(procselect,'/'); if (procselhost) *procselhost++ = 0; else { sfprintf(sfstderr,"scheduler: -p parameter is of form: channel/host\n"); exit(64); } break; case 'P': /* override default postoffice */ postoffice = optarg; break; case 'Q': mailq_Q_mode = 1; break; case 'q': /* override default mail queue rendezvous */ rendezvous = optarg; break; case 'R': /* How many new childs per second to be spawned ? */ forkrate_limit = atoi(optarg); if (forkrate_limit < 1) forkrate_limit = 1; break; case 'v': /* be verbose and synchronous */ ++verbose; daemonflg = 0; break; case 'W': dlyverbose = verbose; verbose = 0; break; case 'd': /* daemon again */ daemonflg = 1; break; case 'i': /* interactive */ daemonflg = 0; break; case 'V': version = 1; daemonflg = 0; break; case 'S': syncstart = 1; break; case 'Z': if (readzenv(optarg) == 0) ++errflg; break; case '?': default: errflg++; break; } } if (errflg) { sfprintf(sfstderr, "Usage: %s [-dHisvV -M (1|2) -f configfile -L logfile -P postoffice -Q rendezvous -Z zenvfile]\n", progname); exit(128+errflg); } syslogflg = getzenv("SYSLOGFLG"); if (syslogflg == NULL) syslogflg = ""; t = syslogflg; for ( ; *t ; ++t ) { if (*t == 'c' || *t == 'C') break; } do_syslog = (*t != '\0'); cp = getzenv("SCHEDULERDIRHASH"); if (cp){ if ((cp[0] == '1' || cp[0] == '2') && cp[1] == 0) hashlevels = cp[0] - '0'; } mailshare = getzenv("MAILSHARE"); if (mailshare == NULL) mailshare = MAILSHARE; cp = getzenv("LOGDIR"); if (cp != NULL) qlogdir = cp; if (daemonflg && logfn == NULL) { logfn = emalloc(2 + (u_int)(strlen(qlogdir) + strlen(progname))); sprintf((char*)logfn, "%s/%s", qlogdir, progname); } /* If we are a daemon, or doing verbose foobar, attach the SHM MIB now */ if (daemonflg || verbose) { int r = Z_SHM_MIB_Attach (1); if (r < 0) { /* Error processing -- magic set of constants: */ switch (r) { case -1: /* fprintf(stderr, "No ZENV variable: SNMPSHAREDFILE\n"); */ break; case -2: perror("Failed to open for exclusively creating of the SHMSHAREDFILE"); break; case -3: perror("Failure during creation fill of SGMSHAREDFILE"); break; case -4: perror("Failed to open the SHMSHAREDFILE at all"); break; case -5: perror("The SHMSHAREDFILE isn't of proper size! "); break; case -6: perror("Failed to mmap() of SHMSHAREDFILE into memory"); break; case -7: fprintf(stderr, "The SHMSHAREDFILE has magic value mismatch!\n"); break; default: break; } /* return; NO giving up! */ } } if (logfn != NULL) { /* loginit is a signal handler, so can't pass log */ if (loginitsched(SIGHUP) < 0) /* do setlinebuf() there */ die(1, "log initialization failure"); /* close and reopen log files */ SIGNAL_HANDLE(SIGHUP, (RETSIGTYPE(*)__((int))) loginitsched); } else { SIGNAL_IGNORE(SIGHUP); /* no surprises please */ sfset(sfstdout, SF_LINE, 1); } #ifdef USE_SIGREAPER # ifdef SIGCLD SIGNAL_HANDLE(SIGCLD, sig_chld); # else SIGNAL_HANDLE(SIGCHLD, sig_chld); # endif #else # ifdef SIGCLD SIGNAL_HANDLE(SIGCLD,SIG_IGN); /* Auto-reap the kids.. */ # else SIGNAL_HANDLE(SIGCHLD,SIG_IGN); # endif #endif #ifdef SIGUSR1 SIGNAL_HANDLE(SIGUSR1, sig_readcf); #endif /* SIGUSR1 */ if (verbose || version) { prversion("scheduler"); if (version) exit(0); sfputc(sfstderr, '\n'); } offout = sftell(sfstdout); offerr = sftell(sfstderr); if (config == NULL) { config = emalloc(3 + (u_int)(strlen(mailshare) + strlen(progname) + strlen(qcf_suffix))); sprintf((char*)config, "%s/%s.%s", mailshare, progname, qcf_suffix); } cehead = readconfig(config); if (cehead == NULL) { cp = emalloc(strlen(config)+50); sprintf((char*)cp, "null control file, probably errors in it: %s", config); die(1, cp); /* NOTREACHED */ } if (postoffice == NULL && (postoffice = getzenv("POSTOFFICE")) == NULL) postoffice = POSTOFFICE; if (chdir(postoffice) < 0 || chdir(TRANSPORTDIR) < 0) sfprintf(sfstderr, "%s: cannot chdir to %s/%s.\n", progname, postoffice, TRANSPORTDIR); if (rendezvous == NULL && (rendezvous = getzenv("RENDEZVOUS")) == NULL) rendezvous = qoutputfile; if (daemonflg) { /* X: check if another daemon is running already */ if (!verbose && (offout < sftell(sfstdout) || offerr < sftell(sfstderr))) { sfprintf(sfstderr, "%ld %ld %ld %ld\n", offout, sftell(sfstdout), offerr, sftell(sfstderr)); sfprintf(sfstderr, "%s: daemon not started.\n", progname); die(1, "too many scheduler daemons"); /* NOTREACHED */ } detach(); /* leave worldy matters behind */ mytime(&now); sfprintf(sfstdout, "%s: scheduler daemon (%s)\n\tpid %d started at %s\n", progname, Version, (int)getpid(), (char *)rfc822date(&now)); } /* Now we are either interactive, or daemon, lets attach monitoring memory block.. and fill it in. */ MIBMtaEntry->sys.SchedulerMasterPID = getpid(); MIBMtaEntry->sys.SchedulerMasterStartTime = time(NULL); MIBMtaEntry->sys.SchedulerMasterStarts += 1; /* Zero the gauges at our startup.. */ MIBMtaEntry->sc.StoredMessagesSc = 0; MIBMtaEntry->sc.StoredThreadsSc = 0; MIBMtaEntry->sc.StoredVerticesSc = 0; MIBMtaEntry->sc.StoredRecipientsSc = 0; MIBMtaEntry->sc.StoredVolumeSc = 0; MIBMtaEntry->sc.TransportAgentProcessesSc = 0; MIBMtaEntry->sc.TransportAgentsActiveSc = 0; MIBMtaEntry->sc.TransportAgentsIdleSc = 0; /* Actually we want this to act as daemon, even when not in daemon mode.. */ if (killprevious(SIGTERM, pidfile) != 0) { sfprintf(sfstdout, "%s: Can't write my pid to a file ?? Out of diskspace ??\n",progname); die(1,"Can't write scheduler pid to a file!?"); /* NOTREACHED */ } for (i = 0; i < SIZE_L; ++i) { spt_mesh[i] = sp_init(); spt_mesh[i]->symbols = sp_init(); } zopenlog("scheduler", LOG_PID, LOG_MAIL); mustexit = gotalarm = dumpq = rereadcf = 0; canexit = 0; SIGNAL_IGNORE(SIGPIPE); SIGNAL_HANDLE(SIGALRM, sig_alarm); /* process agenda */ SIGNAL_HANDLE(SIGUSR2, sig_iot); /* dump queue info */ SIGNAL_HANDLE(SIGTERM, sig_exit); /* split */ SIGNAL_HANDLE(SIGQUIT, sig_quit); /* Slow shutdown */ /* call it to create the timeserver -- if possible */ init_timeserver(); /* Will take around 3-4 secs.. */ if (optind < argc) { /* process the specified control files only */ for (; optind < argc; ++optind) { long ino = atol(argv[optind]); if ((fd = eopen(argv[optind], O_RDWR, 0)) < 0) continue; /* the close(fd) is done in vtxprep */ cfp = schedule(fd, argv[optind], ino, 0); if (cfp == NULL) { if (verbose) sfprintf(sfstderr, "Nothing scheduled for %s!\n", argv[optind]); } else eunlink(argv[optind], "sch-argv"); } doagenda(); killpidfile(pidfile); exit(0); } queryipcinit(); dirqueuescan(".", dirq, 1); vtxprep_skip_lock = 0; syncweb(dirq); canexit = 1; #ifdef MALLOC_TRACE mal_leaktrace(1); #endif /* MALLOC_TRACE */ /* If we do a sync-start, we are synchronous.. */ if (syncstart) { int startcount; dirqueuescan(".", dirq, 1); startcount = dirq->wrksum; while (dirq->wrksum > 0 && !mustexit) { if (syncweb(dirq) < 0) break; queryipccheck(); } sfprintf(sfstderr,"Synchronous startup completed, messages: %d (%d skipped) recipients: %d\n", global_wrkcnt, startcount - global_wrkcnt, thread_count_recipients()); sfprintf(sfstderr,"***********************************************************************\n"); syncstart = 0; } if (dlyverbose) verbose = dlyverbose; do { time_t timeout; mytime(&now); canexit = 0; /* SAH Hmm... next_dirscan is NOT reset unless we find < 150 * new files in dirqueuescan - meaning this will be true. * So a 'busy' system will potentially be running dirqueuescan() * EVERY pass thru the loop... This could be bad */ if (now >= next_dirscan) { /* Directory scan time for new jobs .. */ /* Do it recursively every now and then, so that if we forget some jobs, they will become relearned soon enough. */ int wrk; i = dirqueuescan(".", dirq, (now >= next_idlecleanup)); mytime(&now); wrk = dirq->wrksum; wrk >>= 5; /* Divide by 32 -- just presume 32 msgs/sec.. */ if (wrk > 10) wrk = 10; /* But limit to 10... */ next_dirscan = now + sweepinterval + wrk; /* 10 .. 20 second sweep interval */ } if (now >= next_idlecleanup) { idle_cleanup(); if (next_idlecleanup == 0) next_idlecleanup = now; /* At regular intervals... */ next_idlecleanup += idle_sweepinterval; } if (now >= next_expiry2_scan) { if (next_expiry2_scan == 0) next_expiry2_scan = now; i = doexpiry2(); if (i == 0) /* At regular intervals... */ next_expiry2_scan += expiry2_sweepinterval; } if (now >= next_interim_report_time) { if (next_interim_report_time == 0) next_interim_report_time = now; /* At regular intervals... */ next_interim_report_time += interim_report_interval; interim_report_run(); } /* See when to timeout from mux() */ timeout = next_dirscan; /* If we still have things in pre-scheduler queue... */ if (dirq->wrksum > 0 && dirq->wrkcount > 0 && dirq->stats[ dirq->wrkcount-1 ]->not_before <= now) timeout = now; /* Submit possible new jobs (unless frozen) */ if (!freeze && !syncstart && doagenda() != 0) timeout = now; /* we still have some jobs avail for start */ gotalarm = 0; canexit = 1; /* mux on the stdout of children */ for (;;) { if (dumpq) { qprint(-1); dumpq = 0; } if (rereadcf) { cehead = rereadconfig(cehead, config); rereadcf = 0; } mytime(&now); /* Submit one item from pre-scheduler-queue into the scheduler */ if (dirq->wrksum > 1 ) { /* If more things in queue, submit them up to 200 pieces, or until spent two seconds at it! */ time_t now0 = now + 2; for (i=0; i < 200 && now < now0; ++i) { if (syncweb(dirq) < 0) break; /* Out of queue! */ mytime(&now); } } if (dirq->wrksum > 0) syncweb(dirq); if (slow_shutdown) { timeout = now+2; shutdown_kids(); /* If there are any to shut down.. */ } if (!(mux(timeout) == 0 && !gotalarm && dirq->wrksum == 0 && !mustexit && timeout > now)) break; if (readsockcnt == 0 && slow_shutdown) { /* No more childs to read, hop hop die away! */ mustexit = 1; break; } } } while (!mustexit); /* Doing nicely we would kill the childs here, but we are not such a nice people -- we just discard incore data, and crash out.. */ sp_scan(ctl_free, NULL, spt_mesh[L_CTLFILE]); #ifdef MALLOC_TRACE mal_dumpleaktrace(stderr); #endif /* MALLOC_TRACE */ killpidfile(pidfile); if (mustexit) die(0, "signal"); return 0; } static RETSIGTYPE sig_exit(sig) int sig; { if (querysocket >= 0) { /* give up mailq socket asap */ close(querysocket); querysocket = -1; } if (canexit) die(0, "signal"); mustexit = 1; } static RETSIGTYPE sig_quit(sig) int sig; { slow_shutdown = 1; freeze = 1; } #ifdef SIGUSR2 static RETSIGTYPE sig_iot(sig) int sig; { dumpq = 1; SIGNAL_HANDLE(SIGUSR2, sig_iot); } #endif #ifdef SIGUSR1 static RETSIGTYPE sig_readcf(sig) int sig; { rereadcf = 1; SIGNAL_HANDLE(SIGUSR1, sig_readcf); } #endif /* * Absorb any new files that showed up in our directory. */ int dq_insert(DQ, ino, file, delay) void *DQ; long ino; const char *file; int delay; { struct stat stbuf; struct dirstatname *dsn; struct dirqueue *dq = DQ; if (!ino) return 1; /* Well, actually it isn't, but we "makebelieve" */ mytime(&now); if (dq == NULL) dq = dirq; if (lstat(file,&stbuf) != 0 || !S_ISREG(stbuf.st_mode)) { /* Not a regular file.. Let it be there for the manager to wonder.. */ return -1; } /* if (DQ == NULL) sfprintf(sfstderr,"dq_insert(NULL,ino=%ld, file='%s', delay=%d)\n", ino,file,delay); */ /* Is it already in the database ? */ if (sp_lookup((u_long)ino,dirscan_mesh) != NULL) { sfprintf(sfstderr,"scheduler: tried to dq_insert(ino=%ld, file='%s') already in queue\n",ino, file); return 1; /* It is! */ } /* Now store the entry */ dsn = (struct dirstatname*)emalloc(sizeof(*dsn)+strlen(file)+1); memcpy(&(dsn->st),&stbuf,sizeof(stbuf)); dsn->ino = ino; dsn->not_before = now + delay; strcpy(dsn->name,file); sp_install(ino, (void *)dsn, 0, dirscan_mesh); /* Put the new entry into the normal queue, unless there is stuff at the delayed queue, OR the delay is positive! */ if (delay <= 0 && dq->wrkcount2 == 0) { /* Into the normal queue */ if (dq->wrkspace <= dq->wrkcount) { /* Increase the space */ dq->wrkspace = dq->wrkspace ? dq->wrkspace << 1 : 8; /* malloc(size) = realloc(NULL,size) */ dq->stats = (struct dirstatname**)erealloc(dq->stats, sizeof(void*) * dq->wrkspace); } dq->stats[dq->wrkcount] = dsn; dq->wrkcount += 1; dq->wrksum += 1; dq->sorted = 0; } else { /* Into the DELAYED QUEUE */ if (dq->wrkspace2 <= dq->wrkcount2) { /* Increase the space */ dq->wrkspace2 = dq->wrkspace2 ? dq->wrkspace2 << 1 : 8; /* malloc(size) = realloc(NULL,size) */ dq->stats2 = (struct dirstatname**)erealloc(dq->stats2, sizeof(void*) * dq->wrkspace2); } dq->stats2[dq->wrkcount2] = dsn; dq->wrkcount2 += 1; dq->wrksum += 1; } ++MIBMtaEntry->sc.ReceivedMessagesSc; return 0; } int in_dirscanqueue(DQ,ino) void *DQ; long ino; { struct dirqueue *dq = DQ; if (dq == NULL) dq = dirq; /* Return 1, if can find the "ino" in the queue */ if (dq->wrksum == 0 || dirscan_mesh == NULL) return 0; if (sp_lookup((u_long)ino,dirscan_mesh) != NULL) return 1; return 0; } static int dq_ctimecompare __((const void *, const void *)); static int dq_ctimecompare(b,a) /* we want oldest entry LAST */ const void *a; const void *b; { const struct dirstatname **A = (const struct dirstatname **)a; const struct dirstatname **B = (const struct dirstatname **)b; int rc; #if 0 if ((*A)->not_before > now) return 1; if ((*B)->not_before > now) return -1; #endif rc = ((*A)->st.st_ctime - (*B)->st.st_ctime); return rc; } static int dirqueuescan(dir, dq, subdirs) const char *dir; struct dirqueue *dq; int subdirs; { DIR *dirp; struct dirent *dp; struct stat stbuf; char file[MAXNAMLEN+1]; int newents = 0; static time_t timelimit; time_t then; mytime( &now ); then = now; if (dir && dir[0] == '.' && dir[1] == 0) { timelimit = now + newents_timelimit; } #if 0 static time_t modtime = 0; /* Any changes lately ? */ if (estat(dir, &stbuf) < 0) return -1; /* Could not stat.. */ if (stbuf.st_mtime == modtime) return 0; /* any changes lately ? */ modtime = stbuf.st_mtime; #endif if (verbose && dir[0] == '.') { sfprintf(sfstdout, "dirqueuescan(dir='%s') ",dir); sfsync(sfstdout); } /* Some changes lately, open the dir and read it */ dirp = opendir(dir); if (!dirp) { sfprintf(sfstderr,"A 'this can never fail' opendir('%s') failed!; errno=%d (%s)\n",dir,errno,strerror(errno)); return 0; } for (dp = readdir(dirp); dp != NULL; dp = readdir(dirp)) { /* Scan filenames into memory */ /* Here we are presuming the time retrieval is essentially _free_ in form of shared memory segment.. */ mytime( &now ); if (now > then) { then = now; queryipccheck(); } if (!syncstart && now > timelimit) break; /* At most newents_timelimit seconds in this scanner... */ if (!syncstart && newents > newents_limit) break; /* At most NNN per one go */ if (subdirs && dp->d_name[0] >= 'A' && dp->d_name[0] <= 'Z' && dp->d_name[1] == 0 ) { /* We do this recursively.. */ if (dir[0] == '.' && dir[1] == 0) strcpy(file, dp->d_name); else sprintf(file, "%s/%s", dir, dp->d_name); if (lstat(file,&stbuf) != 0 || !S_ISDIR(stbuf.st_mode)) { /* Not a directory.. Let it be there for the manager to wonder.. */ continue; } /* Recurse into levels below.. */ newents += dirqueuescan(file, dq, subdirs); continue; } /* End of directories of names "A" .. "Z" */ if (dp->d_name[0] >= '0' && dp->d_name[0] <= '9') { /* A file whose name STARTS with a number (digit) */ long ino = atol(dp->d_name); if (in_dirscanqueue(dq,ino)) /* Already in pre-schedule-queue */ continue; if (dir[0] == '.' && dir[1] == 0) strcpy(file, dp->d_name); else sprintf(file, "%s/%s", dir, dp->d_name); /* We may have this file in processing state... */ { struct spblk *spl; spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]); if (spl != NULL) { /* Already in processing, don't touch.. */ /*printf("File: %s active (not previously locked)\n",file);*/ ++vtxprep_skip_any; continue; } } if (dq_insert(dq, ino, file, -1)) continue; ++newents; } /* ... end of "filename starts with a [0-9]" */ } #ifdef BUGGY_CLOSEDIR /* * Major serious bug time here; some closedir()'s * free dirp before referring to dirp->dd_fd. GRRR. * XX: remove this when bug is eradicated from System V's. */ close(dirp->dd_fd); #endif closedir(dirp); if (verbose && dir[0] == '.') sfprintf(sfstdout,"wrksum=%d new=%d\n",dq->wrksum,newents); return newents; } int syncweb(dq) struct dirqueue *dq; { struct stat *stbuf; char *file; struct dirstatname *dqstats; struct spblk *spl; int wrkcnt = 0; int wrkidx = dq->wrkcount -1; long ino; /* Any work to do ? */ if (dq->wrksum == 0) return -1; /* Be responsive, check query channel */ queryipccheck(); if (dq->wrkcount == 0 && dq->wrkcount2 > 0) { /* Ok, the primary queue is empty, move the delayed queue into the primary one. */ if (dq->stats != NULL) free(dq->stats); dq->stats = dq->stats2; dq->stats2 = NULL; dq->wrkcount = dq->wrkcount2; dq->wrkcount2 = 0; dq->wrkspace = dq->wrkspace2; dq->wrkspace2 = 0; wrkidx = dq->wrkcount - 1; /* SAH ReSync wrkidx */ dq->sorted = 0; } if (!dq->sorted && dq->wrkcount > 1) { /* Sort the dq->stats[] per file ctime -- LATEST on slot 0. (we drain this queue from the END) */ mytime(&now); qsort((void*)dq->stats, dq->wrkcount, sizeof(void*), dq_ctimecompare ); dq->sorted = 1; } mytime(&now); for (; wrkidx >= 0; --wrkidx) { /* Process only one -- from the tail -- but leave 'not_before' things behind... */ if (dq->stats[wrkidx]->not_before > now) continue; /* WAIT! */ break; } /* Ok some, decrement the count to change it to index */ dq->wrkcount -= 1; dq->wrksum -= 1; dqstats = dq->stats[wrkidx]; file = dqstats->name; stbuf = &(dqstats->st); ino = dqstats->ino; if (dq->wrkcount > wrkidx) { /* Skipped some ! Compact the array ! */ memcpy( &dq->stats[wrkidx], &dq->stats[wrkidx+1], sizeof(dq->stats[0]) * (dq->wrkcount - wrkidx)); } dq->stats[dq->wrkcount] = NULL; /* Now we have pointers */ /* Deletion from the dirscan_mesh should ALWAYS succeed.. */ spl = sp_lookup((u_long)ino, dirscan_mesh); if (spl != NULL) sp_delete(spl,dirscan_mesh); /* Sometimes we may get files already in processing into our pre-schedule queue */ /* Already in processing ? */ spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]); if (spl == NULL) { /* Not yet in processing! */ int fd; /* Can open ? */ if ((fd = eopen(file, O_RDWR, 0)) < 0) { /* Don't unlink here, ... */ /* errno == EMFILE --> out of FDs! URGH! */ #if 0 if (getuid() == 0) eunlink(file,"sch-syncweb"); /* hrmpf! */ #endif } else { /* Ok, schedule! */ if (schedule(fd, file, ino, 0) != NULL) { /* Success, increment counters */ ++wrkcnt; } } } /* Free the pre-schedule queue entry */ free(dqstats); return wrkcnt; } static int sync_cfps __((struct ctlfile *,struct ctlfile *,struct procinfo*)); static int sync_cfps(oldcfp, newcfp, proc) struct ctlfile *oldcfp, *newcfp; struct procinfo *proc; { struct vertex *ovp, *nvp; struct vertex *novp, *nnvp; int ovp_count = 0; /* Scan both files thru their vp->next[L_CTLFILE] vertex chains. If oldcfp has things that newcfp does not have, remove those from oldcfp chains. Chains start at *cfp->head pointer. Comparisons can be done in linear order with respective vp->orig[L_CHANNEL] and vp->orig[L_HOST] pointers to struct web.. We presume the newcfp will not contain objects that oldcfp does not have -- while theorethically possible, it is not supported skenario.. */ if (verbose) { sfprintf(sfstdout,"sync_cfps() OLDVTXES ="); for (ovp = oldcfp->head; ovp; ovp = ovp->next[L_CTLFILE]) sfprintf(sfstdout," %p{%p %p}", ovp, ovp->orig[L_CHANNEL], ovp->orig[L_HOST]); sfprintf(sfstdout,"\n"); sfprintf(sfstdout,"sync_cfps() NEWVTXES ="); for (nvp = newcfp->head; nvp; nvp = nvp->next[L_CTLFILE]) sfprintf(sfstdout," %p{%p %p}", nvp, nvp->orig[L_CHANNEL], nvp->orig[L_HOST]); sfprintf(sfstdout,"\n"); } ovp = oldcfp->head; nvp = newcfp->head; while (ovp != NULL && nvp != NULL) { ++ovp_count; /* Always prepare for removal of the ovp object.. Pick the next-ovp pointer now */ novp = ovp->next[L_CTLFILE]; nnvp = nvp->next[L_CTLFILE]; /* Does this exist also on NVP chain ? */ #define VTXMATCH(ovp,nvp) (((ovp)->orig[L_CHANNEL] == (nvp)->orig[L_CHANNEL]) && ((ovp)->orig[L_HOST] == (nvp)->orig[L_HOST])) if (VTXMATCH(ovp, nvp)) { /* Verify/adjust OVP so that OVP and NVP have same address indexes in them. */ int i, j, id; if (verbose) { sfprintf(sfstdout, " OLD ngroup=%d {", ovp->ngroup); for(i = 0; i < ovp->ngroup; ++i) sfprintf(sfstdout, " %d", ovp->index[i]); sfprintf(sfstdout, " }\n"); sfprintf(sfstdout, " NEW ngroup=%d {", nvp->ngroup); for(i = 0; i < nvp->ngroup; ++i) sfprintf(sfstdout, " %d", nvp->index[i]); sfprintf(sfstdout, " }\n"); } for (i = ovp->ngroup -1; i >= 0; --i) { id = ovp->index[i]; for (j = nvp->ngroup -1; j >= 0; --j) { if (nvp->index[j] == id) goto next_i; } /* ovp index elt not found in new set! */ for (j = i+1; j < ovp->ngroup; ++j) ovp->index[j-1] = ovp->index[j]; ovp->ngroup -= 1; MIBMtaEntry->sc.StoredRecipientsSc -= 1; next_i:; } if (ovp->ngroup <= 0) unvertex(ovp,-1,1); /* Don't unlink()! free() *just* ovp! */ ovp = novp; nvp = nnvp; continue; } if (!VTXMATCH(ovp,nvp)) { struct vertex *vp; int i; /* Uugh... Does not match :-( */ vp = ovp; while (vp && !VTXMATCH(vp,nvp)) vp = vp->next[L_CTLFILE]; if (vp == NULL) { /* New not in old at all ??? */ if (verbose) sfprintf(sfstdout," -- NEW vertex %p NOT in OLD set!\n", nvp); /* Ok, skip it, and retry with OLD vs. new NEW */ nvp = nnvp; continue; } /* All OVP instances before matching NVP are to be removed from OVP chains */ i = 0; while (ovp && ovp != vp) { novp = ovp->next[L_CTLFILE]; MIBMtaEntry->sc.StoredRecipientsSc -= vp->ngroup; ovp->ngroup = 0; unvertex(ovp,-1,1); /* Don't unlink()! free() *just* ovp! */ ++i; ++ovp_count; ovp = novp; } if (verbose) sfprintf(sfstdout," -- Deleted %d OLD vertices\n", i); /* Adjust NOVP variable too */ novp = NULL; if (ovp) novp = ovp->next[L_CTLFILE]; } ovp = novp; nvp = nnvp; } /* Ok, 'ovp' might be non-NULL while 'nvp' is already NULL */ while (ovp) { novp = ovp->next[L_CTLFILE]; MIBMtaEntry->sc.StoredRecipientsSc -= ovp->ngroup; ovp->ngroup = 0; unvertex(ovp,-1,1); /* Don't unlink()! free() *just* ovp! */ /* Leaves CFP unharmed */ ovp = novp; } if (verbose) sfprintf(sfstdout," -- Synced %d OLD vertices\n", ovp_count); oldcfp->rcpnts_failed = newcfp->rcpnts_failed; oldcfp->haderror |= newcfp->haderror; /* Now oldcfp->head might be NULL -> bad news.. */ return 0; } void resync_file(proc, file) struct procinfo *proc; const char *file; { struct stat stbuf; struct spblk *spl; long ino; const char *s; int fd; struct ctlfile *oldcfp, *newcfp; queryipccheck(); lstat(file,&stbuf); s = strrchr(file,'/'); if (s) ++s; else s = file; ino = atoi(s); /* Sometimes we may get reports of files already deleted from processing.. */ /* Already in processing ? */ spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]); if (spl == NULL) { #if 0 if (!in_dirscanqueue(NULL,ino)) { sfprintf(sfstdout,"Resyncing file \"%s\" (ino=%ld)", file, ino); sfprintf(sfstdout," .. not in processing database\n"); } #endif /* Not (anymore) in processing! */ return; } oldcfp = (struct ctlfile *)(spl->data); oldcfp->id = 0; /* Don't scramble spt_mesh[] later below */ if (spl != NULL) sp_delete(spl, spt_mesh[L_CTLFILE]); spl = NULL; ++ oldcfp->resynccount; sfprintf(sfstdout, "Resyncing file \"%s\" (ino=%d pid=%d of=%d ho='%s') reqcnt=%d global-work-count=%ld\n", file, (int) ino, proc->pid, proc->overfed, (proc->ho ? proc->ho->name : ""), oldcfp->resynccount, global_wrkcnt); /* sfprintf(sfstdout, " .. in processing db\n"); */ if (oldcfp->resynccount > MAXRESYNCS) { /* Sigh.. Throw everything away :-( */ oldcfp->id = ino; cfp_free(oldcfp, NULL); sfprintf(sfstdout, " ... too many Resync attempts this way, throwing it away...\n"); return; } /* cfp_free()->unvertex()->unctlfile() will do reinsertion */ /* dq_insert(NULL,ino,file,31); */ /* Now read it back! */ fd = eopen(file, O_RDWR, 0); if (fd < 0) { /* ???? */ sfprintf(sfstdout," .. FILE OPEN FAILED!\n"); /* Delete it from memory */ cfp_free(oldcfp, NULL); return; } newcfp = schedule(fd, file, ino, 1); /* rereading */ if (newcfp != NULL) { /* ???? What ever, it succeeds, or it fails, all will be well */ sync_cfps(oldcfp, newcfp, proc); newcfp->id = 0; /* Don't scramble spt_mesh[] below */ spl = sp_lookup((u_long)ino, spt_mesh[L_CTLFILE]); if (spl) sp_delete(spl, spt_mesh[L_CTLFILE]); oldcfp->id = ino; sp_install(oldcfp->id, (void *)oldcfp, 0, spt_mesh[L_CTLFILE]); /* Delete it from memory */ cfp_free0(newcfp); if (oldcfp->head == NULL) { cfp_free(oldcfp,spl); sfprintf(sfstdout," .. LOST in resync ?! (wrkcnt %ld)\n", global_wrkcnt); } else sfprintf(sfstdout," .. resynced! (wrkcnt %ld)\n", global_wrkcnt); } else { /* Sigh.. Throw everything away :-( */ oldcfp->id = ino; cfp_free(oldcfp, NULL); sfprintf(sfstdout," .. NOT resynced! (wrkcnt %ld)\n", global_wrkcnt); } } void receive_notify(fd) int fd; { char buf[1000], *s, *file; int i, ok, cnt; u_long ino; /* 10 receives at the time AT MOST */ for (cnt=10;cnt;--cnt) { i = recvfrom(fd, buf, sizeof(buf), 0, NULL, NULL); if (i <= 0 && errno != EINTR) return; if (i < 0) continue; if (i >= sizeof(buf)) i = sizeof(buf)-1; buf[i] = 0; /* The only supported notify is: "NEW X/Y/file-name" message */ if (strncmp(buf,"NEW ",4) != 0) continue; file = s = buf+4; ok = 0; if ('1' <= s[0] && s[0] <= '9' && strchr(s, '/') == NULL) { ok = 1; } else if ('A' <= s[0] && s[0] <= 'Z' && s[1] == '/') { s += 2; if ('1' <= s[0] && s[0] <= '9' && strchr(s, '/') == NULL) { ok = 1; } else if ('A' <= s[0] && s[0] <= 'Z' && s[1] == '/') { s += 2; if ('1' <= s[0] && s[0] <= '9' && strchr(s, '/') == NULL) { ok = 1; } } } ino = atol(s); #if 0 sfprintf(sfstderr,"SCH-NOTIFY: len=%d '%s' file='%s', ino=%ld ok=%d\n", i, buf, file, ino, ok); zsyslog((LOG_INFO, "Got notify: '%s'", buf)); #endif if (!ok) continue; #if 0 if (lstat(file,&stbuf) != 0) continue; if (!S_ISFILE(stbuf.st_mode)) continue; if (in_dirscanqueue(dirq,stbuf.st_ino)) continue; #endif /* We may have this file in processing state... */ { struct spblk *spl; spl = sp_lookup(ino, spt_mesh[L_CTLFILE]); if (spl != NULL) { /* Already in processing, don't touch.. */ printf("File: '%s' active (not previously locked)\n", buf+4); continue; } } dq_insert(dirq, ino, buf+4, -1); } } /* * The schedule() function is in charge of reading the control file from * the scheduler directory, and creating all the appropriate links to the * control file. * Since it is convenient to do so here, it also goes through the list * of transport directives and schedules the appropriate things to be * done at the appropriate time in the future. */ static struct ctlfile *schedule(fd, file, ino, reread) int fd; const char *file; long ino; const int reread; { struct ctlfile *cfp; struct vertex *vp; /* read and process the control file */ cfp = vtxprep(slurp(fd, ino), file, reread); if (cfp == NULL) { if (!vtxprep_skip) { /* Unless skipped.. */ eunlink(file,"sch-sch-done"); /* everything here has been processed */ if (verbose) sfprintf(sfstdout,"completed, unlink %s\n",file); return NULL; } vtxprep_skip_any += vtxprep_skip; return NULL; } if (cfp->head == NULL) { unctlfile(cfp, 0); /* Delete the file. (decrements those counters above!) */ return NULL; } if (!reread) { sp_install(cfp->id, (void *)cfp, 0, spt_mesh[L_CTLFILE]); for (vp = cfp->head; vp != NULL; vp = vp->next[L_CTLFILE]) { /* Put into the schedules */ vtxdo(vp, cehead, file); } } /* Now we have no more need for the contents in core */ if (cfp->contents != NULL) { free(cfp->contents); cfp->contents = NULL; } return cfp; } /* * slurp() gets in the job-descriptor file, and does initial * parsing on it. */ struct ctlfile * slurp(fd, ino) int fd; long ino; { register char *s; register int i; char *contents; int *offset; int offsetspace; struct stat stbuf; struct ctlfile *cfp; if (fd < 0) return NULL; if (efstat(fd, &stbuf) < 0) { close(fd); return NULL; } if (!S_ISREG(stbuf.st_mode)) { close(fd); return NULL; /* XX: give error */ } contents = emalloc((int)stbuf.st_size + 1); if (eread(fd, contents, stbuf.st_size) != stbuf.st_size) { /* slurp */ close(fd); free(contents); /* Failed to read ?!?! */ return NULL; } contents[stbuf.st_size] = 0; cfp = (struct ctlfile *)emalloc(sizeof(struct ctlfile)); memset((void*)cfp, 0, sizeof(struct ctlfile)); cfp->fd = fd; cfp->dirind = -1; /* Not known -- or top-level */ cfp->uid = stbuf.st_uid; cfp->envctime = stbuf.st_ctime; cfp->contents = contents; /* cfp->vfpfn = NULL; cfp->spoolid = NULL; cfp->head = NULL; cfp->mark = V_NONE; cfp->haderror = 0; cfp->mid = NULL; cfp->envid = NULL; cfp->logident = NULL; cfp->erroraddr = NULL; cfp->msgbodyoffset = 0; */ /* go through the file and mark it off */ i = 0; offsetspace = 100; offset = (int*)emalloc(sizeof(int)*offsetspace); offset[i++] = 0L; for (s = contents; s - contents < stbuf.st_size; ++s) { if (*s == '\n') { *s++ = '\0'; if (s - contents < stbuf.st_size) { if (i >= offsetspace-1) { offsetspace += 20; offset = (int*)erealloc(offset,sizeof(int)*offsetspace); } offset[i++] = s - contents; if (*s == _CF_MSGHEADERS || *s == _CF_MIMESTRUCT) { /* find a \n\n combination */ while (!(*s == '\n' && *(s+1) == '\n')) if (s-contents < stbuf.st_size) ++s; else break; if (s - contents >= stbuf.st_size) { /* XX: header ran off file */ } } else if (*s == _CF_BODYOFFSET) { cfp->msgbodyoffset = atoi(s+2); } else if (*s == _CF_DSNENVID) { /* cfp->envid = strsave(s+2); */ } else if (*s == _CF_DSNRETMODE) { /* cfp->dsnretmode = strsave(s+2); */ } else if (*s == _CF_MESSAGEID) { /* cfp->mid = strsave(s+2); */ } else if (*s == _CF_LOGIDENT) { /* cfp->logident = strsave(s+2); */ } else if (*s == _CF_ERRORADDR) { /* cfp->erroraddr = strsave(s+2); */ } else if (*s == _CF_TURNME) { /* Umm... it is a bit complex */ /* sfprintf(sfstdout,"An ETRN request for target '%s'\n",s+2); */ } } } } cfp->nlines = i; /* closing fd must be done in vtxprep(), so we can unlock stuff easy */ cfp = (struct ctlfile *)erealloc((void*)cfp, (u_int) (sizeof(struct ctlfile) + i * (sizeof offset[0]))); if (cfp) { /* copy over the offsets */ memcpy(&(cfp->offset[0]), offset, sizeof(offset[0]) * i); cfp->id = ino; /* cfp->mid = NULL; */ /* INC there in every case! */ ++global_wrkcnt; ++MIBMtaEntry->sc.StoredMessagesSc; } else { /* realloc() failed.. */ free(contents); } free(offset); /* release the block */ return cfp; } struct offsort { int offset; int myidx; int headeroffset; int drptoffset; int delayslot; int notifyflg; char *sender; /* char *dsnrecipient; */ time_t wakeup; }; /* ``bcfcn'' is used by the qsort comparison routine, bcp points to the control file bytes */ static char *bcp; static int bcfcn __((const void *, const void *)); static int bcfcn(a, b) const void *a, *b; { const struct offsort *aa = a, *bb = b; return strcmp(bcp + aa->offset, bcp + bb->offset); } static int lockverify(cfp,cp,verbflg) /* Return 1 when lock process does not exist */ struct ctlfile *cfp; /* Call only when the lock is marked active! */ const char *cp; const int verbflg; { char lockbuf[1+_CFTAG_RCPTPIDSIZE]; int lockpid; int sig = 0; #if 0 # ifdef SIGCONT /* OSF/1 acts differently if we use SIGNAL 0 :-( */ sig = SIGCONT; # endif #endif ++cp; if (!(*cp == ' ' || (*cp >= '0' && *cp <= '9'))) return 1; /* Old-style entry */ memcpy(lockbuf,cp,_CFTAG_RCPTPIDSIZE); lockbuf[sizeof(lockbuf)-1] = 0; if (sscanf(lockbuf,"%d",&lockpid) != 1) return 1; /* Bad value ? */ if (kill(lockpid,sig) != 0) return 1; /* PID does not exist, or other error.. */ if (verbflg) sfprintf(sfstderr,"lockverify: Lock with PID=%d is active on %s:%s\n", lockpid, cfp->mid, cp+_CFTAG_RCPTPIDSIZE); return 0; /* Locking PID does exist.. */ } /* * The vtxprep() does deeper analysis on jobs described at the file. * It verifies possible locks (if they are still valid), and gathers * all of the information regarding senders and recipients. * * All "recipient"-lines are sorted to ease searching of vertices with * identical channel, and host definitions. If there are more than one * recipient with given (channel, host)-tuple, all such recipients are * wrapped into same vertex node with its respective ``recipient group''. * */ static struct ctlfile *vtxprep(cfp, file, rereading) struct ctlfile *cfp; const char *file; const int rereading; { register int i, opcnt; register int *lp; int svn; int c_echannel, c_ehost, c_l_echannel, c_l_ehost; char *cp, *channel, *host, *l_channel, *l_host; char *echannel, *ehost, *l_echannel, *l_ehost, mfpath[100], flagstr[2]; char *latest_sender = NULL; char *senderchannel = NULL; struct vertex *vp, *pvp, **pvpp, *head; struct stat stbuf; struct offsort *offarr; int offspc, mypid; int prevrcpt = -1; int is_turnme = 0; time_t wakeuptime; long format = 0; char fpath[128], path[128], path2[128]; if (cfp == NULL) return NULL; mytime(&now); strcpy(fpath, file); channel = host = NULL; /* copy offsets into local array */ offspc = 16; offarr = (struct offsort *)emalloc(offspc * sizeof(struct offsort)); if (!offarr) { /* malloc() failure.. */ sfprintf(sfstderr,"malloc failed, discarding job: '%s'\n", file); cfp_free(cfp, NULL); return NULL; } mypid = getpid(); opcnt = 0; svn = 0; vtxprep_skip = 0; lp = &cfp->offset[0]; for (i = 0; i < cfp->nlines; ++i, ++lp) { cp = cfp->contents + *lp + 1; wakeuptime = 0; if (*cp == _CFTAG_LOCK) { /* * This can happen when we restart the scheduler, and * some previous transporter is still running. * (and DEFINITELY during resync! when we simply ignore locks) */ if (!lockverify(cfp, cp, !rereading)) { #if 0 long ino = 0; /* * IMO we are better off by forgetting for a while that * this spool-file exists at all. Thus very least we * won't errorneously delete it. */ close(cfp->fd); /* Was opened on schedule() */ if (cfp->vfpfn != NULL) { Sfio_t *vfp = vfp_open(cfp); if (vfp) { sfprintf(vfp, "scheduler: Skipped a job-file because it is held locked by PID=%6.6s\n",cp+1); sfclose(vfp); } } cfp_free(cfp, NULL); ++vtxprep_skip; ++vtxprep_skip_lock; free(offarr); /* XXX: Should we dq_insert() this back in ? Or do we let the occasional recursive scanner to check things deeply ? */ cp = strrchr(file,'/'); if (cp) ino = atol(cp+1); else ino = atol(file); dq_insert(NULL, ino, file, 32); return NULL; #else /* We can't simply forget this, we must do something smarter -- We use approach of marking the vertex non-startable until one hour from now. */ wakeuptime = now + 3600; /* 1 hour */ #endif } else { if (*cp != _CFTAG_NORMAL) { *cp = _CFTAG_NORMAL; /* unlock it */ lockaddr(cfp->fd, NULL, (long) (cp - cfp->contents), _CFTAG_LOCK, _CFTAG_NORMAL, cfp->mid, cp+_CFTAG_RCPTPIDSIZE, mypid); } } } /* Calculate summary info */ if (cp[-1] == _CF_RECIPIENT) { ++cfp->rcpnts_total; if (*cp == _CFTAG_NOTOK) { #if 0 ++cfp->rcpnts_failed; ++MIBMtaEntry->sc.ReceivedRecipientsSc, ++MIBMtaEntry->sc.TransmittedRecipientsSc; #endif prevrcpt = -1; } else if (*cp != _CFTAG_OK) { ++cfp->rcpnts_work; } else { /* _CFTAG_OK */ #if 0 ++MIBMtaEntry->sc.ReceivedRecipientsSc, ++MIBMtaEntry->sc.TransmittedRecipientsSc; #endif } } if (*cp == _CFTAG_NORMAL || (rereading && *cp == _CFTAG_LOCK) || *cp == '\n' /* This appears for msg-header entries.. */ ) { ++cp; switch (cp[-2]) { case _CF_FORMAT: sscanf(cp,"%li",&format); if (format & (~_CF_FORMAT_KNOWN_SET)) { sfprintf(sfstderr,"%s: ** FILE: '%s' has unknown/unsupported format set: 0x%08lx !\n", progname, file, format); cfp_free(cfp, NULL); free(offarr); return NULL; } cfp->format = format; break; case _CF_SENDER: while (*cp == ' ') ++cp; senderchannel = cp; while (*cp != 0 && *cp != ' ') ++cp; if (*cp == ' ') *cp++ = 0; while (*cp == ' ') ++cp; /* Scan over the sender 'host' */ cp = skip821address(cp); while (*cp == ' ') ++cp; /* Scan over the sender 'user' */ latest_sender = cp; cp = skip821address(cp); *cp = 0; if (cfp->erroraddr) free(cfp->erroraddr); cfp->erroraddr = strsave(latest_sender); if (strcmp(senderchannel,"error")==0) { cfp->iserrmesg = 1; cfp->erroraddr[0] = 0; /* Make it '<>' */ } break; case _CF_RECIPIENT: if (opcnt >= offspc-1) { offspc *= 2; offarr = (struct offsort *)erealloc(offarr, sizeof(struct offsort) * offspc); if (!offarr) { /* realloc() failure.. */ sfprintf(sfstderr,"realloc() failed, discarding job: '%s'\n", file); cfp_free(cfp, NULL); return NULL; } } offarr[opcnt].offset = *lp + 2; strlower(cp); if ((format & _CF_FORMAT_TA_PID) || *cp == ' ' || (*cp >= '0' && *cp <= '9')) { /* New PID locking scheme.. */ offarr[opcnt].offset += _CFTAG_RCPTPIDSIZE; cp += _CFTAG_RCPTPIDSIZE; } if ((format & _CF_FORMAT_DELAY1) || *cp == ' ' || (*cp >= '0' && *cp <= '9')) { /* Newer DELAY data slot - _CFTAG_RCPTDELAYSIZE bytes */ offarr[opcnt].delayslot = offarr[opcnt].offset; offarr[opcnt].offset += _CFTAG_RCPTDELAYSIZE; cp += _CFTAG_RCPTDELAYSIZE; } else offarr[opcnt].delayslot = 0; offarr[opcnt].wakeup = wakeuptime; offarr[opcnt].myidx = i; offarr[opcnt].headeroffset = -1; offarr[opcnt].drptoffset = -1; offarr[opcnt].sender = latest_sender; offarr[opcnt].notifyflg = NOT_FAILURE; prevrcpt = opcnt; ++opcnt; break; case _CF_RCPTNOTARY: /* IETF-NOTARY-DRPT+NOTIFY DATA! */ if (prevrcpt >= 0) { offarr[prevrcpt].drptoffset = *lp + 2; /* Lets parse the input, we want NOTIFY= flags */ while (*cp != 0) { while (*cp == ' ' || *cp == '\t') ++cp; if (CISTREQN("NOTIFY=",cp,7)) { offarr[prevrcpt].notifyflg = 0; cp += 7; while (*cp) { if (CISTREQN(cp,"NEVER",5)) { cp += 5; offarr[prevrcpt].notifyflg |= NOT_NEVER; } else if (CISTREQN(cp,"DELAY",5)) { cp += 5; offarr[prevrcpt].notifyflg |= NOT_DELAY; } else if (CISTREQN(cp,"FAILURE",7)) { cp += 7; offarr[prevrcpt].notifyflg |= NOT_FAILURE; } else if (CISTREQN(cp,"SUCCESS",7)) { cp += 7; offarr[prevrcpt].notifyflg |= NOT_SUCCESS; } else if (CISTREQN(cp,"TRACE", 5)) { cp += 5; offarr[prevrcpt].notifyflg |= NOT_TRACE; } else { break; /* Burp ?? */ } if (*cp == ',') ++cp; } } else if (CISTREQN("BY=",cp,7)) { /* FIXME: Parse the BY= parameter! */ while (*cp && *cp != ' ' && *cp != '\t') ++cp; } else { while (*cp && *cp != ' ' && *cp != '\t') ++cp; } } } prevrcpt = -1; /* Add ONCE! */ break; case _CF_MSGHEADERS: for (/* we count up.. */; svn < opcnt; ++svn) offarr[svn].headeroffset = *lp + 2; break; case _CF_DSNRETMODE: if (cfp->dsnretmode) free(cfp->dsnretmode); cfp->dsnretmode = strsave(cp); break; case _CF_DSNENVID: if (cfp->envid) free(cfp->envid); /* shouldn't happen.. */ cfp->envid = strsave(cp); break; case _CF_DIAGNOSTIC: cfp->haderror = 1; break; case _CF_MESSAGEID: if (cfp->mid != NULL) free(cfp->mid); /* shouldn't happen.. */ cfp->mid = strsave(cp); break; case _CF_BODYOFFSET: cfp->msgbodyoffset = atoi(cp); break; case _CF_LOGIDENT: if (cfp->logident) free(cfp->logident); /* shouldn't happen..*/ cfp->logident = strsave(cp); break; case _CF_ERRORADDR: if (cfp->erroraddr) free(cfp->erroraddr); /* could happen */ cfp->erroraddr = strsave(cp); break; case _CF_OBSOLETES: deletemsg(cp, cfp); break; case _CF_VERBOSE: cfp->vfpfn = strsave(cp); break; case _CF_TURNME: sfprintf(sfstdout,"%s ETRN: %s\n", timestring(), cp); strlower(cp); turnme(cp); is_turnme = 1; break; } } } close(cfp->fd); /* closes the fd opened in schedule() */ if (fpath[0] >= 'A') { /* Prefixed with a subdir path */ int hash = 0; char *s = fpath; while (*s >= 'A' && *s <= 'Z') { hash <<= 8; hash |= (*s) & 0xff; ++s; if (*s != '/') break; ++s; } cfp->dirind = hash; } else { /* Not at subdirs */ if (cfp->mid && hashlevels > 0) { /* We have a desire to use subdirs, now the magic of hashing into subdirs... inode number ? */ int hash, rc; if (hashlevels > 1) { hash = atol(cfp->mid) % (26*26); hash = (('A' + hash / 26) << 8) | ('A' + hash % 26); } else { hash = (atol(cfp->mid) % 26) + 'A'; } cfp->dirind = hash; /* Ok, we have the hash values, now move the file to match with our hashes.. */ sprintf(path, "%s%s", cfpdirname(hash), fpath); while ((rc = rename(fpath, path)) != 0) { if (errno != EINTR) break; } if (rc != 0) { /* Failed, why ? */ if (errno != ENOENT) { /* For any other than 'no such (target) directory' */ cfp->dirind = -1; } else { cfp_mksubdirs("",cfpdirname(hash)); while ((rc = rename(fpath, path)) != 0) { if (errno != EINTR) break; } if (rc != 0) cfp->dirind = -1; /* Failed for any reason */ else strcpy(fpath, path); } } else strcpy(fpath, path); if (cfp->dirind >= 0) { /* Successfully renamed the transport file to a subdir, now do the same to the queue directory! */ sprintf(path, "../%s/%s", QUEUEDIR, cfp->mid); sprintf(path2, "../%s/%s%s", QUEUEDIR, cfpdirname(cfp->dirind), cfp->mid); while ((rc = rename(path, path2)) != 0) { if (errno != EINTR) break; } if (rc != 0) { if (errno == ENOENT) { /* No dirs ?? */ cfp_mksubdirs(QUEUEDIR,cfpdirname(hash)); while ((rc = rename(path, path2)) != 0) { if (errno != EINTR) break; } } } /* If failed, it will be reported below */ } } } if (cfp->mid != NULL) { if (cfp->dirind >= 0) sprintf(mfpath, "../%s/%s%s", QUEUEDIR, cfpdirname(cfp->dirind), cfp->mid); else sprintf(mfpath, "../%s/%s", QUEUEDIR, cfp->mid); } if (cfp->mid == NULL || cfp->logident == NULL || estat(mfpath, &stbuf) < 0) { if (!is_turnme) { Sfio_t *vfp = vfp_open(cfp); if (vfp) { sfprintf(vfp, "aborted due to missing information\n"); sfclose(vfp); } } cfp_free(cfp, NULL); free(offarr); return NULL; } cfp->mtime = stbuf.st_mtime; /* instead of ctime, we use mtime at the queue-dir accesses, this way we can move the spool to another machine, and run things in there, and still have same expiration times.. */ /* Reuse the buffer ... Generate same spoolid string that all other subsystems also report to syslog. */ taspoolid(path2, cfp->mtime, cfp->id); cfp->spoolid = strsave(path2); if (!cfp->spoolid) { /* malloc() failure.. */ sfprintf(sfstderr,"malloc() failed, discarding job: '%s'\n", file); cfp_free(cfp, NULL); free(offarr); return NULL; } cfp->fd = -1; /* sort it to get channels and channel/hosts together */ bcp = cfp->contents; if (opcnt > 1) qsort((char *)offarr, opcnt, sizeof (struct offsort), bcfcn); /* * Loop through them; whenever either channel or host changes, * make a new vertex. All such vertices must be linked together. */ strcpy(flagstr," "); l_channel = l_echannel = l_host = l_ehost = flagstr; c_echannel = c_ehost = c_l_echannel = c_l_ehost = 0; svn = 0; pvp = NULL; head = NULL; pvpp = &head; for (i = 0; i < opcnt; ++i) { channel = bcp + offarr[i].offset /* - 2 */; while (*channel == ' ') ++channel; /* Skip possible space(s) */ host = echannel = skip821address(channel); c_l_echannel = *host; if (c_l_echannel) *host++ = '\0'; while (*host == ' ') ++host; /* Skip space(s) */ #if 1 /* [mea] channel ((mx.target)(mx.target mx.target)) rest.. */ cp = host; if (*cp == '(') { while(*cp == '(') ++cp; while(*cp && *cp != ' ' && *cp != '\t' && *cp != ')') ++cp; if (*cp) ehost = cp; else continue; /* error! */ /* Ok, found ehost, now parsing past parenthesis.. */ cp = host; while(*host == '(') ++host; if (*cp == '(') { ++cp; while(*cp == '(') { /* Find ending ')', and skip it */ while(*cp && *cp != ')') ++cp; if (*cp == ')') ++cp; } /* Ok, scanned past all inner parenthesis, now ASSUME next one is ending outer parenthesis, and skip it */ ++cp; } if (*cp != ' ' && *cp != '\t') continue; /* Not proper separator.. error! */ } else #endif { ehost = skip821address(host); strlower(host); if (ehost == NULL || ehost == host) /* error! */ continue; } c_ehost = *ehost; *ehost = '\0'; /* compare with the last ones */ if (strcmp(channel, l_channel) || strcmp(host, l_host)) { /* wrap and tie the old vertex node */ if (i != svn) { u_int alloc_size = (u_int) (sizeof (struct vertex) + (i - svn - 1) * sizeof (int)); vp = (struct vertex *)emalloc(alloc_size); if (!vp) { /* malloc() failure.. */ sfprintf(sfstderr,"malloc() failed, discarding job: '%s'\n", file); cfp_free(cfp, NULL); free(offarr); return NULL; } MIBMtaEntry->sc.StoredVerticesSc += 1; memset((char*)vp, 0, alloc_size); vp->cfp = cfp; vp->next[L_CTLFILE] = NULL; vp->prev[L_CTLFILE] = pvp; #if 0 vp->thread = NULL; vp->web[L_CTLFILE] = NULL; vp->message = NULL; vp->retryindex = 0; vp->nextitem = NULL; vp->previtem = NULL; vp->proc = NULL; vp->attempts = 0; vp->notary = NULL; #endif vp->ngroup = i - svn; MIBMtaEntry->sc.StoredRecipientsSc += (i - svn); MIBMtaEntry->sc.ReceivedRecipientsSc += (i - svn); /* vp->sender = strsave(offarr[svn].sender); */ vp->wakeup = offarr[svn].wakeup; vp->headeroffset = offarr[svn].headeroffset; /*They are similar*/ vp->drptoffset = offarr[svn].drptoffset; vp->notaryflg = offarr[svn].notifyflg; while (svn < i) { vp->index[i-svn-1] = offarr[svn].myidx; ++svn; } *pvpp = vp; pvpp = &vp->next[L_CTLFILE]; pvp = vp; link_in(L_HOST, vp, l_host); link_in(L_CHANNEL, vp, l_channel); } /* create a new vertex node */ svn = i; } /* stick the current 'r'ecipient line into the current vertex */ /* restore the characters */ *l_echannel = c_l_echannel; *l_ehost = c_l_ehost; l_echannel = echannel; l_ehost = ehost; c_l_echannel = c_echannel; c_l_ehost = c_ehost; l_channel = channel; l_host = host; } /* for( .. i < opcnt .. ) */ /* wrap and tie the old vertex node (this is a copy of code above) */ if (i != svn) { u_int alloc_size = (u_int) (sizeof (struct vertex) + (i - svn - 1) * sizeof (int)); vp = (struct vertex *)emalloc(alloc_size); if (!vp) { /* malloc() failure.. */ sfprintf(sfstderr,"malloc() failed, discarding job: '%s'\n", file); cfp_free(cfp, NULL); free(offarr); return NULL; } MIBMtaEntry->sc.StoredVerticesSc += 1; memset((void*)vp, 0, alloc_size); vp->cfp = cfp; vp->next[L_CTLFILE] = NULL; vp->prev[L_CTLFILE] = pvp; #if 0 vp->message = NULL; vp->retryindex = 0; vp->nextitem = NULL; vp->previtem = NULL; vp->proc = NULL; vp->attempts = 0; vp->notary = NULL; #endif vp->ngroup = i - svn; MIBMtaEntry->sc.StoredRecipientsSc += (i - svn); MIBMtaEntry->sc.ReceivedRecipientsSc += (i - svn); /* vp->sender = strsave(offarr[snv].sender); */ vp->wakeup = offarr[svn].wakeup; vp->headeroffset = offarr[svn].headeroffset; /* Just any of them will do */ vp->drptoffset = offarr[svn].drptoffset; vp->notaryflg = offarr[svn].notifyflg; while (svn < i) { vp->index[i-svn-1] = offarr[svn].myidx; ++svn; } *pvpp = vp; pvpp = &vp->next[L_CTLFILE]; pvp = vp; link_in(L_HOST, vp, host); link_in(L_CHANNEL, vp, channel); } *l_echannel = c_l_echannel; *l_ehost = c_l_ehost; /* for (vp = head; vp != NULL; vp = vp->next[L_CTLFILE]) { sfprintf(sfstdout,"--\n"); for (i = 0; i < vp->ngroup; ++i) sfprintf(sfstdout,"\t%s\n", cfp->contents+cfp->offset[vp->index[i]]); } */ cfp->head = head; free(offarr); if (verbose) { int completed = cfp->rcpnts_total - cfp->rcpnts_work - cfp->rcpnts_failed; sfprintf(sfstdout,"vtxprep: msg %s rcptns total %d work %d failed %d done %d\n vertices:", cfp->mid, cfp->rcpnts_total, cfp->rcpnts_work, cfp->rcpnts_failed, completed ); vp = cfp->head; for (vp = cfp->head; vp; vp = vp->next[L_CTLFILE]) { sfprintf(sfstdout," %p", vp); } sfprintf(sfstdout,"\n"); } { Sfio_t *vfp = vfp_open(cfp); if (vfp) { if (cfp->mid) sfprintf(vfp, "scheduler processing %s\n", cfp->mid); sfclose(vfp); } } return cfp; } /* * The vtxmatch() is a subroutine to vtxdo() matching for * scheduler definition entries from the scheduler configuration table. * */ static int vtxmatch(vp, tp) struct vertex *vp; struct config_entry *tp; { /* if the channel doesn't match, there's no hope! */ if (verbose>1) sfprintf(sfstdout,"ch? %s %s\n", vp->orig[L_CHANNEL]->name, tp->channel); if (tp->channel[0] == '*' && tp->channel[1] == '\0') return 0; /* Never match the defaults entry! */ if (!globmatch(tp->channel, vp->orig[L_CHANNEL]->name)) return 0; if (!(tp->host == NULL || tp->host[0] == '\0' || (tp->host[0] == '*' && tp->host[1] == '\0'))) { if (!globmatch(tp->host, vp->orig[L_HOST]->name)) return 0; } if (verbose>1) sfprintf(sfstdout,"host %s %s\n", vp->orig[L_HOST]->name, tp->host); return 1; } static void ce_fillin __((struct threadgroup *, struct config_entry *)); static void ce_fillin(thg,cep) struct threadgroup *thg; struct config_entry *cep; { struct config_entry *ce = &(thg->ce); defaultconfigentry( ce,cep ); if (cep->interval != -1) ce->interval = cep->interval; if (cep->idlemax != -1) ce->idlemax = cep->idlemax; if (cep->expiry != -1) ce->expiry = cep->expiry; if (cep->expiry2 != -1) ce->expiry2 = cep->expiry2; ce->expiryform = cep->expiryform; if (cep->uid != -1) ce->uid = cep->uid; if (cep->gid != -1) ce->gid = cep->gid; if (cep->maxkids != -1) ce->maxkids = cep->maxkids; if (cep->maxkidChannel != -1) ce->maxkidChannel = cep->maxkidChannel; if (cep->maxkidThreads != -1) ce->maxkidThreads = cep->maxkidThreads; if (cep->nretries > 0) { ce->nretries = cep->nretries; ce->retries = cep->retries; } if (cep->command != NULL) { ce->command = cep->command; ce->argv = cep->argv; } ce->flags |= cep->flags; /* XX: Grumble.. additive only.. */ ce->host = cep->host; if (cep->skew > 0) ce->skew = cep->skew; if (ce->interval == -1) ce->interval = 3600; if (ce->idlemax == -1) ce->idlemax = ce->interval * 3; if (ce->maxkids == -1) ce->maxkids = global_maxkids; if (ce->maxkidChannel == -1) ce->maxkidChannel = global_maxkids; if (ce->maxkidThreads == -1) ce->maxkidThreads = global_maxkids; } /* * The vtxdo() tries thru all scheduler configuration entries * looking for a matching one which to fill in for the input vertex. * * In the end it calls reschedule() to place the vertex into scheduling * queues (threads) * */ static void vtxdo(vp, cehdr, path) struct vertex *vp; struct config_entry *cehdr; const char *path; { struct config_entry *tp; int n; int cnt = 0; /* * go through scheduler control file entries and * fill in the blanks in the vertex specification */ n = 0; for (tp = cehdr; tp != NULL; tp = tp->next) { ++cnt; if (vtxmatch(vp, tp)) { /* tp points to the selected config entry */ n = 1; break; } } if (n == 0) { sfprintf(sfstderr, "%s: no pattern matched %s/%s address %s%s\n", progname, vp->orig[L_CHANNEL]->name,vp->orig[L_HOST]->name, path ? "file=":"", path); /* XX: memory leak here? */ return; } if (verbose) sfprintf(sfstdout, "Matched %dth config entry with: %s/%s\n", cnt, vp->orig[L_CHANNEL]->name, vp->orig[L_HOST]->name); /* set default values */ if (tp->expiry > 0) vp->ce_expiry = tp->expiry + vp->cfp->mtime; else vp->ce_expiry = 0; if (tp->expiry2 > 0 && tp->expiry > 0) vp->ce_expiry2 = tp->expiry2 + vp->ce_expiry; else vp->ce_expiry2 = /* vp->ce_expiry */ 0; if (vp->ce_expiry2 < vp->ce_expiry) vp->ce_expiry2 = vp->ce_expiry + 24*3600; if (tp->reporttimes[0]) vp->nextrprttime = now + tp->reporttimes[0]; else vp->nextrprttime = now + global_report_interval; thread_linkin(vp,tp,cnt,ce_fillin); if (verbose>1) vtxprint(vp); } int vtxredo(spl) struct spblk *spl; { struct ctlfile *cfp = (struct ctlfile *)spl->data; struct vertex *vp; /* assert cfp != NULL */ for (vp = cfp->head ; vp != NULL ; vp = vp->next[L_CTLFILE]) { vtxdo(vp, rrcf_head, NULL); } return 0; } /* Shell-GLOB-style matching */ static int globmatch(pattern, string) register const char *pattern; register const char *string; { while (1) { switch (*pattern) { case '{': { const char *p = pattern+1; const char *s = string; /* This matches at the END of the pattern: '*.{fii,foo,faa}' */ for ( ; *p != 0 && *p != '}'; ++p) { if (*p == ',') { if (*s == '\0') return 1; /* We have MATCH! */ s = string; continue; } if (*s != *p) { /* Not the same .. */ s = string; /* Ok, perhaps next pattern segment ? */ while (*p != '\0' && *p != '}' && *p != ',') ++p; if (*p != ',') return 0; /* No next pattern ? We definitely have no match! */ continue; } if (*s != 0) ++s; } if (*p == '\0' || *p == '}') if (*s == 0) return 1; return 0; } break; case '*': ++pattern; if (*pattern == 0) { /* pattern ended with '*', we can accept any string trail.. */ return 1; } /* We do 'common case' optimization here, but will loose some performance, if somebody gives '*foo*' as a pattern.. */ { const char *p = pattern; int i = 0, c; while ((c = *p++) != 0) { /* Scan for special chars in pattern.. */ if (c == '*' || c == '[' || c == '{' || c == '\\' || c == '?'){ i = 1; /* Found! */ break; } } if (!i) { /* No specials, match from end of string */ int len = strlen(string); i = strlen(pattern); if (i > len) return 0; /* Tough.. pattern longer than string */ if (memcmp(string + len - i, pattern, i) == 0) return 1; /* MATCH! */ } } do { if (globmatch(pattern, string)) return 1; } while (*string++ != '\0'); return 0; case '\\': ++pattern; if (*pattern == 0 || *pattern != *string) return 0; break; case '[': if (*string == '\0') return 0; if (*(pattern+1) == '^') { ++pattern; while ((*++pattern != ']') && (*pattern != *string)) if (*pattern == '\0') return 0; if (*pattern != ']') return 0; string++; break; } while ((*++pattern != ']') && (*pattern != *string)) if (*pattern == '\0') return 0; if (*pattern == ']') return 0; while (*pattern++ != ']') if (*pattern == '\0') return 0; string++; break; case '?': ++pattern; if (*string++ == '\0') return 0; break; case '\0': return (*string == '\0'); default: if (*pattern++ != *string++) return 0; } } } /* * This routine links a group of addresses (described by what vp points at) * into the Tholian Web (err, our matrix). The flag (either L_HOST or * L_CHANNEL) selects a namespace of strings pointed at by s. It just * happens we need 2 (host and channel names), and we don't care what * they are as long as they are in separate spaces. */ static void link_in(flag, vp, s) int flag; struct vertex *vp; const char *s; { struct web *wp = web_findcreate(flag,s); wp->linkcnt += 1; vp->next[flag] = NULL; vp->prev[flag] = wp->lastlink; vp->orig[flag] = wp; if (wp->lastlink != NULL) wp->lastlink->next[flag] = vp; wp->lastlink = vp; if (wp->link == NULL) wp->link = vp; } /* * time-string to be prepended to logged messages */ char * timestring() { static char timebuf[40]; struct tm *tp; mytime(&now); tp = localtime(&now); sprintf(timebuf,"%d%02d%02d%02d%02d%02d", tp->tm_year+1900, tp->tm_mon+1, tp->tm_mday, tp->tm_hour, tp->tm_min, tp->tm_sec); return timebuf; } static int running_on_alarm; static time_t alarm_time; static RETSIGTYPE sig_alarm(sig) int sig; { time(&alarm_time); SIGNAL_HANDLE(SIGALRM, sig_alarm); alarm(1); } static int timeserver_pid = 0; static void timer_health_check __((void)) { /* XXX: TODO! TIMER HEALTH CHECK */ if (timeserver_pid > 0) { } if (running_on_alarm) { } } #if defined(HAVE_MMAP) struct timeserver *timeserver_segment = NULL; static void init_timeserver() { int ppid; if ( Z_SHM_MIB_is_attached() ) { timeserver_segment = & MIBMtaEntry->ts; } else { /* SHARED MIB MEMORY BLOCK IS NOT ATTACHED */ #define MAPSIZE 16*1024 /* Just in case other methods didn't work.. e.g. that MIB-block.. */ #if !defined(MAP_ANONYMOUS) || defined(__linux__) /* must have a file ? (SunOS 4.1, et.al.) */ int fd = -1; char blk[1024]; int i; Sfio_t *fp = sftmp(0); /* Create the backing file fairly reliably. */ if (fp) fd = sffileno(fp); if (fd < 0) { perror("init_timeserver() didn't create tempfile ??"); return; /* Brr! */ } memset(blk,0,sizeof(blk)); for (i=0; i < MAPSIZE; i += sizeof(blk)) sfwrite(fp, blk, sizeof(blk)); sfsync(fp); sfseek(fp, 0, 0); #ifndef MAP_FILE # define MAP_FILE 0 /* SunOS 4.1 does not have this */ #endif timeserver_segment = (void*)mmap(NULL, MAPSIZE, PROT_READ|PROT_WRITE, MAP_FILE|MAP_SHARED, fd, 0); sfclose(fp); #else /* MAP_ANONYMOUS and not Linux */ #ifndef MAP_VARIABLE # define MAP_VARIABLE 0 /* Some system have MAP_ANONYMOUS, but no MAP_VARIABLE.. */ #endif /* This MAP_ANONYMOUS does work at DEC OSF/1 ... */ timeserver_segment = (void*)mmap(NULL, MAPSIZE, PROT_READ|PROT_WRITE, MAP_VARIABLE|MAP_ANONYMOUS|MAP_SHARED, -1, 0); #endif if (-1L == (long)timeserver_segment || timeserver_segment == NULL) { perror("mmap() of timeserver segment gave"); timeserver_segment = NULL; return; /* Brr.. */ } } /* Have global shared server segment, or not ... */ ppid = fork(); if (ppid > 0) { time_t now2; timeserver_pid = ppid; timeserver_segment->pid = ppid; time(&now); sleep(3); #ifdef HAVE_SELECT now2 = timeserver_segment->tv.tv_sec; #else now2 = timeserver_segment->time_sec; #endif if (now2 == 0 || now2 == now) { /* HUH ?! No incrementation in 3 seconds ?? */ /* Start ticking in alarm() mode! */ kill (ppid, SIGTERM); timeserver_pid = 0; timeserver_segment = NULL; alarm(1); } return; } if (ppid < 0) return; /* Error ?? brr.. */ /* ======== TIME SERVER CHILD ======= */ { int space_check_count = 0; MIBMtaEntry->sc.schedulerTimeserverStartTime = time(NULL); MIBMtaEntry->sc.schedulerTimeserverStarts ++; #ifdef HAVE_SETPROCTITLE setproctitle("[TimeServer]"); #else strncpy(ArgvSave,"[TimeServer]", EOArgvSave - ArgvSave); #endif if (ArgvSave < EOArgvSave) ArgvSave[EOArgvSave-ArgvSave-1] = 0; ppid = getppid(); /* who is our parent ? */ #ifdef HAVE_SELECT gettimeofday(×erver_segment->tv, NULL); #else time(×erver_segment->time_sec); #endif for(;;) { #ifdef HAVE_SELECT struct timeval tv; int rc; tv.tv_sec = 0; tv.tv_usec = 300000; gettimeofday(×erver_segment->tv, NULL); rc = select(0,NULL,NULL,NULL,&tv); #else time(×erver_segment->time_sec); sleep(1); #endif /* Is the parent still alive ?? */ if (kill(ppid, 0) != 0) break; /* No ?? Out! */ /* Now check and fill in the filesystem free space gauges */ if (--space_check_count < 0) { long spc = Z_SHM_FileSysFreeSpace(); MIBMtaEntry->sys.SpoolFreeSpace = spc; /* Log FS space is monitored by the self-timed log reinit code. */ #ifdef HAVE_SELECT space_check_count = 100; /* ticks about 3 times a second */ #else space_check_count = 30; /* ticks once a second.. */ #endif } } } /* time-server child block */ _exit(0); } #else static void init_timeserver() { /* Do nothing.. */ } #endif time_t mytime(timep) time_t *timep; { static int healthcheck = 10000; if (--healthcheck <= 0) { healthcheck = 10000; timer_health_check(); } #if defined(HAVE_MMAP) if (timeserver_pid) { time_t t; #ifdef HAVE_SELECT t = timeserver_segment->tv.tv_sec; #else t = timeserver_segment->time_sec; #endif if (timep != NULL) *timep = t; return t; } #endif if (running_on_alarm) { if (timep) *timep = alarm_time; return alarm_time; } #ifdef HAVE_GETTIMEOFDAY { struct timeval tv; gettimeofday(&tv,NULL); if (timep) *timep = tv.tv_sec; return tv.tv_sec; } #else return time(timep); /* The classical old version.. */ #endif } const char * cfpdirname(hash) int hash; { static char dirbuf[8]; if (hash < 0) return ""; if (hash < 256) { sprintf(dirbuf, "%c/", hash); } else { sprintf(dirbuf, "%c/%c/", (hash >> 8) & 0xff, hash & 0xff); } return dirbuf; } /* We make one, or at most two subdirs */ void cfp_mksubdirs(topdir,newpath) const char *topdir, *newpath; { char path[256]; int omask = umask(022); char *s; if (*topdir != 0) sprintf(path, "../%s/%s", topdir, newpath); else strcpy(path, newpath); s = strrchr(path,'/'); if (s[1] == 0) *s = 0; /* Trailing slash elimination */ if (mkdir(path,02755) != 0) { char *s = strrchr(path,'/'); if (s) *s = 0; mkdir(path,02755); if (s) *s = '/'; mkdir(path,02755); } umask(omask); }