In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, you should read the short paper on MapReduce to familiarize yourself with the basic MapReduce architecture. This writeup will assume that you have read sections 1–3.1 of that paper.
The map and reduce functions have the following OCaml types:
val map : 'a -> 'b -> ('c * 'd) list  
val reduce : 'c -> 'd list -> 'e list
These functions will be computed in a distributed fashion by several agents running concurrently. The agents communicate with each other using a messaging protocol defined in shared/protocol.ml. However, that protocol only allows for the transmission of strings, so you must use OCaml's built-in marshalling and unmarshalling facilities to transmit values of different types. We will explain this more thoroughly below.
For a given MapReduce application foo, the application-specific code
is stored in the directory apps/foo/.  The code for the mappers is in
apps/foo/mapper.ml and the code for the reducers is in
apps/foo/reducer.ml.  There is also a controller in the file
apps/foo/foo.ml to handle the initialization of and communication with
the mappers and reducers, reading inputs, and printing results.
The mappers and reducers receive
their input by calling Program.get_input and communicate their
results by calling Program.set_output. The specific mechanisms
for these operations are described below.
Execution is divided into five phases:

1. Initialization: controller.exe is invoked with the name of a MapReduce application on the command line, along with other application-specific information (e.g., the input file name and any other parameters required by the application). This information is passed to the main method of the controller in the application directory. This method reads the data from the input file and parses it into a list of (key, value) pairs to provide as input to the mappers. It then connects to worker servers and initializes mappers on these servers.

2. Map: The controller sends each (key, value) pair to an available mapper. The mapper calculates the mapping function on this (key, value) pair, resulting in a list of (key, value) pairs (the types of the input and output do not have to be the same). The resulting list of (key, value) pairs is sent back to the controller. The controller continues to send inputs to the next available mapper until all pairs have been mapped.

3. Combine: For each key produced by the mappers, all of its corresponding values are collected into a single list, resulting in a (key, value list) pair for that key.

4. Reduce: The controller connects to worker servers and initializes reducers on these servers. The (key, value list) pairs produced in the Combine phase are then sent to available reducers. The reducers perform the reduce operation and send the results back to the controller. This continues until all input has been reduced.

5. Output: The controller collects the results of the Reduce phase and outputs them in some manner.
Consider the canonical MapReduce example of counting the number of occurrences of each word in a set of documents. The input is a set of (document id, contents) pairs. In the Map phase, each word that occurs in the contents of a file is mapped to the pair (word, "1"), indicating that the word has been seen once. In the Combine phase, all pairs with the same first component are collected to form the pair (word, ["1"; "1"; ...; "1"]), one such pair for each word. Then in the Reduce phase, for each word, the list of ones is summed to determine the number of occurrences of that word.
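To make the phases concrete, here is a small, self-contained OCaml sketch of the word-count logic on plain lists, with no workers or marshalling involved. The split_words tokenizer is hypothetical and stands in for whatever word splitter you use; it is not part of the framework.

```ocaml
(* Standalone sketch of the word_count logic on plain lists
   (no workers, no marshalling).  split_words is a hypothetical
   tokenizer, not part of the framework. *)
let split_words (s : string) : string list =
  String.split_on_char ' ' s |> List.filter (fun w -> w <> "")

(* Map phase: one (document id, contents) pair yields (word, "1") pairs. *)
let map (_id : string) (contents : string) : (string * string) list =
  List.map (fun w -> (w, "1")) (split_words contents)

(* Reduce phase: given a word and its list of "1"s (built by the
   Combine phase), return the total count as a one-element list. *)
let reduce (_word : string) (ones : string list) : string list =
  [string_of_int (List.length ones)]
```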
For simplicity, our framework accepts only a single data file containing all the input documents. Each document is represented in the input file as an (id, document name, contents) triple. See data/reuters.txt or the shorter files data/test1.txt and data/test2.txt for examples of the formatting. Document files can be loaded and parsed by calling Util.load and passing it Map_reduce.convert. The dataset data/reuters.txt is a collection of Reuters articles from the 1980s.
We have included a full implementation of this application in apps/word_count. Here is a detailed explanation of the sequence of events.
The controller is invoked as controller.exe word_count filename.  It immediately calls
    the main method of apps/word_count/word_count.ml,
    passing it the argument list.  The main method calls
    controller/Map_reduce.map_reduce, which is common controller code
    for performing a simple one-phase MapReduce on documents.  Other more involved
    applications have their own controller code.
    The documents in filename are read in and parsed
    using Util.load, passing it the parsing function Map_reduce.convert (defined in controller/map_reduce.ml),
    which splits the collection of documents into {id; title; contents} triples.
    The mappers are then run by calling Map_reduce.map kv_pairs "" "apps/word_count/mapper.ml".
    The second argument represents shared data that is accessible to
    all mappers in addition to their input (key, value) pair.  For this application,
    there is no shared data, so this argument is the empty string.  In general,
    the shared data can be accessed by a mapper using Program.get_shared_data().
    Map_reduce.map initializes a mapper worker manager using
    Worker_manager.initialize_mappers "apps/word_count/mapper.ml" "".
    The Worker_manager loads in the list of available workers from
    the file named addresses.  Each line of this file contains a
    worker address of the form ip_address:port_number.  Each of these
    workers is sent the mapper code.  The worker creates a mapper with that code
    and sends back the id of the resulting mapper.  This id is combined with the address
    of the worker to uniquely identify that mapper.  If certain workers are unavailable,
    this function will report this fact, but will continue to run successfully.
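    For example, an addresses file describing two workers running on the local machine might look like this (the specific addresses and port numbers are only illustrative):

```
127.0.0.1:9001
127.0.0.1:9002
```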
    Map_reduce.map then sends individual unmapped (id, contents) 
    pairs to available mappers until it has received results for all pairs.  Free 
    mappers are obtained using Worker_manager.pop_worker().  Mappers
    should be released once their results have been received using 
    Worker_manager.push_worker.  Read 
    controller/worker_manager.mli for more complete documentation.
    Once all (id, contents) pairs have been mapped, the new list of (word, "1")
    pairs is returned.
    Map_reduce.map_reduce receives the results of the mapping.
    If desired, Util.print_map_results can be called here to display the
    results of the Map phase for debugging purposes.
    The map results are then combined by calling Map_reduce.combine.  The results
    can be displayed using Util.print_combine_results.
    Map_reduce.reduce is then called with the results of
    Map_reduce.combine, along with the (empty) shared data string
    and the name of the reducer file apps/word_count/reducer.ml.
    Map_reduce.reduce initializes the reducer worker manager by calling the
    appropriate Worker_manager function, which retrieves worker
    addresses in the same manner as for mappers.
    Map_reduce.reduce then sends the unreduced (word, count list)
    pairs to available reducers until it has received results for all input.
    This is performed in essentially the same manner as the map phase.  When
    all pairs have been reduced, the new list of (word, count) tuples is
    returned.  In this application, because the key value doesn't change,
    Worker_manager.reduce and the reduce workers only
    calculate and return the new value (in this case, count), but don't return
    the key (in this case, word).
    The reduced results are returned to the main method of word_count.ml,
    which displays them using Util.print_reduce_results.
    On the worker side, Worker_server receives a connection and spawns a thread, which
    calls Worker.handle_request to handle it.
    Worker.handle_request determines the request type.  If it is an
    initialization request, then the new mapper or reducer is built using
    Program.build, which returns either the new worker id or 
    the compilation error.  See worker_server/program.ml for more complete
    documentation.  If it is a map or reduce request,
    the worker id is verified as referencing a valid mapper or reducer, respectively.
    If the id is valid, then Program.run is called with that id and
    the provided input.  This runs the relevant worker, which receives its input
    by calling Program.get_input(). Once the worker terminates, having
    set its output using Program.set_output, these results are returned
    by Program.run.  If the request is invalid, then the appropriate
    error message, as defined in shared/protocol.ml, is prepared.
    Worker.send_response is called, which is responsible for
    sending the result back to the client.  If the response is sent successfully,
    then Worker.handle_request simply recurses, otherwise it returns unit.
    controller/controller.ml
    This is the main module that is called to start up an application. It does nothing except parse the command-line arguments to determine which application to start, and then call the main method of that application, passing it the command-line arguments.
map_reduce.ml
    This module provides functionality for performing the actual map, combine, and reduce operations. These methods can be called by the individual applications. A more thorough description is given below.
The method map_reduce in this module contains controller code
    common to a number of applications that manipulate documents.  It can be used with different
    mappers and reducers to achieve different results.  It loads and parses the documents
    from the input file, then calls Map_reduce.map to run the mappers.
    Next, the list of (key, value)
    pairs is combined using Map_reduce.combine.  The combined values are then reduced
    using Map_reduce.reduce.  These results are passed back to the individual
    application controllers for output.
    
worker_manager.ml
    
    This module provides the capability to initialize mappers and reducers,
    select inactive workers using the pop function, assign a
    map/reduce task to a worker using either the map or
    reduce function, and return workers that have completed 
    their task to the pool of inactive workers using the push 
    function.
    
    If a request fails due to some sort of network or file descriptor error,
    then the Worker_manager will automatically retry a fixed number
    of times.  It is therefore acceptable for workers to intermittently output
    errors as long as they are still able to service requests.
    
worker_server/program.ml
    This module provides the ability to build and run mappers/reducers.
worker_server.ml
    
    This module is responsible for spawning threads to handle client connection
    requests.  These threads simply invoke Worker.handle_request.
    
worker.ml
    This module is responsible for managing communication between the clients and the mappers/reducers that they build and then run. A more thorough description is contained below.
The protocol that the controller application and the workers use to communicate is stored in shared/protocol.ml.
InitMapper (code, shared_data)
    
    Initialize a mapper.  Each element in the code list is a line in 
    mapper.ml.  shared_data is accessible to all mappers, as
    described above.
    
InitReducer (code, shared_data)
    Initialize a reducer. Same as above.
MapRequest (worker_id, key, value)
    
    Execute mapper with id worker_id with (key, value)
    as input.
    
ReduceRequest (worker_id, key, value list)
    
    Execute reducer with id worker_id with 
    (key, value list) as input.
    
Mapper (worker_id option, error)
    
    Mapper (Some id, _) indicates the requested mapper was successfully
    built, and has the returned id.  Mapper (None, error) indicates
    compilation of the mapper failed with the returned error.
    
Reducer (worker_id option, error)
    Same as above, except for a reducer.
InvalidWorker (worker_id)
    
    Either a map request was made, and the worker_id doesn't correspond to a
    mapper, or a reduce request was made, and the worker_id doesn't correspond
    to a reducer.
    
RuntimeError (worker_id, error)
    
    The worker with id worker_id produced the runtime error(s) stored in
    error.  Also used to indicate that the worker didn't return any
    output, meaning Program.run returned None.
    
MapResults (worker_id, (key, value) list)
    
    Map request sent to worker_id resulted in output of (key,
        value) list.
    
ReduceResults (worker_id, value list)
    
    Reduce request sent to worker_id resulted in output of
    value list.
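The authoritative definitions are in shared/protocol.ml. As a rough guide, the messages described above correspond to variants shaped like the following sketch; the payload types shown here (worker ids as ints, keys, values, and errors as strings) are assumptions made for illustration.

```ocaml
(* Approximate shape of the messages; see shared/protocol.ml for the
   real definitions.  The concrete payload types are assumptions. *)
type worker_request =
  | InitMapper of string list * string           (* code lines, shared_data *)
  | InitReducer of string list * string
  | MapRequest of int * string * string          (* worker_id, key, value *)
  | ReduceRequest of int * string * string list  (* worker_id, key, values *)

type worker_response =
  | Mapper of int option * string                (* Some id on success, else error *)
  | Reducer of int option * string
  | InvalidWorker of int
  | RuntimeError of int * string
  | MapResults of int * (string * string) list
  | ReduceResults of int * string list
```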
    
Only string data can be communicated between agents.
Values can be converted to and from strings explicitly 
using Util.marshal, which calls the OCaml built-in Marshal.to_string, and
Util.unmarshal, which calls Marshal.from_string.
You can send strings via communication channels without marshalling, but other
values need to be marshalled.  Your mapper or reducer must also convert
the input it receives from strings back to the appropriate type it can operate on, and once
it has finished, it needs to convert its output into strings to communicate it back.
Note that marshalling is not type safe. If you unmarshal a string and treat the result as a value of any type other than the type it was marshalled as, your program will most likely crash. You should therefore take particular care that the types match when marshalling and unmarshalling. Make sure that the type of the marshalled input sent to your mapper matches the type that the mapper expects, and that the type of the marshalled results that the mapper sends back matches the type expected. The same is true for reducers and the reducer messages. You cannot recover the type information by matching on the unmarshalled value; the types have to agree by construction.
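As a minimal sketch of the round trip, here are the standard-library calls that Util.marshal and Util.unmarshal wrap:

```ocaml
(* Marshal a (key, value list) pair to a string and back.  Util.marshal
   and Util.unmarshal wrap Marshal.to_string and Marshal.from_string. *)
let sent : string = Marshal.to_string ("ocaml", ["1"; "1"]) []

(* The annotation must match the type that was marshalled; a mismatch
   typically crashes the program rather than raising a clean error. *)
let (word, ones) : string * string list = Marshal.from_string sent 0
```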
All code you submit must adhere to the specifications defined in the respective .mli files.
You must implement functions in the following files:
shared/hashtable.ml
    
    You are required to implement a hash table according to the specifications
    in shared/hashtable.mli.  This data structure will be useful in your
    MapReduce implementation. Note: You may not use the OCaml Hashtbl library functions in your implementation. The only exception is that you may use Hashtbl.hash as your hashing function.
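    As a rough illustration only, a chained hash table over an array of buckets might look like the sketch below. The names and signatures here are assumptions; your implementation must follow shared/hashtable.mli.

```ocaml
(* Sketch of a chained hash table.  Names and interface are
   illustrative; follow shared/hashtable.mli for the real signatures. *)
type ('k, 'v) t = { buckets : ('k * 'v) list array }

let create (n : int) : ('k, 'v) t = { buckets = Array.make n [] }

let index tbl k = Hashtbl.hash k mod Array.length tbl.buckets

let add tbl k v =
  let i = index tbl k in
  tbl.buckets.(i) <- (k, v) :: List.remove_assoc k tbl.buckets.(i)

let find tbl k = List.assoc k tbl.buckets.(index tbl k)  (* raises Not_found *)
```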
    
controller/map_reduce.ml
    The code in this module is responsible for performing the actual map, combine, and reduce operations by initializing the mappers/reducers, sending them input that hasn't been mapped/reduced yet, and then returning the final list of results.
    The map and reduce functions must be implemented
    according to the specifications in controller/map_reduce.mli.  The
    functionality should mirror the above example execution description.  Both
    functions must make use of multiple workers simultaneously, if available,
    and be able to handle both silent and explicit worker failure
    and individual worker slowness.  However, assuming there is no coding error
    in your worker code, you don't need to handle the case in which all workers
    fail (i.e., complete network failure).  Also, there should only be one active
    request per worker at a time, and if a worker fails, it should not be
    returned to the Worker_manager using the push_worker
    function.
    
    Both map and reduce share the same basic structure.
    First, the workers are
    initialized through a call to the appropriate Worker_manager
    function.  Once the workers have been initialized, the input list should
    be iterated over, sending each available element to a free worker, which can
    be accessed using the pop_worker function of
    Worker_manager.  A mapper is invoked using
    Worker_manager.map providing the mapper's id and the (key,
    value) pair.  Each mapper (and therefore invocation of
    Worker_manager.map) receives an individual (key, value) pair, and
    outputs a new (key, value) list, which is simply added onto the list of all
    previous map results. A reducer is invoked similarly using 
    Worker_manager.reduce.  These functions block until a response
    is received, so it is important that they are invoked using a thread from
    the included Thread_pool module.  Additionally, this requires
    that these spawned threads store their results in a shared data structure,
    which must be accessed in a thread-safe manner.
    
Once all of the input has been iterated over, it is important that any input that remains unfinished, either due to a slow or failed worker, is re-submitted to available workers until all input has been processed. Once all input has been fully processed, the results are returned.
    Note: It is acceptable to use Thread.delay with a short 
    sleep period (0.1 seconds or less) when looping over unfinished data to send
    to available workers, in order to prevent flooding the workers with unnecessary work.
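The following is a rough structural sketch of such a map loop, not a required design. The Worker_manager and Thread_pool signatures used here (a pop_worker that returns an option, a map that takes a worker and a key/value, add_work, and destroy) are assumptions; consult controller/worker_manager.mli and the Thread_pool interface for the real ones. The standard Hashtbl used for the shared results could equally be your own shared/hashtable.ml implementation.

```ocaml
(* Rough structural sketch only; the Worker_manager and Thread_pool
   signatures below are assumptions, not the documented interfaces. *)
let map kv_pairs shared_data map_filename =
  let manager = Worker_manager.initialize_mappers map_filename shared_data in
  let pool    = Thread_pool.create 32 in
  let lock    = Mutex.create () in
  let results = Hashtbl.create (List.length kv_pairs) in
  let with_lock f = Mutex.lock lock; let r = f () in Mutex.unlock lock; r in
  let finished kv      = with_lock (fun () -> Hashtbl.mem results kv) in
  let record kv mapped = with_lock (fun () -> Hashtbl.replace results kv mapped) in
  let dispatch kv =
    match Worker_manager.pop_worker manager with
    | None -> ()                          (* no free worker right now; retry later *)
    | Some w ->
      Thread_pool.add_work
        (fun () ->
           match Worker_manager.map w (fst kv) (snd kv) with
           | Some mapped -> record kv mapped; Worker_manager.push_worker manager w
           | None -> ())                  (* failed worker: do not push it back *)
        pool
  in
  (* Keep re-submitting anything unfinished (slow or failed workers) until done. *)
  let rec loop () =
    match List.filter (fun kv -> not (finished kv)) kv_pairs with
    | [] -> ()
    | todo -> List.iter dispatch todo; Thread.delay 0.1; loop ()
  in
  loop ();
  Thread_pool.destroy pool;
  List.concat (List.map (fun kv -> Hashtbl.find results kv) kv_pairs)
```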
    
The combine function must be implemented according to specifications,
    but doesn't need to make use of any workers.  It should simply combine the provided
    (key, value) pair list into a list of (key, value list) pairs such that each key in 
    the provided list occurs exactly once in the returned list, and each value list for a given
    key in the returned list contains all the values that key was mapped to in the provided list.
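    A minimal, sequential sketch of combine over association lists (a hash table, such as your shared/hashtable.ml implementation, would be a better fit for large inputs):

```ocaml
(* Group (key, value) pairs into (key, value list) pairs.  Each key
   appears exactly once in the result. *)
let combine (pairs : ('a * 'b) list) : ('a * 'b list) list =
  List.fold_left
    (fun acc (k, v) ->
       let vs = try List.assoc k acc with Not_found -> [] in
       (k, v :: vs) :: List.remove_assoc k acc)
    [] pairs
```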
    
worker_server/worker.ml
    The code in this module is responsible for handling communication between the clients that request work and the mapper/reducers that perform the work.
    The handle_request function must be implemented 
    according to the mli specifications and the above description, and must be 
    thread-safe.  This function receives a client connection as input and must retrieve
    the worker_request from that connection.  
    
    If the request is for a mapper or reducer initialization, then the code must be 
    built using Program.build, which returns (Some id, "") where
    id is the worker id if the compilation was successful,
    or (None, error) if the compilation was not successful.  If the
    build succeeds, the worker id should be stored in the appropriate mapper or
    reducer set (depending on the initialization request), and the id should be
    sent back to the client using send_response.  If the build fails,
    then the error message should be sent back to the client using
    send_response.
    
    If the request is to perform a map or reduce, then the worker id must
    be verified to be of the correct type by looking it up in either the mapper
    or reducer set before the mapper or reducer is invoked using 
    Program.run.  The return value is Some v, where v
    is the output of the program, or None if the program failed to provide
    any output or generated an error.
    
    Note that the mapper and reducer sets are shared between all request handler threads 
    spawned by the Worker_server, and therefore access to them must
    be thread-safe.
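    One illustrative pattern for that, with the set representation and names chosen only as an example rather than a required design:

```ocaml
(* Illustrative only: guard the shared mapper/reducer id sets with a mutex. *)
let mapper_ids  : (int, unit) Hashtbl.t = Hashtbl.create 16
let reducer_ids : (int, unit) Hashtbl.t = Hashtbl.create 16
let sets_lock = Mutex.create ()

let add_mapper id =
  Mutex.lock sets_lock;
  Hashtbl.replace mapper_ids id ();
  Mutex.unlock sets_lock

let is_reducer id =
  Mutex.lock sets_lock;
  let known = Hashtbl.mem reducer_ids id in
  Mutex.unlock sets_lock;
  known
```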
    
Note: Code in shared/util.ml is accessible to all workers by default.
We have provided a Makefile and a build script build.bat that you can use to build your project.
Once everything is built, start a worker server on each machine listed in the addresses file so that it listens on its port_number for requests.  Then run the controller, e.g. controller.exe word_count filename to run
    word_count with datafile filename.
    
    If you would like to define additional modules, please add them to
    the appropriate build.bat file or Makefile.
You will use the simplified MapReduce that you created in part 1 to
implement three applications: inverted_index, page_rank, and apm (A Perfect Matching, our CS 3110 dating service).
Each application will require a mapper apps/xxx/mapper.ml and reducer apps/xxx/reducer.ml,
where xxx is the name of the application, as well as a controller
apps/xxx/xxx.ml. We have supplied
full implementations of a simple application word_count and a
more involved application kmeans from a previous semester as models.
An inverted
index is a mapping from words to the documents in which they
appear.  For example, if these were your documents:
Document 1: OCaml map reduce
Document 2: fold filter ocaml

The inverted index would look like this:
| word | document | 
|---|---|
| ocaml | 1 2 | 
| map | 1 | 
| reduce | 1 | 
| fold | 2 | 
| filter | 2 | 
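For reference only, here is a sequential (non-MapReduce) OCaml sketch of the data structure being built, reusing the hypothetical split_words tokenizer from the word_count sketch above; your actual solution should produce this result with mappers and reducers.

```ocaml
(* Sequential illustration of an inverted index: each word maps to the
   list of ids of the documents that contain it. *)
let inverted_index (docs : (string * string) list) : (string * string list) list =
  List.fold_left
    (fun index (id, contents) ->
       List.fold_left
         (fun index word ->
            let ids = try List.assoc word index with Not_found -> [] in
            if List.mem id ids then index
            else (word, id :: ids) :: List.remove_assoc word index)
         index
         (split_words contents))
    [] docs
```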
To implement this application, you should
take a dataset of documents (such as data/reuters.txt) as input and use MapReduce to produce an inverted index.
This can be done in a way very similar to word_count.
In particular, you can use Map_reduce.map_reduce.
Print your final results using Util.print_reduce_results.
Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml, respectively, and your controller code should go in apps/inverted_index/inverted_index.ml.
PageRank is a link analysis algorithm used by Google to weight the
importance of different websites on the Internet. 
You will be implementing a very simplified, iterative version
of PageRank. We have supplied the mapper and reducer and part of the
controller, but you have to write the rest.  The supplied method convert
can be used with Util.load to load and parse the contents of
a data file data/websites.txt containing a list of simulated websites,
and print_page_ranks can be used to display the final results.
To begin, we initialize each website with a PageRank of 1/n, where n is the total number of websites. Each step of the iteration has two parts. First, each website divides its PageRank by the number of links out of it and sends that amount to each website it links to. Then each website sums up all the values being sent to it, and this sum becomes its new PageRank. The number of iterations is supplied on the command line.
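As a sequential illustration of that update (not the MapReduce decomposition you will implement), assuming each page's rank and outgoing links are held in association lists keyed by page id:

```ocaml
(* One simplified PageRank iteration over an adjacency list
   (page id -> outgoing links).  The website type and parsing in
   apps/page_rank are separate; this only illustrates the arithmetic. *)
let page_rank_step (links : (string * string list) list)
                   (ranks : (string * float) list) : (string * float) list =
  (* Each page sends rank / out-degree to every page it links to. *)
  let contributions =
    List.concat
      (List.map
         (fun (page, outs) ->
            match outs with
            | [] -> []
            | _ ->
              let share =
                List.assoc page ranks /. float_of_int (List.length outs) in
              List.map (fun target -> (target, share)) outs)
         links)
  in
  (* The new rank of a page is the sum of everything sent to it. *)
  List.map
    (fun (page, _) ->
       (page,
        List.fold_left
          (fun sum (target, share) ->
             if target = page then sum +. share else sum)
          0.0 contributions))
    ranks
```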
For more details on simplified PageRank, see Section 14.3 in Networks, Crowds, and Markets: Reasoning About a Highly Connected World by Cornell Professors David Easley and Jon Kleinberg.
Your job is to implement the function main in
apps/page_rank/page_rank.ml. This function takes the filename of
a dataset and a number of iterations as command-line arguments.
In order to simulate websites, we have defined a new type, website.
Each line is of the format pageid@pagetitle@links. The field links is of the
form out1,out2,...,outm, where each outi is the pageid of a link out of
the page.  The file data/websites.txt contains the same example as
on pages 407 and 408 of the Networks book draft linked above.
After performing PageRank for the specified number of iterations, you should print the final results using the supplied function.
In this part, you will implement the application apm, a simple dating service.  There are
some randomly generated profiles of fictional people seeking partners for a romantic relationship
in the files data/profiles.txt (long) and data/test_profiles.txt (shorter).
The type of a profile is Apm.profile.  We have supplied code in apps/apm/apm.ml 
for parsing the data files and printing the results.
This application takes several command-line arguments: the app name, the data file, the number of matches requested,
and the first and last name of the client (let's call him Jeremiah), who should have a profile in the data file.  The application should
pair Jeremiah's profile with every other profile in the database and send these pairs to the mappers.  For each pair, the mappers should
compute a compatibility index, which is a float between 0. (perfectly incompatible) and 1. (perfectly compatible).
We leave the details of this computation up to you.  The compatibility index and pair of profiles are returned to the controller.
Then the controller should combine everything into one list for Jeremiah and send it to a reducer, supplying the desired number of matches n as shared data.  The reducer should return the top n matches from the list, which the controller should then print out
using the supplied print methods.
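For example, the "top n" selection that a reducer performs could look like the following sketch, assuming the values have already been unmarshalled into (compatibility index, profile) pairs; the pair representation and the take helper are illustrative, not dictated by the framework.

```ocaml
(* Keep the first n elements of a list. *)
let rec take n lst =
  if n <= 0 then []
  else match lst with [] -> [] | x :: xs -> x :: take (n - 1) xs

(* Sort candidates by descending compatibility index and keep the top n. *)
let top_matches (n : int) (candidates : (float * Apm.profile) list)
    : (float * Apm.profile) list =
  let by_descending_index (i1, _) (i2, _) = compare i2 i1 in
  take n (List.sort by_descending_index candidates)
```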
Here is a sample run from our solution code.
```
Tpool destroy, still waiting for 93 threads.
Tpool destroy, still waiting for 55 threads.
------------------------------
Client: Jeremiah Sanford
sex: M  age: 23  profession: trucker
nondrinker  nonsmoker  has children  does not want children
prefers a female partner between the ages of 19 and 29
likes classical music and sports

10 best matches:
------------------------------
Compatibility index: 0.691358
Katy Allen
sex: F  age: 21  profession: trucker
nondrinker  nonsmoker  no children  does not want children
prefers a male partner between the ages of 18 and 24
likes country music and sports
------------------------------
Compatibility index: 0.691358
Corina Savage
sex: F  age: 24  profession: trucker
nondrinker  nonsmoker  no children  does not want children
prefers a male partner between the ages of 18 and 32
likes country music and sports
------------------------------
Compatibility index: 0.691358
Jenifer Turner
sex: F  age: 23  profession: unemployed
nondrinker  nonsmoker  has children  wants children
prefers a male partner between the ages of 18 and 29
likes classical music and sports
------------------------------
Compatibility index: 0.691358
Cassondra Donovan
sex: F  age: 21  profession: trucker
nondrinker  nonsmoker  has children  wants children
prefers a male partner between the ages of 18 and 29
likes classical music and movies
------------------------------
Compatibility index: 0.648148
Eboni Case
sex: F  age: 32  profession: trucker
nondrinker  nonsmoker  no children  wants children
prefers a male partner between the ages of 24 and 39
likes classical music and sports
------------------------------
Compatibility index: 0.648148
Cinthia Gutierrez
sex: F  age: 22  profession: trucker
nondrinker  nonsmoker  no children  wants children
prefers a male partner between the ages of 18 and 26
likes classical music and sports
------------------------------
Compatibility index: 0.648148
Chantel Wyatt
sex: F  age: 21  profession: trucker
nondrinker  nonsmoker  no children  wants children
prefers a male partner between the ages of 18 and 28
likes rock music and sports
------------------------------
Compatibility index: 0.648148
Brooklyn Reese
sex: F  age: 23  profession: trucker
nondrinker  nonsmoker  no children  wants children
prefers a male partner between the ages of 18 and 28
likes country music and sports
------------------------------
Compatibility index: 0.604938
Kandace Patton
sex: F  age: 18  profession: trucker
nondrinker  nonsmoker  no children  does not want children
prefers a male partner between the ages of 18 and 27
likes classical music and sports
------------------------------
Compatibility index: 0.518519
Marie Turner
sex: F  age: 29  profession: lawyer
nondrinker  nonsmoker  no children  does not want children
prefers a male partner between the ages of 21 and 35
likes classical music and sports
```
Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that there is a lot of overhead and the simplified MapReduce framework is not very well optimized. Performance gains are only realized when doing this on a massive scale.
Simply zip up and submit your entire ps5 folder.
You are required to use a source control system like CVS or SVN. Submit the log file that describes your activity. We will provide you with an SVN repository, hosted by CSUG.
For information on how to get started with SVN there, read Using Subversion in the CSUGLab.
Note: Your repository name is cs3110_<netid(s)>. For example,
cs3110_njl36, or if you have a partner: cs3110_dck10_njl36.
Notice that the netids of groups are in alphabetical order. This repository name is what
you put in place of project1 in the directions in the provided link.
If you use Windows, and are unfamiliar with the command line or Cygwin, there is a GUI based SVN client called Tortoise SVN that provides convenient context menu access to your repo.
For Debian/Ubuntu, a GUI-based solution to explore would be Rapid SVN. To install:
apt-get install rapidsvn
Mac users looking into a front-end can try SC Plugin, which claims to provide similar functionality to Tortoise. Rapid SVN might work as well.
There is also a plug-in for Eclipse called Subclipse that integrates everything for you nicely.
Note that all of these options are simply graphically based alternatives to the extremely powerful terminal commands, and are by no means necessary to successfully utilize version control.
We will be holding 15-minute design review meetings. Your group is expected to meet with one of the TAs or the instructor during their office hours, or with consultants during consulting hours. A sign-up schedule is available on CMS. You should be able to explain your design in about 10 minutes, leaving 5 minutes for questions. It's a good idea to practice your presentation ahead of time so you use your time effectively.
Your design review should focus on your approach to Part 1; however, time permitting, you may spend a few minutes at the end on Part 2.
In your design review, plan to talk about at least:
Our dating service is rather primitive.  For extra karma, feel free to enhance it any way you like to make it more realistic.  We have posted the code we used to generate the random profiles, which you may modify to your liking.  You will find in there a file resources/categories.txt, which defines the categories recorded in a profile, along with a probability distribution for each category used for random generation.  For example, one category is
height: short 25, average 50, tall 25

which means that 25% of people are short, 50% are average height, and 25% are tall. The set of categories and alternatives for each category are rather limited, and the probability weightings are just guesses. You may change these any way you like or add new categories. Please note that if you do this, there are also several things in the
apm application that must also be changed (e.g., the type definition Apm.profile, among others).
Right now the apm application only takes one client on the command line.  Thus only one reducer is needed to handle that client.  You might wish to modify the controller to allow multiple clients, which would make better use of the reducers.  The names of the clients should be read in from a file whose filename is specified on the command line.  In case a client is not found in the database, the application should not give up, but should give a warning and handle the other clients.
Just for your information, the lists of first names we used are the 1000 most popular boys' and girls' names in the US in 1991, around the time many of you were born. The last names are the 1000 most frequent last names in the US from the last census, along with their frequencies.