static char rcsid[] = "@(#)$Id: streamsched.c,v 1.38 2006/04/09 07:37:08 hurtta Exp $"; /****************************************************************************** * The Elm (ME+) Mail System - $Revision: 1.38 $ $State: Exp $ * * Author: Kari Hurtta (was hurtta+elm@ozone.FMI.FI) *****************************************************************************/ #include "headers.h" #include "ss_imp.h" #include "connection_imp.h" #ifdef USE_DLOPEN #include "shared_imp.h" #endif DEBUG_VAR(Debug,__FILE__,"net"); #include #ifndef ANSI_C extern int errno; #endif #ifdef POLL_METHOD /* ------------------------------------------------------------------------ */ /* Simple stream implementation */ struct simple_type { struct stream_type *type; int socket; }; static int ss_ReadFromSocket P_((struct streamsched *ss, int stack_idx, struct Read_Buffer *buffer, int wanted)); static int ss_ReadFromSocket(ss,stack_idx,buffer,wanted) struct streamsched *ss; int stack_idx; struct Read_Buffer *buffer; int wanted; { int n = 0; int err = errno; DPRINT(Debug,15,(&Debug, "ss_ReadFromSocket: ss=%p, stack_idx=%d\n", ss,stack_idx)); if (stack_idx < 0 || stack_idx >= ss->stack_len || !ss->this_stack || !ss->this_stack[stack_idx].simple || ss->this_stack[stack_idx].simple->type != &SocketStream) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_ReadFromSocket", "Bad stack index or stack",0); n = ReadFromSocket(ss->this_stack[stack_idx].simple->socket, buffer,wanted); if (n < 0) { err = errno; DPRINT(Debug,8,(&Debug, "ss_ReadFromSocket: read error %s (errno %d)\n", error_description(err),err)); } if (n < 0 && err != EINTR && err != EAGAIN && err != EWOULDBLOCK) { ss->error_state = strmcpy(ss->error_state, error_description(err)); } if (n > 0 && ss->stack_len > 1 && transaction_file) { time_t tmp = time(NULL); struct tm * zz = localtime(&tmp); int i; fprintf(transaction_file, "%d [%d] %02d:%02d:%02d R<< [len %4d] ", getpid(), ss->this_stack[stack_idx].simple->socket, zz ? zz->tm_hour : 00, zz ? zz->tm_min : 00, zz ? zz->tm_sec : 00, n); for (i = 0; i < n && i < 13; i++) { unsigned char c = (unsigned char) buffer -> read_buffer[buffer -> read_len + i]; fprintf(transaction_file," %02X",c); } if (i < n) fprintf(transaction_file," ..."); fprintf(transaction_file,"\n"); } return n; } static int ss_WriteToSocket P_((struct streamsched *ss, int stack_idx, struct Write_Buffer *buffer)); static int ss_WriteToSocket(ss,stack_idx,buffer) struct streamsched *ss; int stack_idx; struct Write_Buffer *buffer; { int n; int err = 0; DPRINT(Debug,15,(&Debug, "ss_WriteToSocket: ss=%p, stack_idx=%d\n", ss,stack_idx)); if (stack_idx < 0 || stack_idx >= ss->stack_len || !ss->this_stack || !ss->this_stack[stack_idx].simple || ss->this_stack[stack_idx].simple->type != &SocketStream) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_WriteToSocket", "Bad stack index or stack",0); n = WriteToSocket(ss->this_stack[stack_idx].simple->socket,buffer); if (n < 0) { err = errno; DPRINT(Debug,8,(&Debug, "ss_WriteToSocket: read error %s (errno %d)\n", error_description(err),err)); } if (n < 0 && err != EINTR && err != EAGAIN && err != EWOULDBLOCK) { ss->error_state = strmcpy(ss->error_state, error_description(err)); } if (n > 0 && ss->stack_len > 1 && transaction_file) { time_t tmp = time(NULL); struct tm * zz = localtime(&tmp); int i; fprintf(transaction_file, "%d [%d] %02d:%02d:%02d R>> [len %4d] ", getpid(), ss->this_stack[stack_idx].simple->socket, zz ? zz->tm_hour : 00, zz ? zz->tm_min : 00, zz ? zz->tm_sec : 00, n); for (i = 0; i < n && i < 13; i++) { unsigned char c = (unsigned char) buffer -> write_buffer[i]; fprintf(transaction_file," %02X",c); } if (i < n) fprintf(transaction_file," ..."); fprintf(transaction_file,"\n"); } return n; } static void ss_FreeSocket P_((struct streamsched *ss,int stack_idx)); static void ss_FreeSocket(ss, stack_idx) struct streamsched *ss; int stack_idx; { DPRINT(Debug,15,(&Debug, "ss_FreeSocket: ss=%p, stack_idx=%d\n", ss,stack_idx)); if (stack_idx < 0 || stack_idx >= ss->stack_len || !ss->this_stack || !ss->this_stack[stack_idx].simple || ss->this_stack[stack_idx].simple->type != &SocketStream) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_FreeSocket", "Bad stack index or stack",0); if (ss->this_stack[stack_idx].simple->socket != -1) { /* No longer schedule for this! */ clear_action(ss->this_stack[stack_idx].simple->socket); close(ss->this_stack[stack_idx].simple->socket); ss->this_stack[stack_idx].simple->socket = -1; } ss->this_stack[stack_idx].simple->type = NULL; free(ss->this_stack[stack_idx].simple); ss->this_stack[stack_idx].simple = NULL; } static int ss_StreamNoAction P_((struct streamsched *ss, int stack_idx)); static int ss_StreamNoAction(ss,stack_idx) struct streamsched *ss; int stack_idx; { DPRINT(Debug,15,(&Debug, "ss_StreamNoAction: ss=%p, stack_idx=%d\n", ss,stack_idx)); panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_StreamNoAction", "ss_StreamNoAction called",0); return 0; } static void ss_SocketSchedule P_((struct streamsched *ss, int stack_idx, int previous_needs, int *needs, int *provides)); static void ss_SocketSchedule(ss,stack_idx,previous_needs,needs,provides) struct streamsched *ss; int stack_idx; int previous_needs; int *needs; int *provides; { DPRINT(Debug,15,(&Debug, "ss_SocketSchedule: ss=%p, stack_idx=%d\n", ss,stack_idx)); if (stack_idx < 0 || stack_idx >= ss->stack_len || !ss->this_stack || !ss->this_stack[stack_idx].simple || ss->this_stack[stack_idx].simple->type != &SocketStream) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_SocketSchedule", "Bad stack index or stack",0); *needs = previous_needs & (SS_read_act|SS_write_act|SS_timeout_act); /* We indicate nothing because read_engine and write_engine will call action routines ... */ *provides = 0; } static void ss_SocketInfo P_((struct streamsched *ss, int stack_idx, enum SS_info function, int *int_val, char **str_val)); static void ss_SocketInfo(ss,stack_idx,function,int_val,str_val) struct streamsched *ss; int stack_idx; enum SS_info function; int *int_val; char **str_val; { DPRINT(Debug,15,(&Debug, "ss_SocketInfo: ss=%p, stack_idx=%d\n", ss,stack_idx)); if (stack_idx < 0 || stack_idx >= ss->stack_len || !ss->this_stack || !ss->this_stack[stack_idx].simple || ss->this_stack[stack_idx].simple->type != &SocketStream) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_SocketInfo", "Bad stack index or stack",0); /* We are not intresting any of functions currectly so do nothing ... */ } struct stream_type SocketStream = { SS_type_magic, "Socket", ss_ReadFromSocket, ss_WriteToSocket, ss_FreeSocket, ss_StreamNoAction, ss_StreamNoAction, ss_SocketSchedule, ss_StreamNoAction, ss_StreamNoAction, ss_SocketInfo }; /* ----------------------------------------------------------------------- */ static char * return_stream_ident P_((struct streamsched *ss)); static char * return_stream_ident(ss) struct streamsched *ss; { static char BUFFER[10]; if (ss->stack_len < 1 || ss->this_stack[ss->stack_len-1].simple->type != &SocketStream) return "*"; if (ss->this_stack[ss->stack_len-1].simple->socket == -1) return "*"; elm_sfprintf(BUFFER,sizeof BUFFER, FRM("%d"), ss->this_stack[ss->stack_len-1].simple->socket); return BUFFER; } static void free_error_state P_((struct streamsched *ss)); static void free_error_state(ss) struct streamsched *ss; { if (ss->error_state) { free(ss->error_state); ss->error_state = NULL; } } static int call_user_read_callback P_((struct streamsched *ss)); static int call_user_read_callback(ss) struct streamsched *ss; { int ret = ss->read_act(ss,ss->data); if (!ret) { ss->read_act = ss_noaction_routine; } ss->active_flags &= ~SS_read_act; return ret; } static int call_user_write_callback P_((struct streamsched *ss)); static int call_user_write_callback(ss) struct streamsched *ss; { int ret = ss->write_act(ss,ss->data); if (!ret) { ss->write_act = ss_noaction_routine; } ss->active_flags &= ~SS_write_act; return ret; } static int call_user_timeout_callback P_((struct streamsched *ss)); static int call_user_timeout_callback(ss) struct streamsched *ss; { int ret = ss->timeout_act(ss,ss->data); if (!ret) { ss->timeout_act = ss_noaction_routine; } ss->active_flags &= ~SS_timeout_act; return ret; } static void xx_will P_((struct streamsched *ss, char *Tag, int level, char *action)); static void xx_will(ss,Tag, level, action) struct streamsched *ss; char *Tag; int level; char * action; { char *s = return_stream_ident(ss); DPRINT(Debug,20,(&Debug, " ... [%s] %*s %s will %s\n", s, level*2+3, "", Tag,action)); } static int call_read_callback P_((struct streamsched *ss, int level)); static int call_read_callback(ss,level) struct streamsched *ss; int level; { int ret = 1; if (level >= 0) { xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level, "read"); ret = (*(ss -> this_stack[level].TYPE))-> read_action(ss,level); } else { if (ss->read_act != ss_noaction_routine) { xx_will(ss,"Client",level,"read"); ret = call_user_read_callback(ss); } else ret = 0; } return ret; } static int call_write_callback P_((struct streamsched *ss, int level)); static int call_write_callback(ss,level) struct streamsched *ss; int level; { int ret = 1; if (level >= 0) { xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level, "write"); ret = (*(ss -> this_stack[level].TYPE))-> write_action(ss,level); } else { if (ss->write_act != ss_noaction_routine) { xx_will(ss,"Client",level,"write"); ret = call_user_write_callback(ss); } else ret = 0; } return ret; } static void after_action P_((struct streamsched *ss)); static int read_engine P_((int fd, void * data)); static int read_engine(fd,data) int fd; void * data; { struct streamsched *ss = data; int ret = 0; DPRINT(Debug,19,(&Debug, "read_engine: ss=%p\n",ss)); if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"read_engine", "Bad stream (magic)",0); /* Bottom of stack is never called ... instead call previous routine ... */ ret = call_read_callback(ss,ss->stack_len-2); after_action(ss); DPRINT(Debug,19,(&Debug, "read_engine=%d %s\n", ret, ret == 0 ? "(disabling)" : "")); return ret; } static int write_engine P_((int fd, void * data)); static int write_engine(fd,data) int fd; void * data; { int ret; struct streamsched *ss = data; DPRINT(Debug,19,(&Debug, "write_engine: ss=%p\n",ss)); if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"write_engine", "Bad stream (magic)",0); /* Bottom of stack is newer called ... instead call previous routine ... */ ret = call_write_callback(ss,ss->stack_len-2); after_action(ss); DPRINT(Debug,19,(&Debug, "write_engine=%d %s\n", ret, ret == 0 ? "(disabling)" : "")); return ret; } static int timeout_engine P_((int fd, void * data)); static int timeout_engine(fd,data) int fd; void * data; { int ret; struct streamsched *ss = data; DPRINT(Debug,19,(&Debug, "timeout_engine: ss=%p\n",ss)); if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"timeout_engine", "Bad stream (magic)",0); ret = call_user_timeout_callback(ss); after_action(ss); DPRINT(Debug,19,(&Debug, "timeout_engine=%d %s\n", ret, ret == 0 ? "(disabling)" : "")); return ret; } static int prewait P_((struct streamsched *ss, int *restart)); static enum schedule_return schedule_engine P_((int fd, void * data)); static enum schedule_return schedule_engine(fd,data) int fd; void * data; { enum schedule_return ret = schedule_done; struct streamsched *ss = data; int retry = 0; int mask = 0; char * Ident; int old_values; DPRINT(Debug,19,(&Debug, "schedule_engine: ss=%p\n",ss)); if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"schedule_engine", "Bad stream (magic)",0); Ident = return_stream_ident(ss); old_values = ss->active_flags; mask = prewait(ss,&retry); if (ss->timeout_act != ss_noaction_routine) { DPRINT(Debug,20,(&Debug, " ... [%s] stream timeout=%d\n", Ident,ss->timeout_sec)); mask |= SS_timeout_act; } if (ss->stack_len < 1 || ss->this_stack[ss->stack_len-1].simple->type != &SocketStream) { DPRINT(Debug,1,(&Debug, "bad stream -- length of stream = %d\n", ss->stack_len)); if (ss->stack_len > 0) { DPRINT(Debug,1,(&Debug, " -- end of stream %s\n", SS_type_magic == ss->this_stack[ss->stack_len-1]. simple->type->magic ? "valid type" : "not valid")); } panic("STREAMSCHED PANIC",__FILE__,__LINE__,"schedule_engine", "No socket on end of stream",0); } else if (ss->this_stack[ss->stack_len-1].simple->socket == -1) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"schedule_engine", "Socket closed on end of stream",0); else change_action(ss->this_stack[ss->stack_len-1].simple->socket, ss->timeout_sec, (mask & SS_read_act) ? read_engine : no_action_routine, (mask & SS_write_act) ? write_engine : no_action_routine, (mask & SS_timeout_act) ? timeout_engine : no_action_routine, ss); if (old_values != ss->active_flags) { DPRINT(Debug,20,(&Debug, " ... [%s] active flags changed ... quit waiting\n", Ident)); ret = schedule_have_action; } else if (retry > 0) { DPRINT(Debug,20,(&Debug, " ... [%s] retrying without waiting\n", Ident)); ret = schedule_reconsider; } else if (retry < 0) { DPRINT(Debug,20,(&Debug, " ... [%s] Signaling wait ready\n", Ident)); ret = schedule_have_action; } DPRINT(Debug,19,(&Debug, "schedule_engine=%d\n",ret)); return ret; } static void xx_wants P_((struct streamsched *ss, char *Tag, int level, int mask)); static void xx_wants(ss,Tag, level, mask) struct streamsched *ss; char *Tag; int level; int mask; { char *s = return_stream_ident(ss); DPRINT(Debug,20,(&Debug, " ... [%s] %*s %s wants", s, level*2+3, "", Tag)); if (mask & SS_read_act) DPRINT(Debug,20,(&Debug," read")); if (mask & SS_write_act) DPRINT(Debug,20,(&Debug," write")); if (mask & SS_timeout_act) DPRINT(Debug,20,(&Debug," timeout")); if (mask & SS_setup_act) DPRINT(Debug,20,(&Debug," setup")); if (mask & SS_shutdown_act) DPRINT(Debug,20,(&Debug," shutdown")); if (0 == mask) DPRINT(Debug,20,(&Debug," NONE")); DPRINT(Debug,20,(&Debug,"\n")); } static void xx_provides P_((struct streamsched *ss, char *Tag, int level, int mask)); static void xx_provides(ss,Tag, level, mask) struct streamsched *ss; char *Tag; int level; int mask; { char *s = return_stream_ident(ss); DPRINT(Debug,20,(&Debug, " ... [%s] %*s %s provides", s, level*2+3, "", Tag)); if (mask & SS_read_act) DPRINT(Debug,20,(&Debug," read")); if (mask & SS_write_act) DPRINT(Debug,20,(&Debug," write")); if (mask & SS_timeout_act) DPRINT(Debug,20,(&Debug," timeout")); if (mask & SS_setup_act) DPRINT(Debug,20,(&Debug," setup")); if (mask & SS_shutdown_act) DPRINT(Debug,20,(&Debug," shutdown")); if (0 == mask) DPRINT(Debug,20,(&Debug," NONE")); DPRINT(Debug,20,(&Debug,"\n")); } static int prewait(ss,restart) struct streamsched *ss; int *restart; { int return_mask = 0; int want_mask = 0; int level; char * Ident; if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"prewait", "Bad stream (magic)",0); Ident = return_stream_ident(ss); if (ss_noaction_routine != ss->read_act) want_mask |= SS_read_act; if (ss_noaction_routine != ss->write_act) want_mask |= SS_write_act; if (ss_noaction_routine != ss->timeout_act) want_mask |= SS_timeout_act; DPRINT(Debug,20,(&Debug, " ... [%s] --- before action:\n",Ident)); xx_wants(ss,"Client",-1,want_mask); for (level = 0; level < ss->stack_len; level++) { int old_mask = want_mask; int provide_mask; int reconsider = 0; int first = 1; int count = 0; do { if (reconsider) { count ++; if (count > 2) { DPRINT(Debug,1,(&Debug, "prewait [%s] recondsidering [%d] delayed!\n", Ident,count)); *restart = 1; break; } DPRINT(Debug,20,(&Debug, " ... [%s] %*s ... reconsidering [%d]\n", Ident,level*2+3,"",count)); reconsider = 0; } (*(ss -> this_stack[level].TYPE))-> schedule_data(ss, level,old_mask, &want_mask,&provide_mask); if (level == ss->stack_len-1) return_mask = want_mask; xx_wants(ss,(*(ss -> this_stack[level].TYPE))->tag,level,want_mask); xx_provides(ss,(*(ss -> this_stack[level].TYPE))->tag,level, provide_mask); if (provide_mask & SS_write_act) { if(call_write_callback(ss,level-1)) { *restart = 1; DPRINT(Debug,20,(&Debug, " ... [%s] %*s ... from beginning again\n", Ident,level*2+3,"")); break; } else reconsider = 1; } if (provide_mask & SS_read_act) { if(call_read_callback(ss,level-1)) { *restart = 1; DPRINT(Debug,20,(&Debug, " ... [%s] %*s ... from beginning again\n", Ident,level*2+3,"")); break; } else reconsider = 1; } if (first) { switch(want_mask & (SS_setup_act|SS_shutdown_act)) { case SS_setup_act: /* Setup is just internal action */ xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level, "setup"); if (!(*(ss -> this_stack[level].TYPE))-> setup_action(ss,level)) { *restart = -1; DPRINT(Debug,20,(&Debug, " ... [%s] %*s ... from beginning again (quiting)\n", Ident,level*2+3,"")); } else reconsider = 1; ss->active_flags &= ~SS_setup_act; break; case SS_shutdown_act: /* Shutdown is just internal action */ xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level, "shutdown"); if (!(*(ss -> this_stack[level].TYPE))-> shutdown_action(ss,level)) { *restart = -1; DPRINT(Debug,20,(&Debug, " ... [%s] %*s ... from beginning again (quiting)\n", Ident,level*2+3,"")); } else reconsider = 1; ss->active_flags &= ~SS_shutdown_act; break; case SS_shutdown_act|SS_setup_act: DPRINT(Debug,1,(&Debug, " ... [%s] %*s ... BOTH setup and shutdown wanted?", Ident,level*2+3,"")); panic("STREAMSCHED PANIC",__FILE__,__LINE__, "prewait", "Both setup and shutdown wanted to same stream",0); } first = 0; } } while (reconsider && !ss->error_state); if (ss->error_state) { DPRINT(Debug,20,(&Debug, " ... [%s] Error state: %s\n",Ident, ss->error_state)); *restart = 0; } } return return_mask; } static void after_action(ss) struct streamsched *ss; { int i; int want_mask; char *s; if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"after_action", "Bad stream (magic)",0); want_mask = ss->active_flags; s = return_stream_ident(ss); DPRINT(Debug,20,(&Debug, " ... [%s] --- after action:\n",s)); for (i = ss->stack_len-2; i >= 0; i--) { int mask; (*(ss -> this_stack[i].TYPE))-> schedule_data(ss,i,want_mask,&want_mask,&mask); xx_provides(ss,(*(ss -> this_stack[i].TYPE))->tag,i,mask); xx_wants(ss,(*(ss -> this_stack[i].TYPE))->tag,i,want_mask); if (mask & SS_read_act) call_read_callback(ss,i-1); if (mask & SS_write_act) call_write_callback(ss,i-1); } } void WaitStreamFor(ss,flags) struct streamsched *ss; int flags; { char * Ident; if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WaitStreamFor", "Bad stream (magic)",0); DPRINT(Debug,20,(&Debug, "WaitStreamFor: ss=%p, flags=%d <--- START\n", ss,flags)); Ident = return_stream_ident(ss); ss->active_flags = flags; while (ss->active_flags == flags) { if (ss->stack_len < 1 || ss->this_stack[ss->stack_len-1].simple->type != &SocketStream) { DPRINT(Debug,1,(&Debug, "bad stream -- length of stream = %d\n", ss->stack_len)); if (ss->stack_len > 0) { DPRINT(Debug,1,(&Debug, " -- end of stream %s\n", SS_type_magic == ss->this_stack[ss->stack_len-1]. simple->type->magic ? "valid type" : "not valid")); } panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WaitStreamFor", "No socket on end of stream",0); } else if (ss->this_stack[ss->stack_len-1].simple->socket == -1) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WaitStreamFor", "Socket closed on end of stream",0); else set_schedule_action(ss->this_stack[ss->stack_len-1].simple->socket, schedule_engine, ss); wait_for_any_action(); if (ss->error_state) { /* Let's hope that read_callback is available ... */ if ( ss_noaction_routine == ss->read_act) { DPRINT(Debug,4,(&Debug, " ... [%s] Can't signal on error state: %s\n", Ident,ss->error_state)); } else { DPRINT(Debug,4,(&Debug, " ... [%s] Signaling on error state: %s\n", Ident,ss->error_state)); call_user_read_callback(ss); } } } DPRINT(Debug,20,(&Debug, "WaitStreamFor <--- END\n")); return; } int ReadFromStream(ss,buffer,wanted) struct streamsched *ss; struct Read_Buffer *buffer; int wanted; { int n; if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ReadFromStream", "Bad stream (magic)",0); n = (*(ss -> this_stack[0].TYPE))->read_from_it(ss,0,buffer,wanted); if (n > 0) { if (transaction_file) { time_t tmp = time(NULL); struct tm * zz = localtime(&tmp); int i; int q = 0; char * id = return_stream_ident(ss); fprintf(transaction_file, "%d [%s] %02d:%02d:%02d %c<< [len %4d] ", getpid(),id, zz ? zz->tm_hour : 00, zz ? zz->tm_min : 00, zz ? zz->tm_sec : 00, ss->stack_len > 1 ? 'T' : '<', n); for (i = 0; i < n; i++) { unsigned char c = (unsigned char) buffer -> read_buffer[buffer -> read_len + i]; if (isascii(c) && isprint(c) && c != '"') { if (!q) { putc(' ',transaction_file); putc('"',transaction_file); q++; } putc(c,transaction_file); } else { if (q) { putc('"',transaction_file); q = 0; } fprintf(transaction_file," %02X",c); if (c == 0x0A && i < n-1) { fprintf(transaction_file, "\n%d [%s] %02d:%02d:%02d %c<< [continue] ", getpid(),id, zz ? zz->tm_hour : 00, zz ? zz->tm_min : 00, zz ? zz->tm_sec : 00, ss->stack_len > 1 ? 'T' : '<'); } } } if (q) { putc('"',transaction_file); } putc('\n',transaction_file); } buffer -> read_len += n; DPRINT(Debug,8,(&Debug, "ReadFromStream: Read %d bytes (%d total)\n", n,buffer->read_len)); } return n; } char * RemoveStreamError(ss) struct streamsched *ss; { char * ret; if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"RemoveStreamError", "Bad stream (magic)",0); ret = ss->error_state; ss->error_state = NULL; return ret; } int WriteToStream(ss,buffer) struct streamsched *ss; struct Write_Buffer *buffer; { int n; if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WriteToStream", "Bad stream (magic)",0); n = (*(ss -> this_stack[0].TYPE))->write_to_it(ss,0,buffer); if (n > 0) { if (transaction_file) { time_t tmp = time(NULL); struct tm * zz = localtime(&tmp); int i; int q = 0; char * id = return_stream_ident(ss); fprintf(transaction_file, "%d [%s] %02d:%02d:%02d %c>> [len %4d] ", getpid(),id, zz ? zz->tm_hour : 00, zz ? zz->tm_min : 00, zz ? zz->tm_sec : 00, ss->stack_len > 1 ? 'T' : '>', n); for (i = 0; i < n; i++) { unsigned char c = (unsigned char) buffer->write_buffer[i]; if (isascii(c) && isprint(c) && c != '"') { if (!q) { putc(' ',transaction_file); putc('"',transaction_file); q++; } putc(c,transaction_file); } else { if (q) { putc('"',transaction_file); q = 0; } fprintf(transaction_file," %02X",c); if (c == 0x0A && i < n-1) { fprintf(transaction_file, "\n%d [%s] %02d:%02d:%02d %c>> [continue] ", getpid(),id, zz ? zz->tm_hour : 00, zz ? zz->tm_min : 00, zz ? zz->tm_sec : 00, ss->stack_len > 1 ? 'T' : '>'); } } } if (q) { putc('"',transaction_file); } putc('\n',transaction_file); } } return n; } static union stream_types alloc_simple P_((int socket)); static union stream_types alloc_simple(socket) int socket; { union stream_types ret; struct simple_type * S = safe_malloc(sizeof (struct simple_type)); /* bzero is defined on hdrs/defs.h */ bzero((void *)S,sizeof (struct simple_type)); S->type = &SocketStream; S->socket = socket; ret.simple = S; return ret; } void add_stream_to_head(ss,stream) struct streamsched *ss; union stream_types stream; { int i; #ifdef USE_DLOPEN load_shared_libs1(&use_shared_connect); #endif if (SS_type_magic != (*(stream.TYPE))->magic) { panic("STREAMSCHED PANIC",__FILE__,__LINE__,"add_stream_to_head", "Bad streamtupe (magic)",0); } ss->this_stack = safe_realloc(ss->this_stack, (ss->stack_len+1) * sizeof (ss->this_stack[0])); for (i = ss->stack_len; i > 0; i--) ss->this_stack[i] = ss->this_stack[i-1]; ss->this_stack[0] = stream; ss->stack_len++; if (transaction_file) { time_t tmp = time(NULL); struct tm * zz = localtime(&tmp); char * id = return_stream_ident(ss); int i; fprintf(transaction_file, "%d [%s] %02d:%02d:%02d ---", getpid(),id, zz ? zz->tm_hour : 00, zz ? zz->tm_min : 00, zz ? zz->tm_sec : 00); for (i = 0; i < ss->stack_len; i++) fprintf(transaction_file," %s", (*(ss->this_stack[i].TYPE))->tag); fprintf(transaction_file,"\n"); } } int ss_noaction_routine(ss,data) struct streamsched *ss; void * data; { panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_noaction_routine", "ss_noaction_routine called",0); return 0; } struct streamsched * returnSimpleStream(socket) int socket; { struct streamsched * ret = safe_malloc(sizeof (struct streamsched)); /* bzero is defined on hdrs/defs.h */ bzero((void *)ret,sizeof (struct streamsched)); ret->this_stack = NULL; ret->stack_len = 0; ret->read_act = ss_noaction_routine; ret->write_act = ss_noaction_routine; ret->timeout_act = ss_noaction_routine; ret->timeout_sec = 0; ret->error_state = NULL; ret->active_flags = 0; ret->data = NULL; ret->magic = SS_magic; add_stream_to_head(ret,alloc_simple(socket)); return ret; } void ConfigStream(ss,read_act,write_act,timeout_act,timeout_sec,data) struct streamsched *ss; ss_action_routine * read_act; ss_action_routine * write_act; ss_action_routine * timeout_act; int timeout_sec; void * data; { if (ss->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ConfigStream", "Bad stream (magic)",0); ss->read_act = read_act; ss->write_act = write_act; ss->timeout_act = timeout_act; ss->timeout_sec = timeout_sec; ss->data = data; } void FreeStreamStack(ss) struct streamsched **ss; { if ((*ss)->magic != SS_magic) panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ConfigStream", "Bad stream (magic)",0); if ((*ss) -> this_stack) { int i; /* NOTE: free_it may start shutdown_act */ for (i = 0; i < (*ss)->stack_len; i++) (*((*ss) -> this_stack[i].TYPE))->free_it(*ss,i); free((*ss) -> this_stack); (*ss) -> this_stack = NULL; } (*ss)->stack_len = 0; free_error_state(*ss); free(*ss); *ss = NULL; } /* Unknown functions does not modify *int_val or *str_val */ void StreamInfo(ss,function,int_val,str_val) struct streamsched *ss; enum SS_info function; int *int_val; char **str_val; { int max = 0; int i; switch (function) { case SS_ssf: /* "Security Strength Factor" */ /* return maximum of values */ for (i = 0; i < ss->stack_len; i++) { int val = 0; /* Default is 0 */ (*(ss -> this_stack[i].TYPE))->query_it(ss,i, function, &val,NULL); if (val > max) max = val; } *int_val = max; break; } } #endif /* * Local Variables: * mode:c * c-basic-offset:4 * buffer-file-coding-system: iso-8859-1 * End: */