Problem Set 5: MapReduce

Due Thursday November 10, 2011, 11:59pm


Note: Don't change the .mli files!

Version: 12
Last Modified: November 9, 22:28

Part 1: MapReduce (60 points)

In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short paper on MapReduce to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1–3.1 of that paper.

Overview

Map and Reduce Functions

The map and reduce functions have the following OCaml types:

val map : 'a -> 'b -> ('c * 'd) list  
val reduce : 'c -> 'd list -> 'e list

These functions will be computed in a distributed fashion by several agents running concurrently. The agents communicate with each other using a messaging protocol defined in shared/protocol.ml. However, that protocol only allows for the transmission of strings, so you must use OCaml's built-in marshalling and unmarshalling facilities to transmit values of different types. We will explain this more thoroughly below.
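
For instance, in the word_count application described below, all keys and values are strings, so the polymorphic types above are (conceptually) instantiated as follows (an illustration only; the actual interfaces are fixed by the .mli files):

(* word_count map: document id -> contents -> (word, "1") pairs *)
val map : string -> string -> (string * string) list
(* word_count reduce: word -> ["1"; ...; "1"] -> [count] *)
val reduce : string -> string list -> string list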

For a given MapReduce application foo, the application-specific code is stored in the directory apps/foo/. The code for the mappers is in apps/foo/mapper.ml and the code for the reducers is in apps/foo/reducer.ml. There is also a controller in the file apps/foo/foo.ml to handle the initialization of and communication with the mappers and reducers, reading inputs, and printing results.

The mappers and reducers receive their input by calling Program.get_input and communicate their results by calling Program.set_output. The specific mechanisms for these operations are described below.

Basic Execution

Execution is divided into five phases:

  1. Pre-Map

    controller.exe is invoked with the name of a MapReduce application on the command line, along with other application-specific information (e.g., the input file name and any other parameters required by the application). This information is passed to the main method of the controller in the application directory. This method reads the data from the input file and parses it into a list of (key, value) pairs to provide as input to the mappers. It then connects to worker servers and initializes mappers on those servers.

  2. Map

    The controller sends each (key, value) pair to an available mapper. The mapper calculates the mapping function on this (key, value) pair, resulting in a list of (key, value) pairs (the types of the input and output do not have to be the same). The resulting list of (key, value) pairs is sent back to the controller. The controller continues to send inputs to the next available mapper until all pairs have been mapped.

  3. Combine

    For each key produced by the mappers, all of its corresponding values are collected into a single list, resulting in a (key, value list) pair for that key.

  4. Reduce

    The controller connects to worker servers and initializes reducers on these servers. The (key, value list) pairs produced in the Combine phase are then sent to available reducers. The reducers perform the reduce operation and send the results back to the controller. This continues until all input has been reduced.

  5. Post-Reduce

    The controller collects the results of the Reduce phase and outputs them in some manner.
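
In terms of the map and reduce types given earlier, the data passed between the phases is roughly:

Pre-Map:  input file                ->  ('a * 'b) list of (key, value) pairs
Map:      each ('a * 'b) pair       ->  a ('c * 'd) list
Combine:  the full ('c * 'd) list   ->  a ('c * 'd list) list
Reduce:   each ('c * 'd list) pair  ->  an 'e list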

Client Execution Example

Consider the canonical MapReduce example of counting the number of occurrences of each word in a set of documents. The input is a set of (document id, contents) pairs. In the Map phase, each word that occurs in the contents of a file is mapped to the pair (word, "1"), indicating that the word has been seen once. In the Combine phase, all pairs with the same first component are collected to form the pair (word, ["1"; "1"; ...; "1"]), one such pair for each word. Then in the Reduce phase, for each word, the list of ones is summed to determine the number of occurrences of that word.
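
For instance, on a single tiny document the data at each phase might look like this (all keys and values are strings):

Map:     ("doc1", "the cat the")  ->  [("the", "1"); ("cat", "1"); ("the", "1")]
Combine: [("cat", ["1"]); ("the", ["1"; "1"])]
Reduce:  ("cat", ["1"]) -> ["1"]    and    ("the", ["1"; "1"]) -> ["2"]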

For simplicity, our framework accepts only a single data file containing all the input documents. Each document is represented in the input file as an (id, document name, contents) triple. See data/reuters.txt, or the shorter files data/test1.txt and data/test2.txt, for an example of the formatting. Document files can be loaded and parsed by calling Util.load_documents. The dataset data/reuters.txt is a collection of Reuters articles from the 1980s.

We have included a full implementation of this application in apps/word_count. Here is a detailed explanation of the sequence of events.

  1. The controller application is started using the command controller.exe word_count filename. It immediately calls the main method of apps/word_count/word_count.ml, passing it the argument list. The main method calls controller/Map_reduce.map_reduce, which is common controller code for performing a simple one-phase MapReduce on documents. Other more involved applications have their own controller code.
  2. The documents in filename are read in and parsed using Util.load_documents, which splits the collection of documents into {id; title; contents} triples.
  3. The controller calls Map_reduce.map kv_pairs "" "apps/word_count/mapper.ml". The second argument represents shared data that is accessible to all mappers in addition to their input (key, value) pair. For this application, there is no shared data, so this argument is the empty string. In general, the shared data can be accessed by a mapper using Program.get_shared_data().
  4. Map_reduce.map initializes a mapper worker manager using Worker_manager.initialize_mappers "apps/word_count/mapper.ml" "". The Worker_manager loads the list of available workers from the file named addresses. Each line of this file contains a worker address of the form ip_address:port_number. Each of these workers is sent the mapper code. The worker creates a mapper with that code and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify that mapper. If certain workers are unavailable, this function will report this fact, but will continue to run successfully.
  5. Map_reduce.map then sends individual unmapped (id, contents) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_worker(). Mappers should be released once their results have been received using Worker_manager.push_worker. Read controller/worker_manager.mli for more complete documentation. Once all (id, contents) pairs have been mapped, the new list of (word, "1") pairs is returned.
  6. Map_reduce.map_reduce receives the results of the mapping. If desired, Util.print_map_results can be called here to display the results of the Map phase for debugging purposes.
  7. The list of (word, "1") pairs is then combined into (word, ["1"; ...; "1"]) pairs for each word by calling Map_reduce.combine. The results can be displayed using Util.print_combine_results.
  8. Map_reduce.reduce is then called with the results of Map_reduce.combine, along with the (empty) shared data string and the name of the reducer file apps/word_count/reducer.ml.
  9. Map_reduce.reduce initializes the reducer worker manager by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers.
  10. Map_reduce.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. In this application, because the key doesn't change, Worker_manager.reduce and the reduce workers only calculate and return the new value (in this case, count), but don't return the key (in this case, word).
  11. The results are returned to the main method of word_count.ml, which displays them using Util.print_reduce_results.
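
Putting steps 3 through 10 together, the overall flow of this controller can be sketched in a few lines of OCaml. This is only a sketch: the exact signatures are fixed by the .mli files, and the argument orders shown here simply follow the descriptions above (the shared data is the empty string).

let run_word_count kv_pairs =
  (* step 5: map every (id, contents) pair to (word, "1") pairs *)
  let mapped = Map_reduce.map kv_pairs "" "apps/word_count/mapper.ml" in
  (* step 7: group the "1"s by word *)
  let combined = Map_reduce.combine mapped in
  (* steps 8-10: sum the "1"s for each word *)
  Map_reduce.reduce combined "" "apps/word_count/reducer.ml"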

Worker Execution Example

  1. Multiple workers can be run from the same directory as long as they listen on different ports. A worker server is started using worker_server.exe port_number, where port_number is the port the worker listens on.
  2. Worker_server receives a connection and spawns a thread, which calls Worker.handle_request to handle it.
  3. Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using Program.build, which returns either the new worker id or the compilation error. See worker_server/program.ml for more complete documentation. If it is a map or reduce request, the worker id is verified as referencing a valid mapper or reducer, respectively. If the id is valid, then Program.run is called with that id and the provided input. This runs the relevant worker, which receives its input by calling Program.get_input(). Once the worker terminates, having set its output using Program.set_output, these results are returned by Program.run. If the request is invalid, then the appropriate error message, as defined in shared/protocol.ml, is prepared.
  4. Once the build or the map/reduce request has completed, Worker.send_response is called, which is responsible for sending the result back to the client. If the response is sent successfully, then Worker.handle_request simply recurses; otherwise it returns unit.

Code Structure

In this section we describe the organization of the MapReduce code.

controller/

worker_server/


Communication Protocol

The protocol that the controller application and the workers use to communicate is stored in shared/protocol.ml.

Requests

Responses

Marshalling

Only string data can be communicated between agents. Values can be converted to and from strings explicitly using Util.marshal, which calls the OCaml built-in Marshal.to_string, and Util.unmarshal, which calls Marshal.from_string. You can send strings via communication channels without marshalling, but other values need to be marshalled. Your mapper or reducer must also convert the input it receives from strings back to the appropriate type it can operate on, and once it has finished, it needs to convert its output into strings to communicate it back.

Note that marshalling is not type safe. If you unmarshal a string and treat it as a value of any type other than the type it was marshalled at, your program will most likely crash. You should therefore take particular care that the types match when marshalling and unmarshalling. Make sure that the type of the marshalled input sent to your mapper matches the type the mapper expects, and that the type of the marshalled results the mapper sends back matches the type the controller expects. The same is true for reducers and the reducer messages. Once a value has been unmarshalled at the correct type, you can pattern match on it as usual (for example, to distinguish the cases of a variant type).
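
For example, assuming Util.marshal : 'a -> string and Util.unmarshal : string -> 'a (check the provided interfaces for the actual signatures), a round trip looks like the sketch below. The type annotation on restored must match the type payload was marshalled at.

let () =
  let payload : (string * int) list = [("ocaml", 2); ("reduce", 1)] in
  let wire : string = Util.marshal payload in                    (* value -> string *)
  let restored : (string * int) list = Util.unmarshal wire in    (* string -> value, at the same type *)
  assert (restored = payload)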


Your Tasks

All code you submit must adhere to the specifications defined in the respective .mli files.

You must implement functions in the following files:

Building and running your code

We have provided a Makefile and a build script build.bat that you can use to build your project.

Modules

If you would like to define additional modules, please add them to the appropriate build.bat file or Makefile.



Part 2: Using MapReduce (30 points)

You will use the simplified MapReduce that you created in part 1 to implement three applications: inverted_index, page_rank, and newton.

Each application will require a mapper apps/xxx/mapper.ml and reducer apps/xxx/reducer.ml, where xxx is the name of the application, as well as a controller apps/xxx/xxx.ml. We have supplied full implementations of a simple application word_count and a more involved application kmeans from a previous semester as models.

Inverted Index (5 points)

An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:

Document 1:
OCaml map reduce

Document 2:
fold filter ocaml

The inverted index would look like this:

word      documents
ocaml     1 2
map       1
reduce    1
fold      2
filter    2

Each document must appear only once in each word's index. A document with ID 1 containing "a a a" would result in "a" -> 1, rather than "a" -> 1, 1, 1.

Behavior is undefined if any two documents share the same key.

Implementation

To implement this application, you should take a dataset of documents (such as data/reuters.txt) as input and use MapReduce to produce an inverted index. This can be done in a way very similar to word_count. In particular, you can use Map_reduce.map_reduce. Print your final results using Util.print_reduce_results.

Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml, respectively, and your controller code should go in apps/inverted_index/inverted_index.ml.

Simplified PageRank (10 points)

PageRank is a link analysis algorithm used by Google to weight the importance of different websites on the Internet. You will be implementing a very simplified, iterative version of PageRank. To begin, we initialize each website with a PageRank of 1/n, where n is the total number of websites. Each iteration then has two parts: first, each website divides its PageRank by the number of links out of it and sends that amount to each website it links to; then each website sums up all the values sent to it, and that sum becomes its new PageRank. The number of iterations is supplied on the command line.
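
Concretely, if PR(q) is the current PageRank of page q and L(q) is the number of links out of q, then one iteration computes, for every page p:

PR'(p) = sum over all pages q that link to p of PR(q) / L(q)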

For more details on simplified PageRank, see Section 14.3 in Networks, Crowds, and Markets: Reasoning About a Highly Connected World by Cornell Professors David Easley and Jon Kleinberg.

Note: There is an unfortunate error in the example on page 408 of the networks book. On the second iteration, page A should have a PageRank of 5/16, not 3/16.

Implementation

Your mapper and reducer code should go in apps/page_rank/mapper.ml and apps/page_rank/reducer.ml, respectively, and your controller code should go in apps/page_rank/page_rank.ml.

In order to simulate websites, we have defined a new type, website. Each line of the input file has the format pageid@pagetitle@links. The links field has the form out1,out2,...,outm, where each outi is the pageid of a page that this page links to. The file data/websites.txt contains the same example as on pages 407 and 408 of the Networks book draft linked above.

After performing PageRank for the specified number of iterations, you should print the final PageRank values. We have provided you with a function print_page_ranks, which takes in a list of tuples of pageids and PageRanks, and prints them to the screen.

Behavior is undefined if any two pages share the same key. Behavior is also undefined if any page has zero outgoing links. However, behavior is defined if a page has zero incoming links: it naturally ends up with zero PageRank. When printing the final PageRanks, such a page must still be listed explicitly, with a PageRank of zero.

No page will link to another page twice, nor link to a nonexistent page; the outgoing links list for each page will contain unique valid IDs.

Newton's Fractals (15 points)

In this part, you will implement the application newton, which uses Newton's method to compute zeroes of polynomials over the complex numbers. Newton's method is a strategy for approximating zeroes of functions: it starts at an arbitrary guess and repeatedly improves it until it (rather rapidly) reaches a zero. Recall that a complex polynomial has as many zeroes as its degree. A natural question to ask is: given a point in the complex plane, which zero will Newton's method converge to when started at that point? It turns out that this is not easy to predict, but the results are quite interesting. If we assign a color to each zero, then color each point in the complex plane with the color of the zero Newton's method converges to starting at that point, the result is a beautiful fractal.
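
Concretely, for a polynomial p, Newton's method starts from a guess z0 and repeatedly applies

z_{n+1} = z_n - p(z_n) / p'(z_n)

until p(z_n) is (approximately) zero.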

[Figure: Julia Fractal]

Implementation

You will be using MapReduce to compute in parallel the zero that Newton's method converges to starting from each point. To get a picture like this, we also need to keep track of the number of iterations Newton's method takes so that we can shade the positions darker if they take longer to converge.

Your application should take two command line arguments. The first is a file containing the complex numbers we will start Newton's method from; each line of the file holds one pair of real numbers specifying a complex number. The second is a string of floating point numbers like "3.0, 4.0, -1.0, 0.0", which specifies the polynomial we will use. We provide a function polynomial_of_string that takes input of this form and evaluates to a value of type polynomial, which is just a list of floats. The list corresponds to a polynomial of degree equal to the length of the list minus one: the coefficient of the highest-order term is the first entry, the coefficient of the second-highest-order term is the second entry, and so on. If any term has coefficient 0, a zero must appear in the list. So the input "3.0, 4.0, -1.0, 0.0" is mapped to the polynomial 3x^3 + 4x^2 - x.
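
As a small illustration (the actual polynomial type and the signature of polynomial_of_string are given in the provided code), that input should parse to the following list:

(* "3.0, 4.0, -1.0, 0.0" as a polynomial value: 3x^3 + 4x^2 - x, with an explicit 0.0 for the constant term *)
let p : float list = [3.0; 4.0; -1.0; 0.0]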

In the map phase, you will take a point and return a pair whose key is the zero Newton's method converges to and whose value is a tuple of the starting complex number and the number of iterations Newton's method takes. Since these are just approximations, you should truncate both components of the zero to one decimal place (e.g., 1.55 becomes 1.5), or else we will have far more unique zeroes than actually exist. We will not test your code with polynomials that have zeroes so close together that this approximation does not work. Unfortunately, Newton's method does not necessarily converge from every point. It can fail if the derivative it uses to compute the next approximation evaluates to zero, or it can simply fail to converge. If Newton's method started at some point does not converge (we say that we have failed to converge if we have gone through 500 iterations of Newton's method without reaching a zero), you must store this fact as the key instead of the position of the zero. (Hint: the option type is useful here.) There are some helper functions in Util that should be useful in implementing the mapper. Among these, we provide a type complex, which is a record, along with all of the functions over complex numbers that are necessary to find the approximation of the zero.

As usual, the combine phase will simply group together all the (starting point, number of iterations) pairs that share a zero as a key. The reduce phase should normalize the number of iterations to the range (0,1) so that it is convenient for drawing the fractal. One simple strategy for doing this is to divide the number of iterations associated with each point by some number greater than the maximum number of iterations for any point that causes Newton's method to converge to that zero. All points that do not result in convergence to any zero should be normalized to precisely 1.

We provide two scripts for drawing the fractal that you have computed:

Your mapper and reducer code should go in apps/newton/mapper.ml and apps/newton/reducer.ml, respectively, and your controller code should go in apps/newton/newton.ml. This app requires a set of points as an input. An input file has lines of the form xi, yi. We provide you with the function Util.load_coordinates to load such a file into a list of points in the complex plane. Some sample files of this format are c.txt, c2.txt, cold.txt. To generate more points, we have provided you with a script data/make_points.py. You should print your reduce results using the functions we have given you in newton.ml.

Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that there is a lot of overhead and the simplified MapReduce framework is not very well optimized. Performance gains are only realized when doing this on a massive scale.

To submit

Simply zip up and submit your entire ps5 folder.


Part 3: Source Control (3 points)

You are required to use a source control system like Subversion or Git. Submit the log file that describes your activity. We will provide you with an SVN repository, hosted by CSUG.

For information on how to get started with SVN there, read Using Subversion in the CSUGLab.

Note: Your repository name is cs3110_netid1_netid2, with the netIDs in alphabetical order. For example: cs3110_gez3_rz13. This repository name is what you put in place of project1 in the directions at the provided link.

If you use Windows and are unfamiliar with the command line or Cygwin, there is a GUI-based SVN client called Tortoise SVN that provides convenient context-menu access to your repo.

For Debian-based Linux distros such as Ubuntu, a GUI-based solution to explore would be Rapid SVN. To install:

apt-get install rapidsvn

Mac users looking into a front-end can try SC Plugin, which claims to provide similar functionality to Tortoise. Rapid SVN might work as well.

There is also a plug-in for Eclipse called Subclipse that integrates everything for you nicely.

Note that all of these options are simply graphically based alternatives to the extremely powerful terminal commands, and are by no means necessary to successfully utilize version control.


Part 4: Design Review Meeting (7 points)

We will be holding 15-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A sign-up schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 5 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.

Your design review should focus on your approach to Part 1; however, time permitting, you may spend a few minutes at the end on Part 2.

In your design review, plan to talk about at least: