This document describes the user-level interface to the basic Horus functionality. This interface is highly compatible with the UNIX system call interface for familiarity and ease of use. Horus adds two important extensions to this interface. First, it adds a new address family, AF_HORUS, that can be used to create group sockets. By binding a group address to a group socket, a process may join a group. Second, it adds multi-threading to the UNIX interface, to simplify the development of truly distributed applications. Each thread may individually execute blocking UNIX system calls without interfering with the other threads. This works not only for reading from group sockets, but also for, say, reading from standard input or TCP sockets. In particular, threads can perform Remote Procedure Calls without blocking the process.
The socket interface may be used together with rvr_threads, if a multi-threading environment is required.
include/horus/socket.h src/socket/socket_intf.c
#include "horus/horus.h"
Horus sockets support socket(), bind(), connect(), read(), recv(),
recvfrom(), write(), send(), sendto(), getsockname(), getpeername(),
ioctl(), and close(). accept() and listen() are currently not
supported on Horus sockets.
A Horus socket has to be created in the AF_HORUS domain (rather than
AF_INET or AF_UNIX). The interfaces should use addresses of type
struct sockaddr_horus rather than struct sockaddr_in. To join a group,
you have to bind or connect a Horus socket to a Horus group address.
To obtain such an address, the following API has been added to the
socket interface:
int newgroupaddr(
struct sockaddr_horus *addr,
int *len
);
Obtain a Horus group address. It is returned in the buffer
pointed to by addr. On entry, *len should contain the size of this
buffer. On return, *len contains the size of the resulting address.
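The way newgroupaddr() uses *len as both input (buffer size) and output (resulting size) matches the value-result convention of standard socket calls such as getsockname(). The sketch below illustrates that convention on an ordinary UDP socket, since a Horus installation is not assumed here. The helper name query_local_addr is hypothetical, and modern systems declare the length as socklen_t rather than int.

```c
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: bind a UDP socket to an ephemeral loopback port
 * and fetch the address the kernel assigned.  Returns the address
 * length reported by getsockname(), or -1 on error. */
int query_local_addr(struct sockaddr_in *out)
{
    struct sockaddr_in want;
    socklen_t len;
    int s = socket(AF_INET, SOCK_DGRAM, 0);

    if (s < 0)
        return -1;

    memset(&want, 0, sizeof(want));
    want.sin_family = AF_INET;
    want.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    want.sin_port = 0;                    /* let the kernel pick a port */
    if (bind(s, (struct sockaddr *)&want, sizeof(want)) < 0) {
        close(s);
        return -1;
    }

    len = sizeof(*out);                   /* in: size of the buffer */
    if (getsockname(s, (struct sockaddr *)out, &len) < 0) {
        close(s);
        return -1;
    }
    close(s);
    return (int)len;                      /* out: size actually used */
}
```

A caller of newgroupaddr() would presumably follow the same pattern: set the length to sizeof(struct sockaddr_horus) before the call and use the returned length afterwards, as get_groupaddr() in the example section does.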
The corresponding group may subsequently be joined using bind(). If
non-existent, a new group will be created. Messages may be multicast
to the group using the normal UNIX write() or send(), and received
using read() or recv(). A group may be left by closing the socket.
Group addresses can be communicated through a shared file system, a
name service (such as the Network Information Service), or by sending
them over a previously set-up communication channel. Given a bound
socket, you can retrieve the group address using getpeername().
Each member in the group has a unique local address, called the
endpoint address of that member. Such an address is also of type
struct sockaddr_horus, and may be obtained with getsockname(). Both
recvfrom() and recvmsg() return for each message the endpoint address
of the sender of that message (NOTE: sendmsg() and recvmsg() are not
implemented yet). Using sendto() or sendmsg(), point-to-point messages
may be sent within a group by specifying a Horus endpoint address.
This is useful for replying to messages.
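The reply pattern described above (take the sender address from recvfrom() and pass it to sendto()) is the same one used with ordinary datagram sockets. The following is a minimal sketch using UDP over loopback as a stand-in for AF_HORUS, which is not assumed to be available. The helper names bound_udp and reply_roundtrip are hypothetical.

```c
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Create a UDP socket bound to an ephemeral loopback port and store
 * the address it was given in *addr.  Returns the socket or -1. */
static int bound_udp(struct sockaddr_in *addr)
{
    socklen_t len = sizeof(*addr);
    int s = socket(AF_INET, SOCK_DGRAM, 0);

    if (s < 0)
        return -1;
    memset(addr, 0, sizeof(*addr));
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    if (bind(s, (struct sockaddr *)addr, sizeof(*addr)) < 0 ||
        getsockname(s, (struct sockaddr *)addr, &len) < 0) {
        close(s);
        return -1;
    }
    return s;
}

/* Send msg from one socket to another, then reply to the sender
 * address that recvfrom() reported.  Returns the reply length that
 * arrived back at the original sender, or -1 on error. */
int reply_roundtrip(const char *msg, char *reply, size_t cap)
{
    struct sockaddr_in srv_addr, cli_addr, from;
    socklen_t fromlen = sizeof(from);
    char buf[256];
    ssize_t n = -1;
    int srv = bound_udp(&srv_addr);
    int cli = bound_udp(&cli_addr);

    if (srv >= 0 && cli >= 0 &&
        sendto(cli, msg, strlen(msg), 0,
               (struct sockaddr *)&srv_addr, sizeof(srv_addr)) >= 0) {
        /* Receive the message together with its sender address. */
        n = recvfrom(srv, buf, sizeof(buf), 0,
                     (struct sockaddr *)&from, &fromlen);
        /* Reply point-to-point to exactly that sender. */
        if (n >= 0 && sendto(srv, buf, (size_t)n, 0,
                             (struct sockaddr *)&from, fromlen) >= 0)
            n = recv(cli, reply, cap, 0);
        else
            n = -1;
    }
    if (srv >= 0)
        close(srv);
    if (cli >= 0)
        close(cli);
    return (int)n;
}
```

With a Horus socket, the from buffer would be a struct sockaddr_horus holding the sender's endpoint address, and the sendto() call would deliver the reply to that single member rather than to the whole group.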
The complete membership of a group can be obtained using a
SIOCGGROUPINFO ioctl on the following structure:
struct horus_group_info {
    int hgi_nmembers;
    struct sockaddr_horus *hgi_members;
    ...
};
Initially, hgi_nmembers contains the size of the hgi_members array.
After completion, it contains the actual number of members in the
group. As many as possible will have been copied into the
hgi_members array.
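Because the count field carries the array capacity in and the actual group size out, a caller has to handle the case where the group is larger than the array. The sketch below mimics those semantics with stand-in types so that it runs without Horus. The names member_addr, group_info, fake_group_info and query_members are all hypothetical, and fake_group_info only imitates the behavior described above, not the real ioctl.

```c
/* Hypothetical stand-ins for struct sockaddr_horus and
 * struct horus_group_info. */
struct member_addr { int id; };
struct group_info {
    int nmembers;                 /* in: array capacity, out: group size */
    struct member_addr *members;  /* caller-supplied array */
};

/* Stand-in for the SIOCGGROUPINFO ioctl: copy as many members as fit,
 * then report the actual group size in nmembers. */
static int fake_group_info(struct group_info *gi,
                           const struct member_addr *group, int group_size)
{
    int i;
    int ncopy = group_size < gi->nmembers ? group_size : gi->nmembers;

    for (i = 0; i < ncopy; i++)
        gi->members[i] = group[i];
    gi->nmembers = group_size;
    return 0;
}

/* Query the membership into buf.  Returns the number of valid entries
 * in buf and sets *truncated when the group did not fit. */
int query_members(struct member_addr *buf, int capacity,
                  const struct member_addr *group, int group_size,
                  int *truncated)
{
    struct group_info gi;

    gi.nmembers = capacity;       /* tell the callee how much room we have */
    gi.members = buf;
    if (fake_group_info(&gi, group, group_size) < 0)
        return -1;
    *truncated = gi.nmembers > capacity;
    return *truncated ? capacity : gi.nmembers;
}
```

A caller that sees the truncated flag set could allocate a larger array and repeat the query. Only the first capacity entries of the array are valid in that case.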
Processes interested in the exact membership need to see messages
and data in a well-ordered sequence. To synchronize this, you need to
set the per-view flags. The flags are encoded in an unsigned
integer, and can be set by a SIOCSPVFLAGS ioctl:
PV_RECV: once the membership has changed, receive operations fail
with errno set to ECONNRESET until the new membership is retrieved
(using the SIOCGGROUPINFO ioctl).
... errno set to ECONNRESET until the new membership is retrieved.
This way processes know in which membership they are sending
messages. Messages are always delivered to the same membership.
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include "horus/horus.h"

/* Communicate a group address through the given file. */
void get_groupaddr(char *file, struct sockaddr_horus *addr)
{
    int fd, size;

    /* See if there is a group address in the given file.
     * If not, create one.  If so, read it.  The file is created
     * read-only, so for an unprivileged user creat() fails once
     * the file exists.
     */
    if ((fd = creat(file, 0444)) >= 0) {
        size = sizeof(*addr);
        if (newgroupaddr(addr, &size) < 0) {
            perror("newgroupaddr");
            exit(1);
        }
        if (write(fd, addr, size) != size) {
            perror(file);
            exit(1);
        }
    }
    else if ((fd = open(file, O_RDONLY)) < 0) {
        perror(file);
        exit(1);
    }
    else if (read(fd, addr, sizeof(*addr)) != sizeof(*addr)) {
        fprintf(stderr, "bad group address in %s\n", file);
        exit(1);
    }
    close(fd);
}

Whenever the membership has changed on some socket, it can be retrieved using the following function.
/* This is the membership data structure. */
static struct sockaddr_horus mbr_addrs[MAX_MEMBERS];
static struct horus_group_info hgi = { MAX_MEMBERS, mbr_addrs };

/* The endpoint address of this process. */
static struct sockaddr_horus my_addr;

/* A new view is ready on the given socket.  Get the membership. */
static void get_mbrship(int s)
{
    int size;

    e_enter(get_mbrship);

    /* Get the membership list.  Reset the capacity first, because
     * the ioctl overwrites it with the actual group size.
     */
    hgi.hgi_nmembers = MAX_MEMBERS;
    if (ioctl(s, SIOCGGROUPINFO, &hgi) < 0) {
        perror("ioctl SIOCGGROUPINFO");
        exit(1);
    }
    if (hgi.hgi_nmembers < 1) {
        printf("get_mbrship: no members (%d)\n", hgi.hgi_nmembers);
        abort();
    }

    /* Get my current address. */
    size = sizeof(my_addr);
    if (getsockname(s, (struct sockaddr *) &my_addr, &size) < 0 ||
                                        size != sizeof(my_addr))
        sys_panic("get_mbrship: getsockname failed");
}

The following function joins a group and gets the initial membership.
/* Join a group and return a socket descriptor for it. */
int join_group(char *name)
{
    unsigned int flags = PV_RECV;
    struct sockaddr_horus addr;
    char dummy;
    int s;

    /* Create a group socket. */
    if ((s = socket(AF_HORUS, SOCK_DGRAM, 0)) < 0) {
        perror("socket");
        exit(1);
    }

    /* Prepare the socket to give membership updates. */
    if (ioctl(s, SIOCSPVFLAGS, &flags) < 0) {
        perror("ioctl SIOCSPVFLAGS");
        exit(1);
    }

    /* Get the group address. */
    get_groupaddr(name, &addr);

    /* Join the group. */
    if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
        perror("bind");
        exit(1);
    }

    /* Await the first view.  This read is supposed to fail because
     * the first event will be a membership change.  It will block
     * though until that membership is available.
     */
    if (read(s, &dummy, 1) >= 0 || errno != ECONNRESET) {
        printf("join_group: read *should have* failed\n");
        abort();
    }

    /* Get the membership. */
    get_mbrship(s);

    return s;
}

The main function of a program would look something like this:
int main(int argc, char *argv[])
{
    static char buf[8 * 1024];
    int s, n, fromlen;
    struct sockaddr_horus from;

    /* Join the group "my_group". */
    s = join_group("my_group");

    /* Await messages. */
    for (;;) {
        fromlen = sizeof(from);
        n = recvfrom(s, buf, sizeof(buf), 0,
                        (struct sockaddr *) &from, &fromlen);
        if (n > 0)
            handle_message(s, buf, n, &from, fromlen);

        /* A membership change: fetch the new view and go on. */
        if (n < 0 && errno == ECONNRESET) {
            get_mbrship(s);
            continue;
        }
        if (n <= 0)
            return 0;
    }
}

Messages may be multicast using write() or send(). The function
handle_message may return a point-to-point reply message to from
using sendto().