#!/usr/bin/env python ############################################################################# # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999 # All Rights Reserved. # # The software contained on this media is the property of the DSTC Pty # Ltd. Use of this software is strictly in accordance with the # license agreement in the accompanying LICENSE.HTML file. If your # distribution of this software does not contain a LICENSE.HTML file # then you have no rights to use this software in any manner and # should contact DSTC at the address below to determine an appropriate # licensing arrangement. # # DSTC Pty Ltd # Level 7, GP South # Staff House Road # University of Queensland # St Lucia, 4072 # Australia # Tel: +61 7 3365 4310 # Fax: +61 7 3365 4311 # Email: enquiries@dstc.edu.au # # This software is being provided "AS IS" without warranty of any # kind. In no event shall DSTC Pty Ltd be liable for damage of any # kind arising out of or in connection with the use or performance of # this software. # # Project: Fnorb # File: $Source: /cvsroot/fnorb/fnorb/orb/GIOPClientWorkerThreaded.py,v $ # Version: @(#)$RCSfile: GIOPClientWorkerThreaded.py,v $ $Revision: 1.9 $ # ############################################################################# """ GIOPClientWorkerThreaded class. """ # Standard/built-in modules. import thread # Fnorb modules. import CORBA, GIOPClientWorker, GIOPConnectionHandler # Fnorb threading modules. import condvar class GIOPClientWorkerThreaded(GIOPClientWorker.GIOPClientWorker): """ GIOPClientWorkerThreaded class. """ def __init__(self, protocol, address, giop_version): """ Provide an interface to a remote object! 'protocol' is the protocol used by the client. 'address' is the address of the remote object. """ # fixme: We don't need to keep these as instance variables... (maybe # for unpickling?), self.__protocol = protocol self.__address = address # This flag is set when we get a CloseConnection message, or an # exception event. self.__closed = 0 # Each request has a unique id. associated with it. self.__request_id = 0 # Mutex to make access to the request id and the closed flag # thread-safe. self.__lk = thread.allocate_lock() # Condition variable for outgoing messages. self.__outgoing_cv = condvar.condvar() # Dictionary of condition variables for threads that are blocked # waiting for replies. self.__blocked_pending = {} # {RequestId: condvar} # Dictionary of complete replies. self.__replies = {} # {RequestId: Message} # Mutex for incoming messages. self.__incoming_lk = thread.allocate_lock() # Ask the protocol to create a connection and actually connect to the # remote object. self.__connection = protocol.create_connection() self.__connection.connect(self.__address) # Set the connection to BLOCKING mode. self.__connection.blocking(1) # Create a handler to look after the connection. self.__handler = GIOPConnectionHandler.GIOPConnectionHandler \ (giop_version, self, self.__connection) # Start the read thread. thread.start_new_thread(self.__read_thread, ()) # Store the GIOP version self._giop_version = giop_version return def pseudo__del__(self): """ Pseudo destructor to remove circular references. """ # Close down the worker. self.__close() # The handler holds a (circular) reference to this instance so we # have to explicitly clean it up. self.__handler.pseudo__del__() # Clean up *our* reference to *it*! del self.__handler return ######################################################################### # GIOPClientWorker interface. ######################################################################### def send(self, request_id, message): """ Send an operation request to the remote object. This method BLOCKS until the message has been sent. """ # Block until the connection is available. self.__outgoing_cv.acquire() try: # Send the entire message. n = 0 while n < len(message): n = n + self.__handler.send(message) except CORBA.COMM_FAILURE, ex: # Let somebody else have a go ;^) self.__outgoing_cv.release() # If the connection has been closed then we can try again. if self.is_closed(): raise CORBA.TRANSIENT() else: raise ex # Let somebody else have a go ;^) self.__outgoing_cv.release() return def recv(self, request_id): """ Wait for a specific reply. """ # Make sure that we aren't interfering with the read thread. self.__incoming_lk.acquire() # See if the reply has already arrived. reply = self.__replies.get(request_id) if reply is None: # Create a new condition variable. cv = condvar.condvar() # Add the condition variable to the dictionary of blocked threads. # The read thread will 'signal' the condition variable (if and) # when the corresponding reply is received. self.__blocked_pending[request_id] = cv # Acquire the condition variable so that if the read thread # receives the reply before we have called 'wait', it will be # blocked. cv.acquire() # Let the read thread do its thang! self.__incoming_lk.release() # Wait on the condition variable ('wait' releases the lock on the # condition variable first). cv.wait() cv.release() self.__incoming_lk.acquire() reply = self.__replies[request_id] del self.__replies[request_id] self.__incoming_lk.release() # If the condition variable was signalled because we received a # 'CloseConnection' message, then the reply will be a # 'CORBA.TRANSIENT' exception. If the condition variable was # signalled because of some other failure then the reply will be # an instance of the appropriate CORBA system exception. if isinstance(reply, CORBA.SystemException): raise reply # The reply has already arrived - now that's what I call service ;^) else: del self.__replies[request_id] self.__incoming_lk.release() return reply def poll(self, request_id): """ Poll for a reply to a specific request. """ # Have we got our reply yet self.__incoming_lk.acquire() result = self.__replies.has_key(request_id) self.__incoming_lk.release() return result def peek(self, request_id): """ Peek at the reply for the specified request. This method does *not* delete the reply from the client's queue. """ # Get the reply. self.__incoming_lk.acquire() reply = self.__replies[request_id] self.__incoming_lk.release() return reply def delete_reply(self, request_id): """ Delete the reply with the specified request id. """ # Delete the reply. self.__incoming_lk.acquire() del self.__replies[request_id] self.__incoming_lk.release() return def next_request_id(self): """ Return the next request id. """ # fixme: This will get stupidly big oneday!!!! self.__lk.acquire() request_id = self.__request_id = self.__request_id + 1 self.__lk.release() return request_id def is_closed(self): """ Has the worker received a close event? """ self.__lk.acquire() closed = self.__closed self.__lk.release() return closed def close_connection(self): """ Close down the connection. This method is only ever called from the read thread. """ # Unblock all threads that are waiting for replies. self.__unblock_all(CORBA.TRANSIENT()) # Close down the worker. self.__close() return ######################################################################### # GIOPConnectionHandlerListener interface. ######################################################################### def message_received(self, message): """ Called when a complete GIOP message has been received. """ # Code re-use! The protected method '_message_received' is implemented # in the 'GIOPClientWorker' class. # # If the message is a GIOP Reply message, then this method will call # our '_reply_received' method. self._message_received(message) return def message_sent(self): """ Called when a complete GIOP message has been sent. In the threaded worker, we block until the message has been sent anyway, so we don't have anything to do here! """ pass def _get_connection(self): return self.__connection ######################################################################### # Protected interface. ######################################################################### def _reply_received(self, reply_header, cursor): """ Called by the '_message_received' method of GIOPClientWorker. """ self.__incoming_lk.acquire() # Add the reply to our dictionary of complete replies. self.__replies[reply_header.request_id] = (reply_header, cursor) # Wake up waiting threads self.__do_reply(reply_header, cursor) self.__incoming_lk.release() return def _fragment_received(self, header, cursor): """Called by the _message_received method of GIOPClientWorker.""" self.__incoming_lk.acquire() reply = None if header is None: previous = Util._fnorb_fragment_11(self.__replies) if previous: (old_header, reply) = previous else: self._message_error() else: # GIOP 1.2: message id included in FragmentHeader old_header, reply = self.__replies[header.request_id] if reply is not None: reply.stream().add_fragment(cursor.read(), cursor.stream().more_fragments()) self.__do_reply(old_header, reply) self.__incoming_lk.release() return ######################################################################### # Private interface. ######################################################################### def __read_thread(self): """ Read messages from the server. """ try: while not self.is_closed(): # Blocking receive. self.__handler.recv() # If we have received a 'CloseConnection' message, then we notify # the GIOPClient by raising a 'TRANSIENT' system exception. ex = CORBA.TRANSIENT() except CORBA.SystemException, ex: # If we have received a 'CloseConnection' message, then we notify # the GIOPClient by raising a 'TRANSIENT' system exception. if self.is_closed(): ex = CORBA.TRANSIENT() # Otherwise, it was some other system exception, so close the # connection down. else: self.__close() # Unblock all threads that are waiting for replies. self.__unblock_all(ex) # Explicitly exit the thread! thread.exit() def __close(self): """ Close down the worker. """ self.__lk.acquire() if not self.__closed: # All done! self.__closed = 1 self.__lk.release() # Close our connection. self.__connection.disconnect() else: self.__lk.release() return def __unblock_all(self, exception): """ Unblock all waiting threads with the specified exception. """ self.__incoming_lk.acquire() # Set the reply for each blocked thread to be the specified exception. for request_id in self.__blocked_pending.keys(): self.__replies[request_id] = exception # Get a list of condition variables to signal. blocked_pending = self.__blocked_pending.values() # Clean them up! self.__blocked_pending = {} self.__incoming_lk.release() # Unblock each thread. for cv in blocked_pending: cv.acquire() cv.signal() return def __do_reply(self, reply_header, cursor): # If more fragments are expected, do nothing: if cursor.stream().more_fragments(): return # See if a thread is blocked waiting for this reply. cv = self.__blocked_pending.get(reply_header.request_id) if cv is not None: # Signal the blocked thread. cv.acquire() cv.signal() # Clean up. del self.__blocked_pending[reply_header.request_id] #############################################################################