#!/usr/bin/env python ############################################################################# # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999 # All Rights Reserved. # # The software contained on this media is the property of the # DSTC Pty Ltd. Use of this software is strictly in accordance # with the license agreement in the accompanying LICENSE.DOC file. # If your distribution of this software does not contain a # LICENSE.DOC file then you have no rights to use this software # in any manner and should contact DSTC at the address below # to determine an appropriate licensing arrangement. # # DSTC Pty Ltd # Level 7, Gehrmann Labs # University of Queensland # St Lucia, 4072 # Australia # Tel: +61 7 3365 4310 # Fax: +61 7 3365 4311 # Email: enquiries@dstc.edu.au # # This software is being provided "AS IS" without warranty of # any kind. In no event shall DSTC Pty Ltd be liable for # damage of any kind arising out of or in connection with # the use or performance of this software. # # Project: Distributed Environment # File: $Source: /cvsroot/fnorb/fnorb/orb/ThreadPoolQueue.py,v $ # ############################################################################# """ A queue serviced by a thread pool. """ # Standard/built-in modules. import thread # Fnorb modules. import condvar class ThreadPoolQueue: """ A queue serviced by a thread pool. """ def __init__(self, size, function): """ Constructor. """ self.__size = size self.__function = function self.__stopped = 0 self.__data = [] self.__cv = condvar.condvar() return def start(self): """ Start servicing the queue. """ # Start the appropriate number of worker threads. for i in range(self.__size): thread.start_new_thread(self.worker_thread, (i,)) return def stop(self): """ Stop servicing the queue. """ self.__cv.acquire() self.__stopped = 1 self.__cv.broadcast() return def wait(self): """ Wait until all of the worker threads have finished. """ self.__cv.acquire() while self.__size > 0: self.__cv.wait() self.__cv.release() return def add_item(self, item): """ Add a single item to the queue. """ self.__cv.acquire() self.__data.append(item) self.__cv.signal() return def add_items(self, items): """ Add a list of items to the queue. """ self.__cv.acquire() self.__data[len(self.__data):] = items self.__cv.broadcast() return def worker_thread(self, i): """ The worker!""" self.__cv.acquire() while not self.__stopped: # Is there an item on the queue for me to deal with? if len(self.__data) > 0: item = self.__data[0] del self.__data[0] self.__cv.release() # Do the work! apply(self.__function, item) # Acquire the lock so that I can check to see if I am stopped # or if there is some more work for me to do. self.__cv.acquire() # Otherwise, we are not stopped, and there is nothing for me to # do, so I'll just wait around a while... else: self.__cv.wait() # The thread pool has been stopped so let's get outta here. self.__size = self.__size - 1 self.__cv.signal() # Explicitly exit the thread! thread.exit() #############################################################################