The Horus Uniform Group Interface

This document is part of the online Horus Documentation, under Horus Utilities.


In a complex distributed system, many protocols implement the support that the various applications require. Rather than building one ``super''-protocol that implements every feature that may ever be needed, it is better, for performance and maintenance reasons, to implement and maintain protocols independently. Applications should be able to choose the protocols they require and combine them in some desirable way. Although many distributed systems are built in a layered fashion, they are usually configured at system-building time rather than by the application program itself. To be able to configure protocol stacks after a system has been booted, a generic interface between the layers needs to exist that allows the layers to be bound in an arbitrary fashion.

The Horus architecture allows protocol modules to be stacked at run-time in any order. The interface between the layers, called the Uniform Group Interface, is strong enough to support a wide variety of known protocols and their semantics in a convenient way. The architecture introduces objects for communication endpoints and for groups of endpoints.

Horus explicitly separates the notions of a group and a view of a group. A group is an addressing abstraction. A view is a list of endpoints. The notion of joining a group should be understood in light of this difference. An endpoint that joins a group is willing to receive messages addressed to the group, but is not guaranteed to receive all messages that are sent to the group during its membership. Joining often also indicates a desire to send to the group. Again, messages sent to the group are not guaranteed to be delivered to all (or even any) members of the group.

Delivery guarantees are usually specified in terms of views: messages sent to a view are often guaranteed to be delivered to all (non-failing) endpoints in the view. Weaker and stronger guarantees are possible, and depend on which protocol modules the particular endpoints have stacked. We do not present any guarantees here, but just an architecture for a system that can implement a variety of guarantees.

SOURCES

include/horus/xxx_layer.h
src/hutil/xxx_layer.c

INCLUDE

#include "muts.h"
#include "muts/xxx_layer.h"

INTERFACE

Layer Initialization/Registration

When Horus initializes, each protocol layer has to register itself, specifying a unique ASCII name for the layer and a dispatch table of all the entry functions in the layer. For this it uses the xxx_config structure:

struct xxx_config {
        char                    *name;
        xxx_ack_f               *ack;
        xxx_address_f           *address;
        xxx_cast_f              *cast;
        xxx_contact_f           *contact;
        xxx_destroy_f           *destroy;
        xxx_done_f              *done;
        xxx_endpoint_f          *endpoint;
        xxx_entity_f            *entity;
        xxx_flush_f             *flush;
        xxx_flush_ok_f          *flush_ok;
        xxx_join_denied_f       *join_denied;
        xxx_join_f              *join;
        xxx_join_granted_f      *join_granted;
        xxx_leave_f             *leave;
        xxx_merge_f             *merge;
        xxx_pt2pt_group_f       *pt2pt_group;
        xxx_send_f              *send;
        xxx_view_f              *view;
        xxx_focus_f             *focus;
        xxx_stable_f            *stable;
        xxx_dump_f              *dump;
};

In this structure, name is the ASCII name of the layer. The rest are pointers to the various downcalls that layers can provide to upper layers or applications. (Note that the pt2pt_group function pointer is only included for compatibility reasons; its use is no longer supported.) At initialization time, the layer calls xxx_config() to pass the structure to the Horus system (most Horus routines return a MUTS error_t indication of success or error):

error_t xxx_config(
        struct xxx_config *config
);

For example, the fragmentation/reassembly layer FRAG contains the following initialization code:

static struct xxx_config frag_config = {
        "FRAG",
        (xxx_ack_f *)           frag_ack,
        (xxx_address_f *)       frag_address,
        (xxx_cast_f *)          frag_cast,
        (xxx_contact_f *)       frag_contact,
        (xxx_destroy_f *)       frag_destroy,
        (xxx_done_f *)          frag_done,
        (xxx_endpoint_f *)      frag_endpoint,
        (xxx_entity_f *)        frag_entity,
        (xxx_flush_f *)         frag_flush,
        (xxx_flush_ok_f *)      frag_flush_ok,
        (xxx_join_denied_f *)   frag_join_denied,
        (xxx_join_f *)          frag_join,
        (xxx_join_granted_f *)  frag_join_granted,
        (xxx_leave_f *)         frag_leave,
        (xxx_merge_f *)         frag_merge,
        (xxx_pt2pt_group_f *)   0,
        (xxx_send_f *)          frag_send,
        (xxx_view_f *)          frag_view,
        (xxx_focus_f *)         frag_focus,
        (xxx_stable_f *)        0,
        (xxx_dump_f *)          frag_dump
};

error_t frag_interface_init(void){
        e_enter(frag_interface_init);
        return e_pass(xxx_config(&frag_config));
}

The frag_interface_init routine can be included in the MUTS conf table, which is a list of functions that are invoked at initialization.
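The registration step can be pictured as a small table keyed by layer name. The following is only a sketch under stated assumptions, not the Horus implementation: struct mini_config, mini_register(), mini_lookup(), and the stub layer are all hypothetical names, and the dispatch table is reduced to a single entry point.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Reduced, hypothetical dispatch table: a name plus a single entry point. */
struct mini_config {
        const char *name;
        int (*cast)(const char *msg);
};

#define MAX_LAYERS 16
static struct mini_config *registry[MAX_LAYERS];
static int nlayers;

/* Register a layer; returns -1 on a duplicate name or a full table. */
static int mini_register(struct mini_config *config)
{
        int i;

        assert(config != NULL && config->name != NULL);
        for (i = 0; i < nlayers; i++)
                if (strcmp(registry[i]->name, config->name) == 0)
                        return -1;
        if (nlayers == MAX_LAYERS)
                return -1;
        registry[nlayers++] = config;
        return 0;
}

/* Look a layer up by its ASCII name, as a stack string would name it. */
static struct mini_config *mini_lookup(const char *name)
{
        int i;

        for (i = 0; i < nlayers; i++)
                if (strcmp(registry[i]->name, name) == 0)
                        return registry[i];
        return NULL;
}

/* A stub layer used only for illustration. */
static int frag_cast_stub(const char *msg) { return (int) strlen(msg); }
static struct mini_config frag_mini = { "FRAG", frag_cast_stub };
```

Registering frag_mini once succeeds, a second registration under the same name is refused, and mini_lookup("FRAG") then yields the table through which the layer's entry points can be called.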

Endpoint Creation

Endpoints are objects of type xxx_endpoint_id. They are generated by the function xxx_endpoint():

error_t xxx_endpoint(
        xxx_endpoint_id below,
        char *stack,
        unsigned int options,
        thread upcall,
        void *env,
        OUT xxx_endpoint_id *endpoint
);

Here, stack is a null-terminated ASCII string that specifies the stack of protocols that this endpoint will use for communication. Below specifies a previously generated endpoint on which these protocols will be stacked. Below is null in the common case in which an endpoint is generated from scratch. Options is a bit mask of flags that influences the detailed behavior of xxx_endpoint(). Upcall is a descriptor of how events specific to the endpoint should be delivered. Env is passed to the upcall. The result of xxx_endpoint() is returned, by reference, in endpoint.

The stack string specified to xxx_endpoint() has a syntactical structure. It consists of a list of layer descriptors separated by colons. Each layer descriptor is an alphanumeric name, optionally followed by arguments enclosed in parentheses. The arguments are separated by commas (each comma may optionally be followed by spaces). Each argument is of the form name=value, where name is an alphanumeric string. For example, this is a valid stack string: ``MBRSHIP:FRAG(size=1024):NAK(window=16, sweep=1000):COM''.

The effect of calling xxx_endpoint() with this stack is that the endpoint entry function of the MBRSHIP layer will be invoked with the remaining string ``FRAG(size=1024):NAK(window=16, sweep=1000):COM''. The endpoint entry function has the same signature as xxx_endpoint(). Typically, this function will allocate a record to return to the invoker, and will then call xxx_endpoint() itself with the remaining stack. The xxx_endpoint_id that is returned by this call is saved in the record. As this progresses recursively, a linked list of endpoint records is formed that resembles the protocol stack.
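To make the splitting of the stack string concrete, here is a minimal sketch of how the first layer name could be separated from the remaining stack. The helper first_layer() is hypothetical and is not part of Horus; how Horus itself parses the string is not shown in this document.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not part of Horus): copy the name of the first layer
 * in `stack` into `name` and return a pointer to the rest of the stack
 * string, i.e. the part after the separating colon. Returns NULL if the
 * name does not fit in `name` or the argument list is not closed.
 */
static const char *first_layer(const char *stack, char *name, size_t size)
{
        size_t len;
        const char *p;

        assert(stack != NULL && name != NULL);

        /* The layer name ends at '(' (arguments), ':' (next layer) or NUL. */
        len = strcspn(stack, "(:");
        if (len >= size)
                return NULL;
        memcpy(name, stack, len);
        name[len] = '\0';

        /* Skip an optional parenthesized argument list. */
        p = stack + len;
        if (*p == '(') {
                p = strchr(p, ')');
                if (p == NULL)
                        return NULL;
                p++;
        }

        /* Skip the colon that separates this layer from the next one. */
        if (*p == ':')
                p++;
        return p;
}
```

Applied repeatedly to the example stack string, the helper yields MBRSHIP, FRAG, NAK, and COM in turn, each time returning the string that the next layer down would receive.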

In addition, each entry routine will call xxx_get_args() to get the arguments optionally provided to each layer through the stack specification:

error_t xxx_get_args(
        OUT xxx_args_t *args,
        IN OUT char **stackp
);

This routine takes a pointer to the stack specification string (pointed to by stackp) and extracts the arguments to this layer, which it saves in args. The arguments can then be inspected using xxx_getstr():

char *xxx_getstr(
        struct mem_chan *mc,
        xxx_args_t args,
        char *name
);

This will return the ASCII value for the given name if specified, allocated from the mc memory channel. If no such argument is specified, NULL is returned.

xxx_rel_args() is invoked to release the information kept to save the arguments:

void xxx_rel_args(
        xxx_args_t args
);
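The kind of lookup that xxx_getstr() performs can be sketched on a plain argument string. This is an illustration only: lookup_arg() is a hypothetical stand-in that works on the text between a layer's parentheses and copies into a caller-supplied buffer, whereas the real routine works on an xxx_args_t and allocates from a memory channel.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical stand-in for an argument lookup: search a comma-separated
 * "name=value" list for `name` and copy its value into `value`.
 * Returns 1 if found, 0 if absent or if the value does not fit.
 */
static int lookup_arg(const char *args, const char *name,
                      char *value, size_t size)
{
        size_t name_len;
        const char *p = args;

        assert(args != NULL && name != NULL && value != NULL);
        name_len = strlen(name);

        while (*p != '\0') {
                size_t item_len = strcspn(p, ",");      /* one name=value item */

                if (item_len > name_len &&
                    strncmp(p, name, name_len) == 0 && p[name_len] == '=') {
                        size_t value_len = item_len - name_len - 1;

                        if (value_len >= size)
                                return 0;
                        memcpy(value, p + name_len + 1, value_len);
                        value[value_len] = '\0';
                        return 1;
                }

                p += item_len;
                if (*p == ',')
                        p++;
                while (*p == ' ')       /* a comma may be followed by spaces */
                        p++;
        }
        return 0;
}
```

For the argument list of the NAK layer in the earlier example, looking up sweep yields 1000 and looking up window yields 16, while a name that is not present reports failure so that the caller can fall back to a default.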

The endpoint structure that is returned is of type xxx_endpoint_id, which is defined as follows:

struct xxx_ident {
        struct xxx_config *config;
        enum { XXX_ENDPOINT, XXX_GROUP } type;
        char *name;
};
typedef struct xxx_ident *xxx_id;
typedef xxx_id xxx_endpoint_id;

Here config points to the layer's configuration table (so that future generic xxx operations on the endpoint can dispatch to the right function), and type is the type of the identifier, which in this case has to be XXX_ENDPOINT. Optionally, for debugging purposes, the endpoint may be given an ASCII name.

Most layers will want to include additional information in the endpoint. This is why the layers typically define an endpoint as a structure that starts out with the struct xxx_ident structure, but is then followed by layer-specific information. For example, the fragmentation layer uses the following endpoint definition:

struct frag_endpoint {
        struct xxx_ident xxx;
        xxx_endpoint_id below;
        int frag_size;
};
typedef struct frag_endpoint *frag_endpoint_id;

Here below points to the endpoint of the layer below this layer, and frag_size is the fragmentation size used by this endpoint. The frag_endpoint() entry in the FRAG layer allocates and initializes this structure and returns it to the invoker:

error_t frag_endpoint(
        xxx_endpoint_id below, char *stack,
        unsigned int options,
        thread upcall, void *env,
        frag_endpoint_id *epp
) {
        frag_endpoint_id ep; xxx_args_t args;
        char *mem; error_t err;

        e_enter(frag_endpoint);

        /* Get the arguments. */
        err = xxx_get_args(&args, &stack);
        if (!e_isok(err))
                return e_pass(err);

        /* Allocate an endpoint structure. */
        ep = mc_alloc(frag_memory, sizeof(*ep));
        ep->xxx.config = &frag_config;
        ep->xxx.type = XXX_ENDPOINT;

        /* Get the 'size' argument.  If it isn't there, assign a default. */
        mem = xxx_getstr(frag_memory, args, "size");
        if (mem != 0) {
                ep->frag_size = atoi(mem);
                mc_free(frag_memory, mem);
        }
        else ep->frag_size = FRAG_SIZE;

        xxx_rel_args(args);

        /* Call the layer below. */
        err = xxx_endpoint(below, stack, options, upcall, env, &ep->below);
        if (!e_isok(err)) {
                mc_free(frag_memory, ep);
                return e_pass(err);
        }

        *epp = ep;
        return e_ok();
}
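Placing struct xxx_ident first in the layer-specific structure is what allows a generic handle and a layer handle to refer to the same object. The sketch below illustrates that idiom with simplified stand-in types; the reduced struct xxx_config and the helper frag_from_id() are assumptions made for the example and are not taken from the Horus sources.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the Horus types; only the fields needed here. */
struct xxx_config { const char *name; };

struct xxx_ident {
        struct xxx_config *config;
        enum { XXX_ENDPOINT, XXX_GROUP } type;
        char *name;
};
typedef struct xxx_ident *xxx_id;

/* A layer-specific endpoint: the generic header comes first. */
struct frag_endpoint {
        struct xxx_ident xxx;
        xxx_id below;
        int frag_size;
};

static struct xxx_config frag_config = { "FRAG" };

/* Hypothetical helper: recover the layer-specific view of a generic handle. */
static struct frag_endpoint *frag_from_id(xxx_id id)
{
        /* Check that the handle really is an endpoint owned by this layer. */
        assert(id != NULL && id->type == XXX_ENDPOINT);
        assert(id->config == &frag_config);

        /* Valid because xxx is the first member of struct frag_endpoint. */
        return (struct frag_endpoint *) id;
}
```

Because C guarantees that a pointer to a structure also points to its first member, the cast in both directions is well defined, and the config and type fields give the layer a cheap sanity check before it touches its private fields.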

Endpoint entity identifiers

In MUTS, addresses are called entity identifiers. MUTS defines the type eid_t to contain addresses for various protocols. Horus, for software engineering reasons, defines its own address type: xxx_entity_t. Although an xxx_entity_t is currently a structure containing nothing but an eid_t, Horus may in the future associate extra information with entity identifiers.

Given an endpoint handle of type xxx_endpoint_id, it is possible to retrieve the corresponding (local) address using the function xxx_entity():

error_t xxx_entity(
        xxx_endpoint_id ep,
        OUT xxx_entity_t *address
);

Group Structures

An endpoint object is allowed to join one or more groups. For each of the groups that it wishes to join, it has to allocate a group structure, which is termed, perhaps unfortunately, a group object. And perhaps even more unfortunately, the routine to allocate the group object is called xxx_join(). It is similar to the xxx_endpoint() function:

error_t xxx_join(
        xxx_endpoint_id ep,
        eid_t *address,
        unsigned char *key,
        eid_t *contact,
        unsigned int options,
        thread upcall,
        void *env,
        xxx_message_t *msg,
        char *name,
        OUT xxx_group_id *group
);

In this function, ep specifies which endpoint is allocating the group object. Address is the MUTS group entity identifier that serves as the address of the group. It is allocated by the MUTS mt_neweid function. Key is an optional group key. For now we assume that NULL is specified. Contact is the MUTS entity identifier for the group contact. Again, contact is optional, and we will for now assume that NULL is specified. Msg is an optional message that is transferred to the contact. It is ignored if contact is NULL. Name is an ASCII string used for debugging. Options, upcall, and env are as in xxx_endpoint(), and more about these will follow later. A group object identifier is returned in group.

The group object identifier type is defined in the same way as the endpoint identifier type:

typedef xxx_id xxx_group_id;

That is, technically, there is no difference between an endpoint type and a group object type. However, the type field in a group object identifier is set to XXX_GROUP. As with endpoints, layers can add their own information to group objects. For example, the FRAG layer defines its group object as follows:

struct frag_group {
        struct xxx_ident xxx;
        lock_t lock;
        frag_endpoint_id endpoint;
        xxx_group_id below;
        evcnt_t evcnt;
        thread upcall;
        void *env;
};

The frag_join operation is implemented as follows (for brevity, error handling has been omitted):

error_t frag_join(
        frag_endpoint_id ep,             eid_t *group_address,
        unsigned char *public_key,       eid_t *group_contact,
        unsigned int options,            thread upcall,
        void *env,                       xxx_message_t *msg,
        char *name,                      frag_group_id *pgroup
) {
        frag_group_id group;
        error_t err;

        e_enter(frag_join);

        group = mc_alloc(frag_memory, sizeof(*group));
        sys_zero(group, sizeof(*group));
        group->xxx.config = &frag_config;
        group->xxx.type = XXX_GROUP;
        l_init("frag_group", &group->lock);
        ec_init("frag_group", &group->evcnt);
        group->endpoint = ep;
        group->upcall = upcall;
        group->env = env;

        l_lock(&group->lock);

        err = xxx_join(ep->below, group_address, public_key,
                        group_contact, options,
                        frag_group_event_thread, group,
                        msg, name, &group->below);

        l_unlock(&group->lock);

        /* Return the new group object to the invoker. */
        *pgroup = group;
        return e_pass(err);
}

When comparing this function with the frag_endpoint() function, it can be seen they are quite similar. In fact, two of the basic differences (using a lock, and specifying a local upcall function) are due to simplifications in the presentation of frag_endpoint(). Thus the only real difference is that frag_join does not receive an ASCII protocol stack specification, and thus does not need to parse this. Instead, a group object basically inherits the protocol stack from the endpoint.

As with endpoint objects, there is a function to retrieve the address of a group: xxx_address(). This function can also be used to retrieve the key of a secure group:

error_t xxx_address(
        xxx_group_id group,
        OUT eid_t *address,
        OUT unsigned char *key
);

Either the address or the key pointer can be null if that information is not needed. The buffer pointed to by key has to be large enough to hold the key. It is assumed that, since the key was provided when xxx_join() was invoked, the size is known. Note that this function is strictly for convenience: the data returned should be the same as what was passed to xxx_join() before.

Upcalls

Both Horus endpoint and group objects have upcalls associated with them. These upcalls are basically functions that are invoked in response to certain events, which we will define below. The upcalls are defined by the MUTS t_declare() function. Upcalls can currently be functions, queues, or threads. For now, we will assume that each upcall results in a thread invocation. As defined by MUTS, a thread invokes a procedure (specified in t_declare()) with two void* arguments. The procedure returns void. The first argument passed to this procedure is the env argument that was previously specified by the invoker of xxx_endpoint() or xxx_join(). It is typically a pointer to the endpoint or group structure that the layer allocated, so that the upcall has access to this structure. The second argument points to a struct xxx_event structure that describes the event. This structure is defined as follows:

struct xxx_event {
        struct xxx_event *next;
        enum xxx_event_type type;
        int event_number;
        int nmembers;
        int my_rank;
        int origin;
        unsigned int flags;
        xxx_entity_t *view;
        xxx_entity_t *joiners;
        int njoiners;
        xxx_join_id join_ident;
        int join_granted;
        xxx_entity_t *problem_list;
        int problem_size;
        xxx_message_t *msg;
        struct xxx_ack_matrix *mcast_acks;
        struct xxx_ack_matrix *pt2pt_acks;
        struct xxx_ack_matrix *current_time;
        struct xxx_ack_matrix *msg_time;
};

Here, next is a pointer that can be used by the upcall in any way it would like, and is useful for building lists of event structures. Type specifies the type of the event, which is currently one of:

enum xxx_event_type {
        XXX_JOIN_REQUEST,               XXX_JOIN_FAILED,
        XXX_JOIN_DENIED,                XXX_JOIN_READY,
        XXX_FLUSH,                      XXX_FLUSH_OK,
        XXX_VIEW,                       XXX_CAST,
        XXX_SEND,                       XXX_LEAVE,
        XXX_LOST_MESSAGE,               XXX_STABLE,
        XXX_PROBLEM,                    XXX_SYSTEM_ERROR,
        XXX_EXIT
};

Each event passed to an upcall is numbered, starting at 0, in the field event_number. Currently, the remaining fields only apply to group objects. Nmembers specifies the number of members in the view of this endpoint. The members are ranked 0, ..., nmembers - 1. The rank of this endpoint in the view is my_rank. The rank of the member causing this event is in origin. Flags is a bitmap of boolean values. For example, the XXX_BYTE_SWAPPED bit specifies that the origin machine of the event uses a different byte order from this machine. The other fields are defined on a per-upcall-type basis. Details about each event will be described later.

Many upcalls have a message associated with them. This message is passed up as a MUTS message in the msg field. In case the message was not sent by the same endpoint (i.e., origin != my_rank), the message should be separately acknowledged using the xxx_ack() downcall. Only in this way can the layer below be sure that this endpoint has safely received the message. Often xxx_ack() is invoked immediately, but some applications may prefer to take other actions (like logging the message) first. The signature of xxx_ack() is as follows:

error_t xxx_ack(
        xxx_id xxx,
        xxx_message_t *msg
);

(Currently no events containing messages are ever delivered to endpoint objects, so the first argument is always an xxx_group_id.)

When an upcall is finished with an event, it has to invoke the routine xxx_done(), which is defined as follows:

error_t xxx_done(
        xxx_id xxx,
        struct xxx_event *ev
);

Here, xxx is the group or endpoint to which the upcall applied, and ev the pointer to the event descriptor that was passed to the upcall. xxx_done() invokes the appropriate layer function (frag_done() in our example), which is responsible for cleaning up the data structures that were allocated by the layer for the purpose of the event.

The last event ever passed to an endpoint or group upcall is the XXX_EXIT event. It signals that the corresponding xxx_endpoint_id or xxx_group_id handles should no longer be used. In fact, when the corresponding xxx_done() is invoked on the event, the handle becomes invalid.

For example, the FRAG layer uses an upcall thread like this one:

static void frag_event(void *env, void *param){
        frag_group_id group = env;
        struct xxx_event *ev = param;
        int cleanup = 0;

        e_enter(frag_event);

        /* Synchronize the event, and get the group lock. */
        ec_sync(&group->evcnt, ev->event_number);
        l_lock(&group->lock);

        /* Acknowledge the message, if any. */
        if (ev->msg != 0 && ev->origin != ev->my_rank)
                (void) xxx_ack(group, ev->msg);

        switch (ev->type) {
        case XXX_EXIT:
                cleanup = 1;
                /* FALL THROUGH */
        default:
                frag_deliver(group, ev);
        }

        l_unlock(&group->lock);
        ec_inc(&group->evcnt);

        /* Release the event structure. */
        (void) xxx_done(group->below, ev);

        if (cleanup)
                frag_cleanup(group);

        e_exit();
}

(NOTE: some layers add information to events by defining a new event structure that starts out with a struct xxx_event field, in the same way endpoint and group objects are built. When the corresponding xxx_done() routine of the layer is invoked, it can access this information.)

The XXX_VIEW upcall

Typically the first event to be delivered to a group object is the XXX_VIEW upcall. In the event structure, view is an array of size nmembers containing the addresses of the members that are in the view of the upcall receiver. (In other upcalls, this pointer is usually set to NULL to minimize overhead.) The msg field points to a message that is associated with each view. The contents of the message are layer and application dependent, and will become clear when the xxx_view() downcall is described.

The view list may be stolen from the event structure by clearing the field in the structure. For example, XXX_VIEW upcalls often execute the following code:

        if (group->view != 0)
                mc_free(group->mc, group->view);
        group->view = ev->view;
        ev->view = 0;

The view array, as everything else in Horus, was allocated through memory channels (using mc_alloc()), and should be freed using mc_free().
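The ownership transfer behind stealing a view can be sketched with ordinary heap memory. In this illustration, malloc() and free() stand in for mc_alloc() and mc_free(), addresses are plain integers, and the structures and the functions steal_view() and event_done() are hypothetical. That the done routine releases a view which was not stolen is an assumption suggested by the text, not something the document states outright.

```c
#include <assert.h>
#include <stdlib.h>

/* Stand-ins: plain ints for addresses, malloc()/free() for memory channels. */
struct mini_event { int *view; int nmembers; };
struct mini_group { int *view; int nmembers; };

/* Take ownership of the view array carried by the event. */
static void steal_view(struct mini_group *group, struct mini_event *ev)
{
        assert(group != NULL && ev != NULL);
        free(group->view);              /* free(NULL) is a no-op */
        group->view = ev->view;
        group->nmembers = ev->nmembers;
        ev->view = NULL;                /* the event no longer owns the array */
}

/* Assumed behavior of a done routine: release what the event still owns. */
static void event_done(struct mini_event *ev)
{
        assert(ev != NULL);
        free(ev->view);
        ev->view = NULL;
}
```

After steal_view() the array survives the later event_done() call, because the cleared pointer tells the cleanup code that there is nothing left to release.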

Right after invoking xxx_join(), the initial view consists of only the endpoint. Depending on the layer, this view may or may not be delivered back to the endpoint using an XXX_VIEW upcall. Every view is guaranteed to contain the address of the endpoint, and its index in the view array is indicated by my_rank.

All events other than the XXX_VIEW and XXX_EXIT events are delivered to the current view. The current view is the last view that was delivered, or the initial view if no XXX_VIEW upcall has been delivered yet.

Flushing a view

Downcalls, such as sending messages, are also done to the current view. However, since upcalls are delivered asynchronously, it may not always be clear what the current view is (after all, an XXX_VIEW upcall may have been scheduled but not yet delivered). Therefore neighboring layers go through a process called flushing. Implementing flushing is optional, and some layers do not implement it. As a result, these layers do not guarantee that messages are delivered to the view in which they were sent.

For the remainder, we will assume that layers do wish to synchronize view changes and provide a flush interface. We will talk about the lower layer that wishes to install a new view, and the upper layer that receives the view. (Typically, the lower layer installs a view at the request of the upper layer, but this will be discussed later.)

The lower layer first delivers an XXX_FLUSH upcall to the upper layer, notifying that the lower layer wishes to change the view. This upcall has a message associated with it just like XXX_VIEW upcalls. The upper layer acknowledges the upcall with the xxx_flush_ok() downcall. Until this is done, the downcalls issued by the upper layer are to the old view. The signature of xxx_flush_ok() is as follows:

error_t xxx_flush_ok(
        xxx_group_id group
);

This will invoke the corresponding entry in the lower layer, and brings the endpoint into a state where the lower layer typically disallows sending messages, or sometimes buffers newly sent messages for transmission to the next view. This state is terminated when the next XXX_VIEW upcall arrives. Before this happens, it is possible to receive other upcalls that pertain to the old view.

The view is usually flushed at the request of the upper layer (but not necessarily). When the upper layer wishes to flush the view (in response to join requests or failures), it can do so by invoking xxx_flush():

error_t xxx_flush(
        xxx_group_id group,
        xxx_entity_t remove_members,
        int n,
        unsigned int options,
        xxx_message_t *msg
);

As with XXX_FLUSH upcalls, this brings the endpoint into a state where it is usually not allowed to send messages, or where it is not clear to which view the messages will be delivered (until the next XXX_VIEW event is delivered). In the downcall, it is possible to request a set of members to be removed from the next view, using the remove_members list of size n. It is also possible to specify a message that is delivered with any XXX_FLUSH upcall generated by this downcall. This upcall is typically (but not necessarily) delivered to all other members of the old view, but not to the member that issued the xxx_flush() downcall. We call this member the coordinator of the flush.

Eventually, each xxx_flush() downcall results in exactly one XXX_FLUSH_OK upcall to the coordinator, indicating the termination of the flush process. As described later, layers typically only deliver an XXX_FLUSH_OK upcall after having received a response for each member that is not being removed from the view. Therefore, to terminate a flush, it may be necessary to invoke the xxx_flush() downcall again. The corresponding XXX_FLUSH_OK upcalls are delivered in the order of the corresponding xxx_flush() downcalls.

Layers need not support the xxx_flush() downcall. If it is invoked on a layer that does not support it, the downcall returns a MUTS error indication.

Installing new views

Often, a new view installation is requested by the upper layer to the lower layer. This is done by the xxx_view() downcall:

error_t xxx_view(
        xxx_group_id group,
        xxx_entity_t add_members,
        int n,
        unsigned int options,
        xxx_message_t *msg
);

Note that this function is almost like the xxx_flush() counterpart. Instead of specifying a list of members to remove from the view, a list of members to add to the view is specified. It is allowed to include in this list addresses of members that are already in the view. The msg argument is passed to the corresponding XXX_VIEW upcall.

xxx_view() may only be invoked by flushed members. An endpoint becomes flushed after it has invoked xxx_flush() and this operation has completed. In many cases, endpoints start out in a flushed state.

State Machine

It should be clear by now that members effectively run a state machine, the transitions triggered by various upcalls and downcalls. Not all operations are possible in all states. It is therefore useful to make this state machine explicit. A somewhat simplified representation is depicted below:

Here, the large arrow labeled xxx_flush() indicates that this transition is possible from any state. The NORMAL state is the normal running state in which message send primitives are allowed. After an XXX_FLUSH event arrives, an xxx_flush_ok() downcall brings the endpoint into the WAITING_FOR_VIEW state, where it is waiting for a view upcall. (Technically, there should be a FLUSH_RECEIVED state in between the NORMAL and WAITING_FOR_VIEW states, but for simplicity this has been omitted.) An XXX_VIEW upcall brings the endpoint back into the NORMAL state.

In any state the member can invoke xxx_flush(), bringing it into the FLUSHING state (if the layer supports xxx_flush()). More than one xxx_flush() may be issued, keeping the state at FLUSHING. The last XXX_FLUSH_OK upcall brings the endpoint into the FLUSHED state. From there, an xxx_view() downcall brings the endpoint back into the NORMAL state. Alternatively, the endpoint can invoke xxx_merge() to go into the WAITING_FOR_VIEW state. This downcall will be described below.
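The transitions described in the text can be written down as a small state machine. The following is a sketch under stated assumptions rather than Horus code: the operation names, struct member, and member_step() are hypothetical, the omitted FLUSH_RECEIVED state is left out here as well, and an XXX_VIEW upcall that arrives in the NORMAL state is assumed to leave the state unchanged.

```c
#include <assert.h>
#include <stddef.h>

enum member_state { NORMAL, WAITING_FOR_VIEW, FLUSHING, FLUSHED };

/* Hypothetical names for the downcalls and upcalls that drive transitions. */
enum member_op {
        OP_FLUSH_OK_DOWNCALL,   /* xxx_flush_ok(), answering XXX_FLUSH */
        OP_VIEW_UPCALL,         /* XXX_VIEW delivered */
        OP_FLUSH_DOWNCALL,      /* xxx_flush() */
        OP_FLUSH_OK_UPCALL,     /* XXX_FLUSH_OK delivered */
        OP_VIEW_DOWNCALL,       /* xxx_view() */
        OP_MERGE_DOWNCALL       /* xxx_merge() */
};

struct member {
        enum member_state state;
        int pending_flushes;    /* xxx_flush() calls not yet answered */
};

/* Apply one operation; returns 0 on success, -1 if it is not allowed. */
static int member_step(struct member *m, enum member_op op)
{
        assert(m != NULL);
        switch (op) {
        case OP_FLUSH_DOWNCALL:         /* allowed in any state */
                m->state = FLUSHING;
                m->pending_flushes++;
                return 0;
        case OP_FLUSH_OK_UPCALL:        /* only the last one ends the flush */
                if (m->state != FLUSHING)
                        return -1;
                if (--m->pending_flushes == 0)
                        m->state = FLUSHED;
                return 0;
        case OP_FLUSH_OK_DOWNCALL:
                if (m->state != NORMAL)
                        return -1;
                m->state = WAITING_FOR_VIEW;
                return 0;
        case OP_VIEW_UPCALL:            /* assumption: harmless in NORMAL */
                if (m->state != WAITING_FOR_VIEW && m->state != NORMAL)
                        return -1;
                m->state = NORMAL;
                return 0;
        case OP_VIEW_DOWNCALL:          /* only flushed members may install */
                if (m->state != FLUSHED)
                        return -1;
                m->state = NORMAL;
                return 0;
        case OP_MERGE_DOWNCALL:
                if (m->state != FLUSHED)
                        return -1;
                m->state = WAITING_FOR_VIEW;
                return 0;
        }
        return -1;
}
```

A member that issues two flushes stays in FLUSHING after the first XXX_FLUSH_OK and reaches FLUSHED only after the second; from there either a view installation or a merge is accepted, while a view installation attempted from NORMAL is refused.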

Merging Views

At layers where group membership is an issue, it is undesirable to have multiple views of a group. Yet, due to network partitions, different endpoints may have different views. Also, when a new endpoint tries to join a group, it initially has only itself in its view. Two members that can communicate, but have different views of a group, will want to merge their views. In Horus, this can be done by bringing one member (the coordinator) into the FLUSHED state, and all the other members involved into the WAITING_FOR_VIEW state. The coordinator then specifies the members that are not already in its view in an xxx_view() downcall. At a layer that implements group membership, this will result in an XXX_VIEW upcall at all members (including the coordinator), completing the merge.

Bringing all members but one into the WAITING_FOR_VIEW state involves one more step. Consider two views, V_1 and V_2. Only a member of V_1 can flush V_1. This member is the coordinator C_1 of V_1. Similarly, only the coordinator C_2 can flush V_2. After both flushes complete, all members but the two coordinators are in the WAITING_FOR_VIEW state. One of the coordinators has to decide to become the subordinate endpoint, while the other will install the view. The downcall xxx_merge() brings a coordinator into the WAITING_FOR_VIEW state and, as a side effect, can send a message to the other coordinator signaling that it (and the other members of its view) is ready to receive a view. The signature of xxx_merge() is the following:

error_t xxx_merge(
        xxx_group_id group,
        eid_t *contact,
        unsigned int options,
        xxx_message_t *msg
);

Optionally, contact may be null, in which case msg may be omitted. Otherwise the message is forwarded to the given contact endpoint, where it results in an XXX_JOIN_REQUEST upcall. The view of the invoker is piggybacked on the message. The event structure has a field joiners containing the view of the member requesting the join, and a field njoiners containing the size of this view. As with views, the joiners list may be stolen by overwriting the field with a null pointer. If the contact does not respond within a certain time, an XXX_JOIN_FAILED event may be delivered to the upcall of the invoker of xxx_merge(). This brings its state back to FLUSHED.

The XXX_JOIN_REQUEST event also contains a join_granted field. If it is zero, the upcall is supposed to invoke either xxx_join_granted() or xxx_join_denied(), depending on whether it accepts the request or not. Both are invoked with the join_ident field of the event structure. If the join_granted field is non-zero, neither of these routines should be invoked. The signatures are as follows:

error_t xxx_join_granted(
        xxx_group_id group,
        xxx_join_id join_ident,
        void *grant_ident
);

error_t xxx_join_denied(
        xxx_group_id group,
        xxx_join_id join_ident,
        unsigned int options,
        xxx_message_t *msg
);

xxx_join_denied() should send a notification back to the invoker of xxx_join() explaining the refusal. This notification is delivered as an XXX_JOIN_DENIED event.

When a contact (and a message) is specified to xxx_join(), it will automatically invoke xxx_merge(). Thus xxx_join() without a contact brings the endpoint into the FLUSHED state, whereas the state is initialized to WAITING_FOR_VIEW if a contact is specified.

Obtaining the contact address

If an endpoint wants to be able to serve as a contact for the merge operation, it will have to obtain and publish its address. To obtain its contact address, the endpoint invokes xxx_contact():

error_t xxx_contact(
        xxx_group_id group,
        OUT eid_t *contact
);

Note, this routine does not return the address of the contact, but the address of the local endpoint for use with xxx_merge() or xxx_join(). Often the address is the local endpoint address. However, there are other cases. For example, in secure operation, the contact address contains different information.

Publishing the contact address is up to the application. For example, it can do so via a name or directory service, or it can write the address to a shared file.

Usually, only one member in the view serves as contact. When the contact receives a join request, it buffers the request for later while flushing the group. Other join requests that arrive while flushing are also buffered, but no new flushes need be started. When the XXX_FLUSH_OK event arrives, the contact invokes xxx_view() with its current queue of join requests and truncates the queue.
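
The buffering behaviour of the contact can be sketched as a small queue. This is an illustration under assumptions: the contact_state structure, the function names, the fixed MAX_PENDING bound, and the use of plain integers as join identifiers are all hypothetical and not part of the Horus interface.

```c
#include <stddef.h>

#define MAX_PENDING 16          /* illustrative bound, not a Horus limit */

/* Hypothetical bookkeeping kept by the contact. */
struct contact_state {
    int flushing;               /* non-zero while a flush is in progress */
    int pending[MAX_PENDING];   /* buffered join requests (opaque ids) */
    size_t npending;
};

/* Buffer a join request. Returns 1 if the caller should start a flush,
 * 0 if a flush is already running, -1 if the buffer is full. */
int contact_on_join_request(struct contact_state *c, int join_ident)
{
    if (c->npending == MAX_PENDING)
        return -1;
    c->pending[c->npending++] = join_ident;
    if (c->flushing)
        return 0;
    c->flushing = 1;
    return 1;
}

/* On XXX_FLUSH_OK: copy the queued requests to out (for the new view)
 * and truncate the queue. Returns the number of requests handed over. */
size_t contact_on_flush_ok(struct contact_state *c, int *out)
{
    size_t n = c->npending;
    for (size_t i = 0; i < n; i++)
        out[i] = c->pending[i];
    c->npending = 0;
    c->flushing = 0;
    return n;
}
```

The requests returned by contact_on_flush_ok() stand for the joiners that the contact would pass to xxx_view().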

Typically, the contact is the member that is ranked 0. When an endpoint receives an XXX_VIEW event with my_rank = 0, it invokes xxx_contact() and publishes the result somewhere. It only needs to do so if it did not previously have rank 0. This same contact should also be the coordinator for the flush protocol in response to failures, so that no two members attempt to flush the view concurrently.
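
The publish-only-on-change rule can be written as a one-line check. The helper name is hypothetical, and using -1 for "no previous view" is an assumption of this sketch.

```c
/* Hypothetical helper: returns non-zero if the endpoint has just become
 * rank 0 and should therefore obtain and publish its contact address.
 * prev_rank is the rank in the previous view, or -1 if there was none. */
int should_publish_contact(int prev_rank, int my_rank)
{
    return my_rank == 0 && prev_rank != 0;
}
```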

Secure Operation

The Horus interface has provisions for access control, allowing group members to deny access to endpoints that wish to join (or merge in their views). This is done on a per-view basis. Currently, the only way to join a secure view is through xxx_join() (not xxx_merge()). When joining the group, the endpoint specifies the XXX_SECURE option, a key, and a contact of the view it wishes to join.

The JOIN_REQUEST upcall is invoked with an event structure containing a zero join_granted field, meaning that xxx_join_granted() or xxx_join_denied() has to be invoked. The joiners list is currently guaranteed to be of size one, and to contain an authenticated endpoint. The xxx_join_granted() downcall eventually results in an XXX_JOIN_READY event. Before this event occurs, the joiner should not be included in a new view. However, the old view can be flushed while waiting for this event. During this time, the security code runs a protocol with the joiner to transfer some necessary state to the joiner allowing it to receive the next view.

In addition to access control, the XXX_SECURE option results in views where each packet between members is cryptographically signed and checked on receipt. Also, if the XXX_ENCRYPT option is specified in message send operations, the data is encrypted during transmission.

Dealing with Failures

Many fault-tolerant algorithms need accurate information about failures. Unfortunately, due to a theoretical result, it is often not possible to distinguish crashed processes from slow processes. Since Horus cannot solve this problem either, it lets the application decide which problems constitute failures.

When layers detect problems, they deliver an XXX_PROBLEM event. Such problems include broken connections, crashed processes, and timeouts. The problem event structure contains a list of member addresses, problem_list of size problem_size. As with views, this list may be stolen by overwriting the problem_list field with a null pointer.
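
Stealing the list amounts to copying the pointer and clearing the field. The sketch below illustrates this with a stand-in event structure. The struct layout, the eid_t typedef, and the function name are assumptions, and it is also an assumption that the caller then becomes responsible for the list.

```c
#include <stddef.h>

typedef unsigned long eid_t;    /* placeholder; the real eid_t is not shown here */

/* Hypothetical stand-in for the XXX_PROBLEM event structure; only the
 * field names problem_list and problem_size come from the text. */
struct problem_event {
    eid_t *problem_list;
    int problem_size;
};

/* Take the list out of the event: return the pointer, report its size,
 * and overwrite the field with a null pointer. */
eid_t *steal_problem_list(struct problem_event *ev, int *size)
{
    eid_t *list = ev->problem_list;
    *size = ev->problem_size;
    ev->problem_list = NULL;
    return list;
}
```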

Based on the algorithm the application is trying to implement, the application decides which of these members can be removed from the membership. The members can be removed using xxx_flush().

EXAMPLE HERE OF PROBLEMS AND JOIN REQUEST WITH FLUSH.

Sending Messages

A member of a view may send a message to all members using xxx_cast():

error_t xxx_cast(
        xxx_group_id group,
        unsigned int options,
        xxx_message_t *msg
);

This message is delivered as an XXX_CAST upcall. Different layers will provide different semantics and ordering. Not all layers deliver locally. Obviously, a layer that provides total ordering has to deliver locally, but a layer that provides only FIFO or causal ordering does not need to do so. Some of these layers recognize the XXX_LOCAL option, forcing a local delivery.

The semantics of xxx_cast() are typically only provided while in NORMAL state. Most layers guarantee that the message is then delivered in the view in which it was sent. However, while flushing, or waiting for a view to be installed, layers are allowed to deny message send operations, or delay delivery until a next view.

There is also an interface to send to a subset of the view membership:

error_t xxx_send(
        xxx_group_id group,
        int rank_list[],
        int n,
        unsigned int options,
        xxx_message_t *msg
);

With this interface, a list of member ranks is specified (of size n), to which the message should be delivered. The semantics of this interface are again up to the layer. Many layers only accept lists of size one, while other layers do not implement this interface at all (and return an error). Layers may also implement special options where the rank_list is not a list of ranks, but instead a list of int-sized words encoding the destination set of the view in some other way.

Some layers can lose messages under certain circumstances. For example, in a layer that implements FIFO transmission purely based on negative acknowledgements, a slow receiver may not be able to retrieve an old message. Under such conditions, the layer may deliver an XXX_LOST_MESSAGE event to notify the application, without terminating the membership. Applications that cannot deal with this can treat this event as an XXX_PROBLEM event.

Matrices

The Horus interface includes functions to create, update, and inspect two-dimensional matrices of integers. Several of these matrices are used to maintain message stability information and causal time information. Other uses are feasible. A matrix contains a reference counter and a lock to allow for sharing between layers.

The matrix is of type struct xxx_ack_matrix, and can be allocated using xxx_matrix_alloc():

error_t xxx_matrix_alloc(
        struct mem_chan *mem,
        int nrows, int ncols,
        OUT struct xxx_ack_matrix **pmat
);

To retrieve information from the matrix, you specify a row x and a column y, and the number of entries n you wish to extract. By default, the information is extracted a row at a time, but this can be overridden by specifying XXX_COLUMN. Overflowing from one row into the next is handled naturally (same for columns). Rows and columns are both numbered from zero. The result is returned in an array of 32-bit unsigned integers:

error_t xxx_get_acks(
        struct xxx_ack_matrix *mat,
        int x, int y, int n,
        OUT uint32_t *out
);
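
The extraction order described above can be sketched on a plain array. This does not use struct xxx_ack_matrix, whose layout is not described here. The function name, the row-major storage, the by_column flag, and the error return when the request runs past the end of the matrix are all assumptions of the sketch.

```c
#include <stdint.h>

/* Copy n entries starting at row x, column y from a row-major matrix.
 * by_column == 0: walk along the row, continuing into the next row.
 * by_column != 0: walk down the column, continuing into the next column.
 * Returns 0 on success, -1 if the request is out of range (assumed). */
int extract_entries(const uint32_t *data, int nrows, int ncols,
                    int x, int y, int n, int by_column, uint32_t *out)
{
    if (x < 0 || x >= nrows || y < 0 || y >= ncols || n < 0)
        return -1;
    for (int i = 0; i < n; i++) {
        if (x >= nrows || y >= ncols)
            return -1;              /* ran past the end of the matrix */
        out[i] = data[x * ncols + y];
        if (by_column) {
            if (++x == nrows) { x = 0; y++; }
        } else {
            if (++y == ncols) { y = 0; x++; }
        }
    }
    return 0;
}
```

For a 2 x 3 matrix holding 1 through 6 in row order, starting at row 0, column 1 and asking for four entries row-wise yields 2, 3, 4, 5.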

To release the matrix, use xxx_matrix_release():

error_t xxx_matrix_release(
        struct xxx_ack_matrix *mat
);

The interface is otherwise very incomplete, and often requires an understanding of the contents of struct xxx_ack_matrix. Luckily, most applications only need the interfaces specified above.

Acknowledgements and Stability

Optionally, layers can provide information about which endpoints acknowledged (using xxx_ack()) which messages. For this purpose, every message. This allows processes to inspect the ``stability'' of messages they sent or received.

For xxx_cast messages, the information is maintained in the mcast_acks matrix in the event upcall. This matrix is of size nmembers X nmembers. Entry (x, y) contains the number of messages from x, acknowledged by y. On the diagonal, entry (x, x) contains the number of messages that were sent by x. The minimum over all y of entry (x, y) is then the number of messages from x that are fully acknowledged.
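
As a sketch of how this matrix can be read, the function below computes how many of x's messages every member has acknowledged, by taking the minimum over y of entry (x, y). It works on a plain row-major array rather than struct xxx_ack_matrix. The function name and the array layout are assumptions.

```c
#include <stdint.h>

/* acks is a row-major nmembers x nmembers array where entry (x, y) is
 * the number of messages from x acknowledged by y, and (x, x) is the
 * number sent by x. Returns how many of x's messages every member has
 * acknowledged. */
uint32_t fully_acked_from(const uint32_t *acks, int nmembers, int x)
{
    uint32_t min = acks[x * nmembers + x];   /* start from the sent count */
    for (int y = 0; y < nmembers; y++) {
        uint32_t v = acks[x * nmembers + y];
        if (v < min)
            min = v;
    }
    return min;
}
```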

For xxx_send messages, less information is provided. The matrix pt2pt_acks is a vector of size 1 X nmembers. Entry i contains the number of acknowledged messages sent using xxx_send(). It depends on the particular layer whether these messages were acknowledged by all members in the view, or whether only a partial stability is indicated.

The fields current_time and msg_time are matrices of size nmembers X nmembers. Their use depends on the layer that provides them (they usually keep information about causal time). Any of these matrix fields may be null, in which case the layer does not provide the corresponding information.

Sometimes, the application (or a higher level layer) has more information about message stability. Using the xxx_stable() downcall it can pass the information to the layer below:

error_t xxx_stable(
        xxx_group_id group,
        unsigned int options,
        struct xxx_ack_matrix *am
);

Currently there has not been much use of xxx_stable(), and the exact semantics of the call are not thoroughly defined.

Focussing on a layer

Layers and applications sometimes need to be able to interact with a particular lower-level layer. For example, an application that uses the TOTAL layer, which implements total order through a token, may want to explicitly manipulate the token, or may want to change the token passing policy used by the layer. For this, the TOTAL layer provides functions that operate on the endpoint and group object handles that it returns to the layer directly on top of the TOTAL layer. Because the use of Horus is not limited to object-oriented languages (and, in fact, most layers are written in C rather than C++), we cannot assume that these functions are inherited by the layers that are stacked on top of the TOTAL layer.

Therefore, Horus provides a function that can return endpoint and group object handles anywhere in the protocol stack. This function, called xxx_focus(), can search through the stack recursively (like most other Horus layer functions) until it finds the relevant layer. Its signature is as follows:

error_t xxx_focus(
        xxx_id in,
        char *ident,
        OUT xxx_id *out
);

The function takes a top-level group or endpoint handle, and returns a lower-level handle based on the specified ident. Each layer is supposed to implement this function, recursively invoking it unless it recognizes the given ident. This is perhaps best clarified by an example. Below is the implementation of the focus function of the TOTAL layer:

error_t to_focus(
        to_group_id group,
        char *ident,
        xxx_group_id *pg
) {
        e_enter(to_focus);

        /* Intercept if a TOTAL or TOKEN focus is specified. */
        if (!strcmp(ident, "TOTAL") || !strcmp(ident, "TOKEN")) {
                *pg = (xxx_group_id) group;
                return e_ok();
        }

        /* Otherwise pass it on. */
        return e_pass(xxx_focus(group->below, ident, pg));
}

When invoked with the string TOTAL or the string TOKEN, it returns the group pointer. Otherwise, it recursively invokes xxx_focus() to find another layer that may recognize the given ident. When the bottom of the protocol stack is hit, xxx_focus() returns an error.

Note that the example is somewhat simplified. It really should check that a group object was passed to the focus function (by testing the type field of the xxx_ident structure). In the case an endpoint object is specified, similar code should be executed for endpoints instead.
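
The recursive search, including the error at the bottom of the stack, can be illustrated with a self-contained model. The struct layer type, the focus() function, and the layer names used in the usage note are hypothetical stand-ins and not the Horus types.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical layer object: a name it answers to and the layer below. */
struct layer {
    const char *ident;
    struct layer *below;    /* NULL at the bottom of the stack */
};

/* Walk down the stack until a layer recognizes ident.
 * Returns 0 and stores the handle in *out, or -1 at the bottom. */
int focus(struct layer *top, const char *ident, struct layer **out)
{
    if (top == NULL)
        return -1;                      /* bottom of the stack reached */
    if (strcmp(top->ident, ident) == 0) {
        *out = top;
        return 0;
    }
    return focus(top->below, ident, out);
}
```

With a stack of three such layers named TOTAL, FRAG, and BOTTOM, calling focus() on the top layer with "FRAG" returns the middle layer, while an unknown name falls through every layer and returns -1.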

Dumping layer information

For debugging, it is useful to see the state of the various protocols in the stack. For this, Horus provides the xxx_dump() function. Like xxx_focus(), it can travel recursively down a stack of endpoint or group objects. Each layer prints the information that it wants to supply. The prototype of xxx_dump() is:

error_t xxx_dump(
        xxx_id xxx,
        unsigned int options,
        int cmd
);

xxx_dump() should be invoked recursively only if the XXX_DUMP_RECUR option is specified. The cmd argument is currently unused, but may later specify which classes of information should be printed. Here is, for example, the implementation of frag_dump():

error_t frag_dump(xxx_id xxx, unsigned int options, int cmd){
        frag_endpoint_id ep;
        frag_group_id grp;

        e_enter(frag_dump);

        switch (xxx->type) {
        case XXX_ENDPOINT:
                ep = (frag_endpoint_id) xxx;
                pprintf("FRAGMENT endpoint\n");
                pprintf("\tfrag_size = %d\n", ep->frag_size);
                if (options & XXX_DUMP_RECUR)
                        return e_pass(xxx_dump(ep->below, options, cmd));
                break;
        case XXX_GROUP:
                grp = (frag_group_id) xxx;
                pprintf("FRAGMENT group\n");
                pprintf("\tmy_rank = %d\n", grp->my_rank);
                pprintf("\tnmembers = %d\n", grp->nmembers);
                if (options & XXX_DUMP_RECUR)
                        return e_pass(xxx_dump(grp->below, options, cmd));
        }
        return e_ok();
}

System Errors

As one of the target uses for Horus is fault-tolerant applications, the use of panic() (or sys_panic() as available in MUTS) is avoided. Rather, Horus defines an XXX_SYSTEM_ERROR upcall, which allows applications to save state or take other useful actions before restarting to recover from an internal error. Alternatively, the MUTS sys_on_exit() functionality may be used, which allows functions to be specified that are invoked before the process terminates.

Leaving the Group

Leaving the group behaves much like a crash. An endpoint that wants to leave invokes:

error_t xxx_leave(
        xxx_group_id group,
        unsigned int options,
        xxx_message_t *msg
);

This is often much like xxx_cast(), but instead results in an XXX_LEAVE upcall (which is usually delivered with the same semantics as XXX_CAST). However, some layers do not send a message in response to an xxx_leave() at all.

xxx_leave() results in an XXX_EXIT upcall. After xxx_done() is invoked, the group pointer becomes invalid, allowing the layer to release all its resources. Before this upcall, many layers will not allow the same endpoint, or even a different endpoint within the same process, to join the same group.

EXAMPLE HERE OF GROUP CLEAN UP.

Destroying an endpoint

To release the data structures associated with an endpoint object, use xxx_destroy():

error_t xxx_destroy(
        xxx_endpoint_id endpoint,
        unsigned int options,
        xxx_message_t *msg
);

Some layers require that the endpoint not be joined to any group. In other layers the endpoint is automatically removed from the membership of groups. Eventually, xxx_destroy() has to lead to an XXX_EXIT event to the endpoint upcall, which is guaranteed to be the last event passed to the upcall. Currently, it is up to the individual layers how the msg argument is used.

