In this recitation, we will look at the complete code for some basic
servers. First, we will look at a server that accepts a single
connection and simply echoes whatever it reads back on that
connection. We will do this using both blocking and non-blocking IO.
Then we will expand that server into one that accepts multiple
connections and echoes text from any one of them to all of them (like
a chat server). Again, we will look at both blocking and non-blocking
versions.
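(* wraps a vector as the {buf, i, sz} record that Socket.sendVec takes:
   start at index 0 and, with sz = NONE, send through the end of vec *)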
fun makeBuf(vec) = {buf = vec, i = 0, sz = NONE}
First, the blocking version. It binds a socket to port 8000 on
127.0.0.1, waits for a connection, and then simply echoes what it
reads.
let
  val socket:Socket.passive INetSock.stream_sock = INetSock.TCP.socket()
  val myaddr = valOf(NetHostDB.fromString("127.0.0.1"))
  val _ = Socket.bind(socket,INetSock.toAddr(myaddr,8000))
  val _ = Socket.listen(socket,20)
  (*blocks until a client connects*)
  val (newsock,addr) = Socket.accept(socket)
  (*recvVec blocks until data arrives; send it straight back*)
  fun spin() =
    (Socket.sendVec(newsock,makeBuf(Socket.recvVec(newsock,10000)));
     spin())
in
  spin()
end
Here is the non-blocking version. The isready function reports whether
the socket has data ready to read, so recvVec is called only when it
will not block.
...
  val (newsock,addr) = Socket.accept(socket)
  (*poll descriptor that watches newsock for readable data*)
  val descs = [OS.IO.pollIn(Socket.pollDesc(newsock))]
  (*a zero timeout makes poll return immediately*)
  fun isready() =
    not(List.null(OS.IO.poll(descs,SOME Time.zeroTime)))
  fun spin() =
    if(isready()) then
      (Socket.sendVec(newsock,makeBuf(Socket.recvVec(newsock,10000)));
       spin())
    else
      spin()
in
  spin()
end
One drawback of the non-blocking implementation above is that it uses
an excessive amount of CPU time: whenever no input is available, spin
calls itself again immediately, so the thread sits in a tight loop
checking for new input. Newer versions of SML have a way around this,
but for our purposes the best (only) solution is to have the thread
sleep for a short time between checks.
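The sleep-between-checks idea can be sketched as a small polling loop.
This is a sketch under assumptions, not the recitation's own code: the
names pollLoop, onReady, pause and rounds are hypothetical, and the
loop is bounded by rounds only so that it can be run without a socket.
In the real server the loop would run forever, with isready as defined
above and onReady doing the recvVec/sendVec echo.

```sml
(* Hypothetical helper: poll up to `rounds` times. When input is ready,
   handle it at once; otherwise pause before checking again.
   Returns the number of rounds in which input was handled. *)
fun pollLoop {isready, onReady, pause} rounds =
  let
    fun go (0, handled) = handled
      | go (n, handled) =
          if isready () then (onReady (); go (n - 1, handled + 1))
          else (pause (); go (n - 1, handled))
  in
    go (rounds, 0)
  end

(* A pause outside CML, using the Basis Library. Inside a CML thread
   one would instead sync on a timeout event, as the chat server later
   in these notes does, so that only the calling thread sleeps. *)
fun pause () = OS.Process.sleep (Time.fromMilliseconds 100)
```

The pause length is a trade-off: a longer pause wastes less CPU but
adds up to that much delay before input is noticed.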
Chat server
First, we'll look at the blocking version. It requires one thread per
socket. Each thread sends the messages it reads to a mailbox. Another
thread pulls messages off the mailbox and sends them out to all the
sockets. In addition, when we lose a connection, we need to remove it
from the list. It's important to note that the code below is not
actually thread safe. Every update to socks reads the list and then
writes back a modified copy, so if multiple threads try to remove a
socket from the list at the same time, or if one thread is removing
while another is adding, one update can overwrite the other and the
socks list could end up with the wrong value. The fix is to change
socks from a ref to an mvar.
...
(*This is the list of active sockets*)
val socks:(int * (INetSock.inet,Socket.active Socket.stream) Socket.sock) list ref = ref []
val id = ref 0
val mb:Word8Vector.vector Mailbox.mbox = Mailbox.mailbox()
(*This is the function that reads the mailbox and then sends msgs*)
fun writer() =
  let
    val msg = Mailbox.recv(mb)
  in
    List.app(fn(_,sock) =>
      (Socket.sendVec(sock,makeBuf(msg));())) (!socks);
    writer()
  end
(*removes a socket when a connection is lost*)
fun remove(id) =
  let
    fun help(ls) =
      case ls of
        [] => []
      | (a,b)::t => if(id = a) then t else (a,b)::help(t)
  in
    socks := help(!socks)
  end
(*as in the echo server, this reads messages to send to the mailbox;
  a zero-length read means the connection was lost*)
fun spin(id,sock) =
  let
    val vec = Socket.recvVec(sock,1000)
  in
    if(Word8Vector.length(vec) = 0) then remove(id)
    else(
      Mailbox.send(mb,vec);
      spin(id,sock))
  end
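(*The listener accepts new connections. The new id is bound to myid
  before spawning: reading !id inside the spawned thread could see a
  later connection's id if listen has already incremented it again.*)
fun listen() =
  let
    val (newsock,addr) = Socket.accept(socket)
    val myid = !id + 1
  in
    id := myid;
    socks := (myid,newsock)::(!socks);
    CML.spawn(fn() => spin(myid,newsock));
    listen()
  end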
(*the entry point*)
fun start() =
  (CML.spawn(writer);
   listen());
RunCML.doit(start,NONE)
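The mvar fix mentioned above can be sketched as follows. This is a
minimal sketch rather than a definitive solution: it assumes CML's
SyncVar structure (mVarInit, mTake, mPut, mGet), and the helper names
update, addSock, removeSock and currentSocks are hypothetical. Because
mTake leaves the mvar empty until the matching mPut, any other thread
that tries to update the list in between blocks, so each update takes
effect as a single step.

```sml
(* Sketch only: requires CML, which provides the SyncVar structure. *)

(* Apply f to the list held in mv as one atomic step: mTake empties
   the mvar, so concurrent updaters block until mPut refills it.
   f must not raise, or the mvar would be left empty. *)
fun update (mv, f) = SyncVar.mPut (mv, f (SyncVar.mTake mv))

(* Add a new (id, socket) entry at the front of the list. *)
fun addSock (mv, entry) = update (mv, fn ls => entry :: ls)

(* Drop the entry whose id matches. *)
fun removeSock (mv, id) =
  update (mv, List.filter (fn (a, _) => a <> id))

(* Read the current list without removing it from the mvar. *)
fun currentSocks mv = SyncVar.mGet mv
```

With socks created by SyncVar.mVarInit (given the same element type as
before), listen would call addSock, remove(id) would become
removeSock(socks,id), and writer would iterate over
currentSocks(socks) instead of !socks.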
Now, the chat server with non-blocking IO. Besides the listener, this
version requires only two threads, one for reading and one for
writing, no matter how many connections there are. We could in fact
merge these two into one if we wanted: the reader thread could easily
write an incoming message to all the sockets itself, rather than
putting it on a mailbox. Again, this code is not quite thread safe,
since one thread can add to socks while another is removing. Again,
the fix is to change socks from a ref to an mvar.
...
fun writer() =
  let
    val msg = Mailbox.recv(mb)
  in
    List.app(fn(_,_,sock) =>
      (Socket.sendVec(sock,makeBuf(msg));())) (!socks);
    writer()
  end
(*polls every socket once; read returns true if any socket had input*)
fun reader() =
  let
    fun read([]) = false
      | read((id,desc,sock)::t) =
          if(List.null(OS.IO.poll([desc],SOME Time.zeroTime))) then read(t)
          else (
            let val msg = Socket.recvVec(sock,1000) in
              if(Word8Vector.length(msg) = 0)then remove(id)
              else Mailbox.send(mb,msg)
            end;
            read(t);
            true
          )
  in
    (*if nothing was ready, sleep 100 ms rather than spin*)
    if(read(!socks)) then ()
    else CML.sync(CML.timeOutEvt (Time.fromMilliseconds(Int32.fromInt 100)));
    reader()
  end
(*the listener stores a poll descriptor alongside each new socket*)
fun listen() =
  let
    val (newsock,addr) = Socket.accept(socket)
  in
    id := !id + 1;
    socks := (!id,OS.IO.pollIn(Socket.pollDesc(newsock)),newsock)::(!socks);
    listen()
  end
fun start() =
  (CML.spawn(reader);
   CML.spawn(writer);
   listen());
RunCML.doit(start,NONE)
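The merged variant mentioned earlier, in which the reader writes each
incoming message straight to all the sockets instead of going through
a mailbox, might look like the sketch below. It is an illustration
under assumptions, not the recitation's code: readAll and its
parameters ready, recv, send and remove are hypothetical stand-ins for
the poll, recvVec, sendVec and remove calls above, passed in as
functions so that the sketch can be run without real sockets.

```sml
(* One pass over the socket list. For each entry that is ready, read a
   message and forward it to every socket in the list; a zero-length
   read means the connection was lost. Returns true if any entry was
   ready, so the caller can decide whether to sleep before polling
   again. *)
fun readAll {ready, recv, send, remove} socks =
  let
    fun step ((id, desc, sock), any) =
      if ready desc then
        let
          val msg : Word8Vector.vector = recv sock
        in
          if Word8Vector.length msg = 0 then remove id
          else List.app (fn (_, _, s) => send (s, msg)) socks;
          true
        end
      else any
  in
    List.foldl step false socks
  end
```

In the server, ready would wrap the zero-timeout OS.IO.poll call, recv
would be fn sock => Socket.recvVec(sock,1000), and send would be
fn (sock,msg) => ignore(Socket.sendVec(sock,makeBuf(msg))). The
trade-off is that a slow send now delays reading from the other
sockets, which the mailbox version avoids.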