/* * Copyright 1988 by Rayan S. Zachariassen, all rights reserved. * This will be free software, but only when it is finished. * Some functions Copyright 1991-2002 Matti Aarnio. */ /* * This file contains "daemon" command callable from configuration * script, along with its support facilities. */ #include "mailer.h" #include #include #ifdef HAVE_UNISTD_H # include #endif #include #include #include /* O_RDONLY for run_praliases() */ #include /* for run_homedir() */ #include /* for run_grpmems() */ #include #ifdef HAVE_SYS_UN_H #include #include #endif #include "zsyslog.h" #include "shmmib.h" #include "sysexits.h" extern struct MIB_MtaEntry *MIBMtaEntry; #ifdef HAVE_DIRENT_H # include #else /* not HAVE_DIRENT_H */ # define dirent direct # ifdef HAVE_SYS_NDIR_H # include # endif /* HAVE_SYS_NDIR_H */ # ifdef HAVE_SYS_DIR_H # include # endif /* HAVE_SYS_DIR_H */ # ifdef HAVE_NDIR_H # include # endif /* HAVE_NDIR_H */ #endif /* HAVE_DIRENT_H */ #ifdef HAVE_DIRFD # define Z_DIRFD(dir) dirfd(dir) #else # ifdef USE_DIRFD_DD_FD # define Z_DIRFD(dir) ((dir).dd_fd) # else # ifdef USE_DIRFD_D_FD # define Z_DIRFD(dir) ((dir).dd_fd) # endif # endif #endif #ifdef HAVE_SYS_RESOURCE_H #ifdef linux # define _USE_BSD #endif #include #endif #include "zmsignal.h" #include "zsyslog.h" #include "mail.h" #include "interpret.h" #include "io.h" #include "libz.h" #include "libc.h" #include "prototypes.h" #ifdef HAVE_LOCKF #ifdef F_OK #undef F_OK #endif /* F_OK */ #ifdef X_OK #undef X_OK #endif /* X_OK */ #ifdef W_OK #undef W_OK #endif /* W_OK */ #ifdef R_OK #undef R_OK #endif /* R_OK */ #endif /* HAVE_LOCKF */ #ifndef _IOFBF #define _IOFBF 0 #endif /* !_IOFBF */ extern const char *traps[]; extern time_t time __((time_t *)); extern int routerdirloops; #ifndef strchr extern char *strchr(), *strrchr(); #endif extern void free_gensym __((void)); static int gothup = 0; static RETSIGTYPE sig_exit __((int)); static RETSIGTYPE sig_exit(sig) int sig; { if (canexit) { #ifdef MALLOC_TRACE dbfree(); zshfree(); #endif /* MALLOC_TRACE */ die(0, "signal"); } mustexit = 1; /* no need to reenable signal in USG, once will be enough */ } RETSIGTYPE sig_hup(sigarg) int sigarg; { gothup = 1; /* fprintf(stderr,"HUP\n"); */ SIGNAL_HANDLE(SIGHUP, sig_hup); } static void dohup __((int)); static void dohup(sig) int sig; { gothup = 0; if (traps[SIGHUP] != NULL) eval(traps[SIGHUP], "trap", NULL, NULL); } /* * Run the Router in Daemon mode. */ /* DIRQUEUE structure forward definition */ struct dirqueue; /* * We run in multiple processes mode; much of the same how scheduler * does its magic: Children send log messages, and hunger announcements * to the master, and the master feeds jobs to the children. * Minimum number of child processes started is 1. */ #define MAXROUTERCHILDS 40 struct router_child { int tochild; int fromchild; int childpid; int hungry; char *linebuf; int linespace; int linelen; char childline[512]; int childsize, childout; char readbuf[512]; int readsize, readout; int statloc; #if defined(HAVE_SYS_RESOURCE_H) struct rusage r; #endif struct dirqueue *dq; u_long task_ino; }; struct router_child routerchilds[MAXROUTERCHILDS]; extern int nrouters; extern const char *logfn; /* ../scheduler/pipes.c */ extern int pipes_create __((int *tochild, int *fromchild)); extern void pipes_close_parent __((int *tochild, int *fromchild)); extern void pipes_to_child_fds __((int *tochild, int *fromchild)); extern void pipes_shutdown_child __((int tochild)); /* At parent, shutdown channel towards child */ /* ../scheduler/resources.c */ extern int resources_query_nofiles __((void)); extern void resources_maximize_nofiles __((void)); extern void resources_limit_nofiles __((int nfiles)); extern int resources_query_pipesize __((int fildes)); static void child_server __((int tofd, int frmfd)); static int rd_doit __((const char *filename, const char *dirs)); static int parent_reader __((int waittime)); static void notify_reader __((int sock)); static int notifysocket = -1; static time_t notifysocket_reinit = 1; static void notifysock_init() { while (notifysocket < 0) { struct sockaddr_un sad; int on = 1; char *notifysock; int oldumask; notifysock = (char *)getzenv("ROUTERNOTIFY"); if (!notifysock) { notifysocket_reinit = 0; return; } #ifdef AF_UNIX memset(&sad, 0, sizeof(sad)); sad.sun_family = AF_UNIX; strncpy(sad.sun_path, notifysock, sizeof(sad.sun_path)); sad.sun_path[ sizeof(sad.sun_path)-1 ] = 0; notifysocket = socket(PF_UNIX, SOCK_DGRAM, 0); if (notifysocket < 0) { perror("notifysocket: socket(PF_UNIX)"); break; } setsockopt(notifysocket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on)); /* In case that one already exists.. */ unlink(sad.sun_path); oldumask = umask(0555); if (bind(notifysocket, (struct sockaddr *)&sad, sizeof sad) < 0) { perror("bind:UNIX notify socket"); umask(oldumask); close(notifysocket); notifysocket = -1; break; } umask(oldumask); fd_nonblockingmode(notifysocket); #if defined(F_SETFD) fcntl(notifysocket, F_SETFD, 1); /* close-on-exec */ #endif #endif /* AF_UNIX */ break; } if (notifysocket < 0) { time(¬ifysocket_reinit); notifysocket_reinit += 10; } else notifysocket_reinit = 0; } static int start_child __((int idx)); static int start_child (i) const int i; { int pid; int tofd[2], frmfd[2]; if (pipes_create(tofd, frmfd) < 0) return -1; /* D'uh :-( */ pid = fork(); if (pid == 0) { /* child */ int idx; pipes_to_child_fds(tofd,frmfd); for (idx = resources_query_nofiles(); idx >= 3; --idx) close(idx); #if 0 resources_maximize_nofiles(); #endif zcloselog(); /* Each (sub-)process does openlog() all by themselves */ zopenlog("router", LOG_PID, LOG_MAIL); child_server(0, 1); exit(0); } else if (pid < 0) { /* fork failed - yell and forget it! */ close(tofd[0]); close(tofd[1]); close(frmfd[0]); close(frmfd[1]); fprintf(stderr, "router: start_child(): Fork failed!\n"); return -1; } /* Parent */ MIBMtaEntry->rt.RouterProcesses += 1; MIBMtaEntry->rt.RouterProcessForks += 1; pipes_close_parent(tofd,frmfd); fd_nonblockingmode(tofd[1]); fd_nonblockingmode(frmfd[0]); routerchilds[i].tochild = tofd[1]; routerchilds[i].fromchild = frmfd[0]; routerchilds[i].childpid = pid; routerchilds[i].hungry = 0; routerchilds[i].childsize = 0; routerchilds[i].childout = 0; return 0; } /* * Catch each child-process death, and reap them.. */ RETSIGTYPE sig_chld(signum) int signum; { int pid; int i; int statloc; #ifdef HAVE_SYS_RESOURCE_H struct rusage r; #endif for (;;) { #ifdef HAVE_SYS_RESOURCE_H memset(&r, 0, sizeof(r)); #endif #ifdef HAVE_WAIT4 #ifdef HAVE_SYS_RESOURCE_H pid = wait4(-1, &statloc, WNOHANG, &r); #else pid = wait4(-1, &statloc, WNOHANG, NULL); #endif #else #ifdef HAVE_WAIT3 #ifdef HAVE_SYS_RESOURCE_H pid = wait3(&statloc, WNOHANG, &r); #else pid = wait3(&statloc, WNOHANG, NULL); #endif #else #ifdef HAVE_WAITPID pid = waitpid(-1, &statloc, WNOHANG); #else pid = wait(&statloc); #endif #endif #endif if (pid <= 0) break; for (i = 0; i < MAXROUTERCHILDS; ++i) if (pid == routerchilds[i].childpid) { routerchilds[i].childpid = -pid; routerchilds[i].statloc = statloc; #if defined(HAVE_SYS_RESOURCE_H) routerchilds[i].r = r; #endif MIBMtaEntry->rt.RouterProcesses -= 1; } } /* re-instantiate the signal handler.. */ #ifdef SIGCLD SIGNAL_HANDLE(SIGCLD, sig_chld); #else SIGNAL_HANDLE(SIGCHLD, sig_chld); #endif } /* * Read whatever there is, detect "#hungry\n" line, and return * status of the hunger flag.. */ static int reader_getc __((struct router_child *)); static int reader_getc(rc) struct router_child *rc; { unsigned char c; /* Child exited but 'tochild' still set ? close it! */ if (rc->childpid < 0 && rc->tochild >= 0) { pipes_shutdown_child(rc->tochild); rc->tochild = -1; /* Back to positive value */ rc->childpid = - rc->childpid; if (logfn) { loginit(SIGHUP); /* Reinit/rotate the log every at line .. */ fprintf(stdout, "[%d] ROUTER CHILD PROCESS TERMINATED; wait() status = ", rc->childpid); if (WIFSIGNALED(rc->statloc)) { fprintf(stdout, "SIGNAL %d", WSIGNALSTATUS(rc->statloc)); MIBMtaEntry->rt.RouterProcessFaults ++; } else if (WIFEXITED(rc->statloc)) { fprintf(stdout, "EXIT %d", WEXITSTATUS(rc->statloc)); if (WEXITSTATUS(rc->statloc)) /* non-zero */ MIBMtaEntry->rt.RouterProcessFaults ++; } else { /* Uh ?? SIGNALED, and EXITED are handled above, what is this ? */ fprintf(stdout, "0x%04X ??", WEXITSTATUS(rc->statloc)); if (WEXITSTATUS(rc->statloc)) /* non-zero */ MIBMtaEntry->rt.RouterProcessFaults ++; } #if (defined(HAVE_WAIT3) || defined(HAVE_WAIT4)) && \ defined(HAVE_SYS_RESOURCE_H) fprintf(stdout, "; time = %ld.%06ld usr %ld.%06ld sys", (long)rc->r.ru_utime.tv_sec, (long)rc->r.ru_utime.tv_usec, (long)rc->r.ru_stime.tv_sec, (long)rc->r.ru_stime.tv_usec); #endif fprintf(stdout,"\n"); fflush(stdout); } } errno = 0; if (rc->readout >= rc->readsize) rc->readout = rc->readsize = 0; if (rc->readsize <= 0) rc->readsize = read(rc->fromchild, rc->readbuf, sizeof(rc->readbuf)); /* Now either we have something, or we don't.. */ if (rc->readsize < 0) { errno = EAGAIN; return EOF; } if (rc->readsize == 0) { errno = 0; return EOF; /* REAL EOF */ } c = rc->readbuf[rc->readout++]; return (int)c; } /* Single child reader.. */ static int _parent_reader __((struct router_child *rc)); static int _parent_reader(rc) struct router_child *rc; { int c; if (rc->fromchild < 0) return 0; /* The FD is in non-blocking mode */ for (;;) { c = reader_getc(rc); if (c == EOF) { /* Because the socket/pipe is in NON-BLOCKING mode, we may drop here with an ERROR indication, which can be cleared and thing resume later.. */ if (errno) break; /* An EOF ? -- child existed ?? */ if (rc->tochild >= 0) close(rc->tochild); rc->tochild = -1; if (rc->fromchild >= 0) close(rc->fromchild); rc->fromchild = -1; rc->hungry = 0; rc->task_ino = 0; rc->childpid = 0; break; } if (rc->linebuf == NULL) { rc->linespace = 120; rc->linelen = 0; rc->linebuf = emalloc(rc->linespace+2); } if (rc->linelen+2 >= rc->linespace) { rc->linespace <<= 1; /* Double the size */ rc->linebuf = erealloc(rc->linebuf, rc->linespace+2); } rc->linebuf[rc->linelen++] = c; if (c == '\n') { /* End of line */ rc->linebuf[rc->linelen] = 0; /*fprintf(stderr,"_parent_reader[%d] len=%d buf='%s'\n",rc->childpid,rc->linelen,rc->linebuf);*/ if (rc->linelen == 1) { /* Just a newline.. */ rc->linelen = 0; continue; } if (rc->linelen == 8 && strcmp(rc->linebuf,"#hungry\n")==0) { rc->linelen = 0; rc->hungry = 1; rc->task_ino = 0; continue; } /* LOG THIS LINE! */ if (logfn) { loginit(SIGHUP); /* Reinit/rotate the log every at line .. */ fprintf(stdout, "[%d] ", rc->childpid); fputs(rc->linebuf, stdout); fflush(stdout); } rc->linelen = 0; } } return rc->hungry; } /* Single child writer.. */ static void _parent_writer __((struct router_child *rc)); static void _parent_writer(rc) struct router_child *rc; { int c, left; /* FD for writing ?? */ if (rc->tochild < 0) return; /* Anything to write ? */ for (;rc->childout < rc->childsize;) { left = rc->childsize - rc->childout; c = write(rc->tochild, rc->childline + rc->childout, left); if (c < 0 && errno == EPIPE) { pipes_shutdown_child(rc->tochild); rc->tochild = -1; } if (c <= 0) break; rc->childout += c; } /* All written ?? */ if (rc->childout >= rc->childsize) rc->childout = rc->childsize = 0; } #ifdef HAVE_SELECT #ifdef _AIX /* The select.h defines NFDBITS, etc.. */ # include # include #endif #if defined(BSD4_3) || defined(sun) #include #endif #include #include #include #include #include #ifndef NFDBITS /* * This stuff taken from the 4.3bsd /usr/include/sys/types.h, but on the * assumption we are dealing with pre-4.3bsd select(). */ typedef long fd_mask; #ifndef NBBY #define NBBY 8 #endif /* NBBY */ #define NFDBITS ((sizeof fd_mask) * NBBY) /* SunOS 3.x and 4.x>2 BSD already defines this in /usr/include/sys/types.h */ #ifdef notdef typedef struct fd_set { fd_mask fds_bits[1]; } fd_set; #endif /* notdef */ #ifndef _Z_FD_SET #define _Z_FD_SET(n, p) ((p)->fds_bits[0] |= (1 << (n))) #define _Z_FD_CLR(n, p) ((p)->fds_bits[0] &= ~(1 << (n))) #define _Z_FD_ISSET(n, p) ((p)->fds_bits[0] & (1 << (n))) #define _Z_FD_ZERO(p) memset((char *)(p), 0, sizeof(*(p))) #endif /* !FD_SET */ #endif /* !NFDBITS */ #ifdef FD_SET #define _Z_FD_SET(sock,var) FD_SET(sock,&var) #define _Z_FD_CLR(sock,var) FD_CLR(sock,&var) #define _Z_FD_ZERO(var) FD_ZERO(&var) #define _Z_FD_ISSET(i,var) FD_ISSET(i,&var) #else #define _Z_FD_SET(sock,var) var |= (1 << sock) #define _Z_FD_CLR(sock,var) var &= ~(1 << sock) #define _Z_FD_ZERO(var) var = 0 #define _Z_FD_ISSET(i,var) ((var & (1 << i)) != 0) #endif /* Return info re has some waiting been done */ static int parent_reader(waittime) int waittime; { fd_set rdset, wrset; int i, highfd, fd, rc; struct timeval tv; if (notifysocket_reinit && notifysocket_reinit < now) notifysock_init(); redo_again:; _Z_FD_ZERO(rdset); _Z_FD_ZERO(wrset); highfd = notifysocket; if (notifysocket >= 0) _Z_FD_SET(notifysocket, rdset); for (i = 0; i < MAXROUTERCHILDS; ++i) { /* Places to read from ?? */ fd = routerchilds[i].fromchild; if (fd >= 0) { _Z_FD_SET(fd, rdset); if (highfd < fd) highfd = fd; } /* Something wanting to write ?? */ fd = routerchilds[i].tochild; if (fd >= 0 && routerchilds[i].childout < routerchilds[i].childsize) { _Z_FD_SET(fd, wrset); if (highfd < fd) highfd = fd; } } #if 0 if (highfd < 0) return 0; /* Nothing to do! */ #endif tv.tv_sec = waittime; tv.tv_usec = 0; rc = select(highfd+1, &rdset, &wrset, NULL, &tv); if (rc == 0) return 1; /* Nothing to do, leave.. Did sleep for a second! */ if (rc < 0) { /* Drat, some error.. */ if (errno == EINTR) goto redo_again; /* Hmm.. Do it just blindly (will handle error situations) */ for (i = 0; i < MAXROUTERCHILDS; ++i) { _parent_writer(&routerchilds[i]); _parent_reader(&routerchilds[i]); } return 0; /* Urgh, an error.. */ } /* Ok, select gave indication of *something* being ready for read */ if (notifysocket >= 0 && _Z_FD_ISSET(notifysocket, rdset)) notify_reader(notifysocket); for (i = 0; i < MAXROUTERCHILDS; ++i) { fd = routerchilds[i].tochild; if (fd >= 0 && _Z_FD_ISSET(fd, wrset)) _parent_writer(&routerchilds[i]); fd = routerchilds[i].fromchild; if (fd >= 0 && _Z_FD_ISSET(fd, rdset)) _parent_reader(&routerchilds[i]); } return 1; /* Did some productive job, don't sleep at the caller.. */ } #else /* NO HAVE_SELECT */ static int parent_reader(waittime) int waittime; { int i; /* No select, but can do non-blocking -- we hope.. */ for (i = 0; i < MAXROUTERCHILDS; ++i) _parent_reader(&routerchilds[i], waittime); return 0; } #endif /* * Actual child-process job feeder. This just puts the thing * into the buffer, and writes (if it can).. * * Requires: childsize == 0 (buffer is free), * tochild >= 0 (socket exists (and is ready to receive)) */ static int _parent_feed_child __(( struct router_child *rc, const char *fname, const char *dir )); static int _parent_feed_child(rc, fname, dir) struct router_child *rc; const char *fname, *dir; { int i; /* Ok, we are feeding him.. */ rc->hungry = 0; /* What shall we feed ?? */ sprintf(rc->childline, "%s\n", fname); /* DIR information is already included at the fname ! */ rc->childsize = strlen(rc->childline); rc->childout = 0; /* Lets try to write it in one go.. */ i = write(rc->tochild, rc->childline, rc->childsize); if (i > 0) rc->childout = i; if (rc->childout >= rc->childsize) rc->childout = rc->childsize = 0; return 0; } #if 0 /* !!! NOT USED !!!! */ /* ----------------------------------------------------------------- * * Child-process job feeder - distributes jobs in even round-robin * manner to all children. Might some day do resource control a'la * "Priority: xyz" -> process subset 2,3,4 * * This looks for free child, and waits for successfull write. * ----------------------------------------------------------------- */ static int parent_feed_child __((const char *fname, const char *dir)); static int parent_feed_child(fname,dir) const char *fname, *dir; { static int rridx = -1; struct router_child *rc = NULL; int i; char *s; do { /* Loop until can feed, our caller shall not need to refeed message again! */ /* FIXME: Real proper way is needed, messages to be queued to sets of server processes... Compare with Scheduler's channels. */ s = strchr(fname,'-'); if (s && isdigit(*fname)) { long thatpid = atoi(s+1); if ((thatpid > 1) && (thatpid != MYPID) && (kill(thatpid,0)==0)) { /* Process of that PID does exist, and possibly even something we can kick.. (we should be *root* here anyway!) */ for (i = 0; i < MAXROUTERCHILDS; ++i) { /* Is it one of ours children ?? */ if (routerchilds[i].childpid == thatpid) return 0; /* Yes! No refeed! */ } } /* Hmm.. perhaps it is safe to feed to a subprocess */ } parent_reader(0); rridx = 0; rc = NULL; if (rridx >= MAXROUTERCHILDS) rridx = 0; for (i = 0; i < MAXROUTERCHILDS; ++i) { rc = &routerchilds[rridx]; /* If no child at this slot, start one! (whatever has been the reason for its death..) */ if (rc->childpid == 0) { start_child(rridx); sleep(2); /* Allow a moment for child startup */ parent_reader(0); } /* If we have a hungry child with all faculties intact.. */ if (rc->tochild >= 0 && rc->fromchild >= 0 && rc->hungry) break; /* Next index.. */ ++rridx; if (rridx >= MAXROUTERCHILDS) rridx = 0; } /* Failed to find a hungry child!? We should not have been called in the first place.. */ } while (!rc || !rc->hungry || rc->tochild < 0 || rc->fromchild < 0); _parent_feed_child(rc, fname, dir); #if 0 /* NO sync waiting here! */ /* .. or if not, we wait here until it has been fed.. */ while (rc->childout < rc->childsize) parent_reader(0); #endif return 1; /* Did feed successfully ?? */ } #endif /* NOT USED!!! */ /* * child_server() * The real workhorse at the child, receives work, reports status * */ static void child_server(tofd,frmfd) int tofd, frmfd; { FILE *fromfp = fdopen(tofd, "r"); FILE *tofp = fdopen(frmfd, "w"); char linebuf[8000]; char *s, *fn; setvbuf(fromfp, NULL, _IOLBF, 0); setvbuf(tofp, NULL, _IOFBF, 0); fprintf(tofp, "ROUTER CHILD PROCESS STARTED\n"); fflush(tofp); linebuf[sizeof(linebuf)-1] = 0; while (!feof(fromfp) && !ferror(fromfp)) { fprintf(tofp, "\n#hungry\n"); fflush(tofp); if (gothup) dohup(SIGHUP); if (fgets(linebuf, sizeof(linebuf)-1, fromfp) == NULL) break; /* EOF ?? */ s = strchr(linebuf,'\n'); if (s) *s = 0; if (*linebuf == 0) break; /* A newline -> exit */ /* Input is either: "file.name" or "../path/file.name" */ fn = strrchr(linebuf,'/'); if (fn) { *fn++ = 0; s = linebuf; /* 'Dirs' */ } else { fn = linebuf; s = linebuf + strlen(linebuf); } rd_doit(fn, s); } /* Loop ends for some reason, perhaps parent died and pipe got an EOF ? We leave... Our caller exits. */ fprintf(tofp, "ROUTER CHILD PROCESS TERMINATING\n"); fflush(tofp); } /* ----------------------------------------------------------------- * Directory Input Queue subsystem * * Copied from the Scheduler * * ----------------------------------------------------------------- */ struct dirstatname { struct stat st; long ino; char *dir; /* Points to stable strings.. */ char name[1]; /* Allocate enough size */ }; struct dirqueue { int wrksum; int sorted; int wrkcount; int wrkspace; struct sptree *mesh; struct dirstatname **stats; }; static int dirqueuescan __((const char *dir,struct dirqueue *dq, int subdirs)); #define ROUTERDIR_CNT 30 struct dirqueue dirqb[ROUTERDIR_CNT]; struct dirqueue *dirq[ROUTERDIR_CNT]; /* Following is filled at run_daemon(), and cleared also.. */ static char *routerdirs[ROUTERDIR_CNT]; static char *routerdirs2[ROUTERDIR_CNT]; /* * Absorb any new files that showed up in our directory. */ int dq_insert(DQ, ino, file, dir) void *DQ; long ino; const char *file, *dir; { struct stat stbuf; struct dirstatname *dsn; struct dirqueue *dq = DQ; int i; if (!ino) return 1; /* Well, actually it isn't, but we "makebelieve" */ time(&now); if (dq == NULL) dq = dirq[0]; if (lstat(file,&stbuf) != 0 || !S_ISREG(stbuf.st_mode)) { /* Not a regular file.. Let it be there for the manager to wonder.. */ return -1; } #if 0 loginit(SIGHUP); /* Reinit/rotate the log every at line .. */ fprintf(stdout,"dqinsert: ino=%ld file='%s' dir='%s'\n",ino,file,dir); #endif /* Is it already in the database ? */ if (sp_lookup((u_long)ino,dq->mesh) != NULL) { #if 0 fprintf(stderr,"daemonsub: tried to dq_insert(ino=%ld, file='%s') already in queue\n",ino, file); #endif return 1; /* It is! */ } for (i = 0; i < MAXROUTERCHILDS; ++i) if (ino == routerchilds[i].task_ino) return 1; /* In active processing! */ /* Now store the entry */ dsn = (struct dirstatname*)emalloc(sizeof(*dsn)+strlen(file)+1); memcpy(&(dsn->st),&stbuf,sizeof(stbuf)); dsn->ino = ino; dsn->dir = strdup(dir); strcpy(dsn->name,file); sp_install(ino, (void *)dsn, 0, dq->mesh); /* Into the normal queue */ if (dq->wrkspace <= dq->wrkcount) { /* Increase the space */ dq->wrkspace = dq->wrkspace ? dq->wrkspace << 1 : 8; /* malloc(size) == realloc(NULL,size) */ dq->stats = (struct dirstatname**)erealloc(dq->stats, sizeof(void*) * dq->wrkspace); } dq->stats[dq->wrkcount] = dsn; dq->wrkcount += 1; dq->wrksum += 1; dq->sorted = 0; /* Account these into router job queue, counting them OUT of that queue is job for rfc822.c::sequencer() ! */ MIBMtaEntry->rt.ReceivedMessages += 1; MIBMtaEntry->rt.StoredMessages += 1; i = (stbuf.st_size + stbuf.st_blksize -1)/1024; MIBMtaEntry->rt.ReceivedVolume += i; MIBMtaEntry->rt.StoredVolume += i; return 0; } int in_dirscanqueue(DQ,ino) void *DQ; long ino; { struct dirqueue *dq = DQ; if (dq == NULL) dq = dirq[0]; /* Return 1, if can find the "ino" in the queue */ if (dq->wrksum == 0 || dq->mesh == NULL) return 0; if (sp_lookup((u_long)ino, dq->mesh) != NULL) return 1; return 0; } static int dq_ctimecompare __((const void *, const void *)); static int dq_ctimecompare(b,a) /* we want oldest entry LAST */ const void *a; const void *b; { const struct dirstatname **A = (const struct dirstatname **)a; const struct dirstatname **B = (const struct dirstatname **)b; int rc; rc = ((*B)->st.st_mtime - (*A)->st.st_mtime); return rc; } static int dirqueuescan(dir, dq, subdirs) const char *dir; struct dirqueue *dq; int subdirs; { DIR *dirp; struct dirent *dp; struct stat stbuf; char file[MAXNAMLEN+1]; int newents = 0; #if 0 static time_t modtime = 0; /* Any changes lately ? */ if (estat(dir, &stbuf) < 0) return -1; /* Could not stat.. */ if (stbuf.st_mtime == modtime) return 0; /* any changes lately ? */ modtime = stbuf.st_mtime; #endif /* Some changes lately, open the dir and read it */ dirp = opendir(dir); if (!dirp) return 0; for (dp = readdir(dirp); dp != NULL; dp = readdir(dirp)) { /* Scan filenames into memory */ if (subdirs && dp->d_name[0] >= 'A' && dp->d_name[0] <= 'Z' && dp->d_name[1] == 0 ) { /* We do this recursively.. */ if (dir[0] == '.' && dir[1] == 0) strcpy(file, dp->d_name); else sprintf(file, "%s/%s", dir, dp->d_name); if (lstat(file,&stbuf) != 0 || !S_ISDIR(stbuf.st_mode)) { /* Not a directory.. Let it be there for the manager to wonder.. */ continue; } /* Recurse into levels below.. */ newents += dirqueuescan(file, dq, subdirs); continue; } /* End of directories of names "A" .. "Z" */ if (dp->d_name[0] >= '0' && dp->d_name[0] <= '9') { /* A file whose name STARTS with a number (digit) */ long ino = atol(dp->d_name); if (in_dirscanqueue(dq,ino)) /* Already in pre-schedule-queue */ continue; if (dir[0] == '.' && dir[1] == 0) strcpy(file, dp->d_name); else sprintf(file, "%s/%s", dir, dp->d_name); if (dq_insert(dq, ino, file, dir)) continue; ++newents; dq->sorted = 0; } /* ... end of "filename starts with a [0-9]" */ } #ifdef BUGGY_CLOSEDIR /* * Major serious bug time here; some closedir()'s * free dirp before referring to dirp->dd_fd. GRRR. * XX: remove this when bug is eradicated from System V's. */ close(dirp->dd_fd); #endif closedir(dirp); return newents; } int syncweb(rc) struct router_child *rc; { struct stat *stbuf; char *file, *ddir; struct dirstatname *dqstats; struct spblk *spl; int wrkcnt = 0; long ino; struct dirqueue *dq = rc->dq; int wrkidx = dq->wrkcount -1; /* Any work to do ? */ if (dq->wrksum == 0) return 0; time(&now); if (stability && !dq->sorted && dq->wrkcount > 1) { /* Sort the dq->stats[] per file ctime -- LATEST on slot 0. (we drain this queue from the END) */ qsort( (void*)dq->stats, dq->wrkcount, sizeof(void*), dq_ctimecompare ); dq->sorted = 1; } /* Ok some, decrement the count to change it to index */ dqstats = dq->stats[wrkidx]; file = dqstats->name; ddir = dqstats->dir; stbuf = &(dqstats->st); ino = dqstats->ino; rc->task_ino = ino; /* Mark this INO into processing */ _parent_feed_child(rc, file, ddir); dq->wrkcount -= 1; dq->wrksum -= 1; if (dq->wrkcount > wrkidx) { /* Skipped some ! Compact the array ! */ memcpy( &dq->stats[wrkidx], &dq->stats[wrkidx+1], sizeof(dq->stats[0]) * (dq->wrkcount - wrkidx)); } dq->stats[dq->wrkcount] = NULL; /* Now we have pointers */ /* Deletion from the dq->mesh should ALWAYS succeed.. */ spl = sp_lookup((u_long)ino, dq->mesh); if (spl != NULL) sp_delete(spl, dq->mesh); /* Free the pre-schedule queue entry */ free(dqstats); free(ddir); return wrkcnt; } /* * The notify_reader() receives messages from message submitters, * and uses them to find (most of the) new jobs. * * The messages are of SPACE separated string: 'NEW dirname X/Y/filename' * */ static void notify_reader(sock) int sock; { char buf[1000], *r, *p, *s; int i, ok, cnt; long ino; /* Pick at most 10 in a row */ for (cnt=10; cnt!=0; --cnt) { i = recvfrom(sock, buf, sizeof(buf), 0, NULL, NULL); if (i <= 0 && errno != EINTR) return; if (i < 0) continue; if (i >= sizeof(buf)) i = sizeof(buf)-1; buf[i] = 0; /* The only supported notify is: "NEW router X/Y/file-name" message */ #if 0 /* DEBUG IT! */ if (logfn) { loginit(SIGHUP); /* Reinit/rotate the log every at line .. */ fprintf(stdout, "NOTIFY got: i=%d '%s'\n", i, buf); fflush(stdout); } #endif if (strncmp(buf,"NEW ",4) != 0) continue; r = s = p = buf+4; while (*s != 0 && *s != ' ') ++s; if (*s == ' ') *s++ = 0; p = s; ok = 0; if ('1' <= s[0] && s[0] <= '9' && strchr(s, '/') == NULL) { ok = 1; } else if ('A' <= s[0] && s[0] <= 'Z' && s[1] == '/') { s += 2; if ('1' <= s[0] && s[0] <= '9' && strchr(s, '/') == NULL) { ok = 1; } else if ('A' <= s[0] && s[0] <= 'Z' && s[1] == '/') { s += 2; if ('1' <= s[0] && s[0] <= '9' && strchr(s, '/') == NULL) { ok = 1; } } } if (!ok) continue; ino = atol(p); for (i = 0; i < ROUTERDIR_CNT; ++i) { if (routerdirs[i] && strcmp(routerdirs[i],r) == 0) { #if 0 if (lstat(p,&stbuf) != 0) continue; if (!S_ISFILE(stbuf.st_mode)) continue; if (in_dirscanqueue(dirq[i],stbuf.st_ino)) continue; #endif #if 0 /* DEBUG IT! */ if (logfn) { loginit(SIGHUP); /* Reinit/rotate the log every at line .. */ fprintf(stdout, " ... routerdirs[%d] == '%s', ino=%ld, file='%s'\n", i, routerdirs[i], ino, p); fflush(stdout); } zsyslog((LOG_INFO, "notify dir='%s' ino=%ld file='%s'", r, ino, p)); #endif dq_insert(dirq[i], ino, p, routerdirs2[i]); break; } } } } /* "rd_doit()" at the feeding parent server */ /* * STABILITY option will make the router process incoming messages in * arrival (modtime) order, instead of randomly determined by position * in the router directory. The scheme is to read in all the names, * and stat the files. It would be possible to reuse the stat information * later, but I'm not convinced it is worthwhile since the later stat is * an fstat(). On the other hand, if we used splay tree insertion to * sort the entries, then the stat buffer could easily be reused in * makeLetter(). * * SECURITY WARNING: iff makeLetter does not stat again, then there is * a window of opportunity for a Bad Guy to remove a file after it has * been stat'ed (with root privs), and before it has been processed. * This can be avoided partially by sticky-bitting the router directory, * and entirely by NOT saving the stat information we get here. */ int run_doit(argc, argv) int argc; const char *argv[]; { const char *filename; char *sh_memlevel = getlevel(MEM_SHCMD); int r; const char *av[3]; if (argc != 2) { fprintf(stderr, "Usage: %s \n", argv[0]); return 1; } filename = argv[1]; /* Do one file, return value is 0 or 1, depending on actually doing something on a file */ gensym = 1; av[0] = "process"; /* I think this needs to be here */ av[1] = filename; av[2] = NULL; r = s_apply(2, av); /* "process" filename (within rd_doit() ) */ free_gensym(); setlevel(MEM_SHCMD,sh_memlevel); return r; } static int rd_doit(filename, dirs) const char *filename, *dirs; { /* Do one file, return value is 0 or 1, depending on actually doing something on a file */ int dir_fd_ = -1; DIR *dp = NULL; /* The 'filename' is always directoryless, whereas 'dirs' points to the directory where it resides */ #ifdef USE_ALLOCA char *buf; #else static char *buf = NULL; static u_int blen = 0; #endif const char *av[3]; char *p; int len, rc; int lockfd = -1; char pathbuf[512]; char dirhash[5]; char *sh_memlevel = getlevel(MEM_SHCMD); int thatpid; struct stat stbuf; router_id = getpid(); dirhash[0] = 0; *pathbuf = 0; #if defined(Z_DIRFD) && defined(HAVE_FCHDIR) if (*dirs) { dp = opendir("."); if (dp) dir_fd_ = Z_DIRFD(dp); if (*dirs && dirs[0] == '.' && dirs[1] == '.' && dirs[2] == '/') { char *p = pathbuf + 3; strcpy(pathbuf, dirs); while (*p && *p != '/') ++p; if (*p) *p++ = 0; chdir(pathbuf); strncpy(dirhash, p, sizeof(dirhash)-1); dirhash[sizeof(dirhash)-1] = 0; } else { strncpy(dirhash, dirs, sizeof(dirhash)-1); dirhash[sizeof(dirhash)-1] = 0; } *pathbuf = 0; if (dirhash[0]) { strcpy(pathbuf,dirhash); strcat(pathbuf,"/"); } } #endif if (dirhash[0] && dir_fd_ < 0) { /* If it is in alternate dir, move to primary one, and process there! */ strcpy(pathbuf,dirhash); strcat(pathbuf,"/"); } strcat(pathbuf,filename); len = strlen(filename); thatpid = 0; p = strchr(filename, '-'); if (p != NULL) { /* message file is "inode-pid" */ thatpid = atoi(p+1); #if 0 /* very old thing, may harm at Solaris 2.6 ! */ if (thatpid < 10) { /* old-style locking.. */ thatpid = 0; } #endif /* Probe it! Does the process exist ? */ if (thatpid && (kill(thatpid,0)==0) && (thatpid != router_id)) { /* * an already locked message file, * belonging to another process */ if (*dirs) { fprintf(stderr, "** BUG **: %s%s not in primary router directory!\n", dirs,filename); } #if defined(HAVE_FCHDIR) if (dir_fd_ >= 0) fchdir(dir_fd_); #endif if (dp) closedir(dp); return 0; /* * This should not happen anywhere but at * primary router directory. If xxxx-nn * format file exists anywhere else, it is * a bug time! */ } } lockfd = open(pathbuf, O_RDWR, 0); if (lockfd >= 0) { #if defined(HAVE_LOCKF) && defined(F_LOCK) lseek(lockfd, (off_t)0, SEEK_SET); /* To the end of the file */ if (lockf(lockfd, F_LOCK, 0) < 0) { /* return 0; FIXME: ??? ERRORS ?? HOW ??? */ } #else #ifdef HAVE_FLOCK if (flock(lockfd, LOCK_EX) < 0) { /* return 0; FIXME: ??? ERRORS ?? HOW ??? */ } #else /* FCNTL() locking */ struct flock ft; memset( & ft, 0, sizeof(ft) ); ft.l_type = F_WRLCK; ft.l_whence = SEEK_SET; ft.l_start = 0; ft.l_len = 0; fcntl(lockfd, F_GETLK, &ft); /* XXX: ERROR PROCESSING! */ #endif /* HAVE_FLOCK */ #endif /* HAVE_LOCKF */ } if (strncmp(filename,"core",4) != 0 && ((p == NULL) || (thatpid != router_id))) { /* Not a core file, and ... not already in format of 'inode-pid' */ /* If the pid did exist, we do not touch on that file, on the other hand, we need to rename the file now.. */ #ifdef USE_ALLOCA buf = (char*)alloca(len+20); #else if (blen == 0) { blen = len+20; buf = (char *)malloc(len+20); } while (len + 18 > blen) { blen = 2 * blen; buf = (char *)realloc(buf, blen); } #endif /* Figure out its inode number */ rc = lstat(pathbuf,&stbuf); if (rc != 0) { if (lockfd >= 0) close(lockfd); #if defined(HAVE_FCHDIR) if (dir_fd_ >= 0) fchdir(dir_fd_); #endif if (dp) closedir(dp); return 0; /* Failed ? Well, skip it */ } if (!S_ISREG(stbuf.st_mode)) return 0; /* Not a regular file ?? */ if (dirhash[0]) { sprintf(buf, "%s/%ld", dirhash, (long)stbuf.st_ino); } else { sprintf(buf, "%ld", (long)stbuf.st_ino); } if (strcmp(pathbuf, buf) != 0 && eqrename(pathbuf, buf) < 0) { if (lockfd >= 0) close(lockfd); #if defined(HAVE_FCHDIR) if (dir_fd_ >= 0) fchdir(dir_fd_); #endif if (dp) closedir(dp); return 0; /* something is wrong, erename() complains. (some other process picked it ?) */ } filename = buf; /* message file is now "file-#" and belongs to this process */ } #ifdef MALLOC_TRACE mal_contents(stdout); #endif gensym = 1; av[0] = "process"; /* I think this needs to be here */ av[1] = filename; av[2] = NULL; s_apply(2, av); /* "process" filename (within rd_doit() ) */ free_gensym(); if (lockfd >= 0) close(lockfd); #if defined(HAVE_FCHDIR) if (dir_fd_ >= 0) fchdir(dir_fd_); #endif if (dp) closedir(dp); setlevel(MEM_SHCMD,sh_memlevel); #ifdef MALLOC_TRACE mal_contents(stdout); #endif return 1; } int run_stability(argc, argv) int argc; const char *argv[]; { switch (argc) { case 1: printf("%s %s\n", argv[0], stability ? "on" : "off"); break; case 2: if (strcmp(argv[1], "on") == 0) { real_stability = 1; break; } else if (strcmp(argv[1], "off") == 0) { real_stability = 0; break; } default: fprintf(stderr, "Usage: %s [ on | off ]\n", argv[0]); return EX_USAGE; } return 0; } int run_daemon(argc, argv) int argc; const char *argv[]; { DIR *dirp[ROUTERDIR_CNT]; /* Lets say we have max 30 router dirs.. */ int i, ii; const char *s, *rd; const char *routerdir_s = getzenv("ROUTERDIRS"); char pathbuf[256]; memtypes oval = stickymem; time_t nextdirscan = 0; MIBMtaEntry->sys.RouterMasterPID = getpid(); MIBMtaEntry->sys.RouterMasterStartTime = time(NULL); MIBMtaEntry->sys.RouterMasterStarts += 1; /* Zero the gauges at our startup.. */ MIBMtaEntry->rt.StoredMessages = 0; /* in input queue */ MIBMtaEntry->rt.StoredVolume = 0; /* in input queue */ MIBMtaEntry->rt.RouterProcesses = 1; /* myself = 1 */ MIBMtaEntry->rt.RouterProcessForks = 0; MIBMtaEntry->rt.RouterProcessFaults = 0; if (nrouters > MAXROUTERCHILDS) nrouters = MAXROUTERCHILDS; memset(routerchilds, 0, sizeof(routerchilds)); for (i = 0; i < MAXROUTERCHILDS; ++i) routerchilds[i].fromchild = routerchilds[i].tochild = -1; /* instantiate the signal handler.. */ #ifdef SIGCLD SIGNAL_HANDLE(SIGCLD, sig_chld); #else SIGNAL_HANDLE(SIGCHLD, sig_chld); #endif SIGNAL_HANDLE(SIGTERM, sig_exit); /* mustexit = 1 */ memset(&dirqb, 0, sizeof(dirqb)); for (i=0; idd_size = 0; /* stupid Berkeley bug workaround */ #endif #endif for (ii = 0; ii < nrouters; ++ii) routerchilds[ii].dq = dirq[0]; ii = MAXROUTERCHILDS; stickymem = MEM_MALLOC; routerdirs[0] = strnsave("router",6); routerdirs2[0] = strnsave(".",2); if (routerdir_s) { /* Store up secondary router dirs! */ rd = routerdir_s; for (i = 1; i < ROUTERDIR_CNT && *rd; ) { s = strchr(rd,':'); if (s) *(char*)s = 0; sprintf(pathbuf,"../%s",rd); /* strcat(pathbuf,"/"); */ routerdirs[i] = strdup(rd); routerdirs2[i] = strdup(pathbuf); if (ii > 2) routerchilds[ --ii ].dq = dirq[i]; ++i; if (s) *(char*)s = ':'; if (s) rd = s+1; else break; } } setfreefd(); stickymem = oval; /* Do initial synchronous queue scan now */ for (i = 0; i < ROUTERDIR_CNT; ++i) { if (routerdirs2[i]) dirqueuescan(routerdirs2[i], dirq[i], 1); } for (i = 0; i < MAXROUTERCHILDS; ++i) { if ( routerchilds[i].dq && routerchilds[i].dq->wrkcount ) if (start_child(i)) break; /* fork failed.. */ } sleep(5); /* Wait a bit before continuting so that child processes have a change to start */ /* Collect start reports (initial "#hungry\n" lines) */ parent_reader(0); for (; !mustexit ;) { time(&now); if (now > nextdirscan) { nextdirscan = now + 10; for (i = 0; i < ROUTERDIR_CNT; ++i) { if (routerdirs2[i]) dirqueuescan(routerdirs2[i], dirq[i], 1); } } for (i = 0; i < MAXROUTERCHILDS; ++i) { if ( routerchilds[i].tochild < 0 && routerchilds[i].dq && routerchilds[i].dq->wrkcount ) start_child(i); if ( routerchilds[i].tochild >= 0 && routerchilds[i].childsize == 0 && routerchilds[i].hungry && routerchilds[i].dq && routerchilds[i].dq->wrkcount > 0 ) { /* Feed this ! */ syncweb(&routerchilds[i]); } } parent_reader(1); } for (i=0; i < ROUTERDIR_CNT; ++i) { if (routerdirs2[i]) free(routerdirs2[i]); routerdirs2[i] = NULL; if (routerdirs[i]) free(routerdirs[i]); routerdirs[i] = NULL; } return 0; } /* * Based on the name of a message file, figure out what to do with it. */ struct protosw { const char *pattern; const char *function; } psw[] = { /*{ "[0-9]*.x400", "x400" }, */ /*{ "[0-9]*.fax", "fax" }, */ /*{ "[0-9]*.uucp", "uucp" }, */ { "[0-9]*", "rfc822" }, }; int run_process(argc, argv) int argc; const char *argv[]; { struct protosw *pswp; char *file; int r; char *sh_memlevel = getlevel(MEM_SHCMD); if (argc != 2 || argv[1][0] == '\0') { fprintf(stderr, "Usage: %s messagefile\n", argv[0]); return PERR_USAGE; } #ifdef USE_ALLOCA file = (char*)alloca(strlen(argv[1])+1); #else file = (char*)emalloc(strlen(argv[1])+1); #endif strcpy(file, argv[1]); r = 0; /* by default, ignore it */ for (pswp = &psw[0]; pswp < &psw[(sizeof psw / sizeof psw[0])]; ++pswp) if (strmatch(pswp->pattern, file)) { printf("process %s %s\n", pswp->function, file); argv[0] = pswp->function; r = s_apply(argc, argv); /* process-by-FUNC filename */ printf("done with %s\n", file); if (r) printf("status %d\n", r); break; } #ifndef USE_ALLOCA free(file); #endif setlevel(MEM_SHCMD,sh_memlevel); return r; }