Recitation 14: Async Programming

In the last lecture, we saw some simple examples of programming with deferred computations in async. In this recitation, we will implement a simple echo server using async and pipes. We will also briefly discuss exception handling in async.

Pipes

Many programs have one component that produces some data and another component that consumes it. This idiom is sometimes called the producer-consumer pattern. A simple way to implement producers and consumers is to keep them in lock step: whenever the producer generates an item, control passes to the consumer, which processes the item and then returns control to the producer. However, this is inefficient when items take varying amounts of time to produce and consume, because each side sits idle while the other works.

Another way to implement producers and consumers is to use a FIFO pipe. The producer generates data items and writes them into the pipe, and the consumer reads items from the pipe. Importantly, the pipe can contain many items, so the producer and consumer need not proceed in lock step. The async Pipe module includes functions for creating and manipulating pipes:

module Pipe : sig
  val create : unit -> 'a Reader.t * 'a Writer.t
  val read : 'a Reader.t -> [ `Eof | `Ok of 'a ] Deferred.t
  val write : 'a Writer.t -> 'a -> unit Deferred.t
  ...
end

Recall that the type [ `Eof | `Ok of 'a ] is a polymorphic variant. For the purposes of this course, it can be treated as an ordinary datatype whose constructors are prefixed with the backtick symbol, "`". The Pipe module also includes functions for closing the endpoints of a pipe and for mapping, filtering, and transferring pipe contents:

val close : 'a Writer.t -> unit
val close_read : 'a Reader.t -> unit
val is_closed : ('a, 'b) t -> bool

val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.t
val filter_map' : 'a Reader.t -> f:('a -> 'b option Deferred.t) -> 'b Reader.t
val transfer : 'a Reader.t -> 'b Writer.t -> f:('a -> 'b) -> unit Deferred.t
val transfer_id : 'a Reader.t -> 'a Writer.t -> unit Deferred.t

The notation f:('a -> 'b) used in map and other functions denotes a labeled parameter. To invoke a function with a labeled parameter, use the syntax ~f:(fun x -> x).
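
To make the labeled-parameter syntax concrete, here is a small sketch that uses only the OCaml standard library. The function apply_twice is a hypothetical name invented for illustration; ListLabels.map is the standard library's labeled version of List.map.

```ocaml
(* apply_twice is a hypothetical helper: it takes a labeled parameter f,
   in the same style as Pipe.map and Pipe.transfer. *)
let apply_twice (x : int) ~(f : int -> int) : int = f (f x)

(* The label is supplied at the call site with ~f:... *)
let seven = apply_twice 5 ~f:(fun n -> n + 1)

(* In a full application, labeled arguments may be given in any order. *)
let also_seven = apply_twice ~f:(fun n -> n + 1) 5

(* ListLabels.map from the standard library follows the same convention. *)
let doubled = ListLabels.map ~f:(fun n -> 2 * n) [1; 2; 3]

let () =
  assert (seven = 7);
  assert (also_seven = 7);
  assert (doubled = [2; 4; 6])
```

Because the argument is identified by its label rather than its position, call sites stay readable even when a function takes several arguments of similar types.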

Here is a simple example demonstrating how to move data across a pipe:

# #require "core";;
# #require "async";;
# open Core.Std;;
# open Async.Std;;
# let (r,w) = Pipe.create ();;
val r : '_a Pipe.Reader.t = <abstr>
val w : '_a Pipe.Writer.t = <abstr>
# let d = Pipe.write w "Hello World!";;
val d : unit Deferred.t = <abstr>
# Pipe.read r;;
- : [ `Eof | `Ok of string ] = `Ok "Hello World!"
# Deferred.peek d;;
- : unit option = Some ()
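
Note that Pipe.write returns a deferred unit rather than completing immediately. Here it becomes determined once the reader has consumed the value, which is why Deferred.peek d returns Some () after the read.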
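
The same idea can be written as a standalone program. The following is a minimal sketch of the producer-consumer pattern over a pipe, not a definitive implementation. The names produce, consume, and main are hypothetical, and the code assumes the Core.Std and Async.Std modules used in the session above (newer releases of these libraries use "open Core" and "open Async" instead).

```ocaml
open Core.Std
open Async.Std

(* Producer (hypothetical name): write the integers lo..hi into the pipe,
   waiting on the deferred returned by each write, then close the writer. *)
let rec produce w lo hi =
  if lo > hi then begin
    Pipe.close w;
    return ()
  end else
    (Pipe.write w lo >>= fun () -> produce w (lo + 1) hi)

(* Consumer (hypothetical name): read until `Eof, summing the items seen. *)
let rec consume r acc =
  Pipe.read r >>= function
  | `Eof -> return acc
  | `Ok x -> consume r (acc + x)

let main () =
  let (r, w) = Pipe.create () in
  (* Start the producer without waiting for it to finish. *)
  don't_wait_for (produce w 1 10);
  (* The consumer finishes once the producer has closed the pipe. *)
  consume r 0 >>| fun total ->
  printf "sum of 1..10 = %d\n" total

let () =
  (* Shut the scheduler down once main has finished. *)
  upon (main ()) (fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

Neither side calls the other directly: the producer waits only on the deferred returned by Pipe.write, and the consumer waits only on the deferred returned by Pipe.read.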

As a more substantial example, the following program implements an echo server (code adapted from Real World OCaml).
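
A minimal sketch of such a server, consistent with the description that follows, might look like this. The names handler, addr, r, and w and the port 3110 come from these notes. Everything else is an assumption about the Core.Std and Async.Std era of the library: the use of Tcp.on_port (newer releases spell this Tcp.Where_to_listen.of_port), the ~on_handler_error argument, and the scheduler boilerplate at the end.

```ocaml
open Core.Std
open Async.Std

let port = 3110

(* Called once per client connection. The first argument is the address of
   the client (unused here); r and w are the Async Reader.t and Writer.t
   for the connection. Reader.pipe and Writer.pipe expose them as pipes,
   and transfer_id copies everything from one to the other. *)
let handler _addr r w =
  Pipe.transfer_id (Reader.pipe r) (Writer.pipe w)

(* Start the server, then wait on a deferred that never becomes
   determined, so the program keeps serving clients. *)
let main () : unit Deferred.t =
  Tcp.Server.create ~on_handler_error:`Raise (Tcp.on_port port) handler
  >>= fun _server ->
  Deferred.never ()

let () =
  don't_wait_for (main ());
  never_returns (Scheduler.go ())
```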


The function Tcp.Server.create starts a TCP server listening on the specified port and invokes handler whenever a new client connects. The handler is given the client's address addr, a reader r for receiving data from the client, and a writer w for sending data to the client. The corresponding Pipe.Reader.t and Pipe.Writer.t are obtained by calling Reader.pipe and Writer.pipe on these values. In this instance, the handler transfers data from the reader's pipe to the writer's pipe using transfer_id, so everything the client sends is written straight back. The deferred computation Deferred.never () never becomes determined, which is appropriate because the server is intended to run forever (or until we kill it).


Compilation Method 1

To compile the server, save it in a file server.ml and issue the following shell command:

ocamlbuild -use-ocamlfind -pkg core -pkg async -tag thread server.byte
Then, to run the server, issue the following shell command:
./server.byte

Compilation Method 2

PS5 includes an update to the cs3110 tool to make building Async code easier. After you've installed that update, you can compile and run as follows:

cs3110 compile -t -p async server
cs3110 run server

In a different terminal, you should be able to connect to port 3110. Every line of text you type in will be echoed back on the terminal:
% telnet localhost 3110
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello
Hello
World
World
Zardoz
Zardoz

As an extension to this simple server, modify it to capitalize or reverse the strings received as input.

Exceptions

In general, it can be difficult to reason about control flow in a program built using async's deferred computations. This is especially true when handling exceptions. The library provides a simple function for dealing with code that might raise an exception:

val try_with : (unit -> 'a Deferred.t) -> [`Ok of 'a | `Err of exn] Deferred.t
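
When writing async code, you will want to use this function instead of OCaml's standard try-with construct. An ordinary try block only covers the code that runs before the deferred is returned, so an exception raised later, when the scheduler runs the rest of the computation, would escape it.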
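
As a minimal sketch of how try_with might be used, consider the program below. The names risky and main are hypothetical, and the opens and scheduler boilerplate assume the Core.Std and Async.Std modules used earlier in these notes. One caveat: in the Async releases we are aware of, the result of try_with is Core's Result.t, whose constructors are written Ok and Error, rather than the polymorphic variants in the simplified signature above. Adjust the patterns if your version differs.

```ocaml
open Core.Std
open Async.Std

(* risky is a hypothetical computation: it raises only after a delay,
   long after any enclosing try block would have returned. *)
let risky () : int Deferred.t =
  after (sec 0.1) >>= fun () ->
  failwith "something went wrong"

let main () =
  (* try_with runs risky and reports a raised exception as a value. *)
  try_with risky >>| function
  | Ok n -> printf "result: %d\n" n
  | Error exn -> printf "caught: %s\n" (Exn.to_string exn)

let () =
  (* Shut the scheduler down once main has finished. *)
  upon (main ()) (fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

Wrapping the call in an ordinary try block would not help here: risky returns its deferred immediately, and the exception is raised only when the scheduler later runs the callback.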

As another extension, modify your server so that if creating the server fails (for example, because the TCP port is already in use) the program halts with an intuitive error message.