import os
import sys
import errno
import unittest
import time
from urllib import urlopen, urlencode
import sb_test_support
sb_test_support.fix_sys_path()
try:
True;False
except NameError: # 2.2 compat
True=(None is None);
False=not True
import sb_server
from spambayes.Options import options
default_shutdown_port = options["html_ui", "port"]
verbose = 0
def call_web_function(url, **kw):
got = urlopen(url, urlencode(kw)).read()
# Very simple - just look for tracebacks
if got.find("Traceback (most recent call last)")>=0:
print "FAILED calling URL", url
print got
raise AssertionError, "Opening URL %s appeared to fail" % (url,)
class Spawner:
def __init__(self, test_case, spawn_args):
self.test_case = test_case
self.spawn_args = spawn_args
# If the command is a .py file, insert an executable.
if os.path.splitext(self.spawn_args[0])[1]=='.py':
self.spawn_args.insert(0, sys.executable)
self.pid = None
def _spawn(self, args):
return os.spawnv(os.P_NOWAIT, self.spawn_args[0], self.spawn_args)
def start(self):
raise NotImplementedError
def stop(self):
raise NotImplementedError
def is_running(self):
if self.pid is None:
return False
# damn - we could implement os.waitpid correctly
# using the win32api - but as at 2.3, you can't check
# if a pid is still running with os.waitpid
if sys.platform.startswith("win32"):
import win32process # sorry, ya gotta have win32all to run tests
import win32con
try:
rc = win32process.GetExitCodeProcess(self.pid)
result = rc==win32con.STILL_ACTIVE
except win32process.error:
result = False
else:
try:
os.waitpid(self.pid, os.WNOHANG)
result = True
except os.error, details:
if details.errno == errno.ECHILD:
result = False
# other exceptions invalid?
raise
# Wait a few seconds for the global mutex to catch up
for i in range(20):
time.sleep(0.25)
if result==is_any_sb_server_running():
break
# Check the platform agrees (could do xor, but even I wont be able
# to read it in a few weeks <wink>
if result:
self.test_case.failUnless(is_any_sb_server_running(),
"My server stopped, but global server mutex held")
else:
self.test_case.failUnless(not is_any_sb_server_running(),
"My server running, but no global server mutex held")
return result
class Spawner_sb_server(Spawner):
def __init__(self, test_case, args, shutdown_port = default_shutdown_port):
self.shutdown_port = shutdown_port
f = sb_server.__file__
if f.endswith(".pyc") or f.endswith(".pyo"):
f = f[:-1]
Spawner.__init__(self, test_case, [f]+args)
def start(self):
self.test_case.failUnless(not is_any_sb_server_running(),
"Should be no server running")
if verbose > 1:
print "Spawning", self.spawn_args
self.pid = self._spawn(self.spawn_args)
# wait for it to start - 5 secs, 0.25 per check
for i in range(20):
time.sleep(0.25)
if verbose > 1:
print "Waiting for start flags: running=%s, global_mutex=%s" \
% (self.is_running(), is_any_sb_server_running())
if self.is_running() and is_any_sb_server_running():
return
# gave up waiting.
self.test_case.fail("sb_server appeared to not start")
def stop(self):
# Copied from sb_server.stop()
# Shutdown as though through the web UI. This will save the DB, allow
# any open proxy connections to complete, etc.
call_web_function('http://localhost:%d/save' % self.shutdown_port,
how='Save & shutdown')
# wait for it to stop - 5 secs, 0.25 per check
for i in range(20):
time.sleep(0.25)
if not self.is_running() and not is_any_sb_server_running():
# stopped - check the exit code
temp_pid, rc = os.waitpid(self.pid, 0)
if rc:
self.test_case.fail("sb_server returned exit code %s" % rc)
return
# gave up waiting.
self.test_case.fail("sb_server appeared to not stop")
def is_any_sb_server_running():
# reach into sb_server internals, as it is authoritative (sometimes <wink>)
try:
mutex = sb_server.open_platform_mutex()
sb_server.close_platform_mutex(mutex)
return False
except sb_server.AlreadyRunningException:
return True
class TestServer(unittest.TestCase):
def setUp(self):
self.failUnless(not is_any_sb_server_running(),
"Can't do sb_server tests while a server is running "\
"(platform mutex held)")
def tearDown(self):
# If we cause failure here, we mask the underlying error which left
# the server running - so just print the warning.
if is_any_sb_server_running():
print "WARNING:", self, "completed with the platform mutex held"
def _start_spawner(self, spawner):
self.failUnless(not spawner.is_running(),
"this spawneer can't be running")
spawner.start()
self.failUnless(spawner.is_running(),
"this spawner must be running after successful start")
self.failUnless(is_any_sb_server_running(),
"Platform mutex not held after starting")
def _stop_spawner(self, spawner):
self.failUnless(spawner.is_running(), "must be running to stop")
self.failUnless(is_any_sb_server_running(),
"Platform mutex must be held to stop")
spawner.stop()
self.failUnless(not spawner.is_running(), "didn't stop after stop")
self.failUnless(not is_any_sb_server_running(),
"Platform mutex still help after stop")
def test_sb_server_default(self):
# Should be using the default port from the options file.
from spambayes.Options import options
port = options["html_ui", "port"]
s = Spawner_sb_server(self, [])
self._start_spawner(s)
self._stop_spawner(s)
def test_sb_server_ui_port(self):
# Should be using the default port from the options file.
s = Spawner_sb_server(self, ["-u8899"], 8899)
self._start_spawner(s)
self._stop_spawner(s)
def test_sb_server_restore(self):
# Make sure we can do a restore defaults and shutdown without incident.
from spambayes.Options import options
port = options["html_ui", "port"]
s = Spawner_sb_server(self, [], shutdown_port=port)
self._start_spawner(s)
# do the reload
call_web_function('http://localhost:%d/restoredefaults' % port, how='')
self._stop_spawner(s)
if sys.platform.startswith("win"):
import win32service # You need win32all to run the tests!
import win32serviceutil
import winerror
service_name = "pop3proxy"
class TestService(unittest.TestCase):
def setUp(self):
try:
win32serviceutil.QueryServiceStatus(service_name)
except win32service.error, details:
if details[0]==winerror.ERROR_SERVICE_DOES_NOT_EXIST:
self.was_installed = False
raise
else:
self.was_installed = True
self.failUnless(not is_any_sb_server_running(),
"Can't do service tests while a server is running "\
"(platform mutex held)")
def tearDown(self):
if is_any_sb_server_running():
print "WARNING:", self, "completed with the platform mutex held"
def _start_service(self):
win32serviceutil.StartService(service_name)
for i in range(10):
time.sleep(0.5)
status = win32serviceutil.QueryServiceStatus(service_name)
if status[1] == win32service.SERVICE_RUNNING:
break
if verbose > 1:
print "Service status is %d - still waiting" % status[1]
else:
self.fail("Gave up waiting for service to start")
def _stop_service(self):
# StopServiceWithDeps checks the status of each service as it
# stops it, which is exactly what we want here.
win32serviceutil.StopServiceWithDeps(service_name)
def test_simple_startstop(self):
self._start_service()
self._stop_service()
def test_remote_shutdown(self):
self._start_service()
# Should be using the default port from the options file.
from spambayes.Options import options
port = options["html_ui", "port"]
call_web_function ('http://localhost:%d/save' % port,
how='Save & shutdown')
# wait for it to stop - 5 secs, 0.25 per check
for i in range(10):
time.sleep(0.5)
status = win32serviceutil.QueryServiceStatus(service_name)
if status[1] == win32service.SERVICE_STOPPED:
break
else:
self.fail("Gave up waiting for service to stop")
self.failUnless(not is_any_sb_server_running(),
"Should be no platform mutex held after stopping")
if __name__=='__main__':
sb_test_support.unittest_main()
syntax highlighted by Code2HTML, v. 0.9.1