Recitation 18: Working with Async

In lecture, you async at a high level, Deferred.t, upon, bind, the >>= notation, and return. In this recitation, we'll go into a bit more detail and practice with these concepts. We'll use them to implement some of the functions from the Async standard library.

Getting started with Async

To use the Async library, you must compile with it. The secret but easy way to do this with the cs3110 tools is to create a .cs3110 file containing the following lines:

compile.opam_modules = async core
compile.threads      = true

(Note that although async and threads are different models, the implementation of async provides integration with threads, so working with the library requires you to work with ocaml threads).

In utop, you need to run #require "async". You may wish to put this in your .ocamlinit.

All of the async functionality is included in the Async.Std module. It is common practice to call modules that are intended to be opened "Std", and this practice is followed in async. Thus most programs start with open Async.Std, but do not open Deferred or open Scheduler. The expectation is that the modules and functions defined directly in Std should be familiar to anyone reading async code.

You should consult the 3110 async documentation, which is also linked on the course website next to lecture 18. This is UNOFFICIAL DOCUMENTATION. It is not generated directly from the Async source, but rather is made to look as if it was. However, we hope that it will be useful for you; it hides much of the complexity of the full async library, while giving you more copious documentation of the parts it does expose. You may occasionally find a slight mismatch between this and the official documentation, in which case the official documentation is correct, but we hope that we've ironed out most of the discrepancies.

The official documentation is linked from the unofficial documentation linked above.

To make your async programs do anything, you must make a single call to Scheduler.go (). This starts the scheduler, which then begins calling any callbacks that you have registered with the scheduler.

Review of >>=

Let's start with an example of using >>= to build an asynchronous function:

utop # open Async.Std
utop # let lookup_line () =
         Reader.read_lines "foo.txt" >>= fun lines_of_foo ->
         Reader.read_lines (List.head lines) >>= fun lines_of_file ->
         return (take 2 lines_of_file);;

lookup_line : unit -> string list Deferred.t

utop # lookup_line () >>= fun line ->
       return (print_endline line);;

- : unit Deferred.t

Here we use >>= to build an asynchronous function lookup_line using >>=. Always keep in mind that d >>= fun x -> e starts the computation d, and when d is done, it then starts the computation e with x bound to the value produced by d. This is completely analogous to the synchronous let x = d in e. In the synchronous world, we might write a function

let lookup_line () =
    let lines_of_foo  = read_lines "foo.txt" in
    let lines_of_file = read_lines (List.hd lines_of_foo) in
    take 2 lines_of_file

Remember also that return is not a special keyword like it is in other languages. It is a very simple function: it takes a non-deferred value and creates a deferred containing that value.

Implementing Deferred.both

Let's write a function that takes two deferred values, and returns another value that is determined when both of the inputs are (this function is provided by Async, see Deferred.both)

val both : 'a Deferred.t -> 'b Deferred.t -> ('a * 'b) Deferred.t

let both d1 d2 =
  d1 >>= fun v1 ->
  d2 >>= fun d2 ->
  return (d1, d2)

Note that since both takes in two deferreds, the corresponding computations have already started by the time both is called. For example, if you called

both (after (sec 30)) (after (sec 60)) >>= fun _ ->
print_endline "done!";
return ()

then "done" would be printed after about 60 seconds, because both timers are started at the same time (before both gets called).

Implementing Deferred.all

Deferred.all is like Deferred.both, except that it takes a list of deferreds instead of a pair.

val all : 'a Deferred.t list -> 'a list Deferred.t
let rec all ds = match ds with
  | [] -> return []
  | dh::tl -> dh     >>= fun h ->
              all tl >>= fun t ->
              return (h::t)

Again, if you think of f v >>= fun x -> as let x = f v in, you can read this code very easily: it would look like

let rec copy ds = match ds with
  | []     -> []
  | dh::tl -> let h = dh      in
              let t = copy tl in

If you want to, you can use the normal List.fold to implement all:

val def_cons : 'a Deferred.t -> 'a list Deferred.t -> 'a list Deferred.t
let def_cons dh dt =
  dh >>= fun h ->
  dt >>= fun t ->
  return h::tl

let all = List.fold_left def_cons (return [])


Let's consider a function that works like but works with asynchronous functions (functions that return Deferred.t):

val map : ('a -> 'b Deferred.t) -> 'a list -> 'b list Deferred.t

Let's implement this like normal map:

let rec map1 f dxs = match dxs with
| [] -> return []
| dh::tl -> f dh      >>= fun h ->
            map1 f tl >>= fun tl ->
            return (h::tl)

Since f is an asynchronous function, we should think about when the calls to f occur. For example, consider the following call to map:

let result = map after [sec 1; sec 2; sec 3; sec 4]

Do all of the calls to after happen at once? If so, we'd expect result to become determined after about 4 seconds. Or, do we only call after (sec 2) after after (sec 1) completes? If it's the latter case, we'd expect result to become determined after about 10 seconds.

Examining the code, we see that the recursive call to map is only made /after/ the deferred returned by f is bound. Therefore, the second timer is only started after the first completes.

Here's an alternative implementation:

let rec map2 f dxs = match dxs with
  | [] -> return []
  | dh::tl -> let dfh = f h in
              let dtl = map2 f tl in
              dfh >>= fun h ->
              dtl >>= fun tl ->
              return (h::tl)

This implementation starts all the timers at once; it makes the recursive call before binding the value from f. Therefore, with this implementation, the result would be determined after about 4 seconds.

Here's a nice easy way to write a deferred map:

let map2 f dxs = all ( f dxs)

Is this equivalent to map1 or map2?

Async provides a function, and it actually works both ways. ~how:`Sequential f l

works like map1: it starts the second invocation to f only after the result of the first is bound. On the other hand ~how:`Parallel f l

works like map2: it starts all calls to f immediately.

What are ~how and `Parallel? uses two new features of OCaml that you haven't seen before: optional arguments and polymorphic variants.

Functions can have optional arguments with default values. These functions have types like val f : ?arg1:int -> bool -> ?arg3:char -> string This denotes a function with two optional arguments (of types int and char) and a non-optional argument of type string.

You can call f by just passing the non-optional argument: f true If you wish to specify the optional arguments, you must do so by name: f ~arg1:17 true Because you have to explicitly name the arguments, you can provide them in any order. That is f ~arg3:'x' ~arg1:17 false is perfectly valid.

The second new feature here is polymorphic variants. These types are just like the variant types you already know and love, except that they do not need to be declared ahead of time. For example, one can write a function with the type val g : int -> [Ok of int |Bogus] the type [`Ok of int | `Bogus] is a variant type with two constructors `Ok and `Bogus.