# -*- coding: Latin-1 -*-
"""Predicates for the mini-language used by rresource.py"""
import struct
import socket
import time
def atoi(s):
if s == '255.255.255.255':
return -1
return struct.unpack('!i', socket.inet_aton(s))[0]
class Address:
def __eq__(self, address):
return _Address(address, operator.eq)
def __ne__(self, address):
return _Address(address, operator.ne)
class _Address:
subjugates = None
def __init__(self, a, f):
parts = a.split('/', 1)
if len(parts) == 1:
parts.append('255.255.255.255')
self.host, self.mask = parts
self.mask = atoi(self.mask)
self.host = atoi(self.host) & self.mask
self.f = f
def check(self, resource, request):
if self.f(atoi(request.client[1]) & self.mask, self.host):
if self.subjugates is None:
return True
for s in self.subjugates:
r = s.check(resource, request)
if r is not None:
return r
return None
def __call__(self, *args):
self.subjugates = args
class Tautology:
def check(self, resource, request):
if self.subjugates is not None:
for s in self.subjugates:
r = s.check(resource, request)
if r is not None:
return r
else:
return None
else:
return True
def __call__(self, *args):
self.subjugates = args
class Contradiction:
def check(self, resource, request):
return False
def __call__(self, *args):
pass
import operator
tmpl = "def __%s__(self, m): return __DateThing(self.attr, m, operator.%s)"
class DateThing:
def __init__(self, attr):
self.attr = attr
for fn in 'eq', 'ne', 'lt', 'gt', 'le', 'ge':
exec tmpl % (fn, fn)
class _DateThing(Tautology):
def __init__(self, a, m, f):
self.a = a
self.m = m
self.f = f
def check(self, resource, request):
if self.f(getattr(time.localtime(), self.a), self.m):
return Tautology.check(resource, request)
return false
Month = DateThing('tm_mon')
MonthDay = DateThing('tm_mday')
tmpl = 'def __%s__(self, n): return _UniqueHosts(n, operator.%s)'
class UniqueHosts:
for fn in 'eq', 'ne', 'lt', 'gt', 'le', 'ge':
exec tmpl % (fn, fn)
class _UniqueHosts(Tautology):
def __init__(self, n, op):
self.n = n
self.op = op
def check(self, resource, request):
i = 0
d = {}
for conn in request.channel.factory.connections:
if conn.transport.getPeer()[1] not in d:
d[conn.transport.getPeer()[1]] = None
i += 1
if self.op(i, self.n):
return Tautology.check(self, resource, request)
return False
tmpl = 'def __%s__(self, c): return _Connections(c, operator.%s)'
class Connections:
for fn in 'eq', 'ne', 'lt', 'gt', 'le', 'ge':
exec tmpl % (fn, fn)
class _Connections(Tautology):
def __init__(self, c, op):
self.c = c
self.op = op
def check(self, resource, request):
if self.op(len(request.channel.factory.connections), self.c):
return Tautology.check(self, resource, request)
return False
Tautology = Tautology()
Contradiction = Contradiction()
Address = Address()
Connections = Connections()
UniqueHosts = UniqueHosts()
syntax highlighted by Code2HTML, v. 0.9.1