static char rcsid[] = "@(#)$Id: schedule.c,v 1.19 2006/04/09 07:37:06 hurtta Exp $"; /****************************************************************************** * The Elm (ME+) Mail System - $Revision: 1.19 $ $State: Exp $ * * Author: Kari Hurtta (was hurtta+elm@ozone.FMI.FI) *****************************************************************************/ #include "headers.h" DEBUG_VAR(Debug,__FILE__,"net"); #include #ifndef ANSI_C extern int errno; #endif VOLATILE int wait_can_signal = 0; int no_action_routine(fd,data) int fd; void * data; { return 0; } static int ANY_ACTION P_((int fd, void * data)); static int ANY_ACTION(fd,data) int fd; void * data; { return 0; } enum schedule_return no_schedule_routine(fd,data) int fd; void * data; { return schedule_done; } static struct actions { int fd; int timeout_sec; action_routine * read_act; action_routine * write_act; action_routine * timeout_act; time_t next_timeout; void * data; int busy; /* Prevent nested calls -- * when read_act(), write_act() or * timeout_act() calls wait_for_timeout() * or wait_for_action() directly or * indirectly (for example with lib_error) */ schedule_routine * schedule_act; void * schedule_data; } * actions = NULL; static int actions_count = 0; static void remove_action P_((int ptr)); static void remove_action(ptr) int ptr; { int i; DPRINT(Debug,4,(&Debug, "** Removing action %d (fd=%d)\n", ptr, actions[ptr].fd)); for (i = ptr+1; i < actions_count; i++) { actions[i-1] = actions[i]; } actions_count--; } void change_action (fd,timeout_sec,read_act,write_act,timeout_act,data) int fd; int timeout_sec; action_routine * read_act; action_routine * write_act; action_routine * timeout_act; void *data; { int ptr; for (ptr = 0; ptr < actions_count; ptr++) if (actions[ptr].fd == fd) break; if (ptr == actions_count) { if (no_action_routine == read_act && no_action_routine == write_act && no_action_routine == timeout_act) { DPRINT(Debug,9,(&Debug, "change_action: action for (fd=%d) already cleared\n", fd)); return; } actions = safe_realloc(actions, (++actions_count) * sizeof (struct actions)); actions[ptr].timeout_sec = 0; actions[ptr].next_timeout = 0; actions[ptr].busy = 0; actions[ptr].schedule_act = no_schedule_routine; actions[ptr].schedule_data = NULL; DPRINT(Debug,4,(&Debug, "** Adding action %d (fd=%d)\n", ptr,fd)); } if (actions[ptr].timeout_sec != timeout_sec) { actions[ptr].next_timeout = time(NULL) + timeout_sec; } actions[ptr].fd = fd; actions[ptr].timeout_sec = timeout_sec; actions[ptr].read_act = read_act; actions[ptr].write_act = write_act; actions[ptr].timeout_act = timeout_act; actions[ptr].data = data; if (no_action_routine == read_act && no_action_routine == write_act && no_action_routine == timeout_act && actions[ptr].schedule_act == no_schedule_routine) { if (actions[ptr].busy) { DPRINT(Debug,4,(&Debug, "** Action %d busy (fd=%d): remove skipped\n", ptr, actions[ptr].fd)); } else remove_action(ptr); } } void set_schedule_action (fd,routine,data) int fd; schedule_routine *routine; void * data; { int ptr; for (ptr = 0; ptr < actions_count; ptr++) if (actions[ptr].fd == fd) break; if (ptr == actions_count) { if (no_schedule_routine == routine) { DPRINT(Debug,9,(&Debug, "set_schedule_action: action for (fd=%d) already cleared\n", fd)); return; } actions = safe_realloc(actions, (++actions_count) * sizeof (struct actions)); actions[ptr].timeout_sec = 0; actions[ptr].next_timeout = 0; actions[ptr].busy = 0; actions[ptr].timeout_sec = 0; actions[ptr].read_act = no_action_routine; actions[ptr].write_act = no_action_routine; actions[ptr].timeout_act = no_action_routine; actions[ptr].data = NULL; DPRINT(Debug,4,(&Debug, "** Adding action %d (fd=%d)\n", ptr,fd)); } actions[ptr].fd = fd; actions[ptr].schedule_act = routine; actions[ptr].schedule_data = data; if (no_action_routine == actions[ptr].read_act && no_action_routine == actions[ptr].write_act && no_action_routine == actions[ptr].timeout_act && actions[ptr].schedule_act == no_schedule_routine) { if (actions[ptr].busy) { DPRINT(Debug,4,(&Debug, "** Action %d busy (fd=%d): remove skipped\n", ptr, actions[ptr].fd)); } else remove_action(ptr); } } void clear_action(fd) int fd; { set_schedule_action(fd,no_schedule_routine,NULL); change_action(fd,0, no_action_routine,no_action_routine,no_action_routine, NULL); } int have_actions() { return actions_count; } #if POLL_METHOD == 1 /* --------------------- select() ------------------------------------------ */ static int real_wait P_((action_routine * action, time_t wait_timeout,time_t next_timeout)); static int real_wait(action,wait_timeout,next_timeout) action_routine * action; time_t wait_timeout, next_timeout; { int result = 0; int done = 0; int err = 0; char *Y = "action"; if (action == no_action_routine) Y = "no action"; else if (action == ANY_ACTION) Y = "any action"; DPRINT(Debug,20,(&Debug, "real_wait: [select] %s %p, wait_timeout=%ld, next_timeout=%ld (actions_count=%d)\n", Y, action, (long) wait_timeout, (long) next_timeout, actions_count)); do { int i, n = 0; fd_set read_set; fd_set write_set; fd_set exp_set; struct timeval timeout, *timeout_val = NULL; int ret; time_t current; FD_ZERO(&read_set); FD_ZERO(&write_set); FD_ZERO(&exp_set); DPRINT(Debug,20,(&Debug, "real_wait: actions:")); for (i = 0; i < actions_count; i++) { if (n <= actions[i].fd) n = actions[i].fd+1; DPRINT(Debug,20,(&Debug, " (fd=%d",actions[i].fd)); FD_SET(actions[i].fd,&exp_set); if (actions[i].busy) { DPRINT(Debug,20,(&Debug, " busy!")); } else { if (actions[i].read_act != no_action_routine) { FD_SET(actions[i].fd,&read_set); DPRINT(Debug,20,(&Debug, " READ")); } if (actions[i].write_act != no_action_routine) { FD_SET(actions[i].fd,&write_set); DPRINT(Debug,20,(&Debug, " WRITE")); } if (actions[i].timeout_act != no_action_routine) { DPRINT(Debug,20,(&Debug, " TIMEOUT")); } } DPRINT(Debug,20,(&Debug, ")")); } DPRINT(Debug,20,(&Debug, "\n")); if (next_timeout > 0) { int diff = next_timeout - time(NULL); DPRINT(Debug,20,(&Debug, "real_wait: diff=%d\n", diff)); if (diff <= 0) timeout.tv_sec = 0; else timeout.tv_sec = diff; timeout.tv_usec = 0; timeout_val = &timeout; } DPRINT(Debug,20,(&Debug, "real_wait: n = %d, read set=", n)); for (i = 0; i < n; i++) { if (FD_ISSET(i,&read_set)) { DPRINT(Debug,20,(&Debug, " %d", i)); } } DPRINT(Debug,20,(&Debug, ", write set=")); for (i = 0; i < n; i++) { if (FD_ISSET(i,&write_set)) { DPRINT(Debug,20,(&Debug, " %d", i)); } } DPRINT(Debug,20,(&Debug,", exception set=")); for (i = 0; i < n; i++) { if (FD_ISSET(i,&exp_set)) { DPRINT(Debug,20,(&Debug, " %d", i)); } } DPRINT(Debug,20,(&Debug, ", timeout=")); if (timeout_val) { DPRINT(Debug,20,(&Debug, " sec=%ld usec=%ld\n", (long) timeout_val->tv_sec, (long) timeout_val->tv_usec)); } else { DPRINT(Debug,20,(&Debug, "NULL\n")); } wait_can_signal = 1; #ifdef _HPUX_SOURCE ret = select(n,(int *)&read_set,(int *)&write_set, (int *)&exp_set,timeout_val); #else ret = select(n,&read_set,&write_set,&exp_set,timeout_val); #endif wait_can_signal = 0; if (ret < 0) { err = errno; DPRINT(Debug,20,(&Debug, "select: %s (errno %d)\n", error_description(err),err)); result = 0; break; } current = time(NULL); DPRINT(Debug,20,(&Debug, "real_wait: select=%d, current=%ld (time)\n", ret,(long)current)); #ifdef USE_DLOPEN if (ret > 0) { /* If we just timed out, it is quite predictable so no seeding */ union xxx_rand { struct collection { struct timeval XX; /* someone may modify this */ time_t X; fd_set read_set; fd_set write_set; int r; } X; char bytes [sizeof (struct collection)]; } A; if (timeout_val) A.X.XX = *timeout_val; A.X.X = current; memcpy( &(A.X.read_set), &(read_set), sizeof read_set); memcpy( &(A.X.write_set), &(write_set), sizeof write_set); A.X.r = ret; seed_rand_bits(A.bytes, sizeof A, 4 /* entropy bits */ ); } #endif if (ret > 0) { for (i = 0; i < actions_count; i++) { if (!actions[i].busy) { actions[i].busy = 1; if (FD_ISSET(actions[i].fd,&read_set) || FD_ISSET(actions[i].fd,&exp_set)) { int val; DPRINT(Debug,20,(&Debug, "real_wait: running read_act for %d (action %d)\n", actions[i].fd,i)); val = actions[i].read_act(actions[i].fd, actions[i].data); DPRINT(Debug,20,(&Debug, "real_wait: [%d].read_act=%d\n",i,val)); if (action == actions[i].read_act || action == ANY_ACTION) { done = 1; result = 1; } if (!val) actions[i].read_act = no_action_routine; } if (FD_ISSET(actions[i].fd,&write_set) || FD_ISSET(actions[i].fd,&exp_set)) { int val; DPRINT(Debug,20,(&Debug, "real_wait: running write_act for %d (action %d)\n", actions[i].fd,i)); val = actions[i].write_act(actions[i].fd, actions[i].data); DPRINT(Debug,20,(&Debug, "real_wait: [%d].write_act=%d\n",i,val)); if (action == actions[i].write_act || action == ANY_ACTION) { done = 1; result = 1; } if (!val) actions[i].write_act = no_action_routine; } actions[i].busy = 0; } if (FD_ISSET(actions[i].fd,&exp_set)) { DPRINT(Debug,20,(&Debug, "real_wait: [%d] exception\n",i)); actions[i].write_act = no_action_routine; actions[i].read_act = no_action_routine; done = 1; result = 1; } } } for (i = 0; i < actions_count; i++) { if (actions[i].next_timeout && actions[i].next_timeout <= current) { if (actions[i].busy) { DPRINT(Debug,20,(&Debug, "real_wait: %d busy (action %d) -- timout skipped\n", actions[i].fd,i)); } else { int val; actions[i].busy = 1; DPRINT(Debug,20,(&Debug, "real_wait: running timeout_act for %d (action %d)\n", actions[i].fd,i)); val = actions[i].timeout_act(actions[i].fd, actions[i].data); DPRINT(Debug,20,(&Debug, "real_wait: [%d].timeout_act=%d\n",i,val)); if (action == actions[i].timeout_act || action == ANY_ACTION) { done = 1; result = 1; } if (!val) { actions[i].timeout_act = no_action_routine; actions[i].next_timeout = 0; } else { actions[i].next_timeout = current + actions[i].timeout_sec; } actions[i].busy = 0; } } } if (next_timeout > 0 && next_timeout <= current) { done = 1; if (result == 0) result = -1; } if (wait_timeout > 0 && wait_timeout <= current) { done = 1; result = 1; } DPRINT(Debug,20,(&Debug, "real_wait: done=%d, result=%d\n",done,result)); } while(!done); DPRINT(Debug,20,(&Debug, "real_wait=%d (errno=%d)\n",result,err)); errno = err; return result; } #endif #if POLL_METHOD == 2 /* --------------------- poll() -------------------------------------------- */ /* For MAXINT */ #include static int real_wait P_((action_routine * action, time_t wait_timeout,time_t next_timeout)); static int real_wait(action,wait_timeout,next_timeout) action_routine * action; time_t wait_timeout, next_timeout; { int result = 0; int done = 0; int err = 0; struct pollfd * polls = NULL; int polls_count = 0; char *Y = "action"; if (action == no_action_routine) Y = "no action"; else if (action == ANY_ACTION) Y = "any action"; DPRINT(Debug,20,(&Debug, "real_wait: [poll] %s %p, wait_timeout=%ld, next_timeout=%ld (actions_count=%d)\n", Y, action, (long) wait_timeout, (long) next_timeout, actions_count)); do { int i; int ret; time_t current; int timeout = -1; if (polls_count != actions_count) { polls = safe_realloc(polls, actions_count * sizeof(struct pollfd)); polls_count = actions_count; bzero(polls,actions_count * sizeof(struct pollfd)); DPRINT(Debug,20,(&Debug, "real_wait: polls=%p, polls_count=%d\n", polls,polls_count)); } DPRINT(Debug,20,(&Debug, "real_wait: actions:")); for (i = 0; i < actions_count; i++) { DPRINT(Debug,20,(&Debug, " (fd=%d",actions[i].fd)); polls[i].fd = actions[i].fd; polls[i].events = 0; polls[i].revents = 0; if (actions[i].busy) { DPRINT(Debug,20,(&Debug, " busy!")); } else { if (actions[i].read_act != no_action_routine) { polls[i].events |= POLLIN | POLLHUP; DPRINT(Debug,20,(&Debug, " READ")); } if (actions[i].write_act != no_action_routine) { polls[i].events |= POLLOUT; DPRINT(Debug,20,(&Debug, " WRITE")); } if (actions[i].timeout_act != no_action_routine) { DPRINT(Debug,20,(&Debug, " TIMEOUT")); } } DPRINT(Debug,20,(&Debug, ")")); } DPRINT(Debug,20,(&Debug, "\n")); if (next_timeout > 0) { int diff = next_timeout - time(NULL); DPRINT(Debug,20,(&Debug, "real_wait: diff=%d\n", diff)); if (diff <= 0) timeout = -1; else if (diff < MAXINT / 1000) timeout = diff * 1000; else timeout = MAXINT; } DPRINT(Debug,20,(&Debug,"real_wait: polls_count = %d, polls=", polls_count)); for (i = 0; i < polls_count; i++) DPRINT(Debug,10,(&Debug, " {fd=%d events=%d}", polls[i].fd, polls[i].events)); DPRINT(Debug,20,(&Debug, ", timeout=%d\n", timeout)); wait_can_signal = 1; ret = poll(polls,polls_count,timeout); wait_can_signal = 0; if (ret < 0) { err = errno; DPRINT(Debug,20,(&Debug, "select: %s (errno %d)\n", error_description(err),err)); result = 0; break; } current = time(NULL); DPRINT(Debug,20,(&Debug, "real_wait: poll=%d, current=%ld\n", ret,(long)current)); #ifdef USE_DLOPEN if (ret > 0) { /* If we just timed out, it is quit epredictable so no seeding */ union xxx_rand { struct collection { int r; time_t t; } X; char bytes [sizeof (struct collection)]; } A; A.r = ret; A.t = current; seed_rand_bits(A.bytes, sizeof A, 4 /* entroby bits */ ); } #endif if (ret > 0) { for (i = 0; i < polls_count && i < actions_count; i++) { if (polls[i].revents) { DPRINT(Debug,20,(&Debug, "real_wait: polls[%d]: fd=%d revents=%d\n", i,polls[i].fd,polls[i].revents)); } if (!actions[i].busy) { actions[i].busy = 1; if (polls[i].revents && (POLLIN|POLLHUP|POLLERR)) { int val; DPRINT(Debug,20,(&Debug, "real_wait: running read_act for %d (action %d)\n", actions[i].fd,i)); val = actions[i].read_act(actions[i].fd, actions[i].data); DPRINT(Debug,20,(&Debug, "real_wait: [%d].read_act=%d\n",i,val)); if (action == actions[i].read_act || action == ANY_ACTION) { done = 1; result = 1; } if (!val) actions[i].read_act = no_action_routine; } if (polls[i].revents && (POLLOUT|POLLERR)) { int val; DPRINT(Debug,20,(&Debug, "real_wait: running write_act for %d (action %d)\n", actions[i].fd,i)); val = actions[i].write_act(actions[i].fd, actions[i].data); DPRINT(Debug,20,(&Debug, "real_wait: [%d].write_act=%d\n",i,val)); if (action == actions[i].write_act || action == ANY_ACTION) { done = 1; result = 1; } if (!val) actions[i].write_act = no_action_routine; } actions[i].busy = 0; } if (polls[i].revents & POLLERR) { DPRINT(Debug,20,(&Debug, "real_wait: [%d] POLLERR\n",i)); actions[i].write_act = no_action_routine; actions[i].read_act = no_action_routine; done = 1; result = 1; } } } for (i = 0; i < actions_count; i++) { if (actions[i].next_timeout && actions[i].next_timeout <= current) { if (actions[i].busy) { DPRINT(Debug,20,(&Debug, "real_wait: %d busy (action %d) -- timout skipped\n", actions[i].fd,i)); } else { int val; actions[i].busy = 1; DPRINT(Debug,20,(&Debug, "real_wait: running timeout_act for %d (action %d)\n", actions[i].fd,i)); val = actions[i].timeout_act(actions[i].fd, actions[i].data); DPRINT(Debug,20,(&Debug, "real_wait: [%d].timeout_act=%d\n",i,val)); if (action == actions[i].timeout_act || action == ANY_ACTION) { done = 1; result = 1; } if (!val) { actions[i].timeout_act = no_action_routine; actions[i].next_timeout = 0; } else { actions[i].next_timeout = current + actions[i].timeout_sec; } actions[i].busy = 0; } } } if (next_timeout > 0 && next_timeout < current) { done = 1; if (result == 0) result = -1; } if (wait_timeout > 0 && wait_timeout <= current) { done = 1; result = 1; } DPRINT(Debug,20,(&Debug, "real_wait: done=%d, result=%d\n",done,result)); } while(!done); if (polls) free(polls); DPRINT(Debug,20,(&Debug, "real_wait=%d (errno=%d)\n",result,err)); errno = err; return result; } #endif #if POLL_METHOD /* ----------- GENERIC ---------------------------------------------------- */ static int wait_for_something P_((action_routine * action,int seconds)); static int wait_for_something(action,seconds) action_routine * action; int seconds; { time_t next_timeout = 0; time_t wait_timeout = 0; int result,err = 0; int done = 0; int i; if (seconds >= 0) wait_timeout = time(NULL) + seconds; do { next_timeout = wait_timeout; for (i = 0; i < actions_count; i++) { if (!actions[i].busy) { actions[i].busy = 1; switch(actions[i].schedule_act(actions[i].fd, actions[i].schedule_data)) { case schedule_remove: DPRINT(Debug,4,(&Debug, "** Schedule action %d (fd=%d): Remove pending\n", i,actions[i].fd)); clear_action(i); break; case schedule_reconsider: DPRINT(Debug,4,(&Debug, "** Schedule action %d (fd=%d): Reconsider pending\n", i,actions[i].fd)); next_timeout = time(NULL); break; case schedule_have_action: DPRINT(Debug,4,(&Debug, "** Schedule action %d (fd=%d): Action filled\n", i,actions[i].fd)); next_timeout = time(NULL); if (ANY_ACTION == action || no_action_routine != action && (action == actions[i].read_act || action == actions[i].write_act || action == actions[i].timeout_act) ) { DPRINT(Debug,4,(&Debug, "** Schedule action %d (fd=%d): Ready\n", i,actions[i].fd)); done = 1; } break; } actions[i].busy = 0; } } for (i = 0; i < actions_count; i++) { if (!actions[i].busy && actions[i].timeout_act != no_action_routine) if (!next_timeout || next_timeout > actions[i].next_timeout) next_timeout = actions[i].next_timeout; } if (next_timeout < 0) { DPRINT(Debug,1,(&Debug, "** Next timeout %d < 0 -- setting to current time\n", next_timeout)); next_timeout = time(NULL); } result = real_wait(action,wait_timeout,next_timeout); err = errno; for (i = 0; i < actions_count; i++) { if (no_action_routine == actions[i].read_act && no_action_routine == actions[i].write_act && no_action_routine == actions[i].timeout_act && actions[i].schedule_act == no_schedule_routine) { if (actions[i].busy) { DPRINT(Debug,4,(&Debug,"** Action %d busy (fd=%d): remove skipped\n", i, actions[i].fd)); } else remove_action(i); break; } } } while (-1 == result && !done); errno = err; return result; } int wait_for_action(action) action_routine * action; { int ret; DPRINT(Debug,4,(&Debug, "wait_for_action: START | action=%p\n",action)); ret=wait_for_something(action,-1); DPRINT(Debug,4,(&Debug, "wait_for_action=%d END (errno=%d)\n", ret,errno)); return ret; } int wait_for_timeout(seconds) int seconds; { int ret; DPRINT(Debug,4,(&Debug, "wait_for_timeout: START | seconds=%d\n",seconds)); ret = wait_for_something(no_action_routine,seconds); DPRINT(Debug,4,(&Debug, "wait_for_timeout=%d END (errno=%d) \n", ret,errno)); return ret; } int wait_for_action_or_timeout(action,seconds) action_routine * action; int seconds; { int ret; DPRINT(Debug,4,(&Debug, "wait_for_action_or_timeout: START | action=%p seconds=%d\n", action,seconds)); ret = wait_for_something(action,seconds); DPRINT(Debug,4,(&Debug, "wait_for_action_or_timeout=%d END (errno=%d)\n", ret,errno)); return ret; } int wait_for_any_action() { int ret; DPRINT(Debug,4,(&Debug,"wait_for_any_action: START\n")); ret=wait_for_something(ANY_ACTION,-1); DPRINT(Debug,4,(&Debug,"wait_for_any_action=%d END (errno=%d)\n", ret,errno)); return ret; } int wait_for_any_action_or_timeout(seconds) int seconds; { int ret; DPRINT(Debug,4,(&Debug,"wait_for_any_action_or_timeout: START | seconds=%d\n",seconds)); ret = wait_for_something(ANY_ACTION,seconds); DPRINT(Debug,4,(&Debug,"wait_for_any_action_or_timeout=%d END (errno=%d) \n", ret,errno)); return ret; } #endif /* * Local Variables: * mode:c * c-basic-offset:4 * buffer-file-coding-system: iso-8859-1 * End: */