# Twisted, the Framework of Your Internet # Copyright (C) 2003 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os, types from pprint import pformat from twisted.trial import unittest from twisted.news import news, database from twisted.internet import reactor MESSAGE_ID = "f83ba57450ed0fd8ac9a472b847e830e" POST_STRING = """Path: not-for-mail From: Subject: a test Newsgroups: alt.test.nntp Organization: Summary: Keywords: Message-Id: %s User-Agent: tin/1.4.5-20010409 ("One More Nightmare") (UNIX) (Linux/2.4.17 (i686)) this is a test ... lala moo -- "One World, one Web, one Program." - Microsoft(R) promotional ad "Ein Volk, ein Reich, ein Fuhrer." - Adolf Hitler -- 10:56pm up 4 days, 4:42, 1 user, load average: 0.08, 0.08, 0.12 """ % (MESSAGE_ID) class NewsTestCase(unittest.TestCase): def callback(self, result): self.result = result def errback(self, failure): try: self.fail('Errback called: ' + str(failure)) except Exception, e: self.error = sys.exc_info() raise def timeout(self): reactor.crash() self.fail('Timed out') def setUp(self): self.backend = database.NewsShelf(None, 'news2.db') self.backend.addGroup('alt.test.nntp', 'y') self.backend.postRequest(POST_STRING.replace('\n', '\r\n')) def tearDown(self): try: del self.result except: pass try: del self.error except: pass def testArticleExists(self): d = self.backend.articleExistsRequest(MESSAGE_ID) self.assert_(d.result) def testArticleRequest(self): d = self.backend.articleRequest(None, None, MESSAGE_ID) d.addCallbacks(self.callback, self.errback) id = reactor.callLater(5, self.timeout) while not hasattr(self, 'result') and not hasattr(self, 'error'): reactor.iterate() try: id.cancel() except ValueError: pass error = getattr(self, 'error', None) if error: raise error[0], error[1], error[2] self.failUnless(type(self.result) == types.TupleType, 'callback result is wrong type: ' + str(self.result)) self.failUnless(len(self.result) == 3, 'callback result list should have three entries: ' + str(self.result)) self.failUnless(self.result[1] == MESSAGE_ID, "callback result Message-Id doesn't match: %s vs %s" % (MESSAGE_ID, self.result[1])) body = self.result[2].read() self.failUnless(body.find('\r\n\r\n'), "Can't find \\r\\n\\r\\n between header and body") def testHeadRequest(self): self.testArticleRequest() index = self.result[0] try: del self.result except: pass try: del self.error except: pass d = self.backend.headRequest("alt.test.nntp", index) d.addCallbacks(self.callback, self.errback) id = reactor.callLater(5, self.timeout) while not hasattr(self, 'result') and not hasattr(self, 'error'): reactor.iterate() try: id.cancel() except ValueError: pass error = getattr(self, 'error', None) if error: raise error[0], error[1], error[2] self.failUnless(self.result[1] == MESSAGE_ID, "callback result Message-Id doesn't match: %s vs %s" % (MESSAGE_ID, self.result[1])) self.failUnless(self.result[2][-2:] == '\r\n', "headers must be \\r\\n terminated.") def testBodyRequest(self): self.testArticleRequest() index = self.result[0] try: del self.result except: pass try: del self.error except: pass d = self.backend.bodyRequest("alt.test.nntp", index) d.addCallbacks(self.callback, self.errback) id = reactor.callLater(5, self.timeout) while not hasattr(self, 'result') and not hasattr(self, 'error'): reactor.iterate() try: id.cancel() except ValueError: pass error = getattr(self, 'error', None) if error: raise error[0], error[1], error[2] body = self.result[2].read() self.failUnless(body[0:4] == 'this', "message body has been altered: " + pformat(body[0:4]))