wups

Robert Olson olson at mcs.anl.gov
Wed Nov 12 11:19:10 CST 2003


import threading
import logging

log = logging.getLogger("AG.ThreadPoolServer")
import Queue

class ThreadPoolMixIn:
    """
    Mix-in class for a socket server that allocates requests
    to a pool of worker threads.
    """

    def __init__(self, numWorkerThreads = 1):

        self.done = 0

        log.debug("Creating mixin")
        self.numWorkerThreads = numWorkerThreads
        self.requestQueue = Queue.Queue()
        self._CreateWorkers()

    def _CreateWorkers(self):
        self.workerThread = {}
        self.startLock = threading.Lock()
        for workerNum in range(self.numWorkerThreads):
            log.debug("Creating worker thread %d", workerNum)
            self.workerThread[workerNum] = threading.Thread(target = self.__WorkerRun,
                                                            name = 'ThreadPoolWorker ' + str(workerNum),
                                                            args = (workerNum,))
            self.startLock.acquire()
            log.debug("Starting thread %d", workerNum)
            self.workerThread[workerNum].start()
            log.debug("Waiting thread %d", workerNum)
            self.startLock.acquire()
            self.startLock.release()
        log.debug("Done creating workers")

    def __WorkerRun(self, workerNum):
        log.debug("Worker %d starting", workerNum)
        self.startLock.release()

        while not self.done:
            cmd = self.requestQueue.get(1)
            log.debug("Worker %d gets cmd %s", workerNum, cmd)
            cmdType = cmd[0]
            if cmdType == "quit":
                break
            elif cmdType == "request":
                request = cmd[1]
                client_address = cmd[2]
                log.debug("handle request '%s' '%s'", request, client_address)
                try:
                    self.finish_request(request, client_address)
                    self.close_request(request)
                except:
                    log.exception("Worker %d: Request handling threw exception", workerNum)
        log.debug("Worker %d exiting", workerNum)
            

    def process_request(self, request, client_address):
        log.debug("process_request: request=%s client_address=%s", request, client_address)
        self.requestQueue.put(("request", request, client_address))


More information about the ag-dev mailing list