# -*- test-case-name: twisted.test.test_trial -*- # Twisted, the Framework of Your Internet # Copyright (C) 2001-2003 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # -*- test-case-name: twisted.test.test_trial -*- import traceback from twisted.python import components, failure from twisted.internet import interfaces # Methods in this list will be omitted from a failed test's traceback if # they are the final frame. _failureConditionals = [ 'fail', 'failIf', 'failUnless', 'failUnlessRaises', 'failUnlessEqual', 'failUnlessIdentical', 'failIfEqual', 'assertApproximates'] def reactorCleanUp(): from twisted.internet import reactor reactor.iterate() # flush short-range timers pending = reactor.getDelayedCalls() if pending: msg = "\npendingTimedCalls still pending:\n" for p in pending: msg += " %s\n" % p from warnings import warn warn(msg) for p in pending: p.cancel() # delete the rest reactor.iterate() # flush them raise unittest.FailTest, msg if components.implements(reactor, interfaces.IReactorThreads): reactor.suggestThreadPoolSize(0) if hasattr(reactor, 'threadpool') and reactor.threadpool: reactor.threadpool.stop() reactor.threadpool = None def isTestClass(testClass): return issubclass(testClass, unittest.TestCase) def isTestCase(testCase): return isinstance(testCase, unittest.TestCase) def _append(result, lst): lst.append(result) def _getDeferredResult(d, timeout=None): from twisted.internet import reactor if timeout is not None: d.setTimeout(timeout) resultSet = [] d.addBoth(_append, resultSet) while not resultSet: reactor.iterate() return resultSet[0] def deferredResult(d, timeout=None): """Waits for a Deferred to arrive, then returns or throws an exception, based on the result. """ result = _getDeferredResult(d, timeout) if isinstance(result, failure.Failure): raise result else: return result def wait(d, timeout=10): """This function is unstable. Waits (spins the reactor) for a Deferred to arrive, then returns or throws an exception, based on the result. The difference between this and deferredResult is that it actually throws the original exception, not the Failure, so synchronous exception handling is much more sane. """ result = _getDeferredResult(d, timeout) if isinstance(result, failure.Failure): if result.tb: raise result.value.__class__, result.value, result.tb raise result.value else: return result def deferredError(d, timeout=None): """Waits for deferred to fail, and it returns the Failure. If the deferred succeeds, raises FailTest. """ result = _getDeferredResult(d, timeout) if isinstance(result, failure.Failure): return result else: raise unittest.FailTest, "Deferred did not fail: %r" % (result,) def extract_tb(tb, limit=None): """Extract a list of frames from a traceback, without unittest internals. Functionally identical to L{traceback.extract_tb}, but cropped to just the test case itself, excluding frames that are part of the Trial testing framework. """ l = traceback.extract_tb(tb, limit) util_file = __file__.replace('.pyc','.py') unittest_file = unittest.__file__.replace('.pyc','.py') runner_file = runner.__file__.replace('.pyc','.py') framework = [(unittest_file, '_runPhase'), # Tester._runPhase (unittest_file, '_main'), # Tester._main (runner_file, 'runTest'), # [ITestRunner].runTest ] # filename, line, funcname, sourcetext while (l[0][0], l[0][2]) in framework: del l[0] if (l[-1][0] == unittest_file) and (l[-1][2] in _failureConditionals): del l[-1] return l def format_exception(eType, eValue, tb, limit=None): """A formatted traceback and exception, without exposing the framework. I am identical in function to L{traceback.format_exception}, but I screen out frames from the traceback that are part of the testing framework itself, leaving only the code being tested. """ result = [x.strip()+'\n' for x in failure.Failure(eValue,eType,tb).getBriefTraceback().split('\n')] return result # Only mess with tracebacks if they are from an explicitly failed # test. if eType != unittest.FailTest: return traceback.format_exception(eType, eValue, tb, limit) tb_list = extract_tb(tb, limit) l = ["Traceback (most recent call last):\n"] l.extend(traceback.format_list(tb_list)) l.extend(traceback.format_exception_only(eType, eValue)) return l # sibling imports, ugh. import unittest import runner