QSM User’s Guide

Krzysztof Ostrowski
krzys@cs.cornell.edu

 

1. Introduction

This document describes the API and usage of the 0.11 version of QSM. In this initial distribution, we offer scalable multicast with simple ACK-based reliability. We refer the reader to [1] for a more detailed description of the current version of QSM.

This release will be followed by a series of updates, replacing or extending all components of the system, including the API presented here. The following lists the major planned extensions, in no particular sequence. Independently, we will  continue to release minor updates and bug fixes.

1.      Support for automated rate control. Currently, rate admission is controlled manually, on a per-node basis, and only in terms of packets/s. We are currently working on an extension that will allow the rate to be set adaptively based on perceived system load and node capacities.

2.      Support for WS-* interfaces. We are working on a solution that will allow QSM to be used as a transport mechanism for messages carrying notifications and events in systems using the WS-Notification and WS-Eventing standards. This will be realized via a per-node local QSM service that subscribes to topics on behalf of local applications, communicates with applications and delivers data locally via web interfaces, and acts as a broker.

3.      Typing support. We shall extend QSM with basic support for group typing, allowing applications to negotiate the type of transmitted data and placing various constraints on the group members. This will facilitate creating groups dedicated for specific tasks, such as delivering news, replicating a file or particular type of service.

4.      Operating system embeddings. We plan to integrate QSM with the operating system shell, via a MyGroups folder accessible directly from the desktop, listing the available groups, and allowing the user to subscribe and attach local clients from a right-click context menu. This functionality will initially rely on our basic typing support. In subsequent releases, it will be integrated with typed endpoints and greatly extended.

5.      Typed endpoints. We are working on a new programming paradigm, typed endpoints, that generalize the traditional class or web service interfaces, verbalize functional and behavioral contracts between communicating parties, relate them to types of communication channels or clients sharing those channels. This will enable extreme extensibility and interoperability and facilitate architecting systems with provable reliability or security properties.

6.      Modular architecture. The next major release will feature a new architecture, in which different protocols can be used in different parts of the system and composed in a hierarchical manner, as described in [2]. This will allow us to optimize QSM to new environments, and it will allow the user to easily implement new protocols.

7.      Strong reliability properties. We are currently working on extending QSM with a strongly reliable multicast protocol, with properties similar to virtual synchrony.

8.      Scalable, fault-tolerant membership/naming services. We shall provide replicated, fault-tolerant, scalable version of the GMS and a fully functional naming service.

2. Setting up

2.1. Prerequisites

Because QSM relies on RDTSC instruction of Intel Pentium processors for its internal timings, it may not run on CPUs that do not support it. We recommend systems with at least 512 MB RAM. To use QSM, the final RTM (2.0.50727) version of .NET Framework 2.0 SDK must be installed.

We tested QSM on Windows 2003 Server Enterprise Edition. Although sparingly, QSM does use unmanaged C++ code and Win32 API, and it is not guaranteed to run on every Windows OS.

2.2. Installation

This version of QSM doesn’t require installation; simply unzip the provided bundle anywhere. To use QSM in your Visual Studio project, add references to the libraries in the Binaries folder, as it is done in the sample projects in the Examples folder.

3. Examples

In the discussion that follows we are assuming that the reader is familiar with [1].

3.1. Example 1

The example below implements a simple messenger.

using QS.CMS.Base2;

using QS.CMS.Base3;

using QS.CMS.Framework2;

 

(…)

 

static void Main(string[] args)

{

    using (IClient client = Client.Create(new ClientConfiguration(args[0], args[1])))

    {

        using (IGroup group = client.Open(new GroupID(1000)))

        {

            group.OnReceive += new IncomingCallback(

                delegate(InstanceID sender, ISerializable msg) { Console.WriteLine(msg.ToString()); });

 

            string line;

            while ((line = Console.ReadLine()) != null && line.Length > 0)

                group.BufferedChannel.Send(new StringWrapper(line));

        }

 

        Console.WriteLine("Unsubscribed. Confirm exit.");

        Console.ReadLine();

    }

}

 

The program requires two arguments. First, called a subnet, allows QSM to choose one of possibly many network interfaces on the local machine. It could be given either in the form of four dot-separated octets with “x” as the wildcard (such as 192.168.0.x for the C-class subnet with address 192.168.0.0, or x.x.x.x to represent the set of all addresses), or as address and mask separated by a colon (e.g. 172.23.64.0:255.255.252.0). The second parameter is the IP address of the node on which the GMS service is hosted. If a local address is given, the GMS service is hosted locally. Example arguments, for a cluster of nodes with addresses of the form 192.168.0.n, with n ranging from 1 to 10, could be 192.168.0.x 192.168.0.1, identical for all nodes. There is no separate GMS process to launch; the QSM service is an integral part of the QSM library. The node that recognizes its local address as the GMS address will automatically host the GMS.

The program subscribes to a single group and waits for user input in a loop. If a string is typed in, a text message is sent to all nodes that are currently members of the group. If an empty string is provided, the program unsubscribes from the group and waits for empty input to confirm exiting. Note that exiting the process on the node that hosts the GMS would cause other processes to lose track of the membership. A node crash during a message transmission may then lead to an endless loop of forwarding and retransmissions. QSM relies on the GMS always being alive.

These arguments are used to construct an instance of the ClientConfiguration class (located, as most of the classes the programmer would deal with, in the QS.CMS.Framework2 namespace). This class stores configuration parameters for QSM. By using the simple two-argument constructor, we set Subnet and GMSAddress and accept the defaults for all the remaining few dozen settings. Other parameters the user might want to set include port numbers, Port and GMSPort (e.g. to run multiple QSM instances on a single node), Verbose to enable internal logging (e.g. to communicate bugs to the author) and LogToConsole to direct logging to the standard console.

The configuration is passed to Client.Create, which instantiates a new QSM client, and returns a disposable reference to it. Each client object runs internally a separate QSM instance, including its own threads, and represents an independent peer. Multiple clients could co-exist on the same local machine, or even in the same process, provided that each is instantiated with either a different port number, or a different IP address chosen as the main interface[1]. When the client is no longer needed, it must be disposed to free resources (we achieve this via the using { } pattern).

Through the Open call in the IClient interface, the application may request newly created client to subscribe to a group. The argument to Open is a unique identifier of the group, an instance of GroupID. In the current version of QSM, group identifiers are simply 32-bit integers. In order to deal with more meaningful names, the user must refer to a separate naming service. The current version of QSM does not include naming services.

This particular call is synchronous, i.e. it will not complete until the client is fully subscribed and ready for sending and receiving. The BeginOpen / EndOpen combo delivers the same result in an asynchronous manner. Each of these calls also has a variant in which configuration options for the member can be set, as flags in the GroupOptions enumeration.

After subscribing, the user receives a disposable reference of type IGroup. To unsubscribe from the group, simply dispose this reference. It is legal to obtain multiple references to the same group through the Open calls to IClient interface. Such references can be used and disposed independently. The client will maintain membership of the group for as long as at least a single local reference to the group exists.

After successfully subscribing, the first action done by the messenger is to register a callback to be invoked on every incoming message. Callbacks must be delegates of type IncomingCallback and they are registered by associating them with the OnReceive event, available via the IGroup interface of the newly created subscription.

The arguments passed to the callback include the complete address of the sender, and the received message. The address is provided in the form of an instance identifier, an InstandeID class from the QS.CMS.Base3 namespace. The InstanceID class has two principal members, Address and Incarnation. The Address member has fields HostIPAddress and PortNumber, identifying the recipient socket, whereas Incarnation has field SeqNo used to distinguish between distinct processes, running at different times, that happen to allocate the same address/port pair as their main socket. We require that every time a QSM instance using a given address/port is started, it must choose an incarnation number greater that any incarnation number used for this address/port pair in the past. This can be done by setting the Incarnation property in ClientConfiguration. Unless explicitly overridden by the user, this number is generated by QSM automatically based on the current local time (with 1-second accuracy).

The second argument is the message, passed as an object that implements ISerializable. Every transmitted object must implement ISerializable. The user can implement this interface directly, or simply use one of the provided wrapper classes. Two such wrappers, StringWrapper from the QS.CMS.Base2 namespace, and XmlObject from the QS.CMS.Base3 namespace, can be used to pass strings using Unicode encoding or arbitrary .NET objects of classes known by all members of the group using the built-in XML serialization, respectively. The XML serialization is very space-inefficient, and for best performance we recommend implementing ISerializable. This interface and the serialization scheme used in QSM are discussed later.

As mentioned above, after subscribing to the group and registering the callback, the messenger is waiting for an input string, and if the input is nonempty, it sends a text message to the group. To send, we first obtain a communication channel by reading the BufferedChannel property exposed via the IGroup interface of our subscription. The returned channel is the standard one, safe to use in any context. It uses internal buffering and does not provide flow control between the application and QSM. It is left up to the user to control the amount of internal buffering by sending data in smaller portions. The user may achieve this fairly easily by exploiting the completion callback, to be discussed later.

The returned channel exposes IChannel interface including two Send calls. Both calls accept the message to transmit as an ISerializable object. Both are asynchronous, and may complete before the actual transmission takes place. One of the calls accepts a CompletionCallback delegate that will be invoked when the message is successfully acknowledged by all live members of the group, and an object to be passed as an argument to the callback.

3.2. Example 2

This example is a modification of the previous one, and it illustrates simple flow control realized through the CompletionCallback. The input given by the user is interpreted as the number of messages to send. The sender initially starts up to 100 concurrent sends. Afterwards, new sends are initiated only after completion of the preceding sends, until all messages are transmitted. Because we use an anonymous delegate (for brevity), which must be defined before we create a callback, yet it needs to use the callback itself, we pass the callback in the context argument.

Apart from the use of completion callback, this example differs from the previous one also in that it specifies the FastCallbacks flag during the group creation. When this flag is set, callbacks are always executed in the context of the internal QSM thread. This enables the highest performance, and additionally in certain cases eliminates the need for some synchronization. An example of the latter is in the receive callback, here we can increment the nreceived counter in an unsafe manner because no other thread besides the QSM thread can enter this section. The FastCallbacks option must be used with caution. Specifically, callbacks must terminate very fast, and blocking synchronization should be used with extreme caution, preferably avoided. Note how in the completion callback we use the non-blocking operation Interlocked.Decrement to update the tosend counter. We could not decrement this counter in an unsafe manner, like we did in the receive callback, because at the beginning of the experiment, during the initial burst of sends, the counter may be accessed both by the internal QSM thread executing the callback, and by the application thread.

static int nreceived, tosend, toack;

static AutoResetEvent done = new AutoResetEvent(false);

 

static void Main(string[] args)

{

    using (IClient client = Client.Create(new ClientConfiguration(args[0], args[1])))

    {

        using (IGroup group = client.Open(new GroupID(1000), GroupOptions.FastCallbacks))

        {

            group.OnReceive += new IncomingCallback(

                delegate(InstanceID sender, ISerializable message) { nreceived++; });

 

            string line;

            while ((line = Console.ReadLine()) != null && line.Length > 0)

            {

                tosend = toack = System.Convert.ToInt32(line);

 

                CompletionCallback completionCallback =

                    new CompletionCallback(

                        delegate(bool success, Exception error, object context)

                        {

                            if (Interlocked.Decrement(ref tosend) >= 0)

                                group.BufferedChannel.Send(

                                    new StringWrapper("a message"),  (CompletionCallback) context, context);

 

                            if (--toack == 0)

                                done.Set();

                        });

                        

                for (int nsends = 0; nsends < 100 && Interlocked.Decrement(ref tosend) >= 0; nsends++)

                    group.BufferedChannel.Send(

                        new StringWrapper("a message"), completionCallback, completionCallback);

 

                done.WaitOne();

                Console.WriteLine("Acknowledged");

            }

        }

 

        Console.WriteLine("Unsubscribed. Received " + nreceived.ToString() + ". Confirm exit.");

        Console.ReadLine();

    }

}

 

3.3. Example 3

Apart from the traditional push-style sending, QSM provides also a pull-style interface, offering higher performance and freeing the user from dealing with flow control. Here, instead of making calls to send messages directly to the BufferedChannel, the user schedules a send operation and provides a sending callback, to be invoked when the rate, flow and concurrency control conditions permit sending.

static int nreceived, tosend, toack;

static AutoResetEvent done = new AutoResetEvent(false);

 

static void Main(string[] args)

{

    using (IClient client = Client.Create(new ClientConfiguration(args[0], args[1])))

    {

        using (IGroup group = client.Open(new GroupID(1000), GroupOptions.FastCallbacks))

        {

            group.OnReceive += new IncomingCallback(

                delegate(InstanceID sender, ISerializable message) { nreceived++; });

 

            string line;

            while ((line = Console.ReadLine()) != null && line.Length > 0)

            {

                tosend = toack = System.Convert.ToInt32(line);

 

                CompletionCallback completionCallback = new CompletionCallback(

                    delegate(bool success, Exception error, object context)

                    {

                        if (--toack == 0)

                            done.Set();

                    });

 

                group.ScheduleSend(new OutgoingCallback(

                    delegate(IChannel channel, uint maxsend, out bool hasmore)

                    {

                        while ((hasmore = tosend > 0) && maxsend > 0)

                        {

                            channel.Send(new StringWrapper("a message"), completionCallback, null);

 

                            tosend--;

                            maxsend--;

                        }

                    }));

 

                done.WaitOne();

                Console.WriteLine("Acknowledged");

            }

        }

 

        Console.WriteLine("Unsubscribed. Received " + nreceived.ToString() + ". Confirm exit.");

        Console.ReadLine();

    }

}

 

Scheduling asynchronous sends in this manner is performed via a call to ScheduleSend, which takes a single argument, an OutgoingCallback delegate. The sending callback takes three arguments. The first one is a channel to be used for sending within the context of this callback. While it is possible to still use the BufferedChannel from within the callback, it makes little sense, for the channel provided in the callback is more efficient and more functional. The second argument is the maximum number of messages that can be sent from within this callback. This is a soft limit. The application may send more; the extra messages will be simply buffered and transmitted at a later time. Finally, the third, output argument is a Boolean value indicating whether there are more messages that the application would like to send at a later time. If this parameter is set to true, the callback will be triggered again, otherwise it will be permanently unregistered. In the latter case, the application will have to invoke ScheduleSend again to re-register the callback.

3.4. Example 4

In this example, we introduce the QSM serialization. The following is an example user class that holds a pair of double values and implements the ISerializable interface.

[ClassID(MyMessage.CLASSID)]

public class MyMessage : ISerializable

{

    public MyMessage() { }

    public MyMessage(double x, double y) { this.x = x; this.y = y; }

 

    public const ushort CLASSID = ClassID.UserMin + 100;

    public double x, y;

 

    public override string ToString() { return x.ToString() + ", " + y.ToString(); }

 

    unsafe SerializableInfo ISerializable.SerializableInfo

    {

        get { return new SerializableInfo(MyMessage.CLASSID, 2 * sizeof(double)); }

    }

 

unsafe void ISerializable.SerializeTo(

    ref QS.Fx.Base.ConsumableBlock header, ref IList<QS.Fx.Base.Block> data)

    {

        fixed (byte* pheader = header.Array)

        {

            *((double*)(pheader + header.Offset)) = x;

            *((double*)(pheader + header.Offset + sizeof(double))) = y;

        }

        header.consume(2 * sizeof(double));

    }

 

unsafe void ISerializable.DeserializeFrom(

    ref QS.Fx.Base.ConsumableBlock header, ref QS.Fx.Base.ConsumableBlock data)

    {

        fixed (byte* pheader = header.Array)

        {

            x = *((double*)(pheader + header.Offset));

            y = *((double*)(pheader + header.Offset + sizeof(double)));

        }

        header.consume(2 * sizeof(double));

    }

}

First, every serializable class must have a unique 16-bit identifier, known to QSM at the receivers. This identifier, included in the transmitted messages, allows QSM to determine what type of object to construct to deserialize the message contents. The simplest way to make a class known to QSM is to annotate it with ClassID attribute (from the QS.CMS.Base2 namespace). When the QSM library is first loaded, it iterates over all assemblies in the current app domain and registers all types implementing ISerializable and annotated with the ClassID attribute. For dynamically loaded assemblies, registration may be performed explicitly with Serializer.registerClass. The identifiers chosen for the user classes must not collide with internal QSM class identifiers. To this purpose, we provide constants Class.UserMin and Class.UserMax, represents the smallest and largest legal user class identifiers, respectively.

Serializable classes must also have a default no-argument constructor, used by the internal QSM serializer to create objects. Finally, they have to implement ISerializable, including three members: SerializeTo to serialize an object, DeserializeFrom to initialize an object that was created by the serializer using a default, no-argument constructor, and property SerializableInfo providing serialization-related information, such as a class identifier and space requirements.

In the QSM serialization scheme, a representation of every serialized object includes two parts, header and the data proper. Headers are assumed to occupy small, contiguous chunks of memory, while data for each object could include multiple memory segments. The user places data in the header by appending bytes to a pre-allocated block of memory (the header argument of the SerializeTo method), whereas storing in the data section is done by appending references to the user’s own memory buffers to a gather-style list of memory segments (the data argument), without any copying. The reason for this distinction is efficiency. For very small amounts of data, of a few to a few tens of bytes, appending to the header is very fast. For larger amounts of data, this is not efficient, for it involves an unnecessary extra memory copy, hence the gather-style list.

Internally, when sending data at the lowest level, QSM will serialize a transmitted tree of objects by first reading their SerializableInfo properties to obtain their space requirements, then allocating a segment of memory that can store all the objects’ headers, and finally invoking the objects’ SerializeTo methods to append their header data to the pre-allocated common memory segment and to store their buffers on the list. When transmitted over the network, all the objects’ headers, stored in the shared chunk of memory, precede all of their data sections. Before transmission, the transmitted chunks of memory are pinned, and scatter-gather I/O is used to place the data on the wire. The overheads of memory pinning and performing scatter-gather I/O grow with the number of transmitted memory segments. By packing multiple small headers together in one segment we reduce it. For this reason, we recommend that the user also keep smaller amounts of data in the header, while leaving larger memory chunks for the gather list.

The header is represented as a ConsumableBlock, a structure very similar to the standard ArraySegment. The Array property returns an internal byte array, Offset returns the position in the array where data should be appended, and Count returns the maximum number of bytes that may be stored at the current position. We append to such a header segment by writing to the array returned by Array, at the position indicated by Offset, and invoking the consume method to update the offset and the remaining space, the number of bytes appended given to it as argument. Deserialization follows a similar pattern. First read the data, then invoke consume.

Appending data segments during serialization is as simple as placing a Block representing a contiguous region of memory on a list, in our example this was not necessary. During deserialization, all data segments are concatenated into a single ConsumableBlock. If the sizes of segments may vary, the user should put the necessary information in the header during serialization, so that the deserialization code knows where the respective pieces of serialized data start and end.

The SerializableInfo structure, returned by a property of the sane name, contains class identifier, the header size, the total size of the serialized data, and the number of data segments on a gather list. Several constructors are available. The one used in the example assumes that there is no data outside the header; hence it only requires the class identifier and the header size, the remaining parameters are assumed to be zero.

The new class can be used as in the following code sample.

static void Main(string[] args)

{           

    using (IClient client = Client.Create(new ClientConfiguration(args[0], args[1])))

    {

        using (IGroup group = client.Open(new GroupID(1000)))

        {

            group.OnReceive += new IncomingCallback(

                delegate(InstanceID sender, ISerializable msg) { Console.WriteLine(msg.ToString()); });

 

            string line;

            while ((line = Console.ReadLine()) != null && line.Length > 0)

            {

                double x = System.Convert.ToDouble(line.Split(',')[0]);

   double y = System.Convert.ToDouble(line.Split(',')[1]);

 

                group.BufferedChannel.Send(new MyMessage(x, y));

            }

        }

 

        Console.WriteLine("Unsubscribed. Confirm exit.");

        Console.ReadLine();

    }

}

 

4. Remarks

4.1. Controlling Send Rates

The current version of QSM does not automatically adjust the sending rate based on system load and node capacities. Sending, however, is rate-limited. The application can control the maximum sending rate for multicast traffic by adjusting the DefaultMaximumRateForMulticast parameter in ClientConfiguration. This value controls the rate of IP multicast on a per IP multicast address basis. The total rate of the outgoing traffic across all IP multicast groups could be higher. A similar parameter DefaultMaximumRateForUnicast limits unicast rate, on a per-receiver basis. By default, maximum multicast rate is set to 10000, and maximum unicast rate is set to 300.

4.2. Hybring Sending Mode

As described in [1], a group may use a hybrid sending mode where data is initially transmitted to a per-group IP multicast address and then retransmitted and acknowledged on a per-region basis. This is achieved by setting the GroupOptions.Hybrid option when opening the group. Note that both the sending client and the GMS client must be launched with the EnableHybridMulticast option set. The application may use both modes of multicasting simultaneously for a given group by opening the group twice with and without the GroupOptions.Hybrid flag. In that case, both types of multicast will use separate channels.

4.3. Multicast Performance

Performance results in [1] were obtained using a test written with a slightly lower-level interface. The interface presented here is a wrapper over it. Although we expect that with a FastCallback setting performance should be similar to that cited in [1], we have not evaluated it. The user may request a source distribution and analyze the QS.TMS.Experiments.Experiment_263 class that was used for all experiments in [1], or contact the author. If performance in any specific scenario turns out to be insufficient, the author will provide an updated version of this distribution with an appropriate optimization or a bug fix, or document the lower-level interface.

4.4. Threads and Callbacks

Each of the three types of callbacks: the send callbacks (of type OutgoingCallback), completion callbacks (of type CompletionCallback) and receive callbacks (of type IncomingCallback) can be invoked either within the context of the QSM core thread, or within the context of a separate QSM completion thread with a lowered priority, created specifically for this purpose. This can be controlled for each type of callbacks[2] separately, by setting or not setting FastSendCallback, FastCompletionCallback, and FastReceiveCallback, respectively.

As noted earlier, executing within the context of the core thread places stricter timing requirements on the application and is generally unsafe. Work should be limited to minimum and synchronization should be avoided, or done in a non-blocking manner.

Note that even when executing in the context of the completion thread, currently the default for receives and completions, the application might introduce deadlocks if using blocking synchronization, for in this release we use only a single completion thread to handle all unsafe execution. We recommend using synchronization in this context only for critical sections and avoid waiting.

4.5. Changing Membership

By default, QSM batches membership change requests, first at clients, and then once more at the GMS, to support large scale configurations. The batching interval on the clients is controlled by the MembershipChangeClientBatchingInterval configuration setting (100ms by default). The batching interval on the GMS is controlled by the GMSBatchingTime setting (1s by default).

5. References

[1]   Krzysztof Ostrowski, Ken Birman, Amar Phanishayee. QuickSilver Scalable Multicast. In Submission, 2006.
http://www.cs.cornell.edu/projects/quicksilver/public_pdfs/QSM-OSDI2006.pdf

[2]   Krzysztof Ostrowski, Ken Birman. Extensible Web Services Architecture for Notification in Large-Scale Systems. To appear in IEEE ICWS 2006.
http://www.cs.cornell.edu/projects/quicksilver/public_pdfs/krzys_icws2006.pdf

 

 

 



[1] While it is possible for nodes on different subnets to communicate with one another and form groups, they will not be able to receive each other’s multicast, because by default, we are always setting TTL for multicast to 1. This may be made more flexible in the subsequent release, if necessary (contact the author).

[2] In the current release, we made the FastSendCallback option mandatory. We will soon follow up with an update that relaxes this restriction.