import socket
from twisted.internet import interfaces, defer, main, error, protocol, address
from twisted.internet.abstract import isIPAddress
from twisted.persisted import styles
from twisted.python import log, failure, reflect
from ops import ReadFileOp, WriteFileOp, WSARecvFromOp, WSASendToOp
from util import StateEventMachineType
class Port(log.Logger, styles.Ephemeral, object):
    """UDP port driven by overlapped read and write operations.

    The port is a small state machine: ``self.state`` is one of
    ``"disconnected"``, ``"listening"``, ``"connecting"`` or ``"connected"``.

    NOTE(review): the ``StateEventMachineType`` metaclass presumably turns
    every name in ``events`` into a public method (``startListening``,
    ``write``, ...) that dispatches to the ``handle_<state>_<event>`` handler
    matching the current state -- confirm against ``util``.
    """

    __metaclass__ = StateEventMachineType
    __implements__ = interfaces.IUDPTransport
    events = ["startListening", "stopListening", "write", "readDone",
              "readErr", "writeDone", "writeErr", "connect"]
    sockinfo = (socket.AF_INET, socket.SOCK_DGRAM, 0)
    read_op_class = WSARecvFromOp
    write_op_class = WSASendToOp
    reading = False
    # Port number actually bound; only non-None while a socket is open.
    _realPortNumber = None
    # (host, port) of the connected peer, or None when not connected.
    _connectedAddr = None

    def __init__(self, bindAddress, proto, maxPacketSize=8192):
        """Prepare a port; no socket is created until listening starts.

        @param bindAddress: address handed unchanged to ``socket.bind``.
        @param proto: a C{protocol.DatagramProtocol} instance that receives
            the datagrams.
        @param maxPacketSize: size of the read buffer requested from the
            reactor.
        """
        assert isinstance(proto, protocol.DatagramProtocol)
        self.state = "disconnected"
        # Imported here rather than at module level, as in the original.
        from twisted.internet import reactor
        self.bindAddress = bindAddress
        self.protocol = proto
        self.maxPacketSize = maxPacketSize
        self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
        self.read_op = self.read_op_class(self)
        self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
        self.reactor = reactor

    def __repr__(self):
        """Describe the port by protocol class and bound port number."""
        if self._realPortNumber is not None:
            return "<%s on %s>" % (self.protocol.__class__,
                                   self._realPortNumber)
        return "<%s not connected>" % (self.protocol.__class__,)

    def handle_listening_connect(self, host, port):
        """Connect the listening socket to a single peer.

        @param host: IP address, or a hostname that is resolved through the
            reactor first.
        @param port: peer port number.
        @return: a Deferred firing with the connected ``(host, port)``.
        """
        self.state = "connecting"
        if isIPAddress(host):
            d = defer.maybeDeferred(self._connectDone, host, port)
        else:
            d = self.reactor.resolve(host)
            d.addCallback(self._connectDone, port)
        # Without this a failed resolve/connect would leave the port stuck in
        # "connecting", a state with no write or connect handlers.
        d.addErrback(self._connectFailed)
        return d

    def _connectDone(self, host, port):
        """Connect the socket to ``(host, port)`` and record the new state."""
        # Connect first: if this raises (or the socket was closed while the
        # name was being resolved) the port must not claim to be connected.
        self.socket.connect((host, port))
        self._connectedAddr = (host, port)
        self.state = "connected"
        return self._connectedAddr

    def _connectFailed(self, reason):
        """Return to "listening" after a failed connect; pass the failure on."""
        # Leave the state alone if something else (e.g. stopListening) has
        # already moved the port on.
        if self.state == "connecting":
            self.state = "listening"
        return reason

    def handle_disconnected_startListening(self):
        """Bind the socket, attach the protocol and start reading."""
        self._bindSocket()
        self._connectSocket()

    def _bindSocket(self):
        """Create and bind the UDP socket, storing it as ``self.socket``.

        @raise error.CannotListenError: if the socket cannot be created or
            bound.
        """
        try:
            skt = socket.socket(*self.sockinfo)
        except socket.error as le:
            raise error.CannotListenError(None, None, le)
        try:
            skt.bind(self.bindAddress)
        except socket.error as le:
            # Do not leak the half-initialised socket.
            skt.close()
            raise error.CannotListenError(None, None, le)
        # Ask the OS which port was bound, so that binding to port 0 reports
        # the port that was really assigned.
        self._realPortNumber = skt.getsockname()[1]
        log.msg("%s starting on %s" % (self.protocol.__class__,
                                       self._realPortNumber))
        self.socket = skt

    def _connectSocket(self):
        """Hand the transport to the protocol and begin reading."""
        self.protocol.makeConnection(self)
        self.startReading()
        self.state = "listening"

    def startReading(self):
        """Issue an overlapped read into ``self.readbuf``."""
        self.reading = True
        try:
            self.read_op.initiateOp(self.socket.fileno(), self.readbuf)
        except WindowsError as we:
            log.msg("initiating read failed with args %s" % (we,))

    def stopReading(self):
        """Stop re-issuing reads once the pending one completes."""
        self.reading = False

    def handle_listening_readDone(self, bytes, addr=None):
        """Deliver a received datagram to the protocol and re-arm the read.

        @param bytes: number of bytes placed in ``self.readbuf``.
        @param addr: sender address; when falsy the protocol is called
            without an address argument.
        """
        data = self.readbuf[:bytes]
        try:
            if addr:
                self.protocol.datagramReceived(data, addr)
            else:
                self.protocol.datagramReceived(data)
        except Exception:
            # A misbehaving protocol must not prevent the next read from
            # being issued, otherwise the port silently stops receiving.
            log.err(failure.Failure())
        if self.reading:
            self.startReading()
    handle_connecting_readDone = handle_listening_readDone
    handle_connected_readDone = handle_listening_readDone

    def handle_listening_readErr(self, ret, bytes):
        """Log a failed read, notify the protocol if refused, and re-arm."""
        log.msg("read failed with err %s" % (ret,))
        # TODO: use Failures or something
        if ret == 1234:  # Win32 ERROR_PORT_UNREACHABLE
            self.protocol.connectionRefused()
        if self.reading:
            self.startReading()
    handle_connecting_readErr = handle_listening_readErr
    handle_connected_readErr = handle_listening_readErr

    def handle_disconnected_readErr(self, ret, bytes):
        """Ignore read errors that arrive after the port was closed."""
        pass  # no kicking the dead horse

    def handle_disconnected_readDone(self, bytes, addr=None):
        """Ignore read completions that arrive after the port was closed."""
        pass  # no kicking the dead horse

    def handle_listening_write(self, data, addr):
        """Send ``data`` to ``addr`` from an unconnected port."""
        self.performWrite(data, addr)

    def handle_connected_write(self, data, addr=None):
        """Send ``data`` to the connected peer.

        ``addr`` may be omitted; if given it must equal the connected address.
        """
        assert addr in (None, self._connectedAddr)
        self.performWrite(data, addr)

    def performWrite(self, data, addr=None):
        """Issue an overlapped send of ``data``.

        @param addr: destination; a falsy value falls back to the connected
            peer address (``None`` when the port is not connected).
        """
        self.writing = True
        try:
            # A fresh write op is created for every datagram.
            write_op = self.write_op_class(self)
            if not addr:
                addr = self._connectedAddr
            write_op.initiateOp(self.socket.fileno(), data, addr)
        except WindowsError as we:
            log.msg("initiating write failed with args %s" % (we,))

    def handle_listening_writeDone(self, bytes):
        """Log a completed write."""
        log.msg("write success with bytes %s" % (bytes,))
    handle_connecting_writeDone = handle_listening_writeDone
    handle_connected_writeDone = handle_listening_writeDone

    def handle_listening_writeErr(self, ret, bytes):
        """Log a failed write and close the port."""
        log.msg("write failed with err %s" % (ret,))
        self.connectionLost()
    handle_connecting_writeErr = handle_listening_writeErr
    handle_connected_writeErr = handle_listening_writeErr

    def handle_disconnected_writeErr(self, ret, bytes):
        """Ignore write errors that arrive after the port was closed."""
        pass  # no kicking the dead horse

    def handle_disconnected_writeDone(self, bytes):
        """Ignore write completions that arrive after the port was closed."""
        pass  # no kicking the dead horse

    def writeSequence(self, seq, addr):
        """Join the strings in ``seq`` and send them as one datagram."""
        # NOTE(review): ``write`` is not defined in this class; presumably it
        # is generated from ``events`` by the metaclass -- confirm.
        self.write("".join(seq), addr)

    def handle_listening_stopListening(self):
        """Stop reading and close the port."""
        self.stopReading()
        self.connectionLost()
    handle_connecting_stopListening = handle_listening_stopListening
    handle_connected_stopListening = handle_listening_stopListening

    def connectionLost(self, reason=None):
        """Stop the protocol, close the socket and return to "disconnected".

        @param reason: accepted for interface compatibility; not used.
        """
        log.msg('(Port %r Closed)' % (self._realPortNumber,))
        # The next socket, if any, starts out unbound and unconnected.
        self._realPortNumber = None
        self._connectedAddr = None
        self.protocol.doStop()
        self.socket.close()
        del self.socket
        self.state = "disconnected"

    def logPrefix(self):
        """Return the prefix used for this port's log messages."""
        return self.logstr

    def getHost(self):
        """Return the bound local address as an C{address.IPv4Address}."""
        return address.IPv4Address(
            'UDP', *(self.socket.getsockname() + ('INET_UDP',)))
# syntax highlighted by Code2HTML, v. 0.9.1