# -*- test-case-name: twisted.test.test_trial -*-
# Twisted, the Framework of Your Internet
# Copyright (C) 2001-2003 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# -*- test-case-name: twisted.test.test_trial -*-
import traceback
from twisted.python import components, failure
from twisted.internet import interfaces
# Methods in this list will be omitted from a failed test's traceback if
# they are the final frame.
_failureConditionals = [
'fail', 'failIf', 'failUnless', 'failUnlessRaises', 'failUnlessEqual',
'failUnlessIdentical', 'failIfEqual', 'assertApproximates']
def reactorCleanUp():
from twisted.internet import reactor
reactor.iterate() # flush short-range timers
pending = reactor.getDelayedCalls()
if pending:
msg = "\npendingTimedCalls still pending:\n"
for p in pending:
msg += " %s\n" % p
from warnings import warn
warn(msg)
for p in pending: p.cancel() # delete the rest
reactor.iterate() # flush them
raise unittest.FailTest, msg
if components.implements(reactor, interfaces.IReactorThreads):
reactor.suggestThreadPoolSize(0)
if hasattr(reactor, 'threadpool') and reactor.threadpool:
reactor.threadpool.stop()
reactor.threadpool = None
def isTestClass(testClass):
return issubclass(testClass, unittest.TestCase)
def isTestCase(testCase):
return isinstance(testCase, unittest.TestCase)
def _append(result, lst):
lst.append(result)
def _getDeferredResult(d, timeout=None):
from twisted.internet import reactor
if timeout is not None:
d.setTimeout(timeout)
resultSet = []
d.addBoth(_append, resultSet)
while not resultSet:
reactor.iterate()
return resultSet[0]
def deferredResult(d, timeout=None):
"""Waits for a Deferred to arrive, then returns or throws an exception,
based on the result.
"""
result = _getDeferredResult(d, timeout)
if isinstance(result, failure.Failure):
raise result
else:
return result
def wait(d, timeout=10):
"""This function is unstable.
Waits (spins the reactor) for a Deferred to arrive, then returns
or throws an exception, based on the result. The difference
between this and deferredResult is that it actually throws the
original exception, not the Failure, so synchronous exception
handling is much more sane.
"""
result = _getDeferredResult(d, timeout)
if isinstance(result, failure.Failure):
if result.tb:
raise result.value.__class__, result.value, result.tb
raise result.value
else:
return result
def deferredError(d, timeout=None):
"""Waits for deferred to fail, and it returns the Failure.
If the deferred succeeds, raises FailTest.
"""
result = _getDeferredResult(d, timeout)
if isinstance(result, failure.Failure):
return result
else:
raise unittest.FailTest, "Deferred did not fail: %r" % (result,)
def extract_tb(tb, limit=None):
"""Extract a list of frames from a traceback, without unittest internals.
Functionally identical to L{traceback.extract_tb}, but cropped to just
the test case itself, excluding frames that are part of the Trial
testing framework.
"""
l = traceback.extract_tb(tb, limit)
util_file = __file__.replace('.pyc','.py')
unittest_file = unittest.__file__.replace('.pyc','.py')
runner_file = runner.__file__.replace('.pyc','.py')
framework = [(unittest_file, '_runPhase'), # Tester._runPhase
(unittest_file, '_main'), # Tester._main
(runner_file, 'runTest'), # [ITestRunner].runTest
]
# filename, line, funcname, sourcetext
while (l[0][0], l[0][2]) in framework:
del l[0]
if (l[-1][0] == unittest_file) and (l[-1][2] in _failureConditionals):
del l[-1]
return l
def format_exception(eType, eValue, tb, limit=None):
"""A formatted traceback and exception, without exposing the framework.
I am identical in function to L{traceback.format_exception},
but I screen out frames from the traceback that are part of
the testing framework itself, leaving only the code being tested.
"""
result = [x.strip()+'\n' for x in
failure.Failure(eValue,eType,tb).getBriefTraceback().split('\n')]
return result
# Only mess with tracebacks if they are from an explicitly failed
# test.
if eType != unittest.FailTest:
return traceback.format_exception(eType, eValue, tb, limit)
tb_list = extract_tb(tb, limit)
l = ["Traceback (most recent call last):\n"]
l.extend(traceback.format_list(tb_list))
l.extend(traceback.format_exception_only(eType, eValue))
return l
# sibling imports, ugh.
import unittest
import runner
syntax highlighted by Code2HTML, v. 0.9.1