Problem Set 5: Simplified MapReduce

Due November 12, 2009 - 11:59PM


Note: Don't change the mli files!

Updates/Corrections

November 2: Clarified word count.


Part 1: Simplified MapReduce (45 points)

Overview

In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short paper on MapReduce to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1–3.1 of that paper.

Map and Reduce Functions

The map and reduce functions have the following OCaml types:

val map : 'a -> 'b -> ('c * 'd) list  
val reduce : 'c -> 'd list -> 'e list

However, in the messaging protocol defined in shared/protocol.ml we only allow for the transmission of strings. Therefore, you must utilize OCaml's built-in marshalling and unmarshalling to transmit values of different types. We will explain this more thoroughly below.

For a given MapReduce application foo, your mapper code will be stored in apps/foo/mapper.ml, while your reducer code will be stored in apps/foo/reducer.ml. Both the mapper and reducer will receive their input from Program.get_input() and output their results using Program.set_output results. The specific mechanisms for these operations are described below.

Basic Execution

Execution is divided into five phases:

  1. Pre-Map

    The controller is invoked with the name of a MapReduce application and the name of a file containing the input data. It reads the data from this file and parses it into a list of (key, value) pairs to provide as input to the mappers. It then connects to worker servers and initializes mappers on these servers.

  2. Map

    The controller sends each (key, value) pair to an available mapper. The mapper calculates the mapping function on this (key, value) pair, resulting in a list of (key, value) pairs (the types of the input and output do not have to be the same). The resulting list of (key, value) pairs is sent back to the controller application. The controller continues to send inputs to the next available mapper until all pairs have been mapped.

  3. Combine

    For each key produced by the mappers, all of its corresponding values are collected into a single list, resulting in a (key, value list) pair for that key.

  4. Reduce

    The controller connects to worker servers and initializes reducers on these servers. The (key, value list) pairs produced in the Combine phase are then sent to available reducers. The reducers perform the reduce operation and send the results back to the controller. This continues until all input has been reduced.

  5. Post-Reduce

    The results of the Reduce phase are collected and output in some manner.
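The five phases can be sketched as a purely sequential, in-memory pipeline. This is only an illustration under simplifying assumptions: there is no controller, no worker servers, and no string-only protocol, and the names combine and run_sequential are hypothetical helpers rather than the functions specified in the mli files.

```ocaml
(* Sequential, in-memory sketch of the phases. No networking, no workers:
   every name here is illustrative and not part of the problem set's code. *)

(* Combine: group values by key, preserving first-seen key order. *)
let combine (pairs : ('k * 'v) list) : ('k * 'v list) list =
  let tbl = Hashtbl.create 16 in
  let keys = ref [] in
  List.iter
    (fun (k, v) ->
      match Hashtbl.find_opt tbl k with
      | Some vs -> Hashtbl.replace tbl k (v :: vs)
      | None ->
        keys := k :: !keys;
        Hashtbl.replace tbl k [ v ])
    pairs;
  (* Keys and values were accumulated in reverse, so restore input order. *)
  List.rev_map (fun k -> (k, List.rev (Hashtbl.find tbl k))) !keys

(* Run map, combine, and reduce one after another on a single machine. *)
let run_sequential
    (map : 'a -> 'b -> ('c * 'd) list)
    (reduce : 'c -> 'd list -> 'e list)
    (input : ('a * 'b) list) : ('c * 'e list) list =
  let mapped = List.concat (List.map (fun (k, v) -> map k v) input) in
  let combined = combine mapped in
  List.map (fun (k, vs) -> (k, reduce k vs)) combined
```

In the real framework, the two List.map calls are the places where work is handed out to remote mappers and reducers instead of being run locally.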

Client Execution Example

Consider the canonical MapReduce example of counting the number of occurrences of each word in a set of documents. The input is a set of (document id, contents) pairs. In the Map phase, each word that occurs in the contents of a file is mapped to the pair (word, "1"), indicating that the word has been seen once. In the Combine phase, all pairs with the same first component are collected to form the pair (word, ["1"; "1"; ...; "1"]), one such pair for each word. Then in the Reduce phase, for each word, the list of ones is summed to determine the number of occurrences of that word.
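As a rough sketch of the Map and Reduce steps just described, the two functions below work on plain strings. They are not the provided code in apps/word_count: the names wc_map and wc_reduce are made up for illustration, and split_words is a simplified stand-in tokenizer that lowercases and splits on spaces only.

```ocaml
(* Stand-in tokenizer (assumption): lowercase, then split on spaces. *)
let split_words (s : string) : string list =
  String.split_on_char ' ' (String.lowercase_ascii s)
  |> List.filter (fun w -> w <> "")

(* Map: emit (word, "1") for every word occurrence in one document. *)
let wc_map (_doc_id : string) (contents : string) : (string * string) list =
  List.map (fun w -> (w, "1")) (split_words contents)

(* Reduce: sum the list of "1" strings collected for one word. *)
let wc_reduce (_word : string) (ones : string list) : string list =
  [ string_of_int (List.fold_left (fun acc s -> acc + int_of_string s) 0 ones) ]
```

The values stay strings throughout, mirroring the (word, "1") pairs in the description; the Combine step that sits between these two functions is omitted here.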

For simplicity, our framework accepts only a single data file containing all the input data. However, we simulate multiple files by storing several (id, document name, contents) triples in the input file. See data/reuters/reuters000.txt for an example of the formatting. Files that match this format can be parsed by Controller.load_documents. This dataset is a collection of Reuters articles from the 1980s.

Note: We have included the mapper and reducer code for this example in apps/word_count.

  1. The controller application is started using the command controller.exe word_count filename.
  2. Controller.word_count filename reads in the contents of filename using Controller.load_documents, and splits the resulting collection of documents into (document id, contents) pairs. Then, the controller calls Map_reduce.map kv_pairs "" "apps/word_count/mapper.ml". The second argument represents shared data that is accessible to all mappers in addition to their input (key, value) pair. For this application, there is no shared data, so this argument is the empty string. In general, the shared data can be accessed by a mapper using Program.get_shared_data().
  3. Map_reduce.map initializes a mapper worker manager using Worker_manager.initialize_mappers "apps/word_count/mapper.ml" "". The Worker_manager loads in the list of available workers from the file named addresses. Each line of this file contains a worker address of the form ip_address:port_number. Each of these workers is sent the mapper code, and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify each mapper. If certain workers are unavailable, this function will report this fact, but will continue to run successfully.
  4. Map_reduce.map then sends individual unmapped (id, contents) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_worker(). Mappers should be released once their results have been received using Worker_manager.push_worker. Read controller/worker_manager.mli for more complete documentation. Once all (id, contents) pairs have been mapped, the new list of (word, "1") pairs is returned.
  5. Controller.word_count receives the results of the mapping. Controller.print_map_results is called to display the results.
  6. The list of (word, "1") pairs is then combined into (word, ["1"; ...; "1"]) pairs for each word by calling Map_reduce.combine. The results are then displayed using Controller.print_combine_results.
  7. Map_reduce.reduce is then called with the results of Map_reduce.combine, along with the name of the reducer file.
  8. Map_reduce.reduce initializes the reducer worker manager by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers.
  9. Map_reduce.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. In this application, because the key value doesn't change, Worker_manager.reduce and the reduce workers only calculate and return the new value (in this case, count), but don't return the key (in this case, word).
  10. Controller.word_count displays the results using Controller.print_reduce_results.

Worker Execution Example

  1. Multiple workers can be run from the same directory as long as they listen on different ports. A worker server is started using worker_server.exe port_number, where port_number is the port the worker listens on.
  2. Worker_server receives a connection and spawns a thread, which calls Worker.handle_request to handle it.
  3. Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using Program.build, which returns either the new worker id or the compilation error. See worker_server/program.ml for more complete documentation. If it is a map or reduce request, the worker id is verified as referencing a valid mapper or reducer, respectively. If the id is valid, then Program.run is called with that id and the provided input. This runs the relevant worker, which receives its input by calling Program.get_input(). Once the worker terminates, having set its output using Program.set_output, these results are returned by Program.run. If the request is invalid, then the appropriate error message, as defined in shared/protocol.ml, is prepared.
  4. Once the results of either the build or map/reduce are completed, Worker.send_response is called, which is responsible for sending the result back to the client. If the response is sent successfully, Worker.handle_request simply recurses; otherwise, it returns unit.

Code Structure

controller/
worker_server/

Communication Protocol

The protocol that the controller application and the workers use to communicate is stored in shared/protocol.ml.

Requests
Responses

Marshalling

Marshalling and unmarshalling of most values (with the exception of functions) is built into OCaml. Values can be converted to and from strings explicitly using Marshal.to_string and Marshal.from_string. If the values you are sending aren't strings, then you will need to use these functions to convert them to and from strings in order to use the relevant Map_reduce functions. Note that your worker must also convert the input it receives from strings back to the type it can operate on. Once it has finished, it needs to convert its output back into strings.

Make sure that the type of input that your mapper is expecting matches the type of input sent in the Mapper protocol message and that the type of output matches the type sent in the MapResults protocol message. The same is true for reducers using the reducer messages.

These functions are not typesafe, but you can still retrieve the type information by matching on the retrieved value. Note that due to the lack of type safety, if you unmarshal a string and treat it as a value of any type other than the type it was marshalled as, your program will most likely crash. You should therefore take particular care that the types match when marshalling/unmarshalling.
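A minimal round trip through Marshal.to_string and Marshal.from_string might look like the following. The helper names encode and decode are hypothetical wrappers introduced here for illustration; only the two Marshal functions come from the standard library.

```ocaml
(* Convert any non-functional value to a string for transmission. *)
let encode (v : 'a) : string = Marshal.to_string v []

(* The type annotation at the call site decides the result type; the
   compiler cannot check that it matches what was originally marshalled. *)
let decode (s : string) : 'a = Marshal.from_string s 0

let () =
  let pairs : (string * int) list = [ ("ocaml", 1); ("is", 2) ] in
  let wire = encode pairs in
  (* The annotation below must agree with the type used on the sending side. *)
  let back : (string * int) list = decode wire in
  assert (back = pairs)
```

Annotating the result of decode with a different type would still compile, which is exactly the hazard described above.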

Your Tasks:

All code you submit must adhere to the specifications defined in the respective mli files.

You must implement functions in the following files:

Building and running your code

Note: Linux/Mac users should add ./ before all commands given.

Modules

If you would like to define additional modules, please add them to the appropriate build_*.bat file.


Part 2: Using Simplified MapReduce (30 points)

You will use the simplified MapReduce that you created in part 1 to implement three algorithms: word count, inverted index, and k-means.

Each application will use a mapper and reducer located in apps/name/mapper.ml and apps/name/reducer.ml (where name is the name of the application). We have implemented the mapper and reducer for word count, but you will have to implement the others yourself.

For each application, you must also implement the code that invokes map and reduce and prints the results as specified. To do this, you will implement three functions: Controller.word_count, Controller.inverted_index, and Controller.kmeans.

Word Count

Given a set of documents as input, count the total number of times that each word occurs. For example, if your documents were:

then these would be the word counts:
word      count
ocaml     1
is        2
fun       2
because   1
a         1
keyword   1

Implementation

We have provided you with the mapper and reducer for word count. Your task is to implement the function word_count in controller/controller.ml. This function takes a filename of a document dataset (described below) as input and calculates the number of occurrences of each word.

Here is a basic outline of the code you must implement for word_count:

  1. Load the provided dataset using the function Controller.load_documents.
  2. Convert each document into a key-value pair (id, contents), where id is the document id and contents is the text of the document.
  3. Pass these key-value pairs as input to the Map_reduce.map function. When invoked with the mapper we have provided, Map_reduce.map will return a list of key-value pairs of the form (word, "1"), where the keys are not necessarily distinct. Print this list of pairs using Controller.print_map_results.
  4. From these key-value pairs, call Map_reduce.combine to derive the key-value pairs that you will pass to Map_reduce.reduce. These values must be output using Controller.print_combine_results.
  5. Finally, call Map_reduce.reduce with the combine results. It will return a list of key-value pairs such that the distinct words are the keys and the values are a string representation of the number of occurrences. Print this list of (key, value list) pairs using Controller.print_reduce_results.

Inverted Index

An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:

Document 1:

OCaml map reduce

Document 2:

fold filter ocaml

The inverted index would look like this:

word      document
ocaml     1 2
map       1
reduce    1
fold      2
filter    2
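To make the data structure concrete, here is a small single-machine sketch that builds an inverted index for the two example documents. It does not use the MapReduce framework; the function inverted_index below and its split_words tokenizer are illustrative assumptions (the tokenizer is a simplified stand-in, not Util.split_words).

```ocaml
module SMap = Map.Make (String)

(* Stand-in tokenizer (assumption): lowercase, then split on spaces. *)
let split_words (s : string) : string list =
  String.split_on_char ' ' (String.lowercase_ascii s)
  |> List.filter (fun w -> w <> "")

(* Build word -> sorted list of distinct document ids. *)
let inverted_index (docs : (int * string) list) : (string * int list) list =
  let add index (id, contents) =
    List.fold_left
      (fun idx w ->
        let ids = Option.value (SMap.find_opt w idx) ~default:[] in
        (* Record each document at most once per word. *)
        if List.mem id ids then idx else SMap.add w (id :: ids) idx)
      index (split_words contents)
  in
  List.fold_left add SMap.empty docs
  |> SMap.bindings
  |> List.map (fun (w, ids) -> (w, List.sort compare ids))
```

Because the result comes from a map's bindings, the words are listed alphabetically rather than in the order shown in the table.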

Implementation

Your task is to implement the function inverted_index in controller/controller.ml. The function inverted_index should take a dataset as input and use MapReduce to print an inverted index. As in the word count application, you must print your map results using Controller.print_map_results, your combine results using Controller.print_combine_results, and your reduce results using Controller.print_reduce_results.

Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml, respectively. Use the function Util.split_words to tokenize a string into words.

K-means

K-means is an iterative algorithm used in machine learning to partition a set of vectors into k clusters. Each cluster is described by a centroid vector. A vector belongs to a cluster if that cluster's centroid is closer to the vector than any other centroid is (as measured by Euclidean distance). At a high level, the algorithm begins with some initial set of k centroids and updates the centroids at each iteration, eventually producing the clustering given by the final centroids.

To begin running kmeans, we first need to select our initial k centroids. This is usually done heuristically, but in this project, we will have you simply select the first k vectors to be the initial centroids. Once we have the initial centroids, we can begin our iterative algorithm. Each iteration consists of two steps:

  1. For each vector, find the closest centroid by Euclidean distance.
  2. For each cluster, recalculate its centroid. To do this calculation, simply average all of the vectors in the cluster.

Note: The centroids do NOT need to be vectors from the set. We just use vectors in the set as a convenient starting point.

The 2-step iterative nature of this algorithm makes it a perfect fit for MapReduce. We perform these updates either for a fixed number of iterations, or until some convergence criterion is reached (for example, no vectors change clusters). In this project, you will just run a specified number of iterations, so you do not need to worry about convergence. At the end, the clusterings from the final iteration are output.
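One iteration of the two steps can be sketched on small dense vectors as follows. This is a single-machine illustration only: it uses float arrays rather than sparse word vectors, the names dist2, closest, and step are hypothetical, and the handling of empty clusters is a choice made here that the text does not specify.

```ocaml
(* Squared Euclidean distance; sufficient for comparing distances. *)
let dist2 (a : float array) (b : float array) : float =
  let s = ref 0.0 in
  Array.iteri (fun i x -> let d = x -. b.(i) in s := !s +. d *. d) a;
  !s

(* Step 1: index of the closest centroid (ties go to the lowest index). *)
let closest (centroids : float array array) (v : float array) : int =
  let best = ref 0 in
  Array.iteri
    (fun i c -> if dist2 c v < dist2 centroids.(!best) v then best := i)
    centroids;
  !best

(* Step 2: average the vectors assigned to each cluster. An empty cluster
   keeps its old centroid (an assumption, not specified in the text). *)
let step (centroids : float array array) (vectors : float array list)
    : float array array =
  let k = Array.length centroids in
  let dim = Array.length centroids.(0) in
  let sums = Array.init k (fun _ -> Array.make dim 0.0) in
  let counts = Array.make k 0 in
  List.iter
    (fun v ->
      let c = closest centroids v in
      counts.(c) <- counts.(c) + 1;
      Array.iteri (fun i x -> sums.(c).(i) <- sums.(c).(i) +. x) v)
    vectors;
  Array.mapi
    (fun c sum ->
      if counts.(c) = 0 then centroids.(c)
      else Array.map (fun x -> x /. float_of_int counts.(c)) sum)
    sums
```

One natural way to split this across MapReduce is to let the assignment in closest play the role of the map function and the averaging play the role of the reduce function, though other decompositions are possible.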

Implementation

Your task is to implement the function kmeans in controller/controller.ml. The function kmeans takes the filename of a dataset, a parameter k, and a maximum number of iterations. The vectors you will cluster will be the word count vectors from each document in the dataset. Therefore, at the end of this process you will have clusters of similar documents. So for each document, you will need to compute a vector mapping a word to the number of times the word appears in the document. These vectors will need to be sparse vectors, because it is infeasible to store a count for every possible word. As described above, initialize the k centroids to be the word vectors from the first k documents. The distance between two vectors is simply the Euclidean distance between them. You should use MapReduce to perform the k-means algorithm for the specified number of iterations.
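One possible representation of a sparse word-count vector is a map from words to counts, where any word not present is treated as having count 0. The sketch below is an assumption about how this could be done, not a required design; the type word_vector and the functions of_words and distance are hypothetical names.

```ocaml
module SMap = Map.Make (String)

(* A sparse word-count vector: absent words have count 0. *)
type word_vector = float SMap.t

(* Count the occurrences of each word in an already tokenized document. *)
let of_words (words : string list) : word_vector =
  List.fold_left
    (fun m w ->
      let c = Option.value (SMap.find_opt w m) ~default:0.0 in
      SMap.add w (c +. 1.0) m)
    SMap.empty words

(* Euclidean distance computed over the union of the two key sets. *)
let distance (a : word_vector) (b : word_vector) : float =
  let diffs =
    SMap.merge
      (fun _ x y ->
        let d = Option.value x ~default:0.0 -. Option.value y ~default:0.0 in
        Some (d *. d))
      a b
  in
  sqrt (SMap.fold (fun _ d2 acc -> acc +. d2) diffs 0.0)
```

Counts are stored as floats so that averaged centroids, which generally have fractional entries, can use the same type.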

After performing k-means for the specified number of iterations, you should print the contents of the final clusters. You should not use the same print functions that word_count and inverted_index use. For each cluster, print the id and title of each document that belongs to it. Your output should be in the following format:

For example:

Cluster 1
id1 doc1
id2 doc2 ...
Cluster 2
id1 doc1
id2 doc2 ...  

You are not required to use MapReduce to compute the word count vectors, nor are you required to use MapReduce to find the documents that compose each cluster at the end of the algorithm. You also do not need to check the clusters for convergence. The reduce phase of your algorithm should support running at least k workers at once. Particularly efficient implementations that can support running more than k workers at once in the reduce phase will receive karma.

It may be useful to define a Word_vector submodule in shared/util.ml so you can reuse this code in your mapper, reducer, and controller. It may be helpful to implement the algorithm without MapReduce first, then rewrite it using MapReduce.

Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that the simplified MapReduce framework is not very well optimized.

To submit

Simply zip up and submit your entire ps5 folder.


Part 3: Formal Verification (15 points)

In modern cryptography, it is often useful to perform exponentiation efficiently modulo a large integer n. Given integers x, y, n, we would like to compute x^y mod n. In such applications, there is little room for error, as a bug in a program could compromise the privacy of its users. Your job is to prove the correctness of the function below. You may make use of the following results from number theory:

Theorem 1: If a, b, n are integers and 0 ≤ a, b and n ≥ 2, then
(a*b) mod n = ((a mod n) * (b mod n)) mod n.

Theorem 2: If a, b, n are integers such that gcd(a, n) = 1 and gcd(b, n) = 1, then gcd(ab, n) = 1.

Prove using the methods from Lecture 14 that the function below satisfies both partial correctness (i.e., on input (x, y, n) it will return x^y mod n) and total correctness (i.e., it will terminate on any input).

let rec modexp(x, y, n : int * int * int) : int =
  (* Error checking *)
  if x < 1 then raise (Failure "invalid x")
  else if x >= n then raise (Failure "invalid x")
  else if y < 0 then raise (Failure "invalid y")
  else if n < 2 then raise (Failure "invalid n")

  (* Parameters seem valid *)
  else if y = 0 then 1
  else if (y mod 2) = 0 then
    let partial = modexp (x, y / 2, n) in
    (partial * partial) mod n
  else if (y mod 2) = 1 then
    let partial = modexp (x, y - 1, n) in
    (x * partial) mod n
  else raise (Failure "Invalid y")

You may assume the following preconditions: 0 < x < n, 0 ≤ y, 2 ≤ n,
gcd(x, n) = 1, n < sqrt(max_int).

Postconditions: Returns r with 0 ≤ r < n such that r = (x^y) mod n and
gcd(r, n) = 1.
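Before attempting the proof, it can help to test the postconditions on small inputs. The harness below is a sketch for that purpose and is no substitute for the proof itself; naive_modexp, gcd, meets_post, and check_small are hypothetical helpers, and the bounds 30 and 20 are arbitrary.

```ocaml
(* Reference: x^y mod n by y repeated multiplications, reducing at each
   step so intermediate products stay below n * n. *)
let naive_modexp ((x, y, n) : int * int * int) : int =
  let rec go acc i = if i = 0 then acc else go (acc * x mod n) (i - 1) in
  go (1 mod n) y

let rec gcd (a : int) (b : int) : int = if b = 0 then a else gcd b (a mod b)

(* Does [f] meet the stated postconditions on one input? *)
let meets_post (f : int * int * int -> int) ((x, y, n) : int * int * int) : bool =
  let r = f (x, y, n) in
  0 <= r && r < n && r = naive_modexp (x, y, n) && gcd r n = 1

(* Check every input satisfying the preconditions for small n and y. *)
let check_small (f : int * int * int -> int) : bool =
  let ok = ref true in
  for n = 2 to 30 do
    for x = 1 to n - 1 do
      if gcd x n = 1 then
        for y = 0 to 20 do
          if not (meets_post f (x, y, n)) then ok := false
        done
    done
  done;
  !ok
```

Calling check_small modexp with the function from the listing exercises it on every small input that satisfies the preconditions. A passing run only rules out simple mistakes; it says nothing about inputs outside the tested range.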


Part 4: Source control (3 points)

You are required to use a source control system like CVS or SVN. Submit the log file that describes your activity. We have provided you with an SVN repository, hosted by CSUG. For information on how to get started with SVN there, read Using Subversion in the CSUGLab.

Note: Your repository name is cs3110_<netid(s)>. For example, cs3110_njl36, or if you have a partner: cs3110_dck10_njl36. Notice that the netids of groups are in alphabetical order. This repository name is what you put in place of project1 in the directions in the provided link.

If you use Windows, and are unfamiliar with the command line or Cygwin, there is a GUI based SVN client called Tortoise SVN that provides convenient context menu access to your repo.

For Debian/Ubuntu, a GUI-based solution to explore would be Rapid SVN. To install:

apt-get install rapidsvn

Mac users looking into a front-end can try SC Plugin, which claims to provide similar functionality to Tortoise. Rapid SVN might work as well.

There is also a plugin for Eclipse called Subclipse that integrates everything for you nicely.

Note that all of these options are simply graphically based alternatives to the extremely powerful terminal commands, and are by no means necessary to successfully utilize version control.


Part 5: Design Review Meeting (7 points)

We will be holding 15-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A signup schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 5 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.

Your design review should focus on your approach to Part 1; however, time permitting, you may spend a few minutes at the end on Part 2.

In your design review, plan to talk about at least: