# -*- test-case-name: twisted.test.test_spread -*-
# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Utility classes for spread."""
from twisted.internet import defer
from twisted.python.failure import Failure
class LocalMethod:
def __init__(self, local, name):
self.local = local
self.name = name
def __call__(self, *args, **kw):
return self.local.callRemote(self.name, *args, **kw)
class LocalAsRemote:
"""
A class useful for emulating the effects of remote behavior locally.
"""
reportAllTracebacks = 1
def callRemote(self, name, *args, **kw):
"""Call a specially-designated local method.
self.callRemote('x') will first try to invoke a method named
sync_x and return its result (which should probably be a
Deferred). Second, it will look for a method called async_x,
which will be called and then have its result (or Failure)
automatically wrapped in a Deferred.
"""
if hasattr(self, 'sync_'+name):
return getattr(self, 'sync_'+name)(*args, **kw)
try:
method = getattr(self, "async_" + name)
return defer.succeed(method(*args, **kw))
except:
f = Failure()
if self.reportAllTracebacks:
f.printTraceback()
return defer.fail(f)
def remoteMethod(self, name):
return LocalMethod(self, name)
from twisted.python.components import Interface, implements
from twisted.python.reflect import prefixedMethodNames
class LocalAsyncForwarder:
"""A class useful for forwarding a locally-defined interface.
"""
def __init__(self, forwarded, interfaceClass, failWhenNotImplemented=0):
assert implements(forwarded, interfaceClass)
self.forwarded = forwarded
self.interfaceClass = interfaceClass
self.failWhenNotImplemented = failWhenNotImplemented
def _callMethod(self, method, *args, **kw):
return getattr(self.forwarded, method)(*args, **kw)
def callRemote(self, method, *args, **kw):
if hasattr(self.interfaceClass, method):
result = defer.maybeDeferred(self._callMethod, method, *args, **kw)
return result
elif self.failWhenNotImplemented:
return defer.fail(
Failure(NotImplementedError,
"No Such Method in Interface: %s" % method))
else:
return defer.succeed(None)
class Pager:
"""I am an object which pages out information.
"""
def __init__(self, collector, callback=None, *args, **kw):
"""
Create a pager with a Reference to a remote collector and
an optional callable to invoke upon completion.
"""
if callable(callback):
self.callback = callback
self.callbackArgs = args
self.callbackKeyword = kw
else:
self.callback = None
self._stillPaging = 1
self.collector = collector
collector.broker.registerPageProducer(self)
def stillPaging(self):
"""(internal) Method called by Broker.
"""
if not self._stillPaging:
self.collector.callRemote("endedPaging")
if self.callback is not None:
self.callback(*self.callbackArgs, **self.callbackKeyword)
return self._stillPaging
def sendNextPage(self):
"""(internal) Method called by Broker.
"""
self.collector.callRemote("gotPage", self.nextPage())
def nextPage(self):
"""Override this to return an object to be sent to my collector.
"""
raise NotImplementedError()
def stopPaging(self):
"""Call this when you're done paging.
"""
self._stillPaging = 0
class StringPager(Pager):
"""A simple pager that splits a string into chunks.
"""
def __init__(self, collector, st, chunkSize=8192, callback=None, *args, **kw):
self.string = st
self.pointer = 0
self.chunkSize = chunkSize
Pager.__init__(self, collector, callback, *args, **kw)
def nextPage(self):
val = self.string[self.pointer:self.pointer+self.chunkSize]
self.pointer += self.chunkSize
if self.pointer >= len(self.string):
self.stopPaging()
return val
from twisted.protocols import basic
from twisted.internet import interfaces
class FilePager(Pager):
"""Reads a file in chunks and sends the chunks as they come.
"""
__implements__ = interfaces.IConsumer
def __init__(self, collector, fd, callback=None, *args, **kw):
self.chunks = []
self.pointer = 0
self.startProducing(fd)
Pager.__init__(self, collector, callback, *args, **kw)
def startProducing(self, fd):
self.deferred = basic.FileSender().beginFileTransfer(fd, self)
self.deferred.addBoth(lambda x : self.stopPaging())
def registerProducer(self, producer, streaming):
self.producer = producer
if not streaming:
self.producer.resumeProducing()
def unregisterProducer(self):
self.producer = None
def write(self, chunk):
self.chunks.append(chunk)
def sendNextPage(self):
if self.pointer >= len(self.chunks):
return
val = self.chunks[self.pointer]
self.pointer += 1
self.producer.resumeProducing()
self.collector.callRemote("gotPage", val)
### Utility paging stuff.
from twisted.spread import pb
class CallbackPageCollector(pb.Referenceable):
"""I receive pages from the peer. You may instantiate a Pager with a
remote reference to me. I will call the callback with a list of pages
once they are all received."""
def __init__(self, callback):
self.pages = []
self.callback = callback
def remote_gotPage(self, page):
self.pages.append(page)
def remote_endedPaging(self):
self.callback(self.pages)
def getAllPages(referenceable, methodName, *args, **kw):
"""A utility method that will call a remote method which expects a
PageCollector as the first argument."""
d = defer.Deferred()
referenceable.callRemote(methodName, CallbackPageCollector(d.callback), *args, **kw)
return d
syntax highlighted by Code2HTML, v. 0.9.1