"""Generic function implementations""" from __future__ import generators from dispatch.interfaces import * import protocols, inspect, sys, dispatch from protocols.advice import add_assignment_advisor,getFrameInfo,addClassAdvisor from protocols.interfaces import allocate_lock from new import instancemethod from types import FunctionType, ClassType, InstanceType ClassTypes = (ClassType, type) __all__ = [ 'GenericFunction', 'Dispatcher', 'AbstractGeneric', ] _NF = (0,None, NoApplicableMethods, (None,None)) try: frozenset except NameError: from sets import ImmutableSet as frozenset def _mkGeneric(oldfunc,argname): funcname = oldfunc.__name__ args, varargs, kwargs, defaults = inspect.getargspec(oldfunc) if defaults: tmpd = ["=__gfDefaults[%s]" % i for i in range(len(defaults))] else: tmpd = None argspec = inspect.formatargspec( args, varargs, kwargs, tmpd, formatvalue=lambda x:x) outargs = inspect.formatargspec(args,varargs,kwargs) protocol = protocols.Protocol() d={} s= """ def setup(__gfProtocol, __gfDefaults): def %(funcname)s%(argspec)s: __gfWhat = __gfProtocol(%(argname)s,None) if __gfWhat is None: raise NoApplicableMethods(%(argname)s) else: %(argname)s = __gfWhat[0] return __gfWhat[1]%(outargs)s return %(funcname)s """ % locals() exec s in globals(),d; func = d['setup'](protocol,defaults) def when(cond): """Add following function to this GF, using 'cond' as a guard""" def callback(frm,name,value,old_locals): declarePredicate(cond, protocol, lambda ob: (ob,value)) if old_locals.get(name) is func: return func return value return add_assignment_advisor(callback) def addMethod(cond,func): """Use 'func' when dispatch argument matches 'cond'""" declarePredicate(cond, protocol, lambda ob: (ob,func)) def clone(): """Return a simple generic function that "inherits" from this one""" f = _mkGeneric(oldfunc,argname) protocols.declareAdapter( protocols.NO_ADAPTER_NEEDED,[f.protocol],forProtocols=[protocol] ) return f func.addMethod = addMethod func.when = when func.clone = clone func.protocol = protocol func.__doc__ = oldfunc.__doc__ protocols.adviseObject(func,provides=[IExtensibleFunction]) return func # Bootstrap SimpleGeneric declaration helper function -- itself a SimpleGeneric [dispatch.on('ob')] def declarePredicate(ob,proto,factory): """Declare a SimpleGeneric dispatch predicate""" declarePredicate = _mkGeneric(declarePredicate,'ob') proto = declarePredicate.protocol def declareForType(typ,proto,factory): protocols.declareAdapter(factory,provides=[proto],forTypes=[typ]) def declareForProto(pro,proto,factory): protocols.declareAdapter(factory,provides=[proto],forProtocols=[pro]) def declareForSequence(seq,proto,factory): for item in seq: declarePredicate(item,proto,factory) declareForType(ClassType, proto, lambda ob:(ob,declareForType)) declareForType(type, proto, lambda ob:(ob,declareForType)) declareForProto(protocols.IOpenProtocol,proto, lambda ob:(ob,declareForProto)) declareForProto(protocols.IBasicSequence,proto, lambda ob:(ob,declareForSequence)) class ExprCache(object): __slots__ = 'cache','argtuple','expr_defs' def __init__(self,argtuple,expr_defs): self.argtuple = argtuple self.expr_defs = expr_defs self.cache = {} def __getitem__(self,item): if item==EXPR_GETTER_ID: return self.__getitem__ try: return self.argtuple[item] except IndexError: pass try: return self.cache[item] except KeyError: pass f,args = self.expr_defs[item] f = self.cache[item] = f(*map(self.__getitem__,args)) return f class BaseDispatcher: def __getitem__(self,argtuple): argct = self.argct node = self._dispatcher or self._startNode() or _NF expr, factory, func, init = node while factory: if func is None: self._acquire() try: if node[2] is None: node[2] = func = factory(*init) finally: self._release() if expr'method'""" cond = self.parseRule(signature) if cond is not None: for signature in IDispatchPredicate(cond): self[signature] = method return self._acquire() try: signature = strategy.Signature( [(self._dispatch_id(expr,criterion),criterion) for expr,criterion in ISignature(signature).items() if criterion is not strategy.NullCriterion ] ) self._addCase((signature, method)) self._addConstraints(signature) finally: self._release() [dispatch.on('rule')] def parseRule(self,rule,frame=None,depth=3): """Parse 'rule' if it's a string/unicode, otherwise return 'None'""" [parseRule.when([str,unicode])] def parseRule(self,rule,frame,depth): frame = frame or sys._getframe(depth) return self.parse(rule, frame.f_locals, frame.f_globals) [parseRule.when(object)] def parseRule(self,rule,frame,depth): return None def combine(self,cases): return strategy.single_best(cases) def _best_split(self, cases, disp_ids): """Return best (disp_id,method_map,remaining_ids) for current subtree""" best_id = None best_map = None best_spread = None remaining_ids = list(disp_ids) active_cases = len(cases) disabled = self.constraints.successors(remaining_ids) skipped = [] to_do = remaining_ids[:] for disp_id in to_do: if disp_id in disabled: # Skip criteria that have unchecked prerequisites skipped.append(disp_id) continue index = self.disp_indexes[disp_id] lindex, total_cases = index.count_for(cases) if total_cases == active_cases * lindex: # None of the index keys for this expression eliminate any # cases, so this expression isn't needed for dispatching remaining_ids.remove(disp_id) disabled = self.constraints.successors(remaining_ids) to_do.extend(skipped); skipped=[] continue spread = float(total_cases) / lindex if spread < best_spread or best_spread is None: best_spread = spread best_id = disp_id if best_id is not None: remaining_ids.remove(best_id) return best_id, tuple(remaining_ids) def _dispatch_id(self,(expr,disp_func),criterion): """Replace expr/criterion with a local key""" criterion.subscribe(self) expr = self.getExpressionId(expr) disp = expr, criterion.node_type if disp not in self.disp_indexes: self.disp_indexes[disp] = criterion.node_type.make_index() return expr def getExpressionId(self,expr): """Replace 'expr' with a local expression ID number""" # XXX this isn't threadsafe if not called from 'asFuncAndIds' try: return self.expr_map[expr] except KeyError: expr_def = IDispatchableExpression(expr).asFuncAndIds(self) try: return self.expr_map[expr_def] except KeyError: expr_id = len(self.expr_defs) self.expr_map[expr] = self.expr_map[expr_def] = expr_id self.expr_defs.append(expr_def) return expr_id def _addConstraints(self, signature): pre = [] for key,criterion in signature.items(): if key[0] >= self.argct: # constrain non-argument exprs for item in pre: self.constraints.add(item,key) pre.append(key) class AbstractGeneric(Dispatcher): protocols.advise(instancesProvide=[IGenericFunction]) delegate = None def __init__(self,func): self.delegate, args = _mkNormalizer(func, self) self.delegate.__dict__ = dict( [(k,getattr(self,k)) for k in dir(self.__class__) if not k.startswith('_')] ) self.delegate.__doc__ = self.__doc__ = func.__doc__ protocols.adviseObject(self.delegate,[IGenericFunction]) self.__name__ = func.__name__; self.__call__ = self.delegate Dispatcher.__init__(self,args) # We can't be used as a method, but make pydoc think we're a callable __get__ = None def addMethod(self,predicate,function,qualifier=None): if qualifier is not None: function = qualifier,function for signature in IDispatchPredicate(predicate): self[signature] = function def combine(self,cases): raise NotImplementedError( "The purpose of this class is to support *custom* method combiners" ) def __call__(__self,*args,**kw): return __self.delegate(*args,**kw) def _decorate(self,cond,qualifier=None,frame=None,depth=2): # XXX frame = frame or sys._getframe(depth) cond = self.parseRule(cond,frame=frame) or cond def registerMethod(frm,name,value,old_locals): if qualifier is None: func = value else: func = qualifier,value kind,module,locals_,globals_ = getFrameInfo(frm) if kind=='class': # 'when()' in class body; defer adding the method def registerClassSpecificMethod(cls): req = strategy.Signature( [(strategy.Argument(0),ICriterion(cls))] ) self.addMethod(req & cond, func) return cls addClassAdvisor(registerClassSpecificMethod,frame=frm) else: self.addMethod(cond,func) if old_locals.get(name) in (self,self.delegate): return self.delegate return value return add_assignment_advisor(registerMethod,frame=frame) class GenericFunction(AbstractGeneric): """Extensible predicate dispatch generic function""" def combine(self,cases): strict = [strategy.ordered_signatures,strategy.safe_methods] loose = [strategy.ordered_signatures,strategy.all_methods] cases = strategy.separate_qualifiers( cases, around = strict, before = loose, primary = strict, after =loose, ) primary = strategy.method_chain(cases.get('primary',[])) if cases.get('after') or cases.get('before'): befores = strategy.method_list(cases.get('before',[])) afters = strategy.method_list(list(cases.get('after',[]))[::-1]) def chain(*args,**kw): for tmp in befores(*args,**kw): pass # toss return values result = primary(*args,**kw) for tmp in afters(*args,**kw): pass # toss return values return result else: chain = primary if cases.get('around'): chain = strategy.method_chain(list(cases['around'])+[chain]) return chain def around(self,cond): """Add function as an "around" method w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond,"around") def before(self,cond): """Add function as a "before" method w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond,"before") def after(self,cond): """Add function as an "after" method w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond,"after") def when(self,cond): """Add following function to this GF, w/'cond' as a guard If 'cond' is parseable, it will be parsed using the caller's frame locals and globals. """ return self._decorate(cond) def _mkNormalizer(func,dispatcher): funcname = func.__name__ if funcname=='': funcname = "anonymous" args, varargs, kwargs, defaults = inspect.getargspec(func) if defaults: tmpd = ["=__gfDefaults[%s]" % i for i in range(len(defaults))] else: tmpd = None argspec = inspect.formatargspec( args, varargs, kwargs, tmpd, formatvalue=lambda x:x) allargs = inspect.formatargspec(args,varargs,kwargs) outargs = inspect.formatargspec(args, varargs, kwargs, formatvarargs=lambda name:name, formatvarkw=lambda name:name, join=lambda seq:','.join(seq)) outargs = outargs[1:-1]+',' if outargs==',': outargs='' retargs = [] else: retargs = filter(None,outargs.replace(' ','').split(',')) d ={} s = """ def setup(__dispatcher,__gfDefaults): def %(funcname)s%(argspec)s: return __dispatcher((%(outargs)s))%(allargs)s return %(funcname)s """ % locals() exec s in globals(),d return d['setup'](dispatcher.__getitem__,defaults), retargs defaultNormalize = lambda *__args: __args