http://www.tenouk.com/Module41c.html
http://www.tldp.org/HOWTO/Multicast-HOWTO-2.html
For those looking into multicast, the examples at the above sites work.
For those programming network sockets in C on Linux: under IPv4, the loopback device lo (address "127.0.0.1") is a member of the multicast group "224.0.0.1".
Most people write very focused, simple programs to get the key concepts across.
As a result, it takes a while to build something serious from such snippets.
Here is a multicast, multi-socket program that reads from one port and writes to another port, using two threads with main as the data passer. The only simplification is that simple shared state flags and a microsecond sleep stand in for a proper mutex / event system. Code below:
#include <errno.h>
#include <netdb.h>
#include <pthread.h> //for threading , link with -lpthread
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
// Default UDP port numbers. A UDP port must fit in 16 bits (1..65535);
// these match the default lower / upper ports used by main.
#define PORT1 10000
#define PORT2 10002
// size in bytes of the receive scratch buffer
#define MAXMSG 256
// size in bytes of general purpose string buffers
#define MAX_STRING 256
// size in bytes of the shared input and output data areas
#define MAX_DATA 256
//
// GLOBAL VARS
//
// state flags for a simple finite state machine
//
// Ready = 1 ready for NEXT ACTION
// !Ready = 0 not ready for NEXT ACTION
//
// Read
// Compute
// Write
//
// The flags are read and written by main and by both worker threads,
// so they are declared _Atomic: every plain load / store on them is an
// atomic, sequentially consistent operation instead of a data race.
//
// data is ready for reading and computing,
// start with read in not ready state
static _Atomic int data_ready = 0;
// set while the computation step is pending
static _Atomic int compute_ready = 0;
// when there is data to go out, write to output
static _Atomic int write_ready = 0;
// the common addresses for this program
// the reading config
static struct in_addr local_address;
static struct sockaddr_in group_address;
// the writing config
// writer_address is the sendto() destination, writer the outbound interface
static struct sockaddr_in writer_address;
static struct in_addr writer;
// the input data area and its usable size in bytes
static char data_area[MAX_DATA];
static size_t data_size;
// the output data area and its usable size in bytes
static char output_area[MAX_DATA];
static size_t output_size;
/*
 * set_writer_socket - prepare a datagram socket for sending multicast packets.
 *
 * socket : an already created datagram socket descriptor
 * local  : dotted IPv4 address of the local interface used for outbound
 *          multicast datagrams
 * group  : dotted IPv4 address of the multicast group to send to
 * port   : destination UDP port, host byte order
 *
 * Fills in the file-scope writer_address (the sendto() destination) and
 * writer (the outbound interface address), requests SO_REUSEADDR (and
 * SO_REUSEPORT where defined) on a best-effort basis, then selects the
 * outbound interface with IP_MULTICAST_IF.
 *
 * Returns 0 on success. Terminates the process with exit(1) when an address
 * cannot be parsed or the outbound interface cannot be set.
 */
int set_writer_socket (int socket, char local[], char group[], int port)
{
    /* SO_REUSEADDR / SO_REUSEPORT take an int flag */
    int yes = 1;

    /* this is the sendto destination for multicast packets */
    memset(&writer_address, 0, sizeof(writer_address));
    writer_address.sin_family = AF_INET;
    writer_address.sin_addr.s_addr = inet_addr(group);
    writer_address.sin_port = htons((in_port_t) port);
    if (writer_address.sin_addr.s_addr == INADDR_NONE)
    {
        fprintf(stderr, "Invalid multicast group address: %s\n", group);
        exit(1);
    }

    /* allow other sockets to share the address; failure is not fatal */
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1)
    {
        perror("setsockopt SO_REUSEADDR");
    }
#ifdef SO_REUSEPORT
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) < 0)
    {
        perror("setsockopt(SO_REUSEPORT) failed");
    }
#endif

    /* Set local interface for outbound multicast datagrams. */
    /* The IP address specified must be associated with a local, */
    /* multicast capable interface. */
    writer.s_addr = inet_addr(local);
    if (writer.s_addr == INADDR_NONE)
    {
        fprintf(stderr, "Invalid local interface address: %s\n", local);
        exit(1);
    }
    if (setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, &writer, sizeof(writer)) < 0)
    {
        perror("Setting local interface error");
        exit(1);
    }
    printf("Setting the local interface...%s\n", local);

    /* the function is declared int, so it must return a value */
    return 0;
}
/*
 * set_reader_socket - prepare a datagram socket for receiving multicast packets.
 *
 * socket : an already created datagram socket descriptor
 * local  : dotted IPv4 address of the local interface on which to join
 * group  : dotted IPv4 address of the multicast group to join
 * port   : UDP port to bind to, host byte order
 *
 * Enables SO_REUSEADDR (fatal on failure) and SO_REUSEPORT where defined
 * (best effort), binds the socket to INADDR_ANY:port using the file-scope
 * group_address, then joins the multicast group with IP_ADD_MEMBERSHIP.
 *
 * Returns 0 on success. On a fatal error the socket is closed and the
 * process terminates with exit(1).
 */
int set_reader_socket (int socket, char local[], char group[], int port)
{
    struct ip_mreq group_request;
    int reuse = 1;

    /* Enable SO_REUSEADDR to allow multiple instances of this */
    /* application to receive copies of the multicast datagrams. */
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
    {
        perror("Setting SO_REUSEADDR error");
        close(socket);
        exit(1);
    }
    printf("Setting SO_REUSEADDR...OK.\n");
#ifdef SO_REUSEPORT
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)) < 0)
    {
        perror("setsockopt(SO_REUSEPORT) failed");
    }
#endif

    /* Bind to the proper port number with the IP address */
    /* specified as INADDR_ANY. */
    memset(&group_address, 0, sizeof(group_address));
    group_address.sin_family = AF_INET;
    group_address.sin_port = htons((in_port_t) port);
    group_address.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(socket, (struct sockaddr *) &group_address, sizeof(group_address)) != 0)
    {
        perror("Binding datagram socket error");
        close(socket);
        exit(1);
    }
    printf("Binding datagram socket...OK.\n");

    /* Join the multicast group "group" on the local "local" */
    /* interface. Note that this IP_ADD_MEMBERSHIP option must be */
    /* called for each local interface over which the multicast */
    /* datagrams are to be received. */
    memset(&group_request, 0, sizeof(group_request));
    group_request.imr_multiaddr.s_addr = inet_addr(group);
    group_request.imr_interface.s_addr = inet_addr(local);
    if (group_request.imr_multiaddr.s_addr == INADDR_NONE
        || group_request.imr_interface.s_addr == INADDR_NONE)
    {
        fprintf(stderr, "Invalid group (%s) or local (%s) address\n", group, local);
        close(socket);
        exit(1);
    }
    if (setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group_request, sizeof(group_request)) < 0)
    {
        perror("Adding multicast group error");
        close(socket);
        exit(1);
    }
    printf("Adding multicast group...OK.\n");

    /* the function is declared int, so it must return a value */
    return 0;
}
// Multicast socket setup options
// set sockaddr structure to the local address and port - sockaddr_in
// join the multicast group on the local interface
// set TTL and loopback for outbound multicast datagrams
// set the local interface for outbound multicast datagrams
/*! \fn set_socket_options
 *
 * \param socket        - a pre-made datagram socket
 * \param local_address - dotted IPv4 address of the local interface to bind to
 * \param group_address - dotted IPv4 address of the multicast group to join
 * \param port          - UDP port to bind to, host byte order
 * \param addr          - caller storage that receives the bound local
 *                        address; must not be NULL
 * \param local_addr    - not used by this function
 *
 * \return 0 on success, -1 with errno set to EINVAL when addr is NULL.
 *         A failed bind or a failure to set the outbound interface
 *         terminates the process. Failures of the remaining options are
 *         reported with perror and otherwise ignored.
 * */
int set_socket_options (int socket, char local_address[], char group_address[], int port, struct sockaddr_in *addr, struct in_addr * local_addr )
{
    struct ip_mreq mreq; // multicast request
    struct in_addr iface; // outbound interface address
    int yes = 1;
    unsigned char ttl = 1;
    unsigned char loop = 1;

    (void) local_addr;

    if (addr == NULL)
    {
        errno = EINVAL;
        return -1;
    }

    // clear the caller's structure (the pointed-to object, not the pointer)
    memset(addr, 0, sizeof(*addr));
    memset(&mreq, 0, sizeof(mreq));

    // set to reuse address so other sockets can join; best effort
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1)
    {
        perror("setsockopt SO_REUSEADDR");
    }
#ifdef SO_REUSEPORT
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) < 0)
    {
        perror("setsockopt(SO_REUSEPORT) failed");
    }
#endif

    // assign socket to local address
    addr->sin_family = AF_INET;
    addr->sin_port = htons((in_port_t) port);
    addr->sin_addr.s_addr = inet_addr(local_address);

    // bind needs the size of the address structure, not of a pointer to it
    if (bind(socket, (struct sockaddr *) addr, sizeof(*addr)) < 0)
    {
        perror ("bind");
        exit (EXIT_FAILURE);
    }
    printf("+Bound to address : %s port : %d \n", local_address, port);

    /* Join the multicast group "group_address" on the local "local_address" */
    /* interface. Note that this IP_ADD_MEMBERSHIP option must be */
    /* called for each local interface over which the multicast */
    /* datagrams are to be received. */
    mreq.imr_multiaddr.s_addr = inet_addr(group_address);
    mreq.imr_interface.s_addr = inet_addr(local_address);
    if (setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
    {
        perror("setsockopt IP_ADD_MEMBERSHIP");
    }

    // set the multicast Time To Live to 1
    if (setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0)
    {
        perror("setsockopt IP_MULTICAST_TTL");
    }

    // enable loopback of outbound multicast packets
    if (setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) < 0)
    {
        perror("setsockopt IP_MULTICAST_LOOP");
    }

    /* Set local interface for outbound multicast datagrams. */
    /* The IP address specified must be associated with a local, */
    /* multicast capable interface. IP_MULTICAST_IF is given the */
    /* interface address as a struct in_addr. */
    iface = addr->sin_addr;
    if (setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, &iface, sizeof(iface)) < 0)
    {
        perror("Setting local interface error");
        exit(1);
    }
    return 0;
}
/*
 * make_socket - create an IPv4 datagram socket.
 *
 * port : accepted for the caller's convenience; the socket is neither
 *        bound nor connected here, so the value is not used.
 *
 * Returns the new socket descriptor. If the socket cannot be created the
 * error is reported with perror and the process exits with EXIT_FAILURE.
 */
int make_socket (uint16_t port)
{
    (void) port;

    /* protocol 0 selects the default protocol for SOCK_DGRAM */
    const int fd = socket (PF_INET, SOCK_DGRAM, 0);
    if (fd >= 0)
    {
        return fd;
    }

    perror ("make_socket: socket");
    exit (EXIT_FAILURE);
}
/*
 * write_to_server - send the shared output area as one datagram.
 *
 * filedes : datagram socket descriptor to send on
 *
 * Sends output_size bytes of output_area to the file-scope writer_address.
 * A send failure is reported with perror and terminates the process with
 * EXIT_FAILURE; on success the number of bytes sent is printed.
 */
void write_to_server (int filedes )
{
    const int flags = 0;
    ssize_t nbytes;

    /* output_area may lack a terminating NUL, so bound the print to
     * output_size bytes; output_size is a size_t and needs %zu. */
    fprintf (stderr, "Layer: writing: \"%.*s\" in %zu bytes\n",
             (int) output_size, output_area, output_size);

    nbytes = sendto (filedes, output_area, output_size, flags,
                     (struct sockaddr *) &writer_address, sizeof(writer_address));
    if (nbytes < 0)
    {
        perror ("write_to_server: sendto ");
        exit (EXIT_FAILURE);
    }

    printf("write_to_server: wrote %zd bytes to interface \n", nbytes);
}
/*
 * read_from_client - receive one datagram into the shared input area.
 *
 * filedes : datagram socket descriptor to receive from
 *
 * No state logic inside, just a function: it calls recv() on the socket and
 * copies the received bytes into the file-scope data_area. At most data_size
 * bytes are stored and the unused remainder of data_area is zeroed, so
 * nothing from an earlier, longer message is left behind.
 *
 * Returns the number of bytes received, or -1 when recv() returned 0 bytes.
 * A receive error is reported with perror and terminates the process with
 * EXIT_FAILURE.
 */
int read_from_client (int filedes )
{
    char buffer[MAXMSG];
    const int flags = 0;
    ssize_t nbytes;
    size_t ncopy;

    nbytes = recv (filedes, buffer, sizeof(buffer), flags);
    if (nbytes < 0)
    {
        /* Read error. */
        perror ("read");
        exit (EXIT_FAILURE);
    }
    if (nbytes == 0)
    {
        /* Nothing was received. */
        return -1;
    }

    /* Data read. recv() does not NUL-terminate the buffer, so the print is
     * bounded by the byte count instead of relying on a terminator. */
    fprintf (stderr, "Layer: got message: \"%.*s\" in %zd bytes\n",
             (int) nbytes, buffer, nbytes);

    /* copy to the global buffer, never more than it can hold */
    ncopy = (size_t) nbytes;
    if (ncopy > data_size)
    {
        ncopy = data_size;
    }
    memcpy (data_area, buffer, ncopy);
    /* clear the tail so a shorter message does not expose old bytes */
    memset (data_area + ncopy, 0, data_size - ncopy);

    return (int) nbytes;
}
//
// READ THREAD
//
// Thread entry point for the reading side.
//
// in : pointer to an int holding the socket descriptor to read from
//
// Loops forever: whenever data_ready is 0 or write_ready is 1 it calls
// read_from_client, and only when that call actually delivered data does it
// set data_ready to 1 so main picks the data up. A call that delivered
// nothing (return value -1) leaves the flags untouched, so main does not
// forward whatever was left in the data area. Sleeps 50 usec per iteration.
//
void *ReadThread( void * in)
{
    const int sock = *(int *) in;
    const useconds_t usec = 50; // 50 usec

    for (;;)
    {
        if ( ( data_ready == 0 ) || ( write_ready == 1 ) )
        {
            // enable computation only when a message was received
            if ( read_from_client (sock) > 0 )
            {
                data_ready = 1;
            }
        }
        usleep( usec );
    }

    // not reached; present because the function returns a pointer
    return NULL;
}
//
// WRITE THREAD
//
// Thread entry point for the writing side.
//
// in : pointer to an int holding the socket descriptor to write to
//
// After an initial 50 usec delay, loops forever: whenever write_ready is 1
// and compute_ready is 0 it calls write_to_server and then clears
// write_ready. Sleeps 50 usec per iteration.
//
// compute_ready is deliberately NOT written here. The branch is only taken
// when it is already 0, so storing 0 again could only overwrite a 1 that
// main set while the write was in progress, and that message would then
// never be handed over for writing.
//
// NOTE(review): write_ready is still cleared after the send, so a hand-over
// made by main during the send may be missed as well - confirm whether a
// proper mutex / condition variable should replace the flags.
//
void *WriteThread( void * in)
{
    const int sock = *(int *) in;
    const useconds_t usec = 50; // 50 usec

    // delay thread operating
    usleep( usec );

    for (;;)
    {
        if ( ( write_ready == 1 ) && ( compute_ready == 0 ) )
        {
            write_to_server (sock);
            // enable reading
            write_ready = 0;
        }
        usleep( usec );
    }

    // not reached; present because the function returns a pointer
    return NULL;
}
// Parses a decimal UDP port number from text.
// Returns the port (1..65535), or -1 when text is not a valid port.
static int parse_port (const char *text)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0' || value < 1 || value > 65535)
    {
        return -1;
    }
    return (int) value;
}

// MAIN
//
// Options:
//   -a address   local interface address (default 127.0.0.1)
//   -l port      lower port, the one read from (default 10000)
//   -u port      upper port, the one written to (default 10002)
//   -h           print usage and exit
//
// Creates the reader and writer sockets, starts ReadThread and WriteThread,
// then loops forever passing data from the input area to the output area
// and driving the data_ready / compute_ready / write_ready flags.
// Returns non-zero when an option is invalid or a thread cannot be started.
int main (int argc, char**argv)
{
    // LOCAL VARS
    int opt = 0;
    int sock;
    int sock2;
    int rc;
    // set to default ports
    int lower_port = 10000;
    int upper_port = 10002;
    char hostname[1024];
    char arg_mcaddr_str[16] = "127.0.0.1"; // 239.255.0.0 is the local scope IPv4 local scope
    char arg_multicast_group[16] = "224.0.0.1"; // this is the default group for loopback service
    // two threads for use
    pthread_t thread_read;
    pthread_t thread_write;
    // find out the host address
    struct hostent* h;
    const useconds_t usec = 50; // 50 usec

    //
    // find the localhost name (informational only)
    //
    hostname[sizeof(hostname) - 1] = '\0';
    if (gethostname(hostname, sizeof(hostname) - 1) != 0)
    {
        perror("gethostname");
        hostname[0] = '\0';
    }
    printf("Hostname: %s\n", hostname);
    // gethostbyname returns NULL when the name cannot be resolved
    h = gethostbyname(hostname);
    if (h != NULL)
    {
        printf("h_name: %s\n", h->h_name);
    }
    else
    {
        fprintf(stderr, "gethostbyname: could not resolve \"%s\"\n", hostname);
    }

    // read command line options
    while ((opt = getopt(argc, argv, "a:u:l:h")) != -1)
    {
        switch(opt)
        {
            case 'a':
                /* address override; must fit the fixed-size buffer */
                if (strlen(optarg) >= sizeof(arg_mcaddr_str))
                {
                    fprintf(stderr, "address \"%s\" is too long\n", optarg);
                    return EXIT_FAILURE;
                }
                strcpy(arg_mcaddr_str, optarg);
                break;
            case 'u':
                upper_port = parse_port(optarg);
                if (upper_port < 0)
                {
                    fprintf(stderr, "invalid upper port \"%s\"\n", optarg);
                    return EXIT_FAILURE;
                }
                break;
            case 'l':
                lower_port = parse_port(optarg);
                if (lower_port < 0)
                {
                    fprintf(stderr, "invalid lower port \"%s\"\n", optarg);
                    return EXIT_FAILURE;
                }
                break;
            // help case
            case 'h':
                printf("usage: %s [-a local_address] [-l lower_port] [-u upper_port] [-h]\n", argv[0]);
                return EXIT_SUCCESS;
            default:
                // getopt has already printed a diagnostic; carry on
                break;
        }
    }// end while

    // report options
    printf("+connect to server =%s \n", arg_mcaddr_str);
    printf("+lower port=%d \n", lower_port);
    printf("+upper port=%d \n", upper_port);

    //
    // init and clear the global data areas
    //
    data_size = sizeof(data_area);
    memset( data_area, 0, data_size );
    output_size = sizeof(output_area);
    memset( output_area, 0, output_size );

    // Create the sockets.
    sock = make_socket ((uint16_t) lower_port);
    sock2 = make_socket ((uint16_t) upper_port);

    // Setup the socket options
    // the reader is bound to the lower port, the writer sends to the upper port
    set_reader_socket ( sock, arg_mcaddr_str, arg_multicast_group, lower_port);
    set_writer_socket ( sock2, arg_mcaddr_str, arg_multicast_group, upper_port);

    //
    // starting threads
    //
    // pthread_create returns 0 on success and an error number on failure
    // (it does not set errno), so report it with strerror
    rc = pthread_create( &thread_read , NULL , ReadThread , (void*) &sock);
    if (rc != 0)
    {
        fprintf(stderr, "ReadThread: could not create thread: %s\n", strerror(rc));
        return 1;
    }
    rc = pthread_create( &thread_write , NULL , WriteThread , (void*) &sock2);
    if (rc != 0)
    {
        fprintf(stderr, "WriteThread: could not create thread: %s\n", strerror(rc));
        return 1;
    }

    //
    // MAIN EXECUTION
    //
    // do computation when ready
    while (1)
    {
        // check for new data, then copy to middle buffer
        if ( data_ready == 1 )
        {
            compute_ready = 1;
            data_ready = 0;
            // clear output data
            memset( output_area, 0, output_size );
            // copy data to output; the copy stops at the first NUL in
            // data_area and never reads more than output_size bytes
            strncpy ( output_area, data_area, output_size);
        }
        // compute - this time just copy over from data area to output area
        // then trigger write action
        if (compute_ready == 1)
        {
            compute_ready = 0;
            write_ready = 1;
        }
        usleep( usec );
    }
}