A Socket Interface to Horus

This document describes the user-level interface to the basic Horus functionality. The interface closely follows the UNIX system call interface, for familiarity and ease of use. Horus adds two important extensions to it. First, it adds a new address family, AF_HORUS, that can be used to create group sockets. A process joins a group by binding a group address to a group socket. Second, it adds multi-threading to the UNIX interface, to simplify the development of truly distributed applications. Each thread may individually execute blocking UNIX system calls without interfering with the other threads. This works not only for reading from group sockets, but also for, say, reading from standard input or TCP sockets. In particular, threads can perform Remote Procedure Calls without blocking the process.

The socket interface may be used together with rvr_threads, if a multi-threading environment is required.

SOURCES

include/horus/socket.h
src/socket/socket_intf.c

INCLUDE

#include "horus/horus.h"

INTERFACE

The Horus socket interface is largely compatible with the BSD socket interface. We refer to the UNIX manual pages for socket(), bind(), connect(), read(), recv(), recvfrom(), write(), send(), sendto(), getsockname(), getpeername(), ioctl(), and close(). accept() and listen() are currently not supported on Horus sockets.

A Horus socket has to be created in the AF_HORUS domain (rather than AF_INET or AF_UNIX). The calls take addresses of type struct sockaddr_horus rather than struct sockaddr_in. To join a group, you have to bind or connect a Horus socket to a Horus group address. To obtain such an address, the following call has been added to the socket interface:

int newgroupaddr(
	struct sockaddr_horus *addr,
	int *len
);
Obtain a Horus group address. It is returned in the buffer pointed to by addr. On entry, *len should contain the size of this buffer. On return, *len contains the size of the resulting address. The corresponding group may subsequently be joined using bind(). If the group does not exist yet, a new group will be created. Messages may be multicast to the group using the normal UNIX write() or send(), and received using read() or recv(). A group may be left by closing the socket.

Group addresses can be communicated through a shared file system, a name service (such as the Network Information Service), or by sending them over a previously set-up communication channel. Given a bound socket, you can retrieve the group address using getpeername().

Each member in the group has a unique local address, called the endpoint address of that member. Such an address is also of type struct sockaddr_horus, and may be obtained with getsockname(). Both recvfrom() and recvmsg() return, for each message, the endpoint address of the sender of that message (NOTE: sendmsg() and recvmsg() are not implemented yet). Using sendto() and sendmsg(), point-to-point messages may be sent within a group by specifying a Horus endpoint address. This is useful for replying to messages.

The complete membership of a group can be obtained using a SIOCGGROUPINFO ioctl on the following structure:

	struct horus_group_info {
		int			hgi_nmembers;
		struct sockaddr_horus	*hgi_members;
		...
	};
Initially, hgi_nmembers contains the size of the hgi_members array. After completion, it contains the actual number of members in the group. As many members as fit will have been copied into the hgi_members array.

Processes that need an exact membership must see membership changes and messages in a well-ordered sequence. To synchronize the two, set the per-view flags. The flags are encoded in an unsigned integer, and can be set by a SIOCSPVFLAGS ioctl:
0
Membership changes are not synchronized with communication at all.
PV_RECV
When the membership changes, subsequent reads will return -1 with errno set to ECONNRESET until the new membership is retrieved (using the SIOCGGROUPINFO ioctl).
PV_SEND
When the membership changes, subsequent writes will return -1 with errno set to ECONNRESET until the new membership is retrieved. This way processes know in which membership they are sending messages. Messages are always delivered to the same membership.
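
One way to act on PV_SEND is to wrap write() so that an ECONNRESET failure triggers a membership refresh before the write is retried. The following is a minimal sketch under stated assumptions, not part of the Horus interface: send_in_view, view_refresh_fn, and max_retries are hypothetical names, and the refresh callback is assumed to retrieve the new membership (for example with the SIOCGGROUPINFO ioctl). Whether a message should be resent at all in a new membership is an application decision.

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical callback: retrieves the new membership for socket s.
 * Returns 0 on success and -1 on failure.
 */
typedef int (*view_refresh_fn)(int s);

/* Write len bytes to s.  If the write fails with ECONNRESET, call
 * refresh(s) and try again, at most max_retries times.  Any other
 * result of write() is returned to the caller unchanged.
 */
ssize_t send_in_view(int s, const void *buf, size_t len,
                     view_refresh_fn refresh, int max_retries)
{
     int attempt;

     for (attempt = 0; ; attempt++) {
          ssize_t n = write(s, buf, len);

          if (n >= 0 || errno != ECONNRESET)
               return n;
          if (attempt >= max_retries)
               return -1;          /* errno is still ECONNRESET */
          if (refresh(s) < 0)
               return -1;
     }
}
```

The wrapper uses only write() and errno, so it can be tried on any descriptor. On an ordinary pipe or socket the ECONNRESET branch is simply never taken.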

EXAMPLE

A good way to communicate group addresses is through a shared (NFS) file. For example, it could be done using the following routine:
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "horus/horus.h"

/* Communicate a group address through the given file.
 */
void get_groupaddr(char *file, struct sockaddr_horus *addr){
     int fd, size;

     /* See if there is a group address in the given file.
      * If not, create one.  If so, read it.  The file is created
      * read-only (0444), so creat() fails for later unprivileged
      * callers and they fall through to the read branch.
      */
     if ((fd = creat(file, 0444)) >= 0) {
          size = sizeof(*addr);
          if (newgroupaddr(addr, &size) < 0) {
               perror("newgroupaddr");
               exit(1);
          }
          if (write(fd, addr, size) != size) {
               perror(file);
               exit(1);
          }
     }
     else if ((fd = open(file, O_RDONLY)) < 0) {
          perror(file);
          exit(1);
     }
     else if (read(fd, addr, sizeof(*addr)) != sizeof(*addr)) {
          fprintf(stderr, "bad group address in %s\n", file);
          exit(1);
     }
     close(fd);
}
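
With the creat() approach there is a short window in which the file exists but the address has not been written yet. A reader that opens the file in that window reports a bad group address. A possible alternative, sketched below, writes the data to a temporary file first and then publishes it with link(), which fails with EEXIST if the target name is already taken. This is an illustration only: publish_once and the temporary file naming are hypothetical, and the data is treated as an opaque byte buffer so that the sketch does not depend on the Horus headers.

```c
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Publish len bytes at path unless path already exists.
 * Returns 1 if this caller published the data, 0 if path already
 * existed (the caller should then read it), and -1 on error.
 */
int publish_once(const char *path, const void *blob, size_t len)
{
     char tmp[4096];
     int fd, n, rc, saved;

     /* Temporary name next to the target, unique per process. */
     n = snprintf(tmp, sizeof(tmp), "%s.%ld.tmp", path, (long) getpid());
     if (n < 0 || (size_t) n >= sizeof(tmp)) {
          errno = ENAMETOOLONG;
          return -1;
     }

     /* Write the complete contents before the name becomes visible. */
     fd = open(tmp, O_WRONLY | O_CREAT | O_EXCL, 0444);
     if (fd < 0)
          return -1;
     if (write(fd, blob, len) != (ssize_t) len || close(fd) < 0) {
          saved = errno;
          unlink(tmp);
          errno = saved;
          return -1;
     }

     /* link() creates path only if it does not exist yet. */
     if (link(tmp, path) == 0)
          rc = 1;
     else if (errno == EEXIST)
          rc = 0;
     else
          rc = -1;

     saved = errno;
     unlink(tmp);
     errno = saved;
     return rc;
}
```

A caller would create a group address with newgroupaddr(), pass it to publish_once(), and read the file back whenever the result is 0, since another process published first.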
Whenever the membership has changed on some socket, it can be retrieved using the following function.
/* This is the membership data structure.  MAX_MEMBERS is chosen by
 * the application.
 */
static struct sockaddr_horus mbr_addrs[MAX_MEMBERS];
static struct horus_group_info hgi = { MAX_MEMBERS, mbr_addrs };

/* The endpoint address of this process within the group.
 */
static struct sockaddr_horus my_addr;

/* A new view is ready on the given socket.  Get the membership.
 */
static void get_mbrship(int s){
     int size;

     /* Get the membership list.
      */
     hgi.hgi_nmembers = MAX_MEMBERS;
     if (ioctl(s, SIOCGGROUPINFO, &hgi) < 0) {
          perror("ioctl SIOCGGROUPINFO");
          exit(1);
     }
     if (hgi.hgi_nmembers < 1) {
          printf("get_mbrship: no members (%d)\n", hgi.hgi_nmembers);
          abort();
     }

     /* Get my current address.
      */
     size = sizeof(my_addr);
     if (getsockname(s, (struct sockaddr *) &my_addr, &size) < 0 ||
                                        size != sizeof(my_addr)) {
          fprintf(stderr, "get_mbrship: getsockname failed\n");
          exit(1);
     }
}
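
The function above uses a fixed MAX_MEMBERS array, so only the first MAX_MEMBERS members of a larger group are copied. Because the ioctl reports the actual group size in hgi_nmembers, a caller can detect the truncation and retry with a larger array. The sketch below shows that retry loop against hypothetical stand-in types so that it compiles without the Horus headers: demo_addr, demo_group_info, demo_query_fn, and demo_fake_query are all assumptions. In real use the query callback would wrap ioctl(s, SIOCGGROUPINFO, ...).

```c
#include <stdlib.h>

/* Hypothetical stand-ins for struct sockaddr_horus and
 * struct horus_group_info.
 */
struct demo_addr { unsigned char bytes[16]; };

struct demo_group_info {
     int               nmembers;  /* in: array size, out: group size */
     struct demo_addr *members;
};

/* Hypothetical query: fills in as many members as fit and stores
 * the actual group size in info->nmembers.  Returns 0 or -1.
 */
typedef int (*demo_query_fn)(struct demo_group_info *info, void *ctx);

/* Fetch the complete membership, growing the array until it fits.
 * Returns the member count or -1 on error.  The caller frees *out.
 */
int demo_get_all_members(demo_query_fn query, void *ctx,
                         struct demo_addr **out)
{
     struct demo_addr *buf = NULL;
     int cap = 4;

     for (;;) {
          struct demo_group_info info;
          struct demo_addr *tmp = realloc(buf, (size_t) cap * sizeof(*tmp));

          if (tmp == NULL) {
               free(buf);
               return -1;
          }
          buf = tmp;

          info.nmembers = cap;
          info.members = buf;
          if (query(&info, ctx) < 0 || info.nmembers < 0) {
               free(buf);
               return -1;
          }
          if (info.nmembers <= cap) {
               *out = buf;
               return info.nmembers;
          }
          cap = info.nmembers;     /* truncated: retry with more room */
     }
}

/* Fake query for experimentation: *ctx is the group size, and
 * member i is marked by storing i in its first byte.
 */
int demo_fake_query(struct demo_group_info *info, void *ctx)
{
     int total = *(int *) ctx;
     int cap = info->nmembers;
     int i;

     for (i = 0; i < total && i < cap; i++)
          info->members[i].bytes[0] = (unsigned char) i;
     info->nmembers = total;
     return 0;
}
```

The loop also covers a group that grows between two queries, because every result is checked against the current array size before it is accepted.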
The following function joins a group and gets the initial membership.
/* Join a group and return a socket descriptor for it.
 */
int join_group(char *name){
     unsigned int flags = PV_RECV;
     struct sockaddr_horus addr;
     char dummy;
     int s;

     /* Create a group socket.
      */
     if ((s = socket(AF_HORUS, SOCK_DGRAM, 0)) < 0) {
          perror("socket");
          exit(1);
     }

     /* Prepare the socket to give membership updates.
      */
     if (ioctl(s, SIOCSPVFLAGS, &flags) < 0) {
          perror("ioctl SIOCSPVFLAGS");
          exit(1);
     }

     /* Get the group address.
      */
     get_groupaddr(name, &addr);

     /* Join the group.
      */
     if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
          perror("bind");
          exit(1);
     }

     /* Await the first view.  This read is supposed to fail because
      * the first event will be a membership change.  It will block
      * though until that membership is available.  The buffer must
      * be writable in case the read unexpectedly succeeds.
      */
     if (read(s, &dummy, 1) >= 0 || errno != ECONNRESET) {
          printf("join_group: read *should have* failed\n");
          abort();
     }

     /* Get the membership.
      */
     get_mbrship(s);

     return s;
}
The main function of a program would look something like this:
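/* Supplied by the application.  The return type is an assumption.
 */
void handle_message(int s, char *buf, int n,
                         struct sockaddr_horus *from, int fromlen);

int main(int argc, char *argv[]) {
     static char buf[8 * 1024];
     int s, n, fromlen;
     struct sockaddr_horus from;

     /* Join the group "my_group".
      */
     s = join_group("my_group");

     /* Await messages.
      */
     for (;;) {
          fromlen = sizeof(from);
          n = recvfrom(s, buf, sizeof(buf), 0,
                         (struct sockaddr *) &from, &fromlen);
          if (n > 0)
               handle_message(s, buf, n, &from, fromlen);
          if (n < 0 && errno == ECONNRESET) {
               /* The membership changed.  Fetch the new view. */
               get_mbrship(s);
               continue;
          }
          if (n <= 0)
               return 0;
     }
}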
Messages may be multicast using write() or send(). The function handle_message may send a point-to-point reply to the sender, whose endpoint address is in from, using sendto().