Note: Don't change the mli files!
In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short paper on MapReduce to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1–3.1 of that paper.
The map and reduce functions have the following OCaml types:
val map : 'a -> 'b -> ('c * 'd) list
val reduce : 'c -> 'd list -> 'e list
However, in the messaging protocol defined in shared/protocol.ml we only allow for the transmission of strings. Therefore, you must utilize OCaml's built-in marshalling and unmarshalling to transmit values of different types. We will explain this more thoroughly below.
For a given MapReduce application foo, your mapper code will be stored in apps/foo/mapper.ml, while your reducer code will be stored in apps/foo/reducer.ml. Both the mapper and reducer will receive their input from Program.get_input() and output their results using Program.set_output results. The specific mechanisms for these operations are described below.
Execution is divided into five phases:

1. The controller is invoked with the name of a MapReduce application and the name of a file containing the input data. It reads the data from this file and parses it into a list of (key, value) pairs to provide as input to the mappers. It then connects to worker servers and initializes mappers on these servers.
2. The controller sends each (key, value) pair to an available mapper. The mapper calculates the mapping function on this (key, value) pair, resulting in a list of (key, value) pairs (the types of the input and output do not have to be the same). The resulting list of (key, value) pairs is sent back to the controller application. The controller continues to send inputs to the next available mapper until all pairs have been mapped.
3. For each key produced by the mappers, all of its corresponding values are collected into a single list, resulting in a (key, value list) pair for that key.
4. The controller connects to worker servers and initializes reducers on these servers. The (key, value list) pairs produced in the Combine phase are then sent to available reducers. The reducers perform the reduce operation and send the results back to the controller. This continues until all input has been reduced.
5. The results of the Reduce phase are collected and output in some manner.
Consider the canonical MapReduce example of counting the number of occurrences of each word in a set of documents. The input is a set of (document id, contents) pairs. In the Map phase, each word that occurs in the contents of a file is mapped to the pair (word, "1"), indicating that the word has been seen once. In the Combine phase, all pairs with the same first component are collected to form the pair (word, ["1"; "1"; ...; "1"]), one such pair for each word. Then in the Reduce phase, for each word, the list of ones is summed to determine the number of occurrences of that word.
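The word count logic above can be sketched as a pair of pure functions. This is a simplified sketch, independent of the framework's Program and Util modules (the actual implementation is included in apps/word_count/); the names map_doc and reduce_word are illustrative, and whitespace splitting stands in for Util.split_words:

```ocaml
(* Map phase: each word in a document's contents becomes a (word, "1") pair. *)
let map_doc ((_id, contents) : string * string) : (string * string) list =
  String.split_on_char ' ' contents
  |> List.filter (fun w -> w <> "")
  |> List.map (fun w -> (w, "1"))

(* Reduce phase: the list of "1"s collected for a word is summed by length. *)
let reduce_word (_word : string) (ones : string list) : string list =
  [string_of_int (List.length ones)]
```

The Combine phase, which groups the (word, "1") pairs by word, sits between these two steps and is performed by the controller rather than the workers.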
For simplicity, our framework accepts only a single data file containing all the input data. However, we simulate multiple files by storing several (id, document name, contents) triples in the input file. See data/reuters/reuters000.txt for an example of the formatting. Files that match this format can be parsed by Controller.load_documents. This dataset is a collection of Reuters articles from the 1980s.
Note: We have included the full implementation for this example.
The word count application is invoked as controller.exe word_count filename. Controller.word_count filename reads in the contents of filename using Controller.load_documents, and splits the resulting collection of documents into (document id, contents) pairs. Then, the controller calls Map_reduce.map kv_pairs "" "apps/word_count/mapper.ml".
The second argument represents shared data that is accessible to all mappers in addition to their input (key, value) pair. For this application, there is no shared data, so this argument is the empty string. In general, the shared data can be accessed by a mapper using Program.get_shared_data().
Map_reduce.map initializes a mapper worker manager using Worker_manager.initialize_mappers "apps/word_count/mapper.ml" "". The Worker_manager loads the list of available workers from the file named addresses. Each line of this file contains a worker address of the form ip_address:port_number. Each of these workers is sent the mapper code, and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify each mapper. If certain workers are unavailable, this function will report this fact, but will continue to run successfully.
Map_reduce.map then sends individual unmapped (id, contents) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_worker(). Mappers should be released once their results have been received using Worker_manager.push_worker. Read controller/worker_manager.mli for more complete documentation. Once all (id, contents) pairs have been mapped, the new list of (word, "1") pairs is returned.
Controller.word_count receives the results of the mapping, and Controller.print_map_results is called to display them. The map results are then combined using Map_reduce.combine, and the combined results are displayed using Controller.print_combine_results.
Map_reduce.reduce is then called with the results of Map_reduce.combine, along with the name of the reducer file. Map_reduce.reduce initializes the reducer worker manager by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers. Map_reduce.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This is performed in essentially the same manner as the map phase. When all pairs have been reduced, the new list of (word, count) tuples is returned. In this application, because the key value doesn't change, Worker_manager.reduce and the reduce workers only calculate and return the new value (in this case, count), but don't return the key (in this case, word). Finally, Controller.word_count displays the results using Controller.print_reduce_results.
Worker_server receives a connection and spawns a thread, which calls Worker.handle_request to handle it. Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using Program.build, which returns either the new worker id or the compilation error. See worker_server/program.ml for more complete documentation. If it is a map or reduce request, the worker id is verified as referencing a valid mapper or reducer, respectively. If the id is valid, then Program.run is called with that id and the provided input. This runs the relevant worker, which receives its input by calling Program.get_input(). Once the worker terminates, having set its output using Program.set_output, these results are returned by Program.run. If the request is invalid, then the appropriate error message, as defined in shared/protocol.ml, is prepared. Worker.send_response is called, which is responsible for sending the result back to the client. If the response is sent successfully, then Worker.handle_request simply recurses; otherwise it returns a unit.
This is the main controller module. First, it parses the command line arguments, then invokes the appropriate MapReduce application by calling Controller.app_name. Each application function in this module is responsible for loading the input data and mapping it using Map_reduce.map. The results should then be output using Controller.print_map_results. Next, the list of (key, value) pairs is combined using Map_reduce.combine. The combined values are then output using Controller.print_combine_results, and then reduced using Map_reduce.reduce. These results are output using Controller.print_reduce_results.
This module is responsible for performing the actual map, combine, and reduce operations. A more thorough description is given below.
This module provides the capability to initialize mappers and reducers, select inactive workers using the pop function, assign a map/reduce task to a worker using either the map or reduce function, and return workers that have completed their task to the pool of inactive workers using the push function. If a request fails due to some sort of network or file descriptor error, then the Worker_manager will automatically retry a fixed number of times. It is therefore acceptable for workers to intermittently output errors as long as they are still able to service requests.
This module provides the ability to build and run mappers/reducers.
This module is responsible for spawning threads to handle client connection requests. These threads simply invoke Worker.handle_request.
This module is responsible for managing communication between the clients and the mappers/reducers that they build and then run. A more thorough description is contained below.
The protocol that the controller application and the workers use to communicate is stored in shared/protocol.ml.
InitMapper(code, shared_data)
Initialize a mapper. Each element in the code list is a line in mapper.ml. shared_data is accessible to all mappers, as described above.
InitReducer(code)
Initialize a reducer. Same as above, except with no shared data.
MapRequest(worker_id, key, value)
Execute the mapper with id worker_id, with (key, value) as input.
ReduceRequest(worker_id, key, value list)
Execute the reducer with id worker_id, with (key, value list) as input.
Mapper(worker_id option, error)
Mapper(Some id, _) indicates the requested mapper was successfully built, and has the returned id. Mapper(None, error) indicates compilation of the mapper failed with the returned error.
Reducer(worker_id option, error)
Same as above, except for a reducer.
InvalidWorker(worker_id)
Either a map request was made and the worker_id doesn't correspond to a mapper, or a reduce request was made and the worker_id doesn't correspond to a reducer.
RuntimeError(worker_id, error)
The worker with id worker_id output the runtime error(s) stored in error. Also used to indicate that the worker didn't return any output, meaning Program.run returned None.
MapResults(worker_id, (key, value) list)
The map request sent to worker_id resulted in the output (key, value) list.
ReduceResults(worker_id, value list)
The reduce request sent to worker_id resulted in the output value list.
Marshalling and unmarshalling of most values (with the exception of functions) is built into OCaml. Values can be converted to and from strings explicitly using Marshal.to_string and Marshal.from_string. If the values you are sending aren't strings, then you will need to use these functions to convert them to and from strings in order to use the relevant Map_reduce functions. Note that your worker must also convert the input it receives from strings back to the type it can operate on. Once it has finished, it needs to convert its output back into strings.
Make sure that the type of input that your mapper is expecting matches the type of input sent in the Mapper protocol message, and that the type of output matches the type sent in the MapResults protocol message. The same is true for reducers using the reducer messages.
These functions are not type safe, but you can still retrieve the type information by matching on the retrieved value. Note that due to the lack of type safety, if you unmarshal a string and treat it as a value of any type other than the type it was marshalled as, your program will most likely crash. You should therefore take particular care that the types match when marshalling/unmarshalling.
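For example, a (string * int) list can be round-tripped through a string. Note the type annotation on the receiving side, which must match the marshalled type exactly; a mismatch here is precisely the kind of crash the warning above describes:

```ocaml
(* Marshal a value to a string and recover it. Marshal.from_string takes
   the string and a start offset; the result type is whatever the caller
   claims it is, so the annotation must be correct. *)
let pairs : (string * int) list = [("ocaml", 1); ("map", 2)]
let encoded : string = Marshal.to_string pairs []
let decoded : (string * int) list = Marshal.from_string encoded 0
```

The empty list passed to Marshal.to_string is the list of marshalling flags; no flags are needed for ordinary (non-function, non-cyclic) values.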
All code you submit must adhere to the specifications defined in the respective mli files.
You must implement functions in the following files:
You are required to implement a hash table according to the specifications in shared/hashtable.mli. This data structure will be useful in your MapReduce implementation. YOU ARE NOT ALLOWED TO USE THE OCAML HASHTABLE LIBRARY FUNCTIONS IN YOUR IMPLEMENTATION. The only exception is you can use Hashtbl.hash as your hashing function.
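One possible shape for such a structure is a chained hash table built on the permitted Hashtbl.hash. This is a sketch only: the interface you must actually implement is the one specified in shared/hashtable.mli, and the names below are illustrative:

```ocaml
(* A chained hash table: an array of association-list buckets.
   Hashtbl.hash is the only Hashtbl function permitted by the assignment. *)
type ('k, 'v) t = { buckets : ('k * 'v) list array }

let create (n : int) : ('k, 'v) t = { buckets = Array.make (max 1 n) [] }

let index (tbl : ('k, 'v) t) (k : 'k) : int =
  Hashtbl.hash k mod Array.length tbl.buckets

(* Bind k to v, replacing any previous binding for k. *)
let add (tbl : ('k, 'v) t) (k : 'k) (v : 'v) : unit =
  let i = index tbl k in
  tbl.buckets.(i) <- (k, v) :: List.remove_assoc k tbl.buckets.(i)

let find (tbl : ('k, 'v) t) (k : 'k) : 'v option =
  List.assoc_opt k tbl.buckets.(index tbl k)
```

A production version would also track the number of bindings and resize the bucket array when the load factor grows too large.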
The code in this module is responsible for performing the actual map, combine, and reduce operations by initializing the mappers/reducers, sending them input that hasn't been mapped/reduced yet, and then returning the final list of results.
The map and reduce functions must be implemented according to the specifications in controller/map_reduce.mli. The functionality should mirror the above example execution description. Both functions must make use of multiple workers simultaneously, if available, and be able to handle both silent and explicit worker failure, as well as individual worker slowness. However, you don't need to handle the case where all workers fail despite there being no coding error in your worker code (i.e. complete network failure). Also, there should only be one active request per worker at a time, and if a worker fails, it should not be returned to the Worker_manager using the push_worker function.
Both map and reduce share the same basic structure. First, the workers are initialized through a call to the appropriate Worker_manager function. Once the workers have been initialized, the input list should be iterated over, sending each available element to a free worker, which can be accessed using the pop_worker function of Worker_manager. A mapper is invoked using Worker_manager.map, providing the mapper's id and the (key, value) pair. Each mapper (and therefore each invocation of Worker_manager.map) receives an individual (key, value) pair, and outputs a new (key, value) list, which is simply added onto the list of all previous map results. A reducer is invoked similarly using Worker_manager.reduce. These functions block until a response is received, so it is important that they are invoked using a thread from the included Thread_pool module. Additionally, this requires that these spawned threads store their results in a shared data structure, which must be accessed in a thread-safe manner.
Once all of the input has been iterated over, it is important that any input that remains unfinished, either due to a slow or failed worker, is re-submitted to available workers until all input has been processed. Once all input has been fully processed, the results are returned.
Note: It is acceptable to use Thread.delay with a short sleep period (0.1 seconds or less) when looping over unfinished data to send to available workers, in order to avoid flooding the workers with unnecessary work.
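The shared results structure mentioned above can be protected with a mutex. A minimal sketch follows; the names are illustrative, and the real code would live inside your Map_reduce implementation:

```ocaml
(* Shared accumulator for results produced by Thread_pool threads.
   Every access goes through the mutex, so concurrent record calls
   cannot interleave their read-modify-write of the list. *)
let results : (string * string) list list ref = ref []
let results_mutex = Mutex.create ()

let record (r : (string * string) list) : unit =
  Mutex.lock results_mutex;
  results := r :: !results;
  Mutex.unlock results_mutex
```

The same lock-update-unlock pattern applies to any other shared state, such as the set of inputs still awaiting results.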
The combine function must be implemented according to the specifications, but doesn't need to make use of any workers. It should simply combine the provided (key, value) pair list into a list of (key, value list) pairs, such that each key in the provided list occurs exactly once in the returned list, and the value list for a given key in the returned list contains all the values that key was mapped to in the provided list.
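A minimal sketch of such a combine over association lists, assuming keys support structural equality (the real implementation should match controller/map_reduce.mli, and may want your hash table for efficiency on large inputs):

```ocaml
(* Group a (key, value) list into a (key, value list) list.
   Each key appears once; its list holds all values in input order. *)
let combine (kvs : ('a * 'b) list) : ('a * 'b list) list =
  List.fold_left
    (fun acc (k, v) ->
       match List.assoc_opt k acc with
       | Some vs -> (k, vs @ [v]) :: List.remove_assoc k acc
       | None -> (k, [v]) :: acc)
    [] kvs
```

This is quadratic in the number of distinct keys; a hash-table-backed version reduces the grouping to roughly linear time.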
The code in this module is responsible for handling communication between the clients that request work and the mapper/reducers that perform the work.
The handle_request function must be implemented according to the mli specifications and the above description, and must be thread-safe. This function receives a client connection as input and must retrieve the worker_request from that connection. If the request is for a mapper or reducer initialization, then the code must be built using Program.build, which returns (Some id, "") where id is the worker id if the compilation was successful, or (None, error) if the compilation was not successful. If the build succeeds, the worker id should be stored in the appropriate mapper or reducer set (depending on the initialization request), and the id should be sent back to the client using send_response. If the build fails, then the error message should be sent back to the client using send_response.
If the request is to perform a map or reduce, then the worker id must be verified to be of the correct type by looking it up in either the mapper or reducer set before the mapper or reducer is invoked using Program.run. The return value is Some v, where v is the output of the program, or None if the program failed to provide any output or generated an error. Note that the mapper and reducer sets are shared between all request handler threads spawned by the Worker_server, and therefore access to them must be thread-safe.
Note: Code in shared/util.ml is accessible to all workers by default.
Note: Linux/Mac users should add ./ before all commands given.

controller.exe foo bar runs the MapReduce application foo with datafile bar. The worker server is started with a port_number argument, and listens on that port for requests.
If you would like to define additional modules, please add them to the appropriate build_*.bat file.
You will use the simplified MapReduce that you created in part 1 to implement some algorithms.
Each application will use a mapper and reducer located in apps/name/mapper.ml and apps/name/reducer.ml (where name is the name of the application). We have implemented the mapper and reducer for word count, but you will have to implement the others yourself.
For each application, you must also implement the code that invokes map and reduce and prints the results as specified. To do this, you will implement three functions: Controller.inverted_index, Controller.page_rank, and Controller.kmeans.
An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:

Document 1:
OCaml map reduce

Document 2:
fold filter ocaml

The inverted index would look like this:
| word | document |
| --- | --- |
| ocaml | 1 2 |
| map | 1 |
| reduce | 1 |
| fold | 2 |
| filter | 2 |
Your task is to implement the function inverted_index in controller/controller.ml. The function inverted_index should take a dataset as input and use MapReduce to print an inverted index. As in the word count application, you must print your map results using Controller.print_map_results, your combine results using Controller.print_combine_results, and your reduce results using Controller.print_reduce_results.
Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml, respectively. Use the function Util.split_words to tokenize a string into words.
PageRank is a link analysis algorithm used by Google to weight the importance of different websites on the Internet. Formally, PageRank can be described as the principal eigenvector of a transition matrix; however, you will be implementing a very simplified, iterative version of PageRank. To begin running simplified PageRank, we initialize each website with a PageRank of 1/n, where n is the total number of websites. Each step of the iteration has two parts: first, for each website, we divide its PageRank by the number of links out of it, and send this amount to each website it links to. Then, for each website, we sum up all the values being sent to it, thus obtaining a new PageRank for that website.
For more details on simplified PageRank, see Section 14.3 in Networks, Crowds, and Markets: Reasoning About a Highly Connected World by Cornell Professors David Easley and Jon Kleinberg (http://www.cs.cornell.edu/home/kleinber/networks-book/networks-book-ch14.pdf).
Your job is to implement the function page_rank in controller/controller.ml. The function page_rank takes the filename of a dataset, along with a number of iterations.
In order to simulate websites, we have defined a new type, website. Each line is of the format pageid@pagetitle@links. links is of the form out1,out2,...,outm, where each out is the pageid of a link out of the page. We have provided you with a function called load_websites, which reads in a file in this format. pageid is interpreted as an int, pagetitle as a string, and links as an int list. An example of such a file is data/websites.txt. Note that this is the example on pages 407 and 408 of the Networks book draft linked above.
After performing PageRank for the specified number of iterations, you should print the final PageRank values. We have provided you with a function print_page_ranks, which takes in a list of tuples of pageids and PageRanks, and prints them to the screen.
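One iteration of the simplified update can be sketched as follows. This is a plain, non-MapReduce sketch over association lists; it assumes every page has at least one out-link, and all names are illustrative:

```ocaml
(* pages : (pageid, out-links) pairs; ranks : current PageRank per pageid.
   Returns the PageRanks after one iteration of the simplified update. *)
let pagerank_step (pages : (int * int list) list)
                  (ranks : (int * float) list) : (int * float) list =
  (* Part 1: each page sends rank / out-degree along each out-link. *)
  let contribs =
    List.concat_map
      (fun (pid, links) ->
         let share = List.assoc pid ranks /. float_of_int (List.length links) in
         List.map (fun dest -> (dest, share)) links)
      pages
  in
  (* Part 2: each page's new rank is the sum of its incoming shares. *)
  List.map
    (fun (pid, _) ->
       (pid,
        List.fold_left (fun s (d, c) -> if d = pid then s +. c else s)
          0.0 contribs))
    pages
```

In the MapReduce version, Part 1 is naturally a map over pages and Part 2 a reduce over the combined contributions per destination page.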
K-means is an iterative algorithm used in machine learning to partition a set of vectors into k clusters. Each cluster is described by a centroid vector. A vector belongs to a cluster if the centroid of that cluster is closer to it than to any other centroid (as defined by Euclidean distance). At a high level, the algorithm begins with some initial set of k centroids and updates the centroids at each iteration, eventually producing the clustering given by the final centroids.
To begin running k-means, we first need to select our initial k centroids. This is usually done heuristically, but in this project we will have you simply select the first k vectors to be the initial centroids. Once we have the initial centroids, we can begin our iterative algorithm. Each iteration consists of two steps:

1. Assign each vector to the cluster whose centroid is closest to it.
2. Recompute each centroid as the mean of the vectors assigned to its cluster.
Note: The centroids do NOT need to be vectors from the set. We just use vectors in the set as a convenient starting point.
The 2-step iterative nature of this algorithm makes it a perfect fit for MapReduce. We perform these updates either for a fixed number of iterations, or until some convergence criterion is reached (for example, no vectors change clusters). In this project, you will just run a specified number of iterations, so you do not need to worry about convergence. At the end, the clusterings from the final iteration are output.
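The assignment step can be sketched with a Euclidean distance over dense float arrays. This is illustrative only: the project's vectors are sparse word-count vectors, so your representation (and distance function) will differ, and the names below are not part of the provided code:

```ocaml
(* Euclidean distance between two vectors of equal dimension. *)
let dist (a : float array) (b : float array) : float =
  sqrt (Array.fold_left ( +. ) 0.0
          (Array.mapi (fun i x -> (x -. b.(i)) ** 2.0) a))

(* Index of the centroid closest to v (step 1 of each iteration). *)
let nearest (centroids : float array list) (v : float array) : int =
  let _, best, _ =
    List.fold_left
      (fun (i, bi, bd) c ->
         let d = dist v c in
         if d < bd then (i + 1, i, d) else (i + 1, bi, bd))
      (0, 0, infinity) centroids
  in
  best
```

Step 2 then averages the vectors assigned to each centroid to produce the next iteration's centroids.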
Your task is to implement the function kmeans in controller/controller.ml. The function kmeans takes the filename of a dataset, a parameter k, and a maximum number of iterations. The vectors you will cluster will be the word count vectors from each document in the dataset. Therefore, at the end of this process you will have clusters of similar documents. So for each document, you will need to compute a vector mapping each word to the number of times the word appears in the document. These vectors will need to be sparse vectors, because it is infeasible to store a count for every possible word. As described above, initialize the centroids to be the word vectors from the first k documents. The distance between two vectors is simply the Euclidean distance between them. You should use MapReduce to perform the k-means algorithm for the specified number of iterations.
After performing k-means for the specified number of iterations, you should print the contents of the final clusters. You should not use the same print functions that word_count and inverted_index use. For each cluster, print the id and title of each document that belongs to it. Your output should be in the following format:

Cluster 1
id1 doc1
id2 doc2
...
Cluster 2
id1 doc1
id2 doc2
...
You are not required to use MapReduce to compute the word count vectors, nor are you required to use MapReduce to find the documents that compose each cluster at the end of the algorithm. You also do not need to check the clusters for convergence. The reduce phase of your algorithm should support running at least k workers at once. Particularly efficient implementations that can support running more than k workers at once in the reduce phase will receive karma.
It may be useful to define a Word_vector submodule in shared/util.ml so you can reuse this code in your mapper, reducer, and controller. It may be helpful to implement the algorithm without MapReduce first, then rewrite it using MapReduce.
Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that the simplified MapReduce framework is not very well optimized.
Simply zip up and submit your entire ps5 folder.
Replace ??? in the following code to make the expression evaluate to 42.
type node = NULL | PTR of node ref
let rec f (h:node) (c:node) : int =
match c with
NULL -> failwith "No cycle."
| PTR p -> if !p == h then 1
else 1 + (f h (!p))
let zardoz:node = ???
in
(f zardoz zardoz) * 21
Put solution in zardoz.txt and upload to CMS.
You are required to use a source control system like CVS or SVN. Submit the log file that describes your activity. We will provide you with an SVN repository, hosted by CSUG.
For information on how to get started with SVN there, read Using Subversion in the CSUGLab.
Note: Your repository name is cs3110_<netid(s)>. For example, cs3110_njl36, or if you have a partner: cs3110_dck10_njl36. Notice that the netids of groups are in alphabetical order. This repository name is what you put in place of project1 in the directions in the provided link.
If you use Windows, and are unfamiliar with the command line or Cygwin, there is a GUI based SVN client called Tortoise SVN that provides convenient context menu access to your repo.
For Debian/Ubuntu, a GUI-based solution to explore would be Rapid SVN. To install:
apt-get install rapidsvn
Mac users looking into a front-end can try SC Plugin, which claims to provide similar functionality to Tortoise. Rapid SVN might work as well.
There is also a plug-in for Eclipse called Subclipse that integrates everything for you nicely.
Note that all of these options are simply graphically based alternatives to the extremely powerful terminal commands, and are by no means necessary to successfully utilize version control.
We will be holding 15-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A sign-up schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 5 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.
Your design review should focus on your approach to Part 1; however, time permitting, you may spend a few minutes at the end on Part 2.
In your design review, plan to talk about at least: