############################################################################# # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1995, 1996, 1997, 1998, 1999 # Unpublished work. All Rights Reserved. # # The software contained on this media is the property of the # DSTC Pty Ltd. Use of this software is strictly in accordance # with the license agreement in the accompanying LICENSE.DOC file. # If your distribution of this software does not contain a # LICENSE.DOC file then you have no rights to use this software # in any manner and should contact DSTC at the address below # to determine an appropriate licensing arrangement. # # DSTC Pty Ltd # Level 7, GP South # Staff House Road # University of Queensland # St Lucia, 4072 # Australia # Tel: +61 7 3365 4310 # Fax: +61 7 3365 4311 # Email: enquiries@dstc.edu.au # # This software is being provided "AS IS" without warranty of # any kind. In no event shall DSTC Pty Ltd be liable for # damage of any kind arising out of or in connection with # the use or performance of this software. # # Project: Hector # File: $Source: /cvsroot/fnorb/fnorb/orb/condvar.py,v $ # ############################################################################# """ Condition variables """ ############################################################################# import thread, time ############################################################################# class condvar: """Condition variables. A condition variable class with a built-in mutex. To use, create and associate a variable with it. Others, interested in the state of that variable, can "wait()" on the condvar and will be suspended, to be resumed whe another thread "signal()"s or "broadcast()"s. You must have acquired the condvar prior to any of "release()", "signal()" or "broadcast()". All of these operations release the acquired mutex. You must also have acquired the condvar prior to "wait()"ing or "timedwait()"ing, which release and then reacquire the mutex.""" #-- timedwait timer table d_timer = {} def __init__(self): """Create a new condition variable.""" self.lk = thread.allocate_lock() self.q_waiting = [] def __del__(self): """Destroy a condition variable.""" self.lk.acquire() for thr in self.q_waiting: thr.release() def acquire(self): """Acquire the lock on a condition variable. Return value none Pre-Locking caller must NOT hold the CV lock Post-Locking caller holds the CV lock Condition variables must be locked before calling "signal()", "broadcast()", "wait()", "timedwait()" or "release()".""" self.lk.acquire() def release(self): """Release the currently held lock on a condition variable. Return value none Pre-Locking caller must hold the CV lock Post-Locking callers lock is released The condition variable lock must be currently held by the caller.""" self.lk.release() def signal(self): """Inform the first waiting thread that the condition is altered. Return value none Pre-Locking caller MUST hold the CV lock Post-Locking callers lock is released The first thread queued awaiting changes in the condition is unblocked. Note that the condition variable lock must be held by the caller.""" #-- check for waiting threads if len(self.q_waiting) > 0: #-- remove head of waiting queue thr = self.q_waiting[0] self.q_waiting = self.q_waiting[1:] #-- remove timeout record (if present) if self.__class__.d_timer.has_key(id(thr)): del self.__class__.d_timer[id(thr)] #-- unblock thread thr.release() #-- unlock CV self.lk.release() def broadcast(self): """Inform all waiting threads that the condition is altered. Return value none Pre-Locking caller MUST hold the CV lock Post-Locking callers lock is released All threads queued awaiting a change in the condition are unblocked. Note that the condition variable lock must be held by the caller.""" for thr in self.q_waiting: #-- remove timeout record (if present) if self.__class__.d_timer.has_key(id(thr)): del self.__class__.d_timer[id(thr)] #-- unblock thread thr.release() #-- reset waiting queue self.q_waiting = [] #-- unlock CV self.lk.release() def wait(self): """Wait for notification of a change in the condition. Return value none Pre-Locking caller MUST hold the CV lock Post-Locking caller holds the CV lock (see note) The caller is blocked until notified of a change in the condition (from either "signal()" or "broadcast()"). Note: the caller must hold the condition variable lock, which will be released and reacquired before returning.""" #-- create the blocking lock and add to the CV queue lk = thread.allocate_lock() lk.acquire() self.q_waiting.append(lk) #-- release the CV lock self.lk.release() #-- block the thread lk.acquire() #-- reacquire the CV lock self.lk.acquire() def timedwait(self, timeout = 0): """Wait for notification of a change in the condition with a timeout. "timeout" period to wait, in seconds. 0 means return immediately. Return value 0 - a change has been notified -1 - the timeout period expired Exceptions ? Pre-Locking caller MUST hold the CV lock Post-Locking caller holds the CV lock (see note) The caller is blocked until notified of a change in the condition (from "signal()" or "broadcast()") or the expiry of the timeout period. Note: the caller must hold the condition variable lock, which will be released and reacquired before returning.""" res = 0 #-- create the blocking lock and add to the CV queue lk = thread.allocate_lock() lk.acquire() self.q_waiting.append(lk) #-- add this thread to the set of timers lk_id = id(lk) self.__class__.d_timer[lk_id] = lk thread.start_new_thread(self._wakeup, (lk_id, timeout)) #-- release the CV lock self.lk.release() #-- block the thread lk.acquire() #-- reacquire the CV lock self.lk.acquire() #-- check to see if we timed out if self.__class__.d_timer.has_key(lk_id): res = -1 del self.__class__.d_timer[lk_id] return res def _wakeup(self, lk_id, timeout): """Wakeup a timedwait on a condition variable. The workings of "timedwait()" are a little obscure: when a thread calls "timedwait()" an entry is placed into the timer table "d_timer" which is shared by all instances of the class. This entry is keyed by the ID of the lock used to suspend the thread, and contains the lock instance. When a thread is woken by "signal()" or "broadcast()" this entry is deleted. Note that it will only be there if "timedwait()" has been called. "timedwait()" checks for the presence of the table entry when it is unblocked. If it is not there, then the thread was woken by a "signal()" or "broadcast()" (which will have deleted it). "_wakeup()" unblocks the thread using the lock instance in the timer table, but does not delete the entry. When "timedwait()" is unblocked then, it notices that the timer table entry is still there, and can determine that it was unblocked by the timer expiry, not a "signal()" or "broadcast()". ok?""" time.sleep(timeout) #-- check if thread is still blocked if self.__class__.d_timer.has_key(lk_id): #-- unblock thread self.__class__.d_timer[lk_id].release() thread.exit() ############################################################################# def cvtest(): """Test function.""" cv = condvar() var = 1 print "master: starting threads." for id in [1, 2, 3, 4, 5]: thread.start_new_thread(tester, (cv, id)) time.sleep(0) print time.sleep(5) print print "master: about to signal" cv.acquire() cv.signal() time.sleep(5) print print "master: about to broadcast" cv.acquire() cv.broadcast() time.sleep(5) print print "master: starting timedwait threads." print thread.start_new_thread(time_tester, (cv, 6, 0)) thread.start_new_thread(time_tester, (cv, 7, 3)) thread.start_new_thread(time_tester, (cv, 8, 10)) thread.start_new_thread(time_tester, (cv, 9, 15)) time.sleep(5) cv.acquire() print "master: about to broadcast." cv.broadcast() time.sleep(5) print print "press C-c to exit." print signal.pause() def tester(cv, id): print "thread %d: Started tester." % (id) print "thread %d: About to wait." % (id) cv.acquire() cv.wait() cv.release() print "thread %d: Released." % (id) thread.exit() def time_tester(cv, id, timeout): print "thread %d: Started time tester - waiting %d secs." % (id, timeout) print "thread %d: About to wait." % (id) cv.acquire() if cv.timedwait(timeout) == 0: print "thread %d: Signalled." % (id) else: print "thread %d: Timeout expired." % (id) cv.release() thread.exit() ############################################################################# if __name__ == "__main__": cvtest() #############################################################################