#!/usr/bin/env python ############################################################################# # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999 # All Rights Reserved. # # The software contained on this media is the property of the DSTC Pty # Ltd. Use of this software is strictly in accordance with the # license agreement in the accompanying LICENSE.HTML file. If your # distribution of this software does not contain a LICENSE.HTML file # then you have no rights to use this software in any manner and # should contact DSTC at the address below to determine an appropriate # licensing arrangement. # # DSTC Pty Ltd # Level 7, GP South # Staff House Road # University of Queensland # St Lucia, 4072 # Australia # Tel: +61 7 3365 4310 # Fax: +61 7 3365 4311 # Email: enquiries@dstc.edu.au # # This software is being provided "AS IS" without warranty of any # kind. In no event shall DSTC Pty Ltd be liable for damage of any # kind arising out of or in connection with the use or performance of # this software. # # Project: Fnorb # File: $Source: /cvsroot/fnorb/fnorb/orb/GIOPServer.py,v $ # Version: @(#)$RCSfile: GIOPServer.py,v $ $Revision: 1.15 $ # ############################################################################# """ GIOPServer class. """ # Fnorb modules. import fnorb_thread, cdrpy, CORBA, GIOP, GIOPServerWorker, OctetStream, Util, IOP class GIOPServer: """ GIOPServer class. """ def __init__(self, giop_version, connection): """ Constructor. 'giop_version' is the GIOP version of this ORB 'connection' is a connection to a remote ORB. """ # A mutex to make access to the server thread-safe. self.__lk = fnorb_thread.allocate_lock() self.__lk.acquire() try: # Get the object adapter used by the ORB. # # fixme: We get a reference to the object adapter here for the sake # of efficiency, but is this dynamic enough? # # fixme: Should this be passed as a parameter to the constructor? self.__oa = CORBA.ORB_init()._fnorb_object_adapter() # Get a reference to the factory for GIOP server workers. swf = GIOPServerWorker.GIOPServerWorkerFactory_init() # Create a new GIOP server worker (the concrete type of which will # be determined by the threading model). self.__worker = swf.create_worker(giop_version, self, connection) # Flag set when our worker is closed. self.__closed = 0 # Store the GIOP version self.__giop_version = giop_version self.__connection = connection self.__fragments = {} finally: self.__lk.release() def pseudo__del__(self): """ Pseudo destructor to remove circular references. """ self.__lk.acquire() try: # Remove the (circular) reference to our worker. del self.__worker # All done! self.__closed = 1 finally: self.__lk.release() ######################################################################### # GIOPServer interface. ######################################################################### def process_giop_message(self, message): """ Process a GIOP message. This method is called by our worker when it has received a GIOP message. """ # Get a cursor for the message. cursor = message.cursor() # Get the GIOP message header. giop_header = message.header() # Make sure that the header has the right magic and that we are talking # the same, or a lesser, version of GIOP. if (giop_header.magic != Util.MAGIC or giop_header.GIOP_version.major > self.__giop_version.major or giop_header.GIOP_version.minor > self.__giop_version.minor): self.__message_error() return # Inherit GIOP version from header, to use the negotiated protocol self.__giop_version = giop_header.GIOP_version # Handle each GIOP message type. if giop_header.message_type == GIOP.Request.value(): # Unmarshal the GIOP request header. if giop_header.GIOP_version.minor == 0: tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_0:1.0') elif giop_header.GIOP_version.minor == 1: tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_1:1.0') elif giop_header.GIOP_version.minor == 2: tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader_1_2:1.0') else: assert 0, 'Internal error: invalid GIOP version minor number' # XXX: fragmented request headers are not supported request_header = tc._fnorb_unmarshal_value(cursor) # Align on the next 8-byte boundary for GIOP 1.2 if giop_header.GIOP_version.minor == 2: padding_length = cdrpy.padding_length(cursor.offset(), 8) for i in range(padding_length): cursor.unmarshal('o'); # Check for Codesets context for c in request_header.service_context: if c.context_id == IOP.CodeSets: self.__connection.set_negotiated_codeset(c) break message.set_tcs_w(self.__connection.get_tcs_w()) # queue fragmented messages if message.more_fragments(): self.__fragments[request_header.request_id] = (request_header, message) return # Pass the message to the active OA to invoke the operation on the # appropriate object implementation. The OA will call one of # our callback methods iff there is a reply to send. self.__oa._fnorb_request((self, request_header, cursor)) elif giop_header.message_type == GIOP.CancelRequest.value(): # fixme: We just ignore these at the moment! pass elif giop_header.message_type == GIOP.LocateRequest.value(): # Unmarshal the GIOP locate request header. if giop_header.GIOP_version.minor == 0: tc = CORBA.typecode( 'IDL:omg.org/GIOP/LocateRequestHeader_1_0:1.0') elif giop_header.GIOP_version.minor == 1: tc = CORBA.typecode( 'IDL:omg.org/GIOP/LocateRequestHeader_1_1:1.0') elif giop_header.GIOP_version.minor == 2: tc = CORBA.typecode( 'IDL:omg.org/GIOP/LocateRequestHeader_1_2:1.0') else: assert 0, 'Internal error: invalid GIOP version minor number' request_header = tc._fnorb_unmarshal_value(cursor) # queue fragmented messages if message.more_fragments(): self.__fragments[request_header.request_id] = (request_header, message) return # See if the object is here! if giop_header.GIOP_version.minor != 2: if self.__oa._fnorb_object_here(request_header.object_key): status = GIOP.OBJECT_HERE else: status = GIOP.UNKNOWN_OBJECT else: if request_header.target.d != GIOP.KeyAddr: # TODO Better error: Only KeyAddr supported raise self.__message_error() if self.__oa._fnorb_object_here( request_header.target.v): status = GIOP.OBJECT_HERE else: status = GIOP.UNKNOWN_OBJECT # Create and send the locate reply. self.__locate_reply(request_header, status) elif giop_header.message_type == GIOP.MessageError.value(): # What can we do here? Just ignore the message! # FIXME need to shutdown the connection pass elif giop_header.message_type == GIOP.Fragment.value(): if giop_header.GIOP_version.minor == 0: # Fragments not supported in GIOP 1.0 self.__message_error() return elif giop_header.GIOP_version.minor == 1: # Fragments in GIOP 1.1 had no fragment header # so there should be atmost one outstanding message previous = Util._fnorb_fragment_11(self.__fragments) if previous is None: self.__message_error() return else: old_header, old_message = previous elif giop_header.GIOP_version.minor == 2: tc = CORBA.typecode('IDL:omg.org/GIOP/FragmentHeader_1_2:1.0') header = tc._fnorb_unmarshal_value(cursor) try: old_header, old_message = self.__fragments[header.request_id] except KeyError: self.__message_error() return else: assert 0, 'Internal error' old_message.add_fragment(cursor.read(), message.more_fragments()) if not old_message.more_fragments(): # XXX Could inline completion here. # The recursive invocation means that we will # parse the header once more del self.__fragments[old_header.request_id] self.process_giop_message(old_message) # Some crappy message or other ;^) else: self.__message_error() return def close_connection(self): """ Close down the connection. Currently, Fnorb does not close down servers, so this is not used. """ # Send the close connection message. self.__close_connection() # In multi-thread mode, if the client has disconnected then the worker # will be 'None'. worker = self.__get_worker() if worker is not None: worker.close_connection() return ######################################################################### # Object adapter callback interface. ######################################################################### def reply(self, request_header, server_request): """ Create and send a successful reply message. """ # Start the reply. reply = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.Reply) reply.set_tcs_w(self.__connection.get_tcs_w()) cursor = reply.cursor() # Create the reply header. if isinstance(request_header, GIOP.RequestHeader_1_0): reply_header = GIOP.ReplyHeader_1_0(request_header.service_context, request_header.request_id, GIOP.NO_EXCEPTION) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_0:1.0') elif isinstance(request_header, GIOP.RequestHeader_1_1): # The 1.1 reply header is an alias for the 1.0 reply header reply_header = GIOP.ReplyHeader_1_0(request_header.service_context, request_header.request_id, GIOP.NO_EXCEPTION) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_1:1.0') elif isinstance(request_header, GIOP.RequestHeader_1_2): reply_header = GIOP.ReplyHeader_1_2(request_header.request_id, GIOP.NO_EXCEPTION, request_header.service_context) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_2:1.0') else: assert 0 # Marshal it onto the octet stream. tc._fnorb_marshal_value(cursor, reply_header) # Align on the next 8-byte boundary for GIOP 1.2 if self.__giop_version.minor == 2: padding_length = cdrpy.padding_length(cursor.offset(), 8) for i in range(padding_length): cursor.marshal('o', 0); # Marshal the results onto the octet stream. self.__marshal_results(cursor, server_request.outputs(), server_request._fnorb_results()) # Send it! self.__send(reply) return def user_exception(self, request_header, server_request): """ Create and send a user exception reply message. """ # Start the reply. reply = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.Reply) cursor = reply.cursor() # Create the reply header. if isinstance(request_header, GIOP.RequestHeader_1_0): reply_header = GIOP.ReplyHeader_1_0(request_header.service_context, request_header.request_id, GIOP.USER_EXCEPTION) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_0:1.0') elif isinstance(request_header, GIOP.RequestHeader_1_1): # The 1.1 reply header is an alias for the 1.0 reply header reply_header = GIOP.ReplyHeader_1_0(request_header.service_context, request_header.request_id, GIOP.USER_EXCEPTION) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_1:1.0') elif isinstance(request_header, GIOP.RequestHeader_1_2): reply_header = GIOP.ReplyHeader_1_2(request_header.request_id, GIOP.USER_EXCEPTION, request_header.service_context) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_2:1.0') else: assert 0 # Marshal it onto the octet stream. tc._fnorb_marshal_value(cursor, reply_header) # Get the exception that was raised in response to the server request. ex = server_request._fnorb_exception() # Try to find a matching exception. for typecode in server_request.exceptions(): if ex._FNORB_ID == typecode.id(): break # If there is no matching exception then raise an 'UNKNOWN' system # exception. else: raise CORBA.UNKNOWN() # System exception. if self.__giop_version.minor == 2: padding_length = cdrpy.padding_length(cursor.offset(), 8) for i in range(padding_length): cursor.marshal('o', 0); # Marshal the repository id of the exception followed by its members. cursor.marshal('s', typecode.id()) typecode._fnorb_marshal_value(cursor, ex) # Send it. self.__send(reply) return def system_exception(self, request_header, ex): """ Create and send a system exception reply message. """ # Create an octet stream and get a cursor for it. reply = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.Reply) cursor = reply.cursor() # Create the reply header. if isinstance(request_header, GIOP.RequestHeader_1_0): reply_header = GIOP.ReplyHeader_1_0(request_header.service_context, request_header.request_id, GIOP.SYSTEM_EXCEPTION) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_0:1.0') elif isinstance(request_header, GIOP.RequestHeader_1_1): # The 1.1 reply header is an alias for the 1.0 reply header reply_header = GIOP.ReplyHeader_1_0(request_header.service_context, request_header.request_id, GIOP.SYSTEM_EXCEPTION) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_1:1.0') elif isinstance(request_header, GIOP.RequestHeader_1_2): reply_header = GIOP.ReplyHeader_1_2(request_header.request_id, GIOP.SYSTEM_EXCEPTION, request_header.service_context) tc = CORBA.typecode('IDL:omg.org/GIOP/ReplyHeader_1_2:1.0') else: assert 0 # Marshal it onto the octet stream. tc._fnorb_marshal_value(cursor, reply_header) if self.__giop_version.minor == 2: padding_length = cdrpy.padding_length(cursor.offset(), 8) for i in range(padding_length): cursor.marshal('o', 0); # Marshal the system exception onto the octet stream. cursor.marshal('s', "IDL:omg.org/CORBA/%s:1.0" % ex.__class__.__name__) ex._fnorb_marshal(cursor) # Send it. self.__send(reply) return ######################################################################### # Private interface. ######################################################################### def __get_worker(self): """ Get our GIOP server worker. """ self.__lk.acquire() try: if self.__closed: worker = None else: worker = self.__worker finally: self.__lk.release() return worker def __send(self, message): """ Send a GIOP message via our worker. """ # In multi-thread mode, if the client has disconnected before we have # had a chance to send the reply, then the worker will be 'None'. worker = self.__get_worker() if worker is not None: worker.send(message) return def __marshal_results(self, cursor, typecodes, results): """ Marshal the results of a request onto an octet stream. """ no_of_outputs = len(typecodes) # If there are no outputs then do nothing ;^) if no_of_outputs == 0: pass # If there is exactly 1 result from the request then 'results' # contains a single value. elif no_of_outputs == 1: # Marshal the first and only output! typecodes[0]._fnorb_marshal_value(cursor, results) # Otherwise, 'results' is a tuple containing all of the results. else: i = 0 for typecode in typecodes: typecode._fnorb_marshal_value(cursor, results[i]) i = i + 1 return def __locate_reply(self, request_header, status): """ Create and send a locate reply. """ # Start the reply. message = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.LocateReply) cursor = message.cursor() # Create the locate reply header and marshal it onto the octet stream. reply_header = GIOP.LocateReplyHeader_1_0(request_header.request_id,status) tc = CORBA.typecode('IDL:omg.org/GIOP/LocateReplyHeader_1_0:1.0') tc._fnorb_marshal_value(cursor, reply_header) # Send it! self.__send(message) return def __message_error(self): """ Create and send a 'MessageError' message. """ # Create the 'MessageError' message. message = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.MessageError) # Send it! self.__send(message) return def __close_connection(self): """ Create and send a close connection message. """ # Create the 'CloseConnection' message. message = OctetStream.GIOPMessage(self.__giop_version, type=GIOP.CloseConnection) # Send it! self.__send(message) return #############################################################################