#! /usr/local/bin/python2.3
### Train spambayes on all previously-untrained messages in a mailbox.
###
### This keeps track of messages it has already trained on by adding an
### X-Spambayes-Trained: header to each one.  Then, if you move a message
### to another folder (say, from ham to spam), it will be retrained with
### its new classification.  You would typically run this from a cron job
### on your server.
"""Usage: %(program)s [OPTIONS] ...
Where OPTIONS is one or more of:
-h
show usage and exit
-d DBNAME
use the DBM store. A DBM file is larger than the pickle and
creating it is slower, but loading it is much faster,
especially for large word databases. Recommended for use with
sb_filter or any procmail-based filter.
-p DBNAME
use the pickle store. A pickle is smaller and faster to create,
but much slower to load. Recommended for use with sb_server and
sb_xmlrpcserver.
-g PATH
mbox or directory of known good messages (non-spam) to train on.
Can be specified more than once.
-s PATH
mbox or directory of known spam messages to train on.
Can be specified more than once.
-f
force training, ignoring the trained header. Use this if you
need to rebuild your database from scratch.
-q
quiet mode; no output
-n train mail residing in "new" directory, in addition to "cur"
directory, which is always trained (Maildir only)
-r remove mail which was trained on (Maildir only)
-o section:option:value
set [section, option] in the options database to value
"""
# Compatibility shim: merely naming True/False raises NameError on an
# interpreter that does not define them, in which case integer stand-ins
# are bound at module level.
try:
    True, False
except NameError:
    # Maintain compatibility with Python 2.2
    True, False = 1, 0
import sys, os, getopt, email
import shutil
from spambayes import hammie, storage, mboxutils
from spambayes.Options import options, get_pathname_option
# Script name as invoked; substituted for %(program)s in the usage text.
program = sys.argv[0]
# When true, progress and status lines are printed; main() clears it for -q.
loud = True
def get_message(obj):
"""Return an email Message object.
This works like mboxutils.get_message, except it doesn't junk the
headers if there's an error. Doing so would cause a headerless
message to be written back out!
"""
if isinstance(obj, email.Message.Message):
return obj
# Create an email Message object.
if hasattr(obj, "read"):
obj = obj.read()
try:
msg = email.message_from_string(obj)
except email.Errors.MessageParseError:
msg = None
return msg
def msg_train(h, msg, is_spam, force):
    """Train the classifier h on one message, recording it in a header.

    Returns True when the message was trained.  Returns False when it
    was skipped: either it cannot be flattened to text, or (without
    force) its trained header already carries the requested
    classification.  After training, the trained header on msg is set
    to the configured ham or spam string.
    """
    # XXX: big hack -- why is email.Message unable to represent
    # multipart/alternative?
    try:
        mboxutils.as_string(msg)
    except TypeError:
        # This message cannot be represented as text, so skip it.
        return False

    if is_spam:
        label = options["Headers", "header_spam_string"]
    else:
        label = options["Headers", "header_ham_string"]
    header_name = options["Headers", "trained_header_name"]
    previous = msg.get(header_name)

    if not force and previous == label:
        # Already trained with this classification; nothing to do.
        return False

    if previous is not None:
        del msg[header_name]
        if not force:
            # It was trained as the opposite class, so undo that first.
            # (With force we train no matter what and never untrain.)
            h.untrain(msg, not is_spam)

    h.train(msg, is_spam)
    msg.add_header(header_name, label)
    return True
def maildir_train(h, path, is_spam, force, removetrained):
"""Train bayes with all messages from a maildir."""
if loud: print " Reading %s as Maildir" % (path,)
import time
import socket
pid = os.getpid()
host = socket.gethostname()
counter = 0
trained = 0
for fn in os.listdir(path):
cfn = os.path.join(path, fn)
tfn = os.path.normpath(os.path.join(path, "..", "tmp",
"%d.%d_%d.%s" % (time.time(), pid,
counter, host)))
if (os.path.isdir(cfn)):
continue
counter += 1
if loud and counter % 10 == 0:
sys.stdout.write("\r%6d" % counter)
sys.stdout.flush()
f = file(cfn, "rb")
msg = get_message(f)
f.close()
if not msg:
print "Malformed message: %s. Skipping..." % cfn
continue
if not msg_train(h, msg, is_spam, force):
continue
trained += 1
if not options["Headers", "include_trained"]:
continue
f = file(tfn, "wb")
f.write(mboxutils.as_string(msg))
f.close()
shutil.copystat(cfn, tfn)
# XXX: This will raise an exception on Windows. Do any Windows
# people actually use Maildirs?
os.rename(tfn, cfn)
if (removetrained):
os.unlink(cfn)
if loud:
sys.stdout.write("\r%6d" % counter)
sys.stdout.write("\r Trained %d out of %d messages\n" %
(trained, counter))
def mbox_train(h, path, is_spam, force):
"""Train bayes with a Unix mbox"""
if loud: print " Reading as Unix mbox"
import mailbox
import fcntl
# Open and lock the mailbox. Some systems require it be opened for
# writes in order to assert an exclusive lock.
f = file(path, "r+b")
fcntl.flock(f, fcntl.LOCK_EX)
mbox = mailbox.PortableUnixMailbox(f, get_message)
outf = os.tmpfile()
counter = 0
trained = 0
for msg in mbox:
if not msg:
print "Malformed message number %d. I can't train on this mbox, sorry." % counter
return
counter += 1
if loud and counter % 10 == 0:
sys.stdout.write("\r%6d" % counter)
sys.stdout.flush()
if msg_train(h, msg, is_spam, force):
trained += 1
if options["Headers", "include_trained"]:
# Write it out with the Unix "From " line
outf.write(mboxutils.as_string(msg, True))
if options["Headers", "include_trained"]:
outf.seek(0)
try:
os.ftruncate(f.fileno(), 0)
f.seek(0)
except:
# If anything goes wrong, don't try to write
print "Problem truncating mbox--nothing written"
raise
try:
for line in outf.xreadlines():
f.write(line)
except:
print >> sys.stderr ("Problem writing mbox! Sorry, "
"I tried my best, but your mail "
"may be corrupted.")
raise
fcntl.flock(f, fcntl.LOCK_UN)
f.close()
if loud:
sys.stdout.write("\r%6d" % counter)
sys.stdout.write("\r Trained %d out of %d messages\n" %
(trained, counter))
def mhdir_train(h, path, is_spam, force):
"""Train bayes with an mh directory"""
if loud: print " Reading as MH mailbox"
import glob
counter = 0
trained = 0
for fn in glob.glob(os.path.join(path, "[0-9]*")):
counter += 1
cfn = fn
tfn = os.path.join(path, "spambayes.tmp")
if loud and counter % 10 == 0:
sys.stdout.write("\r%6d" % counter)
sys.stdout.flush()
f = file(fn, "rb")
msg = get_message(f)
f.close()
if not msg:
print "Malformed message: %s. Skipping..." % cfn
continue
msg_train(h, msg, is_spam, force)
trained += 1
if not options["Headers", "include_trained"]:
continue
f = file(tfn, "wb")
f.write(mboxutils.as_string(msg))
f.close()
shutil.copystat(cfn, tfn)
# XXX: This will raise an exception on Windows. Do any Windows
# people actually use MH directories?
os.rename(tfn, cfn)
if loud:
sys.stdout.write("\r%6d" % counter)
sys.stdout.write("\r Trained %d out of %d messages\n" %
(trained, counter))
def train(h, path, is_spam, force, trainnew, removetrained):
    """Train on the mailbox at path, picking the handler by its layout.

    A regular file is handled as a Unix mbox.  A directory containing a
    "cur" subdirectory is handled as a Maildir; "cur" is always trained
    and "new" is trained as well when trainnew is set.  Any other
    directory is handled as an MH folder.

    Raises ValueError if path does not exist or its type cannot be
    determined.
    """
    if not os.path.exists(path):
        raise ValueError("Nonexistent path: %s" % path)

    if os.path.isfile(path):
        mbox_train(h, path, is_spam, force)
        return

    cur_dir = os.path.join(path, "cur")
    if os.path.isdir(cur_dir):
        maildir_train(h, cur_dir, is_spam, force, removetrained)
        if trainnew:
            new_dir = os.path.join(path, "new")
            maildir_train(h, new_dir, is_spam, force, removetrained)
        return

    if os.path.isdir(path):
        mhdir_train(h, path, is_spam, force)
        return

    # Exists, but is neither a regular file nor a directory.
    raise ValueError("Unable to determine mailbox type: " + path)
def usage(code, msg=''):
    """Write the usage text to stderr and exit with status code.

    If msg is given it is written first, followed by a blank line.  The
    usage text is the module docstring with names such as %(program)s
    filled in from the module globals.
    """
    err = sys.stderr
    if msg:
        err.write("%s\n\n" % (msg,))
    err.write("%s\n" % (__doc__ % globals(),))
    sys.exit(code)
def main():
"""Main program; parse options and go."""
global loud
try:
opts, args = getopt.getopt(sys.argv[1:], 'hfqnrd:p:g:s:o:')
except getopt.error, msg:
usage(2, msg)
if not opts:
usage(2, "No options given")
force = False
trainnew = False
removetrained = False
good = []
spam = []
for opt, arg in opts:
if opt == '-h':
usage(0)
elif opt == "-f":
force = True
elif opt == "-n":
trainnew = True
elif opt == "-q":
loud = False
elif opt == '-g':
good.append(arg)
elif opt == '-s':
spam.append(arg)
elif opt == "-r":
removetrained = True
elif opt == '-o':
options.set_from_cmdline(arg, sys.stderr)
pck, usedb = storage.database_type(opts)
if args:
usage(2, "Positional arguments not allowed")
if usedb == None:
# Use settings in configuration file.
usedb = options["Storage", "persistent_use_database"]
pck = get_pathname_option("Storage",
"persistent_storage_file")
h = hammie.open(pck, usedb, "c")
for g in good:
if loud: print "Training ham (%s):" % g
train(h, g, False, force, trainnew, removetrained)
sys.stdout.flush()
save = True
for s in spam:
if loud: print "Training spam (%s):" % s
train(h, s, True, force, trainnew, removetrained)
sys.stdout.flush()
save = True
if save:
h.store()
if __name__ == "__main__":
main()
# syntax highlighted by Code2HTML, v. 0.9.1