Recitation 24: Multi-Threaded Programming

[SML code examples]

Spawning threads is a relatively expensive operation.  Thus a common pattern for multi-threaded programming is to use what is generally referred to as a thread pool.  A thread pool is a collection of threads that persist for a relatively long time and are re-used to perform work.  If there is more work to do than there are threads available in the pool, the work is stored until a thread becomes available (generally work is stored in a queue).  For instance, in a Web server that processes HTTP requests, a thread pool is commonly used to handle multiple requests in parallel.  A given request requires parsing the request string, assembling the data (generally reading from files) and then formatting and sending the response string.  These are all IO heavy operations that generally yield a speed advantage when run in parallel.

Determining the size of a thread pool is a bit of an art.  As the number of threads is increased the overall throughput generally goes up for a while, but eventually, if there are too many threads, they compete for resources and overall throughput goes down.  The "right" number depends on both the machine and the workload (e.g., IO vs. processing).

The basic operations for a thread pool are generally to create a pool with some specified number of threads,  to add work to a pool, and to stop or destroy a pool.  Sometimes thread pools also have means of adding and reducing the number of threads in the pool.  More complicated thread pools can maintain priorities for the work to be done and assign work to threads based on those priorities, including having a certain mix of high and low priority work being processed simultaneously.  In such cases it is important to consider whether certain kinds of work will be "starved" for resources and never get to run if higher priority work is always arriving.
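One simple way to avoid starvation can be sketched in plain SML, with no threads involved.  This is only an illustration under stated assumptions: the policy (run one low priority job after at most maxHigh consecutive high priority jobs) and the names maxHigh, next and schedule are hypothetical and are not part of the thread pools developed in these notes.  Each list holds pending jobs with the oldest job first.

```sml
(* Hypothetical policy: after maxHigh consecutive high priority jobs, run one
 * low priority job (if any), so that low priority work cannot starve. *)
val maxHigh = 2

(* State: (high priority jobs, low priority jobs, consecutive high jobs run).
 * Returns the chosen job and the new state, or NONE if no work remains. *)
fun next (nil, nil, _) = NONE
  | next (h :: hs, nil, run) = SOME (h, (hs, nil, run + 1))
  | next (nil, l :: ls, _) = SOME (l, (nil, ls, 0))
  | next (h :: hs, l :: ls, run) =
      if run >= maxHigh then SOME (l, (h :: hs, ls, 0))
      else SOME (h, (hs, l :: ls, run + 1))

(* Repeatedly apply the policy to get the order in which jobs would run. *)
fun schedule st =
  case next st of
      NONE => nil
    | SOME (job, st') => job :: schedule st'

(* Jobs are represented by their names here, to make the order visible. *)
val order = schedule (["H1", "H2", "H3", "H4", "H5"], ["L1", "L2"], 0)
(* order = ["H1", "H2", "L1", "H3", "H4", "L2", "H5"] *)
```

Even though high priority work is always available in this run, each low priority job waits for at most maxHigh high priority jobs before it gets its turn.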

Here is a signature for a basic thread pool (without priorities):

signature SIMPLE_THREAD_POOL = 
  sig
    type pool
    (* Create a thread pool with the specified number of threads *)
    val create: int -> pool
    (* Add work to a pool, where work is any f:unit->unit. return
     * true if work added, false if pool being destroyed *)
    val addwork: (unit->unit) * pool -> bool
    (* Destroy a thread pool, stopping when no work remains to be done *)
    val destroy: pool -> unit
  end

There are many ways of implementing this data type.  First we will consider an approach that uses an explicit queue of work remaining to be done.  Adding work simply puts the work on the queue.  When a thread finishes the job it is currently doing it checks for more work in the queue, suspending if there is no work to do.

We will use a queue composed of two stacks, like we saw earlier in the semester.  This queue also explicitly keeps track of the number of elements in the queue:

structure Queue = 
  struct
    type 'a queue = int * 'a list * 'a list
    exception Empty
    val new = (0, nil, nil)
    fun insert (x, (l,b,f)) = (l+1, x::b, f)
    fun empty (0, nil, nil) = true
      | empty (_,_,_) = false
    fun remove (0, nil, nil) = (print("Empty Queue\n"); raise Empty)
      | remove (l, bs, nil) = remove(l, nil, rev bs)
      | remove (l, bs, f::fs) = (f, (l-1, bs, fs))
    fun size (l, _, _) = l
  end
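As a quick check of the first-in, first-out behavior, here is a minimal sketch.  It repeats the Queue structure so that it can be loaded on its own; the helpers fromList and drain are hypothetical names introduced only for this illustration.

```sml
structure Queue =
  struct
    type 'a queue = int * 'a list * 'a list
    exception Empty
    val new = (0, nil, nil)
    fun insert (x, (l,b,f)) = (l+1, x::b, f)
    fun empty (0, nil, nil) = true
      | empty (_,_,_) = false
    fun remove (0, nil, nil) = (print("Empty Queue\n"); raise Empty)
      | remove (l, bs, nil) = remove(l, nil, rev bs)
      | remove (l, bs, f::fs) = (f, (l-1, bs, fs))
    fun size (l, _, _) = l
  end

(* Illustrative helper: insert the elements of a list, first element first. *)
fun fromList (xs : 'a list) : 'a Queue.queue =
  List.foldl Queue.insert Queue.new xs

(* Illustrative helper: remove elements until the queue is empty. *)
fun drain (q : 'a Queue.queue) : 'a list =
  if Queue.empty q then nil
  else let val (x, q') = Queue.remove q in x :: drain q' end

val q = fromList [1, 2, 3]
val n = Queue.size q     (* 3 *)
val order = drain q      (* [1, 2, 3]: same order as inserted *)
```

The type annotations on fromList and drain force both internal lists to have the same element type, which is what remove expects.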

In this implementation, a thread pool consists of a pair: a Boolean variable indicating whether the pool is being destroyed, and a queue of functions that represent the work waiting to be done.  This pair is stored in a mutable synchronous variable:

type pool = (bool * (unit -> unit) Queue.queue) SyncVar.mvar

In addition to implementing the three functions in the signature, addwork, create and destroy, a thread pool also has an execution loop that each thread is running.  This is not a publicly accessible function of the thread pool; it is only used internally.  In the code below that function is called dowork.  It checks for work to be done in the queue, and runs it if there is any.  The code also uses the function printdbg, which prints a string only when the global variable debug is true.  More generally, print is defined to be TextIO.print in order to work correctly with Concurrent ML.

structure STPool :> SIMPLE_THREAD_POOL = 
  struct
    type pool = (bool * (unit -> unit) Queue.queue) SyncVar.mvar
    fun dowork (tp:pool,tid:int) : unit =
      let
        val _ = printdbg("Thread " ^ Int.toString(tid) ^ " wakeup.\n")
        val (destroyed, q) = SyncVar.mTake(tp)
      in
        if Queue.empty(q)
        then (SyncVar.mPut(tp,(destroyed, Queue.new));
              if destroyed then printdbg(Int.toString(tid) ^ " destroyed\n")
              else dowork(tp,tid))
        else
          let
            val (f, nq) = Queue.remove(q)
          in
            SyncVar.mPut(tp,(destroyed,nq));
            printdbg("Thread " ^ Int.toString(tid) ^ " working; work todo "
                     ^ Int.toString(Queue.size(nq)) ^ ".\n");
            f();
            dowork(tp,tid)
          end;
        ()
      end
    fun create(num: int) : pool =
      let
        val tp:pool = SyncVar.mVarInit((false,Queue.new))
        fun iter(n) =
          if n<=0 then tp
          else (CML.spawn(fn () => dowork(tp,n)); iter(n-1))
      in
        iter(num)
      end
    fun addwork (f:unit->unit,tp:pool):bool =
      let
        val (destroyed, q) = SyncVar.mTake(tp)
      in
        if destroyed then (SyncVar.mPut(tp,(true, q)); false)
        else (SyncVar.mPut(tp,(false, Queue.insert(f,q))); true)
      end
    fun destroy(tp) =
      let
        val (destroyed, q) = SyncVar.mTake(tp)
      in
        SyncVar.mPut(tp,(true,q));
        ()
      end
  end

The create function creates a synchronized variable (mvar) of type pool.  This variable is used to ensure the thread safety of the operations that are performed by the thread pool.  The threads of the thread pool are not stored in any data structure, rather they are simply all created with access to the same synchronized variable.   We also give each thread an integer id. 

Every operation that could modify the state of the thread pool first does an mTake, then does the potential modification work, and then does an mPut.  This guarantees that no other thread modifies the state of the shared structure in an inconsistent way.  Each operation should do as little work as possible after "locking" using mTake and before "unlocking" using mPut, because no other thread can access the shared state while the structure is "locked" in this manner.  The longer it stays locked, the less concurrency there is in the resulting code.

The addwork function,  as with all the functions (other than create), first "locks" the data structure by doing an mTake.  It then simply adds the given function to the work queue, unlocking the mvar (using mPut) in the process, and returns true to indicate that work was added.  However if the pool is being destroyed then addwork does not add to the queue, simply unlocking the mvar and returning false to indicate no work was added.  Note: every mTake must be paired with an mPut in all possible execution paths, or else no other work will be able to be done because the synchronous variable will remain "locked".  Simply returning false without doing an mPut in addwork would result in all the pending operations hanging if an addwork call was done after a destroy call.  This kind of error is very hard to debug!
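One way to make the pairing of mTake and mPut harder to get wrong is to wrap it in a helper, sketched below.  This assumes Concurrent ML's SyncVar structure is loaded.  The names modify and addworkSketch are hypothetical (they are not part of CML or of the code in these notes), and a plain list stands in for the work queue to keep the sketch self-contained.

```sml
(* Hypothetical helper: take the state, compute a new state and a result,
 * and always put a state back, even if f raises an exception. *)
fun modify (mv : 'a SyncVar.mvar) (f : 'a -> 'a * 'b) : 'b =
  let
    val old = SyncVar.mTake mv
    (* On an exception, restore the old state before re-raising, so the
     * mvar is never left empty ("locked"). *)
    val (new, result) = f old handle e => (SyncVar.mPut (mv, old); raise e)
  in
    SyncVar.mPut (mv, new);
    result
  end

(* The shape of addwork written with modify.  A list is used here in place
 * of the Queue structure; jobs are appended at the end. *)
fun addworkSketch (f : unit -> unit,
                   tp : (bool * (unit -> unit) list) SyncVar.mvar) : bool =
  modify tp (fn (destroyed, jobs) =>
    if destroyed then ((true, jobs), false)
    else ((false, jobs @ [f]), true))
```

With this structure both branches of addworkSketch return a new state, so there is no execution path on which the mPut can be forgotten.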

The destroy function simply sets the Boolean flag to true, again taking the lock first and releasing it afterwards.  It does not stop the threads itself.  Instead, each thread's dowork loop notices the flag the next time it locks the structure and finds the queue empty: it then releases the lock and either calls itself again or exits, depending on whether or not the flag is set.  Thus the threads in a destroyed pool only exit when there is no more work in the queue (and no work can be added once the flag has been set to true).  If the queue is not empty, the first work item (function) is removed and the updated queue is stored, releasing the lock.  Only then is the work done.  It is important that the work is not done while holding the lock, or there will be no concurrency: only one work item at a time will be done!

The dowork function is run by each thread in the thread pool.  It first locks the shared thread pool object, using mTake, and checks whether the work queue is empty.  If so, it mPut's the unchanged data back (i.e., unlocking so other threads can run) and then checks whether the thread pool has been destroyed.  If it has been destroyed then it simply returns and lets the thread exit, and if not it calls the dowork function again to check for more work.  On the other hand, if the queue is not empty, then there is work to do.  The first element is removed from the queue and the updated queue is mPut.  This first element is then executed, and when it completes dowork is again called to check for more work.  Note that it is critical that the synchronous variable is released (mPut) before calling the work function f.  If not, there would be almost no concurrency, because only one piece of work would be running at a time.

Now that we've looked at the implementation in some detail, let's consider a simple example of using a thread pool:

fun prog() =
  let 
    (* STPool is the structure defined above; the name is case-sensitive *)
    val tp = STPool.create(10)
    fun iter(n) =
      if n<= 0 then ()
      else (STPool.addwork
            (fn () => (print("Doing " ^ Int.toString(n) ^ "\n"); ()),tp);
            iter(n-1))
  in 
    iter(500);
    STPool.destroy(tp);
    ()
  end

This simply creates a thread pool of size 10, adds 500 work items to the pool and then destroys the pool.  Note that the added work does not necessarily happen in the order that it was added.  There is concurrency with multiple threads allowing for considerable out of order execution.  Thus one cannot depend on the order of the different jobs in a thread pool.  You can try running this to see how it works using the code file linked to at the top of these notes.

There are some drawbacks to this implementation of thread pools.  The main issue is that a single synchronized variable is used for everything, so any mPut causes a waiting thread to "wake" and check for work.  But an mPut is done not only when new work arrives, but also whenever another thread checks for work after finishing a job.  You can see this if you comment out the destroy call in prog: when the work is all done, a thread will keep waking up to check for work even without any additional addwork calls being made (i.e., with the work queue remaining empty).  In addition to this drawback, performing the queue operations inside the "global lock" of an mTake is also not ideal, but it is less of an issue than the needless waking of threads that then determine there is nothing to be done.

Thus we now consider a second implementation that uses the built-in asynchronous message channels rather than an explicit queue.  We still make use of a synchronous variable as well, but it just holds whether or not the pool is being destroyed and a count of the number of threads in the pool (which we need for destroy in this implementation).  This pool does not explicitly keep track of the amount of pending work, unlike the first implementation.   In this implementation a pool is represented as a pair of an mvar and a mailbox.

struct 
  type pool = (bool * int) SyncVar.mvar * (unit -> unit) Mailbox.mbox
  exception Done
  fun dowork (tp:pool as (vars, box),tid:int) : unit =
    let 
      val g = Mailbox.recv(box)
    in 
      printdbg("Thread " ^ Int.toString(tid) ^ " wakeup.\n");
      g();
      dowork(tp,tid);
      ()
    end
  fun create(num: int) : pool =
    let 
      val tp:pool = (SyncVar.mVarInit((false,num)),Mailbox.mailbox())
      fun iter(n) =
        if n<=0 then tp
        else (CML.spawn(fn () => dowork(tp,n)); iter(n-1))
    in 
      iter(num)
    end
  fun addwork (f:unit->unit,tp:pool as (svar,box)) =
    let 
      val (destroyed, nthreads) = SyncVar.mTake(svar)
    in 
      SyncVar.mPut(svar, (destroyed, nthreads));
      if destroyed then false
      else (Mailbox.send(box,f);
            true)
    end
  fun destroy(tp:pool as (svar, box)) =
    let 
      fun iter(n) =
        if n<=0 then ()
        else (Mailbox.send(box,fn () =>
                                  (printdbg("Stopping thread\n");
                                   raise Done));
              iter(n-1))
      val (destroyed, nthreads) = SyncVar.mTake(svar)
    in 
      SyncVar.mPut(svar,(true,nthreads));
      iter(nthreads)
    end
  end

The internal exception Done is used by destroy to have each thread raise an unhandled exception and thus exit when the work is finished. 

The create function is quite similar to the previous implementation, creating an empty structure and spawning the specified number of threads. 

The addwork function is also similar to the previous implementation; however, it unlocks the synchronous variable before adding the work, because it only uses the variable to check whether the pool has been destroyed.  The asynchronous mailbox is used to add the work.  Note that addwork is still not a fully asynchronous call: it could block for an indefinite time if some other thread has acquired the lock on the synchronous variable.  The implementation does almost no work inside those locks, though, so this should not happen in practice.

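The destroy function is a bit more involved than the previous implementation.  This is because rather than having the dowork loop of each thread check whether the pool has been destroyed, we simply add jobs to the queue that will destroy the threads, one job per thread.  This is only done after setting the synchronous variable such that no additional work can be added, so there should be no remaining work after these jobs.  (Strictly speaking, because addwork releases the synchronous variable before sending to the mailbox, an addwork call that checked the flag just before destroy set it may still send its job after the stopping jobs, in which case that job is never run.)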

The dowork function in this implementation is very simple.  It just gets a job from the asynchronous message queue (mailbox), runs it, and then iterates.  The loop will eventually exit when it is sent a job that raises the Done exception, which is left unhandled.  Thus this loop never needs to check any shared state, but rather simply waits for work and does that work.