Spawning threads is a relatively expensive operation, so a common pattern in multi-threaded programming is to use a thread pool: a collection of threads that persist for a relatively long time and are reused to perform work. If there is more work than there are available threads in the pool, the extra work is stored (generally in a queue) until a thread becomes free. For instance, a Web server that processes HTTP requests commonly uses a thread pool to handle multiple requests concurrently. Each request requires parsing the request string, assembling the data (generally by reading files), and then formatting and sending the response string. Much of this work is I/O-bound, so handling requests concurrently pays off: while one request waits on the disk or network, another can make progress.
Determining the size of a thread pool is a bit of an art. As the number of threads increases, overall throughput generally goes up for a while. Eventually, though, too many threads compete for resources and throughput goes down. The "right" number depends on both the machine and the workload (e.g., I/O-bound vs. CPU-bound work).
The basic operations on a thread pool are generally to create a pool with a specified number of threads, to add work to a pool, and to stop or destroy a pool. Some thread pools also provide ways to add or remove threads. More complicated thread pools can maintain priorities for pending work and assign work to threads based on those priorities. This can include keeping a certain mix of high- and low-priority work in progress at once. In such cases it is important to consider whether some kinds of work will be "starved" for resources and never get to run if higher-priority work keeps arriving.
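One simple guard against starvation can be sketched as a two-level work queue. The structure name `PrioQueue`, the constant `maxHigh`, and the policy itself are hypothetical and are not part of the pool developed below. High-priority jobs are normally served first. However, after `maxHigh` high-priority jobs in a row, a waiting low-priority job goes next. The sketch is purely functional and single-threaded; a real pool would protect it with a lock, as the implementations below do for their queue.

```sml
(* A hypothetical two-level work queue; not from the notes' code. *)
structure PrioQueue =
struct
  (* (pending high-priority jobs, pending low-priority jobs,
      number of high-priority jobs served since the last low-priority one) *)
  type 'a t = 'a list * 'a list * int

  val empty = ([], [], 0)

  (* After this many consecutive high-priority picks, a waiting
     low-priority job is served next. *)
  val maxHigh = 3

  (* Appending keeps each level in FIFO order (O(n), fine for a sketch). *)
  fun addHigh (x, (h, l, n)) = (h @ [x], l, n)
  fun addLow (x, (h, l, n)) = (h, l @ [x], n)

  (* Pick the next job, or NONE if nothing is pending. *)
  fun next ([], [], _) = NONE
    | next ([], y :: ls, _) = SOME (y, ([], ls, 0))
    | next (x :: hs, [], n) = SOME (x, (hs, [], n + 1))
    | next (x :: hs, y :: ls, n) =
        if n >= maxHigh then SOME (y, (x :: hs, ls, 0))
        else SOME (x, (hs, y :: ls, n + 1))

  (* All jobs, in the order they would be handed to worker threads. *)
  fun drain q =
    case next q of
      NONE => []
    | SOME (x, q') => x :: drain q'
end
```

For example, with high-priority jobs 1 through 5 and low-priority jobs 101 and 102, the serving order is 1, 2, 3, 101, 4, 5, 102. Low-priority work still makes progress while high-priority work is pending.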
Here is a signature for a basic thread pool (without priorities):
signature SIMPLE_THREAD_POOL =
sig
  type pool
  (* Create a thread pool with the specified number of threads *)
  val create : int -> pool
  (* Add work to a pool, where work is any f : unit -> unit.
   * Return true if the work was added, false if the pool is being destroyed *)
  val addwork : (unit -> unit) * pool -> bool
  (* Destroy a thread pool, stopping when no work remains to be done *)
  val destroy : pool -> unit
end
There are many ways of implementing this data type. First we will consider an approach that uses an explicit queue of work remaining to be done. Adding work simply puts it on the queue. When a thread finishes its current job, it checks the queue for more work. If the queue is empty, the thread releases it and checks again.
We will use a queue composed of two stacks, like we saw earlier in the semester. This queue also explicitly keeps track of the number of elements in the queue:
structure Queue =
struct
  (* (number of elements, back stack, front stack) *)
  type 'a queue = int * 'a list * 'a list
  exception Empty
  val new = (0, nil, nil)
  fun insert (x, (l, b, f)) = (l + 1, x :: b, f)
  fun empty (0, nil, nil) = true
    | empty (_, _, _) = false
  fun remove (0, nil, nil) = (print "Empty Queue\n"; raise Empty)
    | remove (l, bs, nil) = remove (l, nil, rev bs)
    | remove (l, bs, f :: fs) = (f, (l - 1, bs, fs))
  fun size (l, _, _) = l
end
In this implementation, a thread pool consists of a pair. The first component is a Boolean flag indicating whether or not the pool is being destroyed. The second is a queue of functions representing the work waiting to be done. This pair is stored in a mutable synchronous variable (an mvar):
type pool = (bool * (unit -> unit) Queue.queue) SyncVar.mvar
In addition to the three functions in the signature (addwork, create and destroy), a thread pool also has an execution loop that each of its threads runs. This loop is not publicly accessible; it is only used internally. In the code below it is called dowork. It checks the queue for work and runs the work if there is any. The code also uses the function printdbg, which prints a string only when the global variable debug is true. In addition, print is defined to be TextIO.print so that it works correctly with Concurrent ML.
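The notes do not show the definitions of debug, printdbg and print. A minimal sketch consistent with the description above might look like this. Making debug a `ref` is an assumption, chosen so it can be switched on and off at run time.

```sml
(* Hypothetical definitions; the notes only say that a global variable
   debug exists and that printdbg prints when it is true. *)
val debug = ref false

(* Rebind print so later code uses TextIO.print from the TextIO structure
   in scope at this point (presumably the CML-aware one when CML is loaded). *)
val print = TextIO.print

(* Print s only when debugging is switched on. *)
fun printdbg (s : string) : unit =
  if !debug then print s else ()
```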
structure STPool :> SIMPLE_THREAD_POOL =
struct
  type pool = (bool * (unit -> unit) Queue.queue) SyncVar.mvar

  (* Loop run by each thread: lock the pool, take one job if there is one,
   * unlock, and only then run the job. *)
  fun dowork (tp : pool, tid : int) : unit =
    let
      val _ = printdbg ("Thread " ^ Int.toString tid ^ " wakeup.\n")
      val (destroyed, q) = SyncVar.mTake tp
    in
      if Queue.empty q then
        (SyncVar.mPut (tp, (destroyed, Queue.new));
         if destroyed then printdbg (Int.toString tid ^ " destroyed\n")
         else dowork (tp, tid))
      else
        let val (f, nq) = Queue.remove q
        in
          SyncVar.mPut (tp, (destroyed, nq));
          printdbg ("Thread " ^ Int.toString tid ^ " working; work todo "
                    ^ Int.toString (Queue.size nq) ^ ".\n");
          f ();
          (* a tail call (no trailing "; ()"), so the loop runs in constant space *)
          dowork (tp, tid)
        end
    end

  fun create (num : int) : pool =
    let
      val tp : pool = SyncVar.mVarInit (false, Queue.new)
      fun iter n =
        if n <= 0 then tp
        else (CML.spawn (fn () => dowork (tp, n)); iter (n - 1))
    in
      iter num
    end

  fun addwork (f : unit -> unit, tp : pool) : bool =
    let val (destroyed, q) = SyncVar.mTake tp
    in
      if destroyed then (SyncVar.mPut (tp, (true, q)); false)
      else (SyncVar.mPut (tp, (false, Queue.insert (f, q))); true)
    end

  fun destroy (tp : pool) : unit =
    let val (_, q) = SyncVar.mTake tp
    in SyncVar.mPut (tp, (true, q))
    end
end
The create function creates a synchronous variable (mvar) of type pool. This variable ensures the thread safety of the operations performed on the pool. The pool's threads are not stored in any data structure; rather, they are all created with access to the same synchronous variable. Each thread is also given an integer id.
Every operation that could modify the state of the thread pool first does an mTake, then does the potential modification, and then does an mPut. This guarantees that no other thread modifies the shared structure in an inconsistent way. Each operation should do as little work as possible between "locking" with mTake and "unlocking" with mPut. No other thread can access the pool while it is "locked" in this manner, because any thread that tries to mTake blocks until the mPut. The longer the structure stays locked, the less concurrency there is in the resulting code.
The addwork function, like all the functions other than create, first "locks" the data structure with an mTake. It then adds the given function to the work queue, unlocking the mvar (with mPut) in the process. It returns true to indicate that the work was added. If the pool is being destroyed, however, addwork does not add to the queue. It simply unlocks the mvar and returns false to indicate that no work was added. Note: every mTake must be paired with an mPut on every possible execution path. Otherwise no other work can be done, because the synchronous variable remains "locked". For example, suppose addwork simply returned false without doing an mPut. Then an addwork call made after a destroy call would leave the mvar empty, and every worker thread and every later operation on the pool would hang. This kind of error is very hard to debug!
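One way to make the pairing rule harder to violate is to route every locked operation through a single helper. The helper `withLock` below is hypothetical and does not appear in the notes. It takes a state with `take`, runs `f` on it, and always hands a state back with `put`, even if `f` raises an exception. With CML one would presumably pass `fn () => SyncVar.mTake tp` and `fn s => SyncVar.mPut (tp, s)`. The sketch itself only uses the Basis library.

```sml
(* Hypothetical helper (not part of the notes' code). f receives the current
   state and returns the new state paired with a result for the caller. *)
fun withLock (take : unit -> 's, put : 's -> unit) (f : 's -> 's * 'r) : 'r =
  let
    val s = take ()
    (* if f raises, put the old state back so the "lock" is released *)
    val (s', r) = f s handle e => (put s; raise e)
  in
    put s';
    r
  end
```

Under this scheme, addwork's body becomes a function from the old (destroyed, queue) pair to a new pair plus the Boolean result. Forgetting the mPut on one branch is then no longer possible.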
The destroy function simply sets the Boolean flag to true, again taking the lock first and releasing it afterwards. It does not stop any threads itself. Instead, each thread notices the flag in its dowork loop, and exits only when it finds the queue empty while the flag is set. Thus the threads in a destroyed pool exit only when there is no more work in the queue. Since no work can be added once the flag has been set to true, work added before destroy was called is still completed.
The dowork function is run by each thread in the thread pool. It first locks the shared thread pool object using mTake and checks whether the work queue is empty. If so, it mPuts the unchanged data back (i.e., unlocks it so other threads can proceed) and then checks whether the pool has been destroyed. If it has, dowork simply returns and the thread exits. If not, dowork calls itself again to check for more work. On the other hand, if the queue is not empty, then there is work to do. The first element is removed from the queue and the updated queue is mPut back. That element is then executed, and when it completes, dowork is called again to check for more work. Note that it is critical that the synchronous variable is released (mPut) before calling the work function f. Otherwise there would be almost no concurrency, because only one piece of work could run at a time.
Now that we've looked at the implementation in some detail, let's consider a simple example of using a thread pool:
fun prog () =
  let
    val tp = STPool.create 10
    fun iter n =
      if n <= 0 then ()
      else (ignore (STPool.addwork (fn () => print ("Doing " ^ Int.toString n ^ "\n"), tp));
            iter (n - 1))
  in
    iter 500;
    STPool.destroy tp
  end
This simply creates a thread pool of size 10, adds 500 work items to it, and then destroys the pool. Note that the work is not necessarily done in the order in which it was added. With multiple threads running concurrently, there can be considerable out-of-order execution, so one cannot depend on the relative order of jobs in a thread pool. You can try running this to see how it works using the code file linked at the top of these notes.
There are some drawbacks to this implementation of thread pools. The main issue is that using a single synchronous variable for everything makes waiting threads wake up needlessly. Any mPut lets a thread blocked in mTake proceed to check for work. However, an mPut is done not only when new work arrives, but also every time another thread checks for work and finds none. You can see this if you comment out the destroy call in prog. Once the work is all done, threads keep waking up to check for work even though no further addwork calls are made (i.e., the work queue stays empty). Performing the queue operations inside the "global lock" of an mTake is also not ideal. That is less of an issue, though, than the needless waking of threads that then find there is nothing to do.
Thus we now consider a second implementation that uses CML's built-in asynchronous message queues (mailboxes) rather than an explicit queue. It still uses a synchronous variable, but that variable holds only two things: whether or not the pool is being destroyed, and the number of threads in the pool (which destroy needs in this implementation). Unlike the first implementation, this pool does not explicitly keep track of the amount of pending work. A pool is represented as a pair of an mvar and a mailbox.
structure MBPool :> SIMPLE_THREAD_POOL =  (* structure name assumed; the original omits this line *)
struct
  type pool = (bool * int) SyncVar.mvar * (unit -> unit) Mailbox.mbox
  exception Done

  (* Block until a job arrives, run it, repeat. A job that raises Done
   * ends the thread. *)
  fun dowork (tp : pool as (_, box), tid : int) : unit =
    let val g = Mailbox.recv box
    in
      printdbg ("Thread " ^ Int.toString tid ^ " wakeup.\n");
      g ();
      dowork (tp, tid)   (* tail call, so the loop runs in constant space *)
    end

  fun create (num : int) : pool =
    let
      val tp : pool = (SyncVar.mVarInit (false, num), Mailbox.mailbox ())
      fun iter n =
        if n <= 0 then tp
        else (CML.spawn (fn () => dowork (tp, n)); iter (n - 1))
    in
      iter num
    end   (* this "end" was missing in the original *)

  fun addwork (f : unit -> unit, tp : pool as (svar, box)) : bool =
    let val (destroyed, nthreads) = SyncVar.mTake svar
    in
      SyncVar.mPut (svar, (destroyed, nthreads));
      if destroyed then false else (Mailbox.send (box, f); true)
    end

  fun destroy (tp : pool as (svar, box)) : unit =
    let
      (* one stop job per thread; a thread exits on the first one it receives *)
      fun iter n =
        if n <= 0 then ()
        else (Mailbox.send (box, fn () => (printdbg "Stopping thread\n"; raise Done));
              iter (n - 1))
      val (_, nthreads) = SyncVar.mTake svar
    in
      SyncVar.mPut (svar, (true, nthreads));
      iter nthreads
    end
end
The internal exception Done is used by destroy to have each
thread raise an unhandled exception and thus exit when the work is finished.
The create function is quite similar to the previous
implementation, creating an empty structure and spawning the specified number of
threads.
The addwork function is also similar to the previous implementation. However, it unlocks the synchronous variable before adding the work, because it only uses the variable to check whether the pool has been destroyed. The work itself is added to the asynchronous mailbox. Note that this is not a fully asynchronous call. It could still block for an indefinite time if some other thread holds the lock on the synchronous variable. The implementation does almost no work while holding that lock, however, so in practice this should not happen.
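The destroy function is a bit more involved than in the previous implementation. Rather than having the dowork loop of each thread check whether the pool has been destroyed, we simply add jobs to the mailbox that stop the threads, one per thread. This is done only after setting the synchronous variable so that no additional work can be added. These stop jobs should therefore be the last jobs in the mailbox. One caveat applies, because addwork releases the lock before calling Mailbox.send. An addwork call that overlaps with destroy can read the flag as false and then send its job after the stop jobs. In that case addwork returns true, but the job never runs. Mailbox.send never blocks, so doing the send before the mPut in addwork would close this gap at little cost.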
The dowork function in this implementation is very simple. It just takes a job from the asynchronous message queue (mailbox), runs it, and then repeats. The loop eventually exits when it receives a job that raises the unhandled Done exception. Thus this loop never needs to check any shared state; it simply waits for work and does it. Because Mailbox.recv blocks until a job is available, an idle thread does not keep waking up as in the first implementation.
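The shutdown protocol can be checked with a small sequential model. This is not CML code and not the notes' implementation. `job`, `shutdownJobs` and `simulate` are hypothetical names. Jobs are taken from a FIFO in order, and the workers take turns round-robin. That turn order is a simplification, since in CML any idle thread may receive the next job. The counting argument does not depend on it: a thread that takes a stop job leaves, so nthreads stop jobs reach nthreads distinct threads. The model also shows that a job sitting behind the stop jobs is never run.

```sml
(* Sequential model of the mailbox pool's shutdown. A Stop job ends the
   worker that receives it, as raising Done does in dowork. *)
datatype job = Work of int | Stop

(* Mailbox contents after destroy: pending work, then one Stop per thread. *)
fun shutdownJobs (nthreads : int, work : int list) : job list =
  map Work work @ List.tabulate (nthreads, fn _ => Stop)

(* Returns (work items run, in order; number of workers that stopped). *)
fun simulate (nthreads : int, jobs : job list) : int list * int =
  let
    (* workers: ids of the threads still running, in turn order *)
    fun loop ([], _, ran, stopped) = (rev ran, stopped)
      | loop (_, [], ran, stopped) = (rev ran, stopped)   (* no threads left *)
      | loop (Work n :: js, w :: ws, ran, stopped) =
          loop (js, ws @ [w], n :: ran, stopped)
      | loop (Stop :: js, _ :: ws, ran, stopped) =
          loop (js, ws, ran, stopped + 1)
  in
    loop (jobs, List.tabulate (nthreads, fn i => i), [], 0)
  end
```

For example, `simulate (3, shutdownJobs (3, [1, 2, 3, 4, 5]))` runs all five work items and stops all three workers. By contrast, `simulate (2, [Work 1, Stop, Stop, Work 2])` never runs item 2.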