Problem Set 5: MapReduce

Due Thursday April 11, 2013, 11:59pm

Note: Do not change the .mli files!


Last Updated: Wed Apr 10 09:22 EDT 2013

Part 1: Written Problem (5 points)

Problem Set 7 is due in 10 minutes and your group is struggling to finish on time.

Your partner Gary has given you this interface for a module he's writing. You do not have access to Gary's module, but you know that he is representing the tree as an array where the root is stored at index 0 and the left and right children of any node at index i are stored at indices 2i + 1 and 2i + 2.

In order to finish the assignment, you need a function map f t that traverses a BinaryTree t in order and applies the function f to each node. Unfortunately, Gary is taking his 2110 prelim and cannot be reached. Write the map function inside your own module using only the BinaryTree interface or write a README explaining to the course staff why this task is impossible.

module type BinaryTree = sig
  (* Represented as an array in the implementation *)
  type 'a tree

  (* [add x t] adds an element [x] to the tree [t] *)
  val add : 'a tree -gt; 'a -> 'a tree
  (* [remove x t] removes element [x] from the tree [t] *)
  val remove : 'a tree -> 'a -> 'a tree
  (* [member x t] returns [true] if [x] is an element of 
   * tree [t] and [false] otherwise *)
  val member : 'a tree -> 'a -> bool
  (* [size t] returns the number of elements in the tree [t] *)
  val size : 'a tree -> int

Your Tasks

Write your group's solution to this problem in part1.txt and submit it to the appropriate slot on CMS. We will grade these solutions by hand, so don't worry about making your solution compile!

Part 2: MapReduce (60 points)

In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, read the short paper on MapReduce to familiarize yourself with the basic architecture. This writeup will assume that you have read sections 1–3.1 of that paper.


Map and Reduce Functions

The map and reduce functions have the following OCaml types:

val map : 'a -> 'b -> ('c * 'd) list  
val reduce : 'c ->> 'd list -> 'e list

These functions will be computed in a distributed fashion by several agents running concurrently. The agents communicate with each other using a messaging protocol defined in shared/ However, that protocol only allows for the transmission of strings, so you must use OCaml's built-in marshalling and unmarshalling facilities to transmit values of different types. We will explain this more thoroughly below.

For a given MapReduce application foo, the application-specific code is stored in the directory apps/foo/. For standard MapReduce applications having one map function and one reduce function, store the code for the mappers in apps/foo/ and the code for the reducers in apps/foo/ There is also a controller in the file apps/foo/, which handles initialization of and communication between mappers and reducers. This controller must be named in this manner because our application assumes this convention to dynamically load apps.

The mappers and reducers receive their input by calling Program.get_input and communicate their results by calling Program.set_output. The specific mechanisms for these operations are described below.

Basic Execution

Execution is divided into five phases:

  1. Pre-Map:

    controller.exe is invoked with the name of a MapReduce application on the command line along with other application-specific information (e.g., input file name, any other parameters as required by the application). This information is passed to main method of the controller in the application directory. This method preprocesses the program input into key-value pairs for map, and then manages the rest of the MapReduce procedure.

  2. Map

    The controller sends each (key, value) pair to an available mapper. The mapper applies the mapping function on this (key, value) pair, resulting in a list of (key, value) pairs (the types of the input and output do not have to be the same). The resulting list of (key, value) pairs is sent back to the controller. The controller continues to send inputs to the next available mapper until all pairs have been mapped.

  3. Combine

    For each key produced by the mappers, all of its corresponding values are collected into a single list, resulting in a (key, value list) pair for that key.

  4. Reduce

    The controller connects to worker servers and initializes reducers on these servers. The (key, value list) pairs produced in the Combine phase are then sent to available reducers. The reducers perform the reduce operation and send the results back to the controller.

  5. Post-Reduce

    The controller collects the results of the Reduce phase and outputs them in a suitable manner.

Client Execution Example

Consider the canonical MapReduce example of counting the number of occurrences of each word in a set of documents. The input is a set of (document id, body) pairs. In the Map phase, each word that occurs in the body of a file is mapped to the pair (word, "1"), indicating that the word has been seen once. In the Combine phase, all pairs with the same first component are collected to form the pair (word, ["1"; "1"; ...; "1"]), one such pair for each word. Then in the Reduce phase, for each word, the list of ones is summed to determine the number of occurrences of that word.

For simplicity, our framework accepts only a single data file containing all the input documents. Each document is represented in the input file as a (id, document name, body) triple. See data/reuters.txt, a collection of Reuters articles from the 1980's, or the shorter files data/test1.txt and data/test2.txt for an example of the formatting. Map_reduce.map_reduce prepares a document file by first calling Util.load_documents and then formatting the documents into (key, value) pairs for the mapper.

We have included a full implementation of this application in apps/word_count. Here is a detailed explanation of the sequence of events.

  1. The controller application is started using the command controller.exe word_count filename. It immediately calls the main method of apps/word_count/, passing it the argument list. The main method calls controller/Map_reduce.map_reduce, which is common controller code for performing a simple one-phase MapReduce on documents. Other more involved applications have their own controller code.
  2. The documents in filename are read in and parsed using Util.load_documents which splits the collection of documents into {id; title; body} triples. These triples are converted into (id, body) pairs.
  3. The controller calls kv_pairs "apps/word_count/".
  4. initializes a mapper worker manager using Worker_manager.initialize_mappers "apps/word_count/". The Worker_manager loads in the list of available workers from the file named addresses. Each line of this file contains a worker address of the form ip_address:port_number. Each of these workers is sent the mapper code. The worker creates a mapper with that code and and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify that mapper. If certain workers are unavailable, this function will report this fact, but will continue to run successfully.
  5. then sends individual unmapped (id, body) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_worker(). Mappers should be released once their results have been received using Worker_manager.push_worker. Read controller/worker_manager.mli for more complete documentation. Once all (id, body) pairs have been mapped, the new list of (word, "1") pairs is returned.
  6. Map_reduce.map_reduce receives the results of the mapping. If desired, Util.print_map_results can be called here to display the results of the Map phase for debugging purposes.
  7. The list of (word, "1") pairs is then combined into (word, ["1"; ...; "1"]) pairs for each word by calling Map_reduce.combine. The results can be displayed using Util.print_combine_results.
  8. Map_reduce.reduce is then called with the results of Map_reduce.combine and the name of the reducer file apps/word_count/
  9. Map_reduce.reduce initializes the reducer worker manager by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers.
  10. Map_reduce.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. In this application, the key doesn't change, so Worker_manager.reduce and the reduce workers only calculate and return the new value (in this case, count), instead of returning the key (in this case, word) and count.
  11. The results are returned to the main method of, which displays them using Util.print_reduce_results.

Worker Execution Example

  1. Multiple workers can be run from the same directory as long as they listen on different ports. A worker server is started using worker_server.exe port_number, where port_number is the port the worker listens on.
  2. Worker_server receives a connection and spawns a thread, which calls Worker.handle_request to handle it.
  3. Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using, which returns either the new worker id or the compilation error. See worker_server/ for more complete documentation. If it is a map or reduce request, the worker id is verified as referencing a valid mapper or reducer, respectively. If the id is valid, then is called with that id and the provided input. This runs the relevant worker, which receives its input by calling Program.get_input(). Once the worker terminates, having set its output using Program.set_output, these results are returned by If the request is invalid, then the appropriate error message, as defined in shared/, is prepared.
  4. Once the results of either the build or map/reduce are completed, then Worker.send_response is called, which is responsible for sending the result back to the client. If the response is sent successfully, then Worker.handle_request simply recurses, otherwise it returns unit.

Code Structure

In this section we describe the organization of the MapReduce code.



Communication Protocol

The protocol that the controller application and the workers use to communicate is stored in shared/




Only string data can be communicated between agents. Values can be converted to and from strings explicitly using Util.marshal and Util.unmarshal, which call the OCaml built-in Marshal.to_string and Marshal.from_string functions, respectively. You can send strings via communication channels without marshalling, but other values need to be marshalled. Your mapper or reducer must also convert the input it receives from strings back to the appropriate type it can operate on, and once it has finished, it needs to convert its output into strings to communicate it back.

Note that marshalling is not type safe. OCaml cannot detect misuse of marshalled data during compilation. If you unmarshal a string and treat it as a value of any type other than the type it was marshalled as, your program will compile, run, and crash. You should therefore take particular care that the types match when marshalling/unmarshalling. Make sure that the type of marshalled input sent to your mapper matches the type that the mapper expects, and that the type of the marshalled results that the mapper sends back matches the type expected. The same is true for reducers using the reducer messages. Recall the syntax for type annotations:

let x : int = Util.unmarshal my_data
Annotated variable declarations will help pinpoint unmarshalling errors.

Your Tasks

All code you submit must adhere to the specifications defined in the respective .mli files. As always, do not change the .mli files.

You must implement functions in the following files:

Building and running your code

We have provided a Makefile and a build script build.bat that you can use to build your project.

Part 3: Using MapReduce (27 points)

You will use the simplified MapReduce that you created in part 1 to implement three applications: inverted_index, game_of_life, and dna_sequencing.

Each application will require a mapper apps/xxx/ and reducer apps/xxx/, where xxx is the name of the application, as well as a controller apps/xxx/ We have supplied a full implementation of the word_count application as an example to get you started.

Inverted Index (5 points)

An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:

Document 1:

OCaml map reduce

Document 2:
fold filter ocaml
The inverted index would look like this:
ocaml1 2
map 1
reduce 1
fold 2
filter 2


To implement this application, you should take a dataset of documents (such as data/reuters.txt) as input and use MapReduce to produce an inverted index. This can be done in a way very similar to word_count. In particular, you can use Map_reduce.map_reduce. Print your final results using Util.print_reduce_results.

Your Tasks

Write the mapper, reducer, and controller for inverted_index. Your mapper and reducer code should go in apps/inverted_index/ and apps/inverted_index/, respectively, and your controller code should go in apps/inverted_index/

Conway's Game of Life (8 points)

Mathematician John Horton Conway, in addition to his significant contributions to theoretical Mathematics, single-handedly created the field of recreational mathematics. He invented thousands of mathematical games and published a number of books and papers on the subject. The most famous of his mathematical games is the simply titled Game of Life. A number of variations have gained popularity, but original Life is a zero-player solitaire which models a cellular automaton. Players define an initial state for the world and then the cells live and die according to a few simple rules, creating fascinating patterns and sequences. "The world" is a two-dimensional torroid (read: a matrix that wraps around at the edges) divided into grid squares. Each square is either alive or dead, and the current state of the board at any instant defines the next state of each cell/grid square. The game progresses in a series of turns, where each square of the entire grid is updated simultaneously according to the following rules:

  1. Any live cell with more than three live neighbors dies of overcrowding
  2. Any live cell with two or three live neighbors lives on to the next generation
  3. Any live cell with fewer than two live neighbors dies of loneliness
  4. Any dead cell with exactly three live neighbors becomes a live cell. It's the miracle of birth!
To reiterate: in one turn, every cell gets updated. One game is a series of turns. Each turn is like a snapshot of the world. Playing the snapshots in sequence animates life on our little planet.

App Overview

You will implement Life using the map reduce framework. Represent the board as an array of 1s and 0s, marking a live cell with a 1 and a dead cell with a 0. For I/O purposes, we will store initial states and generated games as strings, but we have supplied functions to handle the conversions from string to array for you. At a high level, your app must read in a board from a file and run for given number of turns, saving a transcript of the board state before every update. This transcript can be opened by the included GUI conway.jar so you can view your simulation. Simply double-click the jarfile or run the command java -jar conway.jar in the gui directory.

Your Tasks

Implement and the corresponding mapper and reducer. You are free to write the map and reduce functions however you like, but the controller must be implemented such that ./controller.exe game_of_life my_board.txt 10 my_board.out:

  1. Reads in the string representation of a board stored in my_board.txt as the initial state
  2. Records the board dimensions at the top of the transcript file
  3. Runs the simulation for 10 turns according to Conway's rules
  4. Records the 11 board states (intial + 10 turns) in the transcript
Note that we have handled steps 1. and 2. for you already in You may reorganize this code if you like, but it is complete. Additionally, you may find the supplied function write_epoch : int array array -> out_channel -> unit useful. It takes a board and an output channel and writes a string representation of the board to the output.

Genetic Sequence Alignment (14 points)

In this exercise you will implement the computationally intensive part of one variant of genetic sequence alignment. Sequence alignment is a collection of problems related to finding approximate matches between two (or more) sequences of genetic information. This exercise was inspired by this paper which you are welcome to read, though you won't need to in order complete this application.

For our purposes, DNA sequences are simply strings of the letters {G,C,A,T}. The context for our problem is that we have some piece of DNA (which we will call the 'sample') that we would like to analyze to get its exact sequence of letters. Unfortunately, it is quite hard to directly read the sequence of letters from a long piece of DNA. Don't ask me why; I'm not a biochemist.

What the biochemists can do is make a ton of copies of the sample DNA and then randomly chop up all the copies into relatively short pieces (on the order of 100 letters long). They call these pieces 'short reads' or just 'reads'. Now we have a puzzle on our hands. How can we put the short reads back together to recreate the whole sample? Without any more information, this is a very challenging problem.1 However, if we have a reference sequence that we expect to be highly similar to the sample sequence, we can use it to combine the short reads. This solution is commonly used in practice; for example, a reference sequence for one set of short reads might come from a different individual of the same species.

Your goal in this section is to implement (most of) the computationally intensive part of putting the short reads back together as 2 map-reduce passes.2 At the end of the 2 passes, your application should identify all the pieces of short reads that appear exactly somewhere in the reference sequence. For example with this reference sequence:

            1         2         3         4
And these short reads:
            1         2
  ATGCAAAATACGTATT        length 16 match at index  7 of reference 1 and index 0 of read
  GTCCACCCTCGGA           length 11 match at index 30 of reference 1 and index 2 of read
  CGTATTTGTACATCCACCCTCGG length 11 match at index 17 of reference 1 and index 0 of read
                          length 11 match at index 29 of reference 1 and index 12 of read


  1. The input files will be a sequence of lines, where each line looks like:
    ID is a unique number. This format is set up so that the utility function load_documents is reasonably useful.
  2. First Map/Reduce Step. Chop up all the short reads and the reference sequence into even shorter "k-mers". A "mer" is simply a short sequence of DNA information; for example "AGTTCA" is a 6-mer. The input to each map instance should be either one short read or the reference sequence.
  3. Second Map/Reduce Step. Coalesce runs of adjacent shared k-mers into longer "seeds". In this step we do not tolerate any mismatched letters. If we have two shared mers from the previous step whose index is off by one, we merge them into a single larger shared mer.

The result of these three map-reduce steps is a set of aligned short reads (with at most a small number of mismatched letters) which can be stitched together into a full sequence. Don't worry about producing the final combination. If you get to this step with a set of aligned short reads, throw up the victory flags and congratulate yourself!

Your Tasks

Implement the two-stage dna sequencing application according to the above specification. We have supplied you with a complete controller script, so all you need to do is implement the mappers and reducers Enjoy!

1. This short read alignment problem without any more information is exactly what researchers looking at genetic information from uncharted places like the deep sea have to deal with.
2. It is actually common to use several reference sequences and reads from several samples, but we will stick with one to keep things simple.

Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that there is a lot of overhead and the simplified MapReduce framework is not very well optimized. Performance gains are only realized when doing this on a massive scale.

To submit

Simply zip up and submit your entire ps5 folder. Be sure to double-check your app names and delete all compiled files before submitting. In particular, remove the working directories generated by the worker server. If you have GNU make installed, simply run make clean and you'll be ready to zip and submit. Otherwise, use the script clean.bat and zip what's left. If you're paranoid about build scripts and want to prepare everything by hand, then delete those *.cmo & *.cma files and the worker_XXXXX directories generated by the worker_server manually. Be careful not to delete the worker_server directory, though!

Part 4: Source Control (3 points)

You are required to use a source control system like Git or SVN. Submit the log file that describes your activity. We will provide you with an SVN repository, hosted by CSUG, or you may use a private repository from a provider like xp-dev or bitbucket. github. Do not post your code in a public repository. That would be an academic integrity violation.

For information on how to get started with your provided SVN account, read Using Subversion in the CSUGLab.

Note: Your repository name is cs3110_<netid(s)>. For example, cs3110_njl36, or if you have a partner: cs3110_dck10_njl36. Notice that the netids of groups are in alphabetical order. This repository name is what you put in place of project1 in the directions in the provided link.

If you use Windows, and are unfamiliar with the command line or Cygwin, there is a GUI based SVN client called Tortoise SVN that provides convenient context menu access to your repo.

For Debian/Ubuntu, a GUI-based solution to explore would be Rapid SVN. To install:

apt-get install rapidsvn

Mac users looking into a front-end can try SC Plugin, which claims to provide similar functionality to Tortoise. Rapid SVN might work as well.

There is also a plug-in for Eclipse called Subclipse that integrates everything for you nicely.

Note that all of these options are simply graphical mimics of the extremely powerful terminal commands, and are by no means necessary to successfully utilize version control.

Part 5: Design Review Meeting (5 points)

We will be holding 15-minute design review meetings. Your group is expected to meet with one of the course staff during their office hours to discuss this assignment. A sign-up schedule is available on CMS.

Be prepared to give a short presentation on how you and your partner plan to implement MapReduce, including how you will divide the work, and bring any questions you may have concerning the assignment. Staff members will ask questions to gauge your understanding of varying aspects of the assignment, ranging from the high-level outline to specific design pitfalls. This is not a quiz; you are not expected to be familiar with the intricacies of the project during your design review. Rather, you are expected to have outlined your design and prepared thoughtful questions. Your design review should focus on your approach toward implementing the MapReduce framework, specifically controller/, but you may spend a few minutes at the end discussing the MapReduce applications.

Design reviews are for your benefit. Meet with your partner prior to the review and spend time outlining and implementing MapReduce. This is a large and difficult assignment. The review is an opportunity to ensure your group understands the tasks involved and has a feasible design early on.