Recitation 24: Concurrency using Message-Passing Communication

SML code examples

What we discussed in the last lecture is known as the shared-memory approach to thread communication, because the state of variables is shared among the various threads. Shared-memory communication is not available in all concurrent programming models; for example, the standard programming model of Unix (Linux, etc.) is based on processes rather than threads. The major difference is that processes do not share any state; a spawned process gets a copy of the state of its parent process.

CML discourages communication through shared memory (although SyncVar can be quite useful). Instead it takes the other major approach to managing thread communication and synchronization, called message passing. Message passing has the benefit of being easier to reason about, and it is also easier to implement in a distributed system. In CML, threads communicate and synchronize using channels, mailboxes, and events. (These are terms specific to CML.) Channels and mailboxes provide the ability to deliver values from one thread to another. Events give a thread the ability to synchronize on activity by multiple other threads.

Channels

signature CML = sig
  type 'a chan
  type 'a event
  val channel: unit -> 'a chan
  val send: 'a chan * 'a -> unit
  val recv: 'a chan -> 'a
  val sendEvt: 'a chan * 'a -> unit event
  val recvEvt: 'a chan -> 'a event
  ...
  val choose: 'a event list -> 'a event
  val sync: 'a event -> 'a
  val select: 'a event list -> 'a
  val wrap: 'a event * ('a -> 'b) -> 'b event
  ...  
end 

A value of type T chan is a channel that transmits values of type T. A new channel is created using channel. The channel allows two threads to synchronize: a sending thread and a receiving thread. When a thread evaluates send(c,x) for some channel c and message value x, it then blocks waiting for some thread to receive the value by calling recv(c). Once one thread is waiting on send and another on recv, the value x is transferred and becomes the result of the recv. The two threads then both resume execution. Similarly, if a thread performs a recv(c) but there is no other thread doing a send already, the receiving thread blocks waiting for a sender. This is known as synchronous message-passing because the sender and receiver synchronize at the moment that the message is delivered.
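
As a minimal sketch of this rendezvous, a spawned thread can send a single string that the original thread receives. The function name rendezvous is made up for illustration, and the code assumes it is run inside a CML scheduler (under SML/NJ, typically one started with RunCML.doit).

```sml
(* Sketch: one rendezvous over a synchronous channel.
   The name "rendezvous" is hypothetical, chosen for this example. *)
fun rendezvous () =
  let
    val c : string CML.chan = CML.channel ()
  in
    (* The spawned sender blocks inside send until some thread calls recv. *)
    ignore (CML.spawn (fn () => CML.send (c, "hello")));
    (* recv blocks until a sender is waiting, then returns the sent value. *)
    TextIO.print (CML.recv c ^ "\n")
  end

(* Assumption: under SML/NJ this would be started with something like
   RunCML.doit (rendezvous, NONE). *)
```

Whichever thread reaches the channel first waits for the other, so the order in which the two threads are scheduled does not change the result.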

Synchronous channels are commonly used with events. Such events are used to indicate when particular changes in the state of a channel have occurred. For instance, a recvEvt corresponds to a recv operation on the channel and a sendEvt corresponds to a send operation on the channel. The choose function allows a thread to nondeterministically choose one of a list of events. If the thread then calls sync on the resulting event, it blocks until one of the listed events occurs and resumes running with that event's value. The composition of choose and sync is so common that there is a compound operation called select.
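
The relationship between choose, sync, and select can be sketched as follows. The two function names are hypothetical helpers introduced only for this illustration; both are intended to behave the same way.

```sml
(* Hypothetical helper: wait for an int on either of two channels,
   spelled out as choose followed by sync. *)
fun recvEither (c1 : int CML.chan, c2 : int CML.chan) : int =
  CML.sync (CML.choose [CML.recvEvt c1, CML.recvEvt c2])

(* The same operation written with the compound form select. *)
fun recvEither' (c1 : int CML.chan, c2 : int CML.chan) : int =
  CML.select [CML.recvEvt c1, CML.recvEvt c2]
```

Note that building the event with recvEvt or choose does not communicate anything by itself; the receive only happens when the thread synchronizes on the event.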

We can use synchronous channels and events as an alternate synchronization mechanism for the simple account example we have been considering.  We make the example a little more complicated by creating two channels, both of which are used the same way, just so as to illustrate the use of select with more than one channel (and because we will use it further below when we consider wrapping events).

fun prog() =
  let 
    val c1: int CML.chan = CML.channel()
    val e1 = CML.recvEvt(c1)
    val c2: int CML.chan = CML.channel()
    val e2 = CML.recvEvt(c2)
  in
    CML.spawn(fn() => CML.send(c1,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c1,100));
    CML.spawn(fn() =>
      let 
        val acct_from = ref 1000
        val acct_to = ref 1000
        fun server() =
          let
            (* block until a message arrives on either channel *)
            val amount = CML.select([e1,e2])
          in
            acct_from := !acct_from - amount;
            acct_to := !acct_to + amount;
            TextIO.print ("account balances now "^
                          Int.toString(!acct_from)^" "^
                          Int.toString(!acct_to)^"\n");
            server()
          end
      in
        server()
      end);
    ()
  end

Note how synchronous channels are considerably simpler than synchronous variables, in that there is no need for pairing of variables.  If we had used synchronous variables to "imitate" this situation with two separate channels there would be four variables, two for each channel.

Wrapping Events

Events can be wrapped to produce other events; the function wrap in the CML structure does this. For instance, in the previous example with two channels, it may be useful to know which of the channels a message came from. Wrapping each receive event constructs a new event whose value is the pair of a channel number and the original message:

fun prog() =
  let 
    val c1: int CML.chan = CML.channel()
    val e1 = CML.wrap(CML.recvEvt(c1), fn (amt) => (1, amt))
    val c2: int CML.chan = CML.channel()
    val e2 = CML.wrap(CML.recvEvt(c2), fn (amt) => (2, amt))
  in
    CML.spawn(fn() => CML.send(c1,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c1,100));
    CML.spawn(fn() =>
      let 
        val acct_from = ref 1000
        val acct_to = ref 1000
        fun server() =
          let
            (* block until a message arrives on either channel *)
            val (ch, amount) = CML.select([e1,e2])
          in
            acct_from := !acct_from - amount;
            acct_to := !acct_to + amount;
            TextIO.print ("[Chan " ^ Int.toString(ch)
                          ^ "] account balances now "^
                          Int.toString(!acct_from)^" "^
                          Int.toString(!acct_to)^"\n");
            server()
          end
      in
        server()
      end);
    ()
  end
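
Because wrap changes the result type of an event, it can also bring events of different types to a common type so that they fit in one select list. The following is a sketch under assumptions: the request datatype, its constructors, and the function nextRequest are hypothetical and not part of the account example above.

```sml
(* Hypothetical message type combining two kinds of input. *)
datatype request = Deposit of int | Note of string

(* Wait on an int channel and a string channel at the same time.
   Each recvEvt is wrapped with a constructor, so both events
   have type request CML.event and can share one list. *)
fun nextRequest (amounts : int CML.chan, notes : string CML.chan) : request =
  CML.select [
    CML.wrap (CML.recvEvt amounts, Deposit),
    CML.wrap (CML.recvEvt notes, Note)
  ]
```

A server could then pattern match on the result (Deposit n or Note s) to decide how to handle the message.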

 

Mailboxes

CML also has support for asynchronous message channels, where the sender does not wait for the receiver before going on.  These are called mailboxes.  Here is part of the signature of the Mailbox structure:

signature MAILBOX = sig
  type 'a mbox
  val mailbox : unit -> 'a mbox
  val send : ('a mbox * 'a) -> unit
  val recv : 'a mbox -> 'a
  val recvEvt : 'a mbox -> 'a CML.event
  ...
end

Mailboxes act like channels except that they are asynchronous. A mailbox provides a FIFO message queue: messages are delivered in the order they were sent. The ordering matters because send no longer waits for a receiver, so a mailbox can hold a large number of pending messages. When using mailboxes, keep in mind that the queue keeps growing, and can eventually be overwhelmed, if the senders create messages much faster than the receivers consume them.
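
The FIFO and non-blocking behavior described above can be sketched with a single thread that both sends and receives. The function name fifoDemo and the helper drain are made up for this illustration, and the code assumes it runs inside a CML scheduler.

```sml
(* Sketch: sends to a mailbox do not block, and messages are
   received in the order they were sent.
   "fifoDemo" and "drain" are hypothetical names. *)
fun fifoDemo () =
  let
    val m : int Mailbox.mbox = Mailbox.mailbox ()
    (* Receive and print n messages, one per line. *)
    fun drain 0 = ()
      | drain n =
          (TextIO.print (Int.toString (Mailbox.recv m) ^ "\n");
           drain (n - 1))
  in
    (* All three sends return immediately, although no thread
       is receiving yet. *)
    List.app (fn n => Mailbox.send (m, n)) [1, 2, 3];
    (* Given FIFO delivery, this should print 1, 2, 3 in that order. *)
    drain 3
  end
```

The same code written with a synchronous channel would block forever at the first send, since the only thread that could receive is the one that is sending.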

Continuing with the running example from above, we can use mailboxes instead of synchronous channels.  Note that in this case it is no longer necessary to spawn threads for sending messages, because the send operation is non-blocking.  Thus the main thread simply does the sends.  A separate server thread is still used however.

fun prog() =
  let 
    val m1: int Mailbox.mbox = Mailbox.mailbox()
    val e1 = Mailbox.recvEvt(m1)
    val m2: int Mailbox.mbox = Mailbox.mailbox()
    val e2 = Mailbox.recvEvt(m2)
  in
    Mailbox.send(m1,100);
    Mailbox.send(m2,100);
    Mailbox.send(m2,100);
    Mailbox.send(m1,100);
    CML.spawn(fn() =>
      let 
        val acct_from = ref 1000
        val acct_to = ref 1000
        fun server() =
          let
            (* block until a message arrives in either mailbox *)
            val amount = CML.select([e1,e2])
          in
            acct_from := !acct_from - amount;
            acct_to := !acct_to + amount;
            TextIO.print ("account balances now "^
                          Int.toString(!acct_from)^" "^
                          Int.toString(!acct_to)^"\n");
            server()
          end
      in
        server()
      end);
    ()
  end