# Problem Set 5: Simplified MapReduce

### Due April 14, 2009 - 11:59PM

Note: Don't change the mli files!

March 30: We have changed the build system for workers. First, `Program.run` no longer runs asynchronously; instead, it blocks until the program terminates. Mappers/reducers no longer obtain their input from `stdin`; instead, they use `Program.get_input()`. Similarly, when the mapper/reducer has finished constructing its list of outputs, it now "returns" them using `Program.set_output results`. Lastly, shared data is now accessed using `Program.get_shared_data()`. We have updated the mapper and reducer for word count accordingly.

Windows caps the number of connections a socket can accept at a number too low for this assignment. Additionally, sockets are kept alive too long after being closed. We have therefore included two registry fixes, which you can apply by simply running socket_fixes.reg. You should back up your registry before doing this.

Obtaining a channel from a socket using `Unix.in_channel_of_descr` functions differently in Windows when compared to Linux/OS X. With Linux/OS X, when you close the channel, the socket is closed as well. However, when you close the channel in Windows, you must still close the socket. Therefore, whenever you close a channel, but still want to use the socket (for example, in `Worker.handle_request`) you should add the following code:

```
let inp = Unix.in_channel_of_descr socket in
...
if Sys.os_type = "Win32" then close_in_noerr inp else ()
```

A new release has been posted to CMS. If you have already started work, then please just copy the files over your current working directory, with the exception of the files that you have modified (master/controller.ml, worker/worker.ml, master/master.ml, shared/hashtable.ml). Then, for worker/worker.ml, copy over the new `default_includes` variable. Lastly, if worker builds fail and you are running Linux/OS X, you may need to update the variable `ocaml_lib_dir` in worker/program.ml to point to your OCaml lib directory.

## Part 1: Simplified MapReduce (45 points)

### Overview

In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short whitepaper to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1 - 3.1 of the whitepaper.

#### Map and Reduce Functions

The map and reduce functions can be defined by the following OCaml syntax:

```
val map : 'a -> 'b -> ('c * 'd) list
val reduce : 'c -> 'd list -> 'e list
```
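For word count, for example, these polymorphic signatures specialize so that every type variable is `string`. The sketch below is illustrative only; splitting on single spaces here stands in for the release's `Util.split_words`:

```
(* Illustrative specialization of the signatures to word count. Splitting on
   spaces stands in for the release's Util.split_words. *)
let map (_filename : string) (contents : string) : (string * string) list =
  (* emit ("word", "1") for every word in the document *)
  List.map (fun w -> (w, "1")) (String.split_on_char ' ' contents)

let reduce (_word : string) (counts : string list) : string list =
  (* sum the "1"s to obtain the total occurrence count *)
  [string_of_int (List.fold_left (fun acc c -> acc + int_of_string c) 0 counts)]
```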

However, note that in the messaging protocol defined in shared/protocol.ml we only allow for the transmission of strings. Therefore, you must utilize OCaml's built-in marshalling and unmarshalling to transmit values of different types. (See below for more thorough explanation).

For a given MapReduce application `foo`, your mapper code will be stored in apps/foo/mapper.ml, while your reducer code will be stored in apps/foo/reducer.ml. Both the mapper and reducer will receive their input from `Program.get_input()`, and then output their list of results using `Program.set_output results`. The specific mechanism for this operation is explained below.

#### Basic Execution

Execution is divided into five distinct phases:

1. Pre-Map

The master application is invoked with a MapReduce application name and the name of a file containing the input data. The data in this file is parsed into (key, value) pairs. Mappers are initialized by the master on the available worker machines.

2. Map

The master application obtains a list of (key, value) pairs. Each pair is sent to an available mapper. Mappers calculate the result of mapping their (key, value) pair, and return the resulting (key, value) list, which is sent back to the master. As long as the master hasn't received the resulting mapping for a given (key, value) pair, it will send that input to the next available mapper until all pairs have been mapped.

3. Combiner

For each key in the result of the Map, all of its values are collected into a list, thus generating a list of (key, value list) pairs. Reducers are initialized by the master on the available worker machines.

4. Reduce

The master sends unreduced (key, value list) pairs to available reducers. Reducers calculate the result of the reduction and send it back to the master. This continues until the master has received results for all pairs.

5. Post-Reduce

The result of the Reduce phase is collected and output in some manner.
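The grouping performed in the Combiner phase can be pictured as a function like the following sketch (`combine` is a hypothetical helper, not part of the release):

```
(* Hypothetical combiner: sort by key, then fold adjacent pairs with equal
   keys into a single (key, value list) pair. *)
let combine (pairs : ('a * 'b) list) : ('a * 'b list) list =
  let sorted = List.sort (fun (k1, _) (k2, _) -> compare k1 k2) pairs in
  List.fold_left
    (fun acc (k, v) ->
      match acc with
      | (k', vs) :: rest when k' = k -> (k, v :: vs) :: rest
      | _ -> (k, [v]) :: acc)
    [] sorted
```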

#### Client Execution Example

Consider the canonical MapReduce example of counting the number of occurrences of each word in a single document. In the Map phase, each (filename, word) pair is mapped to (word, "1"), indicating that the word has been seen once. Then in the Reduce phase, a reducer will receive (word, ["1", "1", ..., "1"]) as input, and sum all the ones to determine the number of times that the word has occurred. Note that our framework only accepts a single data file as input; however, we approximate multiple files by storing the (filename, file content) pairs in the single file. See apps/kmeans/reuters/reuters000.txt for an example of the formatting. Files that match this format can be parsed by `Controller.load_documents`. This dataset is a collection of Reuters articles from the 1980s.

Note: We have included the mapper and reducer code for this example in apps/wordcount.

1. The master application is started using the following command: master.exe word_count filename
2. `Controller.word_count filename` reads in the contents of filename, splitting it into (filename, word) pairs using `load_documents`. `Master.map kv_pairs "" "apps/wordcount/mapper.ml"` is called. The empty string contains shared data that is accessible to all mappers in addition to their input (key, value) pair. This data is accessed using `Program.get_shared_data()`.
3. `Master.map` initializes the mappers using `Worker_manager.initialize_mappers`. `Worker_manager` loads in the list of available workers from the file named addresses. Each line of this file contains a worker address of the form "ip_address:port_number". Each of these workers is sent the mapper code, and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify each mapper. If certain workers are unavailable, this function will report this fact, but continue to run successfully.
4. `Master.map` then sends individual unmapped (filename, word) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using `Worker_manager.pop_mapper()` and should be returned to the manager once a result has been received using `Worker_manager.push_mapper`. Read master/worker_manager.mli for more complete documentation.
Once all (filename, word) pairs have been mapped, the new list of (word, "1") pairs is returned.
5. `Controller.word_count` receives the results of the mapping, and combines the list of (word, "1") pairs into (word, ["1", ..., "1"]) pairs for each word. `Master.reduce` is then called with this input, along with the name of the reducer file.
6. `Master.reduce` initializes the reducers by calling the appropriate `Worker_manager` function, which retrieves worker addresses in the same manner as for mappers.
7. `Master.reduce` then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. Note that because the key value doesn't change, `Worker_manager.reduce`, and the reducer workers, only calculate and return the new value (in this case count), but don't return the key (in this case word).
8. `Controller.word_count` outputs the results.

#### Worker Execution Example

1. Multiple workers can be run from the same directory, as long as they listen on different ports. A worker is started using worker.exe port_number where port_number is the port the worker will listen on.
2. `Worker_server` receives a connection, and spawns a thread which calls `Worker.handle_request` to handle it.
3. `Worker.handle_request` determines the request type. If it is an initialization request, then the new mapper or reducer is built using `Program.build`, which returns either the new worker id or the compilation error. See program.ml for more complete documentation. This new id is returned. If it is a map or reduce request, the id is verified as referencing a valid mapper (if the request is for a map) or a valid reducer (if the request is for a reduce). `Program.run` is then called, which loads the worker; the worker obtains its input using `Program.get_input ()`. `Program.run` waits until the worker terminates, having set its output using `Program.set_output results`. These results are then sent back to the client using `send_response`. `Program.run` must be passed `Worker.send_response` as an argument so that these results can be sent.
4. Once the results of either the build or map/reduce are completed, then `Worker.send_response` is called, which is responsible for sending the result back to the client.

#### Code Structure

##### master/
• controller.ml

Main controller module for the master. First, it parses the command-line arguments, and then invokes the appropriate MapReduce application by calling `Controller.app_name`. Each application function in this module is responsible for loading the input data and mapping it using `Master.map`. The results should then be output using `Controller.print_map_results`. Next, the list of (key, value) pairs should be converted into a list of (key, value list) pairs such that value list is a list of all the values mapped to key, and then reduced using `Master.reduce`. These results should be output using `Controller.print_reduce_results`.

• master.ml

This module is responsible for performing the actual map and reduce operations. A more thorough description is contained below.

• worker_manager.ml

This module provides the capability to initialize mappers and reducers, select inactive workers using the `pop` functions, perform a map/reduce task using a worker using either the `map` or `reduce` function, and then return workers which have completed their task to the pool of inactive workers using the `push` functions.

If a request fails due to some sort of network or file descriptor error, then the `Worker_manager` will automatically retry it up to 5 times. Therefore, it is acceptable if your workers intermittently output such errors, as long as they are still able to service the requests.
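That retry behavior can be sketched as a small wrapper (a hypothetical helper shown only for intuition; the real retry logic lives inside `Worker_manager`):

```
(* Hypothetical retry wrapper: run f, retrying up to n attempts total before
   re-raising the last failure. *)
let rec with_retries n f =
  try f () with e -> if n <= 1 then raise e else with_retries (n - 1) f
```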

##### worker/
• program.ml

This module provides the ability to build and run mappers/reducers.

• worker_server.ml

This module is responsible for spawning threads to handle client connection requests. These threads simply invoke `Worker.handle_request`.

• worker.ml

This module is responsible for managing communication between the clients and the mappers/reducers that they build and then run. A more thorough description is contained below.

#### Communication Protocol

The protocol that the master application and the workers use to communicate is stored in shared/protocol.ml.

##### Requests
• `InitMapper(includes, code, shared_data)`

Initialize a mapper. Each element in the `code` list is a line in mapper.ml. `shared_data` is accessible to all mappers, as described above.

• `InitReducer(includes, code)`

Initialize a reducer. Same as above, except for no shared data.

• `MapRequest(worker_id, key, value)`

Execute mapper with id `worker_id` with `(key, value)` as input.

• `ReduceRequest(worker_id, key, value list)`

Execute reducer with id `worker_id` with `(key, value list)` as input.

##### Responses
• `Mapper(worker_id option, error)`

`Mapper(Some(id), _)` indicates the requested mapper was successfully built, and has the returned id. `Mapper(None, error)` indicates compilation of the mapper failed with the returned error.

• `Reducer(worker_id option, error)`

Same as above, except for a reducer

• `InvalidWorker(worker_id)`

Either a map request was made, and the `worker_id` doesn't correspond to a mapper, or a reduce request was made, and the `worker_id` doesn't correspond to a reducer.

• `RuntimeError(worker_id, error)`

Worker with id `worker_id` output the runtime error(s) stored in `error`.

• `MapResults(worker_id, (key, value) list)`

Map request sent to `worker_id` resulted in output of `(key, value) list`.

• `ReduceResults(worker_id, value list)`

Reduce request sent to `worker_id` resulted in output of `value list`.
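To see the shape of these messages in code, here is a mock of the request type with its payloads reduced to strings and ints so the snippet stands alone; the real definitions live in shared/protocol.ml and may differ:

```
(* Mock of the request variants listed above (actual types are in
   shared/protocol.ml). *)
type worker_request =
  | InitMapper of string list * string list * string  (* includes, code, shared_data *)
  | InitReducer of string list * string list          (* includes, code *)
  | MapRequest of int * string * string               (* worker_id, key, value *)
  | ReduceRequest of int * string * string list       (* worker_id, key, value list *)

(* A handler would dispatch on the constructor, e.g.: *)
let describe = function
  | InitMapper _ -> "build a mapper"
  | InitReducer _ -> "build a reducer"
  | MapRequest (id, _, _) -> "run mapper " ^ string_of_int id
  | ReduceRequest (id, _, _) -> "run reducer " ^ string_of_int id
```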

#### Marshalling

Marshalling and unmarshalling of most values (with the exception of functions) is built into OCaml. Values can be converted to and from strings explicitly using `Marshal.to_string` and `Marshal.from_string`. Alternatively, if you are retrieving a marshalled value from a channel, you can use `input_value`, and if you are sending a value into a channel, you can use `output_value`. These functions aren't typesafe, but you can still retrieve the type information by matching on the retrieved value.
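For example, a list of pairs can be round-tripped through a string as follows; the type annotation on the unmarshalled value is what makes the round trip usable, since `Marshal.from_string` is not typesafe:

```
(* Round-trip a typed value through a string, as required by the string-only
   messaging protocol. *)
let pairs : (string * string) list = [("ocaml", "1"); ("fun", "1")]
let encoded : string = Marshal.to_string pairs []
let decoded : (string * string) list = Marshal.from_string encoded 0
```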

All code you submit must adhere to the specifications defined in the respective mli files.

As in the previous problem set, you must submit a design document for parts 1 and 2. Additionally, you must implement functions in the following files:

• shared/hashtable.ml

You are required to implement a hash table according to the specifications in shared/hashtable.mli. This data structure will be useful in your MapReduce implementation.

Note: `Hashtbl.hash` is an acceptable hash function to use. However, you may not use any other part of the `Hashtbl` module.
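As a shape to start from, here is a minimal separate-chaining sketch showing `Hashtbl.hash` selecting a bucket. The required interface is fixed by shared/hashtable.mli, so treat this only as an illustration of the technique:

```
(* Separate-chaining sketch: Hashtbl.hash picks the bucket, and an
   association list within the bucket resolves collisions. *)
type ('k, 'v) t = { buckets : ('k * 'v) list array }

let create n = { buckets = Array.make (max 1 n) [] }

let index tbl k = Hashtbl.hash k mod Array.length tbl.buckets

let add tbl k v =
  let i = index tbl k in
  (* replace any existing binding for k in this bucket *)
  tbl.buckets.(i) <- (k, v) :: List.remove_assoc k tbl.buckets.(i)

let find tbl k = List.assoc k tbl.buckets.(index tbl k)
```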

• master/master.ml

The code in this module is responsible for performing the actual map and reduce operations by initializing the mappers/reducers, sending them input that hasn't been mapped/reduced yet, and then returning the final list of results.

The `map` and `reduce` functions must be implemented according to the specifications stored in master/master.mli, with functionality that mirrors the above example execution description. Both functions must be able to handle both silent and explicit worker failure, and individual worker slowness. However, you don't need to handle the case when all workers fail when there is no coding error in your worker code (i.e. complete network failure). Also, there should only be one active request per worker at a time, and if a worker fails, it shouldn't be returned to the `Worker_manager` using the `push` function.

Both functions share the same basic structure. First, the workers are initialized through a call to the appropriate `Worker_manager` function. There is a single shared instance of the `Worker_manager` module, and all relevant functions are thread-safe. Once the workers have been initialized, the input list should be iterated over, sending each available element to a free worker, which can be obtained using the appropriate `pop` function of `Worker_manager`. A mapper is invoked using `Worker_manager.map`, providing the mapper's id and the (key, value) pair. Each mapper (and therefore each invocation of `Worker_manager.map`) receives an individual (key, value) pair and outputs a new (key, value) list, which is simply appended onto the list of all previous map results. A reducer is invoked similarly using `Worker_manager.reduce`. These functions block until a response is received, so it is important that they are invoked from a thread in the included `Thread_pool` module. Additionally, this requires that these spawned threads store their results in a shared data structure, which must be accessed in a thread-safe manner.

Once all of the input has been iterated over, it is important that input which remains unfinished, either due to a slow or failed worker, is re-submitted to available workers until all input has been processed. Once all input has been fully processed, the results are returned.

Note: It is acceptable to use `Thread.delay` with a short sleep period (0.1 seconds or less) when looping over unfinished data to send to available workers in order to prevent a flooding of workers with unnecessary work.
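The shared-results pattern described above might look like the following sketch. The names (`results`, `results_lock`, `record`) are illustrative, not part of the release, and the sketch assumes the `Mutex` module is available (it ships with the threads support your worker threads already use):

```
(* Illustrative shared accumulator: every worker thread appends its map
   results under the same lock, so concurrent appends cannot interleave. *)
let results : (string * string) list ref = ref []
let results_lock = Mutex.create ()

let record (pairs : (string * string) list) : unit =
  Mutex.lock results_lock;
  results := pairs @ !results;
  Mutex.unlock results_lock
```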

• worker/worker.ml

The code in this module is responsible for handling communication between the clients that request work, and the mapper/reducers that perform the work.

The `send_response` function must be implemented according to the mli specifications, and the above description. Furthermore, it must be thread-safe. Note that this function should be quite short.

The `handle_request` function must also be implemented according to the mli specifications and the above description, and must be thread-safe. This function receives a connected client socket as input, and must retrieve the `worker_request` from that socket.

If the request is for a mapper or reducer initialization, then the code must be built using `Program.build`, which returns `(Some(id), "")` where id is the worker id if the compilation was successful, or `(None, error)` if the compilation wasn't successful. If the build succeeds, the worker id should be stored in the appropriate mapper or reducer set (depending on the initialization request), and the id should be sent back to the client using `send_response`. If the build fails, then the error message should be sent back to the client using `send_response`.

If the request is to perform a map or reduce, then the worker id must be verified to be of the correct type, by looking it up in either the mapper or reducer set, before mapper or reducer is invoked using `Program.run`. You will pass `send_response` as one of the arguments to this function, which will automatically be invoked with either the runtime error(s) that the mapper/reducer output, or the result of the map/reduce, as the response when the worker terminates.

Note that the mapper and reducer sets are shared between all request handler threads spawned by the `Worker_server`, and therefore access to them must be thread-safe.

Note: the includes sent to `Program.build` should be those stored in `default_includes` in worker/worker.ml, as the master isn't sending along an includes list. Code in shared/util.ml is accessible to all workers by default.

Note: An `in_channel` can be extracted from a `Unix.file_descr` by using `Unix.in_channel_of_descr`. A similar function exists for `out_channel`. Remember to close all sockets and channels when finished.

### Building and running your code

Note: Linux/Mac users should add `./` before all commands given.

• build_master.bat: build master.exe
• build_worker.bat: build worker.exe
• master.exe foo bar: perform a MapReduce using application `foo` with datafile `bar`
• worker.exe port_number: start up a worker listening on `port_number` for requests

## Part 2: Using Simplified MapReduce (32 points)

You will use the simplified MapReduce that you created in part 1 to implement three algorithms: word count, inverted index, and k-means. Each application will use a mapper and reducer located in `apps/name/mapper.ml` and `apps/name/reducer.ml` (where `name` is the name of the algorithm). We have implemented the mapper and reducer for one of the applications, but you will have to implement the rest yourself. For each application you must also implement the code that invokes `map` and `reduce` and prints the results as specified. To do this, you will implement three functions: `word_count`, `inverted_index`, and `kmeans`, all located in `master/controller.ml`.

### Word Count

Given a set of documents as input, count the total number of times that each word occurs. For example, if your documents were:

• Document 1:
`ocaml is fun`
• Document 2:
`because fun is a keyword`
These would be the word counts:

| word | count |
|------|-------|
| ocaml | 1 |
| is | 2 |
| fun | 2 |
| because | 1 |
| a | 1 |
| keyword | 1 |

#### Implementation

We have provided you with the mapper and reducer for word count. Your task is to implement the function `word_count` in `master/controller.ml`. This function takes a filename of a document dataset (described below) as input and prints out the number of occurrences of each word. On separate lines, print each distinct word (in arbitrary order) followed by the number of times it occurred.

Here is a basic outline of the code you must implement for `word_count`: First, load the provided dataset using the function `load_documents`. Convert each document into a key-value pair `(id, contents)`, where `id` is the document ID and `contents` is the text of the document. These key-value pairs should then be passed as input to the `map` function. The `map` function (when invoked with the mapper we have provided) will return a list of key-value pairs of the form `(word, "1")`, where the keys are not necessarily distinct. Print this list of pairs using `Controller.print_map_results`. From these key-value pairs, derive the key-value pairs that you will pass to `reduce`. Finally, `reduce` will return a list of key-value pairs such that the distinct words are the keys and the values are a string representation of the number of occurrences. Print these results using `Controller.print_reduce_results`.
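Ignoring the distributed framework, the data flow of this outline can be seen end to end in the following local sketch (`word_count_local` is a hypothetical stand-in for the real `word_count`, and splitting on spaces stands in for `Util.split_words`):

```
(* The word_count data flow with the framework elided: map each document to
   (word, 1) pairs, group by word, then reduce each group to a total. *)
let word_count_local (docs : (string * string) list) : (string * int) list =
  let mapped =
    List.concat_map
      (fun (_id, text) ->
        List.map (fun w -> (w, 1)) (String.split_on_char ' ' text))
      docs
  in
  let grouped =
    List.fold_left
      (fun acc (w, n) ->
        let ns = try List.assoc w acc with Not_found -> [] in
        (w, n :: ns) :: List.remove_assoc w acc)
      [] mapped
  in
  List.map (fun (w, ns) -> (w, List.fold_left ( + ) 0 ns)) grouped
```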

### Inverted Index

An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:

Document 1:

`OCaml map reduce`

Document 2:
`fold filter ocaml`
The inverted index would look like this:

| word | documents |
|------|-----------|
| ocaml | 1, 2 |
| map | 1 |
| reduce | 1 |
| fold | 2 |
| filter | 2 |

#### Implementation

Your task is to implement the function `inverted_index` in master/controller.ml. `inverted_index` should take a dataset as input and use MapReduce to print an inverted index. As in the word count application, you must print your map results using `Controller.print_map_results` and your reduce results using `Controller.print_reduce_results`.

Be sure to describe your map and reduce steps in your overview document. Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml. Use the function `Util.split_words` to tokenize a string into words.
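One natural design (a sketch only; your own map and reduce steps may differ, and splitting on spaces stands in for `Util.split_words`) is for the mapper to emit a (word, document id) pair per word, and for the reducer to deduplicate the id list for each word:

```
(* Sketch of a possible inverted-index mapper: emit (word, doc id) pairs. *)
let map (id : string) (contents : string) : (string * string) list =
  List.map (fun w -> (w, id)) (String.split_on_char ' ' contents)

(* Sketch of the matching reducer: keep each document id once, sorted. *)
let reduce (_word : string) (ids : string list) : string list =
  List.sort_uniq compare ids
```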

### K-means

The k-means algorithm partitions a set of vectors into $k$ clusters. k-means works by finding a set of $k$ centroids (i.e. means), one per cluster. These centroids are usually initialized arbitrarily. For each vector $v$ in the dataset, we find the centroid $j$ that $v$ is closest to; the vector $v$ is then said to belong to cluster $j$. Once we have matched each vector with a cluster, we recompute the centroids: for each centroid, take the set $C$ of vectors that belong to it, and set the centroid's new position to the mean of all of the vectors in $C$. The process of updating the means is called an iteration. We repeat this process until the means converge or until a fixed number of iterations has been performed. Usually k-means converges rapidly to a local optimum. For an illustration of this algorithm, see Wikipedia.

#### Implementation

Your task is to implement the function `kmeans` in master/controller.ml. `kmeans` takes the filename of a dataset, a parameter $k$, and a maximum number of iterations. The vectors you will cluster will be the word count vectors from each document in the dataset. Therefore, at the end of this process you will have clusters of similar documents. So for each document, you will need to compute a vector mapping each word to the number of times the word appears in the document. These vectors will need to be sparse vectors, because it is infeasible to store a count for every possible word. For consistency, initialize the $k$ centroids to be the word vectors from the first $k$ documents. The distance between two vectors is simply the Euclidean distance between them. You should use MapReduce to perform the k-means algorithm for the specified number of iterations. Describe your map and reduce steps in your overview document.
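A sparse word vector can be represented, for example, as an association list from word to count, with missing words treated as zero. The sketch below shows that representation and the Euclidean distance over it; the names (`value`, `distance`) are illustrative, and helpers like these could live in a `Word_vector` submodule in shared/util.ml:

```
(* Look up a word's count in a sparse vector, defaulting to 0. *)
let value (vec : (string * float) list) (w : string) : float =
  try List.assoc w vec with Not_found -> 0.

(* Euclidean distance between two sparse vectors: sum squared differences
   over the union of their words, then take the square root. *)
let distance (v1 : (string * float) list) (v2 : (string * float) list) : float =
  let words = List.sort_uniq compare (List.map fst v1 @ List.map fst v2) in
  sqrt
    (List.fold_left
       (fun acc w ->
         let d = value v1 w -. value v2 w in
         acc +. (d *. d))
       0. words)
```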

After performing k-means for the specified number of iterations, you should print the contents of each cluster. For each cluster, print the ID and title of each document that belongs to it. Your output should be in the following format:

```
Cluster 1
id1 doc1
id2 doc2 ...
Cluster 2
id1 doc1
id2 doc2 ...
```

You are not required to use MapReduce to compute the word count vectors. Nor are you required to use MapReduce to find the documents that compose each cluster at the end of the algorithm. You also do not need to check the clusters for convergence. The reduce phase of your algorithm should support running at least $k$ workers at once. Particularly efficient implementations that can support running more than $k$ workers at once in the reduce phase will receive karma.

It may be useful to define a `Word_vector` submodule in shared/util.ml, so you can reuse this code in your mapper, reducer, and controller. It may be helpful to implement the algorithm without MapReduce first. Then rewrite it using MapReduce.

Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that the simplified MapReduce framework is not very well optimized.

### To submit

Simply zip up and submit your entire ps5 folder.

## Part 3: Mergesort Induction (10 points)

Consider the following three functions, a simple implementation of merge sort. The specification for `mergesort` itself is provided; specifications for the helper functions `merge` and `split` are not.

```
let rec merge l1 l2 =
  match (l1, l2) with
    [], _ -> l2
  | _, [] -> l1
  | x :: xs, y :: ys ->
      if x > y then x :: (merge xs l2)
      else y :: (merge l1 ys)

let rec split l =
  match l with
    [] -> ([], [])
  | [x] -> ([x], [])
  | x1 :: x2 :: xs ->
      let (l1, l2) = split xs in
      (x1 :: l1, x2 :: l2)

(* Requires: true
 * Returns: l', such that elements(l') = elements(l),
 *   and sorted(l') *)
let rec mergesort l =
  match l with
    [] -> []
  | [x] -> [x]
  | _ ->
      let (l1, l2) = split l in
      let s1 = mergesort l1 in
      let s2 = mergesort l2 in
      merge s1 s2
```

1. Write specifications for `merge` and `split`. You should strive to write specifications that are not just correct, but also reasonably complete - meaning that they do not exclude legitimate inputs.
2. Using the specification of `mergesort` and your specifications for `merge` and `split` from part (1), prove the correctness of all three functions by induction.

## Part 4: Source control (5 points)

You are required to use a source control system like CVS or SVN. Submit the log file that describes your activity. If you are using CVS, this can be obtained with the command cvs log. CVS is supported in the CSUG Lab. For information on how to get started with CVS there, read the CVS/CSUG handout.

## Part 5: Design Review Meeting (8 points)

We will be holding 20-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A signup schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 10 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.

Your design review should focus on your approach to Part 1; however, time permitting, you may spend a few minutes at the end on Part 2. Be prepared to discuss:

• Specifications for any modules you define.
• What representations (and invariants over them) you will use.
• Any other interesting issues you foresee.

## Karma: Recurrences and Complexity

1. Denote by $T_h$ the AVL tree of height $h$ that has the minimum number of nodes (among those of height $h$). The height of a tree with 1 node is 1 by definition. Derive a recurrence relation for $|T_h|$, the number of nodes in $T_h$.
2. Let us define $C_h = |T_h| + 1$. Find two distinct numbers $c_1$ and $c_2$ such that $(c_i)^h$ satisfies the recurrence relation of $C_h$ (for $i = 1, 2$).
3. Show that $|T_h| = \frac{(c_1)^{h+2} - (c_2)^{h+2}}{\sqrt{5}} - 1$.
4. For what value of $c$ is $|T_h| = \Theta(c^h)$?
5. Part 4 implies that AVL trees are balanced. Explain.

To do: turn in the solution in the file complexity.txt.