#!/usr/bin/env python ############################################################################# # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999 # All Rights Reserved. # # The software contained on this media is the property of the DSTC Pty # Ltd. Use of this software is strictly in accordance with the # license agreement in the accompanying LICENSE.HTML file. If your # distribution of this software does not contain a LICENSE.HTML file # then you have no rights to use this software in any manner and # should contact DSTC at the address below to determine an appropriate # licensing arrangement. # # DSTC Pty Ltd # Level 7, GP South # Staff House Road # University of Queensland # St Lucia, 4072 # Australia # Tel: +61 7 3365 4310 # Fax: +61 7 3365 4311 # Email: enquiries@dstc.edu.au # # This software is being provided "AS IS" without warranty of any # kind. In no event shall DSTC Pty Ltd be liable for damage of any # kind arising out of or in connection with the use or performance of # this software. # # Project: Fnorb # File: $Source: /cvsroot/fnorb/fnorb/orb/GIOPClient.py,v $ # Version: @(#)$RCSfile: GIOPClient.py,v $ $Revision: 1.12 $ # ############################################################################# """ GIOPClient class. """ # Standard/built-in modules. import new # Fnorb modules. import fnorb_thread, CORBA, GIOP, GIOPClientWorker, IOP, OctetStream, Util import cdrpy class GIOPClient: """ GIOPClient class. """ def __init__(self, protocol, address, giop_version): """ Constructor. 'protocol' is the protocol creating this client. 'address' is the address of the remote object. 'giop_version' is the GIOP version supported by the remote object """ self.__protocol = protocol self.__address = address self.__giop_version = giop_version # We only get a worker when the first request is made. self.__worker = None # Mutex for the worker. Amongst other things, this makes sure that we # don't create more than one worker for the same client in # multi-threaded environments! self.__lk = fnorb_thread.allocate_lock() return def pseudo__del__(self): """ Pseudo destructor to remove circular references. This method is called by the GIOPClientManager when it determines that there are no more clients using this address. """ # Clean up our worker (required because of circular references between # the worker and its connection handler). self.__lk.acquire() if self.__worker is not None: self.__worker.pseudo__del__() self.__worker = None self.__lk.release() return ######################################################################### # GIOPClient interface. ######################################################################### def synchronous(self, request, parameters): """ Send a 'synchronous' GIOP request message & wait for the reply. """ # Get our worker. worker = self.__get_worker() # Get a unique request id. request_id = self.__worker.next_request_id() # Get the active object key. object_key = request.object()._fnorb_object_key() # Start a new GIOP message. message = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.Request) cursor = message.cursor() # Create a GIOP request header. if self.__giop_version.minor == 0: request_header = GIOP.RequestHeader_1_0( request.context(), request_id, 0x03, # Response expected. object_key, request.operation(), '') tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_0:1.0') elif self.__giop_version.minor == 1: conn = worker._get_connection() request._fnorb_check_codeset_context(conn) message.set_tcs_w(conn.get_tcs_w()) request_header = GIOP.RequestHeader_1_1( request.context(), request_id, 1, # Response expected. (0, 0, 0), # Reserved object_key, request.operation(), '') tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_1:1.0') elif self.__giop_version.minor == 2: conn = worker._get_connection() request._fnorb_check_codeset_context(conn) message.set_tcs_w(conn.get_tcs_w()) request_header = GIOP.RequestHeader_1_2( request_id, 0x03, # Response expected. (0, 0, 0), # Reserved GIOP.TargetAddress(GIOP.KeyAddr, object_key), request.operation(), request.context()) tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_2:1.0') else: assert 0, 'Internal error: Invalid GIOP minor version' # Marshal the request header onto the octet stream. tc._fnorb_marshal_value(cursor, request_header) # Align on the next 8-byte boundary for GIOP 1.2 if self.__giop_version.minor == 2: padding_length = cdrpy.padding_length(cursor.offset(), 8) for i in range(padding_length): cursor.marshal('o', 0); # Marshal the request parameters onto the octet stream. self.__marshal_parameters(cursor, request.inputs(), parameters) try: # Send the request. worker.send(request_id, message) # Wait for the reply. reply = worker.recv(request_id) # Unpack the reply. (reply_header, cursor) = reply # If we have received a 'LOCATION_FORWARD' message. if reply_header.reply_status == GIOP.LOCATION_FORWARD: # Unmarshal the IOR that contains the address to forward # operation requests to. ior = new.instance(IOP.IOR, {}) ior._fnorb_unmarshal(cursor) # Update the object reference to use the forwarded IOR. request.object()._fnorb_forward(ior) # Try again! forwarded = 1 result = None # The reply was *not* a 'LOCATION_FORWARD' so lets deal with it! else: forwarded = 0 result = self.__process_reply(request, reply) # Retry on transient failures (this includes 'CloseConnection' messages # from the server. except CORBA.TRANSIENT, ex: forwarded = 1 result = None # If a communication (ie. socket!) error occurred then see if we have # previously been 'forwarded'. except CORBA.COMM_FAILURE, ex: # If we *have* been 'forwarded' then try again using the object's # original address. if request.object()._fnorb_forwarded(): # Update the object reference to use its original IOR. request.object()._fnorb_unforward() # Try again! forwarded = 1 result = None # Otherwise, if we haven't been forwarded then give up! else: raise ex return (forwarded, result) def deferred(self, request, parameters): """ Create and send send a 'deferred' GIOP request message. """ # Get our worker. worker = self.__get_worker() # Get a unique request id. request_id = self.__worker.next_request_id() # Get the active object key. object_key = request.object()._fnorb_object_key() # Start a new GIOP message. message = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.Request) cursor = message.cursor() # Create a GIOP request header. request_header = GIOP.RequestHeader_1_0(request.context(), request_id, 1, # Response expected. object_key, request.operation(), '') # Marshal the request header onto the octet stream. tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_0:1.0') tc._fnorb_marshal_value(cursor, request_header) # Marshal the request parameters onto the octet stream. self.__marshal_parameters(cursor, request.inputs(), parameters) # Send the request. try: self.__worker.send(request_header.request_id, message) # Retry on transient failures (this includes 'CloseConnection' messages # from the server. except CORBA.TRANSIENT: return self.deferred(request, parameters) return request_header.request_id def oneway(self, request, parameters): """ Create and send a 'oneway' GIOP request message. """ # Get our worker. worker = self.__get_worker() # Get a unique request id. request_id = self.__worker.next_request_id() # Get the active object key. object_key = request.object()._fnorb_object_key() # Start a new GIOP message. message = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.Request) cursor = message.cursor() # Create a GIOP request header. if self.__giop_version.minor == 0: # Create a GIOP request header. request_header = GIOP.RequestHeader_1_0(request.context(), request_id, 0, # No response expected. object_key, request.operation(), '') # Marshal the request header onto the octet stream. tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_0:1.0') elif self.__giop_version.minor == 1: conn = worker._get_connection() request._fnorb_check_codeset_context(conn) message.set_tcs_w(conn.get_tcs_w()) request_header = GIOP.RequestHeader_1_1( request.context(), request_id, 0, # No response expected. (0, 0, 0), # Reserved object_key, request.operation(), '') tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_1:1.0') elif self.__giop_version.minor == 2: conn = worker._get_connection() request._fnorb_check_codeset_context(conn) message.set_tcs_w(conn.get_tcs_w()) request_header = GIOP.RequestHeader_1_2( request_id, 0x00, # No response expected. (0, 0, 0), # Reserved GIOP.TargetAddress(GIOP.KeyAddr, object_key), request.operation(), request.context()) tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_2:1.0') else: assert 0, 'Internal error: Invalid GIOP minor version' tc._fnorb_marshal_value(cursor, request_header) # Align on the next 8-byte boundary for GIOP 1.2 if self.__giop_version.minor == 2: padding_length = cdrpy.padding_length(cursor.offset(), 8) for i in range(padding_length): cursor.marshal('o', 0); # Marshal the request parameters onto the octet stream. self.__marshal_parameters(cursor, request.inputs(), parameters) # Send the request. try: self.__worker.send(request_header.request_id, message) # Retry on transient failures (this includes 'CloseConnection' messages # from the server. except CORBA.TRANSIENT: return self.oneway(request, parameters) return def reply(self, request): """ Get the reply to a 'deferred' operation request. """ # Get our worker. worker = self.__get_worker() try: # Wait for the reply. reply = worker.recv(request._fnorb_request_id()) # Unpack the reply. (reply_header, cursor) = reply # If we have received a 'LOCATION_FORWARD' message. if reply_header.reply_status == GIOP.LOCATION_FORWARD: # Unmarshal the IOR that contains the address to forward # operation requests to. ior = new.instance(IOP.IOR, {}) ior._fnorb_unmarshal(cursor) # Update the object reference to use the forwarded IOR. request.object()._fnorb_forward(ior) # Try again! forwarded = 1 result = None # The reply was *not* a 'LOCATION_FORWARD' so lets deal with it! else: forwarded = 0 result = self.__process_reply(request, reply) # Retry on transient failures (this includes 'CloseConnection' messages # from the server. except CORBA.TRANSIENT, ex: forwarded = 1 result = None # If a communication (ie. socket!) error occurred then see if we have # previously been 'forwarded'. except CORBA.COMM_FAILURE, ex: # If we *have* been 'forwarded' then try again using the object's # original address. if request.object()._fnorb_forwarded(): # Update the object reference to use its original IOR. request.object()._fnorb_unforward() # Try again! forwarded = 1 result = None # Otherwise, if we haven't been forwarded then give up! else: raise ex return (forwarded, result) def poll(self, request): """ Has a reply been received for the specified request? """ # Get our worker. worker = self.__get_worker() # Get the request id. request_id = request._fnorb_request_id() try: if worker.poll(request_id): # Take a peek at the reply. (reply_header, cursor) = worker.peek(request_id) # If we have received a 'LOCATION_FORWARD' message. if reply_header.reply_status == GIOP.LOCATION_FORWARD: # Delete the reply. worker.delete_reply(self.__request_id) # Unmarshal the IOR that contains the address to forward # operation requests to. ior = new.instance(IOP.IOR, {}) ior._fnorb_unmarshal(cursor) # Update the object reference to use the forwarded IOR. self.__object._fnorb_forward(ior) # Try again! forwarded = 1 result = 0 # If a CORBA system exception occurred... elif reply_header.reply_status == GIOP.SYSTEM_EXCEPTION: # Unmarshal and raise the system exception. raise self.__unmarshal_system_exception(cursor) # The reply is either a 'NO_EXCEPTION' or a 'USER_EXCEPTION' # message. else: forwarded = 0 result = 1 # The reply has not arrived yet 8^( else: forwarded = 0 result = 0 # Retry on transient failures (this includes 'CloseConnection' messages # from the server. except CORBA.TRANSIENT, ex: forwarded = 1 result = None # If a communication (ie. socket!) error occurred then see if we have # previously been 'forwarded'. except CORBA.COMM_FAILURE, ex: # If we *have* been 'forwarded' then try again using the object's # original address. if self.__object._fnorb_forwarded(): # Update the object reference to use its original IOR. self.__object._fnorb_unforward() # Try again! forwarded = 1 result = 0 # Otherwise, if we haven't been forwarded then give up! else: raise ex return (forwarded, result) ######################################################################### # Private interface. ######################################################################### def __get_worker(self): """ Get our GIOP client worker. """ self.__lk.acquire() try: # If this is the first operation request then get a worker. if self.__worker is None: # Get a reference to the factory for GIOP client workers. cwf = GIOPClientWorker.GIOPClientWorkerFactory_init() # Create a new GIOP client worker (the concrete type of which # will be determined by the threading model). self.__worker = cwf.create_worker(self.__protocol, self.__address, self.__giop_version) # If the worker has received a 'CloseConnection' message, then # we need a new one! elif self.__worker.is_closed(): # Clean up the old worker. self.__worker.pseudo__del__() # Get a reference to the factory for GIOP client workers. cwf = GIOPClientWorker.GIOPClientWorkerFactory_init() # Create a new GIOP client worker (the concrete type of which # will be determined by the threading model). self.__worker = cwf.create_worker(self.__protocol, self.__address) # Return value. worker = self.__worker finally: self.__lk.release() return worker def __process_reply(self, request, (reply_header, cursor)): """ Process a GIOP reply. """ # Align on the next 8-byte boundary for GIOP 1.2 if self.__giop_version.minor == 2: padding_length = cdrpy.padding_length(cursor.offset(), 8) for i in range(padding_length): cursor.unmarshal('o'); # If a CORBA system exception occurred... if reply_header.reply_status == GIOP.SYSTEM_EXCEPTION: # Unmarshal and raise the system exception. raise self.__unmarshal_system_exception(cursor) # Else if a user exception occurred... elif reply_header.reply_status == GIOP.USER_EXCEPTION: # Get the repository id of the exception. repository_id = cursor.unmarshal('s') # Try to find a matching exception in the request's list of # exception typecodes. for typecode in request.exceptions(): if typecode.id() == repository_id: break # If there is no matching exception then raise an UNKNOWN. else: raise CORBA.UNKNOWN() # System exception. # Unmarshal and raise the user exception. raise typecode._fnorb_unmarshal_value(cursor) # If we get here then the operation was successful! # # Unmarshal the return value and any 'inout' and 'out' parameters. return self.__unmarshal_results(cursor, request.outputs()) def __marshal_parameters(self, cursor, typecodes, parameters): """ Marshal the parameters onto an octet stream. """ # Marshal each parameter according to its typecode. i = 0 for typecode in typecodes: typecode._fnorb_marshal_value(cursor, parameters[i]) i = i + 1 return def __unmarshal_results(self, cursor, typecodes): """ Unmarshal the result and any 'inout', and 'out' parameters. """ # In the spirit of the ILU mapping, we have the following semantics:- # # If the operation has no outputs (ie. no result, 'inout', or 'out' # parameters) then we return 'None'. # # If the operation has exactly ONE output then we return just that # value. # # If the operation has more than one output then we return a TUPLE # with the result first followed by any 'inout' and 'out' parameters # in the order that they appear in the IDL definition. no_of_outputs = len(typecodes) if no_of_outputs == 0: results = None elif no_of_outputs == 1: results = typecodes[0]._fnorb_unmarshal_value(cursor) else: outputs = [None] * no_of_outputs for i in range(no_of_outputs): outputs[i] = typecodes[i]._fnorb_unmarshal_value(cursor) results = tuple(outputs) return results def __unmarshal_system_exception(self, cursor): """ Unmarshal a system exception from an octet stream. """ # Unmarshal the repository id of the system exception. intrep_id = Util.RepositoryId(cursor.unmarshal('s')) # Get the scoped name from the repository id. scoped_name = intrep_id.scoped_name() # The last two components of the scoped name make up the name of the # Python class that represents the system exception. # # e.g. For the COMM_FAILURE exception, the repository id is:- # # IDL:omg.org/CORBA/COMM_FAILURE:1.0 # # The scoped name is therefore:- # # omg.org/CORBA/COMM_FAILURE # # And the Python class name is:- # # CORBA.COMM_FAILURE klass = eval(scoped_name[-2:].join('.')) # Create an uninitialised exception instance, and then unmarshal the # exception details! ex = new.instance(klass, {}) ex._fnorb_unmarshal(cursor) return ex #############################################################################