# Twisted, the Framework of Your Internet # Copyright (C) 2003 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ This module provides support for Twisted to interact with CoreFoundation CFRunLoops. This includes Cocoa's NSRunLoop. In order to use this support, simply do the following:: | from twisted.internet import cfreactor | cfreactor.install() Then use the twisted.internet APIs as usual. The other methods here are not intended to be called directly under normal use. However, install can take a runLoop kwarg, and run will take a withRunLoop arg if you need to explicitly pass a CFRunLoop for some reason. Otherwise it will make a pretty good guess as to which runLoop you want (the current NSRunLoop if PyObjC is imported, otherwise the current CFRunLoop. Either way, if one doesn't exist, it will be created). API Stability: stable Maintainer: U{Bob Ippolito} """ __all__ = ['install'] import sys import cfsupport as cf from twisted.python import log, threadable, failure from twisted.internet import main, default, error from weakref import WeakKeyDictionary # cache two extremely common "failures" without traceback info _faildict = { error.ConnectionDone: failure.Failure(error.ConnectionDone()), error.ConnectionLost: failure.Failure(error.ConnectionLost()), } class SelectableSocketWrapper(object): _objCache = WeakKeyDictionary() cf = None def socketWrapperForReactorAndObject(klass, reactor, obj): _objCache = klass._objCache if obj in _objCache: return _objCache[obj] v = _objCache[obj] = klass(reactor, obj) return v socketWrapperForReactorAndObject = classmethod(socketWrapperForReactorAndObject) def __init__(self, reactor, obj): if self.cf: raise ValueError, "This socket wrapper is already initialized" self.reactor = reactor self.obj = obj obj._orig_ssw_connectionLost = obj.connectionLost obj.connectionLost = self.objConnectionLost self.fd = obj.fileno() self.writing = False self.reading = False self.wouldRead = False self.wouldWrite = False self.cf = cf.PyCFSocket(obj.fileno(), self.doRead, self.doWrite, self.doConnect) self.cf.stopWriting() reactor.getRunLoop().addSocket(self.cf) def __repr__(self): return 'SSW(fd=%r r=%r w=%r x=%08x o=%08x)' % (self.fd, int(self.reading), int(self.writing), id(self), id(self.obj)) def objConnectionLost(self, *args, **kwargs): obj = self.obj self.reactor.removeReader(obj) self.reactor.removeWriter(obj) obj.connectionLost = obj._orig_ssw_connectionLost obj.connectionLost(*args, **kwargs) try: del self._objCache[obj] except: pass self.obj = None self.cf = None def doConnect(self, why): pass def startReading(self): self.cf.startReading() self.reading = True if self.wouldRead: if not self.reactor.running: self.reactor.callLater(0, self.doRead) else: self.doRead() self.wouldRead = False return self def stopReading(self): self.cf.stopReading() self.reading = False self.wouldRead = False return self def startWriting(self): self.cf.startWriting() self.writing = True if self.wouldWrite: if not self.reactor.running: self.reactor.callLater(0, self.doWrite) else: self.doWrite() self.wouldWrite = False return self def stopWriting(self): self.cf.stopWriting() self.writing = False self.wouldWrite = False def _finishReadOrWrite(self, fn, faildict=_faildict): try: why = fn() except: why = sys.exc_info()[1] log.err() if why: try: f = faildict.get(why.__class__) or failure.Failure(why) self.objConnectionLost(f) except: log.err() if self.reactor.running: self.reactor.simulate() def doRead(self): obj = self.obj if not obj: return if not self.reading: self.wouldRead = True if self.reactor.running: self.reactor.simulate() return self._finishReadOrWrite(obj.doRead) def doWrite(self): obj = self.obj if not obj: return if not self.writing: self.wouldWrite = True if self.reactor.running: self.reactor.simulate() return self._finishReadOrWrite(obj.doWrite) def __hash__(self): return hash(self.fd) class CFReactor(default.PosixReactorBase): # how long to poll if we're don't care about signals longIntervalOfTime = 60.0 # how long we should poll if we do care about signals shortIntervalOfTime = 1.0 # don't set this pollInterval = longIntervalOfTime def __init__(self, runLoop=None): self.readers = {} self.writers = {} self.running = 0 self.crashing = False self._doRunUntilCurrent = True self.timer = None self.runLoop = None self.inheritedRunLoop = runLoop is not None if self.inheritedRunLoop: self.getRunLoop(runLoop) default.PosixReactorBase.__init__(self) def installWaker(self): # I don't know why, but the waker causes 100% CPU # so for now we don't install one, ever. return def getRunLoop(self, runLoop=None): if self.runLoop is None: # If Foundation is loaded, assume they want the current # NSRunLoop, not the base CFRunLoop. # If None or an NSRunLoop instance is given, then we assume # the user has caused it to begin running. In reality, # NSApplication probably started it for them. # # If this is a wrong guess, the user can make the runloop go # on their own after reactor.run(). It's a pretty good guess, # though. if 'Foundation' in sys.modules: from Foundation import NSRunLoop nsRunLoop = runLoop or NSRunLoop.currentRunLoop() if isinstance(nsRunLoop, NSRunLoop): runLoop = nsRunLoop.getCFRunLoop() self.inheritedRunLoop = True self.runLoop = cf.PyCFRunLoop(runLoop) return self.runLoop def addReader(self, reader): self.readers[reader] = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, reader).startReading() def addWriter(self, writer): self.writers[writer] = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, writer).startWriting() def removeReader(self, reader): wrapped = self.readers.get(reader, None) if wrapped is not None: del self.readers[reader] wrapped.stopReading() def removeWriter(self, writer): wrapped = self.writers.get(writer, None) if wrapped is not None: del self.writers[writer] wrapped.stopWriting() def removeAll(self): r = self.readers.keys() for s in self.readers.itervalues(): s.stopReading() for s in self.writers.itervalues(): s.stopWriting() self.readers.clear() self.writers.clear() return r def run(self, installSignalHandlers=1, withRunLoop=None): if self.running: raise ValueError, "Reactor already running" if installSignalHandlers: self.pollInterval = self.shortIntervalOfTime runLoop = self.getRunLoop(withRunLoop) self._startup() self.startRunning(installSignalHandlers=installSignalHandlers) self.running = True if not self.inheritedRunLoop: # Inherited runLoops are assumed to be running already, # but we created this one so we have to start it. runLoop.run() self.crashing = False def callLater(self, howlong, *args, **kwargs): rval = default.PosixReactorBase.callLater(self, howlong, *args, **kwargs) if self.timer: timeout = self.timeout() if timeout is None: timeout = howlong sleepUntil = cf.now() + min(timeout, howlong) if sleepUntil < self.timer.getNextFireDate(): self.timer.setNextFireDate(sleepUntil) else: pass return rval def iterate(self, howlong=0.0): if self.running: raise ValueError, "Can't iterate a running reactor" self.runUntilCurrent() self.doIteration(howlong) def doIteration(self, howlong): if self.running: raise ValueError, "Can't iterate a running reactor" howlong = howlong or 0.01 pi = self.pollInterval self.pollInterval = howlong self._doRunUntilCurrent = False self.run() self._doRunUntilCurrent = True self.pollInterval = pi def simulate(self): if self.crashing: return if not self.running: raise ValueError, "You can't simulate a stopped reactor" if self._doRunUntilCurrent: self.runUntilCurrent() if self.crashing: return if self.timer is None: return nap = self.timeout() if nap is None: nap = self.pollInterval else: nap = min(self.pollInterval, nap) if self.running: self.timer.setNextFireDate(cf.now() + nap) if not self._doRunUntilCurrent: self.crash() def _startup(self): if self.running: raise ValueError, "Can't bootstrap a running reactor" self.timer = cf.PyCFRunLoopTimer(cf.now(), self.pollInterval, self.simulate) self.runLoop.addTimer(self.timer) def cleanup(self): pass def sigInt(self, *args): self.callLater(0.0, self.stop) def crash(self): if not self.running: raise ValueError, "Can't crash a stopped reactor" self.running = False self.crashing = True if self.timer is not None: self.runLoop.removeTimer(self.timer) self.timer = None if not self.inheritedRunLoop: self.runLoop.stop() def stop(self): if not self.running: raise ValueError, "Can't stop a stopped reactor" default.PosixReactorBase.stop(self) def install(runLoop=None): """Configure the twisted mainloop to be run inside CFRunLoop. """ reactor = CFReactor(runLoop=runLoop) reactor.addSystemEventTrigger('after', 'shutdown', reactor.cleanup) from twisted.internet.main import installReactor installReactor(reactor) return reactor