JavaGroups - A Reliable Multicast Communication Toolkit for Java

NOTE: The latest version of JavaGroups is available at:

http://www.sourceforge.net/projects/javagroups

JavaGroups is a group communication toolkit written entirely in Java. It is based on IP multicast, but extends it with
  1. reliability and
  2. group membership.
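Reliability includes loss-less transmission, removal of duplicates, fragmentation of oversized messages and message ordering. Group membership includes knowing who the current members are and being notified when a member joins, leaves or crashes.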


The table below shows where JavaGroups fits in. In unicast communication, where one sender sends a message to one receiver, there are UDP and TCP. UDP is unreliable: packets may get lost, be duplicated or arrive out of order, and there is a maximum packet size. TCP is also unicast, but it retransmits missing messages, weeds out duplicates, fragments packets that are too big and presents messages to the application in the order in which they were sent. In the multicast case, where one sender sends a message to many receivers, IP Multicast extends UDP: a sender sends messages to a multicast address, and receivers have to join that multicast address to receive them. As in UDP, message transmission is still unreliable, and there is no notion of membership (who has currently joined the multicast address).
 

             Unreliable      Reliable
Unicast      UDP             TCP
Multicast    IP Multicast    JavaGroups

JavaGroups extends reliable unicast message transmission (as in TCP) to multicast settings. As described above, it provides reliability and group membership on top of IP Multicast. Since every application has different reliability needs, JavaGroups provides a flexible protocol stack architecture that allows users to put together custom-tailored stacks, ranging from unreliable but fast to highly reliable but slower.

As an example, a user might start with a stack containing only IP Multicast. To add loss-less transmission, the user might add the NAKACK protocol (which also weeds out duplicates). Now messages sent from a sender are always received by the recipients, but the order in which they are received is undefined. Therefore, the user might add the FIFO layer to impose per-sender ordering. If ordering should be imposed over all senders, the TOTAL protocol may be added.

The Group Membership Service (GMS) and FLUSH protocols provide group membership: they allow the user to register a callback function that is invoked whenever the membership changes, e.g. when a member joins, leaves or crashes. To announce crashed members, the GMS needs a failure detector (FD) protocol. If new members want to obtain the current state of existing members, the STATE_TRANSFER protocol has to be present in the stack. Finally, a user who wants to encrypt messages can add the CRYPT protocol (which encrypts/decrypts messages and takes care of re-keying using a key distribution toolkit).
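The loss-less, duplicate-free, per-sender ordered delivery described above can be illustrated with sequence numbers. The following is only a minimal sketch of the idea, not the NAKACK or FIFO implementation shipped with JavaGroups: the class SeqnoWindow and its methods are hypothetical, and it assumes that each sender numbers its messages 1, 2, 3, ... and that the receiver keeps one such window per sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical receive window for the messages of ONE sender (illustration only).
class SeqnoWindow {
    private long nextToDeliver = 1;  // lowest sequence number not yet delivered
    private final TreeMap<Long, String> buffered = new TreeMap<>();  // out-of-order messages

    // Accepts a message and returns the payloads that can now be delivered in send order.
    List<String> receive(long seqno, String payload) {
        List<String> deliverable = new ArrayList<>();
        if (seqno < nextToDeliver || buffered.containsKey(seqno)) {
            return deliverable;  // duplicate: already delivered or already buffered
        }
        buffered.put(seqno, payload);
        while (buffered.containsKey(nextToDeliver)) {
            deliverable.add(buffered.remove(nextToDeliver));
            nextToDeliver++;
        }
        return deliverable;
    }

    // Sequence numbers below the highest buffered one that are still missing;
    // a receiver would ask the sender to retransmit these.
    List<Long> missing() {
        List<Long> gaps = new ArrayList<>();
        if (buffered.isEmpty()) {
            return gaps;
        }
        for (long s = nextToDeliver; s < buffered.lastKey(); s++) {
            if (!buffered.containsKey(s)) {
                gaps.add(s);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        SeqnoWindow w = new SeqnoWindow();
        System.out.println(w.receive(1, "a"));  // [a]
        System.out.println(w.receive(3, "c"));  // []  (message 2 is missing)
        System.out.println(w.missing());        // [2]
        System.out.println(w.receive(3, "c"));  // []  (duplicate is dropped)
        System.out.println(w.receive(2, "b"));  // [b, c]
    }
}
```

Ordering across all senders, as provided by TOTAL, needs agreement between the members and is not covered by this per-sender sketch.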

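Using composition of protocols (each taking care of a different aspect) to build reliable multicast communication has the benefit that an application includes only the protocols it needs: a stack can be as light (unreliable but fast) or as complete (highly reliable but slower) as the application requires.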


 

The JavaGroups API

The API of JavaGroups is very simple (similar to a UDP socket) and is always the same, regardless of how the underlying protocol stack is composed. To send and receive messages, a channel has to be created. The reliability of a channel is specified as a string, which then causes the creation of the underlying protocol stack. The example below creates a channel and sends and receives one message:
 
// Protocol stack, given as a colon-separated list of protocol names
String props="UDP:PING:FD:STABLE:NAKACK:UNICAST:" +
             "FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE";
Message send_msg, recv_msg;
Channel channel=new JChannel(props);

channel.Connect("MyGroup");                       // join (or create) the group
send_msg=new Message(null, null, "Hello world");  // null receiver = all group members
channel.Send(send_msg);
recv_msg=(Message)channel.Receive(0);
System.out.println("Received " + recv_msg);

channel.Disconnect();
channel.Close();


The channel's properties are specified when creating it. They include IP Multicast (UDP), failure detection (FD), distributed message garbage collection (STABLE), loss-less FIFO delivery of multicast (NAKACK) and unicast (UNICAST) messages, fragmentation (FRAG), group membership (GMS, FLUSH, VIEW_ENFORCER, QUEUE) and state transfer (STATE_TRANSFER). Each protocol "X" results in the creation of an instance of a Java class X. All protocol instances are kept in a linked list (the protocol stack), which messages traverse up/down.
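The way a properties string turns into a linked list that messages traverse can be sketched as follows. This is a simplified illustration rather than the JavaGroups code: StackSketch and Layer are hypothetical names, the layers only record their names instead of instantiating a protocol class, and the sketch assumes that the first name in the string (UDP) is the bottom of the stack.

```java
import java.util.ArrayList;
import java.util.List;

class StackSketch {
    // One node of the linked list; stands in for a protocol instance.
    static class Layer {
        final String name;
        Layer above;
        Layer below;

        Layer(String name) {
            this.name = name;
        }
    }

    // Builds the list from a colon-separated string and returns the bottom layer.
    // Assumption: the first name in the string is the bottom of the stack.
    static Layer build(String props) {
        Layer bottom = null;
        Layer previous = null;
        for (String name : props.split(":")) {
            Layer layer = new Layer(name);
            if (previous == null) {
                bottom = layer;
            } else {
                previous.above = layer;
                layer.below = previous;
            }
            previous = layer;
        }
        return bottom;
    }

    // Layers visited by a received message, travelling from the network up.
    static List<String> pathUp(Layer bottom) {
        List<String> path = new ArrayList<>();
        for (Layer l = bottom; l != null; l = l.above) {
            path.add(l.name);
        }
        return path;
    }

    // Layers visited by a sent message, travelling from the application down.
    static List<String> pathDown(Layer bottom) {
        Layer top = bottom;
        while (top != null && top.above != null) {
            top = top.above;
        }
        List<String> path = new ArrayList<>();
        for (Layer l = top; l != null; l = l.below) {
            path.add(l.name);
        }
        return path;
    }

    public static void main(String[] args) {
        Layer bottom = build("UDP:FRAG:GMS");
        System.out.println("down: " + pathDown(bottom));  // down: [GMS, FRAG, UDP]
        System.out.println("up:   " + pathUp(bottom));    // up:   [UDP, FRAG, GMS]
    }
}
```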

To join a group, Connect() is called. It returns when the member has successfully joined the group named "MyGroup", or when it has created a new group (if it is the first member).

Then a message is sent using the Send() method. A message contains the receiver's address ('null' = all group members), the sender's address ('null', will be filled in by the protocol stack before sending the message) and a byte buffer. In the example, the String object "Hello world" is set to be the message's contents. It is serialized into the message's byte buffer.
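How an object such as "Hello world" ends up in a byte buffer can be shown with standard Java serialization. The sketch below is an illustration under assumptions, not the JavaGroups Message class: MessageSketch, Msg and the two helper methods are hypothetical names, and the toolkit may serialize differently internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class MessageSketch {
    // Hypothetical stand-in for a message: receiver, sender and payload bytes.
    static class Msg {
        final Object dest;    // null = all group members
        Object src;           // null until filled in before sending
        final byte[] buffer;  // serialized contents

        Msg(Object dest, Object src, Serializable contents) {
            this.dest = dest;
            this.src = src;
            this.buffer = toBytes(contents);
        }
    }

    // Serializes an object into a byte array using standard Java serialization.
    static byte[] toBytes(Serializable obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reconstructs the object from the byte array on the receiving side.
    static Object fromBytes(byte[] buffer) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Msg msg = new Msg(null, null, "Hello world");
        System.out.println("Contents: " + fromBytes(msg.buffer));  // Contents: Hello world
    }
}
```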

Since the message is sent to all members, the sender will also receive it (unless the socket option 'receive own multicasts' is set to 'false'). The message is retrieved using the Receive() method.

Finally, the member disconnects from the channel and then closes it. This results in a notification being sent to all members who are registered for membership change notifications.
 
 

Building Blocks

Channels are deliberately simple so that all possible applications can use them. However, JavaGroups also provides high-level abstractions, so-called building blocks. They can be put between the application and the channel; the application then uses the building blocks instead of channels. An example is RpcDispatcher, which allows applications to make remote group method calls:
 

public class RpcDispatcherTest {
    Channel       channel;
    RpcDispatcher disp;
    RspList       rsp_list;
    String        props="UDP:PING:FD:STABLE:NAKACK:UNICAST:FLUSH:GMS:VIEW_ENFORCER:QUEUE";

    // Invoked remotely by the RpcDispatcher on every group member
    public int Print(int number) {
        System.out.println("Print(" + number + ")");
        return number * 2;
    }

    public void Start() throws Exception {
        channel=new JChannel(props);
        disp=new RpcDispatcher(channel, null, null, this);
        channel.Connect("RpcDispatcherTestGroup");

        for(int i=0; i < 100; i++) {
            Util.Sleep(1000);
            // null = all members, GET_ALL = wait for all replies, 0 = no timeout
            rsp_list=disp.CallRemoteMethods(null, "Print", new Integer(i),
                                            GroupRequest.GET_ALL, 0);
            System.out.println("Responses: " + rsp_list);
        }
        channel.Close();
    }

}
 

As before, the example creates a channel, specifying the properties. It also defines a method Print(), which will be called by the RpcDispatcher later on. Then an instance of RpcDispatcher is created on top of the channel, and the channel is connected (this joins the new member to the group). Now messages can be sent and received.

Instead of sending and receiving messages using the channel, however, the application invokes a remote method call using RpcDispatcher's CallRemoteMethods(). The first argument 'null' means send to all group members, "Print" is the name of the method to be invoked, 'new Integer(i)' is the argument to the Print() method, GET_ALL means wait until the responses from all group members have been received, and '0' specifies the timeout (in this case, wait forever).

RpcDispatcher sends a multicast message (containing the method call) to all members (e.g. 4 members, including itself) and waits for 4 replies. If one or more of the members crash in the meantime, the call nevertheless returns and has those replies marked as 'suspected' in the response list. The response list contains an entry for each expected reply, which has the address of the replier, the value (if any; in our case it is an integer) and a flag (received, not received (in case of timeouts) or suspected). If this member is the only group member, the call simply invokes its own Print() method.
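The bookkeeping behind such a response list can be sketched in a few lines. This is an illustrative model, not the RspList or GroupRequest code of JavaGroups: ResponseListSketch, its methods and the use of strings as member addresses are all hypothetical. It only shows why a GET_ALL call can still return when a member crashes: a suspected member no longer counts as outstanding.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ResponseListSketch {
    enum Status { RECEIVED, NOT_RECEIVED, SUSPECTED }

    // One entry per expected reply.
    static class Entry {
        Object value;                         // reply value, if any
        Status status = Status.NOT_RECEIVED;  // flag described in the text
    }

    private final Map<String, Entry> entries = new LinkedHashMap<>();

    ResponseListSketch(List<String> expectedRepliers) {
        for (String member : expectedRepliers) {
            entries.put(member, new Entry());
        }
    }

    // Records a reply from a member that has not answered or been suspected yet.
    void reply(String member, Object value) {
        Entry e = entries.get(member);
        if (e != null && e.status == Status.NOT_RECEIVED) {
            e.value = value;
            e.status = Status.RECEIVED;
        }
    }

    // Marks a member as crashed so that the caller stops waiting for it.
    void suspect(String member) {
        Entry e = entries.get(member);
        if (e != null && e.status == Status.NOT_RECEIVED) {
            e.status = Status.SUSPECTED;
        }
    }

    // "Wait for all" is satisfied once no entry is still outstanding.
    boolean complete() {
        for (Entry e : entries.values()) {
            if (e.status == Status.NOT_RECEIVED) {
                return false;
            }
        }
        return true;
    }

    Status status(String member) {
        return entries.get(member).status;
    }

    Object value(String member) {
        return entries.get(member).value;
    }

    public static void main(String[] args) {
        ResponseListSketch rsps = new ResponseListSketch(Arrays.asList("A", "B", "C"));
        rsps.reply("A", 4);
        rsps.suspect("B");                    // B crashed before replying
        System.out.println(rsps.complete());  // false (C is still outstanding)
        rsps.reply("C", 4);
        System.out.println(rsps.complete());  // true
    }
}
```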
 
 

More Information


The current set of protocols shipped with JavaGroups provides Virtual Synchrony properties: messages are sent and received in views. Each member has a view, which is the set of members that constitute the current membership. When the membership changes, a new view is installed by all members. Views are installed in the same order at all members. The set of messages between two consecutive views is the same at all receivers. Messages sent in view V1 are received by all non-faulty members in view V1.
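What an application typically does when a new view is installed is to compare it with the previous one. The sketch below shows that comparison only. ViewSketch and View are hypothetical names, member addresses are plain strings, and the ordering and delivery guarantees listed above are provided by the protocol stack, not by this code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ViewSketch {
    // Hypothetical view: an identifier plus the current list of members.
    static class View {
        final long id;
        final List<String> members;

        View(long id, List<String> members) {
            this.id = id;
            this.members = new ArrayList<>(members);
        }
    }

    // Members present in the new view but not in the old one.
    static List<String> joined(View oldView, View newView) {
        List<String> result = new ArrayList<>(newView.members);
        result.removeAll(oldView.members);
        return result;
    }

    // Members present in the old view but not in the new one (left or crashed).
    static List<String> left(View oldView, View newView) {
        List<String> result = new ArrayList<>(oldView.members);
        result.removeAll(newView.members);
        return result;
    }

    public static void main(String[] args) {
        View v1 = new View(1, Arrays.asList("A", "B"));
        View v2 = new View(2, Arrays.asList("B", "C"));
        System.out.println("joined: " + joined(v1, v2));  // joined: [C]
        System.out.println("left:   " + left(v1, v2));    // left:   [A]
    }
}
```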

Through its flexible protocol stack architecture, JavaGroups can be adapted to many different environments. This can be done by replacing, removing or modifying existing protocols, or by adding new ones. JavaGroups is also a testbed for developing and experimenting with new reliable multicast protocols written in Java. As JavaGroups is an open source project, new protocols are always welcome and will be integrated.