# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from twisted.trial import unittest
from twisted.web import server, static, client, error, util, resource
from twisted.internet import reactor, defer, interfaces
from twisted.python.util import sibpath
from twisted.python import components
try:
from twisted.internet import ssl
except:
ssl = None
import os
serverCallID = None
class LongTimeTakingResource(resource.Resource):
def render(self, request):
global serverCallID
serverCallID = reactor.callLater(1, self.writeIt, request)
return server.NOT_DONE_YET
def writeIt(self, request):
request.write("hello!!!")
request.finish()
class CookieMirrorResource(resource.Resource):
def render(self, request):
l = []
for k,v in request.received_cookies.items():
l.append((k, v))
l.sort()
return repr(l)
class RawCookieMirrorResource(resource.Resource):
def render(self, request):
return repr(request.getHeader('cookie'))
class ErrorResource(resource.Resource):
def render(self, request):
request.setResponseCode(401)
if request.args.get("showlength"):
request.setHeader("content-length", "0")
return ""
class NoLengthResource(resource.Resource):
def render(self, request):
return "nolength"
class HostHeaderResource(resource.Resource):
def render(self, request):
return request.received_headers["host"]
class PayloadResource(resource.Resource):
def render(self, request):
data = request.content.read()
if len(data) != 100 or int(request.received_headers["content-length"]) != 100:
return "ERROR"
return data
class WebClientTestCase(unittest.TestCase):
def _listen(self, site):
return reactor.listenTCP(0, site, interface="127.0.0.1")
def setUp(self):
name = str(id(self)) + "_webclient"
if not os.path.exists(name):
os.mkdir(name)
f = open(os.path.join(name, "file"), "wb")
f.write("0123456789")
f.close()
r = static.File(name)
r.putChild("redirect", util.Redirect("/file"))
r.putChild("wait", LongTimeTakingResource())
r.putChild("error", ErrorResource())
r.putChild("nolength", NoLengthResource())
r.putChild("host", HostHeaderResource())
r.putChild("payload", PayloadResource())
site = server.Site(r, timeout=None)
self.port = self._listen(site)
reactor.iterate(); reactor.iterate()
self.portno = self.port.getHost()[2]
def tearDown(self):
if serverCallID and serverCallID.active():
serverCallID.cancel()
self.port.stopListening()
reactor.iterate(); reactor.iterate();
del self.port
def getURL(self, path):
return "http://127.0.0.1:%d/%s" % (self.portno, path)
def testPayload(self):
s = "0123456789" * 10
self.assertEquals(unittest.deferredResult(client.getPage(self.getURL("payload"), postdata=s)),
s)
def testHostHeader(self):
# if we pass Host header explicitly, it should be used, otherwise
# it should extract from url
self.assertEquals(unittest.deferredResult(client.getPage(self.getURL("host"))),
"127.0.0.1")
self.assertEquals(unittest.deferredResult(client.getPage(self.getURL("host"),
headers={"Host": "www.example.com"})),
"www.example.com")
def testGetPage(self):
self.assertEquals(unittest.deferredResult(client.getPage(self.getURL("file"))),
"0123456789")
def testTimeout(self):
r = unittest.deferredResult(client.getPage(self.getURL("wait"), timeout=1.5))
self.assertEquals(r, 'hello!!!')
f = unittest.deferredError(client.getPage(self.getURL("wait"), timeout=0.5))
f.trap(defer.TimeoutError)
def testDownloadPage(self):
name = self.mktemp()
r = unittest.deferredResult(client.downloadPage(self.getURL("file"), name))
self.assertEquals(open(name, "rb").read(), "0123456789")
name = self.mktemp()
r = unittest.deferredResult(client.downloadPage(self.getURL("nolength"), name))
self.assertEquals(open(name, "rb").read(), "nolength")
def testDownloadPageError1(self):
class errorfile:
def write(self, data):
raise IOError, "badness happened during write"
def close(self):
pass
ef = errorfile()
d = client.downloadPage(self.getURL("file"), ef)
f = unittest.deferredError(d)
self.failUnless(f.check(IOError))
def testDownloadPageError2(self):
class errorfile:
def write(self, data):
pass
def close(self):
raise IOError, "badness happened during close"
ef = errorfile()
d = client.downloadPage(self.getURL("file"), ef)
f = unittest.deferredError(d)
self.failUnless(f.check(IOError))
def testDownloadPageError3(self):
# make sure failures in open() are caught too. This is tricky.
# Might only work on posix.
tmpfile = open("unwritable", "wb")
tmpfile.close()
os.chmod("unwritable", 0) # make it unwritable (to us)
d = client.downloadPage(self.getURL("file"), "unwritable")
f = unittest.deferredError(d)
self.failUnless(f.check(IOError))
os.chmod("unwritable", 0700)
os.unlink("unwritable")
def testServerError(self):
f = unittest.deferredError(client.getPage(self.getURL("nosuchfile")))
f.trap(error.Error)
self.assertEquals(f.value.args[0], "404")
f = unittest.deferredError(client.getPage(self.getURL("error")))
f.trap(error.Error)
self.assertEquals(f.value.args[0], "401")
f = unittest.deferredError(client.getPage(self.getURL("error?showlength=1")))
f.trap(error.Error)
self.assertEquals(f.value.args[0], "401")
def testDownloadServerError(self):
f = unittest.deferredError(client.downloadPage(self.getURL("nosuchfile"), "nosuchfile"))
f.trap(error.Error)
self.assertEquals(f.value.args[0], "404")
# this is different since content length is 0, and HTTPClient SUCKS
f = unittest.deferredError(client.downloadPage(self.getURL("error"), "error"))
f.trap(error.Error)
self.assertEquals(f.value.args[0], "401")
f = unittest.deferredError(client.downloadPage(self.getURL("error?showlength=1"), "error"))
f.trap(error.Error)
self.assertEquals(f.value.args[0], "401")
def testFactoryInfo(self):
url = self.getURL('file')
scheme, host, port, path = client._parse(url)
factory = client.HTTPClientFactory(url)
reactor.connectTCP(host, port, factory)
unittest.deferredResult(factory.deferred)
self.assertEquals(factory.status, '200')
self.assert_(factory.version.startswith('HTTP/'))
self.assertEquals(factory.message, 'OK')
self.assertEquals(factory.response_headers['content-length'][0], '10')
def testRedirect(self):
self.assertEquals("0123456789",
unittest.deferredResult(client.getPage(self.getURL("redirect"))))
f = unittest.deferredError(client.getPage(self.getURL("redirect"),
followRedirect = 0))
f.trap(error.PageRedirect)
self.assertEquals(f.value.location, "/file")
def testPartial(self):
name = self.mktemp()
f = open(name, "wb")
f.write("abcd")
f.close()
r = unittest.deferredResult(client.downloadPage(self.getURL("file"), name,
supportPartial=1))
self.assertEquals(open(name, "rb").read(), "abcd456789")
r = unittest.deferredResult(client.downloadPage(self.getURL("file"), name,
supportPartial=1))
self.assertEquals(open(name, "rb").read(), "abcd456789")
r = unittest.deferredResult(client.downloadPage(self.getURL("file"), name))
self.assertEquals(open(name, "rb").read(), "0123456789")
class WebClientSSLTestCase(WebClientTestCase):
def _listen(self, site):
return reactor.listenSSL(0, site,
contextFactory=ssl.DefaultOpenSSLContextFactory(
sibpath(__file__, 'server.pem'),
sibpath(__file__, 'server.pem'),
),
interface="127.0.0.1")
def getURL(self, path):
return "https://127.0.0.1:%d/%s" % (self.portno, path)
def testFactoryInfo(self):
url = self.getURL('file')
scheme, host, port, path = client._parse(url)
factory = client.HTTPClientFactory(url)
reactor.connectSSL(host, port, factory, ssl.ClientContextFactory())
unittest.deferredResult(factory.deferred)
self.assertEquals(factory.status, '200')
self.assert_(factory.version.startswith('HTTP/'))
self.assertEquals(factory.message, 'OK')
self.assertEquals(factory.response_headers['content-length'][0], '10')
class WebClientRedirectBetweenSSLandPlainText(unittest.TestCase):
def getHTTPS(self, path):
return "https://127.0.0.1:%d/%s" % (self.tlsPortno, path)
def getHTTP(self, path):
return "http://127.0.0.1:%d/%s" % (self.plainPortno, path)
def setUp(self):
plainRoot = static.Data('not me', 'text/plain')
tlsRoot = static.Data('me neither', 'text/plain')
plainSite = server.Site(plainRoot, timeout=None)
tlsSite = server.Site(tlsRoot, timeout=None)
self.tlsPort = reactor.listenSSL(0, tlsSite,
contextFactory=ssl.DefaultOpenSSLContextFactory(
sibpath(__file__, 'server.pem'),
sibpath(__file__, 'server.pem'),
),
interface="127.0.0.1")
self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")
reactor.iterate(); reactor.iterate()
self.plainPortno = self.plainPort.getHost()[2]
self.tlsPortno = self.tlsPort.getHost()[2]
plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
def tearDown(self):
self.plainPort.stopListening()
self.tlsPort.stopListening()
reactor.iterate(); reactor.iterate();
del self.plainPort
del self.tlsPort
def testHoppingAround(self):
self.assertEquals(unittest.deferredResult(client.getPage(self.getHTTP("one"))),
"FOUND IT!")
class FakeTransport:
disconnecting = False
def __init__(self):
self.data = []
def write(self, stuff):
self.data.append(stuff)
class CookieTestCase(unittest.TestCase):
def _listen(self, site):
return reactor.listenTCP(0, site, interface="127.0.0.1")
def setUp(self):
root = static.Data('El toro!', 'text/plain')
root.putChild("cookiemirror", CookieMirrorResource())
root.putChild("rawcookiemirror", RawCookieMirrorResource())
site = server.Site(root, timeout=None)
self.port = self._listen(site)
reactor.iterate(); reactor.iterate()
self.portno = self.port.getHost()[2]
def tearDown(self):
self.port.stopListening()
reactor.iterate(); reactor.iterate();
del self.port
def getHTTP(self, path):
return "http://127.0.0.1:%d/%s" % (self.portno, path)
def testNoCookies(self):
self.assertEquals(unittest.deferredResult(client.getPage(self.getHTTP("cookiemirror"))),
"[]")
def testSomeCookies(self):
self.assertEquals(unittest.deferredResult(client.getPage(self.getHTTP("cookiemirror"),
cookies={'foo': 'bar',
'baz': 'quux'})),
"[('baz', 'quux'), ('foo', 'bar')]")
def testRawNoCookies(self):
self.assertEquals(unittest.deferredResult(client.getPage(self.getHTTP("rawcookiemirror"))),
"None")
def testRawSomeCookies(self):
self.assertEquals(unittest.deferredResult(client.getPage(self.getHTTP("rawcookiemirror"),
cookies={'foo': 'bar',
'baz': 'quux'})),
"'foo=bar; baz=quux'")
def testCookieHeaderParsing(self):
d = defer.Deferred()
factory = client.HTTPClientFactory('http://foo.example.com/')
proto = factory.buildProtocol('127.42.42.42')
proto.transport = FakeTransport()
proto.connectionMade()
for line in [
'200 Ok',
'Squash: yes',
'Hands: stolen',
'Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/; expires=Wednesday, 09-Nov-99 23:12:40 GMT',
'Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/',
'Set-Cookie: SHIPPING=FEDEX; path=/foo',
'',
'body',
'more body',
]:
proto.dataReceived(line + '\r\n')
self.assertEquals(proto.transport.data,
['GET / HTTP/1.0\r\n',
'Host: foo.example.com\r\n',
'User-Agent: Twisted PageGetter\r\n',
'\r\n'])
self.assertEquals(factory.cookies,
{
'CUSTOMER': 'WILE_E_COYOTE',
'PART_NUMBER': 'ROCKET_LAUNCHER_0001',
'SHIPPING': 'FEDEX',
})
if ssl is None or not hasattr(ssl, 'DefaultOpenSSLContextFactory'):
for case in [WebClientSSLTestCase, WebClientRedirectBetweenSSLandPlainText]:
case.skip = "OpenSSL not present"
if not components.implements(reactor, interfaces.IReactorSSL):
for case in [WebClientSSLTestCase, WebClientRedirectBetweenSSLandPlainText]:
case.skip = "Reactor doesn't support SSL"
syntax highlighted by Code2HTML, v. 0.9.1