Recitation 19: Async Parallel

This recitation presents an extension to async that facilitates running multiple computations simultaneously using operating system processes and sockets.

Processes and Threads

Most operating systems and programming languages provide several different concurrent programming abstractions. A process allows a given program to execute without interference: the operating system typically encapsulates the memory used by a process and ensures that it cannot be tampered with by any other process running concurrently. In contrast, a thread is a lighter-weight abstraction in which several threads of control may share the same memory. Threads can be provided by the operating system (e.g., pthreads) or by the programming language (e.g., OCaml's Thread module). Depending on the number of cores available, several processes or threads may execute simultaneously.
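To make the contrast concrete, here is a small sketch using OCaml's standard threads library (compile with the threads package). Two threads share a single mutable counter, coordinating through a mutex; two separate processes could not share the counter this way, since each would have its own private heap. The function name shared_count is illustrative, not from any library.

```ocaml
(* Sketch: threads share memory.  Both threads increment the same
   [counter] reference, with a mutex guarding each update. *)
let shared_count n =
  let counter = ref 0 in
  let lock = Mutex.create () in
  let bump () =
    for _i = 1 to n do
      Mutex.lock lock;
      incr counter;
      Mutex.unlock lock
    done
  in
  let t1 = Thread.create bump () in
  let t2 = Thread.create bump () in
  Thread.join t1;
  Thread.join t2;
  !counter

let () = assert (shared_count 1000 = 2000)
```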

OCaml Run-Time Limitations

With the current implementation of OCaml, it is difficult to execute multiple threads of execution in parallel, due to limitations of the run-time system. In particular, the garbage collector, which scavenges the heap and reclaims unused blocks of memory, is not reentrant. That is, the garbage collector cannot be interrupted and subsequently restarted safely—in general, this can cause key data structures to be corrupted, leading to unexpected errors. As a result, it is not possible to use traditional means of obtaining parallelism, such as threads, in OCaml.

The async_parallel Library

Async_parallel is a new library that enables programmers to work around the limitations of the OCaml run-time system by using OS-level processes. It obtains a parallel execution environment by mapping deferred computations to processes and it sets up communication channels between them using sockets. Each process manages its own memory, and executes its own copy of the OCaml run-time system, so none of the limitations discussed above apply.
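The underlying mechanism can be sketched with only the standard unix library: fork a separate OS process and exchange marshalled OCaml values over a pipe (async_parallel uses sockets, but the idea is the same). The function name squares_in_child and the one-shot protocol here are illustrative, not part of async_parallel's API.

```ocaml
(* Sketch: run a computation in a child process and ship the result
   back as a marshalled value. *)
let squares_in_child input =
  let read_fd, write_fd = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
    (* Child: its own address space, its own copy of the run-time. *)
    Unix.close read_fd;
    let oc = Unix.out_channel_of_descr write_fd in
    Marshal.to_channel oc (List.map (fun x -> x * x) input) [];
    close_out oc;
    exit 0
  | child_pid ->
    (* Parent: read the serialized result back and deserialize it. *)
    Unix.close write_fd;
    let ic = Unix.in_channel_of_descr read_fd in
    let (result : int list) = Marshal.from_channel ic in
    close_in ic;
    ignore (Unix.waitpid [] child_pid);
    result

let () = assert (squares_in_child [1; 2; 3] = [1; 4; 9])
```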

The two essential functions in async_parallel are

val run
  :  ?where:[`Local | `On of string | `F of (unit -> string)]
  -> (unit -> 'a Deferred.t)
  -> ('a, string) Result.t Deferred.t

val spawn
  :  ?where:[`Local | `On of string | `F of (unit -> string)]
  -> (('a, 'b) Hub.t -> 'c Deferred.t)
  -> (('a, 'b) Channel.t * ('c, string) Result.t Deferred.t) Deferred.t

The function run executes a thunk in a separate process, either on the same machine (`Local) or on another machine connected over the network (`On h, or `F f, where the host is chosen by calling f). In the latter cases, the library handles initiating a network connection, setting up communication channels using sockets, serializing the binary using OCaml's built-in marshalling library, and starting the remote process. The second argument is the deferred computation to execute: for run it is just a thunk, while for spawn it is a function from a communication Hub.t to the result. A hub is like a channel, except that multiple entities can connect to it; it supports point-to-point communication as well as broadcast, which helps coordinate the behavior of processes in larger programs. The final result is a deferred value that is determined when the other process has completed and the result has been transmitted back to the initiating process.
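A minimal use of run might look like the following sketch. It assumes run reports failures via Result.t, with the error case carrying a string, as in the final example of this recitation; the printed messages are illustrative.

```ocaml
open Core.Std
open Async.Std
open Async_parallel.Std

(* Sketch: evaluate a thunk in a child process on the local machine
   and print the answer once it has been shipped back. *)
let () =
  Parallel.init ();
  (Parallel.run ~where:`Local (fun () -> return (21 * 2))
   >>> function
   | Error e -> printf "child failed: %s\n" e; Shutdown.shutdown 1
   | Ok n -> printf "the child computed %d\n" n; Shutdown.shutdown 0);
  never_returns (Scheduler.go ())
```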

As a concrete example, consider the following code:

open Core.Std
open Async.Std
open Async_parallel.Std

let worker h =
  Pipe.iter_without_pushback (Hub.listen_simple h) ~f:(fun (id, `Ping) ->
    Hub.send h id `Pong)
  >>| fun () -> `Done

let main () =
  Parallel.spawn ~where:Parallel.random worker >>> fun (c, _res) ->
  let rec loop () =
    Channel.write c `Ping;
    Channel.read c >>> fun `Pong ->
    Clock.after (sec 1.) >>> loop
  in
  loop ();
  Clock.after (sec 60.) >>> fun () -> Shutdown.shutdown 0
let () =
  Parallel.init ~cluster:
    {Cluster.master_machine = Unix.gethostname ();
    worker_machines = ["host0"; "host1"]} ();
  main ();
  never_returns (Scheduler.go ()) 

The most interesting piece is Parallel.init, which initializes a cluster of machines. In this case, the master node is the local machine and there are two workers; more generally, one could use many machines to spawn workers. The main loop simply sends `Ping values to the worker, which responds with `Pong. Note that the worker node reads from and writes to the hub using Hub.listen_simple and Hub.send. The latter takes the identifier of the entity to contact; this identifier is attached to each message the hub receives. Likewise, the main node gets a channel for each worker that it can use to communicate with it.

For another example, consider the following

open Core.Std
open Async.Std
open Async_parallel.Std

let deep () =
  Deferred.List.iter [ 1; 2; 10; 100 ] ~f:(fun depth ->
    let rec loop i =
      if i = 0
      then return 0
      else
        Parallel.run (fun () -> loop (i - 1))
        >>| function
        | Error e -> failwith e
        | Ok j -> j + 1
    in
    loop depth
    >>| fun d ->
    assert (d = depth))

let () =
  Parallel.init ();
  (deep () >>> fun () -> Shutdown.shutdown 0);
  never_returns (Scheduler.go ())

It executes a loop that runs a cascade of processes (all on the same machine) that count down from the integer argument. If you run this on a Linux machine and use a utility such as top, you will see many processes being created as the calls to Parallel.run are executed.