# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Test cases for defer module.
"""
from __future__ import nested_scopes

import sys

from twisted.trial import unittest
from twisted.internet import reactor, defer
from twisted.python import failure, log
class GenericError(Exception):
    """
    Throwaway exception type the tests in this module pass to errback().
    """
class DeferredTestCase(unittest.TestCase):
    """
    Tests for callback/errback dispatch on Deferred, for DeferredList and
    for the helper functions of twisted.internet.defer.
    """

    def setUp(self):
        """
        Clear the three slots that the recording callbacks below write to.
        """
        self.callback_results = None
        self.errback_results = None
        self.callback2_results = None

    def _callback(self, *args, **kw):
        """
        Record the call as an (args, kw) pair and return the first
        positional argument so it is handed to the next callback.
        """
        self.callback_results = args, kw
        return args[0]

    def _callback2(self, *args, **kw):
        """
        Record the call as an (args, kw) pair; returns None.
        """
        self.callback2_results = args, kw

    def _errback(self, *args, **kw):
        """
        Record the call as an (args, kw) pair; returns None.
        """
        self.errback_results = args, kw

    def testCallbackWithoutArgs(self):
        """
        A callback added without extra arguments receives only the result.
        """
        deferred = defer.Deferred()
        deferred.addCallback(self._callback)
        deferred.callback("hello")
        self.failUnlessEqual(self.errback_results, None)
        self.failUnlessEqual(self.callback_results, (('hello',), {}))

    def testCallbackWithArgs(self):
        """
        Extra positional arguments given to addCallback follow the result.
        """
        deferred = defer.Deferred()
        deferred.addCallback(self._callback, "world")
        deferred.callback("hello")
        self.failUnlessEqual(self.errback_results, None)
        self.failUnlessEqual(self.callback_results, (('hello', 'world'), {}))

    def testCallbackWithKwArgs(self):
        """
        Keyword arguments given to addCallback are passed to the callback.
        """
        deferred = defer.Deferred()
        deferred.addCallback(self._callback, world="world")
        deferred.callback("hello")
        self.failUnlessEqual(self.errback_results, None)
        self.failUnlessEqual(self.callback_results,
                             (('hello',), {'world': 'world'}))

    def testTwoCallbacks(self):
        """
        The value returned by the first callback is what the second one
        receives (_callback returns its first argument unchanged).
        """
        deferred = defer.Deferred()
        deferred.addCallback(self._callback)
        deferred.addCallback(self._callback2)
        deferred.callback("hello")
        self.failUnlessEqual(self.errback_results, None)
        self.failUnlessEqual(self.callback_results,
                             (('hello',), {}))
        self.failUnlessEqual(self.callback2_results,
                             (('hello',), {}))

    def testDeferredList(self):
        """
        A DeferredList over two successes and one failure fires with one
        (status, value) pair per Deferred, in the order they were listed.
        """
        defr1 = defer.Deferred()
        defr2 = defer.Deferred()
        defr3 = defer.Deferred()
        dl = defer.DeferredList([defr1, defr2, defr3])
        result = []

        # nested_scopes is enabled for this module, so a plain closure over
        # "result" replaces the old default-argument binding trick.
        def cb(resultList):
            result.extend(resultList)

        def catch(err):
            return None

        dl.addCallbacks(cb, cb)
        defr1.callback("1")
        # "catch" is added to eat the GenericError that will be passed on by
        # the DeferredList's callback on defr2. If left unhandled, the
        # Failure object would cause a log.err() warning about "Unhandled
        # error in Deferred". Twisted's pyunit watches for log.err calls and
        # treats them as failures. So "catch" must eat the error to prevent
        # it from flunking the test.
        defr2.addErrback(catch)
        defr2.errback(GenericError("2"))
        defr3.callback("3")
        self.failUnlessEqual(
            [result[0],
             # result[1][1] is now a Failure instead of an Exception
             (result[1][0], str(result[1][1].value)),
             result[2]],
            [(defer.SUCCESS, "1"),
             (defer.FAILURE, "2"),
             (defer.SUCCESS, "3")])

    def testEmptyDeferredList(self):
        """
        A DeferredList built from an empty list calls back as soon as a
        callback is added, and Deferreds can still be attached afterwards
        with addDeferred.
        """
        result = []

        def cb(resultList):
            result.append(resultList)

        dl = defer.DeferredList([])
        dl.addCallbacks(cb)
        self.failUnlessEqual(result, [[]])

        # The assertions below expect the list already handed to cb to pick
        # up the results of Deferreds added later.
        defr1 = defer.Deferred()
        dl.addDeferred(defr1)
        defr1.callback(1)
        self.failUnlessEqual(result, [[(1, 1)]])

        defr2 = defer.Deferred()
        dl.addDeferred(defr2)
        defr2.callback(2)
        self.failUnlessEqual(result, [[(1, 1), (1, 2)]])

        # Same exercise with fireOnOneCallback: only the first result is
        # delivered, as a (value, index) pair.
        result[:] = []
        dl = defer.DeferredList([], fireOnOneCallback=1)
        dl.addCallbacks(cb)

        defr1 = defer.Deferred()
        dl.addDeferred(defr1)
        defr1.callback('a')
        self.failUnlessEqual(result, [('a', 0)])

        defr2 = defer.Deferred()
        dl.addDeferred(defr2)
        defr2.callback('b')
        self.failUnlessEqual(result, [('a', 0)])

    def testDeferredListFireOnOneError(self):
        """
        With fireOnOneErrback set, the first failing Deferred errbacks the
        DeferredList with a value holding the failure and its index.
        """
        defr1 = defer.Deferred()
        defr2 = defer.Deferred()
        defr3 = defer.Deferred()
        dl = defer.DeferredList([defr1, defr2, defr3], fireOnOneErrback=1)
        result = []

        def catch(err):
            return None

        dl.addErrback(result.append)
        defr1.callback("1")
        # Swallow the error on defr2 itself so it is not reported as
        # unhandled.
        defr2.addErrback(catch)
        defr2.errback(GenericError("2"))
        self.failUnlessEqual([str(result[0].value[0].value),
                              str(result[0].value[1])],
                             ["2", "1"])

    def testDeferredListDontConsumeErrors(self):
        """
        By default the member Deferred's own errback still sees the
        failure, and the DeferredList result carries it too.
        """
        d1 = defer.Deferred()
        dl = defer.DeferredList([d1])
        errorTrap = []
        d1.addErrback(errorTrap.append)
        result = []
        dl.addCallback(result.append)
        d1.errback(GenericError('Bang'))
        self.failUnlessEqual('Bang', errorTrap[0].value.args[0])
        self.failUnlessEqual(1, len(result))
        self.failUnlessEqual('Bang', result[0][0][1].value.args[0])

    def testDeferredListConsumeErrors(self):
        """
        With consumeErrors set, an errback added to the member Deferred
        afterwards is not called, while the DeferredList result still
        carries the failure.
        """
        d1 = defer.Deferred()
        dl = defer.DeferredList([d1], consumeErrors=True)
        errorTrap = []
        d1.addErrback(errorTrap.append)
        result = []
        dl.addCallback(result.append)
        d1.errback(GenericError('Bang'))
        self.failUnlessEqual([], errorTrap)
        self.failUnlessEqual(1, len(result))
        self.failUnlessEqual('Bang', result[0][0][1].value.args[0])

    def testDeferredListFireOnOneErrorWithAlreadyFiredDeferreds(self):
        """
        fireOnOneErrback also triggers when the failing Deferred had fired
        before the DeferredList was constructed.
        """
        # Create some deferreds, and errback one
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        d1.errback(GenericError('Bang'))
        # *Then* build the DeferredList, with fireOnOneErrback=True
        dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
        result = []
        dl.addErrback(result.append)
        self.failUnlessEqual(1, len(result))
        d1.addErrback(lambda e: None)  # Swallow error

    def testDeferredListWithAlreadyFiredDeferreds(self):
        """
        A DeferredList built from Deferreds that have all fired already
        calls back as soon as a callback is added.
        """
        # Create some deferreds, and err one, call the other
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        d1.errback(GenericError('Bang'))
        d2.callback(2)
        # *Then* build the DeferredList
        dl = defer.DeferredList([d1, d2])
        result = []
        dl.addCallback(result.append)
        self.failUnlessEqual(1, len(result))
        d1.addErrback(lambda e: None)  # Swallow error

    def testTimeOut(self):
        """
        A Deferred with a timeout set errbacks with TimeoutError once the
        reactor has been run.
        """
        d = defer.Deferred()
        d.setTimeout(1.0)
        errors = []
        d.addErrback(errors.append)
        # Make sure the reactor is shutdown
        d.addBoth(lambda x: reactor.crash())
        self.assertEquals(errors, [])
        reactor.run()
        self.assertEquals(len(errors), 1)
        self.assertEquals(errors[0].type, defer.TimeoutError)

    def testImmediateSuccess(self):
        """
        defer.succeed returns a Deferred that runs callbacks on addition.
        """
        results = []
        d = defer.succeed("success")
        d.addCallback(results.append)
        self.assertEquals(results, ["success"])

    def testImmediateSuccess2(self):
        """
        Setting a timeout on an already-fired Deferred does not stop a
        later callback from receiving the result.
        """
        results = []
        d = defer.succeed("success")
        # this is how trial.util.deferredResult works
        d.setTimeout(1.0)
        d.addCallback(results.append)
        self.assertEquals(results, ["success"])

    def testImmediateFailure(self):
        """
        defer.fail returns a Deferred that runs errbacks on addition.
        """
        errors = []
        d = defer.fail(GenericError("fail"))
        d.addErrback(errors.append)
        self.assertEquals(str(errors[0].value), "fail")

    def testPausedFailure(self):
        """
        While paused, an already-failed Deferred holds back its errbacks;
        unpause() releases them.
        """
        errors = []
        d = defer.fail(GenericError("fail"))
        d.pause()
        d.addErrback(errors.append)
        self.assertEquals(errors, [])
        d.unpause()
        self.assertEquals(str(errors[0].value), "fail")

    def testCallbackErrors(self):
        """
        Both raising inside a callback and returning a Failure from one
        route the error to the following errback.
        """
        errors = []
        d = defer.Deferred().addCallback(lambda _: 1/0).addErrback(
            errors.append)
        d.callback(1)
        self.assert_(isinstance(errors[0].value, ZeroDivisionError))

        errors = []
        d = defer.Deferred().addCallback(
            lambda _: failure.Failure(ZeroDivisionError())).addErrback(
            errors.append)
        d.callback(1)
        self.assert_(isinstance(errors[0].value, ZeroDivisionError))

    def testUnpauseBeforeCallback(self):
        """
        Pausing and unpausing a Deferred that has not fired must not raise.
        """
        d = defer.Deferred()
        d.pause()
        d.addCallback(self._callback)
        d.unpause()

    def testReturnDeferred(self):
        """
        When a callback returns another Deferred, the outer chain waits for
        that Deferred's result, including while it is paused.
        """
        d = defer.Deferred()
        d2 = defer.Deferred()
        d2.pause()
        d.addCallback(lambda r: d2)
        d.addCallback(self._callback)
        d.callback(1)
        # trial assertion methods rather than bare "assert": these still
        # run under "python -O" and match the rest of the class.
        self.failUnless(self.callback_results is None,
                        "Should not have been called yet.")
        d2.callback(2)
        self.failUnless(self.callback_results is None,
                        "Still should not have been called yet.")
        d2.unpause()
        self.failUnlessEqual(
            self.callback_results[0][0], 2,
            "Result should have been from second deferred:%s"
            % (self.callback_results,))

    def testGatherResults(self):
        """
        gatherResults calls back with the list of results, or errbacks
        with a Failure if one of the Deferreds failed.
        """
        # test successful list of deferreds
        gathered = []
        defer.gatherResults(
            [defer.succeed(1), defer.succeed(2)]).addCallback(gathered.append)
        self.assertEquals(gathered, [[1, 2]])
        # test failing list of deferreds
        gathered = []
        dl = [defer.succeed(1), defer.fail(ValueError)]
        defer.gatherResults(dl).addErrback(gathered.append)
        self.assert_(isinstance(gathered[0], failure.Failure))
        self.assertEquals(len(gathered), 1)
        # get rid of error
        dl[1].addErrback(lambda e: 1)

    def testMaybeDeferred(self):
        """
        maybeDeferred wraps a plain return value or a raised exception in a
        Deferred, and passes through a Deferred returned by the function.
        """
        # Synchronous success.
        S, E = [], []
        d = defer.maybeDeferred((lambda x: x + 5), 10)
        d.addCallbacks(S.append, E.append)
        self.assertEquals(E, [])
        self.assertEquals(S, [15])

        # Synchronous failure: work out the interpreter's own TypeError
        # text first so the comparison does not hard-code it.
        S, E = [], []
        try:
            '10' + 5
        except TypeError:
            # Fetched through sys.exc_info() because "except TypeError, e"
            # does not parse on Python 3 and "as e" does not parse before
            # Python 2.6.
            expected = str(sys.exc_info()[1])
        d = defer.maybeDeferred((lambda x: x + 5), '10')
        d.addCallbacks(S.append, E.append)
        self.assertEquals(S, [])
        self.assertEquals(len(E), 1)
        self.assertEquals(str(E[0].value), expected)

        # Function returns a Deferred that succeeds later.
        d = defer.Deferred()
        reactor.callLater(0.2, d.callback, 'Success')
        r = unittest.deferredResult(defer.maybeDeferred(lambda: d))
        self.assertEquals(r, 'Success')

        # Function returns a Deferred that fails later.
        d = defer.Deferred()
        reactor.callLater(0.2, d.errback, failure.Failure(RuntimeError()))
        r = unittest.deferredError(defer.maybeDeferred(lambda: d))
        r.trap(RuntimeError)
class AlreadyCalledTestCase(unittest.TestCase):
    """
    Firing a Deferred a second time must raise AlreadyCalledError; with
    Deferred.debug switched on, the error text is checked for "C:"
    (creator) and "I:" (first invoker) lines.
    """

    def setUp(self):
        """
        Switch Deferred debugging on, remembering the previous setting.
        """
        # Saved so tearDown restores whatever was in force before the test
        # instead of unconditionally forcing the flag off.
        self._savedDebug = getattr(defer.Deferred, 'debug', False)
        defer.Deferred.debug = True

    def tearDown(self):
        """
        Put Deferred.debug back to the value found by setUp.
        """
        defer.Deferred.debug = self._savedDebug

    def _callback(self, *args, **kw):
        """No-op callback."""
        pass

    def _errback(self, *args, **kw):
        """No-op errback."""
        pass

    # The four helpers below exist so that each firing happens in a frame
    # with a recognisable function name; _check looks for those names.

    def _call_1(self, d):
        d.callback("hello")

    def _call_2(self, d):
        d.callback("twice")

    def _err_1(self, d):
        d.errback(failure.Failure(RuntimeError()))

    def _err_2(self, d):
        d.errback(failure.Failure(RuntimeError()))

    def testAlreadyCalled_CC(self):
        """callback() followed by callback() raises."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._call_1(d)
        self.failUnlessRaises(defer.AlreadyCalledError, self._call_2, d)

    def testAlreadyCalled_CE(self):
        """callback() followed by errback() raises."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._call_1(d)
        self.failUnlessRaises(defer.AlreadyCalledError, self._err_2, d)

    def testAlreadyCalled_EE(self):
        """errback() followed by errback() raises."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._err_1(d)
        self.failUnlessRaises(defer.AlreadyCalledError, self._err_2, d)

    def testAlreadyCalled_EC(self):
        """errback() followed by callback() raises."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._err_1(d)
        self.failUnlessRaises(defer.AlreadyCalledError, self._call_2, d)

    def _count(self, linetype, func, lines, expected):
        """
        Assert that exactly C{expected} of C{lines} start with
        ' <linetype>:' and end with ' <func>'.
        """
        prefix = ' %s:' % linetype
        suffix = ' %s' % func
        matching = [line for line in lines
                    if line.startswith(prefix) and line.endswith(suffix)]
        # Report what was being counted; a bare "count == expected" failure
        # gives no hint which of the many checks tripped.
        self.failUnlessEqual(
            len(matching), expected,
            "expected %d %r line(s) ending in %r, found %d"
            % (expected, linetype, func, len(matching)))

    def _check(self, e, caller, invoker1, invoker2):
        """
        Verify the debug text carried in C{e.args[0]}.

        @param e: the AlreadyCalledError that was raised.
        @param caller: name of the function that created the Deferred.
        @param invoker1: name of the helper that fired it first.
        @param invoker2: name of the helper that fired it second.
        """
        # make sure the debugging information is vaguely correct
        lines = e.args[0].split("\n")
        # the creator should list the creator (testAlreadyCalledDebug) but not
        # _call_1 or _call_2 or other invokers
        self._count('C', caller, lines, 1)
        self._count('C', '_call_1', lines, 0)
        self._count('C', '_call_2', lines, 0)
        self._count('C', '_err_1', lines, 0)
        self._count('C', '_err_2', lines, 0)
        # invoker should list the first invoker but not the second
        self._count('I', invoker1, lines, 1)
        self._count('I', invoker2, lines, 0)

    def _secondFiring(self, fire, d, kind):
        """
        Call C{fire(d)} on an already-fired Deferred and return the
        AlreadyCalledError it raises; fail the test if nothing is raised.

        @param fire: one of the _call_*/_err_* helpers.
        @param d: the Deferred to fire again.
        @param kind: "callback" or "errback", used in the failure message.
        """
        try:
            fire(d)
        except defer.AlreadyCalledError:
            # sys.exc_info() rather than "except X, e": the comma form does
            # not parse on Python 3 and "as e" does not parse before 2.6.
            return sys.exc_info()[1]
        self.fail("second %s failed to raise AlreadyCalledError" % kind)

    def testAlreadyCalledDebug_CC(self):
        """Debug text after callback() then callback()."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._call_1(d)
        e = self._secondFiring(self._call_2, d, "callback")
        self._check(e, "testAlreadyCalledDebug_CC", "_call_1", "_call_2")

    def testAlreadyCalledDebug_CE(self):
        """Debug text after callback() then errback()."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._call_1(d)
        e = self._secondFiring(self._err_2, d, "errback")
        self._check(e, "testAlreadyCalledDebug_CE", "_call_1", "_err_2")

    def testAlreadyCalledDebug_EC(self):
        """Debug text after errback() then callback()."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._err_1(d)
        e = self._secondFiring(self._call_2, d, "callback")
        self._check(e, "testAlreadyCalledDebug_EC", "_err_1", "_call_2")

    def testAlreadyCalledDebug_EE(self):
        """Debug text after errback() then errback()."""
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._err_1(d)
        e = self._secondFiring(self._err_2, d, "errback")
        self._check(e, "testAlreadyCalledDebug_EE", "_err_1", "_err_2")

    def testNoDebugging(self):
        """
        With debugging off, AlreadyCalledError is raised without arguments.
        """
        defer.Deferred.debug = False
        d = defer.Deferred()
        d.addCallbacks(self._callback, self._errback)
        self._call_1(d)
        e = self._secondFiring(self._call_2, d, "callback")
        self.failIf(e.args)
class LogTestCase(unittest.TestCase):
    """
    Check what reaches the log when a Deferred's error is never handled.
    """

    def setUp(self):
        """
        Collect every log event in self.c for the duration of the test.
        """
        self.c = []
        log.addObserver(self.c.append)

    def tearDown(self):
        """
        Detach the observer installed by setUp.
        """
        log.removeObserver(self.c.append)

    def testErrorLog(self):
        """
        A callback that raises with no errback behind it yields two error
        events; the second one carries the ZeroDivisionError failure.
        """
        # Kept as one expression on purpose: the Deferred is never bound to
        # a name.
        # NOTE(review): the test presumably depends on this temporary being
        # released before the assertions run (an earlier variant bound it
        # to "d" and did "del d") -- confirm before restructuring.
        defer.Deferred().addCallback(lambda x: 1/0).callback(1)

        errorEvents = []
        for event in self.c:
            if event["isError"]:
                errorEvents.append(event)

        self.assertEquals(len(errorEvents), 2)
        errorEvents[1]["failure"].trap(ZeroDivisionError)
        log.flushErrors(ZeroDivisionError)
class DeferredTestCaseII(unittest.TestCase):
    """
    Check that an empty DeferredList runs its callback, using a flag that
    tearDown inspects.
    """

    def setUp(self):
        """
        Start with the "callback ran" flag cleared.
        """
        self.callbackRan = 0

    def tearDown(self):
        """
        Fail unless cb_empty set the flag during the test.
        """
        self.failUnless(self.callbackRan, "Callback was never run.")

    def cb_empty(self, res):
        """
        Mark the callback as run, then require an empty result list.
        """
        self.callbackRan = 1
        self.failUnlessEqual([], res)

    def testDeferredListEmpty(self):
        """Testing empty DeferredList."""
        defer.DeferredList([]).addCallback(self.cb_empty)
# syntax highlighted by Code2HTML, v. 0.9.1