TinyURL widget - shorten your URL's for free!

Enter a long URL to make tiny:

Tuesday, October 6, 2015

A good tutorial on multicast UDP packets

http://www.tenouk.com/Module41c.html

 http://www.tldp.org/HOWTO/Multicast-HOWTO-2.html

For those looking into multicast, the examples at the above sites work.

For those programming network sockets in C on Linux, the loopback device lo on address "127.0.0.1" is a member of the multicast group "224.0.0.1" under IPV4.

Most people write very focussed simple programs to get the point across / key concepts.

In that case, it takes a while to do something serious based on snippets.

Here is a multi-cast multi-socket that reads from one port and writes to another port using 2 threads and  main as the data passer. The only simplification is a simple custom mutex and microsleep for a proper mutex / event system. Code below:

#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h> //for threading , link with -lpthread
#include <string.h>

#define PORT1    100000
#define PORT2     100002
#define MAXMSG  256
#define MAX_STRING  256
#define MAX_DATA     256



//
// GLOBAL VARS
//

// state flags for simple finite state machine
// 
// Ready = 1  ready for NEXT ACTION
// !Ready = 0 not ready for NEXT ACTION
//
// Read
// Compute 
// Write
//  
// data is ready for reading and computing,
// start with read in not ready state
static int data_ready =0;

// computations were done and now
// when the data is ready for writing out
static int compute_ready = 0;

// when there is data to go out, write to output
static int write_ready =0;

// the common addresses for this program
// the reading config
static struct in_addr      local_address;
static struct sockaddr_in     group_address;
// the writing config
static struct sockaddr_in writer_address;
static struct in_addr      writer;

// the input data area 
static char data_area[MAX_DATA];
static size_t data_size;

// the output data area
static char output_area[MAX_DATA];
static size_t output_size;

// Sets up Writer socket including
// Sets up group address
// Sets options
// sets local interface for outbound multicast
// then is ready to writeto interface
int set_writer_socket  (int socket, char local[], char group[], int port) 
{
    /* Initialize the group sockaddr structure with a */
    /* group address of "group" and port "port". */
    // this is the sendto data for multicast packets
    memset((char *) &writer_address, 0, sizeof(writer_address));
    writer_address.sin_family = AF_INET;
    writer_address.sin_addr.s_addr = inet_addr(group);
    writer_address.sin_port = htons(port);
    // the ip address specified 
    // set to reuse address so other sockets can join
    // char type won't work - needed unsigned int as valid argument
    // even though Open Group standard calls for a int
    u_int yes = 1;
    if ( setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes)) == -1 )
    {
        perror("setsockopt SO_REUSEADDR");
    }   
#ifdef SO_REUSEPORT
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, (const char*)&yes, sizeof(yes)) < 0) 
        perror("setsockopt(SO_REUSEPORT) failed");
#endif
    /* Set local interface for outbound multicast datagrams. */
    /* The IP address specified must be associated with a local, */
    /* multicast capable interface. */
    writer.s_addr = inet_addr(local);
    if(setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, (char *)&writer, sizeof(writer)) < 0)
    {
      perror("Setting local interface error");
      exit(1);
    }
    else
      printf("Setting the local interface...%s\n", local);
    
    
}

// sets up the lower socket specifically for reading
// sets address to ANY
// binds to address
// joins multicast group on address
int set_reader_socket (int socket, char local[], char group[], int port) 
{
    struct ip_mreq group_request;
    /* Enable SO_REUSEADDR to allow multiple instances of this */
    /* application to receive copies of the multicast datagrams. */
    int reuse = 1;
    if(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) < 0)
    {
        perror("Setting SO_REUSEADDR error");
        close(socket);
        exit(1);
    }
    else
    {
        printf("Setting SO_REUSEADDR...OK.\n");
#ifdef SO_REUSEPORT
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0) 
        perror("setsockopt(SO_REUSEPORT) failed");
#endif       
    }
    /* Bind to the proper port number with the IP address */
    /* specified as INADDR_ANY. */
    memset((char *) &group_address, 0, sizeof(group_address));
    group_address.sin_family = AF_INET;
    group_address.sin_port = htons(port);
    group_address.sin_addr.s_addr = INADDR_ANY;
    if(bind(socket, (struct sockaddr*)&group_address, sizeof(group_address)))
    {
        perror("Binding datagram socket error");
        close(socket);
        exit(1);
    }
    else
    printf("Binding datagram socket...OK.\n");
    /* Join the multicast group "group" on the local "local" */
    /* interface. Note that this IP_ADD_MEMBERSHIP option must be */
    /* called for each local interface over which the multicast */
    /* datagrams are to be received. */
    group_request.imr_multiaddr.s_addr = inet_addr(group);
    group_request.imr_interface.s_addr = inet_addr(local);
    if(setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&group_request, sizeof(group_request)) < 0)
    {
        perror("Adding multicast group error");
        close(socket);
        exit(1);
    }
    else
    {
        printf("Adding multicast group...OK.\n");   
    }
   
}
   

// Multicast socket setup options
// set sockaddr structure to a group address and port   - sockaddr_in
// set sockopt for loopback packets or not 
// set in_addr for local interface to outbound multicast datagrams 
//  
// make multicast and loopback
/*! \fn set_socket_options
 * 
 * \var socket - a pre-made socket
 * \var local_address - the local host address to bind to 
 * 
 * */
int set_socket_options (int socket, char local_address[], char group_address[], int port, struct sockaddr_in *addr, struct in_addr * local_addr )
{
    struct ip_mreq mreq;        // multicast request
    struct sockaddr_in *local;      // local address
   
    local = addr;
   
    // clear memory
    memset (  (char *) &local, 0,  sizeof( local ) );
    memset (  (char *) &mreq, 0,  sizeof( mreq ) );
   
    // the ip address specified 
    // set to reuse address so other sockets can join
    // char type won't work - needed unsigned int as valid argument
    // even though Open Group standard calls for a int
    u_int yes = 1;
    if ( setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes)) == -1 )
    {
        perror("setsockopt SO_REUSEADDR");
    }   
#ifdef SO_REUSEPORT
    if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, (const char*)&yes, sizeof(yes)) < 0) 
        perror("setsockopt(SO_REUSEPORT) failed");
#endif
    // assign socket to local address
    local->sin_family = AF_INET;
    local->sin_port = htons (port);
    local->sin_addr.s_addr = inet_addr(local_address);  

    // bind for single comms at the end of options
    if (bind (socket, (struct sockaddr *) local, sizeof (local)) < 0)
    {
        perror ("bind");
        exit (EXIT_FAILURE);
    }
    else
    {
        printf("+Bound to address : %s port : %d \n", local_address, port);
    }
   
    // change to join multicast
    // Initialize group socket address structure 
    // with a group address and port

    /* Join the multicast group "group_address" on the local "local_address" */
    /* interface. Note that this IP_ADD_MEMBERSHIP option must be */
    /* called for each local interface over which the multicast */
    /* datagrams are to be received. */

    // construct a multicast address structure 
    memset(&mreq, 0, sizeof(mreq));
    mreq.imr_multiaddr.s_addr =      inet_addr(group_address); 
    mreq.imr_interface.s_addr =     inet_addr(local_address);
    setsockopt (socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));

    // change the default Time To Live to 1 packet or more
    u_char ttl = 1;
    setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
   
    // change to loopback packets for local 
    // it must be setup for a packet to be read by other group members
    u_char loop = 1;
    setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
   
    /* Set local interface for outbound multicast datagrams. */
    /* The IP address specified must be associated with a local, */
    /* multicast capable interface. */
    if(setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, local, sizeof(local)) < 0)
    {
      perror("Setting local interface error");
      exit(1);
    }
   
  return 0;
}




int make_socket (uint16_t port)
{
  int sock;

  // Create the socket. 
  // default protocol is defined as 0
  sock = socket (PF_INET, SOCK_DGRAM, 0);
  if (sock < 0)
  {
      perror ("make_socket: socket");
      exit (EXIT_FAILURE);
  }
  return sock;
}


void write_to_server (int filedes )
{
  int nbytes;
  int flags = 0; 
  
  fprintf (stderr, "Layer: writing: \"%s\" in %d bytes\n", output_area, output_size);
  // sendto (int socket, const void *buffer, size_t size, int flags, struct sockaddr *addr, socklen_t length)
  nbytes = sendto (filedes, output_area, output_size, flags, (struct sockaddr*)&writer_address, sizeof(writer_address));
  if (nbytes < 0)
  {
      perror ("write_to_server: sendto ");
      exit (EXIT_FAILURE);
  }
  else
  {
      printf("write_to_server: wrote %d bytes to interface \n", nbytes );
  }
}
// read in data and update where from in later version
// no state logic internal just a function
// reads from bound port
int read_from_client (int filedes )
{
  char buffer[MAXMSG];
  int nbytes;
  size_t sizebuf;
  int flags = 0;

    sizebuf = sizeof(buffer);
    //recvfrom (int socket, void *buffer, size_t size, int flags, struct sockaddr *addr, socklen_t *length-ptr)
    // 
   
    nbytes = recv (filedes, buffer, sizebuf, flags );
    if (nbytes < 0)
    {
        /* Read error. */
        perror ("read");
        exit (EXIT_FAILURE);
    }
    else if (nbytes == 0)
        /* End-of-file. */
        return -1;
    else
    {
        /* Data read. */
        fprintf (stderr, "Layer: got message: \"%s\" in %d bytes\n", buffer, nbytes);
        // copy to global buffer;
        if ( nbytes <= data_size )
        {
            // copy complete message
            strncpy( (char *) data_area, (const char *) buffer,  nbytes);
            return nbytes;
        }
        else
        {
            // copy complete message
            strncpy( (char *) data_area, buffer,  sizebuf);
            return nbytes;       
        }       
        return 0;
    }
    // never gets here...
    // never gets here...
}

//
// READ THREAD 
//
void *ReadThread( void * in)
{
    int sock = *(int*)in;   
    useconds_t usec;
   
    usec = 50; // 50 usec 
    while (1)
    {
        if ( (( data_ready == 0 ) ||  ( write_ready == 1 ) )   )
        {
            read_from_client (sock );
            // enable computation
            data_ready = 1;
           
        }
        usleep( usec );     
    }
}

//
// WRITE THREAD
//
//
void *WriteThread( void * in)
{
    int sock = *(int*)in;
    useconds_t usec;
   
    usec = 50; // 50 usec 
    // delay thread operating
    usleep( usec );
    while (1)
    {
        if ( (  ( write_ready == 1 ) ) && ( compute_ready == 0)    )
        {
            write_to_server (sock);
            // enable reading 
            write_ready = 0;
            // disable computing
            compute_ready = 0;
        }
        usleep( usec );
     
    }
}





// MAIN
int main (int argc, char**argv)
{
  // LOCAL VARS
  int opt = 0;
  int sock;
  int sock2;
  char server_string[MAX_STRING];
  // set to default ports
  int lower_port = 10000;
  int upper_port = 10002;
  struct sockaddr_in servaddr,cliaddr;
  fd_set active_fd_set, read_fd_set;
  int i;
  struct sockaddr_in clientname;
  size_t size;
  char hostname[1024];
  char  arg_mcaddr_str[16] = "127.0.0.1";   // 239.255.0.0 is the local scope      IPv4 local scope
  char  arg_multicast_group[16] = "224.0.0.1";    // this is the default group for loopback service
  // two threads for use
  pthread_t thread_read;
  pthread_t thread_write;   
  // find out the host address
  struct hostent* h;    
  
    // EXTERNS

 //    
 // EXECUTION
 //
 
     
 // init the global data area
 data_size = sizeof(MAX_DATA);
 output_size  = sizeof(MAX_DATA);
  

 //
 // find the localhost name
 // 

    hostname[1023] = '\0';
    gethostname(hostname, 1023);
    printf("Hostname: %s\n", hostname);
    h = gethostbyname(hostname);
    printf("h_name: %s\n", h->h_name);
  

    // read command line options
    while ((opt = getopt(argc, argv, "a:u:l:h")) != -1) 
    {
        switch(opt) 
        {
            // address to connect to
            case 'a':
                /*  address override */
                strcpy(arg_mcaddr_str, optarg);
            break;
            case 'u':
                upper_port = atoi(optarg);

            break;
            case 'l':
                lower_port = atoi(optarg);

            break;           
            // help case
            case 'h':
            ;
            break;

        }
     }// end while

    // report options
    printf("+connect to server =%s \n", arg_mcaddr_str);   
    printf("+lower port=%d \n", lower_port);
    printf("+upper port=%d \n", upper_port);   
    //
    // clear data areas
    //
    data_size = sizeof(data_area);
    bzero( (void*) data_area, data_size );
    output_size = sizeof(output_area);
    bzero( (void *) output_area, output_size );

    // Create the socket and set it up to accept connections. 
    sock = make_socket (lower_port);
    sock2 = make_socket (upper_port);

    // Setup the socket options
    // set the reader bound to port
    set_reader_socket ( sock, arg_mcaddr_str, arg_multicast_group, lower_port);
    set_writer_socket ( sock2, arg_mcaddr_str, arg_multicast_group, upper_port); 

    //
    // starting threads 
    //
 
    // create read thread
    if( pthread_create( &thread_read , NULL ,  ReadThread , (void*) &sock) < 0)
    {
      perror("ReadThread: could not create thread");
      return 1;
    }

    if( pthread_create( &thread_write , NULL ,  WriteThread , (void*) &sock2) < 0)
    {
      perror("WriteThread: could not create thread");
      return 1;
    }

    //
    // MAIN EXECUTION
    //
    useconds_t usec;
    // do computation when ready
    while (1)
    {
        // check for new data, then copy to middle buffer 
        // computation internal
        if ( data_ready == 1 )
        {
            compute_ready = 1;
            data_ready = 0;
            // clear output data
            bzero( (void *) output_area, output_size );           
            // copy data to output if ready for it
            strncpy ( output_area, data_area, output_size);           
        }
        // compute - this time just copy over from data area to output area
        // then trigger write action
        if (compute_ready == 1)
        {
            compute_ready = 0;
            write_ready = 1;
        }
        usec = 50; // 50 usec 
        usleep( usec );   
   
    }
}

No comments:

Post a Comment