This recitation presents an extension to async that facilitates running multiple computations simultaneously using operating system processes and sockets.
Most operating systems and programming languages provide several different concurrent programming abstractions. A process ensures that a given program executes without interference from other programs. The operating system will typically encapsulate the memory used by a process and ensure that it cannot be tampered with by any other process running concurrently. In contrast, a thread is a lighter-weight abstraction in which several threads of execution may share the same memory. Threads can be provided by the operating system (e.g., pthreads) or by the programming language (e.g., OCaml's Thread module). Depending on the number of cores available, several processes or threads may be executing simultaneously.
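For comparison, here is a minimal sketch of shared-memory threads using OCaml's standard Thread module (it requires linking the threads library, e.g. via ocamlfind with the threads package). Both threads share one heap; as discussed below, the current OCaml runtime will not actually run them in parallel.

```ocaml
(* Minimal sketch: two OS-backed threads sharing the same heap.
   Thread.create starts a thread running the given function;
   Thread.join waits for it to finish. *)
let () =
  let t1 = Thread.create (fun () -> print_endline "hello from thread 1") () in
  let t2 = Thread.create (fun () -> print_endline "hello from thread 2") () in
  Thread.join t1;
  Thread.join t2
```

Note that the two greetings may appear in either order, since the scheduler interleaves the threads arbitrarily.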
With the current implementation of OCaml, it is difficult to execute multiple threads of execution in parallel, due to limitations of the run-time system. In particular, the garbage collector, which scavenges the heap and reclaims unused blocks of memory, is not reentrant. That is, the garbage collector cannot be interrupted and subsequently restarted safely—in general, this can cause key data structures to be corrupted, leading to unexpected errors. As a result, it is not possible to use traditional means of obtaining parallelism, such as threads, in OCaml.
Async_parallel is a new library that enables programmers to work around the limitations of the OCaml run-time system by using OS-level processes. It obtains a parallel execution environment by mapping deferred computations to processes and it sets up communication channels between them using sockets. Each process manages its own memory, and executes its own copy of the OCaml run-time system, so none of the limitations discussed above apply.
The two essential functions in async_parallel are
val run
  :  ?where:[`Local | `On of string | `F of (unit -> string)]
  -> thunk:(unit -> 'a Deferred.t)
  -> 'a Deferred.t

val spawn
  :  ?where:[`Local | `On of string | `F of (unit -> string)]
  -> (('a, 'b) Hub.t -> 'c Deferred.t)
  -> (('a, 'b) Channel.t * ('c, string) Result.t Deferred.t) Deferred.t
The function run executes a function in a separate process on
the same machine (`Local) or on another machine connected
over the network (`On h and `F f). In the remote
cases, the library handles initiating a network connection, setting up
communication channels using sockets, serializing the binary using
OCaml's built-in marshalling library, and starting the remote
process. The second argument contains the deferred computation to
execute. In the case of run this is just a thunk, while in
the case of spawn it is a function from a
communication Hub.t to the result. A hub is like a
channel, except that multiple entities can be connected to it. It
supports point-to-point communication as well as broadcast. This
comes in handy in larger programs that must coordinate the behavior of
many processes. The final result is a deferred value that is determined
when the other process has completed and the result has been
transmitted back to the initiating process.
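The marshalling step mentioned above can be seen in isolation with OCaml's standard Marshal module. The following round-trip sketch serializes a value to a string, as one would before writing it to a socket (the exact flags Async_parallel passes are an internal detail of the library):

```ocaml
(* Sketch: serialize a value with OCaml's built-in marshalling,
   then recover it, as if it had crossed a process boundary. *)
let () =
  let original = [1; 2; 3] in
  (* Serialize to a string; an empty flag list suffices for plain data.
     Marshalling closures (as Parallel.run must) additionally requires
     the Marshal.Closures flag and identical binaries on both ends. *)
  let wire = Marshal.to_string original [] in
  (* Deserialize on the "other side"; the type annotation is needed
     because Marshal.from_string is untyped. *)
  let copy : int list = Marshal.from_string wire 0 in
  assert (copy = original);
  Printf.printf "round-tripped %d elements\n" (List.length copy)
```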
As a concrete example, consider the following code:
open Core.Std
open Async.Std
open Async_parallel.Std
let worker h =
  Pipe.iter_without_pushback (Hub.listen_simple h) ~f:(fun (id, `Ping) ->
    Hub.send h id `Pong)
  >>| fun () -> `Done
let main () =
  Parallel.spawn ~where:Parallel.random worker >>> fun (c, _res) ->
  let rec loop () =
    Channel.write c `Ping;
    Channel.read c >>> fun `Pong ->
    Clock.after (sec 1.) >>> loop
  in
  loop ();
  Clock.after (sec 60.) >>> fun () -> Shutdown.shutdown 0

let () =
  Parallel.init ~cluster:
    {Cluster.master_machine = Unix.gethostname ();
     worker_machines = ["host0"; "host1"]} ();
  main ();
  never_returns (Scheduler.go ())
The first interesting piece is Parallel.init, which initializes a
cluster of machines. In this case, the master node is the local
machine and there are two workers. More generally, one could use
several machines to spawn workers. The main loop simply
sends `Ping values to the workers, which respond
with `Pong. Note that the worker node can read and write to
the hub using Hub.listen_simple and Hub.send. The
latter takes the identifier of the entity to contact;
Hub.listen_simple, in turn, pairs each incoming message with the
identifier of its sender. Likewise, the main node gets a channel for
each worker that it can use to communicate with that worker.
For another example, consider the following code:
open Core.Std
open Async.Std
open Async_parallel.Std
let deep () =
  Deferred.List.iter [ 1; 2; 10; 100 ] ~f:(fun depth ->
    let rec loop i =
      if i = 0
      then return 0
      else begin
        Parallel.run (fun () -> loop (i - 1))
        >>| function
        | Error e -> failwith e
        | Ok j -> j + 1
      end
    in
    loop depth
    >>| fun d ->
    assert (d = depth))
;;

let () =
  Parallel.init ();
  (deep () >>> fun () -> Shutdown.shutdown 0);
  never_returns (Scheduler.go ())
;;
It executes a loop that runs a cascade of processes (all on the
same machine) that count down from the integer argument. If you run
this on a Linux machine and watch it with a utility such as top, you
will see many processes being created as the calls
to Parallel.run are executed.