# -*- test-case-name: twisted.test.test_trial -*- # # Twisted, the Framework of Your Internet # Copyright (C) 2001 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # FIXME # - Hangs. from twisted.python import usage, reflect, failure from twisted.trial import unittest, util, reporter as reps from twisted.application import app import sys, os, types, inspect import re class Options(usage.Options): synopsis = """%s [options] [[file|package|module|TestCase|testmethod]...] """ % (os.path.basename(sys.argv[0]),) optFlags = [["help", "h"], ["text", "t", "Text mode (ignored)"], ["verbose", "v", "Verbose output"], ["timing", None, "Timing output"], ["bwverbose", "o", "Colorless verbose output"], ["jelly", "j", "Jelly (machine-readable) output"], ["summary", "s", "summary output"], ["debug", "b", "Run tests in the Python debugger. Will load '.pdbrc' from current directory if it exists."], ["profile", None, "Run tests under the Python profiler"], ["benchmark", None, "Run performance tests instead of unit tests."], ["until-failure", "u", "Repeat test until it fails"], ["recurse", "R", "Search packages recursively"]] optParameters = [["reactor", "r", None, "Which reactor to use out of: " + \ ", ".join(app.reactorTypes.keys()) + "."], ["logfile", "l", "test.log", "log file name"], ["random", "z", None, "Run tests in random order using the specified seed"], ] tracer = None def __init__(self): usage.Options.__init__(self) self['modules'] = [] self['packages'] = [] self['testcases'] = [] self['methods'] = [] self['_couldNotImport'] = {} def opt_coverage(self, coverdir): """Generate coverage information in the given directory (relative to _trial_temp). Requires Python 2.3.3.""" import trace # WOO MONKEY PATCH def find_executable_linenos(filename): """Return dict where keys are line numbers in the line number table.""" #assert filename.endswith('.py') # YOU BASTARDS try: prog = open(filename).read() except IOError, err: print >> sys.stderr, ("Not printing coverage data for %r: %s" % (filename, err)) return {} code = compile(prog, filename, "exec") strs = trace.find_strings(filename) return trace.find_lines(code, strs) #kaching! trace.find_executable_linenos = find_executable_linenos #countfile = abs(os.path.join('_trial_temp', 'coverage.count')) self.coverdir = os.path.abspath(os.path.join('_trial_temp', coverdir)) self.tracer = trace.Trace(count=1, trace=0)#, infile=countfile, outfile=countfile) sys.settrace(self.tracer.globaltrace) def opt_reactor(self, reactorName): # this must happen before parseArgs does lots of imports app.installReactor(reactorName) print "Using %s reactor" % app.reactorTypes[reactorName] def opt_testmodule(self, file): "Module to find a test case for" # only look at the first two lines of the file. Try to behave as # much like emacs local-variables scanner as is sensible if not os.path.isfile(file): return # recognize twisted/test/test_foo.py, which is itself a test case d,f = os.path.split(file) if d == "twisted/test" and f.startswith("test_") and f.endswith(".py"): self['modules'].append("twisted.test." + f[:-3]) return f = open(file, "r") lines = [f.readline(), f.readline()] f.close() m = [] for line in lines: # insist upon -*- delimiters res = re.search(r'-\*-(.*)-\*-', line) if res: # handle multiple variables for var in res.group(1).split(";"): bits = var.split(":") # ignore malformed variables if len(bits) == 2 and bits[0].strip() == "test-case-name": for module in bits[1].split(","): module = module.strip() # avoid duplicates if module not in self['modules']: self['modules'].append(module) def opt_module(self, module): "Module to test" self['modules'].append(module) def opt_package(self, package): "Package to test" self['packages'].append(package) def opt_testcase(self, case): "TestCase to test" self['testcases'].append(case) def opt_file(self, filename): "Filename of module to test" from twisted.python import reflect self['modules'].append(reflect.filenameToModuleName(filename)) def opt_method(self, method): "Method to test" self['methods'].append(method) def opt_spew(self): """Print an insanely verbose log of everything that happens. Useful when debugging freezes or locks in complex code.""" from twisted.python.util import spewer sys.settrace(spewer) def opt_disablegc(self): """Disable the garbage collector""" import gc gc.disable() def opt_tbformat(self, opt): """Specify the format to display tracebacks with. Valid formats are 'plain' and 'emacs'.""" if opt not in ('plain', 'emacs'): raise usage.UsageError("tbformat must be 'plain' or 'emacs'.") self['tbformat'] = opt opt_m = opt_module opt_p = opt_package opt_c = opt_testcase opt_M = opt_method opt_f = opt_file # ["extra","x", None, # "Add an extra argument. " # "(This is a hack necessary for " # "interfacing with emacs's `gud'.)" ] extra = None def opt_extra(self, arg): """ Add an extra argument. (This is a hack necessary for interfacing with emacs's `gud'.) """ if self.extra is None: self.extra = [] self.extra.append(arg) opt_x = opt_extra def parseArgs(self, *args): if self.extra is not None: args = list(args) args.extend(self.extra) for arg in args: if (os.sep in arg): # It's a file. if not os.path.exists(arg): import errno raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), arg) if arg.endswith(os.sep) and (arg != os.sep): arg = arg[:-len(os.sep)] name = reflect.filenameToModuleName(arg) if os.path.isdir(arg): self['packages'].append(name) else: self['modules'].append(name) continue if arg.endswith('.py'): # *Probably* a file. if os.path.exists(arg): arg = reflect.filenameToModuleName(arg) self['modules'].append(arg) continue # a non-default reactor must have been installed by now: it # imports the module, which installs a reactor try: arg = reflect.namedAny(arg) except ValueError: raise usage.UsageError, "Can't find anything named %r to run" % arg except: self['_couldNotImport'][arg] = failure.Failure() continue if inspect.ismodule(arg): filename = os.path.basename(arg.__file__) filename = os.path.splitext(filename)[0] if filename == '__init__': self['packages'].append(arg) else: self['modules'].append(arg) elif inspect.isclass(arg): self['testcases'].append(arg) elif inspect.ismethod(arg): self['methods'].append(arg) else: # Umm, seven? self['methods'].append(arg) def postOptions(self): if self['random'] is not None: try: self['random'] = long(self['random']) except ValueError: raise usage.UsageError("Argument to --random must be a positive integer") else: if self['random'] < 0: raise usage.UsageError("Argument to --random must be a positive integer") elif self['random'] == 0: import time self['random'] = long(time.time() * 100) if not self.has_key('tbformat'): self['tbformat'] = 'plain' def call_until_failure(reporter, callable, *args, **kwargs): count = 1 print "Test Pass %d" % count callable(*args, **kwargs) while reporter.allPassed(): count += 1 print "Test Pass %d" % count callable(*args, **kwargs) def run(): if len(sys.argv) == 1: sys.argv.append("--help") config = Options() try: config.parseOptions() except usage.error, ue: print "%s: %s" % (sys.argv[0], ue) os._exit(1) reporter = reallyRun(config) if config.tracer: sys.settrace(None) results = config.tracer.results() results.write_results(show_missing=1, summary=False, coverdir=config.coverdir) sys.exit(not reporter.allPassed()) def reallyRun(config): # do this part of debug setup first for easy debugging of import failures if config['debug']: from twisted.internet import defer from twisted.python import failure defer.Deferred.debug = True failure.startDebugMode() suite = unittest.TestSuite(config['benchmark']) suite.couldNotImport.update(config['_couldNotImport']) if config['recurse']: for package in config['packages']: suite.addPackageRecursive(package) else: for package in config['packages']: suite.addPackage(package) for module in config['modules']: suite.addModule(module) for testcase in config['testcases']: if type(testcase) is types.StringType: case = reflect.namedObject(testcase) else: case = testcase if type(case) is types.ClassType and util.isTestClass(case): suite.addTestClass(case) for testmethod in config['methods']: suite.addMethod(testmethod) testdir = os.path.abspath("_trial_temp") if os.path.exists(testdir): import shutil, random try: shutil.rmtree(testdir) except OSError, e: print "Error deleting:", e os.rename(testdir, os.path.abspath("_trial_temp_old%s" % random.randint(0, 99999999))) os.mkdir(testdir) os.chdir(testdir) if config['logfile']: from twisted.python import log # we should SEE deprecation warnings def seeWarnings(x): if x.has_key('warning'): print print x['format'] % x log.addObserver(seeWarnings) log.startLogging(open(config['logfile'], 'a'), 0) tbformat = config['tbformat'] # XXX Yuck. We should just have a --reporter option. Then we could # have a dict of {reportername: reporterclass}, look up # config['reporter'] in it, and instantiate the result with the # args. if config['verbose']: reporter = reps.TreeReporter(sys.stdout, tbformat) elif config['bwverbose']: reporter = reps.VerboseTextReporter(sys.stdout, tbformat) elif config['summary']: reporter = reps.MinimalReporter(sys.stdout) elif config['jelly']: import twisted.trial.remote reporter = twisted.trial.remote.JellyReporter(sys.stdout) elif config['timing']: reporter = reps.TimingTextReporter(sys.stdout, tbformat) else: reporter = reps.TextReporter(sys.stdout, tbformat) if config['debug']: reporter.debugger = 1 import pdb dbg = pdb.Pdb() try: rcFile = open("../.pdbrc") except IOError: hasattr(sys, 'exc_clear') and sys.exc_clear() else: dbg.rcLines.extend(rcFile.readlines()) if config['until-failure']: call_until_failure(reporter, dbg.run, "suite.run(reporter, config['random'])", globals(), locals()) else: dbg.run("suite.run(reporter, config['random'])", globals(), locals()) elif config['profile']: if config['until-failure']: raise RuntimeError, \ "you cannot use both --until-failure and --profile" import profile prof = profile.Profile() try: prof.runcall(suite.run, reporter, config['random']) prof.dump_stats('profile.data') except SystemExit: pass prof.print_stats() else: if config['until-failure']: call_until_failure(reporter, suite.run, reporter, config['random']) else: suite.run(reporter, config['random']) return reporter if __name__ == '__main__': run()