"""Indexing and Method Combination Strategies ProtocolCriterion -- Index handler for checking that an expression adapts to a protocol ClassCriterion -- Index handler for checking that an expression is of a type or class SubclassCriterion -- Index handler for checking that an expression is a subclass of a given class Inequality -- Index handler for checking that an expression has a range relation (i.e. <,>,<=,>=,==,!=) to a constant value IdentityCriterion -- Index handler for checking that an expression 'is' a particular object Min, Max -- Extreme values for use with 'Inequality' Pointer -- hashable/comparable object pointer, with weakref support, used to implement 'IdentityCriterion' Predicate, Signature, PositionalSignature, Argument -- primitives to implement indexable multiple dispatch predicates most_specific_signatures, ordered_signatures, method_chain, method_list, all_methods, safe_methods, separate_qualifiers -- utility functions for creating method combinations validateCriterion -- check if a criterion's implementation is sane """ from __future__ import generators from protocols import Protocol, Adapter, StickyAdapter from protocols.advice import getMRO import protocols, operator, inspect from types import ClassType, InstanceType ClassTypes = (ClassType, type) from sys import _getframe from weakref import WeakKeyDictionary, ref from dispatch.interfaces import * from new import instancemethod import dispatch __all__ = [ 'ProtocolCriterion', 'ClassCriterion', 'SubclassCriterion', 'Inequality', 'Min', 'Max', 'Predicate', 'Signature', 'PositionalSignature', 'Argument', 'most_specific_signatures', 'ordered_signatures', 'separate_qualifiers', 'method_chain', 'method_list', 'all_methods', 'safe_methods', 'Pointer', 'default', 'IdentityCriterion', 'NullCriterion', 'validateCriterion', 'DispatchNode', 'SeededIndex', ] rev_ops = { '>': '<=', '>=': '<', '=>': '<', '<': '>=', '<=': '>', '=<': '>', '<>': '==', '!=': '==', '==':'!=' } try: set except NameError: from sets import Set as set from sets import ImmutableSet as frozenset class SeededIndex(object): """Index connecting seeds and results""" __slots__ = ( 'dispatch_function', 'allSeeds', 'matchingSeeds', 'criteria', 'enumerables','impliedSeeds' ) def __init__(self,dispatch_function): self.dispatch_function = dispatch_function self.clear() def clear(self): """Reset index to empty""" self.allSeeds = {} # set of all seeds self.matchingSeeds = {} # case -> applicable seeds self.criteria = {} # criterion -> applicable seeds self.enumerables = {} # enumerable -> applicable seeds (reg'd) self.impliedSeeds = {} # enumerable -> applicable seeds (all) def __setitem__(self,criterion,case): """Register 'case' under each of the criterion's seeds""" if criterion.enumerable: enumerables = self.enumerables if criterion not in enumerables: seed = criterion.leaf_seed seeds = self.allSeeds new_seed = seed not in seeds impliedSeeds = self.impliedSeeds for cri in criterion.parent_criteria(): if new_seed or cri not in enumerables: impliedSeeds.setdefault(cri,[]).append(seed) enumerables[criterion] = impliedSeeds[criterion] add = self.addSeed for key in criterion.seeds(): if key not in seeds: add(key,False) self.matchingSeeds[case] = enumerables[criterion] return criteria = self.criteria if criterion not in criteria: seeds = self.allSeeds add = self.addSeed for key in criterion.seeds(): if key not in seeds: add(key) criteria[criterion] = list(criterion.matches(seeds)) self.matchingSeeds[case] = criteria[criterion] def count_for(self,cases): """Get the total count of outgoing branches, given incoming cases""" get = self.matchingSeeds.get dflt = self.allSeeds return len(self.allSeeds), sum([len(get(case,dflt)) for case in cases]) def casemap_for(self,cases): """Return a mapping from seeds->caselists for the given cases""" get = self.matchingSeeds.get dflt = self.allSeeds casemap = dict([(key,[]) for key in dflt]) for case in cases: for key in get(case,dflt): casemap[key].append(case) return casemap def addSeed(self,seed, reseed=True): """Add a previously-missing seed""" if seed in self.allSeeds: return # avoid duping entries if this is a reseed via dispatcher for criterion,itsSeeds in self.criteria.items(): if seed in criterion: itsSeeds.append(seed) if reseed: for criterion,itsSeeds in self.enumerables.items(): if seed in criterion: itsSeeds.append(seed) self.allSeeds[seed] = None def mkNode(self,*args): node = DispatchNode(*args) return instancemethod(self.dispatch_function,node,DispatchNode) class DispatchNode(dict): """A mapping w/lazy population and supporting 'reseed()' operations""" protocols.advise(instancesProvide=[IDispatchTable]) __slots__ = 'index','cases','build','lock' def __init__(self, index, cases, build, lock): self.index = index self.cases = cases self.build = build self.lock = lock dict.__init__( self, [(key,build(subcases)) for key,subcases in index.casemap_for(cases).items()] ) def reseed(self,key): self.lock.acquire() try: self.index.addSeed(key) self[key] = retval = self.build( self.index.casemap_for(self.cases)[key] ) return retval finally: self.lock.release() _memo = {} def make_node_type(dispatch_function): if dispatch_function not in _memo: class Node(DispatchNode): def make_index(cls): return SeededIndex(dispatch_function) make_index = classmethod(make_index) # XXX can't use disp.as! _memo[dispatch_function] = Node return _memo[dispatch_function] def validateCriterion(criterion, node_type, seeded=True, parents=None): """Does 'criterion' have a sane implementation?""" criterion = ICriterion(criterion) hash(criterion) assert criterion.node_type is node_type assert criterion==criterion, (criterion, "should equal itself") assert criterion!=NullCriterion,(criterion,"shouldn't equal NullCriterion") assert criterion!=~criterion,(criterion,"shouldn't equal its inverse") assert criterion==~~criterion,(criterion,"should equal its double-inverse") assert criterion.implies(NullCriterion), ( criterion,"should imply NullCriterion" ) assert criterion.implies(criterion), (criterion,"should imply itself") assert not (~criterion).implies(criterion), ( criterion,"should not be implied by its inverse" ) assert not criterion.implies(~criterion), ( criterion,"should not imply its inverse", ~criterion ) d = {} criterion.subscribe(d) criterion.unsubscribe(d) if seeded: criterion = ISeededCriterion(criterion) for seed in criterion.seeds(): d[seed] = seed in criterion matches = list(criterion.matches(d)) for seed in matches: assert d[seed], (criterion,"should have contained",seed) del d[seed] for value in d.values(): assert not value,(criterion,"should've included",seed,"in matches") _parents = set(criterion.parent_criteria()) if parents is not None: # check specific parent assertion assert _parents == set(parents), (_parents,set(parents)) elif criterion.enumerable: assert _parents, ( criterion,"is enumerable and so should have parents" ) assert criterion in _parents, ( criterion,"does not include itself in its parent criteria" ) assert _parents == set([criterion]), ( criterion,"has too many parents, or you should be specifying" " the `parents` argument to ``validateCriterion()``", _parents ) else: assert not _parents, ( criterion, "is not enumerable and so should not have parents", _parents ) for parent in _parents: assert criterion.leaf_seed in parent assert criterion.implies(parent) class TGraph: """Simple transitive dependency graph""" def __init__(self): self.data = {} def add(self,s,e): self.data.setdefault(s,{}) for old_s,old_es in self.data.items(): if s in old_es or s is old_s: g = self.data.setdefault(old_s,{}) g[e] = 1 for ee in self.data.get(e,()): g[ee] = 1 def items(self): """List of current edges""" return [(s,e) for s in self.data for e in self.data[s]] def successors(self,items): """Return a truth map of the acyclic sucessors of 'items'""" d = {} get = self.data.get for s in items: for e in get(s,()): if s not in get(e,()): d[e] = 1 return d def dispatch_by_mro(table,ob): """Lookup '__class__' of 'ob' in 'table' using its MRO order""" try: klass = ob.__class__ except AttributeError: klass = type(ob) while True: if klass in table: return table[klass] try: klass, = klass.__bases__ except ValueError: if klass.__bases__: # Fixup for multiple inheritance return table.reseed(klass) else: break if isinstance(ob,InstanceType) and InstanceType in table: return table[InstanceType] if klass is not object and object in table: return table[object] def dispatch_by_subclass(table,ob): if isinstance(ob,ClassTypes): while 1: if ob in table: return table[ob] try: ob, = ob.__bases__ except ValueError: if ob.__bases__: return table.reseed(ob) break return table[None] class AbstractCriterion(object): """Common behaviors for typical criteria""" class __metaclass__(type): def __init__(cls,name,bases,cdict): if 'dispatch_function' in cdict: cls.node_type = make_node_type(cls.dispatch_function) __slots__ = 'hash','subject' enumerable = True protocols.advise(instancesProvide=[ISeededCriterion]) def __init__(self,subject=None): self.subject = subject self.hash = hash(subject) def __eq__(self,other): return type(self) is type(other) and self.subject==other.subject def __ne__(self,other): return not self==other def matches(self,table): for key in table: if key in self: yield key def __invert__(self): from predicates import NotCriterion return NotCriterion(self) def subscribe(self,listener): pass def unsubscribe(self,listener): pass def __contains__(self,key): raise NotImplementedError def __and__(self,other): from predicates import AndCriterion return AndCriterion(self,other) def implies(self,other): other = ISeededCriterion(other) if self.enumerable: return self.leaf_seed in other for seed in self.seeds(): if seed in self and seed not in other: return False for seed in other.seeds(): if seed in self and seed not in other: return False return True def parent_criteria(self): if self.enumerable: yield self # Alias subject -> leaf_seed by default AbstractCriterion.leaf_seed = AbstractCriterion.subject # Hack to make hashing faster, by bypassing function call and attr lookup AbstractCriterion.__hash__ = instancemethod( AbstractCriterion.hash.__get__, None, AbstractCriterion ) try: from _speedups import dispatch_by_mro except ImportError: pass class ClassCriterion(AbstractCriterion): """Criterion that indicates expr is of a particular class""" __slots__ = () protocols.advise( instancesProvide=[ISeededCriterion], asAdapterForTypes=ClassTypes ) dispatch_function = staticmethod(dispatch_by_mro) def __init__(self,cls): AbstractCriterion.__init__(self,cls) def seeds(self): return [self.subject,object] def __contains__(self,ob): if isinstance(ob,ClassTypes): return ( self.subject is object or issubclass(ob,self.subject) or (self.subject is InstanceType and isinstance(ob,ClassType)) ) return False def __repr__(self): return self.subject.__name__ def parent_criteria(self): for cls in getMRO(self.subject,True): yield self.__class__(cls) _guard = object() class Pointer(int): __slots__ = 'ref' def __new__(cls,ob): self = int.__new__(cls,id(ob)) try: self.ref=ref(ob) except TypeError: self.ref=lambda ob=ob: ob return self def __eq__(self,other): return self is other or int(self)==other and self.ref() is not None def __repr__(self): if self.ref() is None: return "Pointer()" % hex(self) else: return "Pointer(%r)" % self.ref() def dispatch_by_identity(table,ob): oid = id(ob) if oid in table: return table[oid] return table[None] class IdentityCriterion(AbstractCriterion): """Criterion that is true when target object is the same""" protocols.advise( instancesProvide=[ISeededCriterion],asAdapterForTypes=[Pointer] ) __slots__ = () dispatch_function = staticmethod(dispatch_by_identity) def __init__(self,ptr): AbstractCriterion.__init__(self,ptr) def seeds(self): return None,self.subject def matches(self,table): return self.subject, def __contains__(self,ob): return ob==self.subject def __repr__(self): return `self.subject` class NullCriterion(AbstractCriterion): """A "wildcard" Criterion that is always true""" node_type = None def seeds(self): return () def __contains__(self,ob): return True def implies(self,other): return False def __repr__(self): return "NullCriterion" def matches(self,table): return list(table) NullCriterion = NullCriterion() class SubclassCriterion(ClassCriterion): """Criterion that indicates expr is a subclass of a particular class""" __slots__ = () dispatch_function = staticmethod(dispatch_by_subclass) def seeds(self): return [self.subject,None] def __repr__(self): return "SubclassCriterion(%s)" % (self.subject.__name__,) class _ExtremeType(object): # Courtesy of PEP 326 def __init__(self, cmpr, rep): object.__init__(self) self._cmpr = cmpr self._rep = rep def __cmp__(self, other): if isinstance(other, self.__class__) and\ other._cmpr == self._cmpr: return 0 return self._cmpr def __repr__(self): return self._rep def __lt__(self,other): return self.__cmp__(other)<0 def __le__(self,other): return self.__cmp__(other)<=0 def __gt__(self,other): return self.__cmp__(other)>0 def __eq__(self,other): return self.__cmp__(other)==0 def __ge__(self,other): return self.__cmp__(other)>=0 def __ne__(self,other): return self.__cmp__(other)<>0 Max = _ExtremeType(1, "Max") Min = _ExtremeType(-1, "Min") def dispatch_by_inequalities(table,ob): key = ob,ob try: return table[key] except KeyError: if None not in table: table[None] = ranges = concatenate_ranges(table) else: ranges = table[None] lo = 0; hi = len(ranges) while loth: lo = mid+1 else: return table[ranges[mid]] def concatenate_ranges(range_map): ranges = range_map.keys(); ranges.sort() output = [] last = Min for (l,h) in ranges: if l applicable seeds self.last_cases = None self.last_out = None def __setitem__(self,criterion,case): """Register 'case' under each of the criterion's seeds""" self.criteria[case] = criterion for (lo,hi) in criterion.ranges: self.allSeeds[lo] = self.allSeeds[hi] = None def addSeed(self,seed): raise NotImplementedError def casemap_for(self,cases): """Return a mapping from seeds->caselists for the given cases""" if cases is self.last_cases or cases==self.last_cases: return self.last_out tmp = {} out = {} get = self.criteria.get all = Inequality('..',[(Min,Max)]) have_ineq = False for case in cases: for (lo,hi) in get(case,all).ranges: if lo not in tmp: tmp[lo] = [],[],[] if lo==hi: tmp[lo][2].append(case) else: have_ineq = True if hi not in tmp: tmp[hi] = [],[],[] tmp[lo][0].append(case) if hi is not Max: tmp[hi][1].append(case) if have_ineq: keys = tmp.keys() keys.sort() current = frozenset(tmp.get(Min,[[]])[0]) hi = Min for val in keys: add,remove,eq = tmp[val] lo,hi = hi,val out[lo,hi] = current current = current.difference(remove) out[val,val] = current.union(eq) current = current.union(add) else: out[Min,Max] = [] # default for val,(add,remove,eq) in tmp.items(): out[val,val] = eq self.last_out = out self.last_cases = cases return out class Inequality(object): """Criterion that indicates target matches specified const. inequalities""" __slots__ = 'hash','ranges' protocols.advise(instancesProvide=[ICriterion]) class node_type(DispatchNode): make_index = InequalityIndex def __init__(self,op,val): ranges = [] if op=='..': ranges.extend(val) else: if op=='!=': op = '<>' # easier to process this way if '<' in op: ranges.append((Min,val)) if '=' in op: ranges.append((val,val)) if '>' in op: ranges.append((val,Max)) if not ranges or [c for c in op if c not in '<=>']: raise ValueError("Invalid inequality operator", op) self.ranges = ranges = tuple(ranges) self.hash = hash(ranges) def implies(self,other): for r in self.ranges: if not r in other: return False return True def __repr__(self): return 'Inequality("..",%r)' % (self.ranges,) def __contains__(self,ob): for r in self.ranges: if ob==r: return True elif ob[0]==ob[1]: # single point must be *inside* the range if ob[0]>r[0] and ob[1]=r[0] and ob[1]<=r[1]: # for range, overlap allowed return True return False def __and__(self, other): ranges = [] for r in self.ranges: if r in other: ranges.append(r) else: l1, h1 = r for l2,h2 in other.ranges: if l2>h1 or h2Min and (last,lo) not in self: ranges[last,lo] = 1 last = hi if lo<>hi and lo<>Min and (lo,lo) not in self: ranges[lo,lo] = 1 if last is not Max: if (last,last) not in self: ranges[last,last]=1 ranges[last,Max]=1 ranges = list(ranges); ranges.sort() return self.__class__('..',ranges) # Hack to make hashing faster, by bypassing function call and attr lookup Inequality.__hash__ = instancemethod( Inequality.hash.__get__, None, Inequality ) class _Notifier(Protocol): """Helper class that forwards class registration info""" def __init__(self,baseProto): Protocol.__init__(self) from weakref import WeakKeyDictionary self.__subscribers = WeakKeyDictionary() baseProto.addImpliedProtocol(self, protocols.NO_ADAPTER_NEEDED, 1) def subscribe(self,listener): self._Protocol__lock.acquire() try: self.__subscribers[listener] = 1 finally: self._Protocol__lock.release() def unsubscribe(self,listener): self._Protocol__lock.acquire() try: if listener in self.__subscribers: del self.__subscriber[listener] finally: self._Protocol__lock.release() def registerImplementation(self,klass,adapter=protocols.NO_ADAPTER_NEEDED,depth=1): old_reg = Protocol.registerImplementation.__get__(self,self.__class__) result = old_reg(klass,adapter,depth) self._Protocol__lock.acquire() try: if self.__subscribers: for subscriber in self.__subscribers.keys(): subscriber.criterionChanged() finally: self._Protocol__lock.release() return result class ProtocolCriterion(StickyAdapter,AbstractCriterion): """Criterion that indicates instances of expr's class provide a protocol""" protocols.advise( instancesProvide=[ISeededCriterion], asAdapterForTypes=[Protocol] ) attachForProtocols = (ICriterion,ISeededCriterion) dispatch_function = staticmethod(dispatch_by_mro) enumerable = False # Just to be safe def __init__(self,ob): self.notifier = _Notifier(ob) StickyAdapter.__init__(self,ob) AbstractCriterion.__init__(self,ob) def subscribe(self,listener): self.notifier.subscribe(listener) def unsubscribe(self,listener): self.notifier.unsubscribe(listener) def seeds(self): return self.notifier._Protocol__adapters.keys() + [object] def __contains__(self,ob): if isinstance(ob,ClassTypes): bases = self.subject._Protocol__adapters for base in getMRO(ob,True): if base in bases: return bases[base][0] is not protocols.DOES_NOT_SUPPORT return False def implies(self,other): other = ISeededCriterion(other) if other is NullCriterion: return True for base in self.notifier._Protocol__adapters.keys(): if base not in other: return False return True def __repr__(self): return self.subject.__name__ def most_specific_signatures(cases): """List the most specific '(signature,method)' pairs from 'cases' 'cases' is a list of '(signature,method)' pairs, where each 'signature' must provide 'ISignature'. This routine checks the implication relationships between pairs of signatures, and then returns a shorter list of '(signature,method)' pairs such that no other signature from the original list implies a signature in the new list. """ if len(cases)==1: # Shortcut for common case return list(cases) best, rest = list(cases[:1]), list(cases[1:]) for new_sig,new_meth in rest: for old_sig, old_meth in best[:]: # copy so we can modify inplace new_implies_old = new_sig.implies(old_sig) old_implies_new = old_sig.implies(new_sig) if new_implies_old: if not old_implies_new: # better, remove the old one best.remove((old_sig, old_meth)) elif old_implies_new: # worse, skip adding the new one break else: # new_sig has passed the gauntlet, as it has not been implied # by any of the current "best" items best.append((new_sig,new_meth)) return best def ordered_signatures(cases): """Return list of lists of cases sorted into partial implication order Each list within the returned list contains cases whose signatures are overlapping, equivalent, or disjoint with one another, but are more specific than any other case in the lists that follow.""" rest = list(cases) while rest: best = most_specific_signatures(rest) map(rest.remove,best) yield best def all_methods(grouped_cases): """Yield all methods in 'grouped_cases'""" for group in grouped_cases: for signature,method in group: yield method def safe_methods(grouped_cases): """Yield non-ambiguous methods (plus optional raiser of AmbiguousMethod)""" for group in grouped_cases: if len(group)>1: yield AmbiguousMethod(group) break for signature,method in group: yield method def method_list(methods): """Return callable that yields results of calling 'methods' w/same args""" methods = list(methods) # ensure it's re-iterable def combined(*args,**kw): for m in methods: yield m(*args,**kw) return combined def method_chain(methods): """Chain 'methods' such that each may call the next""" methods = iter(methods) # ensure that nested calls will see only the tail for method in methods: try: args = inspect.getargspec(method)[0] except TypeError: return method # not a function, therefore not chainable if args and args[0]=='next_method': if getattr(method,'im_self',None) is None: next_method = method_chain(methods) return instancemethod(method,next_method,type(next_method)) return method return NoApplicableMethods() def single_best(cases): for method in safe_methods(ordered_signatures(cases)): return method else: return NoApplicableMethods() def separate_qualifiers(qualified_cases, **postprocessors): """list[qualified_case] -> dict[qualifier:list[unqualified_case]] Turn a list of cases with possibly-qualified methods into a dictionary mapping qualifiers to (possibly post-processed) case lists. If a given method is not qualified, it's treated as though it had the qualifier '"primary"'. Keyword arguments supplied to this function are treated as a mapping from qualifiers to lists of functions that should be applied to the list of cases to that qualifier. So, for example, this:: cases = separate_qualifiers(cases, primary=[strategy.ordered_signatures,strategy.safe_methods], ) is equivalent to:: cases = separate_qualifiers(cases) if "primary" in cases: cases["primary"]=safe_methods(ordered_signatures(cases["primary"])) Notice, by the way, that the postprocessing functions must be listed in order of *application* (i.e. outermost last). """ cases = {} for signature,method in qualified_cases: if isinstance(method,tuple): qualifier,method = method else: qualifier="primary" cases.setdefault(qualifier,[]).append((signature,method)) for k,v in cases.items(): if k in postprocessors: for p in postprocessors[k]: v = p(v) cases[k] = v return cases class ExprBase(object): protocols.advise(instancesProvide=[IDispatchableExpression]) def __ne__(self,other): return not self.__eq__(other) def __hash__(self): return self.hash def asFuncAndIds(self,generic): raise NotImplementedError class Argument(ExprBase): """The most basic kind of dispatch expression: an argument specifier""" def __init__(self,pos=None,name=None): if pos is None and name is None: raise ValueError("Argument name or position must be specified") self.pos = pos self.name = name self.hash = hash((type(self),self.pos,self.name)) def __eq__(self,other): return isinstance(other,Argument) and \ (other.pos==self.pos) and \ (other.name==self.name) def asFuncAndIds(self,generic): raise NameError("%r is not known to %s" % (self,generic)) def __repr__(self): if self.name: return self.name return 'Argument(%r)' % self.pos class Predicate(object): """A set of alternative signatures in disjunctive normal form""" protocols.advise( instancesProvide=[IDispatchPredicate], asAdapterForProtocols = [protocols.sequenceOf(ISignature)], ) def __init__(self,items): self.items = all = [] for item in map(ISignature,items): if item not in all: all.append(item) def __iter__(self): return iter(self.items) def __and__(self,other): return Predicate( [(a & b) for a in self for b in IDispatchPredicate(other)]) def __or__(self,other): sig = ISignature(other,None) if sig is not None: if len(self.items)==1: return self.items[0] | sig return Predicate(self.items+[sig]) return Predicate(list(self)+list(other)) def __eq__(self,other): return self is other or self.items==list(other) def __ne__(self,other): return not self.__eq__(other) def __repr__(self): return `self.items` protocols.declareAdapter( lambda ob: Predicate([ob]), [IDispatchPredicate], forProtocols=[ISignature] ) class Signature(object): """A set of criteria (in conjunctive normal form) applied to expressions""" protocols.advise(instancesProvide=[ISignature]) __slots__ = 'data','keys' def __init__(self, __id_to_test=(), **kw): items = list(__id_to_test)+[(Argument(name=k),v) for k,v in kw.items()] self.data = data = {}; self.keys = keys = [] for k,v in items: v = ICriterion(v) k = k,v.node_type if k in data: data[k] &= v else: data[k] = v; keys.append(k) def implies(self,otherSig): otherSig = ISignature(otherSig) for expr_id,otherCriterion in otherSig.items(): if not self.get(expr_id).implies(otherCriterion): return False return True def items(self): return [(k,self.data[k]) for k in self.keys] def get(self,expr_id): return self.data.get(expr_id,NullCriterion) def __repr__(self): return 'Signature(%s)' % (','.join( [('%r=%r' % (k,v)) for k,v in self.data.items()] ),) def __and__(self,other): me = self.data.items() if not me: return other if IDispatchPredicate(other) is other: return Predicate([self]) & other they = ISignature(other).items() if not they: return self return Signature( [(k[0],self.data[k]) for k in self.keys] + [(k,v) for (k,d),v in they] ) def __or__(self,other): me = self.data.items() if not me: return self # Always true if IDispatchPredicate(other) is other: return Predicate([self]) | other they = ISignature(other).items() if not they: return other # Always true return Predicate([self,other]) def __eq__(self,other): if other is self: return True other = ISignature(other,None) if other is None or other is NullCriterion: return False for k,v in self.items(): if v!=other.get(k): return False for k,v in other.items(): if v!=self.get(k): return False return True def __ne__(self,other): return not self.__eq__(other) class PositionalSignature(Signature): protocols.advise( instancesProvide=[ISignature], asAdapterForProtocols=[protocols.sequenceOf(ICriterion)] ) __slots__ = () def __init__(self,criteria,proto=None): Signature.__init__(self, zip(map(Argument,range(len(criteria))), criteria) ) default = Signature()