"""Testing Tools Web Interface Classes: TestToolsUserInterface - Interface class for testing tools. Abstract: This module implements a browser based Spambayes user interface for the various testing tools. Users may use it to interface with the tools. The following functions are currently included: onCV - cross-validation testing To do: o Add interface to Alex's incremental test setup. o Suggestions? """ # This module is part of the spambayes project, which is Copyright 2002-3 # The Python Software Foundation and is covered by the Python Software # Foundation license. from __future__ import generators __author__ = "Tony Meyer " __credits__ = "All the Spambayes folk." try: True, False except NameError: # Maintain compatibility with Python 2.2 True, False = 1, 0 import os import sys import cgi import glob import random import StringIO import ProxyUI import oe_mailbox from spambayes import msgs from spambayes import TestDriver from spambayes import OptionsClass from spambayes.Options import options # These are the options that will be offered on the testing page. # If the option is None, then the entry is a header and the following # options will appear in a new box on the configuration page. # These are also used to generate http request parameters and template # fields/variables. testtools_ini_map = ( ## ('General Options', None), # Put any general options that we wish to encourage people to test # here, for example: # ('Classifier', 'max_discriminators'), ('Experimental Options', None), ) # Dynamically add any current experimental/deprecated options. for opt in options.options(True): sect, opt = opt[1:].split(']', 1) if opt[:2].lower() == "x-": testtools_ini_map += ((sect, opt),) class TestToolsUserInterface(ProxyUI.ProxyUserInterface): """Serves the HTML user interface for the test tools.""" def onCv(self): global testtools_ini_map self._writePreamble("CV Test") configTable = self.html.configForm.clone() del configTable.configTextRow1 del configTable.configTextRow2 del configTable.configCbRow1 del configTable.configRow2 del configTable.blankRow del configTable.folderRow # Add some options that are only available via this page. # (This makes displaying the options nice and easy, since # they're just handled like everything else). sect = 'TestToolsUI' for newopt in [('source', 'Messages source', 'Standard test setup', 'Select the source of the messages to test on.', ('Standard test setup', 'Cache', 'Outlook Express'), False), ('n', 'Number of runs', 10, 'Select the number of cross-validation runs.', OptionsClass.INTEGER, False),]: options._options[sect, newopt[0]] = OptionsClass.Option(*newopt) testtools_ini_map += (('Testing Options', None), ('TestToolsUI', 'source'), ('TestToolsUI', 'n'),) option_choice = self._buildConfigPageBody(\ configTable, testtools_ini_map) option_choice.action_page.action = "cvresults" option_choice.introduction = "Select the options for your test " \ "(these will be run against the " \ "defaults)." option_choice.optionsPathname = "memory only" del option_choice.restore_form del option_choice.adv_button option_choice.config_submit.value = "Run Test" self.write(option_choice) self._writePostamble() def onCvresults(self, *args, **kwargs): del kwargs["how"] self._writePreamble("CV Test Results") text = "Display the results of a cross-validation test with the " \ "current settings against the defaults." nsets = options["TestToolsUI", "n"] # With defaults first. self.write("

Testing with defaults...

") saved = {} for opt in options.options(True): # Ignore those that have do_not_restore as True # (These are predominately storage options, and at least # the cache directory ones may be needed later on). sect, opt = opt[1:].split(']', 1) saved[(sect, opt)] = options[(sect, opt)] if not options.no_restore(sect, opt): options.set(sect, opt, options.default(sect, opt)) options["TestToolsUI", "source"] = kwargs["TestToolsUI_source"] # XXX Cache this somewhere? If the testing data isn't changing, # XXX and the user is running multiple tests, then it doesn't # XXX make much sense to rerun the 'default's test over and over # XXX again. cv_out, errors = self.timCV(nsets) ## print errors.read() defaults = self.rates(cv_out) # Now with specified settings. self.write("

Testing with selected settings...

") for opt in options.options(True): sect, opt = opt[1:].split(']', 1) try: value = kwargs["%s_%s" % (sect, opt)] except KeyError: # Leave as the default. pass else: options.set(sect, opt, value) cv_out, errors = self.timCV(nsets) ## print errors.read() current = self.rates(cv_out) # Restore the settings. for opt in options.options(True): sect, opt = opt[1:].split(']', 1) options.set(sect, opt, saved[(sect, opt)]) # Do the comparison. comp, errors = self.compare(defaults, current) ## print errors.read() # Output the results # XXX This is just what you'd get from running cmp.py # XXX at the moment - it could be prettied up a bit. comp = comp.read() box = self._buildBox('Cross-validation test', None, cgi.escape(comp).replace("\n", "
")) self.write(box) self._writePostamble() def timCV(self, nsets): # Until we are un-lazy enough to change the code borrowed from # timcv.py, just capture the output that normally goes to stdout # or stderr and return it. cout, cerr = sys.stdout, sys.stderr sys.stdout = StringIO.StringIO() sys.stderr = StringIO.StringIO() if options["TestToolsUI", "source"] == "Standard test setup": # Source the test data from the 'standard' test setup, # as described in the testtools directory. hamdirs = [options["TestDriver", "ham_directories"] % \ i for i in range(1, nsets+1)] spamdirs = [options["TestDriver", "spam_directories"] % \ i for i in range(1, nsets+1)] hstream = msgs.HamStream sstream = msgs.SpamStream elif options["TestToolsUI", "source"] == "Cache": # Source the test data from the cache directories # specified in the "Storage" section of the configuration. # This means that we have one 'ham' directory and one # 'spam' directory (we ignore the unknown one, obviously), # but what we really want is n directories of ham and spam. # To overcome this without actually moving the files about # we have a class that fakes it for us. hamdirs = ["%s%s%s/%s" % (options["Storage", "ham_cache"], os.pathsep, i, nsets) for i in range(1, nsets+1)] spamdirs = ["%s%s%s/%s" % (options["Storage", "spam_cache"], os.pathsep, i, nsets) for i in range(1, nsets+1)] hstream = HamCacheStream sstream = SpamCacheStream elif options["TestToolsUI", "source"] == "Outlook Express": # Source the test data from Outlook Express # Pretty crude at the moment (hard coded): # Ignores: # o Deleted Items # o Drafts # o Folders # o Offline # o Outbox # o Pop3uidl # Assumes that anything with 'spam' in the name is spam # (so don't have a folder called "looks like spam"!) and # that anything else is ham. # No mixed dbxes! # Each dbx is the equivilent of a directory in the 'standard' # test setup - so it would be good if each dbx had a roughly # equal number of messages, and if there were the same number # of ham and spam dbxes. dbx_dir = oe_mailbox.OEStoreRoot() dbxes = glob.glob(os.path.join(dbx_dir, "*.dbx")) spamdirs = [] hamdirs = [] for dbx in dbxes: if os.path.splitext(os.path.basename(dbx))[0].lower() in \ ["deleted items", "drafts", "folders", "offline", "outbox", "pop3uidl",]: continue elif dbx.lower().find("spam") == -1: spamdirs.append(dbx) else: hamdirs.append(dbx) hstream = oe_mailbox.OEHamStream sstream = oe_mailbox.OESpamStream d = TestDriver.Driver() # Train it on all sets except the first. h = hstream("%s-%d" % (hamdirs[1], nsets), hamdirs[1:], train=1) d.train(hstream("%s-%d" % (hamdirs[1], nsets), hamdirs[1:], train=1), sstream("%s-%d" % (spamdirs[1], nsets), spamdirs[1:], train=1)) # Now run nsets times, predicting pair i against all except pair i. for i in range(nsets): h = hamdirs[i] s = spamdirs[i] hamstream = hstream(h, [h], train=0) spamstream = sstream(s, [s], train=0) if i > 0: if options["CV Driver", "build_each_classifier_from_scratch"]: # Build a new classifier from the other sets. d.new_classifier() hname = "%s-%d, except %d" % (hamdirs[0], nsets, i+1) h2 = hamdirs[:] del h2[i] sname = "%s-%d, except %d" % (spamdirs[0], nsets, i+1) s2 = spamdirs[:] del s2[i] d.train(hstream(hname, h2, train=1), sstream(sname, s2, train=1)) else: # Forget this set. d.untrain(hamstream, spamstream) # Predict this set. d.test(hamstream, spamstream) d.finishtest() if i < nsets - 1 and not options["CV Driver", "build_each_classifier_from_scratch"]: # Add this set back in. d.train(hamstream, spamstream) d.alldone() # Other end of the lazy 'capture the output' code. sys.stdout.seek(0) sys.stderr.seek(0) out, err = sys.stdout, sys.stderr sys.stdout = cout sys.stderr = cerr return out, err def rates(self, ifile): """This is essentially rates.py from the testtools directory.""" # XXX Stop being lazy and using the remapping cout/cerr cheat # XXX at some point. cout = sys.stdout cerr = sys.stderr sys.stdout = StringIO.StringIO() sys.stderr = StringIO.StringIO() interesting = filter(lambda line: line.startswith('-> '), ifile) ifile.close() ofile = StringIO.StringIO() def dump(*stuff): msg = ' '.join(map(str, stuff)) print msg print >> ofile, msg ntests = nfn = nfp = 0 sumfnrate = sumfprate = 0.0 for line in interesting: dump(line[:-1]) fields = line.split() # 0 1 2 3 4 5 6 -5 -4 -3 -2 -1 #-> tested 4000 hams & 2750 spams against 8000 hams & 5500 spams if line.startswith('-> tested '): ntests += 1 continue # 0 1 2 3 # -> false positive %: 0.025 # -> false negative %: 0.327272727273 if line.startswith('-> false '): kind = fields[3] percent = float(fields[-1]) if kind == 'positive': sumfprate += percent lastval = percent else: sumfnrate += percent dump(' %7.3f %7.3f' % (lastval, percent)) continue # 0 1 2 3 4 5 # -> 1 new false positives if len(fields) >= 5 and fields[3] == 'new' and fields[4] == 'false': kind = fields[-1] count = int(fields[2]) if kind == 'positives': nfp += count else: nfn += count dump('total unique false pos', nfp) dump('total unique false neg', nfn) dump('average fp %', sumfprate / ntests) dump('average fn %', sumfnrate / ntests) ofile.seek(0) sys.stdout = cout sys.stderr = cerr return ofile def compare(self, f1, f2): """This is essentially cmp.py from the testtools directory.""" # XXX Stop being lazy and using the remapping cout/cerr cheat # XXX at some point. cout, cerr = sys.stdout, sys.stderr sys.stdout = StringIO.StringIO() sys.stderr = StringIO.StringIO() def suck(f): fns = [] fps = [] hamdev = [] spamdev = [] hamdevall = spamdevall = (0.0, 0.0) get = f.readline while 1: line = get() if line.startswith('-> tested'): print line, if line.find(' items; mean ') != -1: # -> Ham distribution for this pair: 1000 items; mean 0.05; sample sdev 0.68 # and later "sample " went away vals = line.split(';') mean = float(vals[1].split()[-1]) sdev = float(vals[2].split()[-1]) val = (mean, sdev) typ = vals[0].split()[2] if line.find('for all runs') != -1: if typ == 'Ham': hamdevall = val else: spamdevall = val elif line.find('all in this') != -1: if typ == 'Ham': hamdev.append(val) else: spamdev.append(val) continue if line.startswith('-> '): continue if line.startswith('total'): break if len(line) == 0: continue # A line with an f-p rate and an f-n rate. p, n = map(float, line.split()) fps.append(p) fns.append(n) # "total unique false pos 0" # "total unique false neg 0" # "average fp % 0.0" # "average fn % 0.0" fptot = int(line.split()[-1]) fntot = int(get().split()[-1]) fpmean = float(get().split()[-1]) fnmean = float(get().split()[-1]) return (fps, fns, fptot, fntot, fpmean, fnmean, hamdev, spamdev, hamdevall, spamdevall) def tag(p1, p2): if p1 == p2: t = "tied " else: t = p1 < p2 and "lost " or "won " if p1: p = (p2 - p1) * 100.0 / p1 t += " %+7.2f%%" % p else: t += " +(was 0)" return t def mtag(m1, m2): mean1, dev1 = m1 mean2, dev2 = m2 t = "%7.2f %7.2f " % (mean1, mean2) if mean1: mp = (mean2 - mean1) * 100.0 / mean1 t += "%+7.2f%%" % mp else: t += "+(was 0)" t += " %7.2f %7.2f " % (dev1, dev2) if dev1: dp = (dev2 - dev1) * 100.0 / dev1 t += "%+7.2f%%" % dp else: t += "+(was 0)" return t def dump(p1s, p2s): alltags = "" for p1, p2 in zip(p1s, p2s): t = tag(p1, p2) print " %5.3f %5.3f %s" % (p1, p2, t) alltags += t + " " print for t in "won", "tied", "lost": print "%-4s %2d times" % (t, alltags.count(t)) print def dumpdev(meandev1, meandev2): for m1, m2 in zip(meandev1, meandev2): print mtag(m1, m2) (fp1, fn1, fptot1, fntot1, fpmean1, fnmean1, hamdev1, spamdev1, hamdevall1, spamdevall1) = suck(f1) (fp2, fn2, fptot2, fntot2, fpmean2, fnmean2, hamdev2, spamdev2, hamdevall2, spamdevall2) = suck(f2) print print "false positive percentages" dump(fp1, fp2) print "total unique fp went from", fptot1, "to", fptot2, tag(fptot1, fptot2) print "mean fp % went from", fpmean1, "to", fpmean2, tag(fpmean1, fpmean2) print print "false negative percentages" dump(fn1, fn2) print "total unique fn went from", fntot1, "to", fntot2, tag(fntot1, fntot2) print "mean fn % went from", fnmean1, "to", fnmean2, tag(fnmean1, fnmean2) print if len(hamdev1) == len(hamdev2) and len(spamdev1) == len(spamdev2): print "ham mean ham sdev" dumpdev(hamdev1, hamdev2) print print "ham mean and sdev for all runs" dumpdev([hamdevall1], [hamdevall2]) print print "spam mean spam sdev" dumpdev(spamdev1, spamdev2) print print "spam mean and sdev for all runs" dumpdev([spamdevall1], [spamdevall2]) print diff1 = spamdevall1[0] - hamdevall1[0] diff2 = spamdevall2[0] - hamdevall2[0] print "ham/spam mean difference: %2.2f %2.2f %+2.2f" % (diff1, diff2, diff2 - diff1) else: print "[info about ham & spam means & sdevs not available in both files]" sys.stdout.seek(0) sys.stderr.seek(0) out, err = sys.stdout, sys.stderr sys.stdout = cout sys.stderr = cerr return out, err # The iterator yields a stream of Msg objects from the given # 'directory'. The directory is actually the actual directory # and then an indication of the portion of it that we are after. # (so that a single directory can be used, a la the caches, rather # than a nicely split up into sets directory). class CacheStream(msgs.MsgStream): def produce(self): # We only want some of the msgs. Shuffle each directory list, but # in such a way that we'll get the same result each time this is # called on the same directory list. base_check = None for directory in self.directories: directory, portion = directory.split(os.pathsep) # All the directories in the list *must* be the same, and just # different sections, because this makes the code easier, and is # the desired usage, anyway. if base_check is None: base_check = directory assert directory == base_check set_num, nsets = portion.split('/') all = os.listdir(directory) random.seed(hash(max(all)) ^ msgs.SEED) random.shuffle(all) set_size = len(all) // int(nsets) set_num = int(set_num) set = all[set_num*set_size:((set_num+1)*set_size)-1] set.sort() for fname in set: yield msgs.Msg(directory, fname) class HamCacheStream(CacheStream): def __init__(self, tag, directories, train=0): if train: CacheStream.__init__(self, tag, directories, msgs.HAMTRAIN) else: CacheStream.__init__(self, tag, directories, msgs.HAMTEST) class SpamCacheStream(CacheStream): def __init__(self, tag, directories, train=0): if train: CacheStream.__init__(self, tag, directories, msgs.SPAMTRAIN) else: CacheStream.__init__(self, tag, directories, msgs.SPAMTEST)