What we discussed last lecture is known as a shared-memory approach to thread communication, because the state of variables is shared among the various threads. Shared-memory communication does not work in all concurrent programming models; for example, the standard programming model of Unix (Linux, etc.) is based on processes rather than threads. The major difference is that processes do not share any state; a spawned process gets a copy of the state of its parent process.
CML discourages communication through shared memory (although SyncVar can be quite useful). Instead it takes the other major approach to managing thread communication and synchronization, called message passing. Message passing has the benefit of being easier to reason about, and also easier to implement in a distributed system. In CML, threads communicate and synchronize using channels, mailboxes, and events (these terms are specific to CML). Channels and mailboxes deliver values from one thread to another. Events give a thread the ability to synchronize on activity by multiple other threads.
```sml
structure CML : sig
  type 'a chan
  type 'a event
  type thread_id
  val spawn : (unit -> unit) -> thread_id
  val channel : unit -> 'a chan
  val send : 'a chan * 'a -> unit
  val recv : 'a chan -> 'a
  val sendEvt : 'a chan * 'a -> unit event
  val recvEvt : 'a chan -> 'a event
  ...
  val choose : 'a event list -> 'a event
  val sync : 'a event -> 'a
  val select : 'a event list -> 'a
  val wrap : 'a event * ('a -> 'b) -> 'b event
  ...
end
```
A value of type T chan is a channel that transmits values of type T. A new channel is created using channel. A channel allows two threads to synchronize: a sending thread and a receiving thread. When a thread evaluates send(c,x) for some channel c and message value x, it blocks until some other thread receives the value by calling recv(c). Once one thread is waiting in send and another in recv, the value x is transferred and becomes the result of the recv, and both threads then resume execution. Symmetrically, if a thread performs recv(c) when no other thread is already sending on c, the receiving thread blocks until a sender arrives. This is known as synchronous message passing because the sender and receiver synchronize at the moment the message is delivered.
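The rendezvous described above can be seen in a minimal sketch, assuming SML/NJ with the CML library loaded (for example via CM.make "$cml/cml.cm") and using RunCML.doit and RunCML.shutdown to start and stop the CML scheduler. The function name rendezvousDemo is hypothetical, chosen for illustration. One thread sends 42 on a fresh channel and the main CML thread receives it; the received value is copied into a ref so that it can be returned after the scheduler shuts down.

```sml
(* Hypothetical demo: a synchronous send/recv rendezvous on one channel.
   Assumes SML/NJ with CML loaded, e.g. CM.make "$cml/cml.cm". *)
fun rendezvousDemo () : int =
  let
    val result = ref 0
    fun main () =
      let
        val c : int CML.chan = CML.channel ()
        (* The sender blocks inside send until a receiver is ready *)
        val _ = CML.spawn (fn () => CML.send (c, 42))
        (* recv blocks until a sender is ready, then returns its value *)
        val x = CML.recv c
      in
        result := x;
        (* Stop the CML scheduler; RunCML.doit then returns *)
        RunCML.shutdown OS.Process.success
      end
    val _ = RunCML.doit (main, NONE)
  in
    !result
  end
```

Calling rendezvousDemo () should return 42, regardless of whether the sender or the receiver reaches the channel first.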
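Synchronous channels are commonly used with events. An event is a first-class value describing a communication that may take place; creating an event does not by itself perform the communication. For instance, recvEvt(c) corresponds to a recv operation on channel c, and sendEvt(c,x) corresponds to a send operation on c. A thread carries out the communication described by an event by calling sync on it, which blocks until the communication can happen and then returns its result. The choose function combines a list of events into a single event. When a thread syncs on the resulting event, it blocks until at least one of the events in the list can occur, commits to exactly one of them (chosen non-deterministically if several are ready), and resumes with that event's result. The composition of choose and sync is so common that there is a compound operation, select, for it: select l behaves like sync(choose l).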
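The following sketch illustrates that events are ordinary values that do nothing until they are synchronized on, and that select l is shorthand for sync(choose l). It assumes SML/NJ with CML loaded; the name selectDemo is hypothetical. Two threads send on two different channels, and the main thread takes one message with sync on a chosen event and the other with select.

```sml
(* Hypothetical demo: choose/sync versus select over two channels.
   Assumes SML/NJ with CML loaded, e.g. CM.make "$cml/cml.cm". *)
fun selectDemo () : int list =
  let
    val results = ref []
    fun main () =
      let
        val c1 : int CML.chan = CML.channel ()
        val c2 : int CML.chan = CML.channel ()
        (* Building this event performs no communication yet *)
        val either = CML.choose [CML.recvEvt c1, CML.recvEvt c2]
        val _ = CML.spawn (fn () => CML.send (c1, 1))
        val _ = CML.spawn (fn () => CML.send (c2, 2))
        (* sync commits to whichever receive can complete first *)
        val a = CML.sync either
        (* select l is shorthand for sync (choose l); only the remaining
           sender is still waiting, so this takes the other message *)
        val b = CML.select [CML.recvEvt c1, CML.recvEvt c2]
      in
        results := [a, b];
        RunCML.shutdown OS.Process.success
      end
    val _ = RunCML.doit (main, NONE)
  in
    !results
  end
```

Because the choice is non-deterministic, selectDemo () may return either [1, 2] or [2, 1]; each message is received exactly once.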
We can use synchronous channels and events as an alternative synchronization mechanism for the simple account example we have been considering. We make the example a little more complicated by creating two channels, both of which are used the same way, just to illustrate the use of select with more than one channel (and because we will use it further below when we consider wrapping events).
```sml
fun prog() =
  let
    val c1: int CML.chan = CML.channel()
    val e1 = CML.recvEvt(c1)
    val c2: int CML.chan = CML.channel()
    val e2 = CML.recvEvt(c2)
  in
    (* four client threads, each transferring 100; every send blocks
       until the server receives it *)
    CML.spawn(fn() => CML.send(c1,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c1,100));
    (* the server thread owns the account state; no other thread touches it *)
    CML.spawn(fn() =>
      let
        val acct_from = ref 1000
        val acct_to = ref 1000
        fun server() =
          let
            (* wait until a message arrives on either channel *)
            val amount = CML.select([e1,e2])
          in
            acct_from := !acct_from - amount;
            acct_to := !acct_to + amount;
            TextIO.print ("account balances now " ^
                          Int.toString(!acct_from) ^ " " ^
                          Int.toString(!acct_to) ^ "\n");
            server()
          end
      in
        server()
      end);
    ()
  end
```
Note how synchronous channels are considerably simpler than synchronous variables, in that there is no need for pairing of variables. If we had used synchronous variables to "imitate" this situation with two separate channels, there would be four variables, two for each channel.
Events can be wrapped in order to generate other events. The function wrap in the CML structure does this: wrap(e, f) is an event that occurs when e occurs, and whose result is f applied to the result of e. For instance, in the previous example with two channels, it may be useful to know which channel a message came from. Wrapping events is a way to do this: we construct new events whose value is the pair of a channel number and the original message:
```sml
fun prog() =
  let
    val c1: int CML.chan = CML.channel()
    (* tag each message received on c1 with channel number 1 *)
    val e1 = CML.wrap(CML.recvEvt(c1), fn (amt) => (1, amt))
    val c2: int CML.chan = CML.channel()
    (* tag each message received on c2 with channel number 2 *)
    val e2 = CML.wrap(CML.recvEvt(c2), fn (amt) => (2, amt))
  in
    CML.spawn(fn() => CML.send(c1,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c2,100));
    CML.spawn(fn() => CML.send(c1,100));
    CML.spawn(fn() =>
      let
        val acct_from = ref 1000
        val acct_to = ref 1000
        fun server() =
          let
            (* wait until a message arrives; the wrapped events yield
               (channel number, amount) *)
            val (ch, amount) = CML.select([e1,e2])
          in
            acct_from := !acct_from - amount;
            acct_to := !acct_to + amount;
            TextIO.print ("[Chan " ^ Int.toString(ch) ^
                          "] account balances now " ^
                          Int.toString(!acct_from) ^ " " ^
                          Int.toString(!acct_to) ^ "\n");
            server()
          end
      in
        server()
      end);
    ()
  end
```
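Wrapping is also how events of different types can be combined, since choose requires every event in its list to have the same type. The sketch below is illustrative only: the datatype msg, its constructors Amount and Note, and the function wrapDemo are hypothetical names, and it assumes SML/NJ with CML loaded. One channel carries amounts and another carries text notes; wrap converts both receive events into msg events so they can be chosen between.

```sml
(* Hypothetical message type unifying two channel payload types *)
datatype msg = Amount of int | Note of string

(* Hypothetical demo: wrap events of different types into one type.
   Assumes SML/NJ with CML loaded, e.g. CM.make "$cml/cml.cm". *)
fun wrapDemo () : msg list =
  let
    val results = ref []
    fun main () =
      let
        val amounts : int CML.chan = CML.channel ()
        val notes : string CML.chan = CML.channel ()
        (* The constructors act as the wrapping functions, so both
           wrapped events have type msg CML.event *)
        val evt = CML.choose [CML.wrap (CML.recvEvt amounts, Amount),
                              CML.wrap (CML.recvEvt notes, Note)]
        val _ = CML.spawn (fn () => CML.send (amounts, 100))
        val _ = CML.spawn (fn () => CML.send (notes, "audit"))
        (* Sync on the same event twice to collect both messages *)
        val m1 = CML.sync evt
        val m2 = CML.sync evt
      in
        results := [m1, m2];
        RunCML.shutdown OS.Process.success
      end
    val _ = RunCML.doit (main, NONE)
  in
    !results
  end
```

The two messages may arrive in either order, so wrapDemo () returns either [Amount 100, Note "audit"] or [Note "audit", Amount 100].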
CML also supports asynchronous message channels, where the sender does not wait for a receiver before going on. These are called mailboxes.
Here is part of the interface of the Mailbox structure:
```sml
structure Mailbox : sig
  type 'a mbox
  val mailbox : unit -> 'a mbox
  val send : ('a mbox * 'a) -> unit
  val recv : 'a mbox -> 'a
  val recvEvt : 'a mbox -> 'a CML.event
  ...
end
```
Mailboxes act like channels except that they are asynchronous: send places the message in the mailbox and returns immediately, without waiting for a receiver. Because senders no longer wait, a mailbox may hold a large number of undelivered messages, which it keeps in a FIFO queue: messages are delivered in the order they were sent. CML mailboxes are unbounded, so when using them it is important to realize that the queue can grow without limit if the sender(s) create messages much faster than the receiver(s) consume them.
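Both properties, non-blocking send and FIFO delivery, can be checked with a single thread, as in this minimal sketch. It assumes SML/NJ with CML loaded; the name fifoDemo is hypothetical. Three messages are sent into a mailbox while no thread is receiving, which would deadlock with a synchronous channel, and are then received in order.

```sml
(* Hypothetical demo: mailbox sends do not block, and delivery is FIFO.
   Assumes SML/NJ with CML loaded, e.g. CM.make "$cml/cml.cm". *)
fun fifoDemo () : int list =
  let
    val result = ref []
    fun main () =
      let
        val mb : int Mailbox.mbox = Mailbox.mailbox ()
        (* None of these sends blocks, although no thread is receiving *)
        val () = List.app (fn n => Mailbox.send (mb, n)) [1, 2, 3]
        (* The buffered messages come out in the order they were sent *)
        val a = Mailbox.recv mb
        val b = Mailbox.recv mb
        val c = Mailbox.recv mb
      in
        result := [a, b, c];
        RunCML.shutdown OS.Process.success
      end
    val _ = RunCML.doit (main, NONE)
  in
    !result
  end
```

Calling fifoDemo () should return [1, 2, 3].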
Continuing with the running example from above, we can use mailboxes instead of synchronous channels. In this case it is no longer necessary to spawn threads for sending messages, because the send operation is non-blocking; the main thread simply does the sends. A separate server thread is still used, however.
```sml
fun prog() =
  let
    val m1: int Mailbox.mbox = Mailbox.mailbox()
    val e1 = Mailbox.recvEvt(m1)
    val m2: int Mailbox.mbox = Mailbox.mailbox()
    val e2 = Mailbox.recvEvt(m2)
  in
    (* mailbox sends return immediately; the messages wait in the
       mailboxes until the server receives them *)
    Mailbox.send(m1,100);
    Mailbox.send(m2,100);
    Mailbox.send(m2,100);
    Mailbox.send(m1,100);
    CML.spawn(fn() =>
      let
        val acct_from = ref 1000
        val acct_to = ref 1000
        fun server() =
          let
            (* wait until a message is available in either mailbox *)
            val amount = CML.select([e1,e2])
          in
            acct_from := !acct_from - amount;
            acct_to := !acct_to + amount;
            TextIO.print ("account balances now " ^
                          Int.toString(!acct_from) ^ " " ^
                          Int.toString(!acct_to) ^ "\n");
            server()
          end
      in
        server()
      end);
    ()
  end
```