#!/usr/bin/env python import sys from threading import Thread from Fnorb.orb import BOA, CORBA import datatransfer, datatransfer_skel # # A server object that implements the "short" type transfer testing # interface # class ShortImpl(datatransfer_skel.Short_skel): """ Implementation of the 'Short' interface. """ def shortIn(self, v): return v def shortOut(self, vin): return vin def shortInOut(self, vin, vinout): return (vinout, vin) # # A server object that implements the "long" type transfer testing # interface # class LongImpl(datatransfer_skel.Long_skel): """ Implementation of the 'Long' interface. """ def longIn(self, v): return v def longOut(self, vin): return vin def longInOut(self, vin, vinout): return (vinout, vin) # # A server object that implements the "char" type transfer testing # interface # class CharImpl(datatransfer_skel.Char_skel): """ Implementation of the 'Char' interface. """ def charIn(self, v): return v def charOut(self, vin): return vin def charInOut(self, vin, vinout): return (vinout, vin) # # A server object that implements the "wchar" type transfer testing # interface # class WCharImpl(datatransfer_skel.WChar_skel): """ Implementation of the 'WChar' interface. """ def wCharIn(self, v): return v def wCharOut(self, vin): return vin def wCharInOut(self, vin, vinout): return (vinout, vin) # # A server object that implements the "wstring" type transfer testing # interface # class WStringImpl(datatransfer_skel.WString_skel): """ Implementation of the 'WString' interface. """ def wStringIn(self, v): return v def wStringOut(self, vin): return vin def wStringInOut(self, vin, vinout): return (vinout, vin) # # A server object that implements the "sequence" type transfer testing # interface # class SequenceImpl(datatransfer_skel.Sequence_skel): """ Implementation of the 'Sequence' interface. """ def seqIn(self, v): return v def seqOut(self, vin): return vin def seqInOut(self, vin, vinout): return (vinout, vin) # # A server object that implements the "fixed" type transfer testing # interface # class FixedImpl(datatransfer_skel.Fixed_skel): """ Implementation of the 'Fixed' interface. """ def fixedIn(self, v): return v def fixedOut(self, vin): return vin def fixedInOut(self, vin, vinout): return (vinout, vin) def run(): # Initialise the ORB. orb = CORBA.ORB_init([], CORBA.ORB_ID) # Initialise the BOA. boa = BOA.BOA_init([], BOA.BOA_ID) # # Create a 'short' test object # short_obj = boa.create('short', ShortImpl._FNORB_ID) impl = ShortImpl() boa.obj_is_ready(short_obj, impl) ref_file = open('short.ref', 'w') ref_file.write(orb.object_to_string(short_obj)) ref_file.close() # # Create a 'long' test object # long_obj = boa.create('long', LongImpl._FNORB_ID) impl = LongImpl() boa.obj_is_ready(long_obj, impl) ref_file = open('long.ref', 'w') ref_file.write(orb.object_to_string(long_obj)) ref_file.close() # # Create a 'char' test object # char_obj = boa.create('char', CharImpl._FNORB_ID) impl = CharImpl() boa.obj_is_ready(char_obj, impl) ref_file = open('char.ref', 'w') ref_file.write(orb.object_to_string(char_obj)) ref_file.close() # # Create a 'wchar' test object # wchar_obj = boa.create('wchar', WCharImpl._FNORB_ID) impl = WCharImpl() boa.obj_is_ready(wchar_obj, impl) ref_file = open('wchar.ref', 'w') ref_file.write(orb.object_to_string(wchar_obj)) ref_file.close() # # Create a 'wstring' test object # wstring_obj = boa.create('wstring', WStringImpl._FNORB_ID) impl = WStringImpl() boa.obj_is_ready(wstring_obj, impl) ref_file = open('wstring.ref', 'w') ref_file.write(orb.object_to_string(wstring_obj)) ref_file.close() # # Create a 'sequence' test object # sequence_obj = boa.create('sequence', SequenceImpl._FNORB_ID) impl = SequenceImpl() boa.obj_is_ready(sequence_obj, impl) ref_file = open('sequence.ref', 'w') ref_file.write(orb.object_to_string(sequence_obj)) ref_file.close() # # Create a 'fixed' test object # fixed_obj = boa.create('fixed', FixedImpl._FNORB_ID) impl = FixedImpl() boa.obj_is_ready(fixed_obj, impl) ref_file = open('fixed.ref', 'w') ref_file.write(orb.object_to_string(fixed_obj)) ref_file.close() class MainLoop: def __init__(self, boa): self.boa = boa def run(self): self.boa._fnorb_mainloop() loop = MainLoop(boa) thread = Thread(target=loop.run); thread.start() sys.stdout.write("Ready!\n") sys.stdout.flush() while sys.stdin.readline() == '': pass boa._fnorb_quit() thread.join() if __name__ == "__main__": run()