wups
Robert Olson
olson at mcs.anl.gov
Wed Nov 12 11:19:10 CST 2003
import threading
import logging
log = logging.getLogger("AG.ThreadPoolServer")
import Queue
class ThreadPoolMixIn:
"""
Mix-in class for a socket server that allocates requests
to a pool of worker threads.
"""
def __init__(self, numWorkerThreads = 1):
self.done = 0
log.debug("Creating mixin")
self.numWorkerThreads = numWorkerThreads
self.requestQueue = Queue.Queue()
self._CreateWorkers()
def _CreateWorkers(self):
self.workerThread = {}
self.startLock = threading.Lock()
for workerNum in range(self.numWorkerThreads):
log.debug("Creating worker thread %d", workerNum)
self.workerThread[workerNum] = threading.Thread(target = self.__WorkerRun,
name = 'ThreadPoolWorker ' + str(workerNum),
args = (workerNum,))
self.startLock.acquire()
log.debug("Starting thread %d", workerNum)
self.workerThread[workerNum].start()
log.debug("Waiting thread %d", workerNum)
self.startLock.acquire()
self.startLock.release()
log.debug("Done creating workers")
def __WorkerRun(self, workerNum):
log.debug("Worker %d starting", workerNum)
self.startLock.release()
while not self.done:
cmd = self.requestQueue.get(1)
log.debug("Worker %d gets cmd %s", workerNum, cmd)
cmdType = cmd[0]
if cmdType == "quit":
break
elif cmdType == "request":
request = cmd[1]
client_address = cmd[2]
log.debug("handle request '%s' '%s'", request, client_address)
try:
self.finish_request(request, client_address)
self.close_request(request)
except:
log.exception("Worker %d: Request handling threw exception", workerNum)
log.debug("Worker %d exiting", workerNum)
def process_request(self, request, client_address):
log.debug("process_request: request=%s client_address=%s", request, client_address)
self.requestQueue.put(("request", request, client_address))
More information about the ag-dev
mailing list