Problem Set 5: Simplified MapReduce

Due April 14, 2009 - 11:59PM


Note: Don't change the mli files!

Updates/Corrections

March 30: We have changed the build system for workers. First, Program.run no longer runs asynchronously; it now blocks until the program terminates. Mappers and reducers no longer read their input from stdin; they use Program.get_input() instead. Similarly, when a mapper or reducer has finished constructing its list of outputs, it now "returns" them using Program.set_output results. Lastly, shared data is now accessed using Program.get_shared_data(). We have updated the mapper and reducer for word count.

Windows limits the number of connections a socket can accept to a value that is too low for this assignment. Additionally, sockets are kept alive too long after being closed. We have therefore included two registry fixes, which you can apply by running socket_fixes.reg. You should back up your registry before doing this.

Obtaining a channel from a socket using Unix.in_channel_of_descr functions differently in Windows when compared to Linux/OS X. With Linux/OS X, when you close the channel, the socket is closed as well. However, when you close the channel in Windows, you must still close the socket. Therefore, whenever you close a channel, but still want to use the socket (for example, in Worker.handle_request) you should add the following code:

let inp = Unix.in_channel_of_descr socket in
...
  if Sys.os_type = "Win32" then close_in_noerr inp else ()

A new release has been posted to CMS. If you have already started work, then please just copy the files over your current working directory, with the exception of the files that you have modified (master/controller.ml, worker/worker.ml, master/master.ml, shared/hashtable.ml). Then, for worker/worker.ml, copy over the new default_includes variable. Lastly, if worker builds fail and you are running Linux/OS X, you may need to update the variable ocaml_lib_dir in worker/program.ml to point to your OCaml lib directory.

Part 1: Simplified MapReduce (45 points)

Overview

In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short whitepaper to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1 - 3.1 of the whitepaper.

Map and Reduce Functions

The map and reduce functions have the following OCaml types:

val map : 'a -> 'b -> ('c * 'd) list  
val reduce : 'c -> 'd list -> 'e list

However, note that the messaging protocol defined in shared/protocol.ml only allows for the transmission of strings. Therefore, you must use OCaml's built-in marshalling and unmarshalling to transmit values of other types. (See below for a more thorough explanation.)
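As a concrete instance of these signatures, the following minimal sketch instantiates every type variable to string, in the spirit of the word count example. It is not the provided mapper or reducer: it runs in-process, does not use the Program module, and split_words here is a hypothetical stand-in written for this illustration. It assumes a reasonably recent OCaml (String.split_on_char and String.lowercase_ascii).

```ocaml
(* Hypothetical helper: split on single spaces and drop empty tokens. *)
let split_words (s : string) : string list =
  String.split_on_char ' ' s |> List.filter (fun w -> w <> "")

(* map : 'a -> 'b -> ('c * 'd) list, with all four types set to string.
   Each word of the document body is emitted with the count "1". *)
let map (_key : string) (contents : string) : (string * string) list =
  List.map (fun w -> (String.lowercase_ascii w, "1")) (split_words contents)

(* reduce : 'c -> 'd list -> 'e list, again over strings.
   The counts for one word are summed and returned as a one-element list. *)
let reduce (_key : string) (counts : string list) : string list =
  [ string_of_int (List.fold_left (fun acc c -> acc + int_of_string c) 0 counts) ]
```

Keeping every value a string mirrors the restriction of the messaging protocol; richer types would have to be marshalled before transmission.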

For a given MapReduce application foo, your mapper code will be stored in apps/foo/mapper.ml, while your reducer code will be stored in apps/foo/reducer.ml. Both the mapper and reducer will receive their input from Program.get_input(), and then output their list of results using Program.set_output results. The specific mechanism for this operation will be explained below.

Basic Execution

Execution is divided into five distinct phases:

  1. Pre-Map

    The master application is invoked with a MapReduce application name and the name of a file containing the data to provide as input. The data in this file is parsed into (key, value) pairs to provide as input. Mappers are initialized by the master on the available worker machines.

  2. Map

    The master application obtains a list of (key, value) pairs. Each pair is sent to an available mapper. Mappers calculate the result of mapping their (key, value) pair, and return the resulting (key, value) list, which is sent back to the master. As long as the master hasn't received the resulting mapping for a given (key, value) pair, it will send that input to the next available mapper until all pairs have been mapped.

  3. Combiner

    For each key in the result of the Map, all of its values are collected into a list, thus generating a list of (key, value list) pairs. Reducers are initialized by the master on the available worker machines.

  4. Reduce

    The master sends unreduced (key, value list) pairs to available reducers. Reducers calculate the result of the reduction and send it back to the master. This continues until the master has received results for all pairs.

  5. Post-Reduce

    The result of the Reduce phase is collected and output in some manner.
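The data flow through the Map, Combiner, and Reduce phases can be sketched as a single-process simulation. This is only an illustration under simplifying assumptions: there are no workers, no network, and no marshalling, and the names combine and map_reduce are hypothetical, not part of the provided framework.

```ocaml
(* Combiner phase: collect all values that share a key into one list. *)
let combine (pairs : (string * string) list) : (string * string list) list =
  let tbl = Hashtbl.create 16 in
  List.iter
    (fun (k, v) ->
      let old = (try Hashtbl.find tbl k with Not_found -> []) in
      Hashtbl.replace tbl k (v :: old))
    pairs;
  (* Values were accumulated in reverse, so restore their original order. *)
  Hashtbl.fold (fun k vs acc -> (k, List.rev vs) :: acc) tbl []

(* Map, Combiner, and Reduce run one after another in the same process. *)
let map_reduce ~map ~reduce (input : (string * string) list) =
  let mapped = List.concat (List.map (fun (k, v) -> map k v) input) in
  let grouped = combine mapped in
  List.map (fun (k, vs) -> (k, reduce k vs)) grouped

(* Example run: count words across two tiny "documents". *)
let counts =
  map_reduce
    ~map:(fun _file text ->
      List.map (fun w -> (w, "1")) (String.split_on_char ' ' text))
    ~reduce:(fun _word ones -> [ string_of_int (List.length ones) ])
    [ ("d1", "a b a"); ("d2", "b") ]
```

The order of keys in counts depends on the hash table traversal, so callers that need a stable order should sort the result.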

Client Execution Example

Consider the canonical MapReduce example of counting the number of occurrences of each word in a single document. In the Map phase, each (filename, word) pair is mapped to (word, "1"), indicating that the word has been seen once. Then, in the Reduce phase, a reducer receives (word, ["1", "1", ..., "1"]) as input and sums the ones to determine the number of times that the word occurred. Note that our framework accepts only a single data file as input; however, we approximate multiple files by storing (filename, file content) pairs in the single file. See apps/kmeans/reuters/reuters000.txt for an example of the formatting. Files that match this format can be parsed by Controller.load_documents. This dataset is a collection of Reuters articles from the 1980s.

Note: We have included the mapper and reducer code for this example in apps/wordcount.

  1. The master application is started using the following command: master.exe word_count filename
  2. Controller.word_count filename reads in the contents of filename, splitting it into (filename, word) pairs using load_documents. Master.map kv_pairs "" "apps/wordcount/mapper.ml" is called. The empty string contains shared data that is accessible to all mappers in addition to their input (key, value) pair. This data is accessed using Program.get_shared_data().
  3. Master.map initializes the mappers using Worker_manager.initialize_mappers. Worker_manager loads the list of available workers from the file named addresses. Each line of this file contains a worker address of the form "ip_address:port_number". Each of these workers is sent the mapper code, and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify each mapper. If certain workers are unavailable, this function will report this fact, but continue to run successfully.
  4. Master.map then sends individual unmapped (filename, word) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_mapper() and should be returned to the manager once a result has been received using Worker_manager.push_mapper. Read master/worker_manager.mli for more complete documentation.
    Once all (filename, word) pairs have been mapped, the new list of (word, "1") pairs is returned.
  5. Controller.word_count receives the results of the mapping, and combines the list of (word, "1") pairs into (word, ["1", ..., "1"]) pairs for each word. Master.reduce is then called with this input, along with the name of the reducer file.
  6. Master.reduce initializes the reducers by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers.
  7. Master.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. Note that because the key value doesn't change, Worker_manager.reduce, and the reducer workers, only calculate and return the new value (in this case count), but don't return the key (in this case word).
  8. Controller.word_count outputs the results.
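The "send until every pair has a result" behaviour of steps 4 and 7 can be illustrated with a sequential sketch. It assumes a hypothetical send function that returns None when a worker fails to answer; the real master instead obtains mappers with Worker_manager.pop_mapper and returns them with Worker_manager.push_mapper, which this sketch does not model.

```ocaml
(* Keep sending pending inputs until each one has produced a result.
   [send] is a hypothetical stand-in for one request to a worker. *)
let map_until_done (send : 'a -> 'b list option) (inputs : 'a list) : 'b list =
  let rec loop pending results =
    match pending with
    | [] -> List.rev results
    | x :: rest -> (
        match send x with
        | Some out -> loop rest (List.rev_append out results)
        (* No result yet: put the input back at the end of the queue. *)
        | None -> loop (rest @ [ x ]) results)
  in
  loop inputs []

(* A simulated worker that fails the first time it sees each file. *)
let flaky_send : string * string -> (string * string) list option =
  let seen = Hashtbl.create 8 in
  fun (file, word) ->
    if Hashtbl.mem seen file then Some [ (word, "1") ]
    else (
      Hashtbl.add seen file ();
      None)

let mapped = map_until_done flaky_send [ ("d1", "a"); ("d2", "b") ]
```

This sketch loops forever if a worker never succeeds, so a real implementation would need to handle permanently unavailable workers.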

Worker Execution Example

  1. Multiple workers can be run from the same directory, as long as they listen on different ports. A worker is started using worker.exe port_number where port_number is the port the worker will listen on.
  2. Worker_server receives a connection, and spawns a thread which calls Worker.handle_request to handle it.
  3. Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using Program.build, which returns either the new worker id or the compilation error. See program.ml for more complete documentation. This new id is returned. If it is a map or reduce request, the id is verified as referencing a valid mapper (if the request is for a map) or a valid reducer (if the request is for a reduce). Program.run is then called; it loads the worker, which obtains its input using Program.get_input (). Program.run then waits until the worker terminates, having set its output using Program.set_output results. These results are then sent back to the client using send_response. This function must be passed Worker.send_response so that these results can be sent.
  4. Once the results of either the build or map/reduce are completed, then Worker.send_response is called, which is responsible for sending the result back to the client.

Code Structure

master/
worker/

Communication Protocol

The protocol that the master application and the workers use to communicate is stored in shared/protocol.ml.

Requests
Responses

Marshalling

Marshalling and unmarshalling of most values (with the exception of functions) is built into OCaml. Values can be converted to and from strings explicitly using Marshal.to_string and Marshal.from_string. Alternatively, if you are retrieving a marshalled value from a channel, you can use input_value, and if you are sending a value into a channel, you can use output_value. These functions aren't typesafe, but you can still retrieve the type information by matching on the retrieved value.
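A minimal round trip through Marshal.to_string and Marshal.from_string might look like the sketch below. The helper names encode and decode are hypothetical. Note that the type annotation on the decoded value is what tells OCaml how to interpret the bytes; nothing checks it at run time, so a wrong annotation can crash the program.

```ocaml
(* Hypothetical helpers wrapping the standard Marshal functions. *)
let encode (v : 'a) : string = Marshal.to_string v []

(* The caller chooses the result type; it is not verified at run time. *)
let decode (s : string) : 'a = Marshal.from_string s 0

let () =
  let pairs = [ ("ocaml", 1); ("fun", 2) ] in
  (* [wire] is a plain string, so it fits a string-only protocol. *)
  let wire = encode pairs in
  let back : (string * int) list = decode wire in
  assert (back = pairs)
```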

Your Tasks:

All code you submit must adhere to the specifications defined in the respective mli files.

As in the previous problem set, you must submit a design document for parts 1 and 2. Additionally, you must implement functions in the following files:

Building and running your code

Note: Linux/Mac users should add ./ before all commands given.

Modules

If you would like to define additional modules, please add them to the appropriate build_*.bat file.

Part 2: Using Simplified MapReduce (32 points)

You will use the simplified MapReduce that you created in part 1 to implement three algorithms: word count, inverted index, and k-means. Each application will use a mapper and reducer located in apps/name/mapper.ml and apps/name/reducer.ml (where name is the name of the algorithm). We have implemented the mapper and reducer for one of the applications, but you will have to implement the rest yourself. For each application you must also implement the code that invokes map and reduce and prints the results as specified. To do this, you will implement three functions: word_count, inverted_index, and kmeans, all located in master/controller.ml.

Word Count

Given a set of documents as input, count the total number of times that each word occurs. For example, if your documents were:

These would be the word counts:

word      count
ocaml     1
is        2
fun       2
because   1
a         1
keyword   1

Implementation

We have provided you with the mapper and reducer for word count. Your task is to implement the function word_count in master/controller.ml. This function takes a filename of a document dataset (described below) as input and prints out the number of occurrences of each word. On separate lines, print each distinct word (in arbitrary order) followed by the number of times it occurred.

Here is a basic outline of the code you must implement for word_count: First, load the provided dataset using the function load_documents. Convert each document into a key-value pair (id, contents), where id is the document ID and contents is the text of the document. These key-value pairs should then be passed as input to the map function. The map function (when invoked with the mapper we have provided) will return a list of key-value pairs of the form (word, "1"), where the keys are not necessarily distinct. Print this list of pairs using Controller.print_map_results. From these key-value pairs, derive the key-value pairs that you will pass to reduce. Finally, reduce will return a list of key-value pairs such that the distinct words are the keys and the values are a string representation of the number of occurrences. Print this list of (key, value list) pairs using Controller.print_reduce_results.

Inverted Index

An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:

Document 1:
OCaml map reduce

Document 2:
fold filter ocaml

The inverted index would look like this:

word      document
ocaml     1 2
map       1
reduce    1
fold      2
filter    2

Implementation

Your task is to implement the function inverted_index in master/controller.ml. inverted_index should take a dataset as input and use MapReduce to print an inverted index. As in the word count application, you must print your map results using Controller.print_map_results and your reduce results using Controller.print_reduce_results.

Be sure to describe your map and reduce steps in your overview document. Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml. Use the function Util.split_words to tokenize a string into words.
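One possible decomposition, shown only as an in-process sketch, is to have map emit (word, document id) pairs and have reduce remove duplicate ids. This is an assumption about how the steps could be split, not the required design; split_words below is a hypothetical stand-in for Util.split_words, and the Program module is not used.

```ocaml
(* Hypothetical stand-in for Util.split_words: split on single spaces. *)
let split_words (s : string) : string list =
  String.split_on_char ' ' s |> List.filter (fun w -> w <> "")

(* Map: every word of a document is paired with that document's id. *)
let map (doc_id : string) (contents : string) : (string * string) list =
  List.map (fun w -> (String.lowercase_ascii w, doc_id)) (split_words contents)

(* Reduce: the ids collected for one word are sorted and de-duplicated. *)
let reduce (_word : string) (doc_ids : string list) : string list =
  List.sort_uniq compare doc_ids
```

With the two example documents above, the word ocaml would be mapped once from each document and reduced to the ids 1 and 2.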

K-means

The k-means algorithm partitions a set of vectors into k clusters. It works by finding a set of k centroids (i.e. means), one per cluster. These centroids are usually initialized arbitrarily. For each vector v in the dataset, we find the cluster j whose centroid it is closest to. The vector v is said to belong to cluster j. Once we have matched each vector with a cluster, we recompute the centroids. For each centroid, take the set C of vectors that belong to it. The new position of this centroid is set to the mean of all of the vectors in C. The process of updating the means is called an iteration. We repeat this process until the means converge or a fixed number of iterations has been reached. Usually k-means converges rapidly to a local optimum. For an illustration of this algorithm, see Wikipedia.
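A single iteration as described above can be sketched without MapReduce, using small dense float arrays instead of sparse word vectors. The function names are hypothetical, and keeping the old centroid when a cluster is empty is an assumption of this sketch rather than something the description specifies.

```ocaml
(* Squared Euclidean distance; enough for comparing which centroid is nearer. *)
let dist2 (a : float array) (b : float array) : float =
  let s = ref 0.0 in
  Array.iteri (fun i x -> let d = x -. b.(i) in s := !s +. d *. d) a;
  !s

(* Index of the centroid closest to v (first one wins on ties). *)
let nearest (centroids : float array array) (v : float array) : int =
  let best = ref 0 in
  Array.iteri
    (fun j c -> if dist2 v c < dist2 v centroids.(!best) then best := j)
    centroids;
  !best

(* Component-wise mean of a non-empty list of vectors. *)
let mean (vs : float array list) : float array =
  let n = float_of_int (List.length vs) in
  let dim = Array.length (List.hd vs) in
  Array.init dim (fun i -> List.fold_left (fun acc v -> acc +. v.(i)) 0.0 vs /. n)

(* One iteration: assign vectors to clusters, then recompute each centroid. *)
let iterate (centroids : float array array) (vectors : float array list) =
  Array.mapi
    (fun j c ->
      match List.filter (fun v -> nearest centroids v = j) vectors with
      | [] -> c (* assumption: an empty cluster keeps its old centroid *)
      | members -> mean members)
    centroids
```

Calling iterate repeatedly, feeding each result back in as the new centroids, gives the fixed-iteration variant of the algorithm.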

Implementation

Your task is to implement the function kmeans in master/controller.ml. kmeans takes the filename of a dataset, a parameter k, and a maximum number of iterations. The vectors you will cluster will be the word count vectors from each document in the dataset. Therefore, at the end of this process you will have clusters of similar documents. So for each document, you will need to compute a vector mapping a word to the number of times the word appears in the document. These vectors will need to be sparse vectors, because it is infeasible to store a count for every possible word. For consistency, initialize the k centroids to be the word vectors from the first k documents. The distance between two vectors is simply the Euclidean distance between them. You should use MapReduce to perform the k-means algorithm for the specified number of iterations. Describe your map and reduce steps in your overview document.
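One way to represent a sparse word count vector is a map from words to counts, where an absent word is treated as zero. The sketch below is one such representation under that assumption; the names SMap, of_words, and distance are hypothetical and not part of the provided code.

```ocaml
module SMap = Map.Make (String)

(* Build a sparse count vector from a list of words. *)
let of_words (words : string list) : float SMap.t =
  List.fold_left
    (fun m w ->
      let c = (try SMap.find w m with Not_found -> 0.0) in
      SMap.add w (c +. 1.0) m)
    SMap.empty words

(* Euclidean distance; a word missing from one vector counts as 0. *)
let distance (a : float SMap.t) (b : float SMap.t) : float =
  let diff =
    SMap.merge
      (fun _word x y ->
        let x = match x with Some v -> v | None -> 0.0 in
        let y = match y with Some v -> v | None -> 0.0 in
        Some (x -. y))
      a b
  in
  sqrt (SMap.fold (fun _ d acc -> acc +. (d *. d)) diff 0.0)
```

Only words that occur in at least one of the two vectors are visited, which is what makes the sparse representation practical for large vocabularies.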

After performing k-means for the specified number of iterations, you should print the contents of each cluster. For each cluster, print the ID and title of each document that belongs to it. Your output should be in the following format:

Cluster 1
id1 doc1
id2 doc2 ...
Cluster 2
id1 doc1
id2 doc2 ...  

You are not required to use MapReduce to compute the word count vectors. Nor are you required to use MapReduce to find the documents that compose each cluster at the end of the algorithm. You also do not need to check the clusters for convergence. The reduce phase of your algorithm should support running at least k workers at once. Particularly efficient implementations that can support running more than k workers at once in the reduce phase will receive karma.

It may be useful to define a Word_vector submodule in shared/util.ml, so you can reuse this code in your mapper, reducer, and controller. It may be helpful to implement the algorithm without MapReduce first. Then rewrite it using MapReduce.

Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that the simplified MapReduce framework is not very well optimized.

To submit

Simply zip up and submit your entire ps5 folder.

Part 3: Mergesort Induction (10 points)

Consider the following three functions, a simple implementation of merge sort. The specification for the function mergesort itself is provided; specifications for the helper functions merge and split are not.

let rec merge l1 l2 =
  match (l1,l2) with
    [],_ -> l2
  | _,[] -> l1
  | x::xs,y::ys ->
      if x > y then x::(merge xs l2)
      else y::(merge l1 ys)

let rec split l =
  match l with
    [] -> ([],[])
  | [x] -> ([x],[])
  | x1::x2::xs ->
      let (l1,l2) = split xs in
        (x1::l1,x2::l2)

(* Requires: true
 * Returns: l', such that elements(l') = elements(l),
 *   and sorted(l') *)
let rec mergesort l =
  match l with
    [] -> []
  | [x] -> [x]
  | _ ->
     let (l1,l2) = split l in
     let s1 = mergesort l1 in
     let s2 = mergesort l2 in
       merge s1 s2

Your tasks

  1. Write specifications for merge and split. You should strive to write specifications that are not just correct, but also reasonably complete - meaning that they do not exclude legitimate inputs.
  2. Using the specification of mergesort and your specifications for merge and split from task 1, prove the correctness of all three functions by induction.
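Before attempting the proof, it can help to run the code against an executable reading of the mergesort specification. The sketch below copies the three functions from above and adds two hypothetical predicates. It assumes that sorted(l') means non-increasing order, since merge as written places the larger element first, and that elements(l') = elements(l) means equality as multisets; both readings are assumptions of this sketch, not statements from the specification.

```ocaml
(* merge, split, and mergesort are copied unchanged from the problem. *)
let rec merge l1 l2 =
  match (l1, l2) with
  | [], _ -> l2
  | _, [] -> l1
  | x :: xs, y :: ys ->
      if x > y then x :: merge xs l2 else y :: merge l1 ys

let rec split l =
  match l with
  | [] -> ([], [])
  | [ x ] -> ([ x ], [])
  | x1 :: x2 :: xs ->
      let l1, l2 = split xs in
      (x1 :: l1, x2 :: l2)

let rec mergesort l =
  match l with
  | [] -> []
  | [ x ] -> [ x ]
  | _ ->
      let l1, l2 = split l in
      merge (mergesort l1) (mergesort l2)

(* Assumed meaning of sorted: each element is >= the one after it. *)
let rec non_increasing = function
  | x :: (y :: _ as rest) -> x >= y && non_increasing rest
  | _ -> true

(* Assumed meaning of elements: the two lists agree as multisets. *)
let same_elements l1 l2 = List.sort compare l1 = List.sort compare l2

(* True when mergesort's result satisfies both assumed properties for l. *)
let check l =
  let l' = mergesort l in
  non_increasing l' && same_elements l l'
```

Passing checks on a few inputs is only a sanity test and does not replace the inductive proof.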

Part 4: Source control (5 points)

You are required to use a source control system like CVS or SVN. Submit the log file that describes your activity. If you are using CVS, this can be obtained with the command cvs log. CVS is supported in the CSUG Lab. For information on how to get started with CVS there, read the CVS/CSUG handout.

Part 5: Design Review Meeting (8 points)

We will be holding 20-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A signup schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 10 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.

Your design review should focus on your approach to Part 1; however, time permitting, you may spend a few minutes at the end on Part 2.

In your design review, plan to talk about at least:

Karma: Recurrences and Complexity

  1. Denote by T_h the AVL tree of height h that has the minimum number of nodes (among those of height h). The height of a tree with 1 node is 1 by definition. Derive a recurrence relation for |T_h|, the number of nodes in T_h.
  2. Let us define C_h = |T_h| + 1. Find two distinct numbers c_1 and c_2 such that (c_i)^h satisfies the recurrence relation of C_h (for i = 1, 2).
  3. Show that |T_h| = ((c_1)^(h+2) - (c_2)^(h+2)) / √5 - 1.
  4. For what value of c is |T_h| = Θ(c^h)?
  5. Part 4 implies that AVL trees are balanced. Explain.

To do: turn in the solution in the file complexity.txt.