# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General
# Public License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
#
# Author: Clark Evans (cce@clarkevans.com)
#
""" flow.base
This module contains the core exceptions and base classes
in the flow module. See flow.flow for more detailed information
"""
import twisted.python.compat
from twisted.internet import reactor
import time
#
# Exceptions used within flow
#
class Unsupported(NotImplementedError):
""" Indicates that the given stage does not know what to do
with the flow instruction that was returned.
"""
def __init__(self, inst):
msg = "Unsupported flow instruction: %s " % repr(inst)
TypeError.__init__(self,msg)
class NotReadyError(RuntimeError):
""" Raised when a stage has not been subject to a yield """
pass
#
# Abstract/Base Classes
#
class Instruction:
""" Has special meaning when yielded in a flow """
pass
class Controller:
""" Flow controller
At the base of every flow, is a controller class which
interprets the instructions, especially the CallLater
instructions. This is primarly just a marker class to
denote which classes consume Instruction events. If a
controller cannot handle a particular instruction, it
raises the Unsupported exception.
"""
pass
class CallLater(Instruction):
""" Instruction to support callbacks
This is the instruction which is returned during the yield
of the _Deferred and Callback stage. The underlying
flow driver should call the 'callLater' function with the
callable to be executed after each callback.
"""
def callLater(self, callable):
pass
class Cooperate(CallLater):
""" Requests that processing be paused so other tasks can resume
Yield this object when the current chain would block or periodically
during an intensive processing task. The flow mechanism uses these
objects to signal that the current processing chain should be paused
and resumed later. This allows other delayed operations to be
processed, etc. Usage is quite simple:
// within some generator wrapped by a Controller
yield Cooperate(1) # yield for a second or more
"""
def __init__(self, timeout = 0):
self.timeout = timeout
def callLater(self, callable):
reactor.callLater(self.timeout, callable)
class Stage(Instruction):
""" Abstract base defining protocol for iterator/generators in a flow
This is the primary component in the flow system, it is an
iterable object which must be passed to a yield statement
before each call to next(). Usage...
iterable = DerivedStage( ... , SpamError, EggsError))
yield iterable
for result in iterable:
// handle good result, or SpamError or EggsError
yield iterable
Alternatively, when inside a generator, the next() method can be
used directly. In this case, if no results are available,
StopIteration is raised, and if left uncaught, will nicely end
the generator. Of course, unexpected failures are raised. This
technique is especially useful when pulling from more than
one stage at a time.
def someGenerator():
iterable = SomeStage( ... , SpamError, EggsError)
while True:
yield iterable
result = iterable.next()
// handle good result or SpamError or EggsError
For many generators, the results become available in chunks
of rows. While the default value is to get one row at a time,
there is a 'chunked' property which allows them to be
returned via the next() method as many rows rather than
row by row.
iterable = DerivedStage(...)
iterable.chunked = True
for results in iterable:
for result in results:
// handle good result
yield iterable
For those wishing more control at the cost of a painful experience,
the following member variables can be used to great effect:
results This is a list of results produced by the generator,
they can be fetched one by one using next() or in a
group together. If no results were produced, then
this is an empty list. These results should be
removed from the list after they are read; or, after
reading all of the results set to an empty list
stop This is true if the underlying generator has finished
execution (raised a StopIteration or returned). Note
that several results may exist, and stop may be true.
failure If the generator produced an exception, then it is
wrapped as a Failure object and put here. Note that
several results may have been produced before the
failure. To ensure that the failure isn't accidently
reported twice, it is adviseable to set stop to True.
The order in which these member variables is used is *critical* for
proper adherance to the flow protocol. First, all successful
results should be handled. Second, the iterable should be checked
to see if it is finished. Third, a failure should be checked;
while handling a failure, either the loop should be exited, or
the iterable's stop member should be set.
iterable = SomeStage(...)
while True:
yield iterable
if iterable.results:
for result in iterable.results:
// handle good result
iterable.results = []
if iterable.stop:
break
if iterable.failure:
iterable.stop = True
// handle iterable.failure
break
"""
def __init__(self, *trap):
self._trap = trap
self.stop = False
self.failure = None
self.results = []
self.chunked = False
def __iter__(self):
return self
def next(self):
""" return current result
This is the primary function to be called to retrieve
the current result. It complies with the iterator
protocol by raising StopIteration when the stage is
complete. It also raises an exception if it is
called before the stage is yielded.
"""
if self.results:
if self.chunked:
ret = self.results
self.results = []
return ret
else:
return self.results.pop(0)
if self.stop:
raise StopIteration()
if self.failure:
self.stop = True
cr = self.failure.check(*self._trap)
if cr:
return cr
if self.failure.tb:
raise self.failure.value.__class__, \
self.failure.value, self.failure.tb
raise self.failure.value
raise NotReadyError("Must 'yield' this object before calling next()")
def _yield(self):
""" executed during a yield statement by previous stage
This method is private within the scope of the flow module,
it is used by one stage in the flow to ask a subsequent
stage to produce its value. The result of the yield is
then stored in self.result and is an instance of Failure
if a problem occurred.
"""
raise NotImplementedError
syntax highlighted by Code2HTML, v. 0.9.1