# -*- test-case-name: twisted.test.test_udp -*-
# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Various asynchronous UDP classes.
Please do not use this module directly.
API Stability: semi-stable
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
"""
# System Imports
import os
import socket
import operator
import struct
import warnings
from twisted.python.runtime import platformType
if platformType == 'win32':
from errno import WSAEWOULDBLOCK as EWOULDBLOCK
from errno import WSAEINTR as EINTR
from errno import WSAEMSGSIZE as EMSGSIZE
from errno import WSAECONNREFUSED as ECONNREFUSED
from errno import WSAECONNRESET
from errno import EAGAIN
elif platformType != 'java':
from errno import EWOULDBLOCK, EINTR, EMSGSIZE, ECONNREFUSED, EAGAIN
# Twisted Imports
from twisted.internet import protocol, base, defer, address
from twisted.persisted import styles
from twisted.python import log, reflect
# Sibling Imports
import abstract, main, error, interfaces
class Port(base.BasePort):
"""UDP port, listening for packets."""
__implements__ = base.BasePort.__implements__, interfaces.IUDPTransport, interfaces.ISystemHandle
addressFamily = socket.AF_INET
socketType = socket.SOCK_DGRAM
maxThroughput = 256 * 1024 # max bytes we read in one eventloop iteration
def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None):
"""Initialize with a numeric port to listen on.
"""
assert isinstance(proto, protocol.DatagramProtocol)
base.BasePort.__init__(self, reactor)
self.port = port
self.protocol = proto
self.maxPacketSize = maxPacketSize
self.interface = interface
self.setLogStr()
self._connected = False
self._connectedAddr = None
def __repr__(self):
return "<%s on %s>" % (self.protocol.__class__, self.port)
def getHandle(self):
"""Return a socket object."""
return self.socket
def startListening(self):
"""Create and bind my socket, and begin listening on it.
This is called on unserialization, and must be called after creating a
server to begin listening on the specified port.
"""
self._bindSocket()
self._connectToProtocol()
def _bindSocket(self):
log.msg("%s starting on %s"%(self.protocol.__class__, self.port))
try:
skt = self.createInternetSocket()
skt.bind((self.interface, self.port))
except socket.error, le:
raise error.CannotListenError, (self.interface, self.port, le)
self.connected = 1
self.socket = skt
self.fileno = self.socket.fileno
def _connectToProtocol(self):
self.protocol.makeConnection(self)
self.startReading()
def doRead(self):
"""Called when my socket is ready for reading."""
read = 0
while read < self.maxThroughput:
try:
data, addr = self.socket.recvfrom(self.maxPacketSize)
read += len(data)
self.protocol.datagramReceived(data, addr)
except socket.error, se:
no = se.args[0]
if no in (EAGAIN, EINTR, EWOULDBLOCK):
return
if (no == ECONNREFUSED) or (platformType == "win32" and no == WSAECONNRESET):
# XXX for the moment we don't deal with connection refused
# in non-connected UDP sockets.
pass
else:
raise
except:
log.deferr()
def write(self, datagram, addr=None):
"""Write a datagram.
@param addr: should be a tuple (ip, port), can be None in connected mode.
"""
if self._connected:
assert addr in (None, self._connectedAddr)
try:
return self.socket.send(datagram)
except socket.error, se:
no = se.args[0]
if no == EINTR:
return self.write(datagram)
elif no == EMSGSIZE:
raise error.MessageLengthError, "message too long"
elif no == ECONNREFUSED:
self.protocol.connectionRefused()
else:
raise
else:
assert addr != None
if not addr[0].replace(".", "").isdigit():
warnings.warn("Please only pass IPs to write(), not hostnames", DeprecationWarning, stacklevel=2)
try:
return self.socket.sendto(datagram, addr)
except socket.error, se:
no = se.args[0]
if no == EINTR:
return self.write(datagram, addr)
elif no == EMSGSIZE:
raise error.MessageLengthError, "message too long"
else:
raise
def writeSequence(self, seq, addr):
self.write("".join(seq), addr)
def connect(self, host, port):
"""'Connect' to remote server."""
if self._connected:
raise RuntimeError, "already connected, reconnecting is not currently supported"
if not abstract.isIPAddress(host):
raise ValueError, "please pass only IP addresses, not domain names"
self._connected = True
self._connectedAddr = (host, port)
self.socket.connect((host, port))
def loseConnection(self):
"""Stop accepting connections on this port.
This will shut down my socket and call self.connectionLost().
"""
self.stopReading()
if self.connected:
from twisted.internet import reactor
reactor.callLater(0, self.connectionLost)
def stopListening(self):
if self.connected:
result = self.d = defer.Deferred()
else:
result = None
self.loseConnection()
return result
def connectionLost(self, reason=None):
"""Cleans up my socket.
"""
log.msg('(Port %s Closed)' % self.port)
base.BasePort.connectionLost(self, reason)
if hasattr(self, "protocol"):
# we won't have attribute in ConnectedPort, in cases
# where there was an error in connection process
self.protocol.doStop()
self.connected = 0
self.socket.close()
del self.socket
del self.fileno
if hasattr(self, "d"):
self.d.callback(None)
del self.d
def setLogStr(self):
self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
def logPrefix(self):
"""Returns the name of my class, to prefix log entries with.
"""
return self.logstr
def getHost(self):
"""
Returns an IPv4Address.
This indicates the address from which I am connecting.
"""
return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
class ConnectedPort(Port):
"""DEPRECATED.
A connected UDP socket."""
__implements__ = Port.__implements__, interfaces.IUDPConnectedTransport
def __init__(self, (remotehost, remoteport), port, proto, interface='', maxPacketSize=8192, reactor=None):
assert isinstance(proto, protocol.ConnectedDatagramProtocol)
Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
self.remotehost = remotehost
self.remoteport = remoteport
def startListening(self):
self._bindSocket()
if abstract.isIPAddress(self.remotehost):
self.setRealAddress(self.remotehost)
else:
self.realAddress = None
d = self.reactor.resolve(self.remotehost)
d.addCallback(self.setRealAddress).addErrback(self.connectionFailed)
def setRealAddress(self, addr):
self.realAddress = addr
self.socket.connect((addr, self.remoteport))
self._connectToProtocol()
def connectionFailed(self, reason):
self.loseConnection()
self.protocol.connectionFailed(reason)
del self.protocol
def doRead(self):
"""Called when my socket is ready for reading."""
read = 0
while read < self.maxThroughput:
try:
data, addr = self.socket.recvfrom(self.maxPacketSize)
read += len(data)
self.protocol.datagramReceived(data)
except socket.error, se:
no = se.args[0]
if no in (EAGAIN, EINTR, EWOULDBLOCK):
return
if (no == ECONNREFUSED) or (platformType == "win32" and no == WSAECONNRESET):
self.protocol.connectionRefused()
else:
raise
except:
log.deferr()
def write(self, data):
"""Write a datagram."""
try:
return self.socket.send(data)
except socket.error, se:
no = se.args[0]
if no == EINTR:
return self.write(data)
elif no == EMSGSIZE:
raise error.MessageLengthError, "message too long"
elif no == ECONNREFUSED:
self.protocol.connectionRefused()
else:
raise
def getPeer(self):
"""
Returns a tuple of ('INET_UDP', hostname, port), indicating
the remote address.
"""
return address.IPv4Address('UDP', self.remotehost, self.remoteport, 'INET_UDP')
class MulticastMixin:
"""Implement multicast functionality."""
def getOutgoingInterface(self):
i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
return socket.inet_ntoa(struct.pack("@i", i))
def setOutgoingInterface(self, addr):
"""Returns Deferred of success."""
return self.reactor.resolve(addr).addCallback(self._setInterface)
def _setInterface(self, addr):
i = socket.inet_aton(addr)
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
return 1
def getLoopbackMode(self):
return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
def setLoopbackMode(self, mode):
mode = struct.pack("b", operator.truth(mode))
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
def getTTL(self):
return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
def setTTL(self, ttl):
ttl = struct.pack("b", ttl)
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
def joinGroup(self, addr, interface=""):
"""Join a multicast group. Returns Deferred of success."""
return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
def _joinAddr1(self, addr, interface, join):
return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
def _joinAddr2(self, interface, addr, join):
addr = socket.inet_aton(addr)
interface = socket.inet_aton(interface)
if join:
cmd = socket.IP_ADD_MEMBERSHIP
else:
cmd = socket.IP_DROP_MEMBERSHIP
self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
return 1
def leaveGroup(self, addr, interface=""):
"""Leave multicast group, return Deferred of success."""
return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
class MulticastPort(MulticastMixin, Port):
"""UDP Port that supports multicasting."""
__implements__ = Port.__implements__, interfaces.IMulticastTransport
class ConnectedMulticastPort(MulticastMixin, ConnectedPort):
"""DEPRECATED.
Connected UDP Port that supports multicasting."""
__implements__ = ConnectedPort.__implements__, interfaces.IMulticastTransport
syntax highlighted by Code2HTML, v. 0.9.1