# Twisted, the Framework of Your Internet # Copyright (C) 2001-2003 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # from twisted.trial import unittest from twisted.application import service, compat, internet, app from twisted.persisted import sob from twisted.python import components from twisted.python import log from twisted.python.runtime import platformType from twisted.internet import utils, interfaces, defer from twisted.protocols import wire, basic from twisted.internet import protocol, reactor import copy, os, pickle, sys class Dummy: processName=None class TestService(unittest.TestCase): def testName(self): s = service.Service() s.setName("hello") self.failUnlessEqual(s.name, "hello") def testParent(self): s = service.Service() p = service.MultiService() s.setServiceParent(p) self.failUnlessEqual(list(p), [s]) self.failUnlessEqual(s.parent, p) def testApplicationAsParent(self): s = service.Service() p = service.Application("") s.setServiceParent(p) self.failUnlessEqual(list(service.IServiceCollection(p)), [s]) self.failUnlessEqual(s.parent, service.IServiceCollection(p)) def testNamedChild(self): s = service.Service() p = service.MultiService() s.setName("hello") s.setServiceParent(p) self.failUnlessEqual(list(p), [s]) self.failUnlessEqual(s.parent, p) self.failUnlessEqual(p.getServiceNamed("hello"), s) def testDoublyNamedChild(self): s = service.Service() p = service.MultiService() s.setName("hello") s.setServiceParent(p) self.failUnlessRaises(RuntimeError, s.setName, "lala") def testDuplicateNamedChild(self): s = service.Service() p = service.MultiService() s.setName("hello") s.setServiceParent(p) s = service.Service() s.setName("hello") self.failUnlessRaises(RuntimeError, s.setServiceParent, p) def testDisowning(self): s = service.Service() p = service.MultiService() s.setServiceParent(p) self.failUnlessEqual(list(p), [s]) self.failUnlessEqual(s.parent, p) s.disownServiceParent() self.failUnlessEqual(list(p), []) self.failUnlessEqual(s.parent, None) def testRunning(self): s = service.Service() self.assert_(not s.running) s.startService() self.assert_(s.running) s.stopService() self.assert_(not s.running) def testRunningChildren(self): s = service.Service() p = service.MultiService() s.setServiceParent(p) self.assert_(not s.running) self.assert_(not p.running) p.startService() self.assert_(s.running) self.assert_(p.running) p.stopService() self.assert_(not s.running) self.assert_(not p.running) def testRunningChildren(self): s = service.Service() def checkRunning(): self.assert_(s.running) t = service.Service() t.stopService = checkRunning t.startService = checkRunning p = service.MultiService() s.setServiceParent(p) t.setServiceParent(p) p.startService() p.stopService() def testAddingIntoRunning(self): p = service.MultiService() p.startService() s = service.Service() self.assert_(not s.running) s.setServiceParent(p) self.assert_(s.running) s.disownServiceParent() self.assert_(not s.running) def testPrivileged(self): s = service.Service() def pss(): s.privilegedStarted = 1 s.privilegedStartService = pss s1 = service.Service() p = service.MultiService() s.setServiceParent(p) s1.setServiceParent(p) p.privilegedStartService() self.assert_(s.privilegedStarted) def testCopying(self): s = service.Service() s.startService() s1 = copy.copy(s) self.assert_(not s1.running) self.assert_(s.running) if hasattr(os, "getuid"): curuid = os.getuid() curgid = os.getgid() else: curuid = curgid = 0 class TestProcess(unittest.TestCase): def testID(self): p = service.Process(5, 6) self.assertEqual(p.uid, 5) self.assertEqual(p.gid, 6) def testDefaults(self): p = service.Process(5) self.assertEqual(p.uid, 5) self.assertEqual(p.gid, curgid) p = service.Process(gid=5) self.assertEqual(p.uid, curuid) self.assertEqual(p.gid, 5) p = service.Process() self.assertEqual(p.uid, curuid) self.assertEqual(p.gid, curgid) def testProcessName(self): p = service.Process() self.assertEqual(p.processName, None) p.processName = 'hello' self.assertEqual(p.processName, 'hello') class TestInterfaces(unittest.TestCase): def testService(self): self.assert_(components.implements(service.Service(), service.IService)) def testMultiService(self): self.assert_(components.implements(service.MultiService(), service.IService)) self.assert_(components.implements(service.MultiService(), service.IServiceCollection)) def testProcess(self): self.assert_(components.implements(service.Process(), service.IProcess)) class TestApplication(unittest.TestCase): def testConstructor(self): service.Application("hello") service.Application("hello", 5) service.Application("hello", 5, 6) def testProcessComponent(self): a = service.Application("hello") self.assertEqual(service.IProcess(a).uid, curuid) self.assertEqual(service.IProcess(a).gid, curgid) a = service.Application("hello", 5) self.assertEqual(service.IProcess(a).uid, 5) self.assertEqual(service.IProcess(a).gid, curgid) a = service.Application("hello", 5, 6) self.assertEqual(service.IProcess(a).uid, 5) self.assertEqual(service.IProcess(a).gid, 6) def testServiceComponent(self): a = service.Application("hello") self.assert_(service.IService(a) is service.IServiceCollection(a)) self.assertEqual(service.IService(a).name, "hello") self.assertEqual(service.IService(a).parent, None) def testPersistableComponent(self): a = service.Application("hello") p = sob.IPersistable(a) self.assertEqual(p.style, 'pickle') self.assertEqual(p.name, 'hello') self.assert_(p.original is a) class TestLoading(unittest.TestCase): def test_simpleStoreAndLoad(self): a = service.Application("hello") p = sob.IPersistable(a) for style in 'xml source pickle'.split(): p.setStyle(style) p.save() a1 = service.loadApplication("hello.ta"+style[0], style) self.assertEqual(service.IService(a1).name, "hello") open("hello.tac", 'w').writelines([ "from twisted.application import service\n", "application = service.Application('hello')\n", ]) a1 = service.loadApplication("hello.tac", 'python') self.assertEqual(service.IService(a1).name, "hello") def test_implicitConversion(self): a = Dummy() a.__dict__ = {'udpConnectors': [], 'unixConnectors': [], '_listenerDict': {}, 'name': 'dummy', 'sslConnectors': [], 'unixPorts': [], '_extraListeners': {}, 'sslPorts': [], 'tcpPorts': [], 'services': {}, 'gid': 0, 'tcpConnectors': [], 'extraConnectors': [], 'udpPorts': [], 'extraPorts': [], 'uid': 0} pickle.dump(a, open("file.tap", 'wb')) a1 = service.loadApplication("file.tap", "pickle", None) self.assertEqual(service.IService(a1).name, "dummy") self.assertEqual(list(service.IServiceCollection(a1)), []) class TestAppSupport(unittest.TestCase): def testPassphrase(self): self.assertEqual(app.getPassphrase(0), None) def testLoadApplication(self): a = service.Application("hello") baseconfig = {'file': None, 'xml': None, 'source': None, 'python':None} for style in 'source xml pickle'.split(): config = baseconfig.copy() config[{'pickle': 'file'}.get(style, style)] = 'helloapplication' sob.IPersistable(a).setStyle(style) sob.IPersistable(a).save(filename='helloapplication') a1 = app.getApplication(config, None) self.assertEqual(service.IService(a1).name, "hello") config = baseconfig.copy() config['python'] = 'helloapplication' open("helloapplication", 'w').writelines([ "from twisted.application import service\n", "application = service.Application('hello')\n", ]) a1 = app.getApplication(config, None) self.assertEqual(service.IService(a1).name, "hello") def test_convertStyle(self): appl = service.Application("lala") for instyle in 'xml source pickle'.split(): for outstyle in 'xml source pickle'.split(): sob.IPersistable(appl).setStyle(instyle) sob.IPersistable(appl).save(filename="converttest") app.convertStyle("converttest", instyle, None, "converttest.out", outstyle, 0) appl2 = service.loadApplication("converttest.out", outstyle) self.assertEqual(service.IService(appl2).name, "lala") def test_getLogFile(self): os.mkdir("logfiledir") l = app.getLogFile(os.path.join("logfiledir", "lala")) self.assertEqual(l.path, os.path.abspath(os.path.join("logfiledir", "lala"))) self.assertEqual(l.name, "lala") self.assertEqual(l.directory, os.path.abspath("logfiledir")) def test_startApplication(self): appl = service.Application("lala") app.startApplication(appl, 0) self.assert_(service.IService(appl).running) class TestInternet(unittest.TestCase): def testUNIX(self): if not components.implements(reactor, interfaces.IReactorUNIX): raise unittest.SkipTest, "This reactor does not support UNIX domain sockets" s = service.MultiService() c = compat.IOldApplication(s) factory = protocol.ServerFactory() factory.protocol = wire.Echo if os.path.exists('.hello.skt'): os.remove('hello.skt') c.listenUNIX('./hello.skt', factory) class Foo(basic.LineReceiver): def connectionMade(self): self.transport.write('lalala\r\n') def lineReceived(self, line): self.factory.line = line factory = protocol.ClientFactory() factory.protocol = Foo factory.line = None c.connectUNIX('./hello.skt', factory) s.privilegedStartService() s.startService() while factory.line is None: reactor.iterate(0.1) s.stopService() self.assertEqual(factory.line, 'lalala') def testTCP(self): s = service.MultiService() c = compat.IOldApplication(s) factory = protocol.ServerFactory() factory.protocol = wire.Echo c.listenTCP(0, factory) s.privilegedStartService() s.startService() num = list(s)[0]._port.getHost()[2] class Foo(basic.LineReceiver): def connectionMade(self): self.transport.write('lalala\r\n') def lineReceived(self, line): self.factory.line = line factory = protocol.ClientFactory() factory.protocol = Foo factory.line = None c.connectTCP('localhost', num, factory) while factory.line is None: reactor.iterate(0.1) s.stopService() self.assertEqual(factory.line, 'lalala') def testCalling(self): s = service.MultiService() c = compat.IOldApplication(s) c.listenWith(None) self.assertEqual(list(s)[0].args[0], None) c.listenTCP(None, None) self.assertEqual(list(s)[1].args[:2], (None,)*2) c.listenSSL(None, None, None) self.assertEqual(list(s)[2].args[:3], (None,)*3) c.listenUDP(None, None) self.assertEqual(list(s)[3].args[:2], (None,)*2) c.listenUNIX(None, None) self.assertEqual(list(s)[4].args[:2], (None,)*2) for ch in s: self.assert_(ch.privileged) c.connectWith(None) self.assertEqual(list(s)[5].args[0], None) c.connectTCP(None, None, None) self.assertEqual(list(s)[6].args[:3], (None,)*3) c.connectSSL(None, None, None, None) self.assertEqual(list(s)[7].args[:4], (None,)*4) c.connectUDP(None, None, None) self.assertEqual(list(s)[8].args[:3], (None,)*3) c.connectUNIX(None, None) self.assertEqual(list(s)[9].args[:2], (None,)*2) self.assertEqual(len(list(s)), 10) for ch in s: self.failIf(ch.kwargs) self.assertEqual(ch.name, None) def testUnlistenersCallable(self): s = service.MultiService() c = compat.IOldApplication(s) self.assert_(callable(c.unlistenTCP)) self.assert_(callable(c.unlistenUNIX)) self.assert_(callable(c.unlistenUDP)) self.assert_(callable(c.unlistenSSL)) def testServices(self): s = service.MultiService() c = compat.IOldApplication(s) ch = service.Service() ch.setName("lala") ch.setServiceParent(c) self.assertEqual(c.getServiceNamed("lala"), ch) ch.disownServiceParent() self.assertEqual(list(s), []) def testInterface(self): s = service.MultiService() c = compat.IOldApplication(s) for key in compat.IOldApplication.__dict__.keys(): if callable(getattr(compat.IOldApplication, key)): self.assert_(callable(getattr(c, key))) class DummyApp: processName = None def addService(self, service): self.services[service.name] = service def removeService(self, service): del self.services[service.name] class TestConvert(unittest.TestCase): def testSimpleInternet(self): s = "(dp0\nS'udpConnectors'\np1\n(lp2\nsS'unixConnectors'\np3\n(lp4\nsS'twisted.internet.app.Application.persistenceVersion'\np5\nI12\nsS'name'\np6\nS'web'\np7\nsS'sslConnectors'\np8\n(lp9\nsS'sslPorts'\np10\n(lp11\nsS'tcpPorts'\np12\n(lp13\n(I8080\n(itwisted.web.server\nSite\np14\n(dp16\nS'resource'\np17\n(itwisted.web.test\nTest\np18\n(dp19\nS'files'\np20\n(lp21\nsS'paths'\np22\n(dp23\nsS'tmpl'\np24\n(lp25\nS'\\n Congratulations, twisted.web appears to work!\\n