static char rcsid[] = "@(#)$Id: streamsched.c,v 1.38 2006/04/09 07:37:08 hurtta Exp $";
/******************************************************************************
* The Elm (ME+) Mail System - $Revision: 1.38 $ $State: Exp $
*
* Author: Kari Hurtta <hurtta+elm@posti.FMI.FI> (was hurtta+elm@ozone.FMI.FI)
*****************************************************************************/
#include "headers.h"
#include "ss_imp.h"
#include "connection_imp.h"
#ifdef USE_DLOPEN
#include "shared_imp.h"
#endif
DEBUG_VAR(Debug,__FILE__,"net");
#include <errno.h>
#ifndef ANSI_C
extern int errno;
#endif
#ifdef POLL_METHOD
/* ------------------------------------------------------------------------ */
/* Simple stream implementation */
struct simple_type {
struct stream_type *type;
int socket;
};
static int ss_ReadFromSocket P_((struct streamsched *ss,
int stack_idx,
struct Read_Buffer *buffer,
int wanted));
static int ss_ReadFromSocket(ss,stack_idx,buffer,wanted)
struct streamsched *ss;
int stack_idx;
struct Read_Buffer *buffer;
int wanted;
{
int n = 0;
int err = errno;
DPRINT(Debug,15,(&Debug,
"ss_ReadFromSocket: ss=%p, stack_idx=%d\n",
ss,stack_idx));
if (stack_idx < 0 || stack_idx >= ss->stack_len ||
!ss->this_stack || !ss->this_stack[stack_idx].simple ||
ss->this_stack[stack_idx].simple->type != &SocketStream)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_ReadFromSocket",
"Bad stack index or stack",0);
n = ReadFromSocket(ss->this_stack[stack_idx].simple->socket,
buffer,wanted);
if (n < 0) {
err = errno;
DPRINT(Debug,8,(&Debug,
"ss_ReadFromSocket: read error %s (errno %d)\n",
error_description(err),err));
}
if (n < 0 && err != EINTR && err != EAGAIN && err != EWOULDBLOCK) {
ss->error_state = strmcpy(ss->error_state,
error_description(err));
}
if (n > 0 && ss->stack_len > 1 && transaction_file) {
time_t tmp = time(NULL);
struct tm * zz = localtime(&tmp);
int i;
fprintf(transaction_file,
"%d [%d] %02d:%02d:%02d R<< [len %4d] ",
getpid(),
ss->this_stack[stack_idx].simple->socket,
zz ? zz->tm_hour : 00,
zz ? zz->tm_min : 00,
zz ? zz->tm_sec : 00,
n);
for (i = 0; i < n && i < 13; i++) {
unsigned char c =
(unsigned char) buffer -> read_buffer[buffer -> read_len
+ i];
fprintf(transaction_file," %02X",c);
}
if (i < n)
fprintf(transaction_file," ...");
fprintf(transaction_file,"\n");
}
return n;
}
static int ss_WriteToSocket P_((struct streamsched *ss,
int stack_idx,
struct Write_Buffer *buffer));
static int ss_WriteToSocket(ss,stack_idx,buffer)
struct streamsched *ss;
int stack_idx;
struct Write_Buffer *buffer;
{
int n;
int err = 0;
DPRINT(Debug,15,(&Debug,
"ss_WriteToSocket: ss=%p, stack_idx=%d\n",
ss,stack_idx));
if (stack_idx < 0 || stack_idx >= ss->stack_len ||
!ss->this_stack || !ss->this_stack[stack_idx].simple ||
ss->this_stack[stack_idx].simple->type != &SocketStream)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_WriteToSocket",
"Bad stack index or stack",0);
n = WriteToSocket(ss->this_stack[stack_idx].simple->socket,buffer);
if (n < 0) {
err = errno;
DPRINT(Debug,8,(&Debug,
"ss_WriteToSocket: read error %s (errno %d)\n",
error_description(err),err));
}
if (n < 0 && err != EINTR && err != EAGAIN && err != EWOULDBLOCK) {
ss->error_state = strmcpy(ss->error_state,
error_description(err));
}
if (n > 0 && ss->stack_len > 1 && transaction_file) {
time_t tmp = time(NULL);
struct tm * zz = localtime(&tmp);
int i;
fprintf(transaction_file,
"%d [%d] %02d:%02d:%02d R>> [len %4d] ",
getpid(),
ss->this_stack[stack_idx].simple->socket,
zz ? zz->tm_hour : 00,
zz ? zz->tm_min : 00,
zz ? zz->tm_sec : 00,
n);
for (i = 0; i < n && i < 13; i++) {
unsigned char c =
(unsigned char) buffer -> write_buffer[i];
fprintf(transaction_file," %02X",c);
}
if (i < n)
fprintf(transaction_file," ...");
fprintf(transaction_file,"\n");
}
return n;
}
static void ss_FreeSocket P_((struct streamsched *ss,int stack_idx));
static void ss_FreeSocket(ss, stack_idx)
struct streamsched *ss;
int stack_idx;
{
DPRINT(Debug,15,(&Debug,
"ss_FreeSocket: ss=%p, stack_idx=%d\n",
ss,stack_idx));
if (stack_idx < 0 || stack_idx >= ss->stack_len ||
!ss->this_stack || !ss->this_stack[stack_idx].simple ||
ss->this_stack[stack_idx].simple->type != &SocketStream)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_FreeSocket",
"Bad stack index or stack",0);
if (ss->this_stack[stack_idx].simple->socket != -1) {
/* No longer schedule for this! */
clear_action(ss->this_stack[stack_idx].simple->socket);
close(ss->this_stack[stack_idx].simple->socket);
ss->this_stack[stack_idx].simple->socket = -1;
}
ss->this_stack[stack_idx].simple->type = NULL;
free(ss->this_stack[stack_idx].simple);
ss->this_stack[stack_idx].simple = NULL;
}
static int ss_StreamNoAction P_((struct streamsched *ss, int stack_idx));
static int ss_StreamNoAction(ss,stack_idx)
struct streamsched *ss;
int stack_idx;
{
DPRINT(Debug,15,(&Debug,
"ss_StreamNoAction: ss=%p, stack_idx=%d\n",
ss,stack_idx));
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_StreamNoAction",
"ss_StreamNoAction called",0);
return 0;
}
static void ss_SocketSchedule P_((struct streamsched *ss, int stack_idx,
int previous_needs,
int *needs, int *provides));
static void ss_SocketSchedule(ss,stack_idx,previous_needs,needs,provides)
struct streamsched *ss;
int stack_idx;
int previous_needs;
int *needs;
int *provides;
{
DPRINT(Debug,15,(&Debug,
"ss_SocketSchedule: ss=%p, stack_idx=%d\n",
ss,stack_idx));
if (stack_idx < 0 || stack_idx >= ss->stack_len ||
!ss->this_stack || !ss->this_stack[stack_idx].simple ||
ss->this_stack[stack_idx].simple->type != &SocketStream)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_SocketSchedule",
"Bad stack index or stack",0);
*needs = previous_needs &
(SS_read_act|SS_write_act|SS_timeout_act);
/* We indicate nothing because read_engine and write_engine
will call action routines ...
*/
*provides = 0;
}
static void ss_SocketInfo P_((struct streamsched *ss,
int stack_idx,
enum SS_info function,
int *int_val,
char **str_val));
static void ss_SocketInfo(ss,stack_idx,function,int_val,str_val)
struct streamsched *ss;
int stack_idx;
enum SS_info function;
int *int_val;
char **str_val;
{
DPRINT(Debug,15,(&Debug,
"ss_SocketInfo: ss=%p, stack_idx=%d\n",
ss,stack_idx));
if (stack_idx < 0 || stack_idx >= ss->stack_len ||
!ss->this_stack || !ss->this_stack[stack_idx].simple ||
ss->this_stack[stack_idx].simple->type != &SocketStream)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_SocketInfo",
"Bad stack index or stack",0);
/* We are not intresting any of functions currectly
so do nothing ...
*/
}
struct stream_type SocketStream =
{
SS_type_magic,
"Socket",
ss_ReadFromSocket,
ss_WriteToSocket,
ss_FreeSocket,
ss_StreamNoAction,
ss_StreamNoAction,
ss_SocketSchedule,
ss_StreamNoAction,
ss_StreamNoAction,
ss_SocketInfo
};
/* ----------------------------------------------------------------------- */
static char * return_stream_ident P_((struct streamsched *ss));
static char * return_stream_ident(ss)
struct streamsched *ss;
{
static char BUFFER[10];
if (ss->stack_len < 1 ||
ss->this_stack[ss->stack_len-1].simple->type !=
&SocketStream)
return "*";
if (ss->this_stack[ss->stack_len-1].simple->socket == -1)
return "*";
elm_sfprintf(BUFFER,sizeof BUFFER,
FRM("%d"),
ss->this_stack[ss->stack_len-1].simple->socket);
return BUFFER;
}
static void free_error_state P_((struct streamsched *ss));
static void free_error_state(ss)
struct streamsched *ss;
{
if (ss->error_state) {
free(ss->error_state);
ss->error_state = NULL;
}
}
static int call_user_read_callback P_((struct streamsched *ss));
static int call_user_read_callback(ss)
struct streamsched *ss;
{
int ret = ss->read_act(ss,ss->data);
if (!ret) {
ss->read_act = ss_noaction_routine;
}
ss->active_flags &= ~SS_read_act;
return ret;
}
static int call_user_write_callback P_((struct streamsched *ss));
static int call_user_write_callback(ss)
struct streamsched *ss;
{
int ret = ss->write_act(ss,ss->data);
if (!ret) {
ss->write_act = ss_noaction_routine;
}
ss->active_flags &= ~SS_write_act;
return ret;
}
static int call_user_timeout_callback P_((struct streamsched *ss));
static int call_user_timeout_callback(ss)
struct streamsched *ss;
{
int ret = ss->timeout_act(ss,ss->data);
if (!ret) {
ss->timeout_act = ss_noaction_routine;
}
ss->active_flags &= ~SS_timeout_act;
return ret;
}
static void xx_will P_((struct streamsched *ss, char *Tag,
int level, char *action));
static void xx_will(ss,Tag, level, action)
struct streamsched *ss;
char *Tag;
int level;
char * action;
{
char *s = return_stream_ident(ss);
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s %s will %s\n",
s, level*2+3, "",
Tag,action));
}
static int call_read_callback P_((struct streamsched *ss, int level));
static int call_read_callback(ss,level)
struct streamsched *ss;
int level;
{
int ret = 1;
if (level >= 0) {
xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level,
"read");
ret = (*(ss -> this_stack[level].TYPE))-> read_action(ss,level);
} else {
if (ss->read_act != ss_noaction_routine) {
xx_will(ss,"Client",level,"read");
ret = call_user_read_callback(ss);
} else
ret = 0;
}
return ret;
}
static int call_write_callback P_((struct streamsched *ss, int level));
static int call_write_callback(ss,level)
struct streamsched *ss;
int level;
{
int ret = 1;
if (level >= 0) {
xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level,
"write");
ret = (*(ss -> this_stack[level].TYPE))-> write_action(ss,level);
} else {
if (ss->write_act != ss_noaction_routine) {
xx_will(ss,"Client",level,"write");
ret = call_user_write_callback(ss);
} else
ret = 0;
}
return ret;
}
static void after_action P_((struct streamsched *ss));
static int read_engine P_((int fd, void * data));
static int read_engine(fd,data)
int fd;
void * data;
{
struct streamsched *ss = data;
int ret = 0;
DPRINT(Debug,19,(&Debug,
"read_engine: ss=%p\n",ss));
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"read_engine",
"Bad stream (magic)",0);
/* Bottom of stack is never called ...
instead call previous routine ...
*/
ret = call_read_callback(ss,ss->stack_len-2);
after_action(ss);
DPRINT(Debug,19,(&Debug,
"read_engine=%d %s\n",
ret, ret == 0 ? "(disabling)" : ""));
return ret;
}
static int write_engine P_((int fd, void * data));
static int write_engine(fd,data)
int fd;
void * data;
{
int ret;
struct streamsched *ss = data;
DPRINT(Debug,19,(&Debug,
"write_engine: ss=%p\n",ss));
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"write_engine",
"Bad stream (magic)",0);
/* Bottom of stack is newer called ...
instead call previous routine ...
*/
ret = call_write_callback(ss,ss->stack_len-2);
after_action(ss);
DPRINT(Debug,19,(&Debug,
"write_engine=%d %s\n",
ret, ret == 0 ? "(disabling)" : ""));
return ret;
}
static int timeout_engine P_((int fd, void * data));
static int timeout_engine(fd,data)
int fd;
void * data;
{
int ret;
struct streamsched *ss = data;
DPRINT(Debug,19,(&Debug,
"timeout_engine: ss=%p\n",ss));
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"timeout_engine",
"Bad stream (magic)",0);
ret = call_user_timeout_callback(ss);
after_action(ss);
DPRINT(Debug,19,(&Debug,
"timeout_engine=%d %s\n",
ret, ret == 0 ? "(disabling)" : ""));
return ret;
}
static int prewait P_((struct streamsched *ss, int *restart));
static enum schedule_return schedule_engine P_((int fd,
void * data));
static enum schedule_return schedule_engine(fd,data)
int fd;
void * data;
{
enum schedule_return ret = schedule_done;
struct streamsched *ss = data;
int retry = 0;
int mask = 0;
char * Ident;
int old_values;
DPRINT(Debug,19,(&Debug,
"schedule_engine: ss=%p\n",ss));
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"schedule_engine",
"Bad stream (magic)",0);
Ident = return_stream_ident(ss);
old_values = ss->active_flags;
mask = prewait(ss,&retry);
if (ss->timeout_act != ss_noaction_routine) {
DPRINT(Debug,20,(&Debug,
" ... [%s] stream timeout=%d\n",
Ident,ss->timeout_sec));
mask |= SS_timeout_act;
}
if (ss->stack_len < 1 ||
ss->this_stack[ss->stack_len-1].simple->type !=
&SocketStream) {
DPRINT(Debug,1,(&Debug,
"bad stream -- length of stream = %d\n",
ss->stack_len));
if (ss->stack_len > 0) {
DPRINT(Debug,1,(&Debug,
" -- end of stream %s\n",
SS_type_magic == ss->this_stack[ss->stack_len-1].
simple->type->magic ?
"valid type" : "not valid"));
}
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"schedule_engine",
"No socket on end of stream",0);
} else if (ss->this_stack[ss->stack_len-1].simple->socket == -1)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"schedule_engine",
"Socket closed on end of stream",0);
else
change_action(ss->this_stack[ss->stack_len-1].simple->socket,
ss->timeout_sec,
(mask & SS_read_act) ?
read_engine : no_action_routine,
(mask & SS_write_act) ?
write_engine : no_action_routine,
(mask & SS_timeout_act) ?
timeout_engine : no_action_routine,
ss);
if (old_values != ss->active_flags) {
DPRINT(Debug,20,(&Debug,
" ... [%s] active flags changed ... quit waiting\n",
Ident));
ret = schedule_have_action;
} else if (retry > 0) {
DPRINT(Debug,20,(&Debug,
" ... [%s] retrying without waiting\n",
Ident));
ret = schedule_reconsider;
}
else if (retry < 0) {
DPRINT(Debug,20,(&Debug,
" ... [%s] Signaling wait ready\n",
Ident));
ret = schedule_have_action;
}
DPRINT(Debug,19,(&Debug,
"schedule_engine=%d\n",ret));
return ret;
}
static void xx_wants P_((struct streamsched *ss, char *Tag,
int level, int mask));
static void xx_wants(ss,Tag, level, mask)
struct streamsched *ss;
char *Tag;
int level;
int mask;
{
char *s = return_stream_ident(ss);
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s %s wants",
s, level*2+3, "",
Tag));
if (mask & SS_read_act)
DPRINT(Debug,20,(&Debug," read"));
if (mask & SS_write_act)
DPRINT(Debug,20,(&Debug," write"));
if (mask & SS_timeout_act)
DPRINT(Debug,20,(&Debug," timeout"));
if (mask & SS_setup_act)
DPRINT(Debug,20,(&Debug," setup"));
if (mask & SS_shutdown_act)
DPRINT(Debug,20,(&Debug," shutdown"));
if (0 == mask)
DPRINT(Debug,20,(&Debug," NONE"));
DPRINT(Debug,20,(&Debug,"\n"));
}
static void xx_provides P_((struct streamsched *ss, char *Tag,
int level, int mask));
static void xx_provides(ss,Tag, level, mask)
struct streamsched *ss;
char *Tag;
int level;
int mask;
{
char *s = return_stream_ident(ss);
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s %s provides",
s, level*2+3, "",
Tag));
if (mask & SS_read_act)
DPRINT(Debug,20,(&Debug," read"));
if (mask & SS_write_act)
DPRINT(Debug,20,(&Debug," write"));
if (mask & SS_timeout_act)
DPRINT(Debug,20,(&Debug," timeout"));
if (mask & SS_setup_act)
DPRINT(Debug,20,(&Debug," setup"));
if (mask & SS_shutdown_act)
DPRINT(Debug,20,(&Debug," shutdown"));
if (0 == mask)
DPRINT(Debug,20,(&Debug," NONE"));
DPRINT(Debug,20,(&Debug,"\n"));
}
static int prewait(ss,restart)
struct streamsched *ss;
int *restart;
{
int return_mask = 0;
int want_mask = 0;
int level;
char * Ident;
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"prewait",
"Bad stream (magic)",0);
Ident = return_stream_ident(ss);
if (ss_noaction_routine != ss->read_act)
want_mask |= SS_read_act;
if (ss_noaction_routine != ss->write_act)
want_mask |= SS_write_act;
if (ss_noaction_routine != ss->timeout_act)
want_mask |= SS_timeout_act;
DPRINT(Debug,20,(&Debug,
" ... [%s] --- before action:\n",Ident));
xx_wants(ss,"Client",-1,want_mask);
for (level = 0; level < ss->stack_len; level++) {
int old_mask = want_mask;
int provide_mask;
int reconsider = 0;
int first = 1;
int count = 0;
do {
if (reconsider) {
count ++;
if (count > 2) {
DPRINT(Debug,1,(&Debug,
"prewait [%s] recondsidering [%d] delayed!\n",
Ident,count));
*restart = 1;
break;
}
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s ... reconsidering [%d]\n",
Ident,level*2+3,"",count));
reconsider = 0;
}
(*(ss -> this_stack[level].TYPE))->
schedule_data(ss,
level,old_mask,
&want_mask,&provide_mask);
if (level == ss->stack_len-1)
return_mask = want_mask;
xx_wants(ss,(*(ss -> this_stack[level].TYPE))->tag,level,want_mask);
xx_provides(ss,(*(ss -> this_stack[level].TYPE))->tag,level,
provide_mask);
if (provide_mask & SS_write_act) {
if(call_write_callback(ss,level-1)) {
*restart = 1;
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s ... from beginning again\n",
Ident,level*2+3,""));
break;
} else
reconsider = 1;
}
if (provide_mask & SS_read_act) {
if(call_read_callback(ss,level-1)) {
*restart = 1;
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s ... from beginning again\n",
Ident,level*2+3,""));
break;
} else
reconsider = 1;
}
if (first) {
switch(want_mask & (SS_setup_act|SS_shutdown_act)) {
case SS_setup_act:
/* Setup is just internal action */
xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level,
"setup");
if (!(*(ss -> this_stack[level].TYPE))->
setup_action(ss,level)) {
*restart = -1;
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s ... from beginning again (quiting)\n",
Ident,level*2+3,""));
} else
reconsider = 1;
ss->active_flags &= ~SS_setup_act;
break;
case SS_shutdown_act:
/* Shutdown is just internal action */
xx_will(ss,(*(ss -> this_stack[level].TYPE))->tag,level,
"shutdown");
if (!(*(ss -> this_stack[level].TYPE))->
shutdown_action(ss,level)) {
*restart = -1;
DPRINT(Debug,20,(&Debug,
" ... [%s] %*s ... from beginning again (quiting)\n",
Ident,level*2+3,""));
} else
reconsider = 1;
ss->active_flags &= ~SS_shutdown_act;
break;
case SS_shutdown_act|SS_setup_act:
DPRINT(Debug,1,(&Debug,
" ... [%s] %*s ... BOTH setup and shutdown wanted?",
Ident,level*2+3,""));
panic("STREAMSCHED PANIC",__FILE__,__LINE__,
"prewait",
"Both setup and shutdown wanted to same stream",0);
}
first = 0;
}
} while (reconsider && !ss->error_state);
if (ss->error_state) {
DPRINT(Debug,20,(&Debug,
" ... [%s] Error state: %s\n",Ident,
ss->error_state));
*restart = 0;
}
}
return return_mask;
}
static void after_action(ss)
struct streamsched *ss;
{
int i;
int want_mask;
char *s;
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"after_action",
"Bad stream (magic)",0);
want_mask = ss->active_flags;
s = return_stream_ident(ss);
DPRINT(Debug,20,(&Debug,
" ... [%s] --- after action:\n",s));
for (i = ss->stack_len-2; i >= 0; i--) {
int mask;
(*(ss -> this_stack[i].TYPE))->
schedule_data(ss,i,want_mask,&want_mask,&mask);
xx_provides(ss,(*(ss -> this_stack[i].TYPE))->tag,i,mask);
xx_wants(ss,(*(ss -> this_stack[i].TYPE))->tag,i,want_mask);
if (mask & SS_read_act)
call_read_callback(ss,i-1);
if (mask & SS_write_act)
call_write_callback(ss,i-1);
}
}
void WaitStreamFor(ss,flags)
struct streamsched *ss;
int flags;
{
char * Ident;
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WaitStreamFor",
"Bad stream (magic)",0);
DPRINT(Debug,20,(&Debug,
"WaitStreamFor: ss=%p, flags=%d <--- START\n",
ss,flags));
Ident = return_stream_ident(ss);
ss->active_flags = flags;
while (ss->active_flags == flags) {
if (ss->stack_len < 1 ||
ss->this_stack[ss->stack_len-1].simple->type !=
&SocketStream) {
DPRINT(Debug,1,(&Debug,
"bad stream -- length of stream = %d\n",
ss->stack_len));
if (ss->stack_len > 0) {
DPRINT(Debug,1,(&Debug,
" -- end of stream %s\n",
SS_type_magic == ss->this_stack[ss->stack_len-1].
simple->type->magic ?
"valid type" : "not valid"));
}
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WaitStreamFor",
"No socket on end of stream",0);
} else if (ss->this_stack[ss->stack_len-1].simple->socket == -1)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WaitStreamFor",
"Socket closed on end of stream",0);
else
set_schedule_action(ss->this_stack[ss->stack_len-1].simple->socket,
schedule_engine,
ss);
wait_for_any_action();
if (ss->error_state) {
/* Let's hope that read_callback is available ... */
if ( ss_noaction_routine == ss->read_act) {
DPRINT(Debug,4,(&Debug,
" ... [%s] Can't signal on error state: %s\n",
Ident,ss->error_state));
} else {
DPRINT(Debug,4,(&Debug,
" ... [%s] Signaling on error state: %s\n",
Ident,ss->error_state));
call_user_read_callback(ss);
}
}
}
DPRINT(Debug,20,(&Debug,
"WaitStreamFor <--- END\n"));
return;
}
int ReadFromStream(ss,buffer,wanted)
struct streamsched *ss;
struct Read_Buffer *buffer;
int wanted;
{
int n;
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ReadFromStream",
"Bad stream (magic)",0);
n = (*(ss -> this_stack[0].TYPE))->read_from_it(ss,0,buffer,wanted);
if (n > 0) {
if (transaction_file) {
time_t tmp = time(NULL);
struct tm * zz = localtime(&tmp);
int i;
int q = 0;
char * id = return_stream_ident(ss);
fprintf(transaction_file,
"%d [%s] %02d:%02d:%02d %c<< [len %4d] ",
getpid(),id,
zz ? zz->tm_hour : 00,
zz ? zz->tm_min : 00,
zz ? zz->tm_sec : 00,
ss->stack_len > 1 ? 'T' : '<',
n);
for (i = 0; i < n; i++) {
unsigned char c =
(unsigned char) buffer -> read_buffer[buffer -> read_len
+ i];
if (isascii(c) && isprint(c) && c != '"') {
if (!q) {
putc(' ',transaction_file);
putc('"',transaction_file);
q++;
}
putc(c,transaction_file);
} else {
if (q) {
putc('"',transaction_file);
q = 0;
}
fprintf(transaction_file," %02X",c);
if (c == 0x0A && i < n-1) {
fprintf(transaction_file,
"\n%d [%s] %02d:%02d:%02d %c<< [continue] ",
getpid(),id,
zz ? zz->tm_hour : 00,
zz ? zz->tm_min : 00,
zz ? zz->tm_sec : 00,
ss->stack_len > 1 ? 'T' : '<');
}
}
}
if (q) {
putc('"',transaction_file);
}
putc('\n',transaction_file);
}
buffer -> read_len += n;
DPRINT(Debug,8,(&Debug,
"ReadFromStream: Read %d bytes (%d total)\n",
n,buffer->read_len));
}
return n;
}
char * RemoveStreamError(ss)
struct streamsched *ss;
{
char * ret;
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"RemoveStreamError",
"Bad stream (magic)",0);
ret = ss->error_state;
ss->error_state = NULL;
return ret;
}
int WriteToStream(ss,buffer)
struct streamsched *ss;
struct Write_Buffer *buffer;
{
int n;
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"WriteToStream",
"Bad stream (magic)",0);
n = (*(ss -> this_stack[0].TYPE))->write_to_it(ss,0,buffer);
if (n > 0) {
if (transaction_file) {
time_t tmp = time(NULL);
struct tm * zz = localtime(&tmp);
int i;
int q = 0;
char * id = return_stream_ident(ss);
fprintf(transaction_file,
"%d [%s] %02d:%02d:%02d %c>> [len %4d] ",
getpid(),id,
zz ? zz->tm_hour : 00,
zz ? zz->tm_min : 00,
zz ? zz->tm_sec : 00,
ss->stack_len > 1 ? 'T' : '>',
n);
for (i = 0; i < n; i++) {
unsigned char c =
(unsigned char) buffer->write_buffer[i];
if (isascii(c) && isprint(c) && c != '"') {
if (!q) {
putc(' ',transaction_file);
putc('"',transaction_file);
q++;
}
putc(c,transaction_file);
} else {
if (q) {
putc('"',transaction_file);
q = 0;
}
fprintf(transaction_file," %02X",c);
if (c == 0x0A && i < n-1) {
fprintf(transaction_file,
"\n%d [%s] %02d:%02d:%02d %c>> [continue] ",
getpid(),id,
zz ? zz->tm_hour : 00,
zz ? zz->tm_min : 00,
zz ? zz->tm_sec : 00,
ss->stack_len > 1 ? 'T' : '>');
}
}
}
if (q) {
putc('"',transaction_file);
}
putc('\n',transaction_file);
}
}
return n;
}
static union stream_types alloc_simple P_((int socket));
static union stream_types alloc_simple(socket)
int socket;
{
union stream_types ret;
struct simple_type * S = safe_malloc(sizeof (struct simple_type));
/* bzero is defined on hdrs/defs.h */
bzero((void *)S,sizeof (struct simple_type));
S->type = &SocketStream;
S->socket = socket;
ret.simple = S;
return ret;
}
void add_stream_to_head(ss,stream)
struct streamsched *ss;
union stream_types stream;
{
int i;
#ifdef USE_DLOPEN
load_shared_libs1(&use_shared_connect);
#endif
if (SS_type_magic != (*(stream.TYPE))->magic) {
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"add_stream_to_head",
"Bad streamtupe (magic)",0);
}
ss->this_stack = safe_realloc(ss->this_stack,
(ss->stack_len+1) *
sizeof (ss->this_stack[0]));
for (i = ss->stack_len; i > 0; i--)
ss->this_stack[i] = ss->this_stack[i-1];
ss->this_stack[0] = stream;
ss->stack_len++;
if (transaction_file) {
time_t tmp = time(NULL);
struct tm * zz = localtime(&tmp);
char * id = return_stream_ident(ss);
int i;
fprintf(transaction_file,
"%d [%s] %02d:%02d:%02d ---",
getpid(),id,
zz ? zz->tm_hour : 00,
zz ? zz->tm_min : 00,
zz ? zz->tm_sec : 00);
for (i = 0; i < ss->stack_len; i++)
fprintf(transaction_file," %s",
(*(ss->this_stack[i].TYPE))->tag);
fprintf(transaction_file,"\n");
}
}
int ss_noaction_routine(ss,data)
struct streamsched *ss;
void * data;
{
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ss_noaction_routine",
"ss_noaction_routine called",0);
return 0;
}
struct streamsched * returnSimpleStream(socket)
int socket;
{
struct streamsched * ret = safe_malloc(sizeof (struct streamsched));
/* bzero is defined on hdrs/defs.h */
bzero((void *)ret,sizeof (struct streamsched));
ret->this_stack = NULL;
ret->stack_len = 0;
ret->read_act = ss_noaction_routine;
ret->write_act = ss_noaction_routine;
ret->timeout_act = ss_noaction_routine;
ret->timeout_sec = 0;
ret->error_state = NULL;
ret->active_flags = 0;
ret->data = NULL;
ret->magic = SS_magic;
add_stream_to_head(ret,alloc_simple(socket));
return ret;
}
void ConfigStream(ss,read_act,write_act,timeout_act,timeout_sec,data)
struct streamsched *ss;
ss_action_routine * read_act;
ss_action_routine * write_act;
ss_action_routine * timeout_act;
int timeout_sec;
void * data;
{
if (ss->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ConfigStream",
"Bad stream (magic)",0);
ss->read_act = read_act;
ss->write_act = write_act;
ss->timeout_act = timeout_act;
ss->timeout_sec = timeout_sec;
ss->data = data;
}
void FreeStreamStack(ss)
struct streamsched **ss;
{
if ((*ss)->magic != SS_magic)
panic("STREAMSCHED PANIC",__FILE__,__LINE__,"ConfigStream",
"Bad stream (magic)",0);
if ((*ss) -> this_stack) {
int i;
/* NOTE: free_it may start shutdown_act */
for (i = 0; i < (*ss)->stack_len; i++)
(*((*ss) -> this_stack[i].TYPE))->free_it(*ss,i);
free((*ss) -> this_stack);
(*ss) -> this_stack = NULL;
}
(*ss)->stack_len = 0;
free_error_state(*ss);
free(*ss);
*ss = NULL;
}
/* Unknown functions does not modify *int_val or *str_val */
void StreamInfo(ss,function,int_val,str_val)
struct streamsched *ss;
enum SS_info function;
int *int_val;
char **str_val;
{
int max = 0;
int i;
switch (function) {
case SS_ssf:
/* "Security Strength Factor" */
/* return maximum of values */
for (i = 0; i < ss->stack_len; i++) {
int val = 0; /* Default is 0 */
(*(ss -> this_stack[i].TYPE))->query_it(ss,i,
function,
&val,NULL);
if (val > max)
max = val;
}
*int_val = max;
break;
}
}
#endif
/*
* Local Variables:
* mode:c
* c-basic-offset:4
* buffer-file-coding-system: iso-8859-1
* End:
*/
syntax highlighted by Code2HTML, v. 0.9.1