#! /usr/bin/env python
## Copyright 2001 by LivingLogic AG, Bayreuth, Germany.
## Copyright 2001 by Walter Dörwald
##
## All Rights Reserved
##
## Permission to use, copy, modify, and distribute this software and its documentation
## for any purpose and without fee is hereby granted, provided that the above copyright
## notice appears in all copies and that both that copyright notice and this permission
## notice appear in supporting documentation, and that the name of LivingLogic AG or
## the author not be used in advertising or publicity pertaining to distribution of the
## software without specific, written prior permission.
##
## LIVINGLOGIC AG AND THE AUTHOR DISCLAIM ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
## INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL
## LIVING LOGIC AG OR THE AUTHOR BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL
## DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
## IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR
## IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
A module that provides some tools for file i/o related stuff.
The first is a class Filename, that
provides several methods (e.g. a method open,
that automatically creates any non-existing directories).
The second is a class named SafeFile,
that encapsulates a normal file. Nothing will be written to the file with the
specified name. All writes go to a temporary file instead. Only when the
method close is called, will the
temporary file be closed and the file renamed to the original name.
The third tool is a class LogFile. Writing
to a log file prepends a timestamp to each line.
"""
# CVS expands the keyword in the literal below; the second whitespace separated
# word is the bare revision number, which is turned into a tuple of ints.
__version__ = tuple(map(int, "$Revision: 1.29 $".split()[1].split(".")))
# $Source: /data/cvsroot/LivingLogic/Python/fileutils/fileutils.py,v $
import sys, os, os.path, time, UserString, types
# Pick the date/time implementation once at import time: the standard
# datetime module when it can be imported, mx.DateTime otherwise.
try:
    import datetime
except ImportError:
    from mx import DateTime

    def timestamp2dt(t):
        """
        Convert t (seconds since the epoch) to an mx.DateTime object
        built from the UTC time tuple of t.
        """
        utc = time.gmtime(t)
        return DateTime.mktime(utc)
else:
    def timestamp2dt(t):
        """
        Convert t (seconds since the epoch) to a datetime.datetime
        object holding the corresponding UTC time.
        """
        return datetime.datetime.utcfromtimestamp(t)
from ll import url
class Filename(UserString.UserString):
    """
    A file or directory name offering path manipulation and file system
    helpers. The name itself is stored as a plain string in the data
    attribute.
    """

    def __init__(self, name):
        """
        Create a new Filename instance.

        name may be a Filename instance (its name is copied unchanged) or a
        string. For a string a leading "file:" is removed, the path is
        normalized with os.path.normpath and a leading "~" is expanded with
        os.path.expanduser.
        """
        if isinstance(name, Filename):
            UserString.UserString.__init__(self, name.data)
        else:
            if name.startswith("file:"):
                name = name[5:]
            name = os.path.expanduser(os.path.normpath(name))
            UserString.UserString.__init__(self, name)

    def clone(self):
        """
        Return a copy of this filename (an instance of the same class).
        """
        return self.__class__(self)

    def __cmp__(self, other):
        """
        Compare by class name first and, when the class names are equal,
        by the stored name.
        """
        # NOTE(review): presumably written for classic classes, where all
        # instances share one type and the real class is only available
        # as __class__ -- confirm before simplifying.
        if type(other) is type(self):
            othertype = other.__class__
        else:
            othertype = type(other)
        res = cmp(self.__class__.__name__, othertype.__name__)
        if not res:
            res = cmp(self.data, other.data)
        return res

    def __hash__(self):
        """
        Hash of the stored name.
        """
        return hash(self.data)

    def url(self):
        """
        Return the name as a URL string, i.e. with a leading "file:".
        """
        return "file:" + self.data

    def ext(self):
        """
        Return the file extension (as returned by os.path.splitext,
        i.e. including the leading dot, or "" if there is none).
        """
        return os.path.splitext(self.data)[1]

    def withoutExt(self):
        """
        Return a new Filename for this name without its extension.
        """
        return Filename(os.path.splitext(self.data)[0])

    def withExt(self, ext):
        """
        Return a new Filename for this name with its extension replaced
        by ext. ext is appended as is, so it has to include the dot.
        """
        return Filename(os.path.splitext(self.data)[0] + ext)

    def open(self, mode="rb", buffering=-1):
        """
        Works like the builtin function open, but is able to create
        non-existing directories.

        When mode contains "w" and opening fails because a directory
        in the path does not exist, the parent directory is created with
        os.makedirs and the open is retried once. Every other IOError is
        passed on to the caller.
        """
        import errno  # only needed here; keeps the module level imports untouched

        name = self.data
        try:
            return open(name, mode, buffering)
        except IOError:
            # fetched via sys.exc_info() so that the syntax is valid on
            # every Python version
            ex = sys.exc_info()[1]
            # only a missing directory while writing can be repaired here
            if "w" not in mode or ex.errno != errno.ENOENT:
                raise
            splitpath = os.path.split(name)[0]
            if splitpath == "":
                # there is no parent directory to make, so pass the error on
                raise
            os.makedirs(splitpath)
            return open(name, mode, buffering)

    def openread(self, headers=None, data=None):
        """
        Delegate to url.Filename(name).openread(), passing headers and
        data through, and return its result.
        """
        return url.Filename(self.data).openread(headers=headers, data=data)

    def openwrite(self):
        """
        Delegate to url.Filename(name).openwrite() and return its result.
        """
        return url.Filename(self.data).openwrite()

    def remove(self):
        """
        Delete the file.
        """
        os.remove(self.data)

    def rename(self, other):
        """
        Rename the file on disk. other must be a Filename instance.
        The name stored in this instance is not changed.
        """
        os.rename(self.data, other.data)

    def getmtime(self):
        """
        Return the time of the last modification of the file, converted
        from the os.path.getmtime timestamp by timestamp2dt.
        """
        timestamp = os.path.getmtime(self.data)
        return timestamp2dt(timestamp)

    def mkdir(self):
        """
        Create a directory with this name (using os.makedirs, so missing
        intermediate directories are created too).
        """
        os.makedirs(self.data)

    def mkparentdir(self):
        """
        Create the parent directory of this name (using os.makedirs).

        Nothing is done when there is no parent to create: either the name
        has no directory part at all or it is its own parent (the root
        directory).
        """
        name = self.data
        parent = os.path.split(name)[0]
        # an empty parent means "current directory"; os.makedirs("") would fail
        if parent and parent != name:
            os.makedirs(parent)

    def files(self, withDir=0):
        """
        Return a sorted list of Filename objects for the files in this
        directory (and all subdirectories).

        If withDir is true the returned names start with the name of this
        directory, otherwise they are relative to it.
        """
        found = {}
        os.path.walk(self.data, self.__walk, (withDir, found))
        names = found.keys()
        names.sort()
        return [Filename(name) for name in names]

    def __walk(self, arg, dirname, filenames):
        """
        Internal method used by files as the visit function for
        os.path.walk. arg is the tuple (withDir, files), where files is
        the dictionary whose keys are the names collected so far.
        """
        (withDir, files) = arg
        # number of characters to strip to make a name relative to self
        prefix = self.data
        if not prefix.endswith(os.sep):
            prefix += os.sep
        skip = len(prefix)
        # The directory being visited was recorded as an entry of its
        # parent; remove it under the same key form that was used to store
        # it, so that directories are not collected.
        if withDir:
            key = dirname
        else:
            key = dirname[skip:]
        try:
            del files[key]
        except KeyError:
            pass
        for filename in filenames:
            filename = os.path.join(dirname, filename)
            if not withDir:
                filename = filename[skip:]
            files[filename] = None

    def __div__(self, other):
        """
        Concatenate two filenames with os.path.join. other may be a
        Filename or a string.
        """
        if isinstance(other, Filename):
            other = other.data
        return self.__class__(os.path.join(self.data, other))

    # the "/" operator uses __truediv__ when true division is in effect
    # (from __future__ import division)
    __truediv__ = __div__

    def cd(self, other):
        """
        Synonym for self/other.
        """
        return self/other

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.data)
def cwd():
    """
    Return a Filename for the current working directory of the process.
    """
    current = os.getcwd()
    return Filename(current)
class SafeFile:
    """
    Wrapper around a normal file that never writes to the target name
    directly. The first write opens a temporary file next to the target;
    close renames that temporary file to the final name.
    """

    def __init__(self, name, mode="w", buffering=0):
        """
        Remember the target name, derive the temporary name from it
        (target name plus "_<pid>.tmp") and store the mode and
        buffering used to open the temporary file later. No file is
        opened yet.
        """
        target = Filename(name)
        self.name = target
        self.tmpname = target + ("_%d.tmp" % os.getpid())
        self.mode = mode
        self.buffering = buffering
        self.file = None

    def write(self, *texts):
        """
        Write every argument to the temporary file, opening it first
        if necessary.
        """
        self.__open()
        for chunk in texts:
            self.file.write(chunk)

    def writelines(self, *alllines):
        """
        Pass every argument (a sequence of lines) to the writelines
        method of the temporary file, opening it first if necessary.
        """
        self.__open()
        for group in alllines:
            self.file.writelines(group)

    def __open(self):
        """
        Open the temporary file unless that has already happened.
        """
        if self.file is not None:
            return
        self.file = self.tmpname.open(self.mode, self.buffering)

    def close(self):
        """
        To be called by the user once all writes are done: the temporary
        file is closed and renamed to the final name. Does nothing when
        no file is open, i.e. when nothing has been written.
        """
        if self.file is None:
            return
        self.file.close()
        self.tmpname.rename(self.name)
        self.file = None
class LogFile:
    """
    A log file. Every line written to it is prefixed with the process id,
    a time stamp and the number of seconds since the LogFile was created.
    """

    def __init__(self, name, mode="a", buffering=1, encoding="iso-8859-1"):
        """
        Create a new log file object; the file itself is only opened on
        the first write. Arguments are:
        - name: the filename (either as a string or a Filename instance).
        - mode: the mode for opening the file (should be "w" or "a").
        - buffering: the buffering for the file (0 is unbuffered, 1 is
          line buffered, any other integer specifies the buffer size).
        - encoding: the encoding used for unicode strings written to
          the file.
        """
        self.startTime = time.time()
        self.name = name
        self.mode = mode
        self.buffering = buffering
        self.encoding = encoding
        self.file = None

    def _formatTime(self, timestamp):
        """
        Return timestamp (seconds since the epoch) formatted as local
        time in the form day/month name/year hour:minute:second.
        """
        local = time.localtime(timestamp)
        return time.strftime("%d/%b/%Y %H:%M:%S", local)

    def __open(self):
        """
        Open the log file unless that has already happened.
        """
        if self.file is not None:
            return
        self.file = Filename(self.name).open(self.mode, self.buffering)

    def write(self, *texts):
        """
        Write texts to the log file.

        Each text is split into lines (an empty piece after a trailing
        newline is dropped) and every line is written with the log prefix
        and a terminating newline. Unicode strings are encoded with the
        configured encoding first, replacing unencodable characters.
        """
        now = time.time()
        pid = os.getpid()
        self.__open()
        for text in texts:
            if type(text) is types.UnicodeType:
                text = text.encode(self.encoding, "replace")
            pieces = text.split("\n")
            if not pieces[-1]:
                # text ended with a newline (or was empty)
                del pieces[-1]
            output = []
            for line in pieces:
                output.append("[pid=%d][%s]=[t+%.2fsec] %s\n" % (pid, self._formatTime(now), now-self.startTime, line))
            self.file.write("".join(output))
def getFirst(names):
    """
    Return the first entry of the sequence names that names an existing
    file or directory. Entries that are None are skipped. None is
    returned when no entry exists.
    """
    for candidate in names:
        if candidate is None:
            continue
        if os.path.exists(candidate):
            return candidate
    return None