# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Author: Clark C. Evans
#
from __future__ import nested_scopes
from __future__ import generators
from twisted.flow import flow
from twisted.flow.threads import Threaded, QueryIterator
from twisted.trial import unittest
from twisted.python import failure, threadable
from twisted.internet import defer, reactor, protocol
from time import sleep
threadable.init()
class slowlist:
""" this is a generator based list
def slowlist(list):
list = list[:]
while list:
yield list.pop(0)
It is primarly used to simulate generators by using
a list (for testing purposes) without being wrapped
as a flow.List, which has all kinds of shortcuts we
don't want for testing.
"""
def __init__(self, list):
self.list = list[:]
def __iter__(self):
return self
def next(self):
if self.list:
return self.list.pop(0)
raise StopIteration
_onetwothree = ['one','two',flow.Cooperate(),'three']
class producer:
""" iterator version of the following generator...
def producer():
lst = flow.wrap(slowlist([1,2,3]))
nam = flow.wrap(slowlist(_onetwothree))
while True:
yield lst
yield nam
yield (lst.next(),nam.next())
"""
def __iter__(self):
self.lst = flow.wrap(slowlist([1,2,3]))
self.nam = flow.wrap(slowlist(_onetwothree))
self.next = self.yield_lst
return self
def yield_lst(self):
self.next = self.yield_nam
return self.lst
def yield_nam(self):
self.next = self.yield_results
return self.nam
def yield_results(self):
self.next = self.yield_lst
return (self.lst.next(), self.nam.next())
class consumer:
""" iterator version of the following generator...
def consumer():
title = flow.wrap(['Title'])
prod = flow.wrap(producer())
yield title
yield title.next()
yield prod
for data in prod:
yield data
yield prod
"""
def __iter__(self):
self.title = flow.wrap(['Title'])
self.lst = flow.wrap(producer())
self.next = self.yield_title
return self
def yield_title(self):
self.next = self.yield_title_result
return self.title
def yield_title_result(self):
self.next = self.yield_lst
return self.title.next()
def yield_lst(self):
self.next = self.yield_result
return self.lst
def yield_result(self):
self.next = self.yield_lst
return self.lst.next()
class badgen:
""" a bad generator...
def badgen():
yield 'x'
err = 3/ 0
"""
def __iter__(self):
self.next = self.yield_x
return self
def yield_x(self):
self.next = self.yield_done
return 'x'
def yield_done(self):
err = 3 / 0
raise StopIteration
class buildlist:
""" building a list
def buildlist(src):
out = []
yield src
for itm in src:
out.append(itm)
yield src
yield out
"""
def __init__(self, src):
self.src = src
def __iter__(self):
self.out = []
self.next = self.yield_src
return self
def yield_src(self):
self.next = self.yield_append
return self.src
def yield_append(self):
try:
self.out.append(self.src.next())
except StopIteration:
self.next = self.yield_finish
return self.out
return self.src
def yield_finish(self):
raise StopIteration
class testconcur:
""" interweving two concurrent stages
def testconcur(*stages):
both = flow.Concurrent(*stages)
yield both
for stage in both:
yield (stage.name, stage.result)
yield both
"""
def __init__(self, *stages):
self.both = flow.Concurrent(*stages)
def __iter__(self):
self.next = self.yield_both
return self
def yield_both(self):
self.next = self.yield_result
return self.both
def yield_result(self):
self.next = self.yield_both
stage = self.both.next()
return (stage.name, stage.next())
class echoServer:
""" a simple echo protocol, server side
def echoServer(conn):
yield conn
for data in conn:
yield data
yield conn
"""
def __init__(self, conn):
self.conn = conn
def __iter__(self):
self.next = self.yield_conn
return self
def yield_conn(self):
self.next = self.yield_data
return self.conn
def yield_data(self):
self.next = self.yield_conn
return self.conn.next()
class echoClient:
""" a simple echo client tester
def echoClient(conn):
yield "testing"
yield conn
# signal that we are done
conn.d.callback(conn.next())
"""
def __init__(self, conn):
self.conn = conn
def __iter__(self):
self.next = self.yield_testing
return self
def yield_testing(self):
self.next = self.yield_conn
return "testing"
def yield_conn(self):
self.next = self.yield_stop
return self.conn
def yield_stop(self):
# signal that we are done
self.conn.factory.d.callback(self.conn.next())
raise StopIteration()
class CountIterator:
def __init__(self, count):
self.count = count
def __iter__(self):
return self
def next(self): # this is run in a separate thread
sleep(.1)
val = self.count
if not(val):
raise StopIteration
self.count -= 1
return val
class FlowTest(unittest.TestCase):
def testNotReady(self):
x = flow.wrap([1,2,3])
self.assertRaises(flow.NotReadyError,x.next)
def testBasic(self):
lhs = ['string']
rhs = list(flow.Block('string'))
self.assertEqual(lhs,rhs)
def testBasicList(self):
lhs = [1,2,3]
rhs = list(flow.Block([1,2,flow.Cooperate(),3]))
self.assertEqual(lhs,rhs)
def testBasicIterator(self):
lhs = ['one','two','three']
rhs = list(flow.Block(slowlist(_onetwothree)))
self.assertEqual(lhs,rhs)
def testCallable(self):
lhs = ['one','two','three']
rhs = list(flow.Block(slowlist(_onetwothree)))
self.assertEqual(lhs,rhs)
def testProducer(self):
lhs = [(1,'one'),(2,'two'),(3,'three')]
rhs = list(flow.Block(producer()))
self.assertEqual(lhs,rhs)
def testConsumer(self):
lhs = ['Title',(1,'one'),(2,'two'),(3,'three')]
rhs = list(flow.Block(consumer))
self.assertEqual(lhs,rhs)
def testFailure(self):
self.assertRaises(ZeroDivisionError, list, flow.Block(badgen()))
self.assertEqual(['x',ZeroDivisionError],
list(flow.Block(badgen(),ZeroDivisionError)))
self.assertEqual(['x',ZeroDivisionError],
list(flow.Block(flow.wrap(badgen()),
ZeroDivisionError)))
def testZip(self):
lhs = [(1,'a'),(2,'b'),(3,None)]
mrg = flow.Zip([1,2,flow.Cooperate(),3],['a','b'])
rhs = list(flow.Block(mrg))
self.assertEqual(lhs,rhs)
def testMerge(self):
lhs = ['one', 1, 'two', 2, 3, 'three']
mrg = flow.Merge(slowlist(_onetwothree),slowlist([1,2,3]))
rhs = list(flow.Block(mrg))
self.assertEqual(lhs,rhs)
def testFilter(self):
def odd(val):
if val % 2:
return True
lhs = [ 1, 3 ]
mrg = flow.Filter(odd,slowlist([1,2,flow.Cooperate(),3]))
rhs = list(flow.Block(mrg))
self.assertEqual(lhs,rhs)
def testLineBreak(self):
lhs = [ "Hello World", "Happy Days Are Here" ]
rhs = ["Hello ","World\nHappy", flow.Cooperate(),
" Days"," Are Here\n"]
mrg = flow.LineBreak(slowlist(rhs), delimiter='\n')
rhs = list(flow.Block(mrg))
self.assertEqual(lhs,rhs)
def testDeferred(self):
lhs = ['Title', (1,'one'),(2,'two'),(3,'three')]
d = flow.Deferred(consumer())
self.assertEquals(lhs, unittest.deferredResult(d))
def testBuildList(self):
src = flow.wrap([1,2,3])
out = flow.Block(buildlist(src)).next()
self.assertEquals(out,[1,2,3])
def testDeferredFailure(self):
d = flow.Deferred(badgen())
unittest.deferredError(d).trap(ZeroDivisionError)
def testDeferredTrap(self):
d = flow.Deferred(badgen(), ZeroDivisionError)
r = unittest.deferredResult(d)
self.assertEqual(r, ['x',ZeroDivisionError])
def testZipFailure(self):
lhs = [(1,'a'),(2,'b'),(3,'c')]
mrg = flow.Zip([1,2,flow.Cooperate(),3],badgen())
d = flow.Deferred(mrg)
unittest.deferredError(d).trap(ZeroDivisionError)
def testDeferredWrapper(self):
from twisted.internet import defer
from twisted.internet import reactor
a = defer.Deferred()
reactor.callLater(0, lambda: a.callback("test"))
b = flow.Merge(a, slowlist([1,2,flow.Cooperate(),3]))
rhs = unittest.deferredResult(flow.Deferred(b))
self.assertEquals(rhs, [1, 2, 'test', 3])
def testDeferredWrapperImmediate(self):
from twisted.internet import defer
a = defer.Deferred()
a.callback("test")
self.assertEquals(["test"], list(flow.Block(a)))
def testDeferredWrapperFail(self):
from twisted.internet import defer
from twisted.internet import reactor
d = defer.Deferred()
f = lambda: d.errback(flow.Failure(IOError()))
reactor.callLater(0, f)
unittest.deferredError(d).trap(IOError)
def testCallback(self):
cb = flow.Callback()
d = flow.Deferred(buildlist(cb))
for x in range(9):
cb.result(x)
cb.finish()
rhs = unittest.deferredResult(d)
self.assertEquals([range(9)],rhs)
def testCallbackFailure(self):
cb = flow.Callback()
d = flow.Deferred(buildlist(cb))
for x in range(3):
cb.result(x)
cb.errback(flow.Failure(IOError()))
unittest.deferredError(d).trap(IOError)
def testConcurrentCallback(self):
ca = flow.Callback()
ca.name = 'a'
cb = flow.Callback()
cb.name = 'b'
d = flow.Deferred(testconcur(ca,cb))
ca.result(1)
cb.result(2)
ca.result(3)
ca.result(4)
ca.finish()
cb.result(5)
cb.finish()
rhs = unittest.deferredResult(d)
self.assertEquals([('a',1),('b',2),('a',3),('a',4),('b',5)],rhs)
def testProtocolLocalhost(self):
# this fails if parallel tests are run on the same box
server = protocol.ServerFactory()
server.protocol = flow.Protocol
server.protocol.controller = echoServer
port = reactor.listenTCP(0, server)
client = protocol.ClientFactory()
client.protocol = flow.makeProtocol(echoClient)
client.d = defer.Deferred()
reactor.connectTCP("127.0.0.1", port.getHost()[2], client)
self.assertEquals('testing', unittest.deferredResult(client.d))
def testProtocol(self):
from twisted.protocols import loopback
server = flow.Protocol()
server.controller = echoServer
client = flow.makeProtocol(echoClient)()
client.factory = protocol.ClientFactory()
client.factory.d = defer.Deferred()
loopback.loopback(server, client)
self.assertEquals('testing', unittest.deferredResult(client.factory.d))
def testThreaded(self):
expect = [5,4,3,2,1]
d = flow.Deferred(Threaded(CountIterator(5)))
self.assertEquals(expect, unittest.deferredResult(d))
def testThreadedError(self):
# is this the expected behaviour?
def iterator():
yield 1
raise ValueError
f = unittest.deferredError(flow.Deferred(Threaded(iterator())))
f.trap(ValueError)
def testThreadedSleep(self):
expect = [5,4,3,2,1]
d = flow.Deferred(Threaded(CountIterator(5)))
sleep(.5)
self.assertEquals(expect, unittest.deferredResult(d))
def testQueryIterator(self):
try:
from pyPgSQL import PgSQL
dbpool = PgSQL
c = dbpool.connect()
r = c.cursor()
r.execute("SELECT 'x'")
r.fetchone()
except:
# PostgreSQL is not installed or bad permissions
return
expect = [['one'],['two'],['three']]
sql = """
(SELECT 'one')
UNION ALL
(SELECT 'two')
UNION ALL
(SELECT 'three')
"""
d = flow.Deferred(Threaded(QueryIterator(dbpool, sql)))
self.assertEquals(expect, unittest.deferredResult(d))
def testThreadedImmediate(self):
"""
The goal of this test is to test the callback mechanism with
regard to threads, namely to assure that results can be
accumulated before they are needed; and that left-over results
are immediately made available on the next round (even though
the producing thread has shut down). This is a very tough thing
to test due to the timing issues. So it may fail on some
platforms, I'm not sure.
"""
expect = [5,4,3,2,1]
result = []
f = Threaded(CountIterator(5))
d = defer.Deferred()
def process():
coop = f._yield()
if f.results:
result.extend(f.results)
f.results = []
reactor.callLater(0, process)
return
if coop:
sleep(.3)
reactor.callLater(0, coop.callLater, process)
return
if f.stop:
reactor.callLater(0, d.callback, result)
reactor.callLater(0, process)
self.assertEquals(expect, unittest.deferredResult(d))
syntax highlighted by Code2HTML, v. 0.9.1