In the last lecture, we saw some simple examples of programming with deferred computations in async. In this recitation, we will implement a simple echo server using async and pipes. We will also briefly discuss exception handling in async.
Many programs have one component that produces some data and another component that consumes it. This idiom is sometimes called the producer-consumer pattern. A simple way to implement the producers and consumers is to keep them in lock step: whenever the producer generates an item, control is passed to the consumer which processes it, and finally returns control back to the producer. However, this is not very efficient, especially if not all items take the same amount of time to produce and consume.
Another different way to implement producers and consumers is to use a FIFO pipe. The producer generates data items and writes them into the pipe, and the consumer reads items from the pipe. Importantly, the pipe can contain many items, so the producer and consumer need not proceed in lock step. The async Pipe module includes functions for creating and manipulating pipes:
module Pipe : sig = val create : unit -> 'a Reader.t * 'a Writer.t val read : 'a Reader.t -> [ `Eof | `Ok of 'a ] Deferred.t val write : 'a Writer.t -> 'a -> unit Deferred.t ... endRecall that the type [ `Eof | `Ok of 'a] is a polymorphic variant. For the purposes of this course, it can be treated as an ordinary datatype (whose constructors are prefixed with the backtick symbol, "`"). The Pipe module also includes other useful functions for closing the endpoints of a pipe and for mapping, filtering, and transferring pipe contents include:
val close : 'a Writer.t -> unit val close_read : 'a Reader.t -> unit val is_closed : ('a, 'b) t -> bool val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.t val filter_map' : 'a Reader.t -> f:('a -> 'b option Deferred.t) -> 'b Reader.t val transfer : 'a Reader.t -> 'b Writer.t -> f:('a -> 'b) -> unit Deferred.t val transfer_id : 'a Reader.t -> 'a Writer.t -> unit Deferred.tThe notation f:('a -> b) used in map and other functions is a labled parameter. To invoke a function with a labeled parameter, use the syntax ~f:(fun x -> x).
Here is a simple example demonstrating how to move data across a pipe:
# #require "core";; # #require "async";; # open Core.Std;; # open Async.Std;; # let (r,w) = Pipe.create ();; val r : '_a Pipe.Reader.t = <abstr> val w : '_a Pipe.Writer.t = <abstr> # let d = Pipe.write w "Hello World!";; val d : unit Deferred.t = <abstr> # Pipe.read r;; - : [ `Eof | `Ok of string ] = `Ok "Hello World!" # Deferred.peek d - : unit option = Some ()Note that the write returns a deferred unit that eventually becomes determined.
As a more substantial example, the following program implements an echo server (code adapted from Real World OCaml).
The function Tcp.Server.create starts a TCP server listening on the specified port, and invokes handler whenever a new client connects. The handler function is supplied with the address of the client addr and a reader r for receiving data from the client and a writer w for sending data to the client. The actual Pipe.Reader.t and Pipe.Writer.t can be obtained by invoking Reader.pipe and Writer.pipe on these values. In this instance, the handler function transfers data from one end of the pipe to the other using transfer_id. The deferred computation Deferred.never () never becomes determined, which is appropriate because the server is intended to run forever (or until we kill it).
To compile the server, save it in a file server.ml and issue the following shell command:
ocamlbuild -use-ocamlfind -pkg core -pkg async -tag thread server.byteThen, to run the server, issue the following shell command:
./server.byte
PS5 includes an update to the cs3110 tool to make building Async code easier. After you've installed that update, you can compile and run as follows:
cs3110 compile -t -p async server cs3110 run server
% telnet localhost 3110 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. Hello Hello World World Zardoz Zardoz
As an extension to this simple server, modify it to capitalize or reverse the strings received as input.
In general, it can be difficult to reason about control flow in a program built using async's deferred computations. This is especially true when handling exceptions. The library provides a simple function for dealing with code that might raise an exception:
val try_with : (unit -> 'a Deferred.t) -> [`Ok of 'a | `Err of exn] Deferred.tWhen writing async code, you will want to use this function instead of OCaml's standard try-with construct.
As another extension, modify your server so that if creating the server fails (for example, because the TCP port is already in use) the program halts with an intuitive error message.