/* * SMTP sink based on server.c * * $Id: smtps.c,v 1.20 2007/02/04 17:48:38 ca Exp $ */ /* * Copyright (C) 2000 Silicon Graphics, Inc. All Rights Reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of Silicon Graphics, Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/

/*
 * NOTE(review): the header names of the bare "#include" directives below
 * are missing in this copy of the file (the text between angle brackets
 * appears to have been stripped).  They have to be restored from the
 * upstream source before this translation unit can be compiled.
 */
#include "sm/generic.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "st.h"
#include "data.h"
#include "sm/smreplycodes.h"
#include

/* Fall back to the project's own snprintf() replacement if needed */
#if !HAVE_SNPRINTF
# define snprintf sm_snprintf
# include "sm/string.h"
#endif /* !HAVE_SNPRINTF */


/******************************************************************
 * Server configuration parameters
 */

/* Log files */
#define PID_FILE    "pid"
#define ERRORS_FILE "errors"
#define ACCESS_FILE "access"

/* Default server port */
#define SERV_PORT_DEFAULT 8000

/* Socket listen queue size */
#define LISTENQ_SIZE_DEFAULT 256

/* Max number of listening sockets ("hardware virtual servers") */
#define MAX_BIND_ADDRS 16

/* Max number of "spare" threads per process per socket */
#define MAX_WAIT_THREADS_DEFAULT 8

/* Number of file descriptors needed to handle one client session */
#define FD_PER_THREAD 2

/* Access log buffer flushing interval (in seconds) */
#define ACCLOG_FLUSH_INTERVAL 30

/* Request read timeout (in seconds) */
#define REQUEST_TIMEOUT 30


/******************************************************************
 * Global data
 */

/* Per listening socket state; one entry for each -b option */
struct socket_info {
  st_netfd_t nfd;               /* Listening socket                     */
  char *addr;                   /* Bind address                         */
  int port;                     /* Port                                 */
  int wait_threads;             /* Number of threads waiting to accept  */
  int busy_threads;             /* Number of threads processing request */
  int rqst_count;               /* Total number of processed requests   */
  unsigned int rcpt_count;      /* Total number of recipients           */
} srv_socket[MAX_BIND_ADDRS];   /* Array of listening sockets           */

static int sk_count = 0;        /* Number of listening sockets          */

static int vp_count = 0;        /* Number of server processes (VPs)     */
static pid_t *vp_pids;          /* Array of VP pids                     */

static int my_index = -1;       /* Current process index */
static pid_t my_pid = -1;       /* Current process pid   */

/*
 * Absolute time derived from the -o option; 0 means "not set".  The
 * configurable reply codes below are only used while it is unset or
 * still in the future (see MAYBE_REJECT in handle_session()).
 */
static time_t oktime = 0;

/* SMTP reply codes for the greeting, EHLO and final dot (-G, -E, -F) */
static char greeting[] = "220";
static char ehloresp[] = "250";
static char finaldot[] = "250";

/* Host name announced in the greeting and EHLO reply (-N) */
static char *Myname = "local.host";

static st_netfd_t sig_pipe[2];  /* Signal pipe           */

/*
 * Configuration flags/parameters
 */
static int interactive_mode = 0;
static int serialize_accept = 0;
static int log_access       = 0;
static char *logdir     = NULL;
static char *username   = NULL;
static int listenq_size = LISTENQ_SIZE_DEFAULT;
static int errfd        = STDERR_FILENO;
static int debug        = 0;    /* -d: echo the SMTP dialogue to stderr */

/*
 * Sequence checking (-s n): "sequence" is n, "seqleft" the number of
 * sequence numbers still outstanding and "gotit" an array of n + 1
 * entries recording which numbers have been seen.  -1 means disabled.
 */
static int sequence     = -1;
static int seqleft      = -1;
static int *gotit;

/*
 * Thread throttling parameters (all numbers are per listening socket).
 * Zero values mean use default.
 */
static int max_threads = 0;       /* Max number of threads         */
static int max_wait_threads = 0;  /* Max number of "spare" threads */
static int min_wait_threads = 2;  /* Min number of "spare" threads */

static int datacomp = 0;          /* perform some computation */
static int datawait = 0;          /* sleep... */


/******************************************************************
 * Useful macros
 */

#ifndef INADDR_NONE
#define INADDR_NONE 0xffffffff
#endif

/* Convert seconds to the microsecond timeouts used by the ST I/O calls */
#define SEC2USEC(s) ((s)*1000000LL)

/* Accessors for the per-socket counters in srv_socket[] */
#define WAIT_THREADS(i)  (srv_socket[i].wait_threads)
#define BUSY_THREADS(i)  (srv_socket[i].busy_threads)
#define TOTAL_THREADS(i) (WAIT_THREADS(i) + BUSY_THREADS(i))
#define RQST_COUNT(i)    (srv_socket[i].rqst_count)
#define RCPT_COUNT(i)    (srv_socket[i].rcpt_count)


/******************************************************************
 * Forward declarations
 */

static void usage(const char *progname);
static void parse_arguments(int argc, char *argv[]);
static void start_daemon(void);
static void set_thread_throttling(void);
static void create_listeners(void);
static void change_user(void);
static void open_log_files(void);
static void start_processes(void);
static void wdog_sighandler(int signo);
static void child_sighandler(int signo);
static void install_sighandlers(void);
static void start_threads(void);
static void *process_signals(void *arg);
static void *flush_acclog_buffer(void *arg);
static void *handle_connections(void *arg);
static void dump_server_info(void);
static void Signal(int sig, void (*handler)(int)); static int cpu_count(void); extern void handle_session(long srv_socket_index, st_netfd_t cli_nfd); extern void load_configs(void); extern void logbuf_open(void); extern void logbuf_flush(void); extern void logbuf_close(void); /* Error reporting functions defined in the error.c file */ extern void err_sys_report(int fd, const char *fmt, ...); extern void err_sys_quit(int fd, const char *fmt, ...); extern void err_sys_dump(int fd, const char *fmt, ...); extern void err_report(int fd, const char *fmt, ...); extern void err_quit(int fd, const char *fmt, ...); /* * General server example: accept a client connection and do something. * This program acts as an SMTP sink. * * This server creates a constant number of processes ("virtual processors" * or VPs) and replaces them when they die. Each virtual processor manages * its own independent set of state threads (STs), the number of which varies * with load against the server. Each state thread listens to exactly one * listening socket. The initial process becomes the watchdog, waiting for * children (VPs) to die or for a signal requesting termination or restart. * Upon receiving a restart signal (SIGHUP), all VPs close and then reopen * log files and reload configuration. All currently active connections remain * active. It is assumed that new configuration affects only request * processing and not the general server parameters such as number of VPs, * thread limits, bind addresses, etc. Those are specified as command line * arguments, so the server has to be stopped and then started again in order * to change them. * * Each state thread loops processing connections from a single listening * socket. 
Only one ST runs on a VP at a time, and VPs do not share memory, * so no mutual exclusion locking is necessary on any data, and the entire * server is free to use all the static variables and non-reentrant library * functions it wants, greatly simplifying programming and debugging and * increasing performance (for example, it is safe to ++ and -- all global * counters or call inet_ntoa(3) without any mutexes). The current thread on * each VP maintains equilibrium on that VP, starting a new thread or * terminating itself if the number of spare threads exceeds the lower or * upper limit. * * All I/O operations on sockets must use the State Thread library's I/O * functions because only those functions prevent blocking of the entire VP * process and perform state thread scheduling. */ int main(int argc, char *argv[]) { /* Parse command-line options */ parse_arguments(argc, argv); /* Allocate array of server pids */ if ((vp_pids = calloc(vp_count, sizeof(pid_t))) == NULL) err_sys_quit(errfd, "ERROR: calloc failed"); /* Start the daemon */ if (!interactive_mode) start_daemon(); /* Initialize the ST library */ if (st_init() < 0) err_sys_quit(errfd, "ERROR: initialization failed: st_init"); /* Set thread throttling parameters */ set_thread_throttling(); /* Create listening sockets */ create_listeners(); /* Change the user */ if (username) change_user(); /* Open log files */ open_log_files(); /* Start server processes (VPs) */ start_processes(); /* Turn time caching on */ st_timecache_set(1); /* Install signal handlers */ install_sighandlers(); /* Load configuration from config files */ load_configs(); /* Start all threads */ start_threads(); /* Become a signal processing thread */ process_signals(NULL); /* NOTREACHED */ return 1; } /******************************************************************/ static void usage(const char *progname) { fprintf(stderr, "Usage: %s -l []\n" "Possible options:\n" "-a Enable access logging.\n" "-b : Bind to specified address. 
Multiple" " addresses\n" " are permitted.\n" "-D n Perform data computation.\n" "-E Use SMTP reply code for EHLO response.\n" "-F Use SMTP reply code for final dot response.\n" "-G Use SMTP reply code for greeting.\n" "-h Print this message.\n" "-i Run in interactive mode.\n" "-N Use as hostname in greeting.\n" "-o Accept all mail after n seconds.\n" "-p Create specified number of processes.\n" "-s n Expect n messages with 1..n as body.\n" "-t : Specify thread limits per listening" " socket\n" " Across all processes.\n" "-u Change server's user id to specified" " value.\n" "-q Set max length of pending connections" " queue.\n" "-S Serialize all accept() calls.\n" , progname); exit(1); } /******************************************************************/ static void parse_arguments(int argc, char *argv[]) { extern char *optarg; int opt; char *c; while ((opt = getopt(argc, argv, "ab:d:D:E:F:G:hio:p:l:N:s:St:u:q:w:")) != -1) { switch (opt) { case 'a': log_access = 1; break; case 'b': if (sk_count >= MAX_BIND_ADDRS) err_quit(errfd, "ERROR: max number of bind addresses (%d) exceeded", MAX_BIND_ADDRS); if ((c = strdup(optarg)) == NULL) err_sys_quit(errfd, "ERROR: strdup"); srv_socket[sk_count++].addr = c; break; case 'w': datawait = atoi(optarg); break; case 'd': debug = atoi(optarg); break; case 'D': datacomp = atoi(optarg); break; case 'E': if (IS_SMTP_CODE(optarg, 0)) strlcpy(ehloresp, optarg, sizeof(ehloresp)); break; case 'F': if (IS_SMTP_CODE(optarg, 0)) strlcpy(finaldot, optarg, sizeof(finaldot)); break; case 'G': if (IS_SMTP_CODE(optarg, 0)) strlcpy(greeting, optarg, sizeof(greeting)); break; case 'p': vp_count = atoi(optarg); if (vp_count < 1) err_quit(errfd, "ERROR: invalid number of processes: %s", optarg); break; case 'l': logdir = strdup(optarg); if (NULL == logdir) err_quit(errfd, "ERROR: strdup for logdir %s failed", optarg); break; case 'N': Myname = strdup(optarg); if (NULL == Myname) err_quit(errfd, "ERROR: strdup for myname %s failed", optarg); break; 
case 's': { size_t len; sequence = (int) strtol(optarg, &c, 10); if (sequence < 1) err_quit(errfd, "ERROR: invalid number for sequence: %s\n", optarg); len = (sequence + 1) * sizeof(int); gotit = (int *) malloc(len); if (NULL == gotit) err_quit(errfd, "ERROR: can't allocate array of size %d\n", len); memset(gotit, 0, len); seqleft = sequence; } break; case 't': max_wait_threads = (int) strtol(optarg, &c, 10); if (*c++ == ':') max_threads = atoi(c); if (max_wait_threads < 0 || max_threads < 0) err_quit(errfd, "ERROR: invalid number of threads: %s", optarg); break; case 'u': username = optarg; break; case 'o': oktime = atoi(optarg) + st_time(); break; case 'q': listenq_size = atoi(optarg); if (listenq_size < 1) err_quit(errfd, "ERROR: invalid listen queue size: %s", optarg); break; case 'i': interactive_mode = 1; break; case 'S': /* * Serialization decision is tricky on some platforms. For example, * Solaris 2.6 and above has kernel sockets implementation, so supposedly * there is no need for serialization. The ST library may be compiled * on one OS version, but used on another, so the need for serialization * should be determined at run time by the application. Since it's just * an example, the serialization decision is left up to user. * Only on platforms where the serialization is never needed on any OS * version st_netfd_serialize_accept() is a no-op. 
*/ serialize_accept = 1; break; case 'h': case '?': usage(argv[0]); } } if (logdir == NULL && !interactive_mode) { err_report(errfd, "ERROR: logging directory is required\n"); usage(argv[0]); } if (getuid() == 0 && username == NULL) err_report(errfd, "WARNING: running as super-user!"); if (vp_count == 0 && (vp_count = cpu_count()) < 1) vp_count = 1; if (sk_count == 0) { sk_count = 1; srv_socket[0].addr = "0.0.0.0"; } } /******************************************************************/ static void start_daemon(void) { pid_t pid; /* Start forking */ if ((pid = fork()) < 0) err_sys_quit(errfd, "ERROR: fork"); if (pid > 0) exit(0); /* parent */ /* First child process */ setsid(); /* become session leader */ if ((pid = fork()) < 0) err_sys_quit(errfd, "ERROR: fork"); if (pid > 0) /* first child */ exit(0); umask(022); if (chdir(logdir) < 0) err_sys_quit(errfd, "ERROR: can't change directory to %s: chdir", logdir); } /****************************************************************** * For simplicity, the minimal size of thread pool is considered * as a maximum number of spare threads (max_wait_threads) that * will be created upon server startup. The pool size can grow up * to the max_threads value. Note that this is a per listening * socket limit. It is also possible to limit the total number of * threads for all sockets rather than impose a per socket limit. */ static void set_thread_throttling(void) { /* * Calculate total values across all processes. * All numbers are per listening socket. */ if (max_wait_threads == 0) max_wait_threads = MAX_WAIT_THREADS_DEFAULT * vp_count; /* Assuming that each client session needs FD_PER_THREAD file descriptors */ if (max_threads == 0) max_threads = (st_getfdlimit() * vp_count) / FD_PER_THREAD / sk_count; if (max_wait_threads > max_threads) max_wait_threads = max_threads; /* * Now calculate per-process values. 
*/ if (max_wait_threads % vp_count) max_wait_threads = max_wait_threads / vp_count + 1; else max_wait_threads = max_wait_threads / vp_count; if (max_threads % vp_count) max_threads = max_threads / vp_count + 1; else max_threads = max_threads / vp_count; if (min_wait_threads > max_wait_threads) min_wait_threads = max_wait_threads; } /******************************************************************/ static void create_listeners(void) { int i, n, sock; char *c; struct sockaddr_in serv_addr; struct hostent *hp; short port; for (i = 0; i < sk_count; i++) { port = 0; if ((c = strchr(srv_socket[i].addr, ':')) != NULL) { *c++ = '\0'; port = (short) atoi(c); } if (srv_socket[i].addr[0] == '\0') srv_socket[i].addr = "0.0.0.0"; if (port == 0) port = SERV_PORT_DEFAULT; /* Create server socket */ if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) err_sys_quit(errfd, "ERROR: can't create socket: socket"); n = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0) err_sys_quit(errfd, "ERROR: can't set SO_REUSEADDR: setsockopt"); memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(port); serv_addr.sin_addr.s_addr = inet_addr(srv_socket[i].addr); if (serv_addr.sin_addr.s_addr == INADDR_NONE) { /* not dotted-decimal */ if ((hp = gethostbyname(srv_socket[i].addr)) == NULL) err_quit(errfd, "ERROR: can't resolve address: %s", srv_socket[i].addr); memcpy(&serv_addr.sin_addr, hp->h_addr, hp->h_length); } srv_socket[i].port = port; /* Do bind and listen */ if (bind(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) err_sys_quit(errfd, "ERROR: can't bind to address %s, port %d", srv_socket[i].addr, port); if (listen(sock, listenq_size) < 0) err_sys_quit(errfd, "ERROR: listen"); /* Create file descriptor object from OS socket */ if ((srv_socket[i].nfd = st_netfd_open_socket(sock)) == NULL) err_sys_quit(errfd, "ERROR: st_netfd_open_socket"); /* * On some platforms (e.g. 
IRIX, Linux) accept() serialization is never * needed for any OS version. In that case st_netfd_serialize_accept() * is just a no-op. Also see the comment above. */ if (serialize_accept && st_netfd_serialize_accept(srv_socket[i].nfd) < 0) err_sys_quit(errfd, "ERROR: st_netfd_serialize_accept"); } } /******************************************************************/ static void change_user(void) { struct passwd *pw; if ((pw = getpwnam(username)) == NULL) err_quit(errfd, "ERROR: can't find user '%s': getpwnam failed", username); if (setgid(pw->pw_gid) < 0) err_sys_quit(errfd, "ERROR: can't change group id: setgid"); if (setuid(pw->pw_uid) < 0) err_sys_quit(errfd, "ERROR: can't change user id: setuid"); err_report(errfd, "INFO: changed process user id to '%s'", username); } /******************************************************************/ static void open_log_files(void) { int fd; char str[32]; if (interactive_mode) return; /* Open access log */ if (log_access) logbuf_open(); /* Open and write pid to pid file */ if ((fd = open(PID_FILE, O_CREAT | O_WRONLY | O_TRUNC, 0644)) < 0) err_sys_quit(errfd, "ERROR: can't open pid file: open"); snprintf(str, sizeof(str), "%d\n", (int)getpid()); if (write(fd, str, strlen(str)) != (ssize_t) strlen(str)) err_sys_quit(errfd, "ERROR: can't write to pid file: write"); close(fd); /* Open error log file */ if ((fd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644)) < 0) err_sys_quit(errfd, "ERROR: can't open error log file: open"); errfd = fd; err_report(errfd, "INFO: starting the server..."); } /******************************************************************/ static void start_processes(void) { int i, status; pid_t pid; sigset_t mask, omask; if (interactive_mode) { my_index = 0; my_pid = getpid(); return; } for (i = 0; i < vp_count; i++) { if ((pid = fork()) < 0) { err_sys_report(errfd, "ERROR: can't create process: fork"); if (i == 0) exit(1); err_report(errfd, "WARN: started only %d processes out of %d", i, vp_count); 
vp_count = i; break; } if (pid == 0) { my_index = i; my_pid = getpid(); /* Child returns to continue in main() */ return; } vp_pids[i] = pid; } /* * Parent process becomes a "watchdog" and never returns to main(). */ /* Install signal handlers */ Signal(SIGTERM, wdog_sighandler); /* terminate */ Signal(SIGHUP, wdog_sighandler); /* restart */ Signal(SIGUSR1, wdog_sighandler); /* dump info */ /* Now go to sleep waiting for a child termination or a signal */ for ( ; ; ) { if ((pid = wait(&status)) < 0) { if (EINTR == errno) continue; err_sys_quit(errfd, "ERROR: watchdog: wait"); } /* Find index of the exited child */ for (i = 0; i < vp_count; i++) { if (vp_pids[i] == pid) break; } /* Block signals while printing and forking */ sigemptyset(&mask); sigaddset(&mask, SIGTERM); sigaddset(&mask, SIGHUP); sigaddset(&mask, SIGUSR1); sigprocmask(SIG_BLOCK, &mask, &omask); if (WIFEXITED(status)) err_report(errfd, "WARN: watchdog: process %d (pid %d) exited" " with status %d", i, pid, WEXITSTATUS(status)); else if (WIFSIGNALED(status)) err_report(errfd, "WARN: watchdog: process %d (pid %d) terminated" " by signal %d", i, pid, WTERMSIG(status)); else if (WIFSTOPPED(status)) err_report(errfd, "WARN: watchdog: process %d (pid %d) stopped" " by signal %d", i, pid, WSTOPSIG(status)); else err_report(errfd, "WARN: watchdog: process %d (pid %d) terminated:" " unknown termination reason", i, pid); /* Fork another VP */ if ((pid = fork()) < 0) { err_sys_report(errfd, "ERROR: watchdog: can't create process: fork"); } else if (pid == 0) { my_index = i; my_pid = getpid(); /* Child returns to continue in main() */ return; } vp_pids[i] = pid; /* Restore the signal mask */ sigprocmask(SIG_SETMASK, &omask, NULL); } } /******************************************************************/ static void wdog_sighandler(int signo) { int i, err; /* Save errno */ err = errno; /* Forward the signal to all children */ for (i = 0; i < vp_count; i++) { if (vp_pids[i] > 0) kill(vp_pids[i], signo); } /* * It 
is safe to do pretty much everything here because process is * sleeping in wait() which is async-safe. */ switch (signo) { case SIGHUP: err_report(errfd, "INFO: watchdog: caught SIGHUP"); /* Reopen log files - needed for log rotation */ if (log_access) { logbuf_close(); logbuf_open(); } close(errfd); if ((errfd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644)) < 0) err_sys_quit(STDERR_FILENO, "ERROR: watchdog: open"); break; case SIGTERM: /* Non-graceful termination */ err_report(errfd, "INFO: watchdog: caught SIGTERM, terminating"); unlink(PID_FILE); exit(0); case SIGUSR1: err_report(errfd, "INFO: watchdog: caught SIGUSR1"); break; default: err_report(errfd, "INFO: watchdog: caught signal %d", signo); } /* Restore errno */ errno = err; } /******************************************************************/ static void install_sighandlers(void) { sigset_t mask; int p[2]; /* Create signal pipe */ if (pipe(p) < 0) err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create" " signal pipe: pipe", my_index, my_pid); if ((sig_pipe[0] = st_netfd_open(p[0])) == NULL || (sig_pipe[1] = st_netfd_open(p[1])) == NULL) err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create" " signal pipe: st_netfd_open", my_index, my_pid); /* Install signal handlers */ Signal(SIGTERM, child_sighandler); /* terminate */ Signal(SIGHUP, child_sighandler); /* restart */ Signal(SIGUSR1, child_sighandler); /* dump info */ /* Unblock signals */ sigemptyset(&mask); sigaddset(&mask, SIGTERM); sigaddset(&mask, SIGHUP); sigaddset(&mask, SIGUSR1); sigprocmask(SIG_UNBLOCK, &mask, NULL); } /******************************************************************/ static void child_sighandler(int signo) { int err, fd; err = errno; fd = st_netfd_fileno(sig_pipe[1]); /* write() is async-safe */ if (write(fd, &signo, sizeof(int)) != sizeof(int)) err_sys_quit(errfd, "ERROR: process %d (pid %d): child's signal" " handler: write", my_index, my_pid); errno = err; } 
/****************************************************************** * The "main" function of the signal processing thread. */ /* ARGSUSED */ static void *process_signals(void *arg) { int signo; (void) arg; for ( ; ; ) { /* Read the next signal from the signal pipe */ if (st_read(sig_pipe[0], &signo, sizeof(int), -1) != sizeof(int)) err_sys_quit(errfd, "ERROR: process %d (pid %d): signal processor:" " st_read", my_index, my_pid); switch (signo) { case SIGHUP: err_report(errfd, "INFO: process %d (pid %d): caught SIGHUP," " reloading configuration", my_index, my_pid); if (interactive_mode) { load_configs(); break; } /* Reopen log files - needed for log rotation */ if (log_access) { logbuf_flush(); logbuf_close(); logbuf_open(); } close(errfd); if ((errfd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644)) < 0) err_sys_quit(STDERR_FILENO, "ERROR: process %d (pid %d): signal" " processor: open", my_index, my_pid); /* Reload configuration */ load_configs(); break; case SIGTERM: /* * Terminate ungracefully since it is generally not known how long * it will take to gracefully complete all client sessions. */ err_report(errfd, "INFO: process %d (pid %d): caught SIGTERM," " terminating", my_index, my_pid); if (log_access) logbuf_flush(); exit(0); case SIGUSR1: err_report(errfd, "INFO: process %d (pid %d): caught SIGUSR1", my_index, my_pid); /* Print server info to stderr */ dump_server_info(); break; default: err_report(errfd, "INFO: process %d (pid %d): caught signal %d", my_index, my_pid, signo); } } /* NOTREACHED */ return NULL; } /****************************************************************** * The "main" function of the access log flushing thread. 
*/ /* ARGSUSED */ static void *flush_acclog_buffer(void *arg) { (void) arg; for ( ; ; ) { st_sleep(ACCLOG_FLUSH_INTERVAL); logbuf_flush(); } /* NOTREACHED */ return NULL; } /******************************************************************/ static void start_threads(void) { long i, n; /* Create access log flushing thread */ if (log_access && st_thread_create(flush_acclog_buffer, NULL, 0, 0) == NULL) err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create" " log flushing thread", my_index, my_pid); /* Create connections handling threads */ for (i = 0; i < sk_count; i++) { err_report(errfd, "INFO: process %d (pid %d): starting %d threads" " on %s:%d", my_index, my_pid, max_wait_threads, srv_socket[i].addr, srv_socket[i].port); WAIT_THREADS(i) = 0; BUSY_THREADS(i) = 0; RQST_COUNT(i) = 0; RCPT_COUNT(i) = 0; for (n = 0; n < max_wait_threads; n++) { if (st_thread_create(handle_connections, (void *)i, 0, 0) != NULL) WAIT_THREADS(i)++; else err_sys_report(errfd, "ERROR: process %d (pid %d): can't create" " thread", my_index, my_pid); } if (WAIT_THREADS(i) == 0) exit(1); } } /******************************************************************/ static void *handle_connections(void *arg) { st_netfd_t srv_nfd, cli_nfd; struct sockaddr_in from; int fromlen; long i = (long) arg; srv_nfd = srv_socket[i].nfd; fromlen = sizeof(from); while (WAIT_THREADS(i) <= max_wait_threads) { cli_nfd = st_accept(srv_nfd, (struct sockaddr *)&from, &fromlen, -1); if (NULL == cli_nfd) { err_sys_report(errfd, "ERROR: can't accept connection: st_accept"); continue; } /* Save peer address, so we can retrieve it later */ st_netfd_setspecific(cli_nfd, &from.sin_addr, NULL); WAIT_THREADS(i)--; BUSY_THREADS(i)++; if (WAIT_THREADS(i) < min_wait_threads && TOTAL_THREADS(i) < max_threads && seqleft != 0) { /* Create another spare thread */ if (st_thread_create(handle_connections, (void *)i, 0, 0) != NULL) WAIT_THREADS(i)++; else err_sys_report(errfd, "ERROR: process %d (pid %d): can't create" " 
thread", my_index, my_pid); } handle_session(i, cli_nfd); st_netfd_close(cli_nfd); if (seqleft == 0) exit(0); WAIT_THREADS(i)++; BUSY_THREADS(i)--; } WAIT_THREADS(i)--; return NULL; } /******************************************************************/ static void dump_server_info(void) { char *buf; int i, len, s; s = sk_count * 512; if ((buf = malloc(s)) == NULL) { err_sys_report(errfd, "ERROR: malloc failed"); return; } len = snprintf(buf, s, "\n\nProcess #%d (pid %d):\n", my_index, (int)my_pid); for (i = 0; i < sk_count; i++) { len += snprintf(buf + len, s - len, "\nListening Socket #%d:\n" "-------------------------\n" "Address %s:%d\n" "Thread limits (min/max) %d/%d\n" "Waiting threads %d\n" "Busy threads %d\n" "Requests served %d\n" "Recipients %u\n" ,i, srv_socket[i].addr, srv_socket[i].port ,max_wait_threads, max_threads ,WAIT_THREADS(i), BUSY_THREADS(i), RQST_COUNT(i) ,RCPT_COUNT(i)); } write(STDERR_FILENO, buf, len); free(buf); } /****************************************************************** * Stubs */ /* transaction states, these must be sorted! */ #define SSTA_NONE 0x00 /* must be 0 */ #define SSTA_INIT 0x01 /* ta initialized */ #define SSTA_MAIL 0x02 /* MAIL received */ #define SSTA_RCPT 0x04 /* RCPT received */ #define SSTA_DATA 0x08 /* DATA received */ #define SSTA_DOT 0x10 /* Final dot received */ /* * Session handling function stub. */ void handle_session(long srv_socket_index, st_netfd_t cli_nfd) { char resp[512]; char buf[512]; short data = 0; int r,n; struct in_addr *from = st_netfd_getspecific(cli_nfd); int state; int ta_state; static char eot[] = "\r\n.\r\n"; char sbuf[64]; /* ** Maybe reject a command unless ** oktime is not set and it hasn't expired yet. */ #define MAYBE_REJECT (oktime == 0 || st_time() < oktime) ta_state = SSTA_NONE; strlcpy(resp, MAYBE_REJECT ? 
greeting : "220", sizeof resp); strlcat(resp, " ", sizeof resp); strlcat(resp, Myname, sizeof resp); strlcat(resp, " ESMTP ST sink\r\n", sizeof resp); n = strlen(resp); if (n != 0 && st_write(cli_nfd, resp, n, -1) != n) { err_sys_report(errfd, "WARN: can't write greeting to %s: st_write", inet_ntoa(*from)); return; } state = 0; ta_state = SSTA_INIT; while ((r = st_read(cli_nfd, buf, sizeof(buf), SEC2USEC(REQUEST_TIMEOUT))) >= 0) { if (r == 0) { switch (errno) { case 0: return; case EINTR: continue; case ETIMEDOUT: case EAGAIN: default: err_sys_report(errfd, "WARN: can't read from %s", inet_ntoa(*from)); return; } } /* debug */ if (debug) write(STDERR_FILENO, buf, r); resp[0] = '\0'; if (data) { int i, c, d; i = 0; d = (seqleft == -1) ? -3 : -2; if (datacomp > 0) (void) data_comp(buf, r, datacomp); while ((c = buf[i++]) && i <= r) { if (d == -2 && c == '\n') d = -1; else if (d == -1 && isascii(c) && isdigit(c)) d = 0; if (d >= 0) { if (isascii(c) && isdigit(c)) d = (d * 10) + c - '0'; else if (c == '\r') { if (d > sequence) { err_report(errfd, "WARN: got %d > %d", d, sequence); } else if (gotit[d] > 0) err_report(errfd, "WARN: got %d already", d, gotit[d]); else { gotit[d] = d; if (seqleft > 0) seqleft--; else err_report(errfd, "WARN: left=%d", seqleft); } d = -10; } } if (c == eot[state]) { if (++state >= (int) strlen(eot)) break; } else { state = 0; if (c == eot[state]) ++state; } } if (debug && r > 0) { if (r >= (int) sizeof(buf) - 1) r -= 2; buf[r + 1] = '\0'; snprintf(sbuf, sizeof sbuf, "state: %d, c=%c, i=%d, buf='%s'\r\n", state, isprint(c) ? 
c : '_', i, buf + i); write(STDERR_FILENO, sbuf, strlen(sbuf)); } if (state >= (int) strlen(eot)) { if (MAYBE_REJECT) snprintf(resp, sizeof(resp), "%s Hmm\r\n", finaldot); else strlcpy(resp, "250 got it\r\n",sizeof resp); data = 0; state = 0; ta_state = SSTA_INIT; } } if (!data) { if (strncasecmp(buf,"data",4) == 0) { if (datawait > 0) st_sleep(datawait); if (ta_state < SSTA_RCPT) strlcpy(resp, "503 Need RCPT\r\n", sizeof resp); else { strlcpy(resp, "354 Go\r\n",sizeof resp); data = 1; ta_state = SSTA_DATA; } } else if (strncasecmp(buf,"quit",4) == 0) { strlcpy(resp, "221 Bye\r\n",sizeof resp); } else if (strncasecmp(buf,"helo",4) == 0) { strlcpy(resp, "250 Hi there\r\n",sizeof resp); } else if (strncasecmp(buf,"ehlo",4) == 0) { if (MAYBE_REJECT && ehloresp[0] != '2') snprintf(resp, sizeof(resp), "%s %s ESMTP Hi\r\n", ehloresp, Myname); else snprintf(resp, sizeof(resp), "250-%s ESMTP Hi there\r\n250 HELP\r\n", Myname); } else if (strncasecmp(buf, "mail from:<", 11) == 0) { ta_state = SSTA_MAIL; if (MAYBE_REJECT && IS_SMTP_CODE(buf, 11)) { snprintf(resp, sizeof(resp), "%c%c%c whatever\r\n", buf[11], buf[12], buf[13]); } else strlcpy(resp, "250 Ok\r\n",sizeof resp); } else if (strncasecmp(buf, "rcpt to:<", 9) == 0) { if (MAYBE_REJECT && IS_SMTP_CODE(buf, 9)) { snprintf(resp, sizeof(resp), "%c%c%c whatever\r\n", buf[9], buf[10], buf[11]); } else if (ta_state < SSTA_MAIL) strlcpy(resp, "503 Need MAIL\r\n", sizeof resp); else { ta_state = SSTA_RCPT; strlcpy(resp, "250 Ok\r\n",sizeof resp); } RCPT_COUNT(srv_socket_index)++; } else if (strncasecmp(buf,"rset",4) == 0) { ta_state = SSTA_INIT; strlcpy(resp, "250 Ok\r\n",sizeof resp); } else if (strncasecmp(buf,"help",4) == 0) { strlcpy(resp, "250 Sorry, I lied.\r\n",sizeof resp); } else if (resp[0] == '\0') { strlcpy(resp, "550 What?\r\n",sizeof resp); } n = strlen(resp); if (n != 0 && st_write(cli_nfd, resp, n, -1) != n) { err_sys_report(errfd, "WARN: can't write response to %s: st_write", inet_ntoa(*from)); return; } if 
(debug && n != 0) write(STDERR_FILENO, resp, n); if (strncasecmp(buf,"quit",4) == 0) { RQST_COUNT(srv_socket_index)++; return; } } } if (r < 0) { err_sys_report(errfd, "WARN: can't read request from %s: st_read", inet_ntoa(*from)); return; } } /* * Configuration loading function stub. */ void load_configs(void) { err_report(errfd, "INFO: process %d (pid %d): configuration loaded", my_index, my_pid); } /* * Buffered access logging methods. * Note that stdio functions (fopen(3), fprintf(3), fflush(3), etc.) cannot * be used if multiple VPs are created since these functions can flush buffer * at any point and thus write only partial log record to disk. * Also, it is completely safe for all threads of the same VP to write to * the same log buffer without any mutex protection (one buffer per VP, of * course). */ void logbuf_open(void) { } void logbuf_flush(void) { } void logbuf_close(void) { } /****************************************************************** * Small utility functions */ static void Signal(int sig, void (*handler)(int)) { struct sigaction sa; sa.sa_handler = handler; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sigaction(sig, &sa, NULL); } static int cpu_count(void) { int n; #if defined (_SC_NPROCESSORS_ONLN) n = (int) sysconf(_SC_NPROCESSORS_ONLN); #elif defined (_SC_NPROC_ONLN) n = (int) sysconf(_SC_NPROC_ONLN); #elif defined (HPUX) #include n = mpctl(MPC_GETNUMSPUS, 0, 0); #else n = -1; errno = ENOSYS; #endif return n; } /******************************************************************/