import os import sys import errno import unittest import time from urllib import urlopen, urlencode import sb_test_support sb_test_support.fix_sys_path() try: True;False except NameError: # 2.2 compat True=(None is None); False=not True import sb_server from spambayes.Options import options default_shutdown_port = options["html_ui", "port"] verbose = 0 def call_web_function(url, **kw): got = urlopen(url, urlencode(kw)).read() # Very simple - just look for tracebacks if got.find("Traceback (most recent call last)")>=0: print "FAILED calling URL", url print got raise AssertionError, "Opening URL %s appeared to fail" % (url,) class Spawner: def __init__(self, test_case, spawn_args): self.test_case = test_case self.spawn_args = spawn_args # If the command is a .py file, insert an executable. if os.path.splitext(self.spawn_args[0])[1]=='.py': self.spawn_args.insert(0, sys.executable) self.pid = None def _spawn(self, args): return os.spawnv(os.P_NOWAIT, self.spawn_args[0], self.spawn_args) def start(self): raise NotImplementedError def stop(self): raise NotImplementedError def is_running(self): if self.pid is None: return False # damn - we could implement os.waitpid correctly # using the win32api - but as at 2.3, you can't check # if a pid is still running with os.waitpid if sys.platform.startswith("win32"): import win32process # sorry, ya gotta have win32all to run tests import win32con try: rc = win32process.GetExitCodeProcess(self.pid) result = rc==win32con.STILL_ACTIVE except win32process.error: result = False else: try: os.waitpid(self.pid, os.WNOHANG) result = True except os.error, details: if details.errno == errno.ECHILD: result = False # other exceptions invalid? raise # Wait a few seconds for the global mutex to catch up for i in range(20): time.sleep(0.25) if result==is_any_sb_server_running(): break # Check the platform agrees (could do xor, but even I wont be able # to read it in a few weeks if result: self.test_case.failUnless(is_any_sb_server_running(), "My server stopped, but global server mutex held") else: self.test_case.failUnless(not is_any_sb_server_running(), "My server running, but no global server mutex held") return result class Spawner_sb_server(Spawner): def __init__(self, test_case, args, shutdown_port = default_shutdown_port): self.shutdown_port = shutdown_port f = sb_server.__file__ if f.endswith(".pyc") or f.endswith(".pyo"): f = f[:-1] Spawner.__init__(self, test_case, [f]+args) def start(self): self.test_case.failUnless(not is_any_sb_server_running(), "Should be no server running") if verbose > 1: print "Spawning", self.spawn_args self.pid = self._spawn(self.spawn_args) # wait for it to start - 5 secs, 0.25 per check for i in range(20): time.sleep(0.25) if verbose > 1: print "Waiting for start flags: running=%s, global_mutex=%s" \ % (self.is_running(), is_any_sb_server_running()) if self.is_running() and is_any_sb_server_running(): return # gave up waiting. self.test_case.fail("sb_server appeared to not start") def stop(self): # Copied from sb_server.stop() # Shutdown as though through the web UI. This will save the DB, allow # any open proxy connections to complete, etc. call_web_function('http://localhost:%d/save' % self.shutdown_port, how='Save & shutdown') # wait for it to stop - 5 secs, 0.25 per check for i in range(20): time.sleep(0.25) if not self.is_running() and not is_any_sb_server_running(): # stopped - check the exit code temp_pid, rc = os.waitpid(self.pid, 0) if rc: self.test_case.fail("sb_server returned exit code %s" % rc) return # gave up waiting. self.test_case.fail("sb_server appeared to not stop") def is_any_sb_server_running(): # reach into sb_server internals, as it is authoritative (sometimes ) try: mutex = sb_server.open_platform_mutex() sb_server.close_platform_mutex(mutex) return False except sb_server.AlreadyRunningException: return True class TestServer(unittest.TestCase): def setUp(self): self.failUnless(not is_any_sb_server_running(), "Can't do sb_server tests while a server is running "\ "(platform mutex held)") def tearDown(self): # If we cause failure here, we mask the underlying error which left # the server running - so just print the warning. if is_any_sb_server_running(): print "WARNING:", self, "completed with the platform mutex held" def _start_spawner(self, spawner): self.failUnless(not spawner.is_running(), "this spawneer can't be running") spawner.start() self.failUnless(spawner.is_running(), "this spawner must be running after successful start") self.failUnless(is_any_sb_server_running(), "Platform mutex not held after starting") def _stop_spawner(self, spawner): self.failUnless(spawner.is_running(), "must be running to stop") self.failUnless(is_any_sb_server_running(), "Platform mutex must be held to stop") spawner.stop() self.failUnless(not spawner.is_running(), "didn't stop after stop") self.failUnless(not is_any_sb_server_running(), "Platform mutex still help after stop") def test_sb_server_default(self): # Should be using the default port from the options file. from spambayes.Options import options port = options["html_ui", "port"] s = Spawner_sb_server(self, []) self._start_spawner(s) self._stop_spawner(s) def test_sb_server_ui_port(self): # Should be using the default port from the options file. s = Spawner_sb_server(self, ["-u8899"], 8899) self._start_spawner(s) self._stop_spawner(s) def test_sb_server_restore(self): # Make sure we can do a restore defaults and shutdown without incident. from spambayes.Options import options port = options["html_ui", "port"] s = Spawner_sb_server(self, [], shutdown_port=port) self._start_spawner(s) # do the reload call_web_function('http://localhost:%d/restoredefaults' % port, how='') self._stop_spawner(s) if sys.platform.startswith("win"): import win32service # You need win32all to run the tests! import win32serviceutil import winerror service_name = "pop3proxy" class TestService(unittest.TestCase): def setUp(self): try: win32serviceutil.QueryServiceStatus(service_name) except win32service.error, details: if details[0]==winerror.ERROR_SERVICE_DOES_NOT_EXIST: self.was_installed = False raise else: self.was_installed = True self.failUnless(not is_any_sb_server_running(), "Can't do service tests while a server is running "\ "(platform mutex held)") def tearDown(self): if is_any_sb_server_running(): print "WARNING:", self, "completed with the platform mutex held" def _start_service(self): win32serviceutil.StartService(service_name) for i in range(10): time.sleep(0.5) status = win32serviceutil.QueryServiceStatus(service_name) if status[1] == win32service.SERVICE_RUNNING: break if verbose > 1: print "Service status is %d - still waiting" % status[1] else: self.fail("Gave up waiting for service to start") def _stop_service(self): # StopServiceWithDeps checks the status of each service as it # stops it, which is exactly what we want here. win32serviceutil.StopServiceWithDeps(service_name) def test_simple_startstop(self): self._start_service() self._stop_service() def test_remote_shutdown(self): self._start_service() # Should be using the default port from the options file. from spambayes.Options import options port = options["html_ui", "port"] call_web_function ('http://localhost:%d/save' % port, how='Save & shutdown') # wait for it to stop - 5 secs, 0.25 per check for i in range(10): time.sleep(0.5) status = win32serviceutil.QueryServiceStatus(service_name) if status[1] == win32service.SERVICE_STOPPED: break else: self.fail("Gave up waiting for service to stop") self.failUnless(not is_any_sb_server_running(), "Should be no platform mutex held after stopping") if __name__=='__main__': sb_test_support.unittest_main()