# -*- test-case-name: twisted.test.test_journal -*-
#
# Twisted, the Framework of Your Internet
# Copyright (C) 2001-2002 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Basic classes and interfaces for journal."""
from __future__ import nested_scopes
# system imports
import os, time
try:
import cPickle as pickle
except ImportError:
import pickle
# twisted imports
from twisted.python.components import Interface
class Journal:
    """Funnel every command for a journaled service through a command log.

    A command is written to the log first and executed against the service
    only once logging has completed.  Snapshotting is left to subclasses,
    which implement getLastSnapshot() and, if they wish, sync().
    """

    def __init__(self, log, journaledService):
        self.journaledService = journaledService
        self.log = log
        # Start from whatever the log reports as its most recent index.
        self.latestIndex = log.getCurrentIndex()

    def updateFromLog(self):
        """Replay logged commands that are newer than the last snapshot.

        Intended to be called on startup so the service state catches up
        with the log.  Nothing is replayed when the snapshot index is not
        behind the latest logged index.
        """
        lastSnapshot = self.getLastSnapshot()
        if not lastSnapshot < self.latestIndex:
            return
        # Replay starts with the first command after the snapshot.
        for entry in self.log.getCommandsSince(lastSnapshot + 1):
            cmdtime, command = entry
            command.execute(self.journaledService, cmdtime)

    def executeCommand(self, command):
        """Log a command, then execute it once logging has finished.

        @return: the object returned by the log's logCommand(), with
            _reallyExecute attached as a callback.
        """
        now = time.time()
        logged = self.log.logCommand(command, now)
        logged.addCallback(self._reallyExecute, command, now)
        return logged

    def _reallyExecute(self, index, command, runTime):
        """Callback fired with the command's log index once it is logged.

        Executes the command against the service, records the index as the
        latest one, and passes the command's result along.
        """
        outcome = command.execute(self.journaledService, runTime)
        self.latestIndex = index
        return outcome

    def getLastSnapshot(self):
        """Return command index of the last snapshot taken.

        Must be provided by subclasses.
        """
        raise NotImplementedError()

    def sync(self, *args, **kwargs):
        """Save journal to disk, returns Deferred of finish status.

        Subclasses are free to pick a more suitable signature, or to leave
        this unimplemented altogether.
        """
        raise NotImplementedError()
class MemoryJournal(Journal):
    """Prevayler-like journal that dumps from memory to disk.

    The snapshot file at C{path} holds a single pickled
    C{(commandIndex, obj)} tuple, written by sync() and read back on
    construction.
    """

    def __init__(self, log, journaledService, path, loadedCallback):
        """Load the snapshot (if any) and hand it to C{loadedCallback}.

        @param log: command log, passed on to Journal.__init__.
        @param journaledService: the service commands are executed against.
        @param path: file name of the pickled snapshot.
        @param loadedCallback: called exactly once with the object stored in
            the snapshot, or with None when there is no usable snapshot.
        """
        self.path = path
        lastSync, obj = 0, None
        if os.path.exists(path):
            # NOTE: unpickling can run arbitrary code; the snapshot file must
            # only ever be one this process (or a trusted peer) has written.
            try:
                f = open(path, "rb")
                try:
                    lastSync, obj = pickle.load(f)
                finally:
                    # Close explicitly instead of relying on garbage
                    # collection to release the handle.
                    f.close()
            except (IOError, OSError, EOFError, pickle.UnpicklingError):
                # An unreadable, corrupt or truncated snapshot (EOFError is
                # what an empty/half-written pickle raises) is treated the
                # same as having no snapshot at all.
                lastSync, obj = 0, None
        self.lastSync = lastSync
        loadedCallback(obj)
        Journal.__init__(self, log, journaledService)

    def getLastSnapshot(self):
        """Return the command index stored with the last snapshot (0 if none)."""
        return self.lastSync

    def sync(self, obj):
        """Pickle C{obj} together with the latest command index to C{path}.

        Runs synchronously and returns None.  On success the snapshot index
        is advanced to the latest command index.
        """
        # make this more reliable at some point
        index = self.latestIndex
        f = open(self.path, "wb")
        try:
            pickle.dump((index, obj), f, 1)
        finally:
            # Always release the handle, even if pickling fails midway.
            f.close()
        self.lastSync = index
class ICommand(Interface):
    """A serializable command which interacts with a journaled service."""

    def execute(self, journaledService, runTime):
        """Run the command against C{journaledService} and return its result.

        C{runTime} is the time stamp associated with the command.
        """
class ICommandLog(Interface):
    """Interface for command log."""

    def logCommand(self, command, runTime):
        """Append a command, together with its run time, to the log.

        @return: Deferred of command index.
        """

    def getCurrentIndex(self):
        """Return the index of the most recently logged command."""

    def getCommandsSince(self, index):
        """Return commands whose index >= the given one.

        @return: list of (time, command) tuples, sorted with ascending times.
        """
class LoadingService:
    """Base class for journalled service used with Wrappables."""

    def loadObject(self, objType, objId):
        """Look up and return the object with the given type and id.

        Must be provided by subclasses.
        """
        raise NotImplementedError()
class Wrappable:
    """Base class for objects used with LoadingService."""

    # Type tag for this kind of object; subclasses are expected to override it.
    objectType = None

    def getUid(self):
        """Return the uid under which LoadingService.loadObject finds this object.

        Must be provided by subclasses.
        """
        raise NotImplementedError()
class WrapperCommand:
    """Command that calls a named method on a Wrappable object.

    The target object itself is not pickled; only its type and uid are.
    When the command has no live object reference (e.g. after unpickling)
    the object is fetched from the service via loadObject().
    """

    __implements__ = ICommand

    def __init__(self, methodName, obj, args=(), kwargs=None):
        """
        @param methodName: name of the method to call on the object.
        @param obj: object providing getUid() and objectType.
        @param args: positional arguments for the method.
        @param kwargs: keyword arguments for the method; defaults to a new
            empty dict per instance.
        """
        # A literal {} default would be one dict shared by every command
        # created without kwargs, so create a fresh one instead.
        if kwargs is None:
            kwargs = {}
        self.obj = obj
        self.objId = obj.getUid()
        self.objType = obj.objectType
        self.methodName = methodName
        self.args = args
        self.kwargs = kwargs

    def execute(self, svc, commandTime):
        """Call the method and return its result.

        Uses the live object when this instance still has one, otherwise
        asks C{svc} to load it by type and id.  C{commandTime} is unused.
        """
        if hasattr(self, "obj"):
            obj = self.obj
        else:
            obj = svc.loadObject(self.objType, self.objId)
        return getattr(obj, self.methodName)(*self.args, **self.kwargs)

    def __getstate__(self):
        """Return the instance state for pickling, minus the live object."""
        d = self.__dict__.copy()
        # A command that was itself unpickled has no "obj" entry; it must
        # still be picklable again, so a missing key is not an error.
        try:
            del d["obj"]
        except KeyError:
            pass
        return d
def command(methodName, cmdClass=WrapperCommand):
    """Turn a method into one that is automatically run as a command.

    Meant for Wrappable subclasses.

    Usage::

        | class Foo(Wrappable):
        |     objectType = "foo"
        |     def getUid(self):
        |         return self.id
        |     def _bar(self, x):
        |         return x + 1
        |
        |     bar = command('_bar')

    The callable returned here takes the same arguments as the wrapped
    method, preceded by the journal, and returns whatever the journal's
    executeCommand() returns (a Deferred).
    """
    def wrapper(obj, journal, *args, **kwargs):
        # Package the call as a command object and let the journal run it.
        cmd = cmdClass(methodName, obj, args, kwargs)
        return journal.executeCommand(cmd)
    return wrapper
class ServiceWrapperCommand:
    """Command that calls a named method on the journaled service itself.

    Two commands compare equal when their instance dictionaries (method
    name, args, kwargs) are equal.
    """

    __implements__ = ICommand

    def __init__(self, methodName, args=(), kwargs=None):
        """
        @param methodName: name of the service method to call.
        @param args: positional arguments for the method.
        @param kwargs: keyword arguments for the method; defaults to a new
            empty dict per instance.
        """
        # A literal {} default would be one dict shared by every command
        # created without kwargs, so create a fresh one instead.
        if kwargs is None:
            kwargs = {}
        self.methodName = methodName
        self.args = args
        self.kwargs = kwargs

    def execute(self, svc, commandTime):
        """Call the method on C{svc} and return its result.

        C{commandTime} is unused.
        """
        return getattr(svc, self.methodName)(*self.args, **self.kwargs)

    def __repr__(self):
        return "<ServiceWrapperCommand: %s, %s, %s>" % (self.methodName, self.args, self.kwargs)

    def __eq__(self, other):
        """Equal only to objects whose instance dictionary matches ours."""
        # Objects without a __dict__ (numbers, strings, None, ...) cannot
        # hold the same state, so they are never equal to a command.
        return hasattr(other, "__dict__") and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)

    def __cmp__(self, other):
        """Legacy three-way comparison on the instance dictionaries."""
        if hasattr(other, "__dict__"):
            return cmp(self.__dict__, other.__dict__)
        # Previously this returned 0, making the command compare equal to
        # any object lacking a __dict__; report "different" instead.
        return 1
def serviceCommand(methodName, cmdClass=ServiceWrapperCommand):
    """Turn a journalled service's method into a command.

    The callable returned here takes the same arguments as the wrapped
    method, preceded by the journal, and returns whatever the journal's
    executeCommand() returns (a Deferred).
    """
    def wrapper(obj, journal, *args, **kwargs):
        # The service instance (obj) is not stored in the command; the
        # journal supplies the service when the command is executed.
        cmd = cmdClass(methodName, args, kwargs)
        return journal.executeCommand(cmd)
    return wrapper
# syntax highlighted by Code2HTML, v. 0.9.1