Note: Don't change the mli files!
March 30: We have changed the build system for workers. First,
Program.run no longer runs asynchronously, and instead blocks until the program terminates. Mappers/Reducers no longer obtain their input using
stdin, and instead use
Program.get_input(). Similarly, when the mapper/reducer has finished constructing its list of outputs, it now "returns" them using
Program.set_output results. Lastly, shared data is now accessed using
Program.get_shared_data(). We have updated the mapper and reducer for word count.
Windows limits the number of connections a socket can accept, and the limit is too low for this assignment. Additionally, sockets are kept alive too long after being closed. Therefore, we have included two registry fixes for this, which you can apply by simply running socket_fixes.reg. You should back up your registry before doing this.
Obtaining a channel from a socket using
Unix.in_channel_of_descr functions differently in Windows when compared to Linux/OS X. With Linux/OS X, when you close the channel, the socket is closed as well. However, when you close the channel in Windows, you must still close the socket. Therefore, whenever you close a channel, but still want to use the socket (for example, in
Worker.handle_request) you should add the following code:
let inp = Unix.in_channel_of_descr socket in
...
if Sys.os_type = "Win32" then close_in_noerr inp else ()
A new release has been posted to CMS. If you have already started work, then please just copy the files over your current working directory, with the exception of the files that you have modified (master/controller.ml, worker/worker.ml, master/master.ml, shared/hashtable.ml). Then, for worker/worker.ml, copy over the new
default_includes variable. Lastly, if worker builds fail and you are running Linux/OS X, you may need to update the variable
ocaml_lib_dir in worker/program.ml to point to your OCaml lib directory.
In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short whitepaper to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1 - 3.1 of the whitepaper.
The map and reduce functions have the following OCaml types:
val map : 'a -> 'b -> ('c * 'd) list
val reduce : 'c -> 'd list -> 'e list
However, note that in the messaging protocol defined in shared/protocol.ml we only allow for the transmission of strings. Therefore, you must utilize OCaml's built-in marshalling and unmarshalling to transmit values of other types. (See below for a more thorough explanation.)
For a given MapReduce application foo, your mapper code will be stored in apps/foo/mapper.ml, while your reducer code will be stored in apps/foo/reducer.ml. Both the mapper and reducer will receive their input from Program.get_input(), and then output their list of results using Program.set_output. The specific mechanism for this operation will be explained below.
Execution is divided into five distinct phases:
The master application is invoked with a MapReduce application name and the name of a file containing the data to provide as input. The data in this file is parsed into (key, value) pairs to provide as input. Mappers are initialized by the master on the available worker machines.
The master application obtains a list of (key, value) pairs. Each pair is sent to an available mapper. Mappers calculate the result of mapping their (key, value) pair, and return the resulting (key, value) list, which is sent back to the master. As long as the master hasn't received the resulting mapping for a given (key, value) pair, it will send that input to the next available mapper until all pairs have been mapped.
For each key in the result of the Map, all of its values are collected into a list, thus generating a list of (key, value list) pairs. Reducers are initialized by the master on the available worker machines.
The master sends unreduced (key, value list) pairs to available reducers. Reducers calculate the result of the reduction and send it back to the master. This continues until the master has received results for all pairs.
The result of the Reduce phase is collected and output in some manner.
Consider the canonical MapReduce example of counting the number of occurrences
of each word in a single document. In the Map phase, each (filename, word)
pair is mapped to (word, "1"), indicating that the word has been seen once.
Then in the Reduce phase, a reducer will receive (word, ["1", "1", ..., "1"]) as input, and sum all the ones to determine the number of times that the word has occurred. Note that our framework only accepts a single data file as input; however, we approximate multiple files by storing (filename, file content) pairs in the single file. See apps/kmeans/reuters/reuters000.txt for an example of the formatting. Files that match this format can be parsed by Controller.load_documents. This dataset is a collection of Reuters articles from the 1980s.
Note: We have included the mapper and reducer code for this example in apps/wordcount.
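The word-count logic above can be sketched as two pure functions. This is only a hypothetical sketch: the real mapper and reducer live in apps/wordcount and exchange data via Program.get_input () / Program.set_output rather than direct function calls.

```ocaml
(* Sketch of word-count map/reduce logic as pure functions. *)
let map ((_filename : string), (word : string)) : (string * string) list =
  (* each occurrence of a word contributes a single "1" count *)
  [ (word, "1") ]

let reduce ((_word : string), (ones : string list)) : string list =
  (* summing the ones is just counting them *)
  [ string_of_int (List.length ones) ]
```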
Controller.word_count filename reads in the contents of filename, splitting it into (filename, word) pairs.
Master.map kv_pairs "" "apps/wordcount/mapper.ml" is called. The empty string contains shared data that is accessible to all mappers in addition to their input (key, value) pair. This data is accessed using Program.get_shared_data().
Master.map initializes the mappers using Worker_manager. Worker_manager loads in the list of available workers from the file named addresses. Each line of this file contains a worker address of the form "ip_address:port_number". Each of these workers is sent the mapper code, and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify each mapper. If certain workers are unavailable, this function will report this fact, but continue to run successfully.
Master.map then sends individual unmapped (filename, word) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_mapper() and should be returned to the manager once a result has been received using Worker_manager.push_mapper. Read master/worker_manager.mli for more complete documentation.
Controller.word_count receives the results of the mapping, and combines the list of (word, "1") pairs into (word, ["1", ..., "1"]) pairs for each word.
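The combining step can be sketched as a small helper that collapses a (key, value) list into (key, value list) pairs. The helper name is an assumption, not part of the release; an association list keeps the sketch self-contained, though the hash table you build in shared/hashtable.ml would make this more efficient.

```ocaml
(* Sketch (assumed helper): group a (key, value) list by key. *)
let group_by_key (pairs : (string * string) list)
    : (string * string list) list =
  List.fold_left
    (fun groups (k, v) ->
       (* prepend v to k's existing values, or start a new group *)
       let vs = try List.assoc k groups with Not_found -> [] in
       (k, v :: vs) :: List.remove_assoc k groups)
    [] pairs
```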
Master.reduce is then called with this input, along with the name of the reducer file.
Master.reduce initializes the reducers by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers.
Master.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. Note that because the key doesn't change, Worker_manager.reduce, and the reducer workers, only calculate and return the new value (in this case count), not the key (in this case word).
Controller.word_count outputs the results.
Worker_server receives a connection, and spawns a thread which calls Worker.handle_request to handle it.
Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using Program.build, which returns either the new worker id or the compilation error. See program.ml for more complete documentation. This new id is returned. If it is a map or reduce request, the id is verified as referencing a valid mapper (if the request is for a map) or a valid reducer (if the request is for a reduce).
The mapper/reducer is then run using Program.run, which loads the worker; the worker obtains its input using Program.get_input(). Program.run then waits until the worker terminates, having set its output using Program.set_output results. These results are then sent back to the client using send_response. Program.run must be passed Worker.send_response so that these results can be sent.
Worker.send_response is called, which is responsible for sending the result back to the client.
Main controller module for the master. First, it parses the command-line arguments, and then invokes the appropriate MapReduce application by calling Controller.app_name. Each application function in this module is responsible for loading the input data, mapping it using Master.map, and outputting the results using Controller.print_map_results. Next, the list of (key, value) pairs should be converted into a list of (key, value list) pairs such that value list is a list of all the values mapped to key; this list is then reduced using Master.reduce. These results should be output using the corresponding Controller print function.
This module is responsible for performing the actual map and reduce operations. A more thorough description is contained below.
This module provides the capability to initialize mappers and reducers, select inactive workers using the pop functions, perform a map/reduce task on a worker using either the map or reduce function, and then return workers which have completed their task to the pool of inactive workers using the push functions.
If a request fails due to some sort of network or file descriptor error,
Worker_manager will automatically retry it up to 5
times. Therefore, it is acceptable if your workers intermittently output
such errors, as long as they are still able to service the requests.
This module provides the ability to build and run mappers/reducers.
This module is responsible for spawning threads to handle client connection requests. These threads simply invoke Worker.handle_request.
This module is responsible for managing communication between the clients and the mappers/reducers that they build and then run. A more thorough description is contained below.
The protocol that the master application and the workers use to communicate is stored in shared/protocol.ml.
InitMapper(includes, code, shared_data)
Initialize a mapper. Each element in the code list is a line of the mapper's source code. shared_data is accessible to all mappers, as described above.
Initialize a reducer. Same as above, except with no shared data.
MapRequest(worker_id, key, value)
Execute the mapper with id worker_id, with (key, value) as input.
ReduceRequest(worker_id, key, value list)
Execute the reducer with id worker_id, with (key, value list) as input.
Mapper(worker_id option, error)
Mapper(Some(id), _) indicates the requested mapper was successfully built, and has the returned id. Mapper(None, error) indicates compilation of the mapper failed with the returned error.
Reducer(worker_id option, error)
Same as above, except for a reducer.
Either a map request was made and the worker_id doesn't correspond to a mapper, or a reduce request was made and the worker_id doesn't correspond to a reducer.
The worker with id worker_id output the runtime error(s) stored in the message's error field.
MapResults(worker_id, (key, value) list)
The map request sent to worker_id resulted in output of the given (key, value) list.
ReduceResults(worker_id, value list)
The reduce request sent to worker_id resulted in output of the given value list.
Marshalling and unmarshalling of most values (with the exception of functions) is built into OCaml. Values can be converted to and from strings explicitly. Alternatively, if you are retrieving a marshalled value from a channel, you can use input_value, and if you are sending a value into a channel, you can use output_value. These functions aren't typesafe, but you can still retrieve the type information by matching on the retrieved value.
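As a concrete sketch, the standard-library Marshal module round-trips a typed value through a string. Note that Marshal.from_string is not typesafe: the type ascribed on the receiving end must match what was actually sent.

```ocaml
(* Round-trip a value through OCaml's built-in marshalling. *)
let () =
  let sent : (string * int) list = [ ("ocaml", 1); ("fun", 2) ] in
  let data = Marshal.to_string sent [] in      (* value -> string *)
  let (received : (string * int) list) =
    Marshal.from_string data 0                 (* string -> value *)
  in
  assert (received = sent)
```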
All code you submit must adhere to the specifications defined in the respective mli files.
As in the previous problem set, you must submit a design document for parts 1 and 2. Additionally, you must implement functions in the following files:
You are required to implement a hash table according to the specifications in shared/hashtable.mli. This data structure will be useful in your MapReduce implementation.
Hashtbl.hash is an acceptable hash function to use. However, you may not use any other part of the Hashtbl module.
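For example, Hashtbl.hash (the one permitted piece of Hashtbl) can map any key to a bucket index. The helper name below is an assumption, not part of the mli:

```ocaml
(* Sketch: derive a bucket index from a polymorphic key. *)
let bucket_index (num_buckets : int) (key : 'a) : int =
  Hashtbl.hash key mod num_buckets   (* Hashtbl.hash is non-negative *)
```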
The code in this module is responsible for performing the actual map and reduce operations by initializing the mappers/reducers, sending them input that hasn't been mapped/reduced yet, and then returning the final list of results.
The map and reduce functions must be implemented according to the specifications stored in master/master.mli, with functionality that mirrors the above example execution description. Both functions must be able to handle both silent and explicit worker failure, as well as individual worker slowness. However, you don't need to handle the case where all workers fail (i.e. complete network failure) when there is no coding error in your worker code. Also, there should only be one active request per worker at a time, and if a worker fails, it shouldn't be returned to the Worker_manager using the push functions.
Both functions share the same basic structure. First, the workers are initialized through a call to the appropriate Worker_manager function. There is a single shared instance of the Worker_manager module, and all relevant functions are thread-safe. Once the workers have been initialized, the input list should be iterated over, sending each available element to a free worker, which can be accessed using the appropriate pop function of Worker_manager. A mapper is invoked using Worker_manager.map, providing the mapper's id and the (key, value) pair. Each mapper (and therefore each invocation of Worker_manager.map) receives an individual (key, value) pair and outputs a new (key, value) list, which is simply added onto the list of all previous map results. A reducer is invoked similarly, using Worker_manager.reduce. These functions block until a response is received, so it is important that they are invoked using a thread from the Thread_pool module. Additionally, this requires that these spawned threads store their results in a shared data structure, which must be accessed in a thread-safe manner.
Once all of the input has been iterated over, it is important that input which remains unfinished, either due to a slow or failed worker, is re-submitted to available workers until all input has been processed. Once all input has been fully processed, the results are returned.
Note: It is acceptable to use Thread.delay with a short sleep period (0.1 seconds or less) when looping over unfinished data to send to available workers, in order to avoid flooding the workers with unnecessary work.
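The shared, thread-safe results structure described above can be as simple as a mutex-guarded list. This is only a sketch: the names are assumptions, and in Master.map each Thread_pool thread would call the helper when its worker responds.

```ocaml
(* Sketch: a shared results list guarded by a mutex. *)
let results : (string * string) list ref = ref []
let results_mutex = Mutex.create ()

let add_result (pair : string * string) : unit =
  Mutex.lock results_mutex;
  results := pair :: !results;    (* critical section: one writer at a time *)
  Mutex.unlock results_mutex
```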
The code in this module is responsible for handling communication between the clients that request work, and the mapper/reducers that perform the work.
The send_response function must be implemented according to the mli specifications, and the above description. Furthermore, it must be thread-safe. Note that this function should be quite short.
The handle_request function must also be implemented according to the mli specifications and the above description, and must be thread-safe. This function receives a connected client socket as input, and must retrieve the worker_request from that socket.
If the request is for a mapper or reducer initialization, then the code must be built using Program.build, which returns (Some id, "") where id is the worker id if the compilation was successful, or (None, error) if the compilation wasn't successful. If the build succeeds, the worker id should be stored in the appropriate mapper or reducer set (depending on the initialization request), and the id should be sent back to the client using send_response. If the build fails, then the error message should be sent back to the client using send_response.
If the request is to perform a map or reduce, then the worker id must be verified to be of the correct type, by looking it up in either the mapper or reducer set, before the mapper or reducer is invoked using Program.run. You will pass send_response as one of the arguments to this function; it will automatically be invoked with either the runtime error(s) that the mapper/reducer output, or the result of the map/reduce, as the response when the worker terminates.
Note that the mapper and reducer sets are shared between all request handler threads spawned by the Worker_server, and therefore access to them must be thread-safe.
Note: the includes sent to Program.build should be those stored in default_includes in worker/worker.ml, as the master isn't sending along an includes list. Code in shared/util.ml is accessible to all workers by default.
An in_channel can be extracted from a Unix.file_descr by using Unix.in_channel_of_descr. A similar function exists for out_channel. Remember to close all sockets and channels when finished.
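The channel-wrapping pattern can be sketched with a pipe standing in for a socket (a pipe keeps the example self-contained; with a real socket the descriptors come from accept/connect instead):

```ocaml
(* Wrap file descriptors in buffered channels, pass a line through,
   and close everything when finished. *)
let () =
  let (read_fd, write_fd) = Unix.pipe () in
  let inc = Unix.in_channel_of_descr read_fd in
  let outc = Unix.out_channel_of_descr write_fd in
  output_string outc "hello\n";
  flush outc;
  let line = input_line inc in
  close_out outc;   (* on Linux/OS X this also closes write_fd *)
  close_in inc;     (* likewise for read_fd *)
  assert (line = "hello")
```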
Note: Linux/Mac users should add
./ before all commands given.
This runs the MapReduce application foo using the datafile bar.
If you would like to define additional modules, please add them to the appropriate build_*.bat file.
(where name is the name of the algorithm). We have implemented the mapper and reducer for one of the applications, but you will have to implement the rest yourself. For each application you must also implement the code that invokes map and reduce and prints the results as specified. To do this, you will implement three functions: word_count, inverted_index, and kmeans, all located in master/controller.ml.
Given a set of documents as input, count the total number of times that each word occurs. For example, if your documents were:
ocaml is fun
because fun is a keyword
We have provided you with the mapper and reducer for word count.
Your task is to implement the function word_count in master/controller.ml. This function takes a filename of a document dataset (described below) as input and prints out the number of occurrences of each word. On separate lines, print each distinct word (in arbitrary order) followed by the number of times it occurs.
Here is a basic outline of the code you must implement for word_count: First, load the provided dataset using load_documents. Convert each document into a key-value pair (id, contents), where id is the document ID and contents is the text of the document. These key-value pairs should then be passed as input to the map function. The map function (when invoked with the mapper we have provided) will return a list of key-value pairs of the form (word, "1"), where the keys are not necessarily distinct. Print this list of pairs using Controller.print_map_results. From these key-value pairs, derive the key-value pairs that you will pass to reduce. reduce will return a list of key-value pairs such that the distinct words are the keys and the values are a string representation of the number of occurrences. Print this list of (key, value list) pairs using the corresponding Controller print function.
An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:
OCaml map reduce
fold filter ocaml
The inverted index would look like this:
Your task is to implement the function inverted_index in master/controller.ml. It should take a dataset as input and use MapReduce to print an inverted index. As in the word count application, you must print your map results using Controller.print_map_results, and your reduce results using the corresponding print function. Be sure to describe your map and reduce steps in your overview document. Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml. Use the Util.split_words function to tokenize a string into words.
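One plausible map/reduce decomposition can be sketched as pure functions. This is a hypothetical sketch, not the required design: your real code uses Program.get_input / Program.set_output, and Util.split_words is the course helper, so a simple whitespace split stands in for it here.

```ocaml
(* Sketch of inverted-index map/reduce logic as pure functions. *)
let split_words (s : string) : string list =
  String.split_on_char ' ' s |> List.filter (fun w -> w <> "")

let map ((id : string), (contents : string)) : (string * string) list =
  (* emit one (word, document id) pair per word occurrence *)
  List.map (fun word -> (word, id)) (split_words contents)

let reduce ((_word : string), (ids : string list)) : string list =
  (* list each document at most once per word *)
  List.sort_uniq compare ids
```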
The k-means algorithm partitions a set of vectors into k clusters. k-means works by finding a set of k centroids (i.e. means), one per cluster. These centroids are usually initialized arbitrarily. For each vector in the dataset, we find the centroid it is closest to; the vector is said to belong to that centroid's cluster. Once we have matched each vector with a cluster, we recompute the centroids: for each centroid, take the set of vectors that belong to it, and set the new position of the centroid to the mean of all of those vectors. The process of updating the means is called an iteration. We repeat this process until the means converge or after a fixed number of iterations. Usually k-means converges rapidly to a local optimum. For an illustration of this algorithm, see Wikipedia.
Your task is to implement the function kmeans. kmeans takes the filename of a dataset, a parameter k, and a maximum number of iterations. The vectors you will cluster will be the word count vectors from each document in the dataset; therefore, at the end of this process you will have clusters of similar documents. For each document, you will need to compute a vector mapping each word to the number of times the word appears in the document. These vectors will need to be sparse vectors, because it is infeasible to store a count for every possible word. For consistency, initialize the k centroids to be the word vectors from the first k documents. The distance between two vectors is simply the Euclidean distance between them. You should use MapReduce to perform the k-means algorithm for the specified number of iterations. Describe your map and reduce steps in your overview document.
After performing k-means for the specified number of iterations, you should print the contents of each cluster. For each cluster, print the ID and title of each document that belongs to it. For example:
Cluster 1
id1 doc1
id2 doc2
...
Cluster 2
id1 doc1
id2 doc2
...
You are not required to use MapReduce to compute the word count vectors. Nor are you required to use MapReduce to find the documents that compose each cluster at the end of the algorithm. You also do not need to check the clusters for convergence. The reduce phase of your algorithm should support running at least workers at once. Particularly efficient implementations that can support running more than workers at once in the reduce phase will receive karma.
It may be useful to define a
Word_vector submodule in
shared/util.ml, so you can reuse this code in your
mapper, reducer, and controller. It may be helpful to implement the
algorithm without MapReduce first. Then rewrite it using MapReduce.
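A sparse word vector and the Euclidean distance between two of them can be sketched as follows. All names here are assumptions for illustration; a Word_vector submodule in shared/util.ml could package something like this.

```ocaml
(* Sparse vectors as association lists from word to count. *)
let get (v : (string * float) list) (w : string) : float =
  try List.assoc w v with Not_found -> 0.   (* absent word = count 0 *)

let euclidean_distance (v1 : (string * float) list)
                       (v2 : (string * float) list) : float =
  (* iterate over the union of words appearing in either vector *)
  let words = List.sort_uniq compare (List.map fst v1 @ List.map fst v2) in
  sqrt (List.fold_left
          (fun acc w -> let d = get v1 w -. get v2 w in acc +. (d *. d))
          0. words)
```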
Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that the simplified MapReduce framework is not very well optimized.
Simply zip up and submit your entire ps5 folder.
Consider the following three functions, a simple implementation of merge sort. The specification for the function mergesort itself is provided; specifications for the helper functions merge and split are not.
let rec merge l1 l2 = match (l1,l2) with
  | [],_ -> l2
  | _,[] -> l1
  | x::xs,y::ys -> if x > y then x::(merge xs l2) else y::(merge l1 ys)

let rec split l = match l with
  | [] -> ([],[])
  | [x] -> ([x],[])
  | x1::x2::xs -> let (l1,l2) = split xs in (x1::l1,x2::l2)

(* Requires: true
 * Returns: l', such that elements(l') = elements(l),
 * and sorted(l') *)
let rec mergesort l = match l with
  | [] -> []
  | [x] -> [x]
  | _ -> let (l1,l2) = split l in
         let s1 = mergesort l1 in
         let s2 = mergesort l2 in
         merge s1 s2
We will be holding 20-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A signup schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 10 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.
Your design review should focus on your approach to Part 1; however, time permitting, you may spend a few minutes at the end on Part 2.
In your design review, plan to talk about at least:
To do: turn in the solution in the file complexity.txt.