# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General
# Public License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
#
# Author: Clark Evans (cce@clarkevans.com)
# Stability: The API is stable, but the implementation may still
# have one or more bugs; threads are tough.
#
""" flow.thread
Support for threads within a flow
"""
from __future__ import nested_scopes
from base import *
from twisted.python.failure import Failure
from twisted.internet import reactor
from time import sleep
class Threaded(Stage):
""" A stage which runs a blocking iterable in a separate thread
This stage tunnels output from an iterable executed in a separate
thread to the main thread. This process is carried out by
a result buffer, and returning Cooperate if the buffer is
empty. The wrapped iterable's __iter__ and next() methods
will only be invoked in the spawned thread.
This can be used in one of two ways, first, it can be
extended via inheritance; with the functionality of the
inherited code implementing next(), and using init() for
initialization code to be run in the thread.
If the iterable happens to have a chunked attribute, and
that attribute is true, then this wrapper will assume that
data arrives in chunks via a sequence instead of by values.
from __future__ import generators
from twisted.internet import reactor, defer
from twisted.flow import flow
from twisted.flow.threads import Threaded
def countSleep(index):
from time import sleep
for index in range(index):
sleep(.3)
print "sleep", index
yield index
def countCooperate(index):
for index in range(index):
yield flow.Cooperate(.1)
print "cooperate", index
yield "coop %s" % index
d = flow.Deferred( flow.Merge(
Threaded(countSleep(5)),
countCooperate(5)))
def prn(x):
print x
reactor.stop()
d.addCallback(prn)
reactor.run()
"""
class Instruction(CallLater):
def __init__(self):
self.callable = None
self.immediate = False
def callLater(self, callable):
if self.immediate:
reactor.callLater(0,callable)
else:
self.callable = callable
def __call__(self):
callable = self.callable
if callable:
self.callable = None
callable()
def __init__(self, iterable, *trap):
Stage.__init__(self, trap)
self._iterable = iterable
self._cooperate = Threaded.Instruction()
self.srcchunked = getattr(iterable, 'chunked', False)
reactor.callInThread(self._process)
def _process_result(self, val):
if self.srcchunked:
self.results.extend(val)
else:
self.results.append(val)
self._cooperate()
def _stopping(self):
self.stop = True
self._cooperate()
def _process(self):
try:
self._iterable = iter(self._iterable)
except:
self.failure = Failure()
else:
try:
while True:
val = self._iterable.next()
reactor.callFromThread(self._process_result, val)
except StopIteration:
reactor.callFromThread(self._stopping)
except:
self.failure = Failure()
reactor.callFromThread(self._cooperate)
self._cooperate.immediate = True
def _yield(self):
if self.results or self.stop or self.failure:
return
return self._cooperate
class QueryIterator:
""" Converts a database query into a result iterator
from __future__ import generators
from twisted.enterprise import adbapi
from twisted.internet import reactor
from twisted.flow import flow
from twisted.flow.threads import QueryIterator, Threaded
dbpool = adbapi.ConnectionPool("SomeDriver",host='localhost',
db='Database',user='User',passwd='Password')
# # I test with...
# from pyPgSQL import PgSQL
# dbpool = PgSQL
sql = '''
(SELECT 'one')
UNION ALL
(SELECT 'two')
UNION ALL
(SELECT 'three')
'''
def consumer():
print "executing"
query = Threaded(QueryIterator(dbpool, sql))
print "yielding"
yield query
print "done yeilding"
for row in query:
print "Processed result : ", row
yield query
from twisted.internet import reactor
def finish(result):
print "Deferred Complete : ", result
reactor.stop()
f = flow.Deferred(consumer())
f.addBoth(finish)
reactor.run()
"""
def __init__(self, pool, sql, fetchmany=False, fetchall=False):
self.curs = None
self.sql = sql
self.pool = pool
if fetchmany:
self.next = self.next_fetchmany
self.chunked = True
if fetchall:
self.next = self.next_fetchall
self.chunked = True
def __iter__(self):
self.conn = self.pool.connect()
self.curs = self.conn.cursor()
self.curs.execute(self.sql)
return self
def next_fetchall(self):
if self.curs:
ret = self.curs.fetchall()
self.curs = None
self.conn = None
return ret
raise StopIteration
def next_fetchmany(self):
ret = self.curs.fetchmany()
if not ret:
self.curs = None
self.conn = None
raise StopIteration
return ret
def next(self):
ret = self.curs.fetchone()
if not ret:
self.curs = None
self.conn = None
raise StopIteration
return ret
syntax highlighted by Code2HTML, v. 0.9.1