/* * DREADERD/MAIN.C - diablo newsreader * * This is the startup for the dreaderd daemon. * * The diablo newsreader requires one or remote diablo news boxes to * obtain articles from, but keeps track of article assignments to groups * and overview information locally. * * Operationally, dreaderd runs as a standalone multi-forking, * multi-threaded daemon. It forks X resolvers and Y readers. Each * reader can support N connections (NxY total connections) and maintains * its own links to one or more news servers (diablo, INN, whatever). * Overview information is utilized and articles are accessed by * message-id. * * The diablo newsreader is also able to accept header-only feeds from * one or more diablo servers via 'mode headfeed'. Processing of these * feeds is not multi-threaded, only processing of newsreader functions is * multithreaded. The diablo newsreader does not operate with a dhistory * file. If taking a feed from several sources you must also run the * diablo server on the same box and use that to feed the dreaderd system, * though the diablo server need not store any articles itself. * * (c)Copyright 1998, Matthew Dillon, All Rights Reserved. Refer to * the COPYRIGHT file in the base directory of this distribution * for specific rights granted. */ #include "defs.h" #if NEED_TERMIOS #include #endif Prototype void RTClearStatus(int index, int count); Prototype void RTUpdateStatus(int slot, const char *ctl, ...); Prototype pid_t ForkPipe(int *pfd, void (*task)(int fd, const char *id), const char *description); Prototype int NumReaderForks; Prototype int ThisReaderFork; Prototype int FeedOnlyServer; Prototype int CoreDebugOpt; Prototype int FastCopyOpt; Prototype pid_t *ReaderPids; Prototype char *RTStatus; Prototype char *MyGroupHome; Prototype struct vserver *VServerConfig; void HandleDnsThread(ForkDesc *desc); void ValidateTcpBufferSize(int *psize); void deleteDnsResHash(DnsRes *dres, ForkDesc *desc); int getReaderForkSlot(void); void delReaderForkSlot(pid_t pid); #ifdef INET6 void SendDns(int fd, struct sockaddr_storage *rsin, DnsRes *dres); #else void SendDns(int fd, struct sockaddr_in *rsin, DnsRes *dres); #endif void FinishRetain(int what); void DoListen(ForkDesc *desc, time_t t); void DoCommand(int fd); void getVStats(FILE*fo); void getStats(FILE*fo, int raw); #ifdef READER_BAN_LISTS void SetBanList(char *ban); int Banned(char *ip, time_t t); void DumpBannedLists(FILE *fo); void DumpBannedConfigs(FILE *fo); #endif char *RTStatus = NULL; char *MyGroupHome; char *PidFile; char AdminVersion; const char *SysLogDesc; int CoreDebugOpt = 0; int FastCopyOpt = 1; int TxBufSize = 4096; int RxBufSize = 2048; int STxBufSize = 4096; int SRxBufSize = 16384; int MaxSDnsForks = 0; /* future */ int MaxConnects; int ConnectCount; int FailCount; int ListenersEnabled = 1; time_t TimeStart; #ifdef DREADER_CLIENT_TIMING float ticksPerSecond; #endif int NumReaderFeedForks; int NumReaderForks; int NumDnsForks; int NumSDnsForks; int NumPending; /* pending connections */ int NumActive; /* active connections */ double TotalClientGroups = 0.0; double TotalClientArticles = 0.0; double TotalClientArtBytes = 0.0; double TotalClientPosts = 0.0; double TotalClientPostFail = 0.0; double TotalClientPostBytes = 0.0; double TotalSpoolArticles = 0.0; double TotalSpoolBytes = 0.0; double TotalFeedArticles = 0.0; double TotalFeedBytes = 0.0; double TotalFeedArtFail = 0.0; int ufd; /* drcmd listen fd */ int ThisReaderFork; int FeedOnlyServer = -1; pid_t *ReaderPids; MemPool *DnsResPool; DnsRes *DnsResHash[DRHSIZE]; BindList *BindBase = NULL; typedef struct PendingClose { int fd; time_t when; int quickclose; char msgbuf[1024]; struct PendingClose *next; } PendingClose; PendingClose *NeedClosing = NULL; MemPool *PendingMemPool; int DelayedClose = 0; #ifdef READER_BAN_LISTS typedef struct BanInfo { IPList *BanList; int BanHashSize; int BanLinkSize; int BanCount; time_t BanTime; } BanInfo; BanInfo HostBan; BanInfo UserBan; BanInfo GroupBan; BanInfo GlobalBan; char *LastBan = NULL; #endif #define RET_CLOSE 1 #define RET_PAUSE 2 #define RET_LOCK 3 typedef struct Retain { struct Retain *re_Next; FILE *re_Fi; FILE *re_Fo; int re_What; } Retain; Retain *ReBase; int PausedCount = 0; int Exiting = 0; MemPool *ReMemPool; void Usage(void) { fprintf(stderr, "Usage: dreaderd -p pathhost [-A admin] [-B bindip[:port] [-b fd][-C file]\n"); fprintf(stderr, " [-c delaysecs] [-d [debugval] [-D dnsprocs ] [-F feedprocs]\n"); fprintf(stderr, " [-f val] [-h hostname ] [-M readerprocs ] [-N readthreads]\n"); fprintf(stderr, " [-P [bindip:]port ] [-R rxbufsize ]\n"); fprintf(stderr, " [-r rtstatusfile ] [ -S[T|R]# ] [-s statusline ]\n"); fprintf(stderr, " [-T txbufsize] [ -V ] [-X xrefhost ] [ -x xrefslavehost ] [ -Z ]\n"); fprintf(stderr, "\n"); fprintf(stderr, "where:\n"); fprintf(stderr, " -p pathhost Set the default Path: entry for this host:\n"); fprintf(stderr, " -A admin Set the default newsadmin email for this host:\n"); fprintf(stderr, " -B bindip[:port] Bind to a specific interface\n"); fprintf(stderr, " -b fd Provide an open socket to listen on\n"); fprintf(stderr, " -C file specify diablo.config to use\n"); fprintf(stderr, " -c delaysecs Delay closing of failed connections\n"); fprintf(stderr, " -d [debugval] Enable debugging\n"); fprintf(stderr, " -D dnsprocs Specify the number of DNS processes to start\n"); fprintf(stderr, " -F feedprocs Specify the number of dedicated feed processes to start\n"); fprintf(stderr, " -f [0|1] Disable/enable fast copy of data from spool to client\n"); fprintf(stderr, " -h hostname Set the default banner hostname for this server\n"); fprintf(stderr, " -M readprocs Specify the number of reader processes to start\n"); fprintf(stderr, " -N readthreads Specify the number of threads per reader process\n"); fprintf(stderr, " -P [bindip]:port Bind to a specific port\n"); fprintf(stderr, " -R rxbufsize Set the TCP rcv buffer size\n"); fprintf(stderr, " -r rtstatusfile Set the filename to store realtime status data\n"); fprintf(stderr, " -S[T:R]# Set TCP rx/tx buffer sizes\n"); fprintf(stderr, " -s statusline Use this area of process space to write status data\n"); fprintf(stderr, " -T txbufsize Set the TCP txmit buffer size\n"); fprintf(stderr, " -V Print version\n"); fprintf(stderr, " -X Set the default Xref: hostname shown to clients\n"); fprintf(stderr, " -x Set the Xref: hostname that must match incoming\n"); fprintf(stderr, " feed Xref: lines\n"); fprintf(stderr, " -Z Enable core file debugging\n"); exit(1); } int main(int ac, char **av) { int i; int passedfd = -1; LoadDiabloConfig(ac, av); if (strcmp(DRIncomingLogPat, "NONE") == 0) DRIncomingLogPat = NULL; else IncomingLogPat = DRIncomingLogPat; for (i = 1; i < ac; ++i) { char *ptr = av[i]; int v; if (*ptr != '-') { fprintf(stderr, "Unexpected option: %s\n", ptr); exit(1); } ptr += 2; v = (*ptr) ? strtol(ptr, NULL, 0) : 1; switch(ptr[-1]) { case 'A': strdupfree(&DOpts.NewsAdmin, (*ptr) ? ptr : av[++i], ""); break; case 'B': { BindList *bl = calloc(sizeof(BindList), 1); bl->bl_Port = NULL; bl->bl_Host = NULL; if (*ptr == 0) ptr = av[++i]; if (*ptr == '[') { char *p = strchr(ptr, ']'); bl->bl_Host = strdup(SanitiseAddr(ptr)); if (p != NULL && (p = strrchr(p, ':')) != NULL) bl->bl_Port = strdup(p + 1); } else if (strrchr(ptr, ':')) { bl->bl_Port = strdup(strrchr(ptr, ':') + 1); bl->bl_Host = strdup(SanitiseAddr(ptr)); } else { bl->bl_Host = strdup(SanitiseAddr(ptr)); } bl->bl_Next = BindBase; BindBase = bl; } break; case 'b': passedfd = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'C': if (*ptr == 0) ++i; break; case 'c': DelayedClose = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'd': DebugOpt = v; break; case 'D': DOpts.ReaderDns = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'F': DOpts.ReaderFeedForks = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'f': FastCopyOpt = v; break; case 'h': strdupfree(&DOpts.ReaderHostName, (*ptr) ? ptr : av[++i], NULL); break; case 'M': DOpts.ReaderForks = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'N': DOpts.ReaderThreads = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'P': { BindList *bl = calloc(sizeof(BindList), 1); bl->bl_Port = NULL; bl->bl_Host = NULL; if (*ptr == 0) ptr = av[++i]; if (*ptr == '[') { char *p = strchr(ptr, ']'); bl->bl_Host = strdup(SanitiseAddr(ptr)); if (p != NULL && (p = strrchr(p, ':')) != NULL) bl->bl_Port = strdup(p + 1); } else if (strrchr(ptr, ':')) { bl->bl_Port = strdup(strrchr(ptr, ':') + 1); bl->bl_Host = strdup(SanitiseAddr(ptr)); } else { bl->bl_Port = strdup(ptr); } bl->bl_Next = BindBase; BindBase = bl; } break; case 'p': strdupfree(&DOpts.ReaderPathHost, (*ptr) ? ptr : av[++i], ""); break; case 'R': RxBufSize = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'r': strdupfree(&RTStatus, (*ptr) ? ptr : av[++i], NULL); break; case 'S': switch(*ptr) { case 'T': ++ptr; STxBufSize = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 0: case '0': case '1': case '2': case '3': case '4': case '5': case '6': case '7': case '8': case '9': STxBufSize = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); case 'R': ++ptr; SRxBufSize = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; default: fatal("bad option: -S (must be R#, T#, or #)"); break; } break; case 's': ptr = (*ptr) ? ptr : av[++i]; SetStatusLine(ptr, strlen(ptr)); break; case 'T': TxBufSize = strtol(((*ptr) ? ptr : av[++i]), NULL, 0); break; case 'V': PrintVersion(); break; case 'X': strdupfree(&DOpts.ReaderXRefHost, (*ptr) ? ptr : av[++i], NULL); break; case 'x': strdupfree(&DOpts.ReaderXRefSlaveHost, (*ptr) ? ptr : av[++i], NULL); break; case 'Z': CoreDebugOpt = 1; break; default: fprintf(stderr, "unknown option: %s\n", ptr - 2); Usage(); } } ValidateTcpBufferSize(&RxBufSize); ValidateTcpBufferSize(&TxBufSize); ValidateTcpBufferSize(&SRxBufSize); ValidateTcpBufferSize(&STxBufSize); if (BindBase == NULL) { BindList *bl = calloc(sizeof(BindList), 1); bl->bl_Host = DOpts.ReaderBindHost; bl->bl_Port = DOpts.ReaderPort; bl->bl_Next = BindBase; BindBase = bl; } MaxConnects = DOpts.ReaderThreads * DOpts.ReaderForks; MyGroupHome = strdup(PatExpand(GroupHomePat)); if (BindBase == NULL) { BindList *bl = calloc(sizeof(BindList), 1); bl->bl_Host = DOpts.ReaderBindHost; bl->bl_Port = DOpts.ReaderPort; bl->bl_Next = BindBase; BindBase = bl; } /* * To stdout, not an error, no arguments == user requesting * options list. We can do this because the -p option is mandatory. */ if (DOpts.ReaderPathHost == NULL) Usage(); /* * bind to our socket prior to changing uid/gid */ if (passedfd < 0) { BindList *bl; for (bl = BindBase; bl; bl = bl->bl_Next) { int lfd; #ifdef INET6 int error; struct addrinfo hints; struct addrinfo *res; /* * Open a wildcard listening socket */ memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_PASSIVE; hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; error = getaddrinfo(bl->bl_Host, bl->bl_Port, &hints, &res); if (error == EAI_NODATA) { hints.ai_flags = 0; error = getaddrinfo(bl->bl_Host, 0, &hints, &res); } if (error != 0) { fprintf(stderr, "getaddrinfo: %s:%s: %s\n", bl->bl_Host ? bl->bl_Host : "ALL", bl->bl_Port, gai_strerror(error)); exit(1); } lfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); #else struct sockaddr_in lsin; lfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&lsin, sizeof(lsin)); if (bl->bl_Port == NULL) bl->bl_Port = DOpts.ReaderPort; lsin.sin_port = strtol(bl->bl_Port, NULL, 0); if (lsin.sin_port == 0) { struct servent *se; se = getservbyname(bl->bl_Port, NULL); if (se == NULL) { fprintf(stderr, "Unknown service name: %s\n", bl->bl_Port); exit(1); } lsin.sin_port = se->s_port; } else { lsin.sin_port = htons(lsin.sin_port); } lsin.sin_family = AF_INET; lsin.sin_addr.s_addr = INADDR_ANY; if (bl->bl_Host != NULL) { struct hostent *he; if ((he = gethostbyname(bl->bl_Host)) != NULL) { lsin.sin_addr = *(struct in_addr *)he->h_addr; } else { lsin.sin_addr.s_addr = inet_addr(bl->bl_Host); if (lsin.sin_addr.s_addr == INADDR_NONE) { perror("gethostbyname(BindHost)"); exit(1); } } } #endif { int on = 1; setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on)); setsockopt(lfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); } setsockopt(lfd, SOL_SOCKET, SO_SNDBUF, (void *)&TxBufSize, sizeof(int)); setsockopt(lfd, SOL_SOCKET, SO_RCVBUF, (void *)&RxBufSize, sizeof(int)); #ifdef INET6 if (bind(lfd, res->ai_addr, res->ai_addrlen) < 0) { perror("bind"); exit(1); } freeaddrinfo(res); #else if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { perror("bind"); exit(1); } #endif listen(lfd, 256); if (bl->bl_Host != NULL && strchr(bl->bl_Host, ':') != NULL) logit(LOG_INFO, "Listening on [%s]:%s\n", bl->bl_Host ? bl->bl_Host : "ALL", bl->bl_Port); else logit(LOG_INFO, "Listening on %s:%s\n", bl->bl_Host ? bl->bl_Host : "ALL", bl->bl_Port); AddThread("acceptor", lfd, -1, THREAD_LISTEN, -1, 0); } } else { { int on = 1; setsockopt(passedfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on)); setsockopt(passedfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); } setsockopt(passedfd, SOL_SOCKET, SO_SNDBUF, (void *)&TxBufSize, sizeof(int)); setsockopt(passedfd, SOL_SOCKET, SO_RCVBUF, (void *)&RxBufSize, sizeof(int)); listen(passedfd, 256); AddThread("acceptor", passedfd, -1, THREAD_LISTEN, -1, 0); } /* * Fork if not in debug mode and detach from terminal */ if (DebugOpt == 0) { pid_t pid; int fd; fflush(stdout); fflush(stderr); if ((pid = fork()) != 0) { if (pid < 0) { perror("fork"); exit(1); } exit(0); } /* * child continues, close stdin, stdout, stderr, detach tty, * detach process group. */ { int fd = open("/dev/null", O_RDWR); if (fd != 0) dup2(fd, 0); if (fd != 1) dup2(fd, 1); if (fd != 2) dup2(fd, 2); if (fd > 2) close(fd); } #if USE_TIOCNOTTY if ((fd = open("/dev/tty", O_RDWR)) >= 0) { ioctl(fd, TIOCNOTTY, 0); close(fd); } #endif #if USE_SYSV_SETPGRP setpgrp(); #else setpgrp(0, 0); #endif } /* * Ignore SIGPIPE, in case a write() fails we do not * want to segfault. */ rsignal(SIGPIPE, SIG_IGN); /* * change my uid/gid */ { struct passwd *pw = getpwnam("news"); struct group *gr = getgrnam("news"); gid_t gid; if (pw == NULL) { perror("getpwnam('news')"); exit(1); } if (gr == NULL) { perror("getgrnam('news')"); exit(1); } gid = gr->gr_gid; setgroups(1, &gid); setgid(gr->gr_gid); setuid(pw->pw_uid); } if (DOpts.ReaderHostName == NULL) SetMyHostName(&DOpts.ReaderHostName); if (*DOpts.ReaderHostName == 0) { free(DOpts.ReaderHostName); DOpts.ReaderHostName = NULL; } if (DOpts.NewsAdmin == NULL) SetNewsAdmin(&DOpts.NewsAdmin, DOpts.ReaderHostName); if (*DOpts.NewsAdmin == 0) { free(DOpts.NewsAdmin); DOpts.NewsAdmin = NULL; } if (DOpts.ReaderXRefHost == NULL) strdupfree(&DOpts.ReaderXRefHost, DOpts.ReaderXRefSlaveHost, NULL); if (DOpts.ReaderXRefHost == NULL) strdupfree(&DOpts.ReaderXRefHost, DOpts.ReaderPathHost, NULL); if (DOpts.ReaderXRefHost == NULL) strdupfree(&DOpts.ReaderXRefHost, DOpts.ReaderHostName, NULL); /* * Setup realtime status file name and remove file in case * dreaderd's are left over from a previous run. */ if (RTStatus == NULL) { RTStatus = malloc(strlen(PatExpand(LogHomePat)) + 32); sprintf(RTStatus, "%s/dreaderd.status", PatExpand(LogHomePat)); } remove(RTStatus); /* * RTStatus file initial open */ RTStatusOpen(RTStatus, 0, 1); /* * open syslog */ OpenLog("dreaderd", (DebugOpt ? LOG_PERROR : 0) | LOG_NDELAY | LOG_PID); SysLogDesc = "dreaderd"; /* * Save PID */ { int fd; char buf[32]; sprintf(buf, "%d\n", (int)getpid()); PidFile = malloc(strlen(PatExpand(LogHomePat)) + 32); sprintf(PidFile, "%s/dreaderd.pid", PatExpand(LogHomePat)); remove(PidFile); if ((fd = open(PidFile, O_RDWR|O_CREAT|O_TRUNC, 0644)) < 0) { logit(LOG_ERR, "unable to create %s\n", PidFile); exit(1); } write(fd, buf, strlen(buf)); close(fd); } #ifdef DREADER_CLIENT_TIMING ticksPerSecond=sysconf(_SC_CLK_TCK); #endif /* * Cancel cache (inherited by children) */ InitCancelCache(); InstallAccessCache(); #ifdef READER_BAN_LISTS bzero(&UserBan, sizeof(UserBan)); bzero(&HostBan, sizeof(HostBan)); bzero(&GroupBan, sizeof(GroupBan)); bzero(&GlobalBan, sizeof(GlobalBan)); #endif ReaderPids = malloc((DOpts.ReaderForks + DOpts.ReaderFeedForks) * sizeof(pid_t)); bzero(ReaderPids, (DOpts.ReaderForks + DOpts.ReaderFeedForks) * sizeof(pid_t)); /* * Add the UNIX domain socket listener for drcmd */ { struct sockaddr_un soun; memset(&soun, 0, sizeof(soun)); if ((ufd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { perror("udom-socket"); exit(1); } soun.sun_family = AF_UNIX; sprintf(soun.sun_path, "%s", PatRunExpand(DReaderSocketPat)); remove(soun.sun_path); if (bind(ufd, (struct sockaddr *)&soun, offsetof(struct sockaddr_un, sun_path[strlen(soun.sun_path)+1])) < 0) { perror("udom-bind"); exit(1); } chmod(soun.sun_path, 0770); #if NONBLK_ACCEPT_BROKEN /* HPUX is broken, see lib/config.h */ #else fcntl(ufd, F_SETFL, O_NONBLOCK); #endif if (listen(ufd, 10) < 0) { perror("udom-listen"); exit(1); } AddThread("drcmd", ufd, -1, THREAD_DRCMD, -1, 0); } /* * accept()/pipe-processing loop, fork child processes as required * to maintain requested parameters (initial fork is done as part of * the nominal re-fork code when a child dies). */ logit(LOG_NOTICE, "Waiting for connections"); TimeStart = time(NULL); for (;;) { /* * handle required forks and child process cleanup * * (A) reader NNTP processes */ while (!Exiting && NumReaderForks < DOpts.ReaderForks) { pid_t pid; int fd; ThisReaderFork = getReaderForkSlot(); FeedOnlyServer = 0; pid = ForkPipe(&fd, ReaderTask, "reader"); FeedOnlyServer = -1; if (pid < 0) break; ReaderPids[ThisReaderFork] = pid; AddThread("reader", fd, pid, THREAD_READER, -1, 0); ++NumReaderForks; } /* * (B) reader feeder-only processes ('mode reader' not available) */ while (!Exiting && NumReaderFeedForks < DOpts.ReaderFeedForks) { pid_t pid; int fd; ThisReaderFork = getReaderForkSlot(); FeedOnlyServer = 1; pid = ForkPipe(&fd, ReaderTask, "feed"); FeedOnlyServer = -1; if (pid < 0) break; ReaderPids[ThisReaderFork] = pid; AddThread("feed", fd, pid, THREAD_FEEDER, -1, 0); ++NumReaderFeedForks; } /* * (C) standard DNS processes */ while (!Exiting && NumDnsForks < DOpts.ReaderDns) { pid_t pid; int fd; if ((pid = ForkPipe(&fd, DnsTask, "reader-dns")) < 0) break; AddThread("dns", fd, pid, THREAD_DNS, -1, 0); ++NumDnsForks; } #ifdef NOTDEF /* * (D) server hostname resolver (future) */ while (!Exiting && NumSDnsForks < MaxSDnsForks) { pid_t pid; int fd; if ((pid = ForkPipe(&fd, DnsTask, "reader-sdns")) < 0) break; AddThread("sdns", fd, pid, THREAD_SDNS, -1, 0); ++NumSDnsForks; } #endif /* * (E) dead processes */ { pid_t pid; while ((pid = wait3(NULL, WNOHANG, NULL)) > 0) { ForkDesc *desc; if ((desc = FindThread(-1, pid)) != NULL) { if (desc->d_Type == THREAD_DNS) { if (!Exiting) logit(LOG_CRIT, "Lost dns resolver %d", (int)pid); --NumDnsForks; if (desc->d_FdPend >= 0) { logit(LOG_NOTICE, "pending descriptor from %s on dead DNS pid closed", NetAddrToSt(0, (struct sockaddr *)&desc->d_SaveRSin, 1, 1, 1) ); close(desc->d_FdPend); desc->d_FdPend = -1; NumPending -= desc->d_Count; /* note: d_Count may already be 0 */ NumActive -= desc->d_Count; desc->d_Count = 0; } } else if (desc->d_Type == THREAD_READER) { if (!Exiting) logit(LOG_CRIT, "Lost reader process %d", (int)pid); --NumReaderForks; delReaderForkSlot(pid); NumActive -= desc->d_Count; deleteDnsResHash(NULL, desc); } else if (desc->d_Type == THREAD_FEEDER) { if (!Exiting) logit(LOG_CRIT, "Lost feeder process %d", (int)pid); --NumReaderFeedForks; delReaderForkSlot(pid); NumActive -= desc->d_Count; deleteDnsResHash(NULL, desc); } else if (desc->d_Type == THREAD_SDNS) { if (!Exiting) logit(LOG_CRIT, "Lost sdns process %d", (int)pid); --NumSDnsForks; } else { fatal("main loop, unknown thread type %d pid %d\n", desc->d_Type, (int)pid); } DelThread(desc); } else { fatal("main loop, wait3() pid %d, pid is not known to me!", (int)pid); } } } #ifdef READER_BAN_LISTS /* * Check for changes in the Ban list */ if (DOpts.ReaderBan != NULL && LastBan != DOpts.ReaderBan) { logit(LOG_INFO, "Setting ban list"); SetBanList(DOpts.ReaderBan); LastBan = DOpts.ReaderBan; } #endif /* * (F) select core * * select on descriptors (listen socket and pipe sockets only), * process results */ { struct timeval tv; fd_set rfds = SFds; int i; if (Exiting) NextTimeout(&tv, 50); else NextTimeout(&tv, 10 * 1000); stprintf("Connect=%d Failed=%d Dns=%d/%d Act=%d/%d", ConnectCount, FailCount, NumPending, DOpts.ReaderDns, NumActive, MaxConnects ); RTStatusUpdate(0, "Connect=%d Failed=%d Dns=%d/%d Act=%d/%d", ConnectCount, FailCount, NumPending, DOpts.ReaderDns, NumActive, MaxConnects ); select(MaxFds, &rfds, NULL, NULL, &tv); gettimeofday(&CurTime, NULL); for (i = 0; i < MaxFds; ++i) { if (FD_ISSET(i, &rfds)) { ForkDesc *desc; if ((desc = FindThread(i, -1)) != NULL) { #ifdef DREADER_CLIENT_TIMING struct tms start,end; times(&start); #endif switch(desc->d_Type) { case THREAD_LISTEN: DoListen(desc, CurTime.tv_sec); break; case THREAD_DNS: /* * read query result from DNS handler, then * issue appropriate action to descriptor. */ HandleDnsThread(desc); break; case THREAD_READER: case THREAD_FEEDER: /* * reader process returns a dres struc for each * closed connection. Update our ref counts. */ { DnsRes dres; int n; int recv_fd; n = RecvMsg(desc->d_Fd, &recv_fd, &dres); if (n != sizeof(dres)) break; if (dres.dr_ResultFlags == DR_SERVER_STATS) { TotalSpoolArticles += dres.dr_ArtCount; TotalSpoolBytes += dres.dr_ByteCount; } else { #ifdef INET6 struct sockaddr_storage rsin; memcpy(&rsin, &dres.dr_Addr, sizeof(struct sockaddr_storage)); #else struct sockaddr_in rsin; bzero(&rsin, sizeof(rsin)); memcpy(&rsin, &dres.dr_Addr, sizeof(rsin)); #endif if (dres.dr_ResultFlags == DR_REQUIRE_DNS) { SendDns(recv_fd, &rsin, &dres); } else { dres.dr_ResultFlags = DR_SESSEXIT_RPT; SendDns(recv_fd, &rsin, &dres); close(recv_fd); } --NumActive; --desc->d_Count; deleteDnsResHash(&dres, desc); } } break; case THREAD_DRCMD: DoCommand(desc->d_Fd); break; default: fatal("Unknown descriptor type %d", desc->d_Type); break; } #ifdef DREADER_CLIENT_TIMING if ((desc = FindThread(i, -1)) != NULL) { times(&end); desc->d_utime+=end.tms_utime-start.tms_utime; desc->d_stime+=end.tms_stime-start.tms_stime; } #endif } else { fatal("select returned unknown descriptor %d", i); } } } (void)ScanTimers(1, 0); } /* * Disable listen socket if we have too many connections pending or * we are full up, enable listen socket otherwise. NOTE: NumPending * is included in the NumActive number. */ if (NumPending < NumDnsForks && NumActive < NumReaderForks * DOpts.ReaderThreads ) { if (ListenersEnabled == 0) { int i; ListenersEnabled = 1; for (i = 0; i < MaxFds; ++i) { ForkDesc *desc; if ((desc = FindThread(i, -1)) != NULL && desc->d_Fd >= 0 && desc->d_Type == THREAD_LISTEN ) { FD_SET(desc->d_Fd, &SFds); } } } } else { if (ListenersEnabled == 1) { int i; ListenersEnabled = 0; for (i = 0; i < MaxFds; ++i) { ForkDesc *desc; if ((desc = FindThread(i, -1)) != NULL && desc->d_Fd >= 0 && desc->d_Type == THREAD_LISTEN ) { FD_CLR(desc->d_Fd, &SFds); } } } } InstallAccessCache(); /* Clean out the delayed closes */ if (NeedClosing != NULL) { PendingClose *pc; PendingClose *ppc = NULL; time_t timenow = time(NULL); for (pc = NeedClosing; pc != NULL; ppc = pc, pc = pc->next) { if (pc->when > timenow) continue; write(pc->fd, pc->msgbuf, strlen(pc->msgbuf)); close(pc->fd); pc->fd = -1; ++FailCount; if (!pc->quickclose) { --NumActive; } if (pc == NeedClosing) NeedClosing = pc->next; if (ppc != NULL) ppc->next = pc->next; zfree(&PendingMemPool, pc, sizeof(PendingClose)); } } if (Exiting && !NumReaderFeedForks && !NumReaderForks && !NumDnsForks && !NumSDnsForks) break; } FinishRetain(RET_CLOSE); logit(LOG_NOTICE, "Exiting"); return(0); } void FinishRetain(int what) { Retain *ret; Retain **pret; for (pret = &ReBase; (ret = *pret) != NULL; ) { if (ret->re_What == what) { if (ret->re_Fo) fprintf(ret->re_Fo, "200 Operation Complete\n.\n"); *pret = ret->re_Next; if (ret->re_Fi) fclose(ret->re_Fi); if (ret->re_Fo) fclose(ret->re_Fo); zfree(&ReMemPool, ret, sizeof(Retain)); } else { pret = &ret->re_Next; } } } void DoListen(ForkDesc *desc, time_t t) { /* * accept new descriptor, depend on socket * buffer for write to not block if descriptor * beyond what we can handle. */ { char errbuf[1024]; #ifdef INET6 struct sockaddr_storage rsin; #else struct sockaddr_in rsin; #endif int rslen = sizeof(rsin); int fd = accept(desc->d_Fd, (struct sockaddr *)&rsin, &rslen); if (PausedCount) { snprintf(errbuf, sizeof(errbuf), "400 %s: Server is temporarily unavailable, please try later%s%s\r\n", DOpts.ReaderHostName, (DOpts.NewsAdmin != NULL) ? " - " : "", safestr(DOpts.NewsAdmin, "")); write(fd, errbuf, strlen(errbuf)); close(fd); fd = -1; ++ConnectCount; ++FailCount; return; } if (fd >= MAXFDS) { logit(LOG_CRIT, "fd beyond MAXFDS: %d, closing", fd); snprintf(errbuf, sizeof(errbuf), "400 %s: Too many open descriptors, please try later%s%s\r\n", DOpts.ReaderHostName, (DOpts.NewsAdmin != NULL) ? " - " : "", safestr(DOpts.NewsAdmin, "")); write(fd, errbuf, strlen(errbuf)); close(fd); fd = -1; ++ConnectCount; ++FailCount; return; } if (fd >= 0) { #ifdef READER_BAN_LISTS if (Banned(inet_ntoa(rsin.sin_addr), t)) { PendingClose *pc; logit(LOG_INFO, "rejected auto-banned IP %s", inet_ntoa(rsin.sin_addr)); snprintf(errbuf, sizeof(errbuf), "502 %s: Access denied to your node%s%s\r\n", DOpts.ReaderHostName, (DOpts.NewsAdmin != NULL) ? " - " : "", safestr(DOpts.NewsAdmin, "")); if (DelayedClose) { for (pc = NeedClosing; pc != NULL && pc->next != NULL; pc = pc->next); if (pc == NULL) { pc = zalloc(&PendingMemPool, sizeof(PendingClose)); NeedClosing = pc; } else { pc->next = zalloc(&PendingMemPool, sizeof(PendingClose)); pc = pc->next; } pc->fd = fd; strncpy(pc->msgbuf, errbuf, sizeof(pc->msgbuf) - 1); pc->msgbuf[sizeof(pc->msgbuf) - 1] = '\0'; pc->when = time(NULL) + DelayedClose; pc->quickclose = 1; pc->next = NULL; } else { write(fd, errbuf, strlen(errbuf)); close(fd); fd = -1; } ++ConnectCount; ++FailCount; return; } #endif /* * queue descriptor information to DNS * resolver and put descriptor on hold. */ SendDns(fd, &rsin, NULL); } } } /* * DOCOMMAND() - handles a control connection */ void DoCommand(int ufd) { int fd; int retain = 0; #ifdef INET6 struct sockaddr_storage asin; #else struct sockaddr_in asin; #endif ACCEPT_ARG3_TYPE alen = sizeof(asin); /* * This can be a while() or an if(), but apparently some OS's fail to * properly handle O_NONBLOCK for accept() on unix domain sockets so... */ if ((fd = accept(ufd, (struct sockaddr *)&asin, &alen)) > 0) { FILE *fi; FILE *fo; fcntl(fd, F_SETFL, 0); fo = fdopen(dup(fd), "w"); fi = fdopen(fd, "r"); if (fi && fo) { char buf[MAXLINE]; while (fgets(buf, sizeof(buf), fi) != NULL) { char *s1; char *s2; if (DebugOpt) printf("%d << %s", (int)getpid(), buf); if ((s1 = strtok(buf, " \t\r\n")) == NULL) break; s2 = strtok(NULL, "\n"); if (strcmp(s1, "status") == 0) { fprintf(fo, "Connect=%d Failed=%d Dns=%d/%d Act=%d/%d", ConnectCount, FailCount, NumPending, DOpts.ReaderDns, NumActive, MaxConnects ); } else if (strcmp(s1, "version") == 0) { fprintf(fo, "200 DREADERD Version %s - %s\n", VERS, SUBREV); } else if (strcmp(s1, "vstats") == 0) { getVStats(fo); } else if (strcmp(s1, "stats") == 0) { getStats(fo, 0); } else if (strcmp(s1, "rawstats") == 0) { getStats(fo, 1); #ifdef READER_BAN_LISTS } else if (strcmp(s1, "banlist") == 0) { DumpBannedLists(fo); } else if (strcmp(s1, "banconfig") == 0) { DumpBannedConfigs(fo); #endif } else if (strcmp(s1, "pause") == 0) { /* retain = RET_PAUSE; */ ++PausedCount; fprintf(fo, "200 Pause, count %d.\n", PausedCount); } else if (strcmp(s1, "go") == 0) { if (PausedCount) --PausedCount; fprintf(fo, "200 Resume, count %d\n", PausedCount); } else if (strcmp(s1, "quit") == 0) { fprintf(fo, "200 Goodbye\n"); fprintf(fo, ".\n"); break; } else if (strcmp(s1, "exit") == 0 || strcmp(s1, "aexit")==0) { int i; if (s1[0] == 'e') retain = RET_CLOSE; fprintf(fo, "211 Exiting\n"); fflush(fo); if (Exiting == 0) { ForkDesc *desc; Exiting = 1; logit(LOG_INFO, "Shutting down sub-processes"); for (i = 0; i < MaxFds; ++i) if ((desc = FindThread(i, -1)) != NULL) { switch(desc->d_Type) { case THREAD_DNS: if (desc->d_Pid) kill(desc->d_Pid, SIGKILL); break; case THREAD_READER: case THREAD_FEEDER: if (desc->d_Pid) kill(desc->d_Pid, SIGHUP); break; case THREAD_LISTEN: close(desc->d_Fd); break; case THREAD_DRCMD: break; } } } else { fprintf(fo, "200 Exit is already in progress\n"); } } else if (strcmp(s1, "config") == 0) { if (s2 != NULL) { char *opt; char *cmd; int cmdErr = 0; if ((cmd = strtok(s2, " \t\n")) != NULL && (opt = strtok(NULL, " \t\n")) != NULL) { if (SetCommand(fo, cmd, opt, &cmdErr) == 0) { CheckConfigOptions(1); fprintf(fo, "Set '%s' to '%s'\n", cmd, opt); } else if (cmdErr == 1) { fprintf(fo, "Invalid config command: %s %s\n", cmd, opt); fprintf(fo, "Usage: config command option\n"); } else { fprintf(fo, "Invalid option for config command: %s %s\n", cmd, opt); fprintf(fo, "Usage: config command option\n"); } } else { DumpConfigOptions(fo, s2, CONF_READER); } } else { DumpConfigOptions(fo, NULL, CONF_READER); } } else if (strcmp(s1, "help") == 0) { fprintf(fo,"Supported commands:\n"); fprintf(fo," help you are looking at it\n"); fprintf(fo," status short status line\n"); fprintf(fo," version return dreaderd version\n"); fprintf(fo," config view/set run-time config\n"); fprintf(fo," stats general server statistics\n"); fprintf(fo," rawstats general server statistics (without pretty printing)\n"); fprintf(fo," vstats virtual host statistics\n"); #ifdef READER_BAN_LISTS fprintf(fo," banconfig show current ban configs\n"); fprintf(fo," banlist show the current ban lists\n"); #endif fprintf(fo," go resume accepting new connections\n"); fprintf(fo," exit close all connections and exit\n"); } else { fprintf(fo, "500 Unrecognized command: %s\n", s1); } if (retain == 0) fprintf(fo, ".\n"); fflush(fo); /* if (retain) */ break; } } if (retain) { Retain *ret = zalloc(&ReMemPool, sizeof(Retain)); ret->re_Next = ReBase; ret->re_Fi = fi; ret->re_Fo = fo; ret->re_What = retain; ReBase = ret; } else { if (fi) fclose(fi); if (fo) fclose(fo); } } } #ifdef INET6 void SendDns(int fd, struct sockaddr_storage *rsin, DnsRes *dres) #else void SendDns(int fd, struct sockaddr_in *rsin, DnsRes *dres) #endif { ForkDesc *best; #ifdef INET6 struct sockaddr_storage lsin; #else struct sockaddr_in lsin; #endif int lslen = sizeof(lsin); /* * Check for auth changes */ InstallAccessCache(); bzero((void *)&lsin, sizeof(lsin)); if (getsockname(fd, (struct sockaddr *)&lsin, &lslen) < 0) { logit(LOG_ERR, "getsockname(%d) call failed: %s", fd, strerror(errno)); } ++ConnectCount; best = FindLeastUsedThread(THREAD_DNS, 1, 0, NULL, -1, NULL, NULL); if (best != NULL) { DnsReq dreq; ++NumActive; ++NumPending; ++best->d_Count; bzero(&dreq, sizeof(dreq)); bcopy(rsin, &dreq.dr_RSin, sizeof(dreq.dr_RSin)); bcopy(&lsin, &dreq.dr_LSin, sizeof(dreq.dr_LSin)); if (dres != NULL) { dreq.dr_ResultFlags = dres->dr_ResultFlags; strcpy(dreq.dr_AuthUser, dres->dr_AuthUser); strcpy(dreq.dr_AuthPass, dres->dr_AuthPass); dreq.dr_ByteCount = dres->dr_ByteCount; dreq.dr_GrpCount = dres->dr_GrpCount; dreq.dr_ArtCount = dres->dr_ArtCount; dreq.dr_PostBytes = dres->dr_PostBytes; dreq.dr_PostCount = dres->dr_PostCount; dreq.dr_PostFailCount = dres->dr_PostFailCount; dreq.dr_ByteCountArticle = dres->dr_ByteCountArticle; dreq.dr_ByteCountHead = dres->dr_ByteCountHead; dreq.dr_ByteCountBody = dres->dr_ByteCountBody; dreq.dr_ByteCountList = dres->dr_ByteCountList; dreq.dr_ByteCountXover = dres->dr_ByteCountXover; dreq.dr_ByteCountXhdr = dres->dr_ByteCountXhdr; dreq.dr_ByteCountOther = dres->dr_ByteCountOther; dreq.dr_SessionLength = time(NULL) - dres->dr_TimeStart; } best->d_FdPend = fd; bcopy(rsin, &best->d_SaveRSin, sizeof(best->d_SaveRSin)); write(best->d_Fd, &dreq, sizeof(dreq)); /* * ignore return code. If DNS process died, it * will be cleaned up later. */ } else { char errbuf[1024]; logit(LOG_CRIT, "Unable to find spare DNS thread - rejecting connection"); snprintf(errbuf, sizeof(errbuf), "400 %s: Server too busy - please try later%s%s\r\n", DOpts.ReaderHostName, (DOpts.NewsAdmin != NULL) ? " - " : "", safestr(DOpts.NewsAdmin, "")); write(fd, errbuf, strlen(errbuf)); close(fd); fd = -1; ++FailCount; } } pid_t ForkPipe(int *pfd, void (*task)(int fd, const char *id), const char *description) { int fds[2]; pid_t pid; *pfd = -1; /* * NOTE: must use socketpair in order to be able to pass descriptors. */ if (socketpair(PF_UNIX, SOCK_STREAM, IPPROTO_IP, fds) < 0) { logit(LOG_ERR, "socketpair() call failed"); return((pid_t)-1); } if (fds[0] > MAXFDS || fds[1] > MAXFDS) { logit(LOG_ERR, "pipe desc > MAXFDS: %d/%d/%d", fds[0], fds[1], MAXFDS); close(fds[0]); close(fds[1]); return((pid_t)-1); } *pfd = fds[0]; fflush(stdout); fflush(stderr); /* * Some systems can't deal with syslog through a * fork. */ CloseLog(NULL, 1); pid = fork(); if (pid == 0) { OpenLog(description, (DebugOpt ? LOG_PERROR : 0) | LOG_NDELAY | LOG_PID); SysLogDesc = description; } else { OpenLog(SysLogDesc, (DebugOpt ? LOG_PERROR : 0) | LOG_NDELAY | LOG_PID); } if (pid == 0) { freePool(&DnsResPool); stprintf("%s startup", description); /* note: DnsResHash not longer valid in children */ /* note: fds[1] is a normal descriptor, not a non-blocking desc */ ResetThreads(); close(*pfd); task(fds[1], description); _exit(1); } else if (pid < 0) { logit(LOG_ERR, "fork() failed: %s", strerror(errno)); close(*pfd); close(fds[1]); *pfd = -1; } else if (pid > 0) { close(fds[1]); if (DebugOpt) printf("forked %s child process %d\n", description, (int)pid); } return(pid); } void HandleDnsThread(ForkDesc *desc) { DnsRes dres; ForkDesc *reader = NULL; static int RandCounter = 0; const char *error1 = NULL; const char *error2 = NULL; char errbuf[1024]; int r; char tmpaddr[NI_MAXHOST]; errno = 0; if ((r = read(desc->d_Fd, &dres, sizeof(dres))) != sizeof(dres)) { /* * The dns subprocesses return a fixed-size structure. If we * get part of it, we should get the whole thing but may not * since the descriptor is set for non-blocking. Handle the * case. */ if (r > 0) { int n; n = read(desc->d_Fd, (char *)&dres + r, sizeof(dres) - r); if (n > 0) r += n; } if (r != sizeof(dres)) { if (errno != EINTR && errno != EWOULDBLOCK && errno != EINPROGRESS && r != 0 ) { /* * let the child process cleanup any operations in * progress. */ logit(LOG_CRIT, "read %d error on DNS subprocess, killing %d (%s)", r, (int)desc->d_Pid, strerror(errno)); kill(desc->d_Pid, 9); NumActive -= desc->d_Count; NumPending -= desc->d_Count; desc->d_Count = 0; } return; } } --desc->d_Count; --NumPending; if (dres.dr_ResultFlags & DR_SESSEXIT_RPT) { /* we don't need the rest of this */ --NumActive; return; } if (isdigit(dres.dr_ReaderName[0]) && isdigit(dres.dr_ReaderName[1]) && isdigit(dres.dr_ReaderName[2])) { snprintf(errbuf, sizeof(errbuf), "%s\r\n", dres.dr_ReaderName); error1 = errbuf; /* Point error2 at errbuf + 4 to catch the specific error */ /* error2 = "CustomError"; */ error2 = errbuf + 4; } if (error1 == NULL) { if ((r = ReadAccessCache()) < 0) { logit(LOG_CRIT, "Fatal: Unable to load access cache"); exit(1); } SetAuthDetails(&dres, dres.dr_ReaderName); dres.dr_TimeStart = time(NULL); if (r == 1) ClearOldAccessMap(); if (dres.dr_Code == 0) { snprintf(errbuf, sizeof(errbuf), "502 %s: Access denied to your node%s%s\r\n", dres.dr_VServerDef->vs_HostName, *dres.dr_VServerDef->vs_NewsAdm ? " - " : "", dres.dr_VServerDef->vs_NewsAdm); error1 = errbuf; error2 = "Unauthorized"; } if (dres.dr_Flags & DF_STATUS) { snprintf(errbuf, sizeof(errbuf), "180 %s status: connect=%d failed=%d dns=%d/%d act=%d/%d\r\n", dres.dr_VServerDef->vs_HostName, ConnectCount, FailCount, NumPending, DOpts.ReaderDns, NumActive, MaxConnects ); write(desc->d_FdPend, errbuf, strlen(errbuf)); close(desc->d_FdPend); desc->d_FdPend = -1; --NumActive; deleteDnsResHash(&dres, desc); return; } } /* * Pass the descriptor to the correct thread. A DF_FEED only connection * can be passed to a feed-specific thread. */ if (error1 == NULL) { if ((dres.dr_Flags & (DF_FEED|DF_READ|DF_POST)) == DF_FEED) { reader = FindLeastUsedThread(THREAD_FEEDER, DOpts.ReaderThreads, 0, &RandCounter, -1, NULL, NULL); dres.dr_Flags |= DF_FEEDONLY; } else { reader = FindLeastUsedThread(THREAD_READER, DOpts.ReaderThreads, 0, &RandCounter, -1, NULL, NULL); } if (reader == NULL) { snprintf(errbuf, sizeof(errbuf), "400 %s: Global connection limit reached, please try later%s%s\r\n", DOpts.ReaderHostName, (DOpts.NewsAdmin != NULL) ? " - " : "", safestr(DOpts.NewsAdmin, "")); error1 = errbuf; error2 = "ServerMaxedOut"; } } /* * Add DnsRes to hash table */ strncpy(tmpaddr, NetAddrToSt(0, (struct sockaddr *)&dres.dr_Addr, 1, 0, 0), sizeof(tmpaddr) - 1); tmpaddr[sizeof(tmpaddr) - 1] = '\0'; if (error1 == NULL && tmpaddr != NULL) { int hi = shash(tmpaddr) & DRHMASK; int hmi; DnsRes *dr; int gcount = 0; int hcount = 0; int ucount = 0; int icount = 0; int vcount = 0; /* * Until later, the counters only reflect the values of * sessions prior to the current one. This makes it easier to * calculate the rate limit. */ for (dr = DnsResHash[hi]; dr; dr = dr->dr_HNext) { if (strcmp(tmpaddr, NetAddrToSt(0, (struct sockaddr *)&dr->dr_Addr, 1, 0, 0)) == 0) { ++hcount; if (*dres.dr_IdentUser && *dr->dr_IdentUser && strcmp(dres.dr_IdentUser, dr->dr_IdentUser) == 0) ++icount; } } if (dres.dr_ReaderDef->rd_MaxConnPerGroup || dres.dr_ReaderDef->rd_MaxConnPerUser) { for (hmi=0; hmi < DRHSIZE; hmi++) for (dr = DnsResHash[hmi]; dr; dr = dr->dr_HNext) { if (*dres.dr_AuthUser && *dr->dr_AuthUser && strcmp(dres.dr_AuthUser, dr->dr_AuthUser) == 0) ++ucount; if (strcmp(dres.dr_ReaderName, dr->dr_ReaderName) == 0) ++gcount; if (strcmp(dres.dr_VServer, dr->dr_VServer) == 0) ++vcount; } } /* * Tax the rate limit based on number of connections */ { int count = 0; if (ucount) count = ucount; else if (icount) count = icount; else if (hcount) count = hcount; dres.dr_ConnCount = count; } /* * Now we add the current connection before we print the totals * and work out the limit restrictions */ gcount++; hcount++; ucount++; icount++; vcount++; if ((dres.dr_Flags & DF_QUIET) == 0) logit(LOG_INFO, "counters %s%s%s%s%s %s %s tcount=%d/%d vcount=%d/%d gcount=%d/%d hcount=%d/%d ucount=%d/%d icount=%d byte=%d", (dres.dr_AuthUser[0] ? dres.dr_AuthUser : ""), (dres.dr_AuthUser[0] ? "/" : ""), (dres.dr_IdentUser[0] ? dres.dr_IdentUser : ""), (dres.dr_IdentUser[0] ? "@" : ""), dres.dr_Host, dres.dr_ReaderDef->rd_Name, dres.dr_VServerDef->vs_Name, NumActive, dres.dr_ReaderDef->rd_MaxConnTotal, vcount, dres.dr_ReaderDef->rd_MaxConnPerVs, gcount, dres.dr_ReaderDef->rd_MaxConnPerGroup, hcount, dres.dr_ReaderDef->rd_MaxConnPerHost, ucount, dres.dr_ReaderDef->rd_MaxConnPerUser, icount, dres.dr_ReaderDef->rd_ByteLimit); if (dres.dr_ReaderDef->rd_MaxConnTotal && NumActive > dres.dr_ReaderDef->rd_MaxConnTotal) { snprintf(errbuf, sizeof(errbuf), "400 %s: Server connection limit reached%s%s\r\n", dres.dr_VServerDef->vs_HostName, *dres.dr_VServerDef->vs_NewsAdm ? " - " : "", dres.dr_VServerDef->vs_NewsAdm); error1 = errbuf; error2 = "TooManyFromTotal"; #ifdef READER_BAN_LISTS if (GlobalBan.BanCount > 0) IPListAdd(&GlobalBan.BanList, NetAddrToSt(0, (struct sockaddr *)&desc->d_SaveRSin, 1, 0, 1), GlobalBan.BanCount, time(NULL) + GlobalBan.BanTime, GlobalBan.BanHashSize, GlobalBan.BanLinkSize); #endif } if (dres.dr_ReaderDef->rd_MaxConnPerVs && vcount > dres.dr_ReaderDef->rd_MaxConnPerVs) { snprintf(errbuf, sizeof(errbuf), "400 %s: VServer connection limit reached%s%s\r\n", dres.dr_VServerDef->vs_HostName, *dres.dr_VServerDef->vs_NewsAdm ? " - " : "", dres.dr_VServerDef->vs_NewsAdm); error1 = errbuf; error2 = "TooManyFromVServer"; } if (dres.dr_ReaderDef->rd_MaxConnPerGroup && gcount > dres.dr_ReaderDef->rd_MaxConnPerGroup) { snprintf(errbuf, sizeof(errbuf), "400 %s: You have reached the connection limit for your access classification%s%s\r\n", dres.dr_VServerDef->vs_HostName, *dres.dr_VServerDef->vs_NewsAdm ? " - " : "", dres.dr_VServerDef->vs_NewsAdm); error1 = errbuf; error2 = "TooManyFromGroup"; #ifdef READER_BAN_LISTS if (GroupBan.BanCount > 0) IPListAdd(&GroupBan.BanList, NetAddrToSt(0, (struct sockaddr *)&desc->d_SaveRSin, 1, 0, 1), GroupBan.BanCount, time(NULL) + GroupBan.BanTime, GroupBan.BanHashSize, GroupBan.BanLinkSize); #endif } if (dres.dr_ReaderDef->rd_MaxConnPerHost && hcount > dres.dr_ReaderDef->rd_MaxConnPerHost) { snprintf(errbuf, sizeof(errbuf), "400 %s: Your host connection limit reached%s%s\r\n", dres.dr_VServerDef->vs_HostName, *dres.dr_VServerDef->vs_NewsAdm ? " - " : "", dres.dr_VServerDef->vs_NewsAdm); error1 = errbuf; error2 = "TooManyFromHost"; #ifdef READER_BAN_LISTS if (HostBan.BanCount > 0) IPListAdd(&HostBan.BanList, NetAddrToSt(0, (struct sockaddr *)&desc->d_SaveRSin, 1, 0, 1), HostBan.BanCount, time(NULL) + HostBan.BanTime, HostBan.BanHashSize, HostBan.BanLinkSize); #endif } if (dres.dr_ReaderDef->rd_MaxConnPerUser && (ucount > dres.dr_ReaderDef->rd_MaxConnPerUser || icount > dres.dr_ReaderDef->rd_MaxConnPerUser)) { snprintf(errbuf, sizeof(errbuf), "400 %s: Your per-user connection limit reached%s%s\r\n", dres.dr_VServerDef->vs_HostName, *dres.dr_VServerDef->vs_NewsAdm ? " - " : "", dres.dr_VServerDef->vs_NewsAdm); error1 = errbuf; error2 = "TooManyFromUser"; #ifdef READER_BAN_LISTS if (UserBan.BanCount > 0) IPListAdd(&UserBan.BanList, NetAddrToSt(0, (struct sockaddr *)&desc->d_SaveRSin, 1, 0, 1), UserBan.BanCount, time(NULL) + UserBan.BanTime, UserBan.BanHashSize, UserBan.BanLinkSize); #endif } if (error1 == NULL) { dr = zalloc(&DnsResPool, sizeof(DnsRes)); *dr = dres; dr->dr_HNext = DnsResHash[hi]; dr->dr_ReaderPid = reader->d_Pid; DnsResHash[hi] = dr; } } /* * error return */ if (error1 != NULL) { PendingClose *pc; if ((dres.dr_Flags & DF_QUIET) == 0) { logit( LOG_NOTICE, "connection to %s from %s%s%s (%s) rejected: %s", dres.dr_VServerDef->vs_Name, (dres.dr_IdentUser[0] ? dres.dr_IdentUser : ""), (dres.dr_IdentUser[0] ? "@" : ""), dres.dr_Host, NetAddrToSt(0, (struct sockaddr *)&desc->d_SaveRSin, 1, 1, 1), error2 ); } if (!DelayedClose) { write(desc->d_FdPend, error1, strlen(error1)); close(desc->d_FdPend); desc->d_FdPend = -1; ++FailCount; --NumActive; return; } /* * Add the fd to the "pending close" list * They will be closed after the DelayedClose timeout */ for (pc = NeedClosing; pc != NULL && pc->next != NULL; pc = pc->next); if (pc == NULL) { pc = zalloc(&PendingMemPool, sizeof(PendingClose)); NeedClosing = pc; } else { pc->next = zalloc(&PendingMemPool, sizeof(PendingClose)); pc = pc->next; } pc->fd = desc->d_FdPend; strncpy(pc->msgbuf, error1, sizeof(pc->msgbuf) - 1); pc->msgbuf[sizeof(pc->msgbuf) - 1] = '\0'; pc->when = time(NULL) + DelayedClose; pc->quickclose = 0; pc->next = NULL; desc->d_FdPend = -1; deleteDnsResHash(&dres, desc); return; } /* * Leave NumActive bumped, transfer descriptor to subprocess * * pass the descriptor to an appropriate subprocess */ if (SendMsg(reader->d_Fd, desc->d_FdPend, &dres) < 0) { char errbuf[1024]; logit(LOG_ERR, "connect: error passing file descriptor: %s, killing reader pid %d", strerror(errno), reader->d_Pid ); if (DebugOpt) printf("sendmsg() failed: %s\n", strerror(errno)); snprintf(errbuf, sizeof(errbuf), "400 %s: Server error, fd pass failed%s%s\r\n", DOpts.ReaderHostName, (DOpts.NewsAdmin != NULL) ? " - " : "", safestr(DOpts.NewsAdmin, "")); write(desc->d_FdPend, errbuf, strlen(errbuf)); close(desc->d_FdPend); desc->d_FdPend = -1; ++FailCount; --NumActive; deleteDnsResHash(&dres, desc); if (reader->d_Pid > 0) kill(reader->d_Pid, SIGQUIT); return; } close(desc->d_FdPend); desc->d_FdPend = -1; ++reader->d_Count; if ((dres.dr_Flags & DF_QUIET) == 0) { logit(LOG_INFO, "connect to %s from %s%s%s%s%s (%s)", dres.dr_VServerDef->vs_Name, (dres.dr_AuthUser[0] ? dres.dr_AuthUser : ""), (dres.dr_AuthUser[0] ? "/" : ""), (dres.dr_IdentUser[0] ? dres.dr_IdentUser : ""), (dres.dr_IdentUser[0] ? "@" : ""), dres.dr_Host, NetAddrToSt(0, (struct sockaddr *)&desc->d_SaveRSin, 1, 1, 1) ); } } void ValidateTcpBufferSize(int *psize) { if (*psize < 512) *psize = 512; if (*psize > 256*1024) *psize = 256*1024; } void deleteDnsResHash(DnsRes *dres, ForkDesc *desc) { if (dres == NULL) { int i; for (i = 0; i < DRHSIZE; ++i) { DnsRes **pdr = &DnsResHash[i]; DnsRes *dr; while ((dr = *pdr) != NULL) { if (dr->dr_ReaderPid == desc->d_Pid) { logit( LOG_NOTICE, "killed from %s%s%s%s%s (%s)", (dr->dr_AuthUser[0] ? dr->dr_AuthUser : ""), (dr->dr_AuthUser[0] ? "/" : ""), (dr->dr_IdentUser[0] ? dr->dr_IdentUser : ""), (dr->dr_IdentUser[0] ? "@" : ""), dr->dr_Host, NetAddrToSt(0, (struct sockaddr *)&dr->dr_Addr, 1, 1, 1) ); *pdr = dr->dr_HNext; zfree(&DnsResPool, dr, sizeof(DnsRes)); } else { pdr = &dr->dr_HNext; } } } } else { int hi = shash(NetAddrToSt(0, (struct sockaddr *)&dres->dr_Addr, 1, 0, 0)) & DRHMASK; DnsRes **pdr = &DnsResHash[hi]; DnsRes *dr; while ((dr = *pdr) != NULL) { if (dr->dr_ReaderPid == desc->d_Pid && memcmp(&dr->dr_Addr, &dres->dr_Addr, sizeof(dr->dr_Addr)) == 0 ) { if ((dr->dr_Flags & DF_QUIET) == 0) { logit( LOG_INFO, #ifdef DREADER_CLIENT_TIMING "closed from %s%s%s%s%s (%s) groups=%u arts=%u bytes=%.0f posts=%d postbytes=%d postfail=%d secs=%d utime=%.3f stime=%.3f", #else "closed from %s%s%s%s%s (%s) groups=%u arts=%u bytes=%.0f posts=%d postbytes=%d postfail=%d secs=%d", #endif (dr->dr_AuthUser[0] ? dr->dr_AuthUser : ""), (dr->dr_AuthUser[0] ? "/" : ""), (dr->dr_IdentUser[0] ? dr->dr_IdentUser : ""), (dr->dr_IdentUser[0] ? "@" : ""), dr->dr_Host, NetAddrToSt(0, (struct sockaddr *)&dr->dr_Addr, 1, 1, 1), dres->dr_GrpCount, dres->dr_ArtCount, dres->dr_ByteCount, dres->dr_PostCount, dres->dr_PostBytes, dres->dr_PostFailCount, #ifdef DREADER_CLIENT_TIMING time(NULL) - dres->dr_TimeStart, desc->d_stime/ticksPerSecond, desc->d_utime/ticksPerSecond #else time(NULL) - dres->dr_TimeStart #endif ); } if (dr->dr_Flags & DF_FEEDONLY) { TotalFeedArticles += dres->dr_PostCount; TotalFeedBytes += dres->dr_PostBytes; TotalFeedArtFail += dres->dr_PostFailCount; } else { TotalClientGroups += dres->dr_GrpCount; TotalClientArticles += dres->dr_ArtCount; TotalClientArtBytes += dres->dr_ByteCount; TotalClientPosts += dres->dr_PostCount; TotalClientPostBytes += dres->dr_PostBytes; TotalClientPostFail += dres->dr_PostFailCount; } *pdr = dr->dr_HNext; zfree(&DnsResPool, dr, sizeof(DnsRes)); break; } else { pdr = &dr->dr_HNext; } } } } int getReaderForkSlot(void) { int i; for (i = 0; i < DOpts.ReaderForks + DOpts.ReaderFeedForks; ++i) { if (ReaderPids[i] == 0) return(i); } return(0); /* XXX panic */ } void delReaderForkSlot(pid_t pid) { int i; for (i = 0; i < DOpts.ReaderForks + DOpts.ReaderFeedForks; ++i) { if (ReaderPids[i] == pid) ReaderPids[i] = 0; } } typedef struct vscount { Vserver *vserver; int count; struct vscount *next; } vscount; typedef struct rdcount { ReaderDef *reader; int count; struct vscount *vslist; struct rdcount *next; } rdcount; MemPool *StatsPool; void addVSlist(vscount **vslistp, DnsRes *dr) { vscount *vslist = *vslistp; vscount *vs; vscount *pvs = NULL; if (vslist == NULL) { vslist = zalloc(&StatsPool, sizeof(vscount)); vslist->vserver = dr->dr_VServerDef; vslist->count = 1; vslist->next = NULL; *vslistp = vslist; } else { for (vs = vslist; vs != NULL; pvs = vs, vs = vs->next) { if (vs->vserver == dr->dr_VServerDef) { vs->count++; break; } } if (vs == NULL) { vs = zalloc(&StatsPool, sizeof(vscount)); vs->vserver = dr->dr_VServerDef; vs->count = 1; vs->next = NULL; pvs->next = vs; } } } void getVStats(FILE *fo) { int hmi; DnsRes *dr; int vcount = 0; int rcount = 0; rdcount *rdlist = NULL; vscount *vslist = NULL; vscount *vs; rdcount *rd; vscount *pvs = NULL; rdcount *prd = NULL; for (hmi=0; hmi < DRHSIZE; hmi++) for (dr = DnsResHash[hmi]; dr; dr = dr->dr_HNext) { SetAuthDetails(dr, dr->dr_ReaderName); if (rdlist == NULL) { rdlist = zalloc(&StatsPool, sizeof(rdcount)); rdlist->reader = dr->dr_ReaderDef; rdlist->count = 1; rdlist->next = NULL; rdlist->vslist = NULL; addVSlist(&rdlist->vslist, dr); } else { for (rd = rdlist; rd != NULL; prd = rd, rd = rd->next) { if (rd->reader == dr->dr_ReaderDef) { addVSlist(&rd->vslist, dr); rd->count++; break; } } if (rd == NULL) { rd = zalloc(&StatsPool, sizeof(rdcount)); rd->reader = dr->dr_ReaderDef; rd->count = 1; rd->next = NULL; rd->vslist = NULL; addVSlist(&rd->vslist, dr); prd->next = rd; } } addVSlist(&vslist, dr); } rcount = 0; for (rd = rdlist; rd != NULL; prd = rd, rd = rd->next) { for (vs = rd->vslist; vs != NULL; pvs = vs, vs = vs->next) fprintf(fo, "reader %s=%d vs %s=%d\n", rd->reader->rd_Name, rd->count, vs->vserver->vs_Name, vs->count); rcount++; } fprintf(fo, "readergroups in use = %d\n", rcount); vcount = 0; for (vs = vslist; vs != NULL; pvs = vs, vs = vs->next) { fprintf(fo, "vserver %s=%d\n", vs->vserver->vs_Name, vs->count); vcount++; } fprintf(fo, "vservers in use = %d\n", vcount); freePool(&StatsPool); } void getStats(FILE *fo, int raw) { fprintf(fo, "UPTIME %d\n", time(NULL) - TimeStart); fprintf(fo, "SERVER Connect=%d Failed=%d Dns=%d/%d Act=%d/%d\n", ConnectCount, FailCount, NumPending, DOpts.ReaderDns, NumActive, MaxConnects ); if (raw) { fprintf(fo, "CLIENT Groups=%.0f Articles=%.0f Bytes=%.0f\n", TotalClientGroups, TotalClientArticles, TotalClientArtBytes ); fprintf(fo, "POST Posts=%.0f PostBytes=%.0f PostsFailed=%.0f\n", TotalClientPosts, TotalClientPostBytes, TotalClientPostFail ); fprintf(fo, "SPOOL Articles=%.0f Bytes=%.0f\n", TotalSpoolArticles, TotalSpoolBytes ); fprintf(fo, "FEED Articles=%.0f Bytes=%.0f ArtFail=%.0f\n", TotalFeedArticles, TotalFeedBytes, TotalFeedArtFail ); } else { fprintf(fo, "CLIENT Groups=%.0f Articles=%.0f Bytes=%s\n", TotalClientGroups, TotalClientArticles, ftos(TotalClientArtBytes) ); fprintf(fo, "POST Posts=%.0f PostBytes=%s PostsFailed=%.0f\n", TotalClientPosts, ftos(TotalClientPostBytes), TotalClientPostFail ); fprintf(fo, "SPOOL Articles=%.0f Bytes=%s\n", TotalSpoolArticles, ftos(TotalSpoolBytes) ); fprintf(fo, "FEED Articles=%.0f Bytes=%s ArtFail=%.0f\n", TotalFeedArticles, ftos(TotalFeedBytes), TotalFeedArtFail ); } } #ifdef READER_BAN_LISTS void SetBanList(char *ban) { BanInfo *bi = NULL; if (ban == NULL) return; while (*ban) { switch (*ban) { case ' ': case '\t': ban++; break; case 'U': bi = &UserBan; ban++; break; case 'H': bi = &HostBan; ban++; break; case 'G': bi = &GroupBan; ban++; break; case 'L': bi = &GlobalBan; ban++; break; case 's': bi->BanHashSize = atoi(++ban); while (isdigit((int)*ban)) ban++; break; case 'l': bi->BanLinkSize = atoi(++ban); while (isdigit((int)*ban)) ban++; break; case 'c': bi->BanCount = atoi(++ban); while (isdigit((int)*ban)) ban++; break; case 't': bi->BanTime = atoi(++ban); while (isdigit((int)*ban)) ban++; break; default: logit(LOG_ERR, "Unknown readerban option '%s'", ban); return; } } } int Banned(char *ip, time_t t) { if (UserBan.BanCount > 0 && IPListCheck(UserBan.BanList, ip, 1, t + UserBan.BanTime, t, UserBan.BanHashSize) == 0) return(1); if (HostBan.BanCount > 0 && IPListCheck(HostBan.BanList, ip, 1, t + HostBan.BanTime, t, HostBan.BanHashSize) == 0) return(1); if (GroupBan.BanCount > 0 && IPListCheck(GroupBan.BanList, ip, 1, t + GroupBan.BanTime, t, GroupBan.BanHashSize) == 0) return(1); if (GlobalBan.BanCount > 0 && IPListCheck(GlobalBan.BanList, ip, 1, t + GlobalBan.BanTime, t, GlobalBan.BanHashSize) == 0) return(1); return(0); } void DumpBannedLists(FILE *fo) { if (UserBan.BanCount > 0) { fprintf(fo, "User bans:\n"); IPListDump(fo, UserBan.BanList, UserBan.BanHashSize); } if (HostBan.BanCount > 0) { fprintf(fo, "Host bans:\n"); IPListDump(fo, HostBan.BanList, HostBan.BanHashSize); } if (GroupBan.BanCount > 0) { fprintf(fo, "Group bans:\n"); IPListDump(fo, GroupBan.BanList, GroupBan.BanHashSize); } if (GlobalBan.BanCount > 0) { fprintf(fo, "Global bans:\n"); IPListDump(fo, GlobalBan.BanList, GlobalBan.BanHashSize); } } void DumpBannedConfigs(FILE *fo) { if (UserBan.BanCount > 0) fprintf(fo, "user : count=%10d time=%ld\n", UserBan.BanCount, UserBan.BanTime); if (HostBan.BanCount > 0) fprintf(fo, "host : count=%10d time=%ld\n", HostBan.BanCount, HostBan.BanTime); if (GroupBan.BanCount > 0) fprintf(fo, "group : count=%10d time=%ld\n", GroupBan.BanCount, GroupBan.BanTime); if (GlobalBan.BanCount > 0) fprintf(fo, "global: count=%10d time=%ld\n", GlobalBan.BanCount, GlobalBan.BanTime); } #endif /* READER_BAN_LISTS */