#! /usr/bin/env python from Pyro.EventService.Clients import Subscriber from Pyro.errors import NamingError from threading import Thread import Pyro.util import time class TrafficCounter(Subscriber,Thread): def __init__(self,id): self.subjPrefix="STRESSTEST.CARS.HEADING." Subscriber.__init__(self) Thread.__init__(self) self.patterns=['north','east','south','west'] self.currentPattern=None self.counter=0 self.id=id self.lock=Pyro.util.getLockObject() self.subscribeNextPattern() def subscribeNextPattern(self): # because this is called from an event, # and that may occur in multiple threads concurrently, # we need a lock on this. # If we don't, it's possible that multiple threads # access the same ES proxy concurrently --> HAVOC. self.lock.acquire() if self.currentPattern: self.unsubscribe(self.subjPrefix+self.currentPattern) try: self.currentPattern=self.patterns.pop() print self.id,'I am now watching for cars heading',self.currentPattern self.subscribe(self.subjPrefix+self.currentPattern) except IndexError: print self.id,'I watched all directions. Start over.' self.patterns=['north','south','east','west'] self.currentPattern=self.patterns.pop() self.subscribe(self.subjPrefix+self.currentPattern) self.lock.release() def event(self, event): (color,car)=event.msg print self.id,'A',color,car,'went',event.subject[len(self.subjPrefix):] self.counter+=1 if self.counter>=4: self.counter=0 print self.id,"There were enough cars in that direction. Let's look somewhere else." self.subscribeNextPattern() def run(self): try: print self.id,'Going to count cars.' self.listen() print self.id,'Stopped counting cars.' except NamingError: print 'Cannot find service. Is the Event Service running?' def main(): threads=[] for i in range(20): tc=TrafficCounter(i) tc.start() threads.append(tc) try: while 1: time.sleep(10) except KeyboardInterrupt: print 'Break-- weating for threads to stop.' for tc in threads: tc.abort() for tc in threads: tc.join() if __name__=='__main__': main()