The project is online, or should be shortly. The main focus of the project is for you to design and implement a complex, concurrent server, we are going to walk you an example -- a different application that will nonetheless demonstrate many of the same design principles you will need to use.
First, we need a little review. A socket is an open connection to another computer. To open a socket connection to another computer, we need that computer's address, and a port number. A port is basically a virtual address within the other computer. It is not a physical location on the receiving computer, but rather just a tag that accompanies the message that helps tell the operating system which process the message should go to. To open a socket on our own computer, then, we need to provide the port that we will listen for connections on.
Socket communication works like this: the listener creates a socket, binds it to an address, and begins to listen for connections. In CML, the code looks like this:
val insock:(Socket.passive INetSock.stream_sock) = INetSock.TCP.socket()
val myaddr = (case NetHostDB.fromString("127.0.0.1") of
SOME x => x
| NONE => raise Fail "Couldn't resolve 127.0.0.1!")
val _ = Socket.bind(insock,INetSock.toAddr(myaddr,8012))
val _ = Socket.listen(insock,10)
You don't need to worry about exactly what every bit of this code means. The only important things to know are that this code creates and begins listening on a new socket created (in this case) on port 8012.
At this point, though, we don't have a connection to another computer. In order to wait for a connection, we need to call another function, Socket.accept. This takes a passive socket and returns a new socket and an address. The best way to explain this is as follows: sockets are a virtual distinction created and enforced so that we can distinguish between different data streams. We have one generic data stream for "requests for new connections with us," because until we have examined these requests we don't know how to split them up; we don't know who they're from. Once we read one of these requests, though, it makes more sense to split off our communications with the sender into a new socket, since it is a separate stream that will be used in a separate data transaction.
Now, say we have some server that accepts requests and handles them through some function "handle." After we've initialized the socket as above, we might run code such as the following:
let
fun run() =
let
val (newsock,addr) = Socket.accept(insock)
in
(TextIO.print("Got one! Throwing it on the queue.\n");
ThreadPool.addJob(pl,fn()=>handle(newsock,addr,anyotherarguments));
run())
end
in
run()
end
I've put the accept call in a tight loop, because "accept" only yields us one socket. I've also used a thread pool to handle requests. This is because there is overhead involved with creating a new thread that in many cases overshadows the trivial costs of actually answering a request, which means that the majority of time spent by the program will be spent creating and destroying threads.
Before we get more into our example, I want to go over some of the technical awkwardness of using sockets in CML. The major issue is that a lot of conversion is needed to put the data into a form that can be read (or written into a logfile.) The address that returns from "Socket.accept" can be decoded into strings as follows:
val (inetaddr,port) = INetSock.fromAddr(addr) val addrstring = NetHostDB.toString(inetaddr) ^ ":" ^ Int.toString(port)The more serious issue is that sockets communicate not in strings, but in Word8Vectors, because CML is no Python. You will have to convert your strings to Word8Vectors. Because Word8Vectors have changed from 110.0.7 to the current version of SML, the online documentation is inaccurate. (I believe it may be inaccurate for sockets as well.) The relevant functions that you should know about, therefore, are:
Word8Vector.fromList : Word8 list -> Word8Vector Word8.fromInt : int -> Word8 Word8Vector.extract : Word8Vector * int * int option -> Word8Vector Word8Vector.foldl : (Word8 * 'b -> 'b) -> 'b -> Word8Vector -> 'b Word8.toInt : Word8 -> intThe "foldl" and "toInt" functions are probably the easiest ways to get data in a form you're accustomed to out of a Word8Vector.
The next snafu is that sending over sockets is hairy, because the function takes a type called a "buf" that is not in the current version of SML, and thus undocumented. Here is the wrapper function that has worked for us in dealing with Socket.sendVec:
fun makeBuf(vec) =
{buf = vec, i = Word8Vector.length(vec), sz = NONE}
val sent = Socket.sendVec(sock,makeBuf(vec))
There are several problems with this method, and we'll take them one at a time. The first, and most obvious, is that requests will not necessarily be acknowledged quickly. If I request a large file, the server will simply stop responding to everyone else until my download is finished. What's worse, because we're using reliable network connections, the "send" operation to me waits a while for me to acknowledge that I received the entire file, so if I lose my connection the entire server is halted to wait for me even when it is finished sending data to me.
The immediate solution here is multi-threading. As discussed before, a thread pool is a significantly better solution than merely creating new threads as they are needed. A good implementation will adjust the number of threads on the fly to meet current demand.
Now, say we create a multi-threaded server. It accepts requests and hands them off to some function, handleReq, which works as follows (any code that is length and irrelevant has been relegated to a side function):
fun handleReq(newsock,addr,logConneHere, assume that "readFromHardDrive" is a function that simply creates a string description of the transaction and calls a file i/o library to write it to the already-open connection to the logfile.ction) = let val msg = Sock.recvVec(newsock,Settings.maxSize) val fname = Utils.getFileName(msg) val data = Utils.readFromHardDrive(data) val _ = Sock.sendVec(newsock,makeBuf(data)) val _ = Utils.logTransaction(logConnection,addr,newsock) in () end
This, of course, is a problem. Multiple threads can be trying to write to the logfile at one time, which will either cause a jumbled logfile output or, more likely, some sort of error. Clearly, we need some sort of control on the logfile connection to prevent this. If we use a SyncVar to ensure that only one of us has the connection at any point in time, it seems like a reasonable solution, but the problem is that file I/O is particularly slow. If every thread has to queue up to write to the log, this can debilitate response time, which is all-important in an HTTP server. A better solution is that every thread drop off its message for the logfile in a message box, and that a separate thread transcribes messages from this box into the logfile. Again, of course, there is some implicit queuing up. Mailboxes are essentially queues with locks, so when two threads are trying to put messages for the logfile onto a mailbox at the same time, one must wait for the other. However, the cost of writing to an in-memory queue is much less than the cost of writing to a file, and so the threads that are handling responses spend much less time waiting on the logging process. Note that under this scheme, the handler function does not take the logfile connection as a parameter; instead, it takes a mailbox that the logfile thread is listening on.
FTP, or File Transfer Protocol, is another protocol used for fielding file requests. Again, the server receives a request, parses it for a filename, reads the file, and sends it to the originator of the request. There are other protocols that also support this. Say that we want an all-purpose mirror fileserver. It periodically copies data from a local computer, then opens sockets on several different ports supporting several different file transfer protocols through which it makes the copied data available. Say that every protocol has its own logfile. How would this be implemented?
Say that we start a separate thread for every protocol supported. Each thread opens the logfile, creates a mailbox for log messages and creates a new thread that listens on the mailbox and transcribes messages into the log. Then each original protocol thread opens the appropriate socket and calls a recursive run() function that accepts messages and hands them off to the appropriate handler function. The handler functions behave as above.
There's only one trick: when we need to copy our files over, how can we know that it's safe to perform writes? We need to stop fielding requests so that we know we don't have a file access conflict. We need to know that we've stopped, and we need to know that all of the requests we've accepted have been finished.
This last part is the easiest. We add a counter -- say, an int SyncVar. Now, we change every handler protocol as follows:
fun handleReq(newsock,addr,logConneNow we know easily how many outstanding operations are being performed by our handler threads. The problem, though, is that we can't control when new operations start, and also that our "copy" function will have to repeatedly take and return the counter to check when it hits 0. Here is a better solution:ction,counter) = let val curr = SyncVar.mTake(counter) val _ = SyncVar.mPut(counter,curr+1) val msg = Sock.recvVec(newsock,Settings.maxSize) val fname = Utils.getFileName(msg) val data = Utils.readFromHardDrive(data) val _ = Sock.sendVec(newsock,makeBuf(data)) val _ = Utils.logTransaction(logConnection,addr,newsock) val curr = SyncVar.mTake(counter) val _ = SyncVar.mput(counter,curr-1) in () end
fun handleReq(newsock,addr,logConneNow, the code for copying will look like this:ction,counter,mutex:unit SyncVar) = let val curr = SyncVar.mTake(counter) val _ = if curr = 0 then SyncVar.mTake(mutex) else () val _ = SyncVar.mPut(counter,curr+1) val msg = Sock.recvVec(newsock,Settings.maxSize) val fname = Utils.getFileName(msg) val data = Utils.readFromHardDrive(data) val _ = Sock.sendVec(newsock,makeBuf(data)) val _ = Utils.logTransaction(logConnection,addr,newsock) val curr = SyncVar.mTake(counter) val _ = if curr = 1 then SyncVar.mPut(mutex) else () val _ = SyncVar.mput(counter,curr-1) in () end
SyncVar.mTake(mutex) (* Do actual copying *) Syncar.mPut(mutex)If this code is able to "take" the SyncVar "mutex," then we know that no handle operations are occurring, and therefore that the write is safe.
There's just one problem: if there's always at least one handle occurring because the server is relatively busy, the mTake on "mutex" will simply never terminate. We need a way to stop our threads from accepting requests. This not only ensures that we will be able to copy without waiting indefinitely, it also ensures that we will not be acknowleding file requests that we are not following up on.
The problem, then, is that we want to communicate with our protocol threads and tell them to stop running in their "accept" loops. It's reasonable to assume we'd want either a mailbox or a channel for each to talk to them. The crude way for them to listen would be as follows:
let
fun run() =
(case (Mailbox.recvPoll(stopWorkingMbox)) of
SOME => (Mailbox.recv(startWorkingAgainMbox); run())
| _ => let
val (newsock,addr) = Socket.accept(insock)
in
(TextIO.print("Got one! Throwing it on the queue.\n");
ThreadPool.addJob(pl,fn()=>handle(newsock,addr,anyotherarguments));
run())
end)
in
run()
end
Mailbox.recvPoll is a non-waiting receive that returns NONE if no
message is in the mailbox and SOME(m) if some message m is. Observe
that this provides the main thread a way of telling the
protocol threads to stop fielding requests as well as a way of telling
them to start again. Second, note that in real life we would not need
two distinct mailboxes, but it makes for an easier example, and might
make for more sensible code.
Third, observe that this doesn't work perfectly. Suppose that the main thread decides to copy files. It sends a "stop" message on every "stopWorkingMBox" -- one for each protocol thread -- and then waits for the counter to hit zero by trying to call an mTake on "mutex." All of the HTTP handler threads finish, but the HTTP protocol thread just happens to experience a lull in traffic; it hangs on the call to Socket.accept. Meanwhile, the counter hits zero, and copying begins. While I am copying, the HTTP server receives a new request, and calls handle.
This is not a disaster, in this case; if you notice the handle function above, you can see that the first "handle" to be called while I am copying will hang on mTake(mutex) and that any future calls to "handle" will hang on mTake(counter), so no actual file access errors will occur. Still, it will result in me acknowledging a request and then failing to fulfill it, which is a situation I would like to avoid. In addition, there are other situations in some applications where it would be even more crucial to me to know that all of my subprocesses are halted before I continue.
The better move, then, is to not have my "run" process hang on "accept" alone, but instead to use an event. This works as follows: I call Mailbox.recvEvt on stopWorkingMbox and Socket.acceptEvt on insock. I have to wrap each with a function so that both events are of the same type; therefore, I will have to create a datatype that can either take a unit, or a value of type (socket * address) -- this is an oversimplifaction of the actual types of "newsock" and "addr" here. I perform a "select" on these two events. What this essentially does is tells the computer to listen on both the mailbox and the socket, and return to me the output from whichever function returns first. Therefore, if a protocol thread never happens to receive an "accept," it will nonetheless react immediately to a request to stop working.
let
fun wrapQuitMsg() = NONE
fun wrapAccept(newsock,addr) = SOME (newsock,addr)
fun run() =
let
val evt1 = wrap(wrapQuitMsg,Mailbox.recvEvt(stopWorkingMbox)
val evt2 = Socket.recvVecEvt(insock)
val outcome = select([evt1,evt2])
in
case outcome of
NONE => (Mailbox.recv(startWorkingAgainMbox);run())
|SOME (newsock,addr) => (TextIO.print("Got one! Throwing it on the queue.\n");
ThreadPool.addJob(pl,fn()=>handle(newsock,addr,anyotherarguments));
run())
end
in
run()
end
Even now, it is uncertain whether or not we will leave any requests dangling. Consider this possibility: the main thread sends "quit" messages on the mailbox. The counter hits zero quickly, but one of the protocol threads has not quit yet. It has just accepted a new request that it has not yet handled. The main thread takes the "counter" SyncVar, and the handler thread hangs while the frustrated users waits for his (acknowledged) request to be fulfilled.
The solution here is twofold. First, we need to move some of the code for "counter:"
let
fun wrapQuitMsg() = NONE
fun wrapAccept(newsock,addr) = SOME (newsock,addr)
fun run() =
let
val evt1 = wrap(wrapQuitMsg,Mailbox.recvEvt(stopWorkingMbox)
val evt2 = Socket.recvVecEvt(insock)
val outcome = select([evt1,evt2])
in
case outcome of
NONE => (Mailbox.recv(startWorkingAgainMbox);run())
|SOME (newsock,addr) => (addToCounter(counter);
TextIO.print("Got one! Throwing it on the queue.\n");
ThreadPool.addJob(pl,fn()=>handle(newsock,addr,anyotherarguments));
run())
end
in
run()
end
Here, "addToCounter" is the first three lines that increment the value
in "counter" and possibly call mTake on "mutex." By placing this
inside of the loop, we ensure that it does not occur after this thread
has already received the "stop working" message. The second change is
to use channels instead of mailboxes; in this way, the main thread
actually makes sure it does not operate until this thread has received
the "stop working" message. Together, these measures ensure that
copying will not begin until all open requests are filled and future
requests are no longer accepted.