#!/usr/bin/env python
#############################################################################
# Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
# All Rights Reserved.
#
# The software contained on this media is the property of the DSTC Pty
# Ltd. Use of this software is strictly in accordance with the
# license agreement in the accompanying LICENSE.HTML file. If your
# distribution of this software does not contain a LICENSE.HTML file
# then you have no rights to use this software in any manner and
# should contact DSTC at the address below to determine an appropriate
# licensing arrangement.
#
# DSTC Pty Ltd
# Level 7, GP South
# Staff House Road
# University of Queensland
# St Lucia, 4072
# Australia
# Tel: +61 7 3365 4310
# Fax: +61 7 3365 4311
# Email: enquiries@dstc.edu.au
#
# This software is being provided "AS IS" without warranty of any
# kind. In no event shall DSTC Pty Ltd be liable for damage of any
# kind arising out of or in connection with the use or performance of
# this software.
#
# Project: Fnorb
# File: $Source: /cvsroot/fnorb/fnorb/orb/GIOPClientWorkerReactive.py,v $
# Version: @(#)$RCSfile: GIOPClientWorkerReactive.py,v $ $Revision: 1.7 $
#
#############################################################################
""" GIOPClientWorkerReactive class. """


# Fnorb modules.
import CORBA, EventHandler, GIOPClientWorker, GIOPConnectionHandler, Reactor
# Needed by '_fragment_received' (it calls 'Util._fnorb_fragment_11').
import Util


class GIOPClientWorkerReactive(GIOPClientWorker.GIOPClientWorker,
                               EventHandler.EventHandler):
    """ GIOPClientWorkerReactive class.

    A concrete 'EventHandler' in the 'Reactor' pattern.

    This class is *not* thread safe, but since the whole point of the
    'Reactor' pattern is to work in a single-threaded environment, I don't
    see this as much of a problem ;^)

    """

    def __init__(self, protocol, address, giop_version):
        """ Provide an interface to a remote object!

        'protocol'     is the protocol used by this worker.
        'address'      is the address of the remote object.
        'giop_version' is handed to the connection handler and stored on
                       the instance as '_giop_version'.

        The constructor connects to 'address' straight away and registers
        the worker with the Reactor for read events.

        """
        # fixme: We don't need to keep these as instance variables... (maybe
        # for unpickling?
        self.__protocol = protocol
        self.__address = address

        # This flag is set when we get a CloseConnection message, or an
        # exception event.
        self.__closed = 0

        # Each request has a unique id. associated with it.
        self.__request_id = 0

        # Queue of outgoing messages.
        self.__outgoing = []  # [(RequestId, Message)]

        # Dictionary of complete replies.
        self.__replies = {}  # {RequestId: (ReplyHeader, Cursor)}

        # Ask the protocol to create a connection and actually connect to the
        # remote object.
        self.__connection = protocol.create_connection()
        self.__connection.connect(self.__address)

        # Create a handler to look after the connection.
        self.__handler = GIOPConnectionHandler.GIOPConnectionHandler(
            giop_version, self, self.__connection)

        # Get a reference to the active Reactor.
        #
        # fixme: Should the reactor be obtained from the protocol?
        self.__reactor = Reactor.Reactor_init()

        # Register our interest in read events with the Reactor.
        self.__reactor.register_handler(self, Reactor.READ)

        # Store the GIOP version
        self._giop_version = giop_version

    def pseudo__del__(self):
        """ Pseudo destructor to remove circular references.

        This method is called from the 'pseudo__del__' of the GIOPClient that
        we belong to.

        """
        # Close down the worker (a no-op if it has already been closed).
        self.__close()

        # The handler holds a (circular) reference to this instance so we
        # have to explicitly clean it up.
        self.__handler.pseudo__del__()

        # Clean up *our* reference to *it*!
        del self.__handler

    #########################################################################
    # GIOPClientWorker interface.
    #########################################################################

    def send(self, request_id, message):
        """ Send an operation request to the remote object.

        This method WAITS for the message to be sent, i.e. it drives the
        Reactor until 'message_sent' has removed the request from the
        outgoing queue.

        Raises 'CORBA.COMM_FAILURE' if the worker is closed while the message
        is still waiting to be sent.

        """
        # Add the outgoing message to the queue.
        self.__outgoing.append((request_id, message))

        # Register my interest in write events with the Reactor.
        self.__reactor.register_handler(self, Reactor.WRITE)

        # Get the reactor to process events until the message has been sent.
        while self.__is_queued(request_id):
            # Once the worker is closed nothing is registered with the
            # Reactor any more, so the message can never leave the queue;
            # fail instead of waiting forever.
            if self.__closed:
                raise CORBA.COMM_FAILURE()

            # Get the reactor to wait for and process a single event.
            self.__reactor.do_one_event()

    def recv(self, request_id):
        """ Wait for a specific reply.

        Drives the Reactor until the reply for 'request_id' has arrived and
        its stream reports no more fragments, then removes the reply from the
        table and returns it as a '(ReplyHeader, Cursor)' tuple.

        Raises 'CORBA.COMM_FAILURE' if the worker is closed before a complete
        reply is available.

        """
        # Get the reactor to process events until we either get a close event
        # or we get a reply.
        while 1:
            # Have we got our (complete) reply yet?
            reply = self.__replies.get(request_id)
            if reply is not None and not reply[1].stream().more_fragments():
                del self.__replies[request_id]
                return reply

            # A closed worker will never receive the (rest of the) reply.
            if self.__closed:
                raise CORBA.COMM_FAILURE()

            # Get the reactor to wait for and process a single event.
            self.__reactor.do_one_event()

    def poll(self, request_id):
        """ Poll for a reply to a specific request.

        Processes at most one pending Reactor event and returns a true value
        if a reply for 'request_id' is in the reply table.  Only presence is
        checked; unlike 'recv' this does not look at whether more fragments
        are outstanding.

        """
        # Get the reactor to poll for a single event.
        self.__reactor.do_one_event(0)

        # Have we got our reply yet?
        return request_id in self.__replies

    def peek(self, request_id):
        """ Peek at the reply for the specified request.

        This method does *not* delete the reply from the client's queue.
        Raises 'KeyError' if there is no reply for 'request_id'.

        """
        return self.__replies[request_id]

    def delete_reply(self, request_id):
        """ Delete the reply with the specified request id.

        Raises 'KeyError' if there is no reply for 'request_id'.

        """
        del self.__replies[request_id]

    def next_request_id(self):
        """ Return the next request id. """

        # fixme: This will get stupidly big oneday!!!!
        self.__request_id = self.__request_id + 1

        return self.__request_id

    def is_closed(self):
        """ Has the worker received a close event? """

        return self.__closed

    def close_connection(self):
        """ Orderly shutdown, in response to a 'CloseConnection' message.

        Does nothing if the worker is already closed; otherwise closes the
        worker and raises 'CORBA.TRANSIENT'.

        """
        if self.__closed:
            return

        # Close down the worker.
        self.__close()

        # We notify our 'GIOPClient' that a 'CloseConnection' message
        # was received by raising a 'CORBA.TRANSIENT' system exception.
        raise CORBA.TRANSIENT()  # System exception.

    #########################################################################
    # EventHandler interface.
    #########################################################################

    def handle_event(self, mask):
        """ Callback method to handle all events except close events.

        'mask' is a bitwise combination of the 'Reactor.READ',
        'Reactor.WRITE' and 'Reactor.EXCEPTION' flags.

        Raises 'CORBA.COMM_FAILURE' (after closing the worker) on an
        exception event.

        """
        # Read event.
        if mask & Reactor.READ:
            self.__handler.recv()

        # Write event.
        if mask & Reactor.WRITE:
            # Send the message at the head of the outgoing queue. It stays
            # queued until 'message_sent' reports that it has gone.
            self.__handler.send(self.__outgoing[0][1])

        # Exception event.
        if mask & Reactor.EXCEPTION:
            # Close down the worker.
            self.__close()

            # Barf!
            raise CORBA.COMM_FAILURE()  # System exception.

    def handle_close(self):
        """ Callback method to handle close events. """

        # Close down the worker.
        self.__close()

    def handle(self):
        """ Return my underlying I/O handle.

        In this case, my I/O handle is the file descriptor of my socket.

        """
        return self.__connection.handle()

    #########################################################################
    # GIOPConnectionHandlerListener interface.
    #########################################################################

    def message_received(self, message):
        """ Called when a complete GIOP message has been received. """

        # Code re-use! The protected method '_message_received' is implemented
        # in the 'GIOPClientWorker' class.
        #
        # If the message is a GIOP Reply message, then this method will call
        # our '_reply_received' method.
        self._message_received(message)

    def message_sent(self):
        """ Called when a complete GIOP message has been sent. """

        # Remove the message that has just been sent from the head of the
        # outgoing queue.
        del self.__outgoing[0]

        # If there are no other messages left to send then tell the Reactor
        # that I am no longer interested in write events.
        if not self.__outgoing:
            self.__reactor.unregister_handler(self, Reactor.WRITE)

    def _get_connection(self):
        """ Return the connection object created by the protocol. """

        return self.__connection

    #########################################################################
    # Protected interface.
    #########################################################################

    def _reply_received(self, reply_header, cursor):
        """ Called by the '_message_received' method of GIOPClientWorker. """

        # Add the reply to our dictionary of complete replies.
        self.__replies[reply_header.request_id] = (reply_header, cursor)

    def _fragment_received(self, header, cursor):
        """ Called by the '_message_received' method of GIOPClientWorker.

        'header' is the fragment header, or None when the fragment carries
                 no request id.
        'cursor' is positioned on the fragment data.

        The fragment data is appended to the stream of the reply that it
        belongs to.

        """
        reply = None

        if header is None:
            # No request id to go by (presumably a GIOP 1.1 fragment, judging
            # by the helper's name), so ask the helper to pick the reply that
            # is being continued.
            previous = Util._fnorb_fragment_11(self.__replies)
            if previous:
                (_old_header, reply) = previous

            else:
                self._message_error()

        else:
            # GIOP 1.2: message id included in FragmentHeader
            #
            # NOTE(review): an unknown request id raises 'KeyError' here;
            # confirm whether that should be a message error instead.
            (_old_header, reply) = self.__replies[header.request_id]

        if reply is not None:
            reply.stream().add_fragment(cursor.read(),
                                        cursor.stream().more_fragments())

    #########################################################################
    # Private interface.
    #########################################################################

    def __is_queued(self, request_id):
        """ Is a message for 'request_id' still in the outgoing queue? """

        for (queued_id, _queued_message) in self.__outgoing:
            if queued_id == request_id:
                return 1

        return 0

    def __close(self):
        """ Close down the worker.

        Safe to call more than once: only the first call unregisters from
        the Reactor and disconnects.

        """
        # 'pseudo__del__' calls us unconditionally, possibly after a close
        # event has already shut the worker down.
        if self.__closed:
            return

        # Withdraw all of my Reactor registrations.
        self.__reactor.unregister_handler(self, Reactor.ALL)

        # Close our connection.
        self.__connection.disconnect()

        # All done!
        self.__closed = 1

#############################################################################