#!/usr/bin/env python2.4
from basetest import BaseTest
import sys, tempfile, os, shutil
from StringIO import StringIO
import unittest, signal
from logging import getLogger, DEBUG, INFO, WARN
#getLogger().setLevel(DEBUG)

sys.path.insert(0, '..')

from zeroinstall.injector import model, basedir, autopolicy, gpg, iface_cache, download, reader, trust
from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
import data

import server

class Reply:
	"""Minimal stand-in for sys.stdin that answers every readline() with a fixed string."""

	def __init__(self, reply):
		"""Remember `reply`, the text handed back by each readline() call."""
		canned = reply
		self.reply = canned

	def readline(self):
		"""Return the stored reply; the same text is returned on every call."""
		canned = self.reply
		return canned

class TestDownload(BaseTest):
	def setUp(self):
		BaseTest.setUp(self)

		stream = tempfile.TemporaryFile()
		stream.write(data.thomas_key)
		stream.seek(0)
		gpg.import_key(stream)
		self.child = None

		trust.trust_db.watchers = []
	
	def tearDown(self):
		BaseTest.tearDown(self)
		if self.child is not None:
			os.kill(self.child, signal.SIGTERM)
			os.waitpid(self.child, 0)
			self.child = None
	
	def testRejectKey(self):
		old_out = sys.stdout
		try:
			sys.stdout = StringIO()
			self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
			policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False)
			assert policy.need_download()
			sys.stdin = Reply("N\n")
			try:
				policy.download_and_execute(['Hello'])
				assert 0
			except model.SafeException, ex:
				if "Not signed with a trusted key" not in str(ex):
					raise ex
		finally:
			sys.stdout = old_out
	
	def testRejectKeyXML(self):
		old_out = sys.stdout
		try:
			sys.stdout = StringIO()
			self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
			policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
			assert policy.need_download()
			sys.stdin = Reply("N\n")
			try:
				policy.download_and_execute(['Hello'])
				assert 0
			except model.SafeException, ex:
				if "Not signed with a trusted key" not in str(ex):
					raise
		finally:
			sys.stdout = old_out
	
	def testImport(self):
		old_out = sys.stdout
		try:
			from zeroinstall.injector import cli
			import logging

			rootLogger = getLogger()
			rootLogger.disabled = True
			try:
				try:
					cli.main(['--import', '-v', 'NO-SUCH-FILE'])
					assert 0
				except model.SafeException, ex:
					assert 'NO-SUCH-FILE' in str(ex)
			finally:
				rootLogger.disabled = False
				rootLogger.setLevel(WARN)

			hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
			self.assertEquals(0, len(hello.implementations))

			sys.stdout = StringIO()
			self.child = server.handle_requests('6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
			sys.stdin = Reply("Y\n")

			assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
			cli.main(['--import', 'Hello'])
			assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

			# Check we imported the interface after trusting the key
			reader.update_from_cache(hello)
			self.assertEquals(1, len(hello.implementations))

			# Shouldn't need to prompt the second time
			sys.stdin = None
			cli.main(['--import', 'Hello'])
		finally:
			sys.stdout = old_out
	
	def testAcceptKey(self):
		old_out = sys.stdout
		try:
			sys.stdout = StringIO()
			self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
			policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False)
			assert policy.need_download()
			sys.stdin = Reply("Y\n")
			try:
				policy.download_and_execute(['Hello'], main = 'Missing')
				assert 0
			except model.SafeException, ex:
				if "HelloWorld/Missing" not in str(ex):
					raise ex
		finally:
			sys.stdout = old_out
	
	def testRecipe(self):
		old_out = sys.stdout
		try:
			sys.stdout = StringIO()
			self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
			policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
			try:
				policy.download_and_execute([])
				assert False
			except model.SafeException, ex:
				if "HelloWorld/Missing" not in str(ex):
					raise ex
		finally:
			sys.stdout = old_out

	def testSymlink(self):
		old_out = sys.stdout
		try:
			sys.stdout = StringIO()
			self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
			policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False)
			try:
				policy.download_and_execute([])
				assert False
			except model.SafeException, ex:
				if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
					raise ex
			self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
		finally:
			sys.stdout = old_out

	def testAutopackage(self):
		old_out = sys.stdout
		try:
			sys.stdout = StringIO()
			self.child = server.handle_requests('HelloWorld.autopackage')
			policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
			try:
				policy.download_and_execute([])
				assert False
			except model.SafeException, ex:
				if "HelloWorld/Missing" not in str(ex):
					raise ex
		finally:
			sys.stdout = old_out

	def testRecipeFailure(self):
		old_out = sys.stdout
		try:
			sys.stdout = StringIO()
			self.child = server.handle_requests('*')
			policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
			try:
				policy.download_and_execute([])
				assert False
			except download.DownloadError, ex:
				if "Connection" not in str(ex):
					raise ex
		finally:
			sys.stdout = old_out

# Module-level suite holding every test* method of TestDownload.
suite = unittest.TestLoader().loadTestsFromTestCase(TestDownload)

if __name__ == '__main__':
	# Run verbosely when this file is executed directly.
	sys.argv += ['-v']
	unittest.main()


# syntax highlighted by Code2HTML, v. 0.9.1