############################################################################# # # $Id: xnsc.py,v 2.14 2005/02/12 23:45:29 irmen Exp $ # Pyro Name Server Control Tool with GUI # # This is part of "Pyro" - Python Remote Objects # which is (c) Irmen de Jong - irmen@users.sourceforge.net # ############################################################################# from Tkinter import * from naming import NameServerLocator from errors import NamingError, ConnectionClosedError import Pyro import core import sys class xnscFrame: def quit(self): self.master.quit() def clearOutput(self): self.text_out.delete('1.0',AtEnd()) def output(self,txt): self.text_out.insert(AtEnd(),txt) self.text_out.yview(AtEnd()) def outputln(self,txt): self.output(txt+'\n') def b_clearoutput(self, event=None): self.clearOutput() def b_findNS(self,event=None): self.clearOutput() hst,prt = None,None self.authID = self.entry_AuthID.get() if event and event.char=='\r': # Pressed in entry box addr = self.entry_NSloc.get().split(':') hst=addr[0] if len(addr)>1: prt=int(addr[1]) # We need to keep the host/port for the shutdown button... self.NShost = hst self.NSport = prt self.outputln('*** finding NS') locator=NameServerLocator(identification=self.authID) try: self.NS=locator.getNS(hst,prt,trace=1) self.entry_NSloc.delete(0,AtEnd()) self.entry_NSloc.insert(AtEnd(),self.NS.URI.address+':'+str(self.NS.URI.port)) self.entry_AuthID.delete(0,AtEnd()) self.entry_AuthID.insert(AtEnd(),'****') self.enable_buttons() self.outputln(' found, URI='+str(self.NS.URI)) except: self.disable_buttons() self.outputln(' not found:'); a,b = sys.exc_info()[:2] self.outputln(' '+str(a)+' : '+str(b)) self.outputln('See standard output for trace messages.') def handle_comm_error(self,name): # Handle a communication error: disable buttons and print exception a,b = sys.exc_info()[:2] self.outputln('*** '+name+': exception occured:') self.outputln(' '+str(a)+' : '+str(b)) if a==ConnectionClosedError: self.disable_buttons() self.outputln('*** Connection with NS lost - reconnect') def b_list(self,event=None): names = self.entry_arg.get().split() try: if names: self.outputln('*** List groups:') for n in names: self.output(' '+self.NS.fullName(n)+' --> ') try: self.printList(self.NS.list(n)) except NamingError,x: self.outputln('## can\'t list: '+str(x)+' ##') else: self.outputln('*** List default group:') self.printList(self.NS.list(None)) except: self.handle_comm_error('list') def printList(self,lst): out='( ' lst.sort() for (n,t) in lst: if t==0: out+='['+n+'] ' elif t==1: out+=n+' ' self.outputln(out+')') def b_listall(self,event=None): try: flat=self.NS.flatlist() flat.sort() self.outputln('--------- Flat dump of namespace') for (name,val) in flat: self.outputln(' '+name+' --> '+str(val)) self.outputln('--------- End dump') except: self.handle_comm_error('listall') def b_register(self,event=None): self.outputln('*** registering with NS:') try: (name,uri) = self.entry_arg.get().split() try: self.NS.register(name,uri) uri=core.PyroURI(uri) self.outputln(' '+name+' --> '+str(uri)) except NamingError,arg: self.outputln(' Error from NS: '+str(arg)) except: self.handle_comm_error('register') except ValueError: self.outputln(' Invalid arguments, use " ".') def b_resolve(self,event=None): self.outputln('*** resolving:') name=self.entry_arg.get() if not name: self.outputln(' Invalid arguments, use "".') else: try: uri=self.NS.resolve(name) self.outputln(' '+name+' --> '+str(uri)) except NamingError,x: self.outputln(' '+name+' --> ## can\'t resolve: '+str(x)+' ##') except: self.handle_comm_error('resolve') def b_remove(self,event=None): self.outputln('*** removing:') name=self.entry_arg.get() if not name: self.outputln(' Invalid arguments, use "".') else: try: self.NS.unregister(name) self.outputln('*** removed: '+name) except NamingError,x: self.outputln(' Can\'t remove '+name+' because '+str(x)) except: self.handle_comm_error('remove') def b_ping(self,event=None): try: self.NS.ping() self.outputln('*** ping NS: up and running!') except: self.handle_comm_error('ping') def b_creategroup(self,event=None): name=self.entry_arg.get() if not name: self.outputln(' Invalid arguments, use "".') else: try: self.NS.createGroup(name) self.outputln('*** group created: '+name) except: self.handle_comm_error('creategroup') def b_deletegroup(self,event=None): name=self.entry_arg.get() if not name: self.outputln(' Invalid arguments, use "".') else: try: self.NS.deleteGroup(name) self.outputln('*** group deleted: '+name) except: self.handle_comm_error('deletegroup') def b_showmeta(self,event=None): name=self.NS.fullName(self.entry_arg.get()) self.outputln('*** showing meta info of: '+name) try: self.outputln("system meta info : "+str(self.NS._getSystemMeta(name))) self.outputln(" user meta info : "+str(self.NS.getMeta(name))) except NamingError,x: self.outputln(' --> ## can\'t get meta info: '+str(x)+' ##') except: self.handle_comm_error('showmeta') def b_setmeta(self,event=None): self.outputln('*** setting user meta data:') try: (name,meta) = self.entry_arg.get().split(None,1) try: self.NS.setMeta(name,meta) self.outputln(' '+name+' META='+meta) except NamingError,arg: self.outputln(' Error from NS: '+str(arg)) except: self.handle_comm_error('setmeta') except ValueError: self.outputln(' Invalid arguments, use " ".') def b_resync(self,event=None): self.outputln("*** resync NS with twin") try: self.NS.resync() except NamingError,x: self.outputln(' --> ## can\'t resync: '+str(x)+' ##') except: self.handle_comm_error('resync') def b_shutdown(self,event=None): locator = NameServerLocator(self.authID) try: result = locator.sendSysCommand('shutdown',self.NShost,self.NSport,0) self.outputln('*** The NS replied to the shutdown message: '+str(result)) except: self.disable_buttons() self.outputln(' not found:'); a,b = sys.exc_info()[:2] self.outputln(' '+str(a)+' : '+str(b)) def enable_buttons(self): self.enable_disable_buttons(NORMAL) def disable_buttons(self): self.enable_disable_buttons(DISABLED) def enable_disable_buttons(self,state): self.but_ping['state']=state self.but_list['state']=state self.but_listall['state']=state self.but_resolve['state']=state self.but_register['state']=state self.but_remove['state']=state self.but_shutdown['state']=state self.but_showmeta['state']=state self.but_setmeta['state']=state self.but_resync['state']=state self.but_creategroup['state']=state self.but_deletegroup['state']=state def createWidgets(self): frame_top = Frame(self.master,borderwidth=2,relief=GROOVE) frame_top1 = Frame(frame_top,borderwidth=0) Label(frame_top1,text='Name Server Location (host:port)').pack(side=LEFT,anchor=W) self.entry_NSloc=Entry(frame_top1) self.entry_NSloc.bind('',self.b_findNS) self.entry_NSloc.pack(expand=1,fill=X,side=LEFT) Label(frame_top1,text='(press enter)').pack(side=LEFT,anchor=W) frame_top1.pack(fill=X) frame_top2 = Frame(frame_top,borderwidth=0) Label(frame_top2,text='Authorization ID:').pack(side=LEFT,anchor=W) self.entry_AuthID=Entry(frame_top2) self.entry_AuthID.bind('',self.b_findNS) self.entry_AuthID.pack(expand=1,fill=X,side=LEFT) self.but_findNS=Button(frame_top2,text='Auto Discover NS',command=self.b_findNS) self.QUIT=Button(frame_top2,text='QUIT',command=self.quit) self.QUIT.pack(side=RIGHT) self.but_findNS.pack(side=RIGHT) frame_top2.pack(fill=X) frame_top.pack(fill=X) frame_cmds=Frame(self.master) frame_cmds1=Frame(frame_cmds) frame_cmds2=Frame(frame_cmds) self.but_ping=Button(frame_cmds1,text='Ping',state=DISABLED,command=self.b_ping) self.but_list=Button(frame_cmds1,text='List',state=DISABLED,command=self.b_list) self.but_listall=Button(frame_cmds1,text='List All',state=DISABLED,command=self.b_listall) self.but_register=Button(frame_cmds2,text='Register',state=DISABLED,command=self.b_register) self.but_resolve=Button(frame_cmds1,text='Resolve',state=DISABLED,command=self.b_resolve) self.but_remove=Button(frame_cmds2,text='Remove',state=DISABLED,command=self.b_remove) self.but_creategroup=Button(frame_cmds2,text='Create Group',state=DISABLED,command=self.b_creategroup) self.but_deletegroup=Button(frame_cmds2,text='Delete Group',state=DISABLED,command=self.b_deletegroup) self.but_showmeta=Button(frame_cmds1,text='Show Meta',state=DISABLED,command=self.b_showmeta) self.but_setmeta=Button(frame_cmds1,text='Set Meta',state=DISABLED,command=self.b_setmeta) self.but_resync=Button(frame_cmds1,text='ReSync',state=DISABLED,command=self.b_resync) self.but_shutdown=Button(frame_cmds1,text='Shutdown',state=DISABLED,command=self.b_shutdown) self.but_clearoutput=Button(frame_cmds2,text='Clear output',command=self.b_clearoutput) Label(frame_cmds,text='NS commands:').pack(side=LEFT) self.but_ping.pack(side=LEFT) self.but_list.pack(side=LEFT) self.but_listall.pack(side=LEFT) self.but_register.pack(side=LEFT) self.but_resolve.pack(side=LEFT) self.but_remove.pack(side=LEFT) self.but_creategroup.pack(side=LEFT) self.but_deletegroup.pack(side=LEFT) self.but_showmeta.pack(side=LEFT) self.but_setmeta.pack(side=LEFT) self.but_resync.pack(side=LEFT) self.but_shutdown.pack(side=LEFT) self.but_clearoutput.pack(side=RIGHT) frame_args=Frame(self.master,borderwidth=2) self.entry_arg=Entry(frame_args) Label(frame_args,text='Command arguments').pack(side=LEFT) self.entry_arg.pack(expand=1,fill=X) frame_output=Frame(self.master) ys=Scrollbar(frame_output,orient=VERTICAL) self.text_out=Text(frame_output,yscrollcommand=ys.set,width=90,height=20) ys['command']=self.text_out.yview ys.pack(fill=Y,side=LEFT) self.text_out.pack(side=LEFT,expand=1,fill=BOTH) # pack root children: frame_cmds1.pack(fill=X) frame_cmds2.pack(fill=X) frame_cmds.pack(fill=X) frame_args.pack(fill=X) frame_output.pack(fill=BOTH,expand=1) def __init__(self, master=None): self.master = master self.createWidgets() def main(argv): Pyro.core.initClient() root=Tk() root.title('xnsc - Pyro Name Server control tool') app=xnscFrame(root) root.protocol('WM_DELETE_WINDOW',root.quit) root.mainloop()