# -*- test-case-name: twisted.test.test_internet -*- # Twisted, the Framework of Your Internet # Copyright (C) 2001 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Very basic functionality for a Reactor implementation. API Stability: stable Maintainer: U{Itamar Shtull-Trauring} """ import socket # needed only for sync-dns import sys import warnings from bisect import insort try: import fcntl except ImportError: fcntl = None import traceback from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorUNIX, IReactorUNIXDatagram, IReactorThreads from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL from twisted.internet.interfaces import IReactorProcess, IReactorPluggableResolver from twisted.internet.interfaces import IConnector, IDelayedCall from twisted.internet import main, error, abstract, defer from twisted.python import threadable, log, failure, reflect from twisted.python.runtime import seconds from twisted.internet.defer import Deferred, DeferredList from twisted.persisted import styles class DelayedCall(styles.Ephemeral): __implements__ = IDelayedCall # enable .debug to record creator call stack, and it will be logged if # an exception occurs while the function is being run debug = False def __init__(self, time, func, args, kw, cancel, reset): self.time, self.func, self.args, self.kw = time, func, args, kw self.resetter = reset self.canceller = cancel self.cancelled = self.called = 0 if self.debug: self.creator = traceback.format_stack()[:-2] def getTime(self): """Return the time at which this call will fire @rtype: C{float} @return: The number of seconds after the epoch at which this call is scheduled to be made. """ return self.time def cancel(self): """Unschedule this call @raise AlreadyCancelled: Raised if this call has already been unscheduled. @raise AlreadyCalled: Raised if this call has already been made. """ if self.cancelled: raise error.AlreadyCancelled elif self.called: raise error.AlreadyCalled else: self.canceller(self) self.cancelled = 1 def reset(self, secondsFromNow): """Reschedule this call for a different time @type secondsFromNow: C{float} @param secondsFromNow: The number of seconds from the time of the C{reset} call at which this call will be scheduled. @raise AlreadyCancelled: Raised if this call has been cancelled. @raise AlreadyCalled: Raised if this call has already been made. """ if self.cancelled: raise error.AlreadyCancelled elif self.called: raise error.AlreadyCalled else: self.time = seconds() + secondsFromNow self.resetter(self) def delay(self, secondsLater): """Reschedule this call for a later time @type secondsLater: C{float} @param secondsLater: The number of seconds after the originally scheduled time for which to reschedule this call. @raise AlreadyCancelled: Raised if this call has been cancelled. @raise AlreadyCalled: Raised if this call has already been made. """ if self.cancelled: raise error.AlreadyCancelled elif self.called: raise error.AlreadyCalled else: self.time += secondsLater self.resetter(self) def active(self): """Determine whether this call is still pending @rtype: C{bool} @return: True if this call has not yet been made or cancelled, False otherwise. """ return not (self.cancelled or self.called) def __lt__(self, other): # Order reversed for efficiency concerns, see below return self.time >= other.time def __str__(self): try: func = self.func.func_name try: func = self.func.im_class.__name__ + '.' + func except: func = self.func if hasattr(func, 'func_code'): func = func.func_code # func_code's repr sometimes has more useful info except: func = reflect.safe_repr(self.func) return "" % ( id(self), self.time - seconds(), self.called, self.cancelled, func, reflect.safe_repr(self.args)) class ReactorBase: """Default base class for Reactors. """ __implements__ = IReactorCore, IReactorTime, IReactorThreads, IReactorPluggableResolver installed = 0 __name__ = "twisted.internet.reactor" def __init__(self): self._eventTriggers = {} self._pendingTimedCalls = [] self.running = 0 self.waker = None self.resolver = None self.usingThreads = 0 self.addSystemEventTrigger('during', 'shutdown', self.crash) self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) threadable.whenThreaded(self.initThreads) # override in subclasses _lock = None def initThreads(self): import thread self.usingThreads = 1 self.installWaker() self.threadCallQueue = [] threadCallQueue = None def installWaker(self): raise NotImplementedError() def installResolver(self, resolver): self.resolver = resolver def callFromThread(self, f, *args, **kw): """See twisted.internet.interfaces.IReactorThreads.callFromThread. """ assert callable(f), "%s is not callable" % f if threadable.isInIOThread(): self.callLater(0, f, *args, **kw) else: # lists are thread-safe in CPython, but not in Jython # this is probably a bug in Jython, but until fixed this code # won't work in Jython. self.threadCallQueue.append((f, args, kw)) self.wakeUp() def wakeUp(self): """Wake up the event loop.""" if not threadable.isInIOThread(): if self.waker: self.waker.wakeUp() # if the waker isn't installed, the reactor isn't running, and # therefore doesn't need to be woken up def doIteration(self): """Do one iteration over the readers and writers we know about.""" raise NotImplementedError def addReader(self, reader): raise NotImplementedError def addWriter(self, writer): raise NotImplementedError def removeReader(self, reader): raise NotImplementedError def removeWriter(self, writer): raise NotImplementedError def removeAll(self): raise NotImplementedError def resolve(self, name, timeout = 10): """Return a Deferred that will resolve a hostname. """ if not name: # XXX - This is *less than* '::', and will screw up IPv6 servers return defer.succeed('0.0.0.0') if abstract.isIPAddress(name): return defer.succeed(name) if self.resolver is None: return self._internalResolve(name, timeout) return self.resolver.getHostByName(name, timeout) def _internalResolve(self, name, timeout): try: address = socket.gethostbyname(name) except socket.error: return defer.fail(failure.Failure(error.DNSLookupError("address %r not found" % name))) else: return defer.succeed(address) # Installation. # IReactorCore def stop(self): """See twisted.internet.interfaces.IReactorCore.stop. """ self.fireSystemEvent("shutdown") def crash(self): """See twisted.internet.interfaces.IReactorCore.crash. """ self.running = 0 def sigInt(self, *args): """Handle a SIGINT interrupt. """ log.msg("Received SIGINT, shutting down.") self.callLater(0, self.stop) def sigBreak(self, *args): """Handle a SIGBREAK interrupt. """ log.msg("Received SIGBREAK, shutting down.") self.callLater(0, self.stop) def sigTerm(self, *args): """Handle a SIGTERM interrupt. """ log.msg("Received SIGTERM, shutting down.") self.callLater(0, self.stop) def disconnectAll(self): """Disconnect every reader, and writer in the system. """ selectables = self.removeAll() for reader in selectables: log.callWithLogger(reader, reader.connectionLost, failure.Failure(main.CONNECTION_LOST)) def iterate(self, delay=0): """See twisted.internet.interfaces.IReactorCore.iterate. """ self.runUntilCurrent() self.doIteration(delay) def fireSystemEvent(self, eventType): """See twisted.internet.interfaces.IReactorCore.fireSystemEvent. """ sysEvtTriggers = self._eventTriggers.get(eventType) if not sysEvtTriggers: return defrList = [] for callable, args, kw in sysEvtTriggers[0]: try: d = callable(*args, **kw) except: log.deferr() else: if isinstance(d, Deferred): defrList.append(d) if defrList: DeferredList(defrList).addBoth(self._cbContinueSystemEvent, eventType) else: self.callLater(0, self._continueSystemEvent, eventType) def _cbContinueSystemEvent(self, result, eventType): self._continueSystemEvent(eventType) def _continueSystemEvent(self, eventType): sysEvtTriggers = self._eventTriggers.get(eventType) for callList in sysEvtTriggers[1], sysEvtTriggers[2]: for callable, args, kw in callList: try: callable(*args, **kw) except: log.deferr() def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw): """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger. """ assert callable(_f), "%s is not callable" % _f if self._eventTriggers.has_key(_eventType): triglist = self._eventTriggers[_eventType] else: triglist = [[], [], []] self._eventTriggers[_eventType] = triglist evtList = triglist[{"before": 0, "during": 1, "after": 2}[_phase]] evtList.append((_f, args, kw)) return (_phase, _eventType, (_f, args, kw)) def removeSystemEventTrigger(self, triggerID): """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger. """ phase, eventType, item = triggerID self._eventTriggers[eventType][{"before": 0, "during": 1, "after": 2}[phase] ].remove(item) def callWhenRunning(self, _callable, *args, **kw): """See twisted.internet.interfaces.IReactorCore.callWhenRunning. """ if self.running: _callable(*args, **kw) else: return self.addSystemEventTrigger('after', 'startup', _callable, *args, **kw) # IReactorTime def callLater(self, _seconds, _f, *args, **kw): """See twisted.internet.interfaces.IReactorTime.callLater. """ assert callable(_f), "%s is not callable" % _f assert sys.maxint >= _seconds >= 0, \ "%s is not greater than or equal to 0 seconds" % (_seconds,) tple = DelayedCall(seconds() + _seconds, _f, args, kw, self._pendingTimedCalls.remove, self._resetCallLater) insort(self._pendingTimedCalls, tple) return tple def _resetCallLater(self, tple): assert tple in self._pendingTimedCalls self._pendingTimedCalls.remove(tple) insort(self._pendingTimedCalls, tple) return tple def cancelCallLater(self, callID): """See twisted.internet.interfaces.IReactorTime.cancelCallLater. """ # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we # we can't get rid of it for a long time. warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead") callID.cancel() def getDelayedCalls(self): return tuple(self._pendingTimedCalls) def timeout(self): if self._pendingTimedCalls: t = self._pendingTimedCalls[-1].time - seconds() if t < 0: t = 0 return t else: return None def runUntilCurrent(self): """Run all pending timed calls. """ if self.threadCallQueue: # Keep track of how many calls we actually make, as we're # making them, in case another call is added to the queue # while we're in this loop. count = 0 for (f, a, kw) in self.threadCallQueue: try: f(*a, **kw) except: log.err() count += 1 del self.threadCallQueue[:count] now = seconds() while self._pendingTimedCalls and (self._pendingTimedCalls[-1].time <= now): call = self._pendingTimedCalls.pop() try: call.called = 1 call.func(*call.args, **call.kw) except: log.deferr() if hasattr(call, "creator"): e = "\n" e += " C: previous exception occurred in " + \ "a DelayedCall created here:\n" e += " C:" e += "".join(call.creator).rstrip().replace("\n","\n C:") e += "\n" log.msg(e) # IReactorThreads threadpool = None def _initThreadPool(self): from twisted.python import threadpool, threadable threadable.init(1) self.threadpool = threadpool.ThreadPool(0, 10) self.threadpool.start() self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop) def callInThread(self, _callable, *args, **kwargs): """See twisted.internet.interfaces.IReactorThreads.callInThread. """ if not self.threadpool: self._initThreadPool() self.threadpool.callInThread(_callable, *args, **kwargs) def suggestThreadPoolSize(self, size): """See twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize. """ if size == 0 and not self.threadpool: return if not self.threadpool: self._initThreadPool() self.threadpool.adjustPoolsize(maxthreads=size) # backwards compatibility def clientUNIX(self, address, protocol, timeout=30): """Deprecated - use connectUNIX instead. """ warnings.warn("clientUNIX is deprecated - use connectUNIX instead.", category=DeprecationWarning, stacklevel=2) f = BCFactory(protocol) self.connectUNIX(address, f, timeout) def clientTCP(self, host, port, protocol, timeout=30): """Deprecated - use connectTCP instead. """ warnings.warn("clientTCP is deprecated - use connectTCP instead.", category=DeprecationWarning, stacklevel=2) f = BCFactory(protocol) self.connectTCP(host, port, f, timeout) return f def clientSSL(self, host, port, protocol, contextFactory, timeout=30): """Deprecated - use connectSSL instead. """ warnings.warn("clientSSL is deprecated - use connectSSL instead.", category=DeprecationWarning, stacklevel=2) f = BCFactory(protocol) self.connectSSL(host, port, f, contextFactory, timeout) from protocol import ClientFactory class BCFactory(ClientFactory): """Factory for backwards compatability with old clientXXX APIs.""" def __init__(self, protocol): self.protocol = protocol self.connector = None def startedConnecting(self, connector): self.connector = connector def loseConnection(self): if self.connector: self.connector.stopConnecting() elif self.protocol: self.protocol.transport.loseConnection() def buildProtocol(self, addr): self.connector = None return self.protocol def clientConnectionFailed(self, connector, reason): self.connector = None self.protocol.connectionFailed() self.protocol = None class BaseConnector(styles.Ephemeral): """Basic implementation of connector. State can be: "connecting", "connected", "disconnected" """ __implements__ = IConnector, timeoutID = None factoryStarted = 0 def __init__(self, factory, timeout, reactor): self.state = "disconnected" self.reactor = reactor self.factory = factory self.timeout = timeout def disconnect(self): """Disconnect whatever our state is.""" if self.state == 'connecting': self.stopConnecting() elif self.state == 'connected': self.transport.loseConnection() def connect(self): """Start connection to remote server.""" if self.state != "disconnected": raise RuntimeError, "can't connect in this state" self.state = "connecting" if not self.factoryStarted: self.factory.doStart() self.factoryStarted = 1 self.transport = transport = self._makeTransport() if self.timeout is not None: self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError()) self.factory.startedConnecting(self) def stopConnecting(self): """Stop attempting to connect.""" if self.state != "connecting": raise error.NotConnectingError, "we're not trying to connect" self.state = "disconnected" self.transport.failIfNotConnected(error.UserError()) del self.transport def cancelTimeout(self): if self.timeoutID: try: self.timeoutID.cancel() except ValueError: pass del self.timeoutID def buildProtocol(self, addr): self.state = "connected" self.cancelTimeout() return self.factory.buildProtocol(addr) def connectionFailed(self, reason): self.cancelTimeout() self.state = "disconnected" self.factory.clientConnectionFailed(self, reason) if self.state == "disconnected": # factory hasn't called our connect() method self.factory.doStop() self.factoryStarted = 0 def connectionLost(self, reason): self.state = "disconnected" self.factory.clientConnectionLost(self, reason) if self.state == "disconnected": # factory hasn't called our connect() method self.factory.doStop() self.factoryStarted = 0 class BasePort(abstract.FileDescriptor): """Basic implementation of a ListeningPort.""" addressFamily = None socketType = None def createInternetSocket(self): s = socket.socket(self.addressFamily, self.socketType) s.setblocking(0) if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD) fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) return s def doWrite(self): """Raises a RuntimeError""" raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__) __all__ = ["ReactorBase"]