Next: Peer protocols Up: Implementing Protocols Using JChannel Previous: JChannel.Close()

   
Creating a sample protocol

This section discusses the implementation of the fragmentation protocol layer (FRAG, source file JavaGroups/JavaStack/Protocols/FRAG.java) as an example of a relatively simple layer which fragments large messages into smaller ones and reassembles them at the receiver's end before passing them up the stack.

FRAG essentially works as follows: whenever a message exceeding a maximum size is passed to the FRAG layer from the layer above, it is fragmented into a number of smaller messages, and a header containing the original message's ID, the fragment ID and the number of fragments is added to each fragment. The fragments are then sent down the stack (the original message is discarded). At the receiver's side, the FRAG layer removes this header from each fragment and adds the fragment to the corresponding entry in a fragmentation table. When all fragments have been received, the original message is reconstructed from the fragments and passed up the stack (and the fragments are removed from the fragmentation table).

The FRAG layer extends class Protocol and overrides methods GetName, SetProperties, Up and Down.

GetName identifies the protocol and is used by several building blocks to differentiate between protocols: e.g. the configurator (cf. 4.1.5) checks whether duplicate protocol names occur in the stack. If this is the case, stack construction ends with an exception.

SetProperties (see 3.5.1) is used to convey parameters to the FRAG layer. Current properties include a debug flag (trace) which, if set to true, causes the protocol to write diagnostic messages to stdout during its execution, and frag_size, which specifies the maximum size for a message. If this size is exceeded, messages will be fragmented.
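As a rough illustration of this kind of property handling, the following standalone sketch parses the two properties from a java.util.Properties object. Only the property names trace and frag_size come from the text; the class name FragPropertiesSketch, the method signature, the boolean return value and the default of 8192 bytes are assumptions made for the example and are not taken from FRAG.java.

```java
import java.util.Properties;

// Hypothetical stand-in for the property handling of a FRAG-like layer.
final class FragPropertiesSketch {
    boolean trace = false;   // debug flag, off unless configured
    long frag_size = 8192;   // assumed default, not taken from FRAG.java

    // Returns false if a property value cannot be parsed.
    boolean setProperties(Properties props) {
        String str = props.getProperty("trace");
        if (str != null) {
            trace = Boolean.parseBoolean(str.trim());
        }
        str = props.getProperty("frag_size");
        if (str != null) {
            try {
                frag_size = Long.parseLong(str.trim());
            } catch (NumberFormatException e) {
                return false; // malformed size: reject the configuration
            }
        }
        return true;
    }
}
```

Rejecting a malformed frag_size early is one possible design choice; silently falling back to the default would hide configuration mistakes.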

The fragmentation header is represented as a separate, serializable class. It was previously shown in section 3.1.3. It contains the fields msg_id (the original message's sequence number), frag_id (the fragment ID) and num_frags (the number of fragments to be expected). An example of 3 successive fragmentation headers for original message 45, as seen by the receiver, would be:

   {45, 0, 3} {45, 1, 3} {45, 2, 3}

The 3 fragments 0, 1 and 2 would then be reassembled into the original message 45, and the latter would be passed up the stack.
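The header sequence above can be reproduced with a small standalone sketch. The three field names follow the text; the class name FragHeaderSketch, the helper headersFor and the toString format are hypothetical and only serve the illustration (the real class is the one shown in section 3.1.3).

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the fragmentation header described above.
final class FragHeaderSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final long msg_id;    // sequence number of the original message
    final int  frag_id;   // index of this fragment, starting at 0
    final int  num_frags; // total number of fragments (0 = not fragmented)

    FragHeaderSketch(long msg_id, int frag_id, int num_frags) {
        this.msg_id = msg_id;
        this.frag_id = frag_id;
        this.num_frags = num_frags;
    }

    @Override
    public String toString() {
        return "{" + msg_id + ", " + frag_id + ", " + num_frags + "}";
    }

    // Builds the headers a sender would attach to the fragments of one message.
    static List<FragHeaderSketch> headersFor(long msgId, int numFrags) {
        List<FragHeaderSketch> result = new ArrayList<>();
        for (int i = 0; i < numFrags; i++) {
            result.add(new FragHeaderSketch(msgId, i, numFrags));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [{45, 0, 3}, {45, 1, 3}, {45, 2, 3}]
        System.out.println(headersFor(45, 3));
    }
}
```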

Method Down is shown below:

    public void Down(Event evt) {
        if(evt.GetType() == Event.MSG) {
            Message msg=(Message)evt.GetArg();
            long    size=msg.Size();            
            if(size > frag_size) {
                Fragment(msg);  // Fragment and pass down
                return;
            }
            else
                msg.AddHeader(new FragHeader(msg.GetId(), 0, 0)); // Add dummy header
        }
        PassDown(evt);  // Pass on to the layer below us
    }

When a message is passed to the FRAG layer by the layer above it (by calling Down), FRAG checks whether the event actually carries a message (rather than some other event type). If so, the message is extracted from the event and its size is checked. If the size exceeds frag_size (set earlier by SetProperties), the message is fragmented using method Fragment. Otherwise, a dummy header with 0 for the number of fragments is created and added to the message, and the message is passed down the stack. On the receiver's side, the corresponding FRAG layer simply passes up any message whose fragmentation header has 0 for the number of fragments (no reassembly is needed).

Note that FRAG adds a header to every single message. The disadvantage is that every message gets slightly bigger, even though most messages may never need to be fragmented. If this pattern is known in advance (many messages are very small and only a few need to be fragmented), a better mechanism can be used: a fragmentation header is added only when the message needs to be fragmented. On the receiver's side, a message without a fragmentation header is passed up the stack unchanged (non-fragmented message); a message with one is a fragment and has to be reassembled accordingly.
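The alternative mechanism might look roughly like the sketch below. It is not part of FRAG.java: the classes Msg and FragHdr and both helper methods are hypothetical, and a plain Deque stands in for the message's header stack. The point it tries to show is that the receiver has to inspect the topmost header without removing it, since a message that was never fragmented carries some other layer's header (or none) on top.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: fragmentation headers are added only when needed.
final class OptionalHeaderSketch {

    // Minimal fragmentation header (assumed layout).
    static final class FragHdr {
        final long msgId;
        final int fragId;
        final int numFrags;

        FragHdr(long msgId, int fragId, int numFrags) {
            this.msgId = msgId;
            this.fragId = fragId;
            this.numFrags = numFrags;
        }
    }

    // Minimal message: a header stack plus a payload.
    static final class Msg {
        final Deque<Object> headers = new ArrayDeque<>();
        final byte[] buffer;

        Msg(byte[] buffer) {
            this.buffer = buffer;
        }
    }

    // Sender side: small messages are passed down unchanged, without a header.
    static boolean needsFragmentation(Msg msg, long fragSize) {
        return msg.buffer.length > fragSize;
    }

    // Receiver side: peek (do not pop) at the topmost header. Only a FragHdr
    // marks a fragment; an empty stack or a foreign header means "pass up".
    static boolean isFragment(Msg msg) {
        return msg.headers.peek() instanceof FragHdr;
    }
}
```

With this variant the receiver removes the header only after isFragment has returned true, so headers belonging to other layers are left untouched.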

The Fragment method is shown below:

    private void Fragment(Message msg) {        
        ByteArrayOutputStream  out_stream;
        ObjectOutputStream     out;
        byte[]                 buffer;
        Vector                 fragments;
        FragHeader             hdr;
        Message                frag_msg;
        Object                 dest=msg.GetDest(), src=msg.GetSrc();
        long                   msg_id=msg.GetId(), rsp_id=msg.GetRspId();
        boolean                oneway=msg.IsOneway();

        try {
            out_stream=new ByteArrayOutputStream();
            out=new ObjectOutputStream(out_stream);
            out.writeObject(msg.GetHeaders());     // write headers
            out.writeObject(msg.GetBuffer());      // write buffer
            out.flush();                           // make sure all data reaches out_stream
            buffer=out_stream.toByteArray();
            fragments=Util.FragmentBuffer(buffer, frag_size);  // create Vector of byte[]
            for(int i=0; i < fragments.size(); i++) {
                frag_msg=new Message(dest, src, (byte[])fragments.elementAt(i));
                if(i == 0) frag_msg.SetId(msg.GetId());
                frag_msg.SetRspId(rsp_id);
                if(oneway) frag_msg.SetOneway();
                hdr=new FragHeader(msg_id, i, fragments.size());
                frag_msg.AddHeader(hdr);
                PassDown(new Event(Event.MSG, frag_msg));
            }           
        }
        catch(Exception e) {
            System.err.println("FRAG.Fragment(): " + e);
        }
    }

Fragment serializes the original message's headers and buffer to a byte array, and then breaks that array into several smaller arrays using Util.FragmentBuffer, giving the buffer and the fragmentation size as arguments. The fragmented byte buffers are returned in a vector. A new message is then created for every element in the vector, together with a FragHeader containing the original message's sequence number, the current fragment number and the total number of fragments to be expected for that sequence number. The header is added to the message, and the message is passed down the stack. Note that the original message is discarded.
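The source of Util.FragmentBuffer is not shown here. A minimal sketch of what such a helper could do, assuming it simply cuts the array into consecutive chunks of at most frag_size bytes, is given below. The class name FragmentBufferSketch and the use of List instead of Vector are choices made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical re-implementation of a buffer-splitting helper.
final class FragmentBufferSketch {

    // Splits buf into consecutive chunks of at most fragSize bytes each.
    // Only the last chunk may be shorter than fragSize.
    static List<byte[]> fragmentBuffer(byte[] buf, int fragSize) {
        if (fragSize <= 0) {
            throw new IllegalArgumentException("fragSize must be positive");
        }
        List<byte[]> fragments = new ArrayList<>();
        int offset = 0;
        while (offset < buf.length) {
            int len = Math.min(fragSize, buf.length - offset);
            fragments.add(Arrays.copyOfRange(buf, offset, offset + len));
            offset += len;
        }
        return fragments;
    }

    public static void main(String[] args) {
        // A 10-byte buffer with a fragment size of 4 yields chunks of 4, 4 and 2 bytes.
        for (byte[] fragment : fragmentBuffer(new byte[10], 4)) {
            System.out.println(fragment.length);
        }
    }
}
```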

Fragmented or non-fragmented messages are handled by the receiver's FRAG layer in method Up:

    public void Up(Event evt) {
        byte[]                m;
        ByteArrayInputStream  in_stream=null;
        ObjectInputStream     in=null;
        Stack                 headers;
        byte[]                tmpbuf=null;
        int                   len;

        if(evt.GetType() == Event.MSG) {
            Message    msg=(Message)evt.GetArg();
            FragHeader hdr=(FragHeader)msg.RemoveHeader();

            if(hdr.num_frags > 0) {
                m=frag_table.Add(msg.GetSrc(), hdr.msg_id, hdr.frag_id,
                                 hdr.num_frags, msg.GetBuffer());
                if(m != null) {
                    msg.SetId(hdr.msg_id);
                    try {
                        in_stream=new ByteArrayInputStream(m);
                        in=new ObjectInputStream(in_stream);                        
                        headers=(Stack)in.readObject();
                        tmpbuf=(byte[])in.readObject();
                    }
                    catch(Exception e) {
                        System.err.println("FRAG.Up(): " + e);
                        return;
                    }
                    msg.SetHeaders(headers);
                    msg.SetBuffer(tmpbuf);
                    PassUp(new Event(Event.MSG, msg));
                }
                return;
            }
        }
        PassUp(evt); // Pass up to the layer above us
    }

If the event is not a message event, it is simply passed up the stack. Otherwise, the fragmentation header is removed from the message. If the number of fragments is 0, the message is not fragmented and is passed up. Otherwise, the fragment is added, using method Add, to the fragmentation table, which is an internal class used only by FRAG. Add checks whether there is already an entry for the original message's sequence number. If not, a new one is created. Each entry keeps track of all fragments received so far. When the last missing fragment has been received, Add reassembles the fragments into the original byte buffer, removes the entry together with all its fragments and returns the byte buffer. In all other cases, Add returns null.
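The behaviour of the fragmentation table can be sketched as follows. The real table is internal to FRAG.java and is not reproduced in this section, so the class name FragTableSketch, the (sender, message ID) key, the slot array and the pending helper are all assumptions; only the contract of Add (null until the last fragment arrives, then the reassembled buffer) follows the description above.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a fragmentation table.
final class FragTableSketch {
    // One entry per (sender, original message ID); slot i holds fragment i.
    private final Map<List<Object>, byte[][]> entries = new HashMap<>();

    // Stores one fragment. Returns the reassembled buffer once every slot
    // is filled, and null while fragments are still missing.
    byte[] add(Object sender, long msgId, int fragId, int numFrags, byte[] fragment) {
        List<Object> key = Arrays.asList(sender, msgId);
        byte[][] slots = entries.computeIfAbsent(key, k -> new byte[numFrags][]);
        slots[fragId] = fragment;
        for (byte[] slot : slots) {
            if (slot == null) {
                return null; // at least one fragment has not arrived yet
            }
        }
        entries.remove(key); // complete: drop the entry and its fragments
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] slot : slots) {
            out.write(slot, 0, slot.length); // concatenate in fragment order
        }
        return out.toByteArray();
    }

    // Number of messages that are still waiting for fragments.
    int pending() {
        return entries.size();
    }
}
```

Because fragments are stored by index, this sketch reassembles correctly even if the fragments of one message arrive out of order.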

When a byte buffer has been returned from Add, the headers and message buffer of the original message are reconstructed from it, and the message is passed up the stack.
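The serialization round trip used by Fragment and Up can be tried in isolation. The sketch below writes a header stack and a payload with ObjectOutputStream and reads them back in the same order with ObjectInputStream. The class PackSketch, the Unpacked holder and the use of plain strings as headers are assumptions made to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Stack;

// Hypothetical sketch of the pack/unpack step around fragmentation.
final class PackSketch {

    // Result of unpacking: the header stack and the payload.
    static final class Unpacked {
        final Stack<String> headers;
        final byte[] buffer;

        Unpacked(Stack<String> headers, byte[] buffer) {
            this.headers = headers;
            this.buffer = buffer;
        }
    }

    // Sender side: serialize headers first, then the payload.
    static byte[] pack(Stack<String> headers, byte[] buffer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(headers);
            out.writeObject(buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream has been closed (and therefore flushed) at this point.
        return bytes.toByteArray();
    }

    // Receiver side: read the objects back in the order they were written.
    @SuppressWarnings("unchecked")
    static Unpacked unpack(byte[] packed) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(packed))) {
            Stack<String> headers = (Stack<String>) in.readObject();
            byte[] buffer = (byte[]) in.readObject();
            return new Unpacked(headers, buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The byte array returned by pack is what would be cut into fragments; the concatenated fragments are what unpack expects on the receiving side.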

Note that the FRAG layer requires lossless delivery: if a fragment is lost and no retransmission is initiated (by some other layer), FRAG will not be able to reconstruct the original message. Also, FRAG focuses only on fragmentation and defragmentation; it does not guarantee FIFO ordering with respect to the sender. This again follows the design principle that each layer should provide a single function, which keeps layers independent of one another and relatively simple.

The FRAG layer is a typical example of a protocol layer: it is not overly complicated, and its core logic only required overriding the Up and Down methods of Protocol, from which it is derived. More sophisticated protocols will be discussed in the next sections.



1999-12-13