Note: Don't change the mli files!
March 30: We have changed the build system for workers. First, Program.run no longer runs asynchronously; instead, it blocks until the program terminates. Mappers/reducers no longer obtain their input using stdin; instead, they use Program.get_input(). Similarly, when a mapper/reducer has finished constructing its list of outputs, it now "returns" them using Program.set_output results. Lastly, shared data is now accessed using Program.get_shared_data(). We have updated the mapper and reducer for word count.
Windows caps the number of connections a socket can accept at a limit that is too low for this assignment. Additionally, sockets are kept alive too long after being closed. We have therefore included two registry fixes, which you can apply by simply running socket_fixes.reg. You should back up your registry before doing this.
Obtaining a channel from a socket using Unix.in_channel_of_descr functions differently on Windows than on Linux/OS X. On Linux/OS X, when you close the channel, the socket is closed as well. On Windows, however, after closing the channel you must still close the socket. Therefore, whenever you close a channel but still want to use the socket (for example, in Worker.handle_request), you should add the following code:

let inp = Unix.in_channel_of_descr socket in
...
if Sys.os_type = "Win32" then close_in_noerr inp else ()
A new release has been posted to CMS. If you have already started work, then please copy the files over your current working directory, with the exception of the files that you have modified (master/controller.ml, worker/worker.ml, master/master.ml, shared/hashtable.ml). Then, for worker/worker.ml, copy over the new default_includes variable. Lastly, if worker builds fail and you are running Linux/OS X, you may need to update the variable ocaml_lib_dir in worker/program.ml to point to your OCaml lib directory.
In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short whitepaper to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1 - 3.1 of the whitepaper.
The map and reduce functions can be defined by the following OCaml signatures:

val map : 'a -> 'b -> ('c * 'd) list
val reduce : 'c -> 'd list -> 'e list
However, note that the messaging protocol defined in shared/protocol.ml only allows for the transmission of strings. Therefore, you must use OCaml's built-in marshalling and unmarshalling to transmit values of other types. (See below for a more thorough explanation.)
For a given MapReduce application foo, your mapper code will be stored in apps/foo/mapper.ml, while your reducer code will be stored in apps/foo/reducer.ml. Both the mapper and reducer will receive their input from Program.get_input(), and then output their list of results using Program.set_output results. The specific mechanism for this operation is explained below.
Execution is divided into five distinct phases:
1. The master application is invoked with a MapReduce application name and the name of a file containing the data to provide as input. The data in this file is parsed into (key, value) pairs. Mappers are initialized by the master on the available worker machines.
2. The master obtains a list of (key, value) pairs. Each pair is sent to an available mapper. Mappers compute the result of mapping their (key, value) pair and send the resulting (key, value) list back to the master. As long as the master hasn't received the result for a given (key, value) pair, it will send that input to the next available mapper, until all pairs have been mapped.
3. For each key in the result of the Map phase, all of its values are collected into a list, generating a list of (key, value list) pairs. Reducers are initialized by the master on the available worker machines.
4. The master sends unreduced (key, value list) pairs to available reducers. Reducers compute the result of the reduction and send it back to the master. This continues until the master has received results for all pairs.
5. The result of the Reduce phase is collected and output in some manner.
Consider the canonical MapReduce example of counting the number of occurrences of each word in a single document. In the Map phase, each (filename, word) pair is mapped to (word, "1"), indicating that the word has been seen once. Then, in the Reduce phase, a reducer receives (word, ["1", "1", ..., "1"]) as input, and sums all the ones to determine the number of times that the word occurred. Note that our framework only accepts a single data file as input; however, we approximate multiple files by storing (filename, file content) pairs in the single file. See apps/kmeans/reuters/reuters000.txt for an example of the formatting. Files that match this format can be parsed by Controller.load_documents. This dataset is a collection of Reuters articles from the 1980s.
Note: We have included the mapper and reducer code for this example in apps/wordcount.
Controller.word_count filename reads in the contents of filename, splitting it into (filename, word) pairs using load_documents. Master.map kv_pairs "" "apps/wordcount/mapper.ml" is then called. The empty string is shared data that is accessible to all mappers in addition to their input (key, value) pair; this data is accessed using Program.get_shared_data().
Master.map initializes the mappers using Worker_manager.initialize_mappers. Worker_manager loads the list of available workers from the file named addresses. Each line of this file contains a worker address of the form "ip_address:port_number". Each of these workers is sent the mapper code, and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify each mapper. If certain workers are unavailable, this function will report that fact, but continue to run successfully.
Master.map then sends individual unmapped (filename, word) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_mapper(), and should be returned to the manager once a result has been received, using Worker_manager.push_mapper. Read master/worker_manager.mli for more complete documentation. Controller.word_count receives the results of the mapping, and combines the list of (word, "1") pairs into (word, ["1", ..., "1"]) pairs for each word. Master.reduce is then called with this input, along with the name of the reducer file.
Master.reduce initializes the reducers by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers. Master.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. Note that because the key doesn't change, Worker_manager.reduce, and the reducer workers, only calculate and return the new value (in this case count), not the key (in this case word). Controller.word_count then outputs the results.
Worker_server receives a connection and spawns a thread, which calls Worker.handle_request to handle it. Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using Program.build, which returns either the new worker id or the compilation error. See program.ml for more complete documentation. This new id is returned. If it is a map or reduce request, the id is verified as referencing a valid mapper (if the request is for a map) or a valid reducer (if the request is for a reduce). Program.run is then called, which loads the worker; the worker obtains its input using Program.get_input (). Program.run then waits until the worker terminates, having set its output using Program.set_output results. These results are then sent back to the client using send_response. Program.run must be passed Worker.send_response so that these results can be sent; Worker.send_response is responsible for sending the result back to the client.
Main controller module for the master. First, it parses the command-line arguments, and then invokes the appropriate MapReduce application by calling Controller.app_name. Each application function in this module is responsible for loading the input data and mapping it using Master.map. The results should then be output using Controller.print_map_results. Next, the list of (key, value) pairs should be converted into a list of (key, value list) pairs, such that each value list contains all the values mapped to that key. This data is then reduced using Master.reduce, and the results output using Controller.print_reduce_results.
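The conversion from (key, value) pairs to (key, value list) pairs described above can be sketched as follows. This is a plain association-list version; the name group_by_key is our own, and your implementation may instead use the hash table from Part 1:

```ocaml
(* Group a (key, value) list into a (key, value list) list,
   collecting all values that share a key. A simple O(n^2)
   sketch; a hash-table-based version would be faster. *)
let group_by_key (pairs : ('k * 'v) list) : ('k * 'v list) list =
  List.fold_left
    (fun groups (k, v) ->
      if List.mem_assoc k groups then
        (k, v :: List.assoc k groups) :: List.remove_assoc k groups
      else
        (k, [v]) :: groups)
    [] pairs
```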
This module is responsible for performing the actual map and reduce operations. A more thorough description is contained below.
This module provides the capability to initialize mappers and reducers, select inactive workers using the pop functions, perform a map/reduce task on a worker using either the map or reduce function, and then return workers that have completed their task to the pool of inactive workers using the push functions.
If a request fails due to some sort of network or file descriptor error, then the Worker_manager will automatically retry it up to 5 times. Therefore, it is acceptable if your workers intermittently output such errors, as long as they are still able to service requests.
This module provides the ability to build and run mappers/reducers.
This module is responsible for spawning threads to handle client connection requests. These threads simply invoke Worker.handle_request.
This module is responsible for managing communication between the clients and the mappers/reducers that they build and then run. A more thorough description is contained below.
The protocol that the master application and the workers use to communicate is stored in shared/protocol.ml.
InitMapper(includes, code, shared_data): Initialize a mapper. Each element in the code list is a line in mapper.ml. shared_data is accessible to all mappers, as described above.

InitReducer(includes, code): Initialize a reducer. Same as above, except with no shared data.

MapRequest(worker_id, key, value): Execute the mapper with id worker_id with (key, value) as input.

ReduceRequest(worker_id, key, value list): Execute the reducer with id worker_id with (key, value list) as input.
Mapper(worker_id option, error): Mapper(Some(id), _) indicates the requested mapper was successfully built, and has the returned id. Mapper(None, error) indicates compilation of the mapper failed with the returned error.

Reducer(worker_id option, error): Same as above, except for a reducer.

InvalidWorker(worker_id): Either a map request was made and worker_id doesn't correspond to a mapper, or a reduce request was made and worker_id doesn't correspond to a reducer.

RuntimeError(worker_id, error): The worker with id worker_id output the runtime error(s) stored in error.

MapResults(worker_id, (key, value) list): The map request sent to worker_id resulted in the output (key, value) list.

ReduceResults(worker_id, value list): The reduce request sent to worker_id resulted in the output value list.
Marshalling and unmarshalling of most values (with the exception of functions) is built into OCaml. Values can be converted to and from strings explicitly using Marshal.to_string and Marshal.from_string. Alternatively, if you are retrieving a marshalled value from a channel, you can use input_value, and if you are sending a value into a channel, you can use output_value. These functions aren't typesafe, but you can still recover the type information by matching on the retrieved value.
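As a minimal illustration of the string round-trip (the value and empty flag list here are our own choices for the sketch):

```ocaml
(* Marshal a (string * int) list to a string and back.
   Marshal.from_string is not typesafe: the type annotation on
   [pairs'] is what fixes the result type, so a wrong annotation
   would not be caught at compile time. *)
let () =
  let pairs = [("ocaml", 1); ("fun", 2)] in
  let s = Marshal.to_string pairs [] in
  let pairs' : (string * int) list = Marshal.from_string s 0 in
  assert (pairs = pairs')
```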
All code you submit must adhere to the specifications defined in the respective mli files.
As in the previous problem set, you must submit a design document for parts 1 and 2. Additionally, you must implement functions in the following files:
You are required to implement a hash table according to the specifications in shared/hashtable.mli. This data structure will be useful in your MapReduce implementation.
Note: Hashtbl.hash is an acceptable hash function to use. However, you may not use any other part of the Hashtbl module.
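For example, Hashtbl.hash can be used to pick a bucket index like this (the bucket count of 16 is an arbitrary choice for this sketch, not a requirement of the assignment):

```ocaml
(* Hashtbl.hash maps any value to a non-negative int; reducing it
   mod the number of buckets selects a bucket index. *)
let num_buckets = 16

let bucket_of_key (key : 'a) : int =
  Hashtbl.hash key mod num_buckets
```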
The code in this module is responsible for performing the actual map and reduce operations by initializing the mappers/reducers, sending them input that hasn't been mapped/reduced yet, and then returning the final list of results.
The map and reduce functions must be implemented according to the specifications stored in master/master.mli, with functionality that mirrors the example execution described above. Both functions must be able to handle both silent and explicit worker failure, as well as individual worker slowness. However, provided there is no coding error in your worker code, you don't need to handle the case where all workers fail (i.e. complete network failure). Also, there should be only one active request per worker at a time, and if a worker fails, it shouldn't be returned to the Worker_manager using the push function.
Both functions share the same basic structure. First, the workers are initialized through a call to the appropriate Worker_manager function. There is a single shared instance of the Worker_manager module, and all relevant functions are thread-safe. Once the workers have been initialized, the input list should be iterated over, sending each available element to a free worker, which can be obtained using the appropriate pop function of Worker_manager. A mapper is invoked using Worker_manager.map, providing the mapper's id and the (key, value) pair. Each mapper (and therefore each invocation of Worker_manager.map) receives an individual (key, value) pair and outputs a new (key, value) list, which is simply added onto the list of all previous map results. A reducer is invoked similarly using Worker_manager.reduce. These functions block until a response is received, so it is important that they are invoked from a thread in the included Thread_pool module. Additionally, this requires that these spawned threads store their results in a shared data structure, which must be accessed in a thread-safe manner.
Once all of the input has been iterated over, it is important that input which remains unfinished, either due to a slow or failed worker, is re-submitted to available workers until all input has been processed. Once all input has been fully processed, the results are returned.
Note: It is acceptable to use Thread.delay with a short sleep period (0.1 seconds or less) when looping over unfinished data to send to available workers, in order to avoid flooding workers with unnecessary work.
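One way to sketch the thread-safe shared results structure described above, using the standard Mutex module (the names results, add_result, and get_results are our own; in the real code, add_result would be called from Thread_pool threads once their blocking Worker_manager call returns):

```ocaml
(* Shared results list, protected by a mutex. Worker threads call
   add_result; the main loop reads a consistent snapshot with
   get_results to decide which inputs still need resubmitting. *)
let results : (string * string list) list ref = ref []
let results_mutex = Mutex.create ()

let add_result (r : string * string list) : unit =
  Mutex.lock results_mutex;
  results := r :: !results;
  Mutex.unlock results_mutex

let get_results () : (string * string list) list =
  Mutex.lock results_mutex;
  let snapshot = !results in
  Mutex.unlock results_mutex;
  snapshot
```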
The code in this module is responsible for handling communication between the clients that request work, and the mapper/reducers that perform the work.
The send_response function must be implemented according to the mli specifications and the above description. Furthermore, it must be thread-safe. Note that this function should be quite short.
The handle_request function must also be implemented according to the mli specifications and the above description, and must be thread-safe. This function receives a connected client socket as input, and must retrieve the worker_request from that socket. If the request is for a mapper or reducer initialization, then the code must be built using Program.build, which returns (Some(id), "") where id is the worker id if the compilation was successful, or (None, error) if the compilation wasn't successful. If the build succeeds, the worker id should be stored in the appropriate mapper or reducer set (depending on the initialization request), and the id should be sent back to the client using send_response. If the build fails, then the error message should be sent back to the client using send_response.
If the request is to perform a map or reduce, then the worker id must be verified to be of the correct type, by looking it up in either the mapper or reducer set, before the mapper or reducer is invoked using Program.run. You will pass send_response as one of the arguments to this function; it will automatically be invoked, when the worker terminates, with either the runtime error(s) that the mapper/reducer output or the result of the map/reduce as the response. Note that the mapper and reducer sets are shared between all request-handler threads spawned by the Worker_server, and therefore access to them must be thread-safe.
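The dispatch at the top of handle_request can be pictured as a match on the request type. The constructors below mirror the protocol messages described in this writeup, but the stand-in type definition and the helper request_kind are our own sketch; the real type lives in shared/protocol.ml:

```ocaml
(* Stand-in request type mirroring the protocol messages
   described above (the real definition is in shared/protocol.ml). *)
type worker_request =
  | InitMapper of string list * string list * string
  | InitReducer of string list * string list
  | MapRequest of int * string * string
  | ReduceRequest of int * string * string list

(* Classify a request before acting on it, as handle_request
   must do: build for init requests, verify-and-run otherwise. *)
let request_kind (req : worker_request) : string =
  match req with
  | InitMapper _ -> "init-mapper"
  | InitReducer _ -> "init-reducer"
  | MapRequest _ -> "map"
  | ReduceRequest _ -> "reduce"
```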
Note: the includes sent to Program.build should be those stored in default_includes in worker/worker.ml, as the master isn't sending along an includes list. Code in shared/util.ml is accessible to all workers by default.
Note: An in_channel can be extracted from a Unix.file_descr using Unix.in_channel_of_descr. A similar function exists for out_channel. Remember to close all sockets and channels when finished.
Note: Linux/Mac users should add ./
before all commands given.
foo
using datafile bar
port_number
for requests
If you would like to define additional modules, please add them to the appropriate build_*.bat file.
Your mapper and reducer code should go in apps/name/mapper.ml and apps/name/reducer.ml (where name is the name of the algorithm). We have implemented the mapper and reducer for one of the applications, but you will have to implement the rest yourself. For each application you must also implement the code that invokes map and reduce and prints the results as specified. To do this, you will implement three functions: word_count, inverted_index, and kmeans, all located in master/controller.ml.
Given a set of documents as input, count the total number of times that each word occurs. For example, if your documents were:
ocaml is fun
because fun is a keyword
word | count |
---|---|
ocaml | 1 |
is | 2 |
fun | 2 |
because | 1 |
a | 1 |
keyword | 1 |
We have provided you with the mapper and reducer for word count. Your task is to implement the function word_count in master/controller.ml. This function takes the filename of a document dataset (described below) as input and prints out the number of occurrences of each word. On separate lines, print each distinct word (in arbitrary order) followed by the number of times it occurred.
Here is a basic outline of the code you must implement for word_count: First, load the provided dataset using the function load_documents. Convert each document into a key-value pair (id, contents), where id is the document ID and contents is the text of the document. These key-value pairs should then be passed as input to the map function. The map function (when invoked with the mapper we have provided) will return a list of key-value pairs of the form (word, "1"), where the keys are not necessarily distinct. Print this list of pairs using Controller.print_map_results. From these key-value pairs, derive the key-value pairs that you will pass to reduce. Finally, reduce will return a list of key-value pairs such that the distinct words are the keys and the values are a string representation of the number of occurrences. Print this list of (key, value list) pairs using Controller.print_reduce_results.
An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:
Document 1:
ocaml map reduce

Document 2:
fold filter ocaml

The inverted index would look like this:
word | document |
---|---|
ocaml | 1 2 |
map | 1 |
reduce | 1 |
fold | 2 |
filter | 2 |
Your task is to implement the function inverted_index in master/controller.ml. inverted_index should take a dataset as input and use MapReduce to print an inverted index. As in the word count application, you must print your map results using Controller.print_map_results and your reduce results using Controller.print_reduce_results.
Be sure to describe your map and reduce steps in your overview document. Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml. Use the function Util.split_words to tokenize a string into words.
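One possible shape of the map step, as a sketch rather than the required design: the mapper's input key is assumed here to be the document id and its value the document contents, and the whitespace splitter below is only a naive stand-in for Util.split_words:

```ocaml
(* Naive stand-in for Util.split_words, for this sketch only. *)
let split_words (s : string) : string list =
  String.split_on_char ' ' s

(* Hypothetical mapper body: emit one (word, document_id) pair
   per word in the document. The reduce step would then collect
   the document ids for each word. *)
let map_document (id : string) (contents : string) : (string * string) list =
  List.map (fun word -> (word, id)) (split_words contents)
```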
The k-means algorithm partitions a set of vectors into k clusters. k-means works by finding a set of k centroids (i.e. means), one per cluster. These centroids are usually initialized arbitrarily. For each vector in the dataset, we find the centroid it is closest to; the vector is said to belong to that centroid's cluster. Once we have matched each vector with a cluster, we recompute the centroids: for each centroid, take the set of vectors that belong to it; the new position of the centroid is the mean of all of those vectors. The process of updating the means is called an iteration. We repeat this process until the means converge, or after a fixed number of iterations. Usually k-means converges rapidly to a local optimum. For an illustration of this algorithm, see Wikipedia.
Your task is to implement the function kmeans in master/controller.ml. kmeans takes the filename of a dataset, a parameter k, and a maximum number of iterations. The vectors you will cluster are the word count vectors from each document in the dataset; at the end of this process you will therefore have clusters of similar documents. So for each document, you will need to compute a vector mapping each word to the number of times it appears in the document. These vectors will need to be sparse vectors, because it is infeasible to store a count for every possible word. For consistency, initialize the k centroids to be the word vectors of the first k documents. The distance between two vectors is simply the Euclidean distance between them. You should use MapReduce to perform the k-means algorithm for the specified number of iterations. Describe your map and reduce steps in your overview document.
After performing k-means for the specified number of iterations, you should print the contents of each cluster. For each cluster, print the ID and title of each document that belongs to it, in the following format:
Cluster 1
id1 doc1
id2 doc2
...
Cluster 2
id1 doc1
id2 doc2
...
You are not required to use MapReduce to compute the word count vectors. Nor are you required to use MapReduce to find the documents that compose each cluster at the end of the algorithm. You also do not need to check the clusters for convergence. The reduce phase of your algorithm should support running at least workers at once. Particularly efficient implementations that can support running more than workers at once in the reduce phase will receive karma.
It may be useful to define a Word_vector submodule in shared/util.ml, so you can reuse this code in your mapper, reducer, and controller. It may also be helpful to implement the algorithm without MapReduce first, and then rewrite it using MapReduce.
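A hedged sketch of what such a sparse-vector helper might look like. The representation (association lists) and the function names are our own choices for illustration, not part of the assignment's interface:

```ocaml
(* Sparse word-count vectors as (word, count) association lists;
   words absent from the list have count 0. *)
module Word_vector = struct
  type t = (string * float) list

  let get (v : t) (w : string) : float =
    try List.assoc w v with Not_found -> 0.

  (* Euclidean distance: sqrt of the sum of squared differences
     over every word present in either vector. *)
  let distance (v1 : t) (v2 : t) : float =
    let words =
      List.fold_left
        (fun ws (w, _) -> if List.mem w ws then ws else w :: ws)
        (List.map fst v1) v2
    in
    sqrt
      (List.fold_left
         (fun acc w ->
           let d = get v1 w -. get v2 w in
           acc +. (d *. d))
         0. words)
end
```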
Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that the simplified MapReduce framework is not very well optimized.
Simply zip up and submit your entire ps5 folder.
Consider the following three functions, a simple implementation of merge sort. The specification for the function mergesort itself is provided; specifications for the helper functions merge and split are not.
let rec merge l1 l2 =
  match (l1, l2) with
  | [], _ -> l2
  | _, [] -> l1
  | x :: xs, y :: ys ->
      if x > y then x :: (merge xs l2) else y :: (merge l1 ys)

let rec split l =
  match l with
  | [] -> ([], [])
  | [x] -> ([x], [])
  | x1 :: x2 :: xs ->
      let (l1, l2) = split xs in
      (x1 :: l1, x2 :: l2)

(* Requires: true
 * Returns: l', such that elements(l') = elements(l),
 * and sorted(l')
 *)
let rec mergesort l =
  match l with
  | [] -> []
  | [x] -> [x]
  | _ ->
      let (l1, l2) = split l in
      let s1 = mergesort l1 in
      let s2 = mergesort l2 in
      merge s1 s2
We will be holding 20-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A signup schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 10 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.
Your design review should focus on your approach to Part 1, however time permitting you may spend a few minutes at the end on Part 2.
In your design review, plan to talk about at least:
To do: turn in the solution in the file complexity.txt.