In this recitation, we will look at the complete code for some basic servers. First, we will look at a server that accepts a single connection and simply echos on that connection. We will do this using both blocking and non-blocking IO.

Then we will expand upon that server to write a server that accepts multiple connections and echos text from one to all of them (like a chat server). Again we will look at both blocking and non-blocking versions.

  (* wraps a buffer for transmission *)
  fun makeBuf(vec) = {buf = vec, i = 0, sz = NONE}
First, the blocking version. It binds a socket to port 8000, waits for a connection, and then simply echos what it reads.
let
  val socket:Socket.passive INetSock.stream_sock = INetSock.TCP.socket()
  val myaddr = valOf(NetHostDB.fromString("127.0.0.1"))
  val _ = Socket.bind(socket,INetSock.toAddr(myaddr,8000))
  val _ = Socket.listen(socket,20)
  val (newsock,addr) = Socket.accept(socket)
  fun spin() =
        (Socket.sendVec(newsock,makeBuf(Socket.recvVec(newsock,10000)));
        spin())
in
    spin()
end
Here is the non-blocking version. The isready function tells if the socket is ready for reading.
  ...
  val (newsock,addr) = Socket.accept(socket)
  val descs = [OS.IO.pollIn(Socket.pollDesc(newsock))]
  fun isready() =
      OS.IO.poll(descs,SOME Time.zeroTime) <> []
  fun spin() =
    if(isready()) then
        (Socket.sendVec(newsock,makeBuf(Socket.recvVec(newsock,10000)));
        spin())
    else
        spin()
in
    spin()
end
One drawback to the above implementation using NBIO is that it uses an excessive amount of CPU power, as it is in a tight loop checking for new input. Newer versions of SML have a way around this, but for our purposes the best (only) solution is to have the thread sleep for a short time between checks.

Chat server

First, we'll look at the blocking version. It will require one thread per socket. Each thread will send the messages it reads to a mailbox. Another thread will pull messages off the mailbox and send them out to all the sockets. In addition, when we lose a connection, we need to remove it from the list. Its important to note that the code below is not actually thread safe. If multiple threads try to remove a socket from the list, or if one thread is removing while another is adding, the socks list could end up with the wrong value. The fix is to change socks from a ref to an mvar.
  ...
  (*This is the list of active sockets*)
  val socks:(int * (INetSock.inet,Socket.active Socket.stream) Socket.sock) list ref = ref []
  val id = ref 0
  val mb:Word8Vector.vector Mailbox.mbox = Mailbox.mailbox()

  (*This is the function that reads the mailbox and then sends msgs*)
  fun writer() =
    let
      val msg = Mailbox.recv(mb)
    in
        List.app(fn(_,sock) =>
            (Socket.sendVec(sock,makeBuf(msg));())) (!socks);
        writer()
    end

  (*removes a socket when a connection is lost*)
  fun remove(id) =
    let
      fun help(ls) =
        case ls of
        [] => []
        | (a,b)::t => if(id = a) then t else (a,b)::help(t)
    in
      socks := help(!socks)
    end

  (*as in the echo server, this reads messages to send to the mailbox*)
  fun spin(id,sock) =
    let
      val vec = Socket.recvVec(sock,1000)
    in
      if(Word8Vector.length(vec) = 0) then remove(id)
      else(
        Mailbox.send(mb,vec);
        spin(id,sock))
    end

  (*The listener accepts new connections*)
  fun listen() =
    let
      val (newsock,addr) = Socket.accept(socket)
    in
      id := !id + 1;
      socks := (!id,newsock)::(!socks);
      CML.spawn(fn() => spin(!id,newsock));
      listen()
    end

  (*the entry point*)
  fun start() =
    (CML.spawn(writer);
    listen());


RunCML.doit(start,NONE)
Now, the chat server with non-blocking IO. This version requires only two threads, one for reading and one for writing. We could in fact do this with just one thread if we wanted. The reader thread could easily write an incoming message to all the sockets, rather than putting it on a mailbox. Again, this code is not quite thread safe, since one thread can add to socks while another is removing. Again, the fix is to change socks from a ref to an mvar.
  ...
  fun writer() =
    let
      val msg = Mailbox.recv(mb)
    in
        List.app(fn(_,_,sock) =>
            (Socket.sendVec(sock,makeBuf(msg));())) (!socks);
        writer()
    end

  fun reader() =
    let
      fun read([]) = false
      |   read((id,desc,sock)::t) =
            if(OS.IO.poll([desc],SOME Time.zeroTime) = []) then read(t)
            else (
                let val msg = Socket.recvVec(sock,1000) in
                  if(Word8Vector.length(msg) = 0)then remove(id)
                  else Mailbox.send(mb,msg)
                end;
                read(t);
                true
            )
    in
      if(read(!socks)) then ()
      else CML.sync(CML.timeOutEvt (Time.fromMilliseconds(Int32.fromInt 100)));
      reader()
    end

  fun listen() =
    let
      val (newsock,addr) = Socket.accept(socket)
    in
      id := !id + 1;
      socks := (!id,OS.IO.pollIn(Socket.pollDesc(newsock)),newsock)::(!socks);
      listen()
    end
  fun start() =
    (CML.spawn(reader);
    CML.spawn(writer);
    listen());


RunCML.doit(start,NONE)