# -*- Mode: Python -*- VERSION_STRING = '$Id: coro.py,v 1.1 2000/04/11 00:50:22 hassan Exp $' # Copyright 1999 by eGroups, Inc. # # All Rights Reserved # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby # granted, provided that the above copyright notice appear in all # copies and that both that copyright notice and this permission # notice appear in supporting documentation, and that the name of # eGroups not be used in advertising or publicity pertaining to # distribution of the software without specific, written prior # permission. # # EGROUPS DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN # NO EVENT SHALL EGROUPS BE LIABLE FOR ANY SPECIAL, INDIRECT OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
import bisect
import os
import select
import socket
import string
import sys
import time
import whrandom

# This is a problem, because of /findmail/src/poll.py
## try:
##     import poll
##     USE_POLL = 1
## except ImportError:
##     USE_POLL = 0
USE_POLL = 0

LOG_LEVELS = 10  # Do not change this - jeske
PRINT_LOG_LEVELS = 5
# Higher levels are more severe errors
LOG_ERROR = 7
LOG_VERBOSE = 3

# Sentinel values handed to a resumed thread to tell it *why* it was woken.
TIMEOUT_VALUE = 0xb00b
BADF_ERROR = 0xc00c

from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN

import coroutine
import exceptions

# NOTE: this module targets the interpreter generation it was written for
# (it relies on `whrandom`, `exceptions`, `apply` and `string.join`).  The
# statements below nevertheless stick to spellings that every Python
# version parses: `raise Cls(args)`, `sys.exc_info()` instead of
# `except Cls, name`, and `print(single_string)`.


class CoroutineSocketError(exceptions.Exception):
    "Raised when a coroutine socket cannot wait (no thread, bad descriptor)."
    pass


class CoroutineCondError(exceptions.Exception):
    "Raised by coroutine_cond.wake() for a thread id that is not waiting."
    pass


class CoroutineThreadError(exceptions.Exception):
    "Raised for invalid Thread usage (e.g. an out-of-range log level)."
    pass


class TimeoutError(exceptions.Exception):
    "Raised when a wait with a timeout expires before the thread is woken."
    pass


# ===========================================================================
#                          Coroutine Socket
# ===========================================================================

class coroutine_socket:
    "socket that automatically suspends/resumes instead of blocking."

    # Re-exported on the class so callers holding only a socket object can
    # write `except sock.TimeoutError`.
    TimeoutError = TimeoutError

    # Class-level default; close() flips it per instance.
    _closed = 0

    def __init__(self, sock=None):
        """Wrap an existing socket object, or start empty.

        When `sock` is given it is switched to non-blocking mode and its
        descriptor is cached.  When it is omitted, create_socket() must be
        called before the object is usable.
        """
        self.socket = sock
        if sock:
            self.socket.setblocking(0)
            self.connected = 1
            self.set_fileno()

    def set_fileno(self):
        "Cache the descriptor of the wrapped socket."
        self._fileno = self.socket.fileno()

    def fileno(self):
        "Return the descriptor cached by set_fileno()."
        return self._fileno

    def create_socket(self, family, type, proto=0):
        "Create the underlying non-blocking socket and cache its descriptor."
        self.socket = socket.socket(family, type, proto)
        self.socket.setblocking(0)
        self.set_fileno()

    def _wait_for(self, fd_set, timeout):
        """Suspend the current thread until this socket shows up as ready.

        Shared implementation of wait_for_read() and wait_for_write().

        fd_set  -- the module-level registry to park in (read_set or
                   write_set); the event loop schedules whatever thread is
                   stored under a descriptor that select/poll reports.
        timeout -- seconds to wait, or None to wait indefinitely.

        Raises CoroutineSocketError when called outside a coroutine thread,
        when the yield itself fails, or when the thread is resumed with
        BADF_ERROR.  Raises TimeoutError when resumed with TIMEOUT_VALUE.
        """
        me = current_thread()
        # Check for 'main' *before* arming the timer: the original armed it
        # first and so left an event for a None thread in the event list.
        if me is None:
            raise CoroutineSocketError(
                "coroutine sockets cannot run in 'main'")

        triple = None
        if timeout is not None:
            triple = the_event_list.insert_event(
                me, time.time() + timeout, TIMEOUT_VALUE)

        fd_set[self._fileno] = me
        try:
            result = me.yield_()
        except:
            # Deliberately broad, as in the original.
            # NOTE(review): this also converts whatever kill() raises inside
            # the coroutine into CoroutineSocketError -- confirm intended.
            raise CoroutineSocketError("coroutine socket could not yield_")

        if result == TIMEOUT_VALUE:
            # The timer fired.  The event loop may already have dropped our
            # registration (descriptor became ready in the same pass), or
            # another thread may have registered on the same descriptor
            # since, so only remove the entry if it is still ours.
            if fd_set.get(self._fileno) is me:
                del fd_set[self._fileno]
            raise TimeoutError

        if triple is not None:
            # Woken for another reason: disarm the pending timer.
            try:
                the_event_list.remove_event(triple)
            except ValueError:
                # Already consumed or flushed; nothing left to disarm.
                pass

        if result == BADF_ERROR:
            # event_loop() resumes us with this after find_broken_socket()
            # singled out our descriptor.
            raise CoroutineSocketError("bad file descriptor")

    def wait_for_read(self, timeout=None):
        "Suspend until the socket is readable; see _wait_for() for errors."
        self._wait_for(read_set, timeout)

    def wait_for_write(self, timeout=None):
        "Suspend until the socket is writable; see _wait_for() for errors."
        self._wait_for(write_set, timeout)

    def connect(self, address, timeout=None):
        """Connect to `address`, suspending while the connect is in progress.

        Raises socket.error if the connection fails, TimeoutError if
        `timeout` (seconds) elapses first.
        """
        try:
            return self.socket.connect(address)
        except socket.error:
            # Fetched this way so the syntax is valid on every Python.
            why = sys.exc_info()[1]
            code = why.args[0]
            if code in (EINPROGRESS, EWOULDBLOCK):
                self.wait_for_write(timeout=timeout)
                # Writable only means the attempt finished; SO_ERROR tells
                # whether it succeeded.
                ret = self.socket.getsockopt(
                    socket.SOL_SOCKET, socket.SO_ERROR)
                if ret != 0:
                    raise socket.error(ret, os.strerror(ret))
                return
            elif code == EALREADY:
                return
            else:
                # Unrelated failure: propagate the original exception.
                raise

    def recv(self, buffer_size):
        "Wait until readable, then recv() up to `buffer_size` bytes."
        self.wait_for_read()
        return self.socket.recv(buffer_size)

    def recvfrom(self, buffer_size):
        "Wait until readable, then recvfrom() up to `buffer_size` bytes."
        self.wait_for_read()
        return self.socket.recvfrom(buffer_size)

    def send(self, data):
        "Wait until writable, then send(); returns what socket.send returns."
        self.wait_for_write()
        return self.socket.send(data)

    def sendto(self, data, where):
        "Wait until writable, then sendto() the address `where`."
        self.wait_for_write()
        return self.socket.sendto(data, where)

    def bind(self, address):
        return self.socket.bind(address)

    def listen(self, queue_length):
        return self.socket.listen(queue_length)

    def accept(self):
        """Wait for a connection and return (wrapped_connection, address).

        The new connection is wrapped in the same class as `self`.
        """
        self.wait_for_read()
        conn, addr = self.socket.accept()
        return self.__class__(conn), addr

    def close(self):
        "Close the underlying socket once; later calls are no-ops."
        if not self._closed:
            self._closed = 1
            if self.socket:
                return self.socket.close()
            else:
                return None

    def __del__(self):
        self.close()

    def set_reuse_addr(self):
        "Best-effort: turn on SO_REUSEADDR, ignoring any failure."
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                1 | self.socket.getsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEADDR)
                )
        except:
            # Deliberately broad: this is an optimisation, never fatal.
            pass


# ===========================================================================
#                         Condition Variable
# ===========================================================================

class coroutine_cond:
    "Condition variable: threads park in wait() until another wakes them."

    def __init__(self):
        # thread id -> Thread, for every thread currently inside wait()
        self._waiting = {}

    def __len__(self):
        "Number of threads currently waiting."
        return len(self._waiting)

    def wait(self, timeout=None):
        """Suspend the current thread until woken; return the wake value.

        Raises TimeoutError (from Thread.yield_) when `timeout` seconds
        pass first.
        """
        thrd = current_thread()
        tid = thrd.thread_id()
        self._waiting[tid] = thrd
        try:
            result = thrd.yield_(timeout)
        finally:
            # However we got here (woken, timed out, yield failed) we are
            # no longer waiting.  Without the `finally`, a timed-out thread
            # stayed registered and could be scheduled by a later wake.
            try:
                del self._waiting[tid]
            except KeyError:
                # wake()/wake_one()/wake_all() already removed us.
                pass
        return result

    def wake(self, id):
        """Schedule the waiting thread whose thread id is `id`.

        Raises CoroutineCondError if no such thread is waiting.
        """
        thrd = self._waiting.get(id, None)
        if thrd is None:
            raise CoroutineCondError('unknown thread <%d>' % (id))
        del self._waiting[id]
        schedule(thrd, ())

    def wake_one(self, *args):
        "Schedule one randomly chosen waiter, passing it `args`."
        if len(self._waiting):
            chosen = whrandom.choice(self._waiting.keys())
            thrd = self._waiting[chosen]
            del self._waiting[chosen]
            schedule(thrd, args)

    def wake_all(self, *args):
        "Schedule every waiter, passing each `args`."
        for thrd in self._waiting.values():
            schedule(thrd, args)
        self._waiting = {}


# ===========================================================================
#                          Thread Abstraction
# ===========================================================================

# coroutine object -> Thread, for every thread whose _run() is in progress
_current_threads = {}


class Thread:
    "A schedulable unit of work backed by one coroutine."

    # TODO: the method

    # Last id handed out; shared by all instances.
    _thread_count = 0

    def __init__(self, group=None, target=None, name=None, args=(),
                 kwargs=None):
        """Create a thread; nothing runs until start() schedules it.

        group  -- accepted but not used by this class.
        target -- callable to run; when None, self.run() is used instead.
        name   -- display name; defaults to 'thread_<id>'.
        args   -- positional arguments for the callable.
        kwargs -- keyword arguments for the callable (None means none; a
                  shared `{}` default would leak between threads).
        """
        if kwargs is None:
            kwargs = {}

        # Ids wrap around rather than grow past a signed 32-bit value.
        if Thread._thread_count == 0x7FFFFFFF:
            Thread._thread_count = 0
        Thread._thread_count = Thread._thread_count + 1
        self._thread_id = self._thread_count

        if name is None:
            self._name = 'thread_%d' % self._thread_id
        else:
            self._name = name

        self._target = target
        self._args = args
        self._kwargs = kwargs

        self._resume_count = 0
        self._total_time = 0
        self._alive = 0
        self._profile = 0
        self._daemonic = 0
        # Filled in by _run() with compact_traceback() output on failure.
        self._error = None

        # NOTE(review): 65536 is presumably the coroutine's stack size --
        # confirm against the `coroutine` extension.
        self._co = coroutine.new(self._run, 65536)
        self._status = 'initialized'
        self._log_level = PRINT_LOG_LEVELS

    def resume(self, *args):
        """Switch into this thread's coroutine and return what it yields.

        `args` are delivered only to a thread that is already alive; the
        very first resume passes an empty tuple because _run() takes no
        arguments.  When profiling is on, the resume is counted and timed.
        """
        if self._profile:
            self._resume_count = self._resume_count + 1
            start_time = time.time()
        else:
            start_time = 0

        if self._alive:
            result = coroutine.resume(self._co, args)
        else:
            result = coroutine.resume(self._co, ())

        if self._profile and start_time:
            end_time = time.time()
            self._total_time = self._total_time + (end_time - start_time)

        return result

    def start(self):
        "Queue this thread so the event loop resumes it."
        schedule(self)

    def _run(self):
        """Coroutine entry point: run the target and record the outcome.

        Any exception other than coroutine.unwind is captured into
        self._error and logged at LOG_ERROR instead of propagating.
        """
        try:
            try:
                self._alive = 1
                self._status = 'alive'
                _current_threads[self._co] = self
                if self._target is None:
                    apply(self.run, self._args, self._kwargs)
                else:
                    apply(self._target, self._args, self._kwargs)
            except coroutine.unwind:
                # kill() will cause this
                pass
            except:
                self._error = compact_traceback()
                self.log(LOG_ERROR, self._error)
        finally:
            # In a `finally` so a failure while reporting the error cannot
            # leave a dead thread registered as alive.
            del _current_threads[self._co]
            self._alive = 0
            self._status = 'dead'

    def __del__(self):
        if self._alive:
            self.kill()

    def kill(self):
        "Ask the coroutine module to kill this thread if it is alive."
        if self._alive:
            coroutine.kill(self._co)

    def run(self):
        "Default body, used when no target was given; subclasses override."
        self.log(0, 'unregistered run method')

    def profile(self, status):
        "Turn resume counting/timing on (true) or off (false)."
        self._profile = status

    # Higher level == more severe error
    def log_level_(self, level):
        """Set the threshold a message level must exceed to be printed.

        Raises CoroutineThreadError unless 0 <= level < LOG_LEVELS.
        """
        if level not in range(LOG_LEVELS):
            raise CoroutineThreadError('error log level out of bounds')
        self._log_level = level

    def log(self, level, message):
        """Write `message` to stderr if `level` exceeds this thread's threshold.

        Raises CoroutineThreadError unless 0 <= level < LOG_LEVELS.
        """
        if level not in range(LOG_LEVELS):
            raise CoroutineThreadError('error log level out of bounds')
        # Strictly greater: a message *at* the threshold is not printed.
        if level > self._log_level:
            # Fields 1..5 of localtime: month, day, hour, minute, second.
            time_str = "[%02d/%02d %02d:%02d:%02d]" % \
                       time.localtime(time.time())[1:6]
            sys.stderr.write('%s thread %d: %s\n' %
                             (time_str, self._thread_id, str(message)))

    def yield_(self, timeout=None):
        """Give control back to the main coroutine; return the wake value.

        timeout -- seconds after which the thread is rescheduled with
                   TIMEOUT_VALUE, or None for no timer.

        Raises TimeoutError when a timer was armed and the thread was
        resumed with TIMEOUT_VALUE.
        """
        triple = None
        # `is not None`, here and below: the original armed the timer for
        # timeout=0 but then tested `if timeout:`, so a zero timeout
        # returned the raw sentinel instead of raising.
        if timeout is not None:
            triple = the_event_list.insert_event(
                self, time.time() + timeout, TIMEOUT_VALUE)
        try:
            retval = coroutine.main(())
        finally:
            if triple is not None:
                try:
                    the_event_list.remove_event(triple)
                except ValueError:
                    # Timer already fired (or was flushed): nothing to do.
                    pass

        if triple is not None and retval == TIMEOUT_VALUE:
            raise TimeoutError
        return retval

    def thread_id(self):
        return self._thread_id

    def getName(self):
        return self._name

    def setName(self, name):
        self._name = name

    def isAlive(self):
        return self._alive

    def isDaemon(self):
        return self._daemonic

    def setDaemon(self, daemonic):
        self._daemonic = daemonic

    def status(self):
        "Print id and liveness (plus profile counters when profiling)."
        rows = [(' id: ', self._thread_id),
                (' alive: ', self._alive)]
        if self._profile:
            rows.append((' resume count:', self._resume_count))
            rows.append((' execute time:', self._total_time))
        print('Thread status:')
        for label, value in rows:
            # Same output as the statement form `print label, value`.
            print('%s %s' % (label, value))

    def __repr__(self):
        if self._profile:
            p = ' resume_count: %d execute_time: %s' % (
                self._resume_count,
                self._total_time
                )
        else:
            p = ''
        # 'suspended' is the label for every thread that is not alive,
        # including ones not started yet or already dead.
        if self._alive:
            a = 'running'
        else:
            a = 'suspended'

        return '<%s.%s id:%d %s %s%s at %x>' % (
            __name__,
            self.__class__.__name__,
            self._thread_id,
            self._status,
            a,
            p,
            id(self)
            )

#
# end class Thread
#


def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), str(type), str(value), info), where
    the first item describes the innermost frame and `info` lists every
    frame as '[file|function|line]'.  Must be called from an `except`
    block, where sys.exc_info() carries a traceback.
    """
    t, v, tb = sys.exc_info()
    tbinfo = []
    while 1:
        # One tuple per frame.  The original passed three separate
        # arguments to append(), which is a TypeError since Python 2.0.
        tbinfo.append((
            tb.tb_frame.f_code.co_filename,
            tb.tb_frame.f_code.co_name,
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next
        if not tb:
            break

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = '[' + string.join(
        map(lambda x: string.join(x, '|'), tbinfo),
        '] ['
        ) + ']'
    return (file, function, line), str(t), str(v), info


#
# ===========================================================================
#                   global state and threadish API
# ===========================================================================
#

# file descriptors waiting for a read event (descriptor -> Thread)
read_set = {}
# file descriptors waiting for a write event (descriptor -> Thread)
write_set = {}
# coroutines that are ready to run (Thread -> value to resume it with)
pending = {}


def _socket(family, type, proto=0):
    "Create and return a new non-blocking coroutine_socket."
    s = coroutine_socket()
    s.create_socket(family, type, proto)
    return s

make_socket = _socket


def new(function, *args, **kwargs):
    "Create, but do not start, a Thread that will call function(*args, **kwargs)."
    return Thread(target=function, args=args, kwargs=kwargs)


#def spawn (function, *args):
#    schedule (coroutine.new (function), args)

def spawn(function, *args):
    "Create and start a Thread that will call function(*args); return it."
    t = Thread(target=function, args=args)
    t.start()
    return t


def spawnDaemon(function, *args):
    "Like spawn(), but the thread is flagged daemonic before it starts."
    t = Thread(target=function, args=args)
    t.setDaemon(1)
    t.start()
    return t


def schedule(coroutine, args=None):
    "schedule a coroutine to run"
    # Keyed by thread: scheduling the same thread twice before it runs
    # keeps only the latest `args`.
    pending[coroutine] = args


#def yield_ ():
#    return coroutine.main()

def yield_(timeout=None):
    "Yield the current thread; see Thread.yield_()."
    return current_thread().yield_(timeout=timeout)


def thread_list():
    "Return the Thread objects that are currently running their _run()."
    return _current_threads.values()


def current_thread():
    "Return the Thread for the running coroutine, or None when in 'main'."
    co = coroutine.current()
    return _current_threads.get(co, None)

current = current_thread


def insert_thread(thrd):
    thrd.start()


def run_pending():
    "run all pending coroutines"
    while len(pending):
        # some of these will kick off others, thus the loop
        runnable = list(pending.items())
        pending.clear()
        for c, v in runnable:
            # Guard each resume separately: with one `try` around the whole
            # batch, a single failure silently discarded every thread still
            # left in `runnable` (they were already cleared from `pending`).
            try:
                c.resume(v)
            except:
                # XXX can we throw the exception to the coroutine?
                import traceback
                traceback.print_exc()


# uses poll(2)
def poll_with_poll(timeout=30.0):
    """Wait with the `poll` module and schedule threads whose fd is ready.

    Returns immediately when no descriptor is registered.
    NOTE(review): the `poll` import above is commented out, so this path is
    currently dead; `timeout` is presumably milliseconds, judging by how
    event_loop() scaled its value -- confirm before re-enabling.
    """
    if read_set or write_set:
        u = {}
        for fd in read_set.keys():
            u[fd] = poll.POLLIN
        for fd in write_set.keys():
            u[fd] = u.get(fd, 0) | poll.POLLOUT
        u = list(u.items())
        #print 'before',u
        u = poll.poll(u, timeout)
        #print 'after', u
        for fd, flags in u:
            if flags & poll.POLLIN:
                schedule(read_set[fd])
                del read_set[fd]
            if flags & poll.POLLOUT:
                schedule(write_set[fd])
                del write_set[fd]


# uses select(2)
def poll_with_select(timeout=30.0):
    """Wait up to `timeout` seconds in select() and schedule ready threads.

    Returns immediately when no descriptor is registered.  A scheduled
    thread is removed from its registry, so it must register again to
    wait again.
    """
    if read_set or write_set:
        r = list(read_set.keys())
        w = list(write_set.keys())
        #print 'before: read: %d write: %d' % (len(r),len(w))
        r, w, e = select.select(r, w, [], timeout)
        #print 'after: read: %d write: %d' % (len(r),len(w))
        #sys.stdout.write ('- %d %d|' % (len(r),len(w))); sys.stdout.flush()
        for fd in r:
            schedule(read_set[fd])
            del read_set[fd]
        for fd in w:
            schedule(write_set[fd])
            del write_set[fd]


class event_list:
    "Time-ordered list of (when, thread, args) wake-up events."

    def __init__(self):
        # Kept sorted by insert_event() so the earliest event is first.
        self.events = []

    def __nonzero__(self):
        return len(self.events)

    def __len__(self):
        return len(self.events)

    def insert_event(self, co, when, args):
        """Queue thread `co` to be scheduled with `args` at time `when`.

        `when` is an absolute time.time() value.  Returns the stored tuple,
        which is the handle remove_event() expects.
        """
        triple = (when, co, args)
        bisect.insort(self.events, triple)
        return triple

    def remove_event(self, triple):
        "Cancel an event; raises ValueError if it is no longer queued."
        self.events.remove(triple)

    def sleep_absolute(self, when, *args):
        "Suspend the current thread until absolute time `when`."
        me = current_thread()
        # Refuse before queuing anything: an event for a None thread would
        # stay in the list and be scheduled later.
        if me is None:
            raise CoroutineThreadError("cannot sleep in 'main'")
        self.insert_event(me, when, args)
        me.yield_()

    def sleep_relative(self, delta, *args):
        "Suspend the current thread for `delta` seconds."
        me = current_thread()
        if me is None:
            raise CoroutineThreadError("cannot sleep in 'main'")
        self.insert_event(me, time.time() + delta, args)
        me.yield_()

    def run_scheduled(self):
        "Schedule every event that is due and drop it from the list."
        now = time.time()
        due = 0
        # The list is sorted, so the due events form a prefix.
        for when, thread, args in self.events:
            if now >= when:
                schedule(thread, args)
                due = due + 1
            else:
                break
        self.events = self.events[due:]
        return None

    def next_event(self, max_timeout=30.0):
        """Return the seconds until the earliest event.

        The result is clamped to the range [0, max_timeout]; with no
        events queued it is `max_timeout`.
        """
        now = time.time()
        if len(self.events):
            when, thread, args = self.events[0]
            return min(max_timeout, max(when - now, 0))
        else:
            return max_timeout


the_event_list = event_list()
sleep_absolute = the_event_list.sleep_absolute
sleep_relative = the_event_list.sleep_relative

gIsDone = 0


def reset_event_loop():
    #
    # dump all events/threads, I'm using this to shut down, so
    # if the event_loop exits, I want to startup another eventloop
    # thread set to perform shutdown functions.
    #
    # Everything is emptied *in place*.  Rebinding the globals (as the
    # original did) left the module-level sleep_absolute/sleep_relative
    # bound to the discarded event list, so later sleeps queued events
    # that the loop never looked at.
    the_event_list.events = []
    read_set.clear()
    write_set.clear()
    pending.clear()
    return None


def _continue_event_loop():
    "Return a true value while there is work left and no stop was requested."
    if gIsDone:
        return 0
    else:
        return len(the_event_list) + \
               len(read_set) + \
               len(write_set) + \
               len(pending)


def stop_event_loop():
    "Make event_loop() exit the next time it checks its loop condition."
    global gIsDone
    gIsDone = 1


def event_loop(max_timeout=30.0):
    """Run timers, pending threads and I/O polling until nothing is left.

    max_timeout -- upper bound, in seconds, on any single blocking wait.
    """
    if USE_POLL:
        poll_fun = poll_with_poll
    else:
        poll_fun = poll_with_select

    while _continue_event_loop():
        the_event_list.run_scheduled()
        run_pending()
        # Always computed in seconds.  The original pre-scaled max_timeout
        # to milliseconds for poll and then took min() of it with a delay
        # in seconds, mixing the two units.
        delta = the_event_list.next_event(max_timeout)
        if read_set or write_set:
            if USE_POLL:
                timeout = int(delta * 1000)
            else:
                timeout = delta
            try:
                poll_fun(timeout=timeout)
            except select.error:
                b, thrd = find_broken_socket()
                if b is not None:
                    print("*** broken socket: %s, %s" % (repr(b), repr(thrd)))
                    schedule(thrd, BADF_ERROR)
        elif delta > 0 and _continue_event_loop():
            # No descriptors registered and run_pending() has drained
            # `pending`, so only timers remain and nothing can become
            # runnable before the next one is due.  Sleep until then
            # instead of spinning through the loop at full speed.
            time.sleep(delta)


def find_broken_socket():
    """Find a registered descriptor that select() rejects.

    Returns (descriptor, thread) for the first one found, after removing
    it from its registry, or (None, None) when every descriptor is fine.
    """
    # ick.  scan through all the sockets to find out which one is bad.
    # XXX: in every case where this has happened, socket.fileno() yields
    # -1, which is probably a much cheaper way to find Mr. Broken.
    for fd_set in (read_set, write_set):
        for s in list(fd_set.keys()):
            try:
                select.select([s], [s], [s], 0.0)
            except select.error:
                thrd = fd_set[s]
                del fd_set[s]
                return s, thrd
    return None, None