# -*- test-case-name: twisted.test.test_trial -*-
#
# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# FIXME
# - Hangs.
from twisted.python import usage, reflect, failure
from twisted.trial import unittest, util, reporter as reps
from twisted.application import app
import sys, os, types, inspect
import re
class Options(usage.Options):
synopsis = """%s [options] [[file|package|module|TestCase|testmethod]...]
""" % (os.path.basename(sys.argv[0]),)
optFlags = [["help", "h"],
["text", "t", "Text mode (ignored)"],
["verbose", "v", "Verbose output"],
["timing", None, "Timing output"],
["bwverbose", "o", "Colorless verbose output"],
["jelly", "j", "Jelly (machine-readable) output"],
["summary", "s", "summary output"],
["debug", "b", "Run tests in the Python debugger. Will load '.pdbrc' from current directory if it exists."],
["profile", None, "Run tests under the Python profiler"],
["benchmark", None, "Run performance tests instead of unit tests."],
["until-failure", "u", "Repeat test until it fails"],
["recurse", "R", "Search packages recursively"]]
optParameters = [["reactor", "r", None,
"Which reactor to use out of: " + \
", ".join(app.reactorTypes.keys()) + "."],
["logfile", "l", "test.log", "log file name"],
["random", "z", None,
"Run tests in random order using the specified seed"],
]
tracer = None
def __init__(self):
usage.Options.__init__(self)
self['modules'] = []
self['packages'] = []
self['testcases'] = []
self['methods'] = []
self['_couldNotImport'] = {}
def opt_coverage(self, coverdir):
"""Generate coverage information in the given directory
(relative to _trial_temp). Requires Python 2.3.3."""
import trace
# WOO MONKEY PATCH
def find_executable_linenos(filename):
"""Return dict where keys are line numbers in the line number table."""
#assert filename.endswith('.py') # YOU BASTARDS
try:
prog = open(filename).read()
except IOError, err:
print >> sys.stderr, ("Not printing coverage data for %r: %s"
% (filename, err))
return {}
code = compile(prog, filename, "exec")
strs = trace.find_strings(filename)
return trace.find_lines(code, strs)
#kaching!
trace.find_executable_linenos = find_executable_linenos
#countfile = abs(os.path.join('_trial_temp', 'coverage.count'))
self.coverdir = os.path.abspath(os.path.join('_trial_temp', coverdir))
self.tracer = trace.Trace(count=1, trace=0)#, infile=countfile, outfile=countfile)
sys.settrace(self.tracer.globaltrace)
def opt_reactor(self, reactorName):
# this must happen before parseArgs does lots of imports
app.installReactor(reactorName)
print "Using %s reactor" % app.reactorTypes[reactorName]
def opt_testmodule(self, file):
"Module to find a test case for"
# only look at the first two lines of the file. Try to behave as
# much like emacs local-variables scanner as is sensible
if not os.path.isfile(file):
return
# recognize twisted/test/test_foo.py, which is itself a test case
d,f = os.path.split(file)
if d == "twisted/test" and f.startswith("test_") and f.endswith(".py"):
self['modules'].append("twisted.test." + f[:-3])
return
f = open(file, "r")
lines = [f.readline(), f.readline()]
f.close()
m = []
for line in lines:
# insist upon -*- delimiters
res = re.search(r'-\*-(.*)-\*-', line)
if res:
# handle multiple variables
for var in res.group(1).split(";"):
bits = var.split(":")
# ignore malformed variables
if len(bits) == 2 and bits[0].strip() == "test-case-name":
for module in bits[1].split(","):
module = module.strip()
# avoid duplicates
if module not in self['modules']:
self['modules'].append(module)
def opt_module(self, module):
"Module to test"
self['modules'].append(module)
def opt_package(self, package):
"Package to test"
self['packages'].append(package)
def opt_testcase(self, case):
"TestCase to test"
self['testcases'].append(case)
def opt_file(self, filename):
"Filename of module to test"
from twisted.python import reflect
self['modules'].append(reflect.filenameToModuleName(filename))
def opt_method(self, method):
"Method to test"
self['methods'].append(method)
def opt_spew(self):
"""Print an insanely verbose log of everything that happens. Useful
when debugging freezes or locks in complex code."""
from twisted.python.util import spewer
sys.settrace(spewer)
def opt_disablegc(self):
"""Disable the garbage collector"""
import gc
gc.disable()
def opt_tbformat(self, opt):
"""Specify the format to display tracebacks with. Valid formats are 'plain' and 'emacs'."""
if opt not in ('plain', 'emacs'):
raise usage.UsageError("tbformat must be 'plain' or 'emacs'.")
self['tbformat'] = opt
opt_m = opt_module
opt_p = opt_package
opt_c = opt_testcase
opt_M = opt_method
opt_f = opt_file
# ["extra","x", None,
# "Add an extra argument. "
# "(This is a hack necessary for "
# "interfacing with emacs's `gud'.)" ]
extra = None
def opt_extra(self, arg):
"""
Add an extra argument. (This is a hack necessary for interfacing with
emacs's `gud'.)
"""
if self.extra is None:
self.extra = []
self.extra.append(arg)
opt_x = opt_extra
def parseArgs(self, *args):
if self.extra is not None:
args = list(args)
args.extend(self.extra)
for arg in args:
if (os.sep in arg):
# It's a file.
if not os.path.exists(arg):
import errno
raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), arg)
if arg.endswith(os.sep) and (arg != os.sep):
arg = arg[:-len(os.sep)]
name = reflect.filenameToModuleName(arg)
if os.path.isdir(arg):
self['packages'].append(name)
else:
self['modules'].append(name)
continue
if arg.endswith('.py'):
# *Probably* a file.
if os.path.exists(arg):
arg = reflect.filenameToModuleName(arg)
self['modules'].append(arg)
continue
# a non-default reactor must have been installed by now: it
# imports the module, which installs a reactor
try:
arg = reflect.namedAny(arg)
except ValueError:
raise usage.UsageError, "Can't find anything named %r to run" % arg
except:
self['_couldNotImport'][arg] = failure.Failure()
continue
if inspect.ismodule(arg):
filename = os.path.basename(arg.__file__)
filename = os.path.splitext(filename)[0]
if filename == '__init__':
self['packages'].append(arg)
else:
self['modules'].append(arg)
elif inspect.isclass(arg):
self['testcases'].append(arg)
elif inspect.ismethod(arg):
self['methods'].append(arg)
else:
# Umm, seven?
self['methods'].append(arg)
def postOptions(self):
if self['random'] is not None:
try:
self['random'] = long(self['random'])
except ValueError:
raise usage.UsageError("Argument to --random must be a positive integer")
else:
if self['random'] < 0:
raise usage.UsageError("Argument to --random must be a positive integer")
elif self['random'] == 0:
import time
self['random'] = long(time.time() * 100)
if not self.has_key('tbformat'):
self['tbformat'] = 'plain'
def call_until_failure(reporter, callable, *args, **kwargs):
    """Invoke callable(*args, **kwargs) repeatedly until a pass fails.

    Before every invocation a "Test Pass N" banner is printed.  The
    callable always runs at least once; it is run again for as long as
    reporter.allPassed() is true after a pass.
    """
    attempt = 0
    while True:
        attempt += 1
        print("Test Pass %d" % attempt)
        callable(*args, **kwargs)
        if not reporter.allPassed():
            break
def run():
if len(sys.argv) == 1:
sys.argv.append("--help")
config = Options()
try:
config.parseOptions()
except usage.error, ue:
print "%s: %s" % (sys.argv[0], ue)
os._exit(1)
reporter = reallyRun(config)
if config.tracer:
sys.settrace(None)
results = config.tracer.results()
results.write_results(show_missing=1, summary=False, coverdir=config.coverdir)
sys.exit(not reporter.allPassed())
def reallyRun(config):
# do this part of debug setup first for easy debugging of import failures
if config['debug']:
from twisted.internet import defer
from twisted.python import failure
defer.Deferred.debug = True
failure.startDebugMode()
suite = unittest.TestSuite(config['benchmark'])
suite.couldNotImport.update(config['_couldNotImport'])
if config['recurse']:
for package in config['packages']:
suite.addPackageRecursive(package)
else:
for package in config['packages']:
suite.addPackage(package)
for module in config['modules']:
suite.addModule(module)
for testcase in config['testcases']:
if type(testcase) is types.StringType:
case = reflect.namedObject(testcase)
else:
case = testcase
if type(case) is types.ClassType and util.isTestClass(case):
suite.addTestClass(case)
for testmethod in config['methods']:
suite.addMethod(testmethod)
testdir = os.path.abspath("_trial_temp")
if os.path.exists(testdir):
import shutil, random
try:
shutil.rmtree(testdir)
except OSError, e:
print "Error deleting:", e
os.rename(testdir, os.path.abspath("_trial_temp_old%s" % random.randint(0, 99999999)))
os.mkdir(testdir)
os.chdir(testdir)
if config['logfile']:
from twisted.python import log
# we should SEE deprecation warnings
def seeWarnings(x):
if x.has_key('warning'):
print
print x['format'] % x
log.addObserver(seeWarnings)
log.startLogging(open(config['logfile'], 'a'), 0)
tbformat = config['tbformat']
# XXX Yuck. We should just have a --reporter option. Then we could
# have a dict of {reportername: reporterclass}, look up
# config['reporter'] in it, and instantiate the result with the
# args.
if config['verbose']:
reporter = reps.TreeReporter(sys.stdout, tbformat)
elif config['bwverbose']:
reporter = reps.VerboseTextReporter(sys.stdout, tbformat)
elif config['summary']:
reporter = reps.MinimalReporter(sys.stdout)
elif config['jelly']:
import twisted.trial.remote
reporter = twisted.trial.remote.JellyReporter(sys.stdout)
elif config['timing']:
reporter = reps.TimingTextReporter(sys.stdout, tbformat)
else:
reporter = reps.TextReporter(sys.stdout, tbformat)
if config['debug']:
reporter.debugger = 1
import pdb
dbg = pdb.Pdb()
try:
rcFile = open("../.pdbrc")
except IOError:
hasattr(sys, 'exc_clear') and sys.exc_clear()
else:
dbg.rcLines.extend(rcFile.readlines())
if config['until-failure']:
call_until_failure(reporter,
dbg.run,
"suite.run(reporter, config['random'])",
globals(), locals())
else:
dbg.run("suite.run(reporter, config['random'])",
globals(), locals())
elif config['profile']:
if config['until-failure']:
raise RuntimeError, \
"you cannot use both --until-failure and --profile"
import profile
prof = profile.Profile()
try:
prof.runcall(suite.run, reporter, config['random'])
prof.dump_stats('profile.data')
except SystemExit:
pass
prof.print_stats()
else:
if config['until-failure']:
call_until_failure(reporter,
suite.run, reporter, config['random'])
else:
suite.run(reporter, config['random'])
return reporter
# Run the test runner when this file is executed directly as a script.
if __name__ == '__main__':
    run()
# syntax highlighted by Code2HTML, v. 0.9.1