# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Test cases for twisted.names.
"""
from __future__ import nested_scopes
import sys, socket, operator, copy
from twisted.trial import unittest
from twisted.trial.util import deferredResult as dR
from twisted.trial.util import wait
from twisted.internet import reactor, protocol, defer, error
from twisted.names import client, server, common, authority, hosts
from twisted.protocols import dns
from twisted.python import log, failure
# IPv6 support is spotty at best!
try:
socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
except:
IPv6 = False
else:
IPv6 = True
def justPayload(results):
return [r.payload for r in results[0]]
class NoFileAuthority(authority.FileAuthority):
def __init__(self, soa, records):
# Yes, skip FileAuthority
common.ResolverBase.__init__(self)
self.soa, self.records = soa, records
soa_record = dns.Record_SOA(
mname = 'test-domain.com',
rname = 'root.test-domain.com',
serial = 100,
refresh = 1234,
minimum = 7654,
expire = 19283784,
retry = 15,
ttl=1
)
reverse_soa = dns.Record_SOA(
mname = '93.84.28.in-addr.arpa',
rname = '93.84.28.in-addr.arpa',
serial = 120,
refresh = 54321,
minimum = 382,
expire = 11193983,
retry = 30,
ttl=3
)
my_soa = dns.Record_SOA(
mname = 'my-domain.com',
rname = 'postmaster.test-domain.com',
serial = 130,
refresh = 12345,
minimum = 1,
expire = 999999,
retry = 100,
)
test_domain_com = NoFileAuthority(
soa = ('test-domain.com', soa_record),
records = {
'test-domain.com': [
soa_record,
dns.Record_A('127.0.0.1'),
dns.Record_NS('39.28.189.39'),
dns.Record_MX(10, 'host.test-domain.com'),
dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'),
dns.Record_CNAME('canonical.name.com'),
dns.Record_MB('mailbox.test-domain.com'),
dns.Record_MG('mail.group.someplace'),
dns.Record_TXT('A First piece of Text', 'a SecoNd piece')
] + (IPv6 and [
dns.Record_A6(0, 'ABCD::4321', ''),
dns.Record_A6(12, '0:0069::0', 'some.network.tld'),
dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net')
] or []) + [
dns.Record_TXT('Some more text, haha! Yes. \0 Still here?'),
dns.Record_MR('mail.redirect.or.whatever'),
dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box'),
dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com'),
dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text'),
dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01'),
] + (IPv6 and [dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF')] or []),
'http.tcp.test-domain.com': [
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool')
],
'host.test-domain.com': [
dns.Record_A('123.242.1.5'),
dns.Record_A('0.255.0.255'),
],
'host-two.test-domain.com': [
#
# Python bug
# dns.Record_A('255.255.255.255'),
#
dns.Record_A('255.255.255.254'),
dns.Record_A('0.0.0.0')
],
}
)
reverse_domain = NoFileAuthority(
soa = ('93.84.28.in-addr.arpa', reverse_soa),
records = {
'123.93.84.28.in-addr.arpa': [
dns.Record_PTR('test.host-reverse.lookup.com'),
reverse_soa
]
}
)
my_domain_com = NoFileAuthority(
soa = ('my-domain.com', my_soa),
records = {
'my-domain.com': [
my_soa,
dns.Record_A('1.2.3.4', ttl='1S'),
dns.Record_NS('ns1.domain', ttl='2M'),
dns.Record_NS('ns2.domain', ttl='3H'),
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')
]
}
)
class ServerDNSTestCase(unittest.TestCase):
"""Test cases for DNS server and client."""
def setUp(self):
self.factory = server.DNSServerFactory([
test_domain_com, reverse_domain, my_domain_com
], verbose=2)
p = dns.DNSDatagramProtocol(self.factory)
while 1:
self.listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
port = self.listenerTCP.getHost()[2]
try:
self.listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1")
except error.CannotListenError:
self.listenerTCP.stopListening()
else:
break
self.resolver = client.Resolver(servers=[('127.0.0.1', port)])
def tearDown(self):
self.listenerTCP.stopListening()
self.listenerUDP.stopListening()
def namesTest(self, d, r):
self.response = None
def setDone(response):
self.response = response
d.addBoth(setDone)
while not self.response:
reactor.iterate(0.1)
if isinstance(self.response, failure.Failure):
raise self.response
results = justPayload(self.response)
assert len(results) == len(r), "%s != %s" % (map(str, results), map(str, r))
for rec in results:
assert rec in r, "%s not in %s" % (rec, map(str, r))
def testAddressRecord1(self):
"""Test simple DNS 'A' record queries"""
self.namesTest(
self.resolver.lookupAddress('test-domain.com'),
[dns.Record_A('127.0.0.1', ttl=19283784)]
)
def testAddressRecord2(self):
"""Test DNS 'A' record queries with multiple answers"""
self.namesTest(
self.resolver.lookupAddress('host.test-domain.com'),
[dns.Record_A('123.242.1.5', ttl=19283784), dns.Record_A('0.255.0.255', ttl=19283784)]
)
def testAdressRecord3(self):
"""Test DNS 'A' record queries with edge cases"""
self.namesTest(
self.resolver.lookupAddress('host-two.test-domain.com'),
[dns.Record_A('255.255.255.254', ttl=19283784), dns.Record_A('0.0.0.0', ttl=19283784)]
)
def testAuthority(self):
"""Test DNS 'SOA' record queries"""
self.namesTest(
self.resolver.lookupAuthority('test-domain.com'),
[soa_record]
)
def testMailExchangeRecord(self):
"""Test DNS 'MX' record queries"""
self.namesTest(
self.resolver.lookupMailExchange('test-domain.com'),
[dns.Record_MX(10, 'host.test-domain.com', ttl=19283784)]
)
def testNameserver(self):
"""Test DNS 'NS' record queries"""
self.namesTest(
self.resolver.lookupNameservers('test-domain.com'),
[dns.Record_NS('39.28.189.39', ttl=19283784)]
)
def testHINFO(self):
"""Test DNS 'HINFO' record queries"""
self.namesTest(
self.resolver.lookupHostInfo('test-domain.com'),
[dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know', ttl=19283784)]
)
def testPTR(self):
"""Test DNS 'PTR' record queries"""
self.namesTest(
self.resolver.lookupPointer('123.93.84.28.in-addr.arpa'),
[dns.Record_PTR('test.host-reverse.lookup.com', ttl=11193983)]
)
def testCNAME(self):
"""Test DNS 'CNAME' record queries"""
self.namesTest(
self.resolver.lookupCanonicalName('test-domain.com'),
[dns.Record_CNAME('canonical.name.com', ttl=19283784)]
)
def testMB(self):
"""Test DNS 'MB' record queries"""
self.namesTest(
self.resolver.lookupMailBox('test-domain.com'),
[dns.Record_MB('mailbox.test-domain.com', ttl=19283784)]
)
def testMG(self):
"""Test DNS 'MG' record queries"""
self.namesTest(
self.resolver.lookupMailGroup('test-domain.com'),
[dns.Record_MG('mail.group.someplace', ttl=19283784)]
)
def testMR(self):
"""Test DNS 'MR' record queries"""
self.namesTest(
self.resolver.lookupMailRename('test-domain.com'),
[dns.Record_MR('mail.redirect.or.whatever', ttl=19283784)]
)
def testMINFO(self):
"""Test DNS 'MINFO' record queries"""
self.namesTest(
self.resolver.lookupMailboxInfo('test-domain.com'),
[dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box', ttl=19283784)]
)
def testSRV(self):
"""Test DNS 'SRV' record queries"""
self.namesTest(
self.resolver.lookupService('http.tcp.test-domain.com'),
[dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl=19283784)]
)
def testAFSDB(self):
"""Test DNS 'AFSDB' record queries"""
self.namesTest(
self.resolver.lookupAFSDatabase('test-domain.com'),
[dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com', ttl=19283784)]
)
def testRP(self):
"""Test DNS 'RP' record queries"""
self.namesTest(
self.resolver.lookupResponsibility('test-domain.com'),
[dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text', ttl=19283784)]
)
def testTXT(self):
"""Test DNS 'TXT' record queries"""
self.namesTest(
self.resolver.lookupText('test-domain.com'),
[dns.Record_TXT('A First piece of Text', 'a SecoNd piece', ttl=19283784),
dns.Record_TXT('Some more text, haha! Yes. \0 Still here?', ttl=19283784)]
)
def testWKS(self):
"""Test DNS 'WKS' record queries"""
self.namesTest(
self.resolver.lookupWellKnownServices('test-domain.com'),
[dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01', ttl=19283784)]
)
def testSomeRecordsWithTTLs(self):
result_soa = copy.copy(my_soa)
result_soa.ttl = my_soa.expire
self.namesTest(
self.resolver.lookupAllRecords('my-domain.com'),
[result_soa,
dns.Record_A('1.2.3.4', ttl='1S'),
dns.Record_NS('ns1.domain', ttl='2M'),
dns.Record_NS('ns2.domain', ttl='3H'),
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')]
)
if IPv6:
def testAAAA(self):
"""Test DNS 'AAAA' record queries (IPv6)"""
self.namesTest(
self.resolver.lookupIPV6Address('test-domain.com'),
[dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF', ttl=19283784)]
)
def testA6(self):
"""Test DNS 'A6' record queries (IPv6)"""
self.namesTest(
self.resolver.lookupAddress6('test-domain.com'),
[dns.Record_A6(0, 'ABCD::4321', '', ttl=19283784),
dns.Record_A6(12, '0:0069::0', 'some.network.tld', ttl=19283784),
dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net', ttl=19283784)]
)
def testZoneTransfer(self):
"""Test DNS 'AXFR' queries (Zone transfer)"""
default_ttl = soa_record.expire
results = [copy.copy(r) for r in reduce(operator.add, test_domain_com.records.values())]
for r in results:
if r.ttl is None:
r.ttl = default_ttl
self.namesTest(
self.resolver.lookupZone('test-domain.com').addCallback(lambda r: (r[0][:-1],)),
results
)
class HelperTestCase(unittest.TestCase):
def testSerialGenerator(self):
f = self.mktemp()
a = authority.getSerial(f)
for i in range(20):
b = authority.getSerial(f)
self.failUnless(a < b)
a = b
class AXFRTest(unittest.TestCase):
def setUp(self):
self.results = None
self.d = defer.Deferred()
self.d.addCallback(self._gotResults)
self.controller = client.AXFRController('fooby.com', self.d)
self.soa = dns.RRHeader(name='fooby.com', type=dns.SOA, cls=dns.IN, ttl=86400, auth=False,
payload=dns.Record_SOA(mname='fooby.com',
rname='hooj.fooby.com',
serial=100,
refresh=200,
retry=300,
expire=400,
minimum=500,
ttl=600))
self.records = [
self.soa,
dns.RRHeader(name='fooby.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False,
payload=dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)),
dns.RRHeader(name='fooby.com', type=dns.MX, cls=dns.IN, ttl=700, auth=False,
payload=dns.Record_MX(preference=10, exchange='mail.mv3d.com', ttl=700)),
dns.RRHeader(name='fooby.com', type=dns.A, cls=dns.IN, ttl=700, auth=False,
payload=dns.Record_A(address='64.123.27.105', ttl=700)),
self.soa
]
def _makeMessage(self):
# hooray they all have the same message format
return dns.Message(id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1, rCode=0, trunc=0, maxSize=0)
def testBindAndTNamesStyle(self):
# Bind style = One big single message
m = self._makeMessage()
m.queries = [dns.Query('fooby.com', dns.AXFR, dns.IN)]
m.answers = self.records
self.controller.messageReceived(m, None)
self.assertEquals(self.results, self.records)
def _gotResults(self, result):
self.results = result
def testDJBStyle(self):
# DJB style = message per record
records = self.records[:]
while records:
m = self._makeMessage()
m.queries = [] # DJB *doesn't* specify any queries.. hmm..
m.answers = [records.pop(0)]
self.controller.messageReceived(m, None)
self.assertEquals(self.results, self.records)
class HostsTestCase(unittest.TestCase):
def setUp(self):
f = open('EtcHosts', 'w')
f.write('''
1.1.1.1 EXAMPLE EXAMPLE.EXAMPLETHING
1.1.1.2 HOOJY
::1 ip6thingy
''')
f.close()
self.resolver = hosts.Resolver('EtcHosts')
def testGetHostByName(self):
data = [('EXAMPLE', '1.1.1.1'),
('EXAMPLE.EXAMPLETHING', '1.1.1.1'),
('HOOJY', '1.1.1.2'),
]
for name, ip in data:
self.assertEquals(
wait(self.resolver.getHostByName(name)),
ip)
def testLookupAddress(self):
stuff = wait(self.resolver.lookupAddress('HOOJY'))
self.assertEquals(stuff[0][0].payload.dottedQuad(), '1.1.1.2')
def testIPv6(self):
self.assertEquals(
wait(self.resolver.lookupIPV6Address('ip6thingy')),
'::1') #???
testIPv6.skip = 'IPv6 support is not in our hosts resolver yet'
def testNotImplemented(self):
self.assertRaises(
NotImplementedError,
lambda: wait(self.resolver.lookupMailExchange('EXAMPLE')))
def testQuery(self):
self.assertEquals(
wait(self.resolver.query(dns.Query('EXAMPLE')))[0][0].payload.dottedQuad(),
'1.1.1.1')
def testNotFound(self):
self.assertRaises(
dns.DomainError,
wait, self.resolver.lookupAddress('foueoa'))
syntax highlighted by Code2HTML, v. 0.9.1