# Twisted, the Framework of Your Internet # Copyright (C) 2001 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-7 USA """ Test case for twisted.protocols.imap4 """ from __future__ import nested_scopes try: from cStringIO import StringIO except ImportError: from StringIO import StringIO import os import sys import types from twisted.protocols.imap4 import MessageSet from twisted.protocols import imap4 from twisted.protocols import smtp from twisted.protocols import loopback from twisted.internet import defer from twisted.trial import unittest from twisted.python import util from twisted.python import components from twisted.python.util import sibpath from twisted.python import failure from twisted import cred import twisted.cred.error import twisted.cred.checkers import twisted.cred.credentials import twisted.cred.portal from twisted.test.proto_helpers import StringTransport try: from ssl_helpers import ClientTLSContext, ServerTLSContext except ImportError: ClientTLSContext = ServerTLSContext = None def strip(f): return lambda result, f=f: f() def sortNest(l): l = l[:] l.sort() for i in range(len(l)): if isinstance(l[i], types.ListType): l[i] = sortNest(l[i]) elif isinstance(l[i], types.TupleType): l[i] = tuple(sortNest(list(l[i]))) return l class IMAP4UTF7TestCase(unittest.TestCase): tests = [ ['Hello world', 'Hello world'], ['Hello & world', 'Hello &- world'], ['Hello\xffworld', 'Hello&,w-world'], ['\xff\xfe\xfd\xfc', '&,,79,A-'], ] def testEncode(self): for (input, output) in self.tests: self.assertEquals(input.encode('imap4-utf-7'), output) def testDecode(self): for (input, output) in self.tests: # XXX - Piece of *crap* 2.1 self.assertEquals(input, imap4.decoder(output)[0]) def testPrintableSingletons(self): # All printables represent themselves for o in range(0x20, 0x26) + range(0x27, 0x7f): self.failUnlessEqual(chr(o), chr(o).encode('imap4-utf-7')) self.failUnlessEqual(chr(o), chr(o).decode('imap4-utf-7')) self.failUnlessEqual('&'.encode('imap4-utf-7'), '&-') self.failUnlessEqual('&-'.decode('imap4-utf-7'), '&') class BufferingConsumer: def __init__(self): self.buffer = [] def write(self, bytes): self.buffer.append(bytes) if self.consumer: self.consumer.resumeProducing() def registerProducer(self, consumer, streaming): self.consumer = consumer self.consumer.resumeProducing() def unregisterProducer(self): self.consumer = None class MessageProducerTestCase(unittest.TestCase): def testSinglePart(self): body = 'This is body text. Rar.' headers = util.OrderedDict() headers['from'] = 'sender@host' headers['to'] = 'recipient@domain' headers['subject'] = 'booga booga boo' headers['content-type'] = 'text/plain' msg = FakeyMessage(headers, (), None, body, 123, None ) c = BufferingConsumer() p = imap4.MessageProducer(msg) r = unittest.deferredResult(p.beginProducing(c)) self.failUnlessIdentical(r, p) self.assertEquals(''.join(c.buffer), '{119}\r\n' 'From: sender@host\r\n' 'To: recipient@domain\r\n' 'Subject: booga booga boo\r\n' 'Content-Type: text/plain\r\n' '\r\n' + body ) def testSingleMultiPart(self): outerBody = '' innerBody = 'Contained body message text. Squarge.' headers = util.OrderedDict() headers['from'] = 'sender@host' headers['to'] = 'recipient@domain' headers['subject'] = 'booga booga boo' headers['content-type'] = 'multipart/alternative; boundary="xyz"' innerHeaders = util.OrderedDict() innerHeaders['subject'] = 'this is subject text' innerHeaders['content-type'] = 'text/plain' msg = FakeyMessage(headers, (), None, outerBody, 123, [FakeyMessage(innerHeaders, (), None, innerBody, None, None)], ) c = BufferingConsumer() p = imap4.MessageProducer(msg) r = unittest.deferredResult(p.beginProducing(c)) self.failUnlessIdentical(r, p) self.assertEquals(''.join(c.buffer), '{239}\r\n' 'From: sender@host\r\n' 'To: recipient@domain\r\n' 'Subject: booga booga boo\r\n' 'Content-Type: multipart/alternative; boundary="xyz"\r\n' '\r\n' '\r\n' '--xyz\r\n' 'Subject: this is subject text\r\n' 'Content-Type: text/plain\r\n' '\r\n' + innerBody + '\r\n--xyz--\r\n' ) def testMultipleMultiPart(self): outerBody = '' innerBody1 = 'Contained body message text. Squarge.' innerBody2 = 'Secondary message text of squarge body.' headers = util.OrderedDict() headers['from'] = 'sender@host' headers['to'] = 'recipient@domain' headers['subject'] = 'booga booga boo' headers['content-type'] = 'multipart/alternative; boundary="xyz"' innerHeaders = util.OrderedDict() innerHeaders['subject'] = 'this is subject text' innerHeaders['content-type'] = 'text/plain' innerHeaders2 = util.OrderedDict() innerHeaders2['subject'] = 'this is subject' innerHeaders2['content-type'] = 'text/html' msg = FakeyMessage(headers, (), None, outerBody, 123, [ FakeyMessage(innerHeaders, (), None, innerBody1, None, None), FakeyMessage(innerHeaders2, (), None, innerBody2, None, None) ], ) c = BufferingConsumer() p = imap4.MessageProducer(msg) r = unittest.deferredResult(p.beginProducing(c)) self.failUnlessIdentical(r, p) self.assertEquals(''.join(c.buffer), '{354}\r\n' 'From: sender@host\r\n' 'To: recipient@domain\r\n' 'Subject: booga booga boo\r\n' 'Content-Type: multipart/alternative; boundary="xyz"\r\n' '\r\n' '\r\n' '--xyz\r\n' 'Subject: this is subject text\r\n' 'Content-Type: text/plain\r\n' '\r\n' + innerBody1 + '\r\n--xyz\r\n' 'Subject: this is subject\r\n' 'Content-Type: text/html\r\n' '\r\n' + innerBody2 + '\r\n--xyz--\r\n' ) class IMAP4HelperTestCase(unittest.TestCase): def testFileProducer(self): b = (('x' * 1) + ('y' * 1) + ('z' * 1)) * 10 c = BufferingConsumer() f = StringIO(b) p = imap4.FileProducer(f) r = unittest.deferredResult(p.beginProducing(c)) self.failUnlessIdentical(r, p) self.assertEquals(('{%d}\r\n' % len(b))+ b, ''.join(c.buffer)) def testWildcard(self): cases = [ ['foo/%gum/bar', ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'], ['foo/xgum/bar', 'foo/gum/bar'], ], ['foo/x%x/bar', ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'], ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'], ], ['foo/xyz*abc/bar', ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'], ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'], ] ] for (wildcard, fail, succeed) in cases: wildcard = imap4.wildcardToRegexp(wildcard, '/') for x in fail: self.failIf(wildcard.match(x)) for x in succeed: self.failUnless(wildcard.match(x)) def testWildcardNoDelim(self): cases = [ ['foo/%gum/bar', ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'], ['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'], ], ['foo/x%x/bar', ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'], ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x/bar'], ], ['foo/xyz*abc/bar', ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'], ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'], ] ] for (wildcard, fail, succeed) in cases: wildcard = imap4.wildcardToRegexp(wildcard, None) for x in fail: self.failIf(wildcard.match(x), x) for x in succeed: self.failUnless(wildcard.match(x), x) def testHeaderFormatter(self): cases = [ ({'Header1': 'Value1', 'Header2': 'Value2'}, 'Header2: Value2\r\nHeader1: Value1\r\n'), ] for (input, output) in cases: self.assertEquals(imap4._formatHeaders(input), output) def testMessageSet(self): m1 = MessageSet() m2 = MessageSet() self.assertEquals(m1, m2) m1 = m1 + (1, 3) self.assertEquals(len(m1), 3) self.assertEquals(list(m1), [1, 2, 3]) m2 = m2 + (1, 3) self.assertEquals(m1, m2) self.assertEquals(list(m1 + m2), [1, 2, 3]) def testQuotedSplitter(self): cases = [ '''Hello World''', '''Hello "World!"''', '''World "Hello" "How are you?"''', '''"Hello world" How "are you?"''', '''foo bar "baz buz" NIL''', '''foo bar "baz buz" "NIL"''', '''foo NIL "baz buz" bar''', '''foo "NIL" "baz buz" bar''', '''"NIL" bar "baz buz" foo''', ] answers = [ ['Hello', 'World'], ['Hello', 'World!'], ['World', 'Hello', 'How are you?'], ['Hello world', 'How', 'are you?'], ['foo', 'bar', 'baz buz', None], ['foo', 'bar', 'baz buz', 'NIL'], ['foo', None, 'baz buz', 'bar'], ['foo', 'NIL', 'baz buz', 'bar'], ['NIL', 'bar', 'baz buz', 'foo'], ] errors = [ '"mismatched quote', 'mismatched quote"', 'mismatched"quote', '"oops here is" another"', ] for s in errors: self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s) for (case, expected) in zip(cases, answers): self.assertEquals(imap4.splitQuoted(case), expected) def testStringCollapser(self): cases = [ ['a', 'b', 'c', 'd', 'e'], ['a', ' ', '"', 'b', 'c', ' ', '"', ' ', 'd', 'e'], [['a', 'b', 'c'], 'd', 'e'], ['a', ['b', 'c', 'd'], 'e'], ['a', 'b', ['c', 'd', 'e']], ['"', 'a', ' ', '"', ['b', 'c', 'd'], '"', ' ', 'e', '"'], ['a', ['"', ' ', 'b', 'c', ' ', ' ', '"'], 'd', 'e'], ] answers = [ ['abcde'], ['a', 'bc ', 'de'], [['abc'], 'de'], ['a', ['bcd'], 'e'], ['ab', ['cde']], ['a ', ['bcd'], ' e'], ['a', [' bc '], 'de'], ] for (case, expected) in zip(cases, answers): self.assertEquals(imap4.collapseStrings(case), expected) def testParenParser(self): s = '\r\n'.join(['xx'] * 4) cases = [ '(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%s)' % (len(s), s,), # '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" ' # 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" ' # '"IMAP4rev1 WG mtg summary and minutes" ' # '(("Terry Gray" NIL "gray" "cac.washington.edu")) ' # '(("Terry Gray" NIL "gray" "cac.washington.edu")) ' # '(("Terry Gray" NIL "gray" "cac.washington.edu")) ' # '((NIL NIL "imap" "cac.washington.edu")) ' # '((NIL NIL "minutes" "CNRI.Reston.VA.US") ' # '("John Klensin" NIL "KLENSIN" "INFOODS.MIT.EDU")) NIL NIL ' # '"") ' # 'BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 92))', '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" ' 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" ' '"IMAP4rev1 WG mtg summary and minutes" ' '(("Terry Gray" NIL gray cac.washington.edu)) ' '(("Terry Gray" NIL gray cac.washington.edu)) ' '(("Terry Gray" NIL gray cac.washington.edu)) ' '((NIL NIL imap cac.washington.edu)) ' '((NIL NIL minutes CNRI.Reston.VA.US) ' '("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL ' ') ' 'BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))', ] answers = [ ['BODY.PEEK', ['HEADER.FIELDS.NOT', ['subject', 'bcc', 'cc']], s], ['FLAGS', [r'\Seen'], 'INTERNALDATE', '17-Jul-1996 02:44:25 -0700', 'RFC822.SIZE', '4286', 'ENVELOPE', ['Wed, 17 Jul 1996 02:23:25 -0700 (PDT)', 'IMAP4rev1 WG mtg summary and minutes', [["Terry Gray", None, "gray", "cac.washington.edu"]], [["Terry Gray", None, "gray", "cac.washington.edu"]], [["Terry Gray", None, "gray", "cac.washington.edu"]], [[None, None, "imap", "cac.washington.edu"]], [[None, None, "minutes", "CNRI.Reston.VA.US"], ["John Klensin", None, "KLENSIN", "INFOODS.MIT.EDU"]], None, None, ""], "BODY", ["TEXT", "PLAIN", ["CHARSET", "US-ASCII"], None, None, "7BIT", "3028", "92"]], ] for (case, expected) in zip(cases, answers): self.assertEquals(imap4.parseNestedParens(case), [expected]) # XXX This code used to work, but changes occurred within the # imap4.py module which made it no longer necessary for *all* of it # to work. In particular, only the part that makes # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly # no longer needs to work. So, I am loathe to delete the entire # section of the test. --exarkun # # for (case, expected) in zip(answers, cases): # self.assertEquals('(' + imap4.collapseNestedLists(case) + ')', expected) def testFetchParserSimple(self): cases = [ ['ENVELOPE', 'Envelope'], ['FLAGS', 'Flags'], ['INTERNALDATE', 'InternalDate'], ['RFC822.HEADER', 'RFC822Header'], ['RFC822.SIZE', 'RFC822Size'], ['RFC822.TEXT', 'RFC822Text'], ['RFC822', 'RFC822'], ['UID', 'UID'], ['BODYSTRUCTURE', 'BodyStructure'], ] for (inp, outp) in cases: p = imap4._FetchParser() p.parseString(inp) self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], getattr(p, outp))) def testFetchParserMacros(self): cases = [ ['ALL', (4, ['flags', 'internaldate', 'rfc822.size', 'envelope'])], ['FULL', (5, ['flags', 'internaldate', 'rfc822.size', 'envelope', 'body'])], ['FAST', (3, ['flags', 'internaldate', 'rfc822.size'])], ] for (inp, outp) in cases: p = imap4._FetchParser() p.parseString(inp) self.assertEquals(len(p.result), outp[0]) p = [str(p).lower() for p in p.result] p.sort() outp[1].sort() self.assertEquals(p, outp[1]) def testFetchParserBody(self): P = imap4._FetchParser p = P() p.parseString('BODY') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, False) self.assertEquals(p.result[0].header, None) self.assertEquals(str(p.result[0]), 'BODY') p = P() p.parseString('BODY.PEEK') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, True) self.assertEquals(str(p.result[0]), 'BODY') p = P() p.parseString('BODY[]') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].empty, True) self.assertEquals(str(p.result[0]), 'BODY[]') p = P() p.parseString('BODY[HEADER]') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, False) self.failUnless(isinstance(p.result[0].header, p.Header)) self.assertEquals(p.result[0].header.negate, False) self.assertEquals(p.result[0].header.fields, None) self.assertEquals(p.result[0].empty, False) self.assertEquals(str(p.result[0]), 'BODY[HEADER]') p = P() p.parseString('BODY.PEEK[HEADER]') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, True) self.failUnless(isinstance(p.result[0].header, p.Header)) self.assertEquals(p.result[0].header.negate, False) self.assertEquals(p.result[0].header.fields, None) self.assertEquals(p.result[0].empty, False) self.assertEquals(str(p.result[0]), 'BODY[HEADER]') p = P() p.parseString('BODY[HEADER.FIELDS (Subject Cc Message-Id)]') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, False) self.failUnless(isinstance(p.result[0].header, p.Header)) self.assertEquals(p.result[0].header.negate, False) self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID']) self.assertEquals(p.result[0].empty, False) self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]') p = P() p.parseString('BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, True) self.failUnless(isinstance(p.result[0].header, p.Header)) self.assertEquals(p.result[0].header.negate, False) self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID']) self.assertEquals(p.result[0].empty, False) self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]') p = P() p.parseString('BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, True) self.failUnless(isinstance(p.result[0].header, p.Header)) self.assertEquals(p.result[0].header.negate, True) self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID']) self.assertEquals(p.result[0].empty, False) self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]') p = P() p.parseString('BODY[1.MIME]<10.50>') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, False) self.failUnless(isinstance(p.result[0].mime, p.MIME)) self.assertEquals(p.result[0].part, (0,)) self.assertEquals(p.result[0].partialBegin, 10) self.assertEquals(p.result[0].partialLength, 50) self.assertEquals(p.result[0].empty, False) self.assertEquals(str(p.result[0]), 'BODY[1.MIME]<10.50>') p = P() p.parseString('BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>') self.assertEquals(len(p.result), 1) self.failUnless(isinstance(p.result[0], p.Body)) self.assertEquals(p.result[0].peek, True) self.failUnless(isinstance(p.result[0].header, p.Header)) self.assertEquals(p.result[0].part, (0, 2, 8, 10)) self.assertEquals(p.result[0].header.fields, ['MESSAGE-ID', 'DATE']) self.assertEquals(p.result[0].partialBegin, 103) self.assertEquals(p.result[0].partialLength, 69) self.assertEquals(p.result[0].empty, False) self.assertEquals(str(p.result[0]), 'BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>') def testFiles(self): inputStructure = [ 'foo', 'bar', 'baz', StringIO('this is a file\r\n'), 'buz' ] output = '"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz"' self.assertEquals(imap4.collapseNestedLists(inputStructure), output) def testQuoteAvoider(self): input = [ 'foo', imap4.DontQuoteMe('bar'), "baz", StringIO('this is a file\r\n'), imap4.DontQuoteMe('buz') ] output = '"foo" bar "baz" {16}\r\nthis is a file\r\n buz' self.assertEquals(imap4.collapseNestedLists(input), output) def testLiterals(self): cases = [ ('({10}\r\n0123456789)', [['0123456789']]), ] for (case, expected) in cases: self.assertEquals(imap4.parseNestedParens(case), expected) def testQueryBuilder(self): inputs = [ imap4.Query(flagged=1), imap4.Query(sorted=1, unflagged=1, deleted=1), imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)), imap4.Query(before='today'), imap4.Or( imap4.Query(deleted=1), imap4.Query(unseen=1), imap4.Query(new=1) ), imap4.Or( imap4.Not( imap4.Or( imap4.Query(sorted=1, since='yesterday', smaller=1000), imap4.Query(sorted=1, before='tuesday', larger=10000), imap4.Query(sorted=1, unseen=1, deleted=1, before='today'), imap4.Not( imap4.Query(subject='spam') ), ), ), imap4.Not( imap4.Query(uid='1:5') ), ) ] outputs = [ 'FLAGGED', '(DELETED UNFLAGGED)', '(OR FLAGGED DELETED)', '(BEFORE "today")', '(OR DELETED (OR UNSEEN NEW))', '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more '(NOT (UID 1:5)))', ] for (query, expected) in zip(inputs, outputs): self.assertEquals(query, expected) def testIdListParser(self): inputs = [ '1:*', '5:*', '1:2,5:*', '1', '1,2', '1,3,5', '1:10', '1:10,11', '1:5,10:20', '1,5:10', '1,5:10,15:20', '1:10,15,20:25', ] outputs = [ MessageSet(1, None), MessageSet(5, None), MessageSet(5, None) + MessageSet(1, 2), MessageSet(1), MessageSet(1, 2), MessageSet(1) + MessageSet(3) + MessageSet(5), MessageSet(1, 10), MessageSet(1, 11), MessageSet(1, 5) + MessageSet(10, 20), MessageSet(1) + MessageSet(5, 10), MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20), MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25), ] lengths = [ None, None, None, 1, 2, 3, 10, 11, 16, 7, 13, 17, ] for (input, expected) in zip(inputs, outputs): self.assertEquals(imap4.parseIdList(input), expected) for (input, expected) in zip(inputs, lengths): try: L = len(imap4.parseIdList(input)) except TypeError: L = None self.assertEquals(L, expected, "len(%r) = %r != %r" % (input, L, expected)) class SimpleMailbox: __implements__ = imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox flags = ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag') messages = [] mUID = 0 rw = 1 closed = False def __init__(self): self.listeners = [] self.addListener = self.listeners.append self.removeListener = self.listeners.remove def getFlags(self): return self.flags def getUIDValidity(self): return 42 def getUIDNext(self): return len(self.messages) + 1 def getMessageCount(self): return 9 def getRecentCount(self): return 3 def getUnseenCount(self): return 4 def isWriteable(self): return self.rw def destroy(self): pass def getHierarchicalDelimiter(self): return '/' def requestStatus(self, names): r = {} if 'MESSAGES' in names: r['MESSAGES'] = self.getMessageCount() if 'RECENT' in names: r['RECENT'] = self.getRecentCount() if 'UIDNEXT' in names: r['UIDNEXT'] = self.getMessageCount() + 1 if 'UIDVALIDITY' in names: r['UIDVALIDITY'] = self.getUID() if 'UNSEEN' in names: r['UNSEEN'] = self.getUnseenCount() return defer.succeed(r) def addMessage(self, message, flags, date = None): self.messages.append((message, flags, date, self.mUID)) self.mUID += 1 return defer.succeed(None) def expunge(self): delete = [] for i in self.messages: if '\\Deleted' in i[1]: delete.append(i) for i in delete: self.messages.remove(i) return [i[3] for i in delete] def close(self): self.closed = True class Account(imap4.MemoryAccount): def _emptyMailbox(self, name, id): return SimpleMailbox() def select(self, name, rw=1): mbox = imap4.MemoryAccount.select(self, name) if mbox is not None: mbox.rw = rw return mbox class SimpleServer(imap4.IMAP4Server): def __init__(self, *args, **kw): imap4.IMAP4Server.__init__(self, *args, **kw) realm = TestRealm() realm.theAccount = Account('testuser') portal = cred.portal.Portal(realm) c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() self.checker = c self.portal = portal portal.registerChecker(c) def authenticateLogin(self, username, password): if username == 'testuser' and password == 'password-test': return imap4.IAccount, self.theAccount, lambda: None raise cred.error.UnauthorizedLogin() class SimpleClient(imap4.IMAP4Client): def __init__(self, deferred, contextFactory = None): imap4.IMAP4Client.__init__(self, contextFactory) self.deferred = deferred self.events = [] def serverGreeting(self, caps): self.deferred.callback(None) def modeChanged(self, writeable): self.events.append(['modeChanged', writeable]) self.transport.loseConnection() def flagsChanged(self, newFlags): self.events.append(['flagsChanged', newFlags]) self.transport.loseConnection() def newMessages(self, exists, recent): self.events.append(['newMessages', exists, recent]) self.transport.loseConnection() class IMAP4HelperMixin: serverCTX = None clientCTX = None def setUp(self): d = defer.Deferred() self.server = SimpleServer(contextFactory=self.serverCTX) self.client = SimpleClient(d, contextFactory=self.clientCTX) self.connected = d SimpleMailbox.messages = [] theAccount = Account('testuser') theAccount.mboxType = SimpleMailbox SimpleServer.theAccount = theAccount def tearDown(self): del self.server del self.client del self.connected def _cbStopClient(self, ignore): self.client.transport.loseConnection() def _ebGeneral(self, failure): self.client.transport.loseConnection() self.server.transport.loseConnection() failure.printTraceback(open('failure.log', 'w')) failure.printTraceback() raise failure.value def loopback(self): loopback.loopback(self.server, self.client) class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): def testCapability(self): caps = {} def getCaps(): def gotCaps(c): caps.update(c) self.server.transport.loseConnection() return self.client.getCapabilities().addCallback(gotCaps) self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral) self.loopback() self.assertEquals({'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}, caps) def testCapabilityWithAuth(self): caps = {} self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials def getCaps(): def gotCaps(c): caps.update(c) self.server.transport.loseConnection() return self.client.getCapabilities().addCallback(gotCaps) self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral) self.loopback() expCap = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None, 'AUTH': ['CRAM-MD5']} self.assertEquals(expCap, caps) def testLogout(self): self.loggedOut = 0 def logout(): def setLoggedOut(): self.loggedOut = 1 self.client.logout().addCallback(strip(setLoggedOut)) self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral) self.loopback() self.assertEquals(self.loggedOut, 1) def testNoop(self): self.responses = None def noop(): def setResponses(responses): self.responses = responses self.server.transport.loseConnection() self.client.noop().addCallback(setResponses) self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral) self.loopback() self.assertEquals(self.responses, []) def testLogin(self): def login(): d = self.client.login('testuser', 'password-test') d.addCallback(self._cbStopClient) self.connected.addCallback(strip(login)).addErrback(self._ebGeneral) self.loopback() self.assertEquals(self.server.account, SimpleServer.theAccount) self.assertEquals(self.server.state, 'auth') def testFailedLogin(self): def login(): d = self.client.login('testuser', 'wrong-password') d.addBoth(self._cbStopClient) self.connected.addCallback(strip(login)).addErrback(self._ebGeneral) self.loopback() self.assertEquals(self.server.account, None) self.assertEquals(self.server.state, 'unauth') def testNamespace(self): self.namespaceArgs = None def login(): return self.client.login('testuser', 'password-test') def namespace(): def gotNamespace(args): self.namespaceArgs = args self._cbStopClient(None) return self.client.namespace().addCallback(gotNamespace) d = self.connected.addCallback(strip(login)) d.addCallback(strip(namespace)) d.addErrback(self._ebGeneral) self.loopback() self.assertEquals(self.namespaceArgs, [[['', '/']], [], []]) def testSelect(self): SimpleServer.theAccount.addMailbox('test-mailbox') self.selectedArgs = None def login(): return self.client.login('testuser', 'password-test') def select(): def selected(args): self.selectedArgs = args self._cbStopClient(None) d = self.client.select('test-mailbox') d.addCallback(selected) return d d = self.connected.addCallback(strip(login)) d.addCallback(strip(select)) d.addErrback(self._ebGeneral) self.loopback() mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX'] self.assertEquals(self.server.mbox, mbox) self.assertEquals(self.selectedArgs, { 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42, 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'), 'READ-WRITE': 1 }) def testExamine(self): SimpleServer.theAccount.addMailbox('test-mailbox') self.examinedArgs = None def login(): return self.client.login('testuser', 'password-test') def examine(): def examined(args): self.examinedArgs = args self._cbStopClient(None) d = self.client.examine('test-mailbox') d.addCallback(examined) return d d = self.connected.addCallback(strip(login)) d.addCallback(strip(examine)) d.addErrback(self._ebGeneral) self.loopback() mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX'] self.assertEquals(self.server.mbox, mbox) self.assertEquals(self.examinedArgs, { 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42, 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'), 'READ-WRITE': 0 }) def testCreate(self): succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX') fail = ('testbox', 'test/box') def cb(): self.result.append(1) def eb(failure): self.result.append(0) def login(): return self.client.login('testuser', 'password-test') def create(): for name in succeed + fail: d = self.client.create(name) d.addCallback(strip(cb)).addErrback(eb) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.result = [] d = self.connected.addCallback(strip(login)).addCallback(strip(create)) self.loopback() self.assertEquals(self.result, [1] * len(succeed) + [0] * len(fail)) mbox = SimpleServer.theAccount.mailboxes.keys() answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box'] mbox.sort() answers.sort() self.assertEquals(mbox, [a.upper() for a in answers]) def testDelete(self): SimpleServer.theAccount.addMailbox('delete/me') def login(): return self.client.login('testuser', 'password-test') def delete(): return self.client.delete('delete/me') d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(delete), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(SimpleServer.theAccount.mailboxes.keys(), []) def testIllegalInboxDelete(self): self.stashed = None def login(): return self.client.login('testuser', 'password-test') def delete(): return self.client.delete('inbox') def stash(result): self.stashed = result d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(delete), self._ebGeneral) d.addBoth(stash) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.failUnless(isinstance(self.stashed, failure.Failure)) def testNonExistentDelete(self): def login(): return self.client.login('testuser', 'password-test') def delete(): return self.client.delete('delete/me') def deleteFailed(failure): self.failure = failure self.failure = None d = self.connected.addCallback(strip(login)) d.addCallback(strip(delete)).addErrback(deleteFailed) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(str(self.failure.value), 'No such mailbox') def testIllegalDelete(self): m = SimpleMailbox() m.flags = (r'\Noselect',) SimpleServer.theAccount.addMailbox('delete', m) SimpleServer.theAccount.addMailbox('delete/me') def login(): return self.client.login('testuser', 'password-test') def delete(): return self.client.delete('delete') def deleteFailed(failure): self.failure = failure self.failure = None d = self.connected.addCallback(strip(login)) d.addCallback(strip(delete)).addErrback(deleteFailed) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(str(self.failure.value), "Hierarchically inferior mailboxes exist and \\Noselect is set") def testRename(self): SimpleServer.theAccount.addMailbox('oldmbox') def login(): return self.client.login('testuser', 'password-test') def rename(): return self.client.rename('oldmbox', 'newname') d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(rename), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(SimpleServer.theAccount.mailboxes.keys(), ['NEWNAME']) def testIllegalInboxRename(self): self.stashed = None def login(): return self.client.login('testuser', 'password-test') def rename(): return self.client.rename('inbox', 'frotz') def stash(stuff): self.stashed = stuff d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(rename), self._ebGeneral) d.addBoth(stash) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.failUnless(isinstance(self.stashed, failure.Failure)) def testHierarchicalRename(self): SimpleServer.theAccount.create('oldmbox/m1') SimpleServer.theAccount.create('oldmbox/m2') def login(): return self.client.login('testuser', 'password-test') def rename(): return self.client.rename('oldmbox', 'newname') d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(rename), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() mboxes = SimpleServer.theAccount.mailboxes.keys() expected = ['newname', 'newname/m1', 'newname/m2'] mboxes.sort() self.assertEquals(mboxes, [s.upper() for s in expected]) def testSubscribe(self): def login(): return self.client.login('testuser', 'password-test') def subscribe(): return self.client.subscribe('this/mbox') d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(subscribe), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(SimpleServer.theAccount.subscriptions, ['THIS/MBOX']) def testUnsubscribe(self): SimpleServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX'] def login(): return self.client.login('testuser', 'password-test') def unsubscribe(): return self.client.unsubscribe('this/mbox') d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(unsubscribe), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(SimpleServer.theAccount.subscriptions, ['THAT/MBOX']) def _listSetup(self, f): SimpleServer.theAccount.addMailbox('root/subthing') SimpleServer.theAccount.addMailbox('root/another-thing') SimpleServer.theAccount.addMailbox('non-root/subthing') def login(): return self.client.login('testuser', 'password-test') def listed(answers): self.listed = answers self.listed = None d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(f), self._ebGeneral) d.addCallbacks(listed, self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() return self.listed def testList(self): def list(): return self.client.list('root', '%') listed = self._listSetup(list) self.assertEquals( sortNest(listed), sortNest([ (SimpleMailbox.flags, "/", "ROOT/SUBTHING"), (SimpleMailbox.flags, "/", "ROOT/ANOTHER-THING") ]) ) def testLSub(self): SimpleServer.theAccount.subscribe('ROOT/SUBTHING') def lsub(): return self.client.lsub('root', '%') listed = self._listSetup(lsub) self.assertEquals(listed, [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")]) def testStatus(self): SimpleServer.theAccount.addMailbox('root/subthing') def login(): return self.client.login('testuser', 'password-test') def status(): return self.client.status('root/subthing', 'MESSAGES', 'UIDNEXT', 'UNSEEN') def statused(result): self.statused = result self.statused = None d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(status), self._ebGeneral) d.addCallbacks(statused, self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals( self.statused, {'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4} ) def testFailedStatus(self): def login(): return self.client.login('testuser', 'password-test') def status(): return self.client.status('root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN') def statused(result): self.statused = result def failed(failure): self.failure = failure self.statused = self.failure = None d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(status), self._ebGeneral) d.addCallbacks(statused, failed) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals( self.statused, None ) self.assertEquals( self.failure.value.args, ('Could not open mailbox',) ) def testFullAppend(self): infile = util.sibpath(__file__, 'rfc822.message') message = open(infile) SimpleServer.theAccount.addMailbox('root/subthing') def login(): return self.client.login('testuser', 'password-test') def append(): return self.client.append( 'root/subthing', message, ('\\SEEN', '\\DELETED'), 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', ) d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(append), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() mb = SimpleServer.theAccount.mailboxes['ROOT/SUBTHING'] self.assertEquals(1, len(mb.messages)) self.assertEquals( (['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0), mb.messages[0][1:] ) self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue()) def testPartialAppend(self): infile = util.sibpath(__file__, 'rfc822.message') message = open(infile) SimpleServer.theAccount.addMailbox('PARTIAL/SUBTHING') def login(): return self.client.login('testuser', 'password-test') def append(): message = file(infile) return self.client.sendCommand( imap4.Command( 'APPEND', 'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsize(infile), (), self.client._IMAP4Client__cbContinueAppend, message ) ) d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(append), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) d.setTimeout(5) self.loopback() mb = SimpleServer.theAccount.mailboxes['PARTIAL/SUBTHING'] self.assertEquals(1, len(mb.messages)) self.assertEquals( (['\\SEEN'], 'Right now', 0), mb.messages[0][1:] ) self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue()) def testCheck(self): SimpleServer.theAccount.addMailbox('root/subthing') def login(): return self.client.login('testuser', 'password-test') def select(): return self.client.select('root/subthing') def check(): return self.client.check() d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(select), self._ebGeneral) d.addCallbacks(strip(check), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() # Okay, that was fun def testClose(self): m = SimpleMailbox() m.messages = [ ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0), ('Message 2', ('AnotherFlag',), None, 1), ('Message 3', ('\\Deleted',), None, 2), ] SimpleServer.theAccount.addMailbox('mailbox', m) def login(): return self.client.login('testuser', 'password-test') def select(): return self.client.select('mailbox') def close(): return self.client.close() d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(select), self._ebGeneral) d.addCallbacks(strip(close), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(len(m.messages), 1) self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1)) self.failUnless(m.closed) def testExpunge(self): m = SimpleMailbox() m.messages = [ ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0), ('Message 2', ('AnotherFlag',), None, 1), ('Message 3', ('\\Deleted',), None, 2), ] SimpleServer.theAccount.addMailbox('mailbox', m) def login(): return self.client.login('testuser', 'password-test') def select(): return self.client.select('mailbox') def expunge(): return self.client.expunge() def expunged(results): self.failIf(self.server.mbox is None) self.results = results self.results = None d = self.connected.addCallback(strip(login)) d.addCallbacks(strip(select), self._ebGeneral) d.addCallbacks(strip(expunge), self._ebGeneral) d.addCallbacks(expunged, self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(len(m.messages), 1) self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1)) self.assertEquals(self.results, [0, 2]) class TestRealm: theAccount = None def requestAvatar(self, avatarId, mind, *interfaces): return imap4.IAccount, self.theAccount, lambda: None class TestChecker: credentialInterfaces = (cred.credentials.IUsernameHashedPassword, cred.credentials.IUsernamePassword) users = { 'testuser': 'secret' } def requestAvatarId(self, credentials): if credentials.username in self.users: return defer.maybeDeferred( credentials.checkPassword, self.users[credentials.username] ).addCallback(self._cbCheck, credentials.username) def _cbCheck(self, result, username): if result: return username raise cred.error.UnauthorizedLogin() class AuthenticatorTestCase(IMAP4HelperMixin, unittest.TestCase): def setUp(self): IMAP4HelperMixin.setUp(self) realm = TestRealm() realm.theAccount = Account('testuser') portal = cred.portal.Portal(realm) portal.registerChecker(TestChecker()) self.server.portal = portal self.authenticated = 0 self.account = realm.theAccount def testCramMD5(self): self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials cAuth = imap4.CramMD5ClientAuthenticator('testuser') self.client.registerAuthenticator(cAuth) def auth(): return self.client.authenticate('secret') def authed(): self.authenticated = 1 d = self.connected.addCallback(strip(auth)) d.addCallbacks(strip(authed), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(self.authenticated, 1) self.assertEquals(self.server.account, self.account) def testFailedCramMD5(self): self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials cAuth = imap4.CramMD5ClientAuthenticator('testuser') self.client.registerAuthenticator(cAuth) def misauth(): return self.client.authenticate('not the secret') def authed(): self.authenticated = 1 def misauthed(): self.authenticated = -1 d = self.connected.addCallback(strip(misauth)) d.addCallbacks(strip(authed), strip(misauthed)) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(self.authenticated, -1) self.assertEquals(self.server.account, None) def testLOGIN(self): self.server.challengers['LOGIN'] = imap4.LOGINCredentials cAuth = imap4.LOGINAuthenticator('testuser') self.client.registerAuthenticator(cAuth) def auth(): return self.client.authenticate('secret') def authed(): self.authenticated = 1 d = self.connected.addCallback(strip(auth)) d.addCallbacks(strip(authed), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(self.authenticated, 1) self.assertEquals(self.server.account, self.account) def testFailedLOGIN(self): self.server.challengers['LOGIN'] = imap4.LOGINCredentials cAuth = imap4.LOGINAuthenticator('testuser') self.client.registerAuthenticator(cAuth) def misauth(): return self.client.authenticate('not the secret') def authed(): self.authenticated = 1 def misauthed(): self.authenticated = -1 d = self.connected.addCallback(strip(misauth)) d.addCallbacks(strip(authed), strip(misauthed)) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(self.authenticated, -1) self.assertEquals(self.server.account, None) def testPLAIN(self): self.server.challengers['PLAIN'] = imap4.PLAINCredentials cAuth = imap4.PLAINAuthenticator('testuser') self.client.registerAuthenticator(cAuth) def auth(): return self.client.authenticate('secret') def authed(): self.authenticated = 1 d = self.connected.addCallback(strip(auth)) d.addCallbacks(strip(authed), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(self.authenticated, 1) self.assertEquals(self.server.account, self.account) def testFailedPLAIN(self): self.server.challengers['PLAIN'] = imap4.PLAINCredentials cAuth = imap4.PLAINAuthenticator('testuser') self.client.registerAuthenticator(cAuth) def misauth(): return self.client.authenticate('not the secret') def authed(): self.authenticated = 1 def misauthed(): self.authenticated = -1 d = self.connected.addCallback(strip(misauth)) d.addCallbacks(strip(authed), strip(misauthed)) d.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(self.authenticated, -1) self.assertEquals(self.server.account, None) class UnsolicitedResponseTestCase(IMAP4HelperMixin, unittest.TestCase): def testReadWrite(self): def login(): return self.client.login('testuser', 'password-test') def loggedIn(): self.server.modeChanged(1) d = self.connected.addCallback(strip(login)) d.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) self.loopback() E = self.client.events self.assertEquals(E, [['modeChanged', 1]]) def testReadOnly(self): def login(): return self.client.login('testuser', 'password-test') def loggedIn(): self.server.modeChanged(0) d = self.connected.addCallback(strip(login)) d.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) self.loopback() E = self.client.events self.assertEquals(E, [['modeChanged', 0]]) def testFlagChange(self): flags = { 1: ['\\Answered', '\\Deleted'], 5: [], 10: ['\\Recent'] } def login(): return self.client.login('testuser', 'password-test') def loggedIn(): self.server.flagsChanged(flags) d = self.connected.addCallback(strip(login)) d.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) self.loopback() E = self.client.events expect = [['flagsChanged', {x[0]: x[1]}] for x in flags.items()] E.sort() expect.sort() self.assertEquals(E, expect) def testNewMessages(self): def login(): return self.client.login('testuser', 'password-test') def loggedIn(): self.server.newMessages(10, None) d = self.connected.addCallback(strip(login)) d.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) self.loopback() E = self.client.events self.assertEquals(E, [['newMessages', 10, None]]) def testNewRecentMessages(self): def login(): return self.client.login('testuser', 'password-test') def loggedIn(): self.server.newMessages(None, 10) d = self.connected.addCallback(strip(login)) d.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) self.loopback() E = self.client.events self.assertEquals(E, [['newMessages', None, 10]]) def testNewMessagesAndRecent(self): def login(): return self.client.login('testuser', 'password-test') def loggedIn(): self.server.newMessages(20, 10) d = self.connected.addCallback(strip(login)) d.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) self.loopback() E = self.client.events self.assertEquals(E, [['newMessages', 20, None], ['newMessages', None, 10]]) class HandCraftedTestCase(unittest.TestCase): def testTrailingLiteral(self): transport = StringTransport() c = imap4.IMAP4Client() c.makeConnection(transport) c.lineReceived('* OK [IMAP4rev1]') d = c.login('blah', 'blah') c.dataReceived('0001 OK LOGIN\r\n') self.failUnless(unittest.deferredResult(d)) d = c.select('inbox') c.lineReceived('0002 OK SELECT') self.failUnless(unittest.deferredResult(d)) d = c.fetchMessage('1') c.dataReceived('* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n') c.dataReceived('0003 OK FETCH\r\n') self.failUnless(unittest.deferredResult(d)) def testPathelogicalScatteringOfLiterals(self): transport = StringTransport() c = imap4.IMAP4Server() c.makeConnection(transport) transport.clear() c.lineReceived("01 LOGIN {8}") self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n") transport.clear() c.lineReceived("testuser {8}") self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n") transport.clear() c.lineReceived("password") self.assertEquals(transport.value(), "01 OK Login succeeded\r\n") self.assertEquals(c.state, 'auth') testPathelogicalScatteringOfLiterals.todo = "Parsing this protocol is hard :(" class FakeyServer(imap4.IMAP4Server): state = 'select' timeout = None def sendServerGreeting(self): pass class FakeyMessage: __implements__ = (imap4.IMessage,) def __init__(self, headers, flags, date, body, uid, subpart): self.headers = headers self.flags = flags self.body = StringIO(body) self.size = len(body) self.date = date self.uid = uid self.subpart = subpart def getHeaders(self, negate, *names): self.got_headers = negate, names return self.headers def getFlags(self): return self.flags def getInternalDate(self): return self.date def getBodyFile(self): return self.body def getSize(self): return self.size def getUID(self): return self.uid def isMultipart(self): return self.subpart is not None def getSubPart(self, part): self.got_subpart = part return self.subpart[part] class NewStoreTestCase(unittest.TestCase, IMAP4HelperMixin): result = None storeArgs = None def setUp(self): self.received_messages = self.received_uid = None self.server = imap4.IMAP4Server() self.server.state = 'select' self.server.mbox = self self.connected = defer.Deferred() self.client = SimpleClient(self.connected) def addListener(self, x): pass def removeListener(self, x): pass def store(self, *args, **kw): self.storeArgs = args, kw return self.response def _storeWork(self): def connected(): return self.function(self.messages, self.flags, self.silent, self.uid) def result(R): self.result = R self.connected.addCallback(strip(connected) ).addCallback(result ).addCallback(self._cbStopClient ).addErrback(self._ebGeneral) loopback.loopbackTCP(self.server, self.client) self.assertEquals(self.result, self.expected) self.assertEquals(self.storeArgs, self.expectedArgs) def testSetFlags(self, uid=0): self.function = self.client.setFlags self.messages = '1,5,9' self.flags = ['\\A', '\\B', 'C'] self.silent = False self.uid = uid self.response = { 1: ['\\A', '\\B', 'C'], 5: ['\\A', '\\B', 'C'], 9: ['\\A', '\\B', 'C'], } self.expected = { 1: {'FLAGS': ['\\A', '\\B', 'C']}, 5: {'FLAGS': ['\\A', '\\B', 'C']}, 9: {'FLAGS': ['\\A', '\\B', 'C']}, } msg = imap4.MessageSet() msg.add(1) msg.add(5) msg.add(9) self.expectedArgs = ((msg, ['\\A', '\\B', 'C'], 0), {'uid': 0}) self._storeWork() class NewFetchTestCase(unittest.TestCase, IMAP4HelperMixin): def setUp(self): self.received_messages = self.received_uid = None self.result = None self.server = imap4.IMAP4Server() self.server.state = 'select' self.server.mbox = self self.connected = defer.Deferred() self.client = SimpleClient(self.connected) def addListener(self, x): pass def removeListener(self, x): pass def fetch(self, messages, uid): self.received_messages = messages self.received_uid = uid return iter(zip(range(len(self.msgObjs)), self.msgObjs)) def _fetchWork(self, uid): if uid: for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs): self.expected[i]['UID'] = str(msg.getUID()) def result(R): self.result = R self.connected.addCallback(lambda _: self.function(self.messages, uid) ).addCallback(result ).addCallback(self._cbStopClient ).addErrback(self._ebGeneral) loopback.loopbackTCP(self.server, self.client) self.assertEquals(self.result, self.expected) def testFetchUID(self): self.function = lambda m, u: self.client.fetchUID(m) self.messages = '7' self.msgObjs = [ FakeyMessage({}, (), '', '', 12345, None), FakeyMessage({}, (), '', '', 999, None), FakeyMessage({}, (), '', '', 10101, None), ] self.expected = { 0: {'UID': '12345'}, 1: {'UID': '999'}, 2: {'UID': '10101'}, } self._fetchWork(0) def testFetchFlags(self, uid=0): self.function = self.client.fetchFlags self.messages = '9' self.msgObjs = [ FakeyMessage({}, ['FlagA', 'FlagB', '\\FlagC'], '', '', 54321, None), FakeyMessage({}, ['\\FlagC', 'FlagA', 'FlagB'], '', '', 12345, None), ] self.expected = { 0: {'FLAGS': ['FlagA', 'FlagB', '\\FlagC']}, 1: {'FLAGS': ['\\FlagC', 'FlagA', 'FlagB']}, } self._fetchWork(uid) def testFetchFlagsUID(self): self.testFetchFlags(1) def testFetchInternalDate(self, uid=0): self.function = self.client.fetchInternalDate self.messages = '13' self.msgObjs = [ FakeyMessage({}, (), 'Fri, 02 Nov 2003 21:25:10 GMT', '', 23232, None), FakeyMessage({}, (), 'Thu, 29 Dec 2013 11:31:52 EST', '', 101, None), FakeyMessage({}, (), 'Mon, 10 Mar 1992 02:44:30 CST', '', 202, None), FakeyMessage({}, (), 'Sat, 11 Jan 2000 14:40:24 PST', '', 303, None), ] self.expected = { 0: {'INTERNALDATE': '02-Nov-2003 21:25:10 +0000'}, 1: {'INTERNALDATE': '29-Dec-2013 11:31:52 -0500'}, 2: {'INTERNALDATE': '10-Mar-1992 02:44:30 -0600'}, 3: {'INTERNALDATE': '11-Jan-2000 14:40:24 -0800'}, } self._fetchWork(uid) def testFetchInternalDateUID(self): self.testFetchInternalDate(1) def testFetchEnvelope(self, uid=0): self.function = self.client.fetchEnvelope self.messages = '15' self.msgObjs = [ FakeyMessage({ 'from': 'user@domain', 'to': 'resu@domain', 'date': 'thursday', 'subject': 'it is a message', 'message-id': 'id-id-id-yayaya'}, (), '', '', 65656, None), ] self.expected = { 0: {'ENVELOPE': ['thursday', 'it is a message', [[None, None, 'user', 'domain']], [[None, None, 'user', 'domain']], [[None, None, 'user', 'domain']], [[None, None, 'resu', 'domain']], None, None, None, 'id-id-id-yayaya'] } } self._fetchWork(uid) def testFetchEnvelopeUID(self): self.testFetchEnvelope(1) def testFetchBodyStructure(self, uid=0): self.function = self.client.fetchBodyStructure self.messages = '3:9,10:*' self.msgObjs = [FakeyMessage({ 'content-type': 'text/plain; name=thing; key=value', 'content-id': 'this-is-the-content-id', 'content-description': 'describing-the-content-goes-here!', 'content-transfer-encoding': '8BIT', }, (), '', 'Body\nText\nGoes\nHere\n', 919293, None)] self.expected = {0: {'BODYSTRUCTURE': [ 'text', 'plain', [['name', 'thing'], ['key', 'value']], 'this-is-the-content-id', 'describing-the-content-goes-here!', '8BIT', '20', '4', None, None, None]}} self._fetchWork(uid) def testFetchBodyStructureUID(self): self.testFetchBodyStructure(1) def testFetchSimplifiedBody(self, uid=0): self.function = self.client.fetchSimplifiedBody self.messages = '21' self.msgObjs = [FakeyMessage({}, (), '', 'Yea whatever', 91825, [FakeyMessage({'content-type': 'image/jpg'}, (), '', 'Body Body Body', None, None )] )] self.expected = {0: {'BODY': [None, None, [], None, None, None, '12' ] } } self._fetchWork(uid) def testFetchSimplifiedBodyUID(self): self.testFetchSimplifiedBody(1) def testFetchSimplifiedBodyText(self, uid=0): self.function = self.client.fetchSimplifiedBody self.messages = '21' self.msgObjs = [FakeyMessage({'content-type': 'text/plain'}, (), '', 'Yea whatever', 91825, None)] self.expected = {0: {'BODY': ['text', 'plain', [], None, None, None, '12', '1' ] } } self._fetchWork(uid) def testFetchSimplifiedBodyTextUID(self): self.testFetchSimplifiedBodyText(1) def testFetchSimplifiedBodyRFC822(self, uid=0): self.function = self.client.fetchSimplifiedBody self.messages = '21' self.msgObjs = [FakeyMessage({'content-type': 'message/rfc822'}, (), '', 'Yea whatever', 91825, [FakeyMessage({'content-type': 'image/jpg'}, (), '', 'Body Body Body', None, None )] )] self.expected = {0: {'BODY': ['message', 'rfc822', [], None, None, None, '12', [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None], ['image', 'jpg', [], None, None, None, '14'], '1' ] } } self._fetchWork(uid) def testFetchSimplifiedBodyRFC822UID(self): self.testFetchSimplifiedBodyRFC822(1) def testFetchMessage(self, uid=0): self.function = self.client.fetchMessage self.messages = '1,3,7,10101' self.msgObjs = [ FakeyMessage({'Header': 'Value'}, (), '', 'BODY TEXT\r\n', 91, None), ] self.expected = { 0: {'RFC822': 'Header: Value\r\n\r\nBODY TEXT\r\n'} } self._fetchWork(uid) def testFetchMessageUID(self): self.testFetchMessage(1) def testFetchHeaders(self, uid=0): self.function = self.client.fetchHeaders self.messages = '9,6,2' self.msgObjs = [ FakeyMessage({'H1': 'V1', 'H2': 'V2'}, (), '', '', 99, None), ] self.expected = { 0: {'RFC822.HEADER': imap4._formatHeaders({'H1': 'V1', 'H2': 'V2'})}, } self._fetchWork(uid) def testFetchHeadersUID(self): self.testFetchHeaders(1) def testFetchBody(self, uid=0): self.function = self.client.fetchBody self.messages = '1,2,3,4,5,6,7' self.msgObjs = [ FakeyMessage({'Header': 'Value'}, (), '', 'Body goes here\r\n', 171, None), ] self.expected = { 0: {'RFC822.TEXT': 'Body goes here\r\n'}, } self._fetchWork(uid) def testFetchBodyUID(self): self.testFetchBody(1) def testFetchSize(self, uid=0): self.function = self.client.fetchSize self.messages = '1:100,2:*' self.msgObjs = [ FakeyMessage({}, (), '', 'x' * 20, 123, None), ] self.expected = { 0: {'RFC822.SIZE': '20'}, } self._fetchWork(uid) def testFetchSizeUID(self): self.testFetchSize(1) def testFetchFull(self, uid=0): self.function = self.client.fetchFull self.messages = '1,3' self.msgObjs = [ FakeyMessage({}, ('\\XYZ', '\\YZX', 'Abc'), 'Sun, 25 Jul 2010 06:20:30 -0400 (EDT)', 'xyz' * 2, 654, None), FakeyMessage({}, ('\\One', '\\Two', 'Three'), 'Mon, 14 Apr 2003 19:43:44 -0400', 'abc' * 4, 555, None), ] self.expected = { 0: {'FLAGS': ['\\XYZ', '\\YZX', 'Abc'], 'INTERNALDATE': '25-Jul-2010 06:20:30 -0400', 'RFC822.SIZE': '6', 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None], 'BODY': [None, None, [], None, None, None, '6']}, 1: {'FLAGS': ['\\One', '\\Two', 'Three'], 'INTERNALDATE': '14-Apr-2003 19:43:44 -0400', 'RFC822.SIZE': '12', 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None], 'BODY': [None, None, [], None, None, None, '12']}, } self._fetchWork(uid) def testFetchFullUID(self): self.testFetchFull(1) def testFetchAll(self, uid=0): self.function = self.client.fetchAll self.messages = '1,2:3' self.msgObjs = [ FakeyMessage({}, (), 'Mon, 14 Apr 2003 19:43:44 +0400', 'Lalala', 10101, None), FakeyMessage({}, (), 'Tue, 15 Apr 2003 19:43:44 +0200', 'Alalal', 20202, None), ] self.expected = { 0: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None], 'RFC822.SIZE': '6', 'INTERNALDATE': '14-Apr-2003 19:43:44 +0400', 'FLAGS': []}, 1: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None], 'RFC822.SIZE': '6', 'INTERNALDATE': '15-Apr-2003 19:43:44 +0200', 'FLAGS': []}, } self._fetchWork(uid) def testFetchAllUID(self): self.testFetchAll(1) def testFetchFast(self, uid=0): self.function = self.client.fetchFast self.messages = '1' self.msgObjs = [ FakeyMessage({}, ('\\X',), '19 Mar 2003 19:22:21 -0500', '', 9, None), ] self.expected = { 0: {'FLAGS': ['\\X'], 'INTERNALDATE': '19-Mar-2003 19:22:21 -0500', 'RFC822.SIZE': '0'}, } self._fetchWork(uid) def testFetchFastUID(self): self.testFetchFast(1) class FetchSearchStoreTestCase(unittest.TestCase, IMAP4HelperMixin): __implements__ = (imap4.ISearchableMailbox,) def setUp(self): self.expected = self.result = None self.server_received_query = None self.server_received_uid = None self.server_received_parts = None self.server_received_messages = None self.server = imap4.IMAP4Server() self.server.state = 'select' self.server.mbox = self self.connected = defer.Deferred() self.client = SimpleClient(self.connected) def search(self, query, uid): self.server_received_query = query self.server_received_uid = uid return self.expected def addListener(self, *a, **kw): pass removeListener = addListener def _searchWork(self, uid): def search(): return self.client.search(self.query, uid=uid) def result(R): self.result = R self.connected.addCallback(strip(search) ).addCallback(result ).addCallback(self._cbStopClient ).addErrback(self._ebGeneral) loopback.loopbackTCP(self.server, self.client) # Ensure no short-circuiting wierdness is going on self.failIf(self.result is self.expected) self.assertEquals(self.result, self.expected) self.assertEquals(self.uid, self.server_received_uid) self.assertEquals( imap4.parseNestedParens(self.query), self.server_received_query ) def testSearch(self): self.query = imap4.Or( imap4.Query(header=('subject', 'substring')), imap4.Query(larger=1024, smaller=4096), ) self.expected = [1, 4, 5, 7] self.uid = 0 self._searchWork(0) def testUIDSearch(self): self.query = imap4.Or( imap4.Query(header=('subject', 'substring')), imap4.Query(larger=1024, smaller=4096), ) self.uid = 1 self.expected = [1, 2, 3] self._searchWork(1) def getUID(self, msg): try: return self.expected[msg]['UID'] except (TypeError, IndexError): return self.expected[msg-1] except KeyError: return 42 def fetch(self, messages, uid): self.server_received_uid = uid self.server_received_messages = str(messages) return self.expected def _fetchWork(self, fetch): def result(R): self.result = R self.connected.addCallback(strip(fetch) ).addCallback(result ).addCallback(self._cbStopClient ).addErrback(self._ebGeneral) loopback.loopbackTCP(self.server, self.client) # Ensure no short-circuiting wierdness is going on self.failIf(self.result is self.expected) self.parts and self.parts.sort() self.server_received_parts and self.server_received_parts.sort() if self.uid: for (k, v) in self.expected.items(): v['UID'] = str(k) self.assertEquals(self.result, self.expected) self.assertEquals(self.uid, self.server_received_uid) self.assertEquals(self.parts, self.server_received_parts) self.assertEquals(imap4.parseIdList(self.messages), imap4.parseIdList(self.server_received_messages)) class FakeMailbox: def __init__(self): self.args = [] def addMessage(self, body, flags, date): self.args.append((body, flags, date)) return defer.succeed(None) class FeaturefulMessage: __implements__ = imap4.IMessageFile, def getFlags(self): return 'flags' def getInternalDate(self): return 'internaldate' def open(self): return StringIO("open") class MessageCopierMailbox: __implements__ = imap4.IMessageCopier, def __init__(self): self.msgs = [] def copy(self, msg): self.msgs.append(msg) return len(self.msgs) class CopyWorkerTestCase(unittest.TestCase): def testFeaturefulMessage(self): s = imap4.IMAP4Server() # Yes. I am grabbing this uber-non-public method to test it. # It is complex. It needs to be tested directly! # Perhaps it should be refactored, simplified, or split up into # not-so-private components, but that is a task for another day. # Ha ha! Addendum! Soon it will be split up, and this test will # be re-written to just use the default adapter for IMailbox to # IMessageCopier and call .copy on that adapter. f = s._IMAP4Server__cbCopy m = FakeMailbox() d = f([(i, FeaturefulMessage()) for i in range(1, 11)], 'tag', m) r = unittest.deferredResult(d) for a in m.args: self.assertEquals(a[0].read(), "open") self.assertEquals(a[1], "flags") self.assertEquals(a[2], "internaldate") for (status, result) in r: self.failUnless(status) self.assertEquals(result, None) def testUnfeaturefulMessage(self): s = imap4.IMAP4Server() # See above comment f = s._IMAP4Server__cbCopy m = FakeMailbox() msgs = [FakeyMessage({'Header-Counter': str(i)}, (), 'Date', 'Body %d' % (i,), i + 10, None) for i in range(1, 11)] d = f([im for im in zip(range(1, 11), msgs)], 'tag', m) r = unittest.deferredResult(d) seen = [] for a in m.args: seen.append(a[0].read()) self.assertEquals(a[1], ()) self.assertEquals(a[2], "Date") seen.sort() exp = ["Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11)] exp.sort() self.assertEquals(seen, exp) for (status, result) in r: self.failUnless(status) self.assertEquals(result, None) def testMessageCopier(self): s = imap4.IMAP4Server() # See above comment f = s._IMAP4Server__cbCopy m = MessageCopierMailbox() msgs = [object() for i in range(1, 11)] d = f([im for im in zip(range(1, 11), msgs)], 'tag', m) r = unittest.deferredResult(d) self.assertEquals(r, zip([1] * 10, range(1, 11))) for (orig, new) in zip(msgs, m.msgs): self.assertIdentical(orig, new) class TLSTestCase(IMAP4HelperMixin, unittest.TestCase): serverCTX = ServerTLSContext and ServerTLSContext() clientCTX = ClientTLSContext and ClientTLSContext() def loopback(self): loopback.loopbackTCP(self.server, self.client) def testAPileOfThings(self): SimpleServer.theAccount.addMailbox('inbox') called = [] def login(): called.append(None) return self.client.login('testuser', 'password-test') def list(): called.append(None) return self.client.list('inbox', '%') def status(): called.append(None) return self.client.status('inbox', 'UIDNEXT') def examine(): called.append(None) return self.client.examine('inbox') def logout(): called.append(None) return self.client.logout() self.client.requireTransportSecurity = True methods = [login, list, status, examine, logout] map(self.connected.addCallback, map(strip, methods)) self.connected.addCallbacks(self._cbStopClient, self._ebGeneral) self.loopback() self.assertEquals(self.server.startedTLS, True) self.assertEquals(self.client.startedTLS, True) self.assertEquals(len(called), len(methods)) def testLoginLogin(self): self.server.checker.addUser('testuser', 'password-test') success = [] self.client.registerAuthenticator(imap4.LOGINAuthenticator('testuser')) self.connected.addCallback( lambda _: self.client.authenticate('password-test') ).addCallback( lambda _: self.client.logout() ).addCallback(success.append ).addCallback(self._cbStopClient ).addErrback(self._ebGeneral) self.loopback() self.assertEquals(len(success), 1) if ClientTLSContext is None: for case in (TLSTestCase,): case.skip = "OpenSSL not present"