gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27930 - msh/src


From: gnunet
Subject: [GNUnet-SVN] r27930 - msh/src
Date: Thu, 11 Jul 2013 17:06:38 +0200

Author: harsha
Date: 2013-07-11 17:06:37 +0200 (Thu, 11 Jul 2013)
New Revision: 27930

Added:
   msh/src/bitmap.c
   msh/src/bitmap.h
   msh/src/test_bitmap.c
Modified:
   msh/src/
   msh/src/Makefile.am
   msh/src/common.h
   msh/src/mshd.c
   msh/src/scheduler.c
   msh/src/scheduler.h
Log:
- address verification


Index: msh/src
===================================================================
--- msh/src     2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src     2013-07-11 15:06:37 UTC (rev 27930)

Property changes on: msh/src
___________________________________________________________________
Modified: svn:ignore
## -5,5 +5,4 ##
 mping
 test-suite.log
 test-scheduler*
-test-scheduler*.log
-test-scheduler*.trs
+test-bitmap*
Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/Makefile.am 2013-07-11 15:06:37 UTC (rev 27930)
@@ -2,14 +2,16 @@
 
 mping_SOURCES = mping.c
 
-mshd_SOURCES = mshd.c util.c util.h scheduler.c scheduler.h common.h
+mshd_SOURCES = mshd.c util.c util.h scheduler.c scheduler.h \
+  common.h bitmap.c bitmap.h
 mshd_LDADD = -levent
 mshd_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
 mshd_LDFLAGS =  $(LIBEVENT_LDFLAGS)
 
 check_PROGRAMS = \
   test-scheduler \
-  test-scheduler-socket
+  test-scheduler-socket \
+  test-bitmap
 
 test_scheduler_SOURCES = test_scheduler.c scheduler.c scheduler.h common.h \
        util.c util.h common.h
@@ -23,6 +25,9 @@
 test_scheduler_socket_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
 test_scheduler_socket_LDFLAGS = $(LIBEVENT_LDFLAGS)
 
+test_bitmap_SOURCES = test_bitmap.c bitmap.c bitmap.h
+
 TESTS = \
   test-scheduler \
-  test-scheduler-socket
+  test-scheduler-socket \
+  test-bitmap

Added: msh/src/bitmap.c
===================================================================
--- msh/src/bitmap.c                            (rev 0)
+++ msh/src/bitmap.c    2013-07-11 15:06:37 UTC (rev 27930)
@@ -0,0 +1,159 @@
+#include "common.h"
+#include "bitmap.h"
+
+/**
+ * @file bitmap.c
+ * @brief implementation of bitmap array
+ * @author Sree Harsha Totakura <address@hidden> 
+ */
+
+/**
+ * Handle to the bitmap.  Allocated as a single chunk: this header followed
+ * by array_size words of bit storage.
+ */
+struct BitMap
+{
+  /**
+   * Number of relevant bits in the bitmap
+   */
+  unsigned int len;
+
+  /**
+   * Number of elements in barray; large enough to hold len bits
+   */
+  unsigned int array_size;
+
+  /**
+   * The bitmap storage (C99 flexible array member instead of the GNU
+   * zero-length array extension)
+   */
+  uint32_t barray[];
+};
+
+/**
+ * Number of bits held by one element of the bitmap storage array.  Uses an
+ * unevaluated sizeof on a null struct pointer so that the macro does not
+ * depend on a local variable named `bm` being in scope.
+ */
+#define ELE_BITSIZE (8 * sizeof (((struct BitMap *) 0)->barray[0]))
+
+/**
+ * Create a bitmap holding the given number of bits.  All bits are cleared.
+ *
+ * @param len the number of bits to be present in the bitmap
+ * @return handle to the bitmap; release it with bitmap_destroy()
+ */
+struct BitMap *
+bitmap_create (unsigned int len)
+{
+  struct BitMap *bm;
+  unsigned int array_size;
+
+  /* ceil (len / ELE_BITSIZE) without computing len + ELE_BITSIZE - 1, which
+     can wrap around where size_t is 32 bits wide */
+  array_size = len / ELE_BITSIZE;
+  if (0 != (len % ELE_BITSIZE))
+    array_size++;
+  bm = MSH_malloc (sizeof (struct BitMap)
+                   + (array_size * sizeof (bm->barray[0])));
+  bm->len = len;
+  bm->array_size = array_size;
+  /* do not rely on MSH_malloc() handing out zeroed memory */
+  bitmap_clear (bm);
+  return bm;
+}
+
+
+/**
+ * Release a bitmap obtained from bitmap_create()
+ *
+ * @param bm the bitmap handle; must not be used after this call
+ */
+void
+bitmap_destroy (struct BitMap *bm)
+{
+  void *mem = bm;
+
+  free (mem);
+}
+
+
+/**
+ * Reset every bit of the bitmap storage to 0
+ *
+ * @param bm the handle to the bitmap
+ */
+void
+bitmap_clear (struct BitMap *bm)
+{
+  size_t nbytes;
+
+  nbytes = sizeof (bm->barray[0]) * bm->array_size;
+  (void) memset (&bm->barray[0], 0, nbytes);
+}
+
+
+/**
+ * Set the bit at the given index to the given value
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to modify; must be less than the bitmap length
+ * @param val the new value of the bit; must be either 1 or 0
+ */
+void
+bitmap_set (struct BitMap *bm, unsigned int id, int val)
+{
+  unsigned int word;
+  uint32_t mask;
+
+  MSH_assert (id < bm->len);
+  MSH_assert ( (0 == val) || (1 == val) );
+  word = id / ELE_BITSIZE;
+  MSH_assert (word < bm->array_size);
+  mask = ((uint32_t) 1) << (id % ELE_BITSIZE);
+  if (0 == val)
+    bm->barray[word] &= ~mask;     /* clear */
+  else
+    bm->barray[word] |= mask;      /* set */
+}
+
+
+/**
+ * Checks whether the bit at the given index is set or not
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to test; must be less than the bitmap length
+ * @return 1 if the given bit is set; 0 otherwise
+ */
+int
+bitmap_isbitset (struct BitMap *bm, unsigned int id)
+{
+  unsigned int word;
+  uint32_t mask;
+
+  MSH_assert (id < bm->len);
+  word = id / ELE_BITSIZE;
+  MSH_assert (word < bm->array_size);
+  mask = ((uint32_t) 1) << (id % ELE_BITSIZE);
+  if (0 != (bm->barray[word] & mask))
+    return 1;
+  return 0;
+}
+
+
+/**
+ * Check if all relevant bits in the bitmap are set
+ *
+ * @param bm the handle to the bitmap
+ * @return 1 if all the bits are set (trivially true for a bitmap of length
+ *           0); 0 if not
+ */
+int
+bitmap_allset (struct BitMap *bm)
+{
+  unsigned int nfull;
+  unsigned int rem;
+  unsigned int cnt;
+  uint32_t mask;
+
+  /* number of completely used words, and bits used in the trailing word */
+  nfull = bm->len / ELE_BITSIZE;
+  rem = bm->len % ELE_BITSIZE;
+  /* nfull equals array_size when len is a multiple of ELE_BITSIZE (incl. 0) */
+  MSH_assert (nfull <= bm->array_size);
+  for (cnt = 0; cnt < nfull; cnt++)
+  {
+    if (UINT32_MAX != bm->barray[cnt])
+      return 0;
+  }
+  if (0 == rem)
+    return 1;
+  /* only a partially used trailing word remains to be inspected */
+  MSH_assert (nfull < bm->array_size);
+  mask = (((uint32_t) 1) << rem) - 1;
+  return (mask == (bm->barray[nfull] & mask)) ? 1 : 0;
+}

Added: msh/src/bitmap.h
===================================================================
--- msh/src/bitmap.h                            (rev 0)
+++ msh/src/bitmap.h    2013-07-11 15:06:37 UTC (rev 27930)
@@ -0,0 +1,70 @@
+/**
+ * @file bitmap.h
+ * @brief interface for bitmap array
+ * @author Sree Harsha Totakura <address@hidden> 
+ */
+
+#ifndef BITMAP_H_
+#define BITMAP_H_
+
+
+/**
+ * Opaque handle to the bitmap
+ */
+struct BitMap;
+
+
+/**
+ * Initialize bitmap array
+ *
+ * @param len the number of bits to be present in the bitmap
+ * @return handle to the bitmap; release it with bitmap_destroy()
+ */
+struct BitMap *
+bitmap_create (unsigned int len);
+
+
+/**
+ * Destroy a bitmap handle and free its resources
+ *
+ * @param bm the bitmap to destroy
+ */
+void
+bitmap_destroy (struct BitMap *bm);
+
+
+/**
+ * Clear all the bits in the bitmap
+ *
+ * @param bm the handle to the bitmap
+ */
+void
+bitmap_clear (struct BitMap *bm);
+
+
+/**
+ * Set the bit at given index to the given value
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to set; must be less than the bitmap length
+ * @param val the value; should be either 1 or 0
+ */
+void
+bitmap_set (struct BitMap *bm, unsigned int id, int val);
+
+
+/**
+ * Checks if the given bit in the bit array is set or not
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to check; must be less than the bitmap length
+ * @return 1 if the given bit is set; 0 otherwise
+ */
+int
+bitmap_isbitset (struct BitMap *bm, unsigned int id);
+
+
+/**
+ * Check if all relevant bits in the bitmap are set
+ *
+ * @param bm the handle to the bitmap
+ * @return 1 if all the bits are set; 0 if not
+ */
+int
+bitmap_allset (struct BitMap *bm);
+
+
+#endif  /* #ifndef BITMAP_H_ */
+
+/* End of bitmap.h  */

Modified: msh/src/common.h
===================================================================
--- msh/src/common.h    2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/common.h    2013-07-11 15:06:37 UTC (rev 27930)
@@ -1,3 +1,12 @@
+/**
+ * @file common.h
+ * @brief common header which is to be included in all sources
+ * @author Sree Harsha Totakura <address@hidden> 
+ */
+
+#ifndef COMMON_H_
+#define COMMON_H_
+
 #ifndef _GNU_SOURCE
 #define _GNU_SOURCE 1
 #endif
@@ -203,3 +212,7 @@
     (element)->next->prev = (element)->prev; \
   (element)->next = NULL; \
   (element)->prev = NULL; } while (0)
+
+#endif /* COMMON_H_ */
+
+/* End of common.h */

Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c      2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/mshd.c      2013-07-11 15:06:37 UTC (rev 27930)
@@ -3,42 +3,123 @@
 #include "util.h"
 #include "scheduler.h"
 #include "mtypes.h"
+#include "bitmap.h"
 
+
 /**
- * The port number of our local socket
+ * An address of an instance
  */
-uint16_t lport;
+struct InstanceAddr
+{
+  /**
+   * The length of the instance address
+   */
+  socklen_t addrlen;
 
+  /**
+   * The instance address to be followed here
+   */
+  struct sockaddr saddr[0];
+};
+
 /**
- * The number of total mshd processes 
+ * Instance address information
  */
-static int nproc;
+struct InstanceAddrInfo
+{
+  /**
+   * Array of addresses
+   */
+  struct InstanceAddr **addrs;
 
+  /**
+   * Number of addresses in the above array
+   */
+  unsigned int naddrs;
+  
+  /**
+   * The MPI id of the instance to whom these addresses belong to
+   */
+  unsigned int source;
+};
+
+
 /**
- * Rank of this process
+ * Context for verifying addresses
  */
-static int rank;
+struct VerifyAddressesCtx
+{
+  /**
+   * The DLL next ptr
+   */
+  struct VerifyAddressesCtx *next;
 
+  /**
+   * The DLL prev ptr
+   */
+  struct VerifyAddressesCtx *prev;
+
+  /**
+   * The instance addresses
+   */
+  struct InstanceAddrInfo *iainfo;
+
+  /**
+   * The socket open handle to the instance address
+   */
+  struct SocketOpenHandle *soh;
+
+  /**
+   * close task handle
+   */
+  struct Task *close_task;
+
+  /**
+   * The index of the address being verified in association with this context
+   */
+  unsigned int naddr;
+
+  /**
+   * The socket file descriptor associated with the connection used to verify
+   * the address 
+   */
+  int sock;
+};
+
+
+/**
+ * Context for a connection accepted on the listen socket; the task reads the
+ * id sent by the peer (see accept_task() and read_socket())
+ */
+struct ReadContext
+{
+  /**
+   * DLL next ptr
+   */
+  struct ReadContext *next;
+
+  /**
+   * DLL prev ptr
+   */
+  struct ReadContext *prev;
+
+  /* struct sockaddr_in addr; */
+  
+  /* socklen_t addrlen; */
+  
+  /**
+   * Task reading from the accepted connection's socket
+   */
+  struct Task *task;
+};
+
+
 /**
- * Array of our IP addresses in network-byte format
+ * DLL head for address verification contexts
  */
-static in_addr_t *s_addrs;
+static struct VerifyAddressesCtx *vactx_head;
 
 /**
- * Number of IP addresses
+ * DLL tail for address verification contexts
  */
-static unsigned int nips;
+static struct VerifyAddressesCtx *vactx_tail;
 
 /**
- * Current IP verification round 
+ * Task for finalising a round
  */
-static unsigned int round;
+static struct Task *finalise_task;
 
 /**
- * width of the round -- how many other mshd instances verify our IP addresses
- * in a round
+ * Array of our IP addresses in network-byte format
  */
-static unsigned int rwidth;
+static in_addr_t *s_addrs;
 
 /**
  * Tasks for handling SIGINT and SIGTERM
@@ -56,60 +137,75 @@
 static struct Task *atask;
 
 /**
- * Array for checking which MPI processes have verified our addresses in the
+ * Bitmap for checking which MPI processes have verified our addresses in the
  * current round
  */
-static uint8_t *barray;
+static struct BitMap *bitmap;
 
-static size_t barray_size;
+/**
+ * Instances addresses learnt in the current round
+ */
+struct InstanceAddrInfo **riainfos;
 
-static void
-barray_init ()
-{
-  barray_size = (rwidth + sizeof (barray[0]) - 1) / sizeof (barray[0]);
-  barray = MSH_malloc (barray_size);
-}
+/**
+ * head for read context DLL
+ */
+static struct ReadContext *rhead;
 
-static void
-barray_destroy ()
-{
-  free (barray);
-  barray = NULL;
-}
+/**
+ * tail for read context DLL
+ */
+static struct ReadContext *rtail;
 
-static void
-barray_clear ()
-{
-  (void) memset (barray, 0, barray_size);
-}
+/**
+ * The number of total mshd processes 
+ */
+static int nproc;
 
-static void
-barray_set (unsigned int id)
+/**
+ * Rank of this process
+ */
+static int rank;
+
+/**
+ * The listen socket for the current round
+ */
+static int listen_sock;
+
+/**
+ * Number of IP addresses
+ */
+static unsigned int nips;
+
+/**
+ * Current IP verification round 
+ */
+static unsigned int round;
+
+/**
+ * width of the round -- how many other mshd instances verify our IP addresses
+ * in a round
+ */
+static unsigned int rwidth;
+
+/**
+ * The port number of our local socket
+ */
+uint16_t lport;
+
+
+static char *
+saddr2str (const struct sockaddr *addr, const socklen_t addrlen)
 {
-  unsigned int off;
-  unsigned int idx;
-  typeof (barray[0]) one;
-  
-  off = id / sizeof (barray[0]);
-  idx = id % sizeof (barray[0]);
-  MSH_assert (off < barray_size);
-  one = (typeof (barray[0])) 1; /* cast */
-  MSH_assert (0 == (barray[off] & (one << idx)) );
-  barray[off] = barray[off] | (one << idx);
-}
+  static char hostip[NI_MAXHOST];
 
-static int
-barray_isset (unsigned int id)
-{
-  unsigned int off;
-  unsigned int idx;
-  typeof (barray[0]) one;
-  
-  off = id / sizeof (barray[0]);
-  idx = id % sizeof (barray[0]);
-  MSH_assert (off < barray_size);
-  one = (typeof (barray[0])) 1; /* cast */
-  return (0 == (barray[off] & (one << idx)) ) ? 0 : 1;
+  if (0 != getnameinfo (addr, addrlen, hostip, NI_MAXHOST, NULL, 0, 
+                        NI_NUMERICHOST))
+  {
+    LOG_STRERROR ("getnameinfo");
+    return NULL;
+  }
+  return hostip;
 }
 
 
@@ -132,17 +228,14 @@
                              const struct sockaddr *netmask, 
                              socklen_t addrlen)
 {
-  char hostip[NI_MAXHOST];
+  char *hostip;
   const struct sockaddr_in *inaddr;
 
   if (sizeof (struct sockaddr_in) != addrlen)
     return MSH_OK;           /* Only consider IPv4 for now */
-  if (0 !=
-      getnameinfo (addr, addrlen, hostip, NI_MAXHOST, NULL, 0, NI_NUMERICHOST))
-  {  
-    LOG_STRERROR ("getnameinfo");
+  hostip = saddr2str (addr, addrlen);
+  if (NULL == hostip)
     return MSH_OK;
-  }
   inaddr = (const struct sockaddr_in *) addr;
   MSH_array_append (s_addrs, nips, inaddr->sin_addr.s_addr);
   LOG_DEBUG ("%d: Found IP: %s\n", rank, hostip);
@@ -150,24 +243,6 @@
 }
 
 
-struct ReadContext
-{
-  struct ReadContext *next;
-
-  struct ReadContext *prev;
-
-  /* struct sockaddr_in addr; */
-  
-  /* socklen_t addrlen; */
-  
-  struct Task *task;
-};
-
-static struct ReadContext *rhead;
-
-static struct ReadContext *rtail;
-
-
 /**
  * Task to read from socket
  *
@@ -190,7 +265,7 @@
     MSH_close (sock);
     return;
   }
-  rsize = read (sock, &cid, sizeof (cid));
+  rsize = read (sock, &cid, sizeof (uint32_t));
   if (rsize < 0)
   {
     LOG_STRERROR ("read");  
@@ -202,8 +277,9 @@
     goto err_ret;
   }
   cid = ntohl (cid);
-  if (!barray_isset (cid))
-    barray_set (cid);
+  LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
+  /* if (!barray_isset (cid)) */
+  /*   barray_set (cid); */
   MSH_close (sock);
   return;
   
@@ -234,7 +310,7 @@
     (void) close (sock);
     return;
   }
-  LOG_DEBUG ("Got a connect\n");
+  LOG_DEBUG ("%d: Got a connect\n", rank);
   if (0 > (csock = accept4 (sock, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)))
   {
     LOG_STRERROR ("accept4");
@@ -245,25 +321,286 @@
   rctx = MSH_malloc (sizeof (struct ReadContext));
   DLL_insert_tail (rhead, rtail, rctx);
   rctx->task = scheduler_add_socket (csock, EV_READ, &read_socket, rctx, NULL);
+  /* resume accepting connections on the listen sock */
+  atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
 }
 
 
+/**
+ * Task for running a round
+ *
+ * @param nosock we have no sockets associated with this callback
+ * @param flags EV_* flags
+ * @param cls NULL
+ */
+static void
+run_round (evutil_socket_t nosock, short flags, void *cls);
+
+
+/**
+ * Schedules the next verification round; if all rounds are done, logs that
+ * the verification phase is complete instead
+ */
+static void
+schedule_next_round ()
+{
+  unsigned int trounds;
+
+  MSH_assert (NULL == rtask);
+  /* Number of rounds required to contact all processes except ourselves
+     (rwidth in parallel in each round) */
+  trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+  if (round < trounds)
+    rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+  else
+    LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
+}
+
+
+/**
+ * Free an instance's address information together with every address it
+ * holds.  A NULL argument is ignored.
+ *
+ * @param iainfos the instance address information; may be NULL
+ */
+static void
+free_instance_addresses (struct InstanceAddrInfo *iainfos)
+{
+  unsigned int cnt;
+
+  if (NULL == iainfos)
+    return;
+  for (cnt = 0; cnt < iainfos->naddrs; cnt++)
+    free (iainfos->addrs[cnt]);
+  free (iainfos->addrs);
+  free (iainfos);
+}
+
+
+/**
+ * Callback trigger to finalise a round.  Cancels pending verification
+ * connections and frees the addresses received in this round.  Unless the
+ * scheduler is shutting down, it then closes the round's listen socket, checks
+ * that every instance of the round was verified and schedules the next round.
+ *
+ * @param sock -1 do not use this
+ * @param flags EV_* flags
+ * @param cls NULL
+ */
+static void
+finalise_round (evutil_socket_t sock, short flags, void *cls)
+{
+  struct VerifyAddressesCtx *ctx;
+  unsigned int cnt;
+
+  scheduler_remove (finalise_task);
+  finalise_task = NULL;
+  while (NULL != (ctx = vactx_head))
+  {
+    if (NULL != ctx->soh)
+      scheduler_open_socket_cancel (ctx->soh);
+    if (NULL != ctx->close_task)
+    {
+      MSH_close (ctx->sock);
+      scheduler_remove (ctx->close_task);
+    }
+    DLL_remove (vactx_head, vactx_tail, ctx);
+    free (ctx);
+  }
+  if (NULL != riainfos)
+  {
+    for (cnt = 0; cnt < rwidth; cnt++)
+      free_instance_addresses (riainfos[cnt]);
+    /* the array itself was allocated by receive_addresses() as well */
+    free (riainfos);
+    riainfos = NULL;
+  }
+  if (IS_SHUTDOWN_EVENT (flags))
+    return;
+  MSH_close (listen_sock);
+  listen_sock = -1;
+  scheduler_remove (atask);
+  atask = NULL;
+  if (1 != bitmap_allset (bitmap))
+  {
+    LOG_ERROR ("Could not verify addresses of all hosts\n");
+    scheduler_shutdown ();
+    return;
+  }
+  /* bits set in this round must not count as verified in the next one */
+  bitmap_clear (bitmap);
+  round++;
+  schedule_next_round ();
+}
+
+
+/**
+ * Callback triggered when the socket used to verify an address is writable
+ * again after our rank was written to it, or upon scheduler shutdown.  This
+ * function closes the socket.  Upon shutdown it removes and frees the
+ * context; otherwise it marks the instance owning the address as verified in
+ * the bitmap (the context stays in the DLL until the round is finalised).
+ *
+ * @param sock the socket file descriptor
+ * @param flags EV_* flags
+ * @param cls context for verifying addresses
+ */
+static void
+socket_close_cb (evutil_socket_t sock, short flags, void *cls)
+{
+  struct VerifyAddressesCtx *ctx = cls;
+  int lb;
+
+  scheduler_remove (ctx->close_task);
+  ctx->close_task = NULL;
+  if (ctx->sock == sock)
+    MSH_close (sock);
+  else if (-1 == sock)
+    MSH_break (0);
+  if (IS_SHUTDOWN_EVENT (flags))
+  {
+    DLL_remove (vactx_head, vactx_tail, ctx);
+    free (ctx);
+    return;
+  }
+  /* FIXME: add the addresses associated with the context to the mapping */
+  /* lb presumably is the lowest rank whose addresses we verify in this
+     round; bitmap bit i then stands for rank lb + i.  NOTE(review): lb is
+     reduced modulo nproc, so if the source ranks of a round wrap around past
+     nproc - 1 the assertion below fires -- confirm against the peer
+     selection in send_addresses()/receive_addresses() */
+  lb = rank - round * rwidth - rwidth + nproc;
+  MSH_assert (0 <= lb);
+  lb %= nproc;
+  MSH_assert (lb <= ctx->iainfo->source);
+  bitmap_set (bitmap, ctx->iainfo->source - lb, 1);
+  return;
+}
+
+
+/**
+ * Callback triggered when a socket connection is ready to be written to.
+ * Writes our rank (in network byte order) to the socket and schedules
+ * socket_close_cb() for when the socket is writable.  On failure the
+ * verification context is removed from the DLL and freed.
+ *
+ * @param sockfd the file descriptor of the socket which is ready to be written
+ *          to; -1 upon failure to open the connection
+ * @param cls context information for verifying an instance address
+ */
+static void 
+socket_open_cb (int sockfd, void *cls)
+{
+  struct VerifyAddressesCtx *ctx = cls;
+  struct InstanceAddr *iaddr;
+  const char *addrstr;
+  ssize_t wrote;
+  uint32_t id;
+
+  ctx->soh = NULL;
+  if (-1 == sockfd)
+  {
+    MSH_break (0);
+    /* FIXME: Check if we already got a mapping for the instance */
+    goto err_ret;
+  }
+  iaddr = ctx->iainfo->addrs[ctx->naddr];
+  /* saddr2str() returns NULL when getnameinfo() fails; never hand NULL to a
+     %s conversion */
+  addrstr = saddr2str (iaddr->saddr, iaddr->addrlen);
+  LOG_DEBUG ("%d: Opened a connection to %s\n", rank, 
+             (NULL != addrstr) ? addrstr : "(unknown)");
+  ctx->sock = sockfd;
+  id = htonl ((uint32_t) rank);
+  wrote = write (sockfd, &id, sizeof (id));
+  if ((ssize_t) sizeof (id) != wrote)
+  {
+    MSH_break (0);  /* FIXME: handle error */
+    MSH_close (sockfd);
+    goto err_ret;
+  }
+  ctx->close_task = 
+      scheduler_add_socket (sockfd, EV_WRITE, &socket_close_cb, ctx, NULL);
+  return;
+
+ err_ret:
+  DLL_remove (vactx_head, vactx_tail, ctx);
+  free (ctx);
+}
+
+
+/**
+ * Start verifying every address of an instance by connecting to the
+ * instance's listen socket; one verification context per address is queued
+ * in the vactx DLL
+ *
+ * @param iainfo the instance's address information
+ * @return MSH_OK upon success initialisation of the connection to instance's
+ *           listen socket (this does not mean that the connection is
+ *           established or an address is verified); MSH_SYSERR upon error
+ */
 static int
+verify_addresses (struct InstanceAddrInfo *iainfo)
+{
+  unsigned int idx;
+
+  for (idx = 0; idx < iainfo->naddrs; idx++)
+  {
+    struct InstanceAddr *addr = iainfo->addrs[idx];
+    struct VerifyAddressesCtx *vctx;
+
+    vctx = MSH_malloc (sizeof (struct VerifyAddressesCtx));
+    /* fill in the context before the scheduler gets hold of it */
+    vctx->naddr = idx;
+    vctx->iainfo = iainfo;
+    vctx->sock = -1;
+    vctx->soh = scheduler_open_socket (addr->saddr, addr->addrlen,
+                                       &socket_open_cb, vctx);
+    if (NULL != vctx->soh)
+    {
+      DLL_insert_tail (vactx_head, vactx_tail, vctx);
+      continue;
+    }
+    MSH_break (0);
+    free (vctx);
+    return MSH_SYSERR;
+  }
+  return MSH_OK;
+}
+
+
+/**
+ * Parse a verification message from a source for its address information
+ *
+ * NOTE(review): only the size field carried inside the message is checked;
+ * the caller must ensure the received buffer is at least that large.
+ *
+ * @param msg the message to parse
+ * @param source the MPI id of the instance which has sent this message
+ * @return the instance's address information; NULL if the message size does
+ *           not match the number of addresses it announces
+ */
+static struct InstanceAddrInfo *
+parse_verify_address_msg (struct MSH_MSG_VerifyAddress *msg, int source)
+{
+  struct InstanceAddr *addr;
+  struct sockaddr_in *inaddr;
+  struct InstanceAddrInfo *iainfo;
+  size_t size;
+  socklen_t addrlen;
+  uint16_t msg_nips;  /* not `nips`: that would shadow the global */
+  uint16_t cnt;
+
+  size = ntohs (msg->header.size);
+  msg_nips = ntohs (msg->nips);
+  if (size != (sizeof (struct MSH_MSG_VerifyAddress) 
+               + (sizeof (uint32_t) * msg_nips)))
+  {
+    LOG_ERROR ("Parsing failed\n");
+    return NULL;
+  }
+  iainfo = MSH_malloc (sizeof (struct InstanceAddrInfo));
+  iainfo->source = source;
+  for (cnt = 0; cnt < msg_nips; cnt++)
+  {
+    addrlen = sizeof (struct sockaddr_in); /* IPv4 */
+    addr = MSH_malloc (sizeof (struct InstanceAddr) + addrlen);
+    addr->addrlen = addrlen;
+    inaddr = (struct sockaddr_in *) addr->saddr;
+    /* zero the whole address (including sin_zero) regardless of allocator */
+    (void) memset (inaddr, 0, addrlen);
+    inaddr->sin_family = AF_INET;
+    /* assign directly as address and port already in NB format */
+    inaddr->sin_port = msg->port;
+    inaddr->sin_addr.s_addr = (in_addr_t) msg->ipaddrs[cnt];
+    MSH_array_append (iainfo->addrs, iainfo->naddrs, addr);
+  }
+  return iainfo;
+}
+
+
+/**
+ * Receives the IP addresses to verify in the current round from instances
+ *
+ * @return an array with one address record per instance (rwidth entries);
+ *           NULL upon a receive error
+ */
+static struct InstanceAddrInfo **
 receive_addresses ()
 {
-  struct MSH_MSG_VerifyAddress **rmsgs;
+  struct InstanceAddrInfo **iainfos;  
   MPI_Status status;
-  int rsize;
-  int lb;
-  int up;
-  int source;
-  int ret;
   int cnt;
 
-  ret = MSH_SYSERR;
-  rmsgs = MSH_malloc (sizeof (struct MSH_MSG_VerifyAddress *) * rwidth);
+  iainfos = MSH_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
   for (cnt=0; cnt < rwidth; cnt++)
   {
+    struct MSH_MSG_VerifyAddress *msg;
+    int rsize;
+    int lb;
+    int up;
+    int source;
+    int ret;
+
     MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
                &status);
     MPI_Get_elements (&status, MPI_BYTE, &rsize);
@@ -288,29 +625,32 @@
       MSH_break (0);
       goto err_ret;
     }
-    rmsgs[cnt] = MSH_malloc (rsize);
-    if (MPI_SUCCESS != MPI_Recv (rmsgs[cnt], rsize, MPI_BYTE, source,
+    msg = MSH_malloc (rsize);
+    if (MPI_SUCCESS != MPI_Recv (msg, rsize, MPI_BYTE, source,
                                  MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
                                  MPI_STATUS_IGNORE))
     {
       MSH_break (0);
       goto err_ret;
     }
+    if (NULL == (iainfos[cnt] = parse_verify_address_msg (msg, source)))
+    {
+      free (msg);
+      goto err_ret;
+    }
+    free (msg);
     LOG_DEBUG ("%d: Received message of size %d from %d\n", rank, rsize, 
source);
   }
-  /* remove this later on and do something useful */
-  for (cnt = 0; cnt < rwidth; cnt++)
+  return iainfos;
+  
+ err_ret:
+  for (cnt=0; cnt < rwidth; cnt++)
   {
-    MSH_free_non_null (rmsgs[cnt]);
-    rmsgs[cnt] = NULL;
+    if (NULL != iainfos[cnt])
+      free_instance_addresses (iainfos[cnt]);
   }
-  ret = MSH_OK;
-  
- err_ret:
-  for (cnt = 0; cnt < rwidth; cnt++)
-    MSH_free_non_null (rmsgs[cnt]);
-  free (rmsgs);
-  return ret;
+  free (iainfos);
+  return NULL;
 }
 
 
@@ -321,7 +661,7 @@
  * @return MSH_OK on success; MSH_SYSERR upon error
  */
 static int
-send_receive_addresses ()
+send_addresses ()
 {
   struct MSH_MSG_VerifyAddress *msg;
   struct MSH_MSG_VerifyAddress *cpys;  
@@ -338,7 +678,9 @@
   msg->port = htons (lport);
   msg->nips = htons (nips);
   for (cnt = 0; cnt < nips; cnt++)
+  {    
     msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
+  }
   width = rwidth;  
   if ( (0 != ( (nproc - 1) % rwidth)) && (round == ( (nproc - 1) / rwidth)) )
     width = (nproc - 1) % rwidth;
@@ -366,7 +708,7 @@
       MSH_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
       MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
     }
-    goto end;
+    goto err_ret;
   }
   for (cnt=0; cnt < width; cnt++)
   {
@@ -378,10 +720,8 @@
     free (cpys);
     cpys = NULL;
   }
-  if (MSH_SYSERR == receive_addresses ())
-    goto end;
-  
- end:
+
+ err_ret:
   MSH_free_non_null (cpys);
   MSH_free_non_null (sreqs);  
   return (MPI_SUCCESS == ret) ? MSH_OK : MSH_SYSERR;
@@ -389,17 +729,22 @@
 
 
 /**
- * Verify IP addresses of all the hosts where mshd services are running
+ * This function opens a listen socket, sends this instance's IP addresses to
+ * other instances and receives their IP addresses, starts accepting
+ * connections on the listen socket and verifies the IP addresses of other
+ * instances by connecting to their listen sockets
  *
  * @return MSH_OK if verification is successful; MSH_SYSERR upon error (an
  *           error message is logged)
  */
 static int
-verify_addresses ()
+run_round_ ()
 {
   struct sockaddr_in addr;
+  struct timeval tv;
   socklen_t addrlen;
   int sock;
+  unsigned int cnt;
 
   addrlen = sizeof (struct sockaddr_in);
   (void) memset (&addr, 0, addrlen);
@@ -412,21 +757,27 @@
     MSH_break (0);
     goto clo_ret;
   }
-  LOG_DEBUG ("Bound to local port %u\n", lport);
   if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
   {
     MSH_break (0);
     goto clo_ret;
   }
-  if (MSH_SYSERR == send_receive_addresses ())
+  if (MSH_SYSERR == send_addresses ())
     goto clo_ret;
-  MSH_close (sock);             /* FIXME: remove later */
+  if (NULL == (riainfos = receive_addresses ()))
+    goto clo_ret;
   atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
   if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
   {
     MSH_break (0);
     goto clo_ret;
   }
+  tv.tv_sec = 1;
+  tv.tv_usec = 0;
+  for (cnt = 0; NULL != riainfos[cnt]; cnt++)
+    verify_addresses (riainfos[cnt]);
+  listen_sock = sock;
+  finalise_task = scheduler_add (&finalise_round, NULL, &tv);
   return MSH_OK;
 
  clo_ret:
@@ -465,49 +816,14 @@
  * @param cls NULL
  */
 static void
-run_round (evutil_socket_t nosock, short flags, void *cls);
-
-
-/**
- * Schedules next round
- */
-static void
-schedule_next_round ()
-{
-  int trounds;
-
-  MSH_assert (NULL == rtask);
-  /* Number of rounds required to contact all processes except ourselves 
(rwidth
-     in parallel in each round) */
-  trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
-  if (round < trounds)
-    rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
-  else
-    LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
-}
-
-
-/**
- * Task for running a round
- *
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
- * @param cls NULL
- */
-static void
 run_round (evutil_socket_t nosock, short flags, void *cls)
 {
   scheduler_remove (rtask);
   rtask = NULL;
   if (IS_SHUTDOWN_EVENT (flags))
     return;
-  if (MSH_OK == verify_addresses ())
-  {
-    round++;
-    scheduler_remove (atask);
-    atask = NULL;
-    schedule_next_round ();
-  }
+  if (MSH_OK != run_round_ ())
+    scheduler_shutdown ();
 }
 
 
@@ -531,6 +847,9 @@
 }
 
 
+/**
+ * Prints help message for this program
+ */
 static void
 print_help ()
 {
@@ -546,6 +865,14 @@
   fprintf (stderr, "%s", msg);
 }
 
+
+/**
+ * The execution start point
+ *
+ * @param argc the number of arguments
+ * @param argv the argument strings
+ * @return 0 for successful termination; 1 for termination upon error
+ */
 int 
 main (int argc, char **argv)
 {
@@ -555,6 +882,7 @@
   ret = 1;
   rwidth = 1;
   
+  listen_sock = -1;
   while (-1 != (c = getopt (argc, argv, "hw:")))
   {
     switch (c)
@@ -614,17 +942,21 @@
     LOG_ERROR ("No IP addresses found\n");
     goto fail;
   }
-  barray_init ();
+  bitmap = bitmap_create (rwidth);
   if (MSH_OK != scheduler_run (&run, NULL))
   {
     MSH_break (0);
-    barray_destroy ();
     goto fail;
   }
-  barray_destroy ();
+  //barray_destroy ();
   ret = 0;
   
  fail:
+  if (NULL != bitmap)
+  {
+    bitmap_destroy (bitmap);
+    bitmap = NULL;
+  }
   MSH_break (MPI_SUCCESS == MPI_Finalize());
   MSH_free_non_null (s_addrs);
   //libevent_global_shutdown ();

Modified: msh/src/scheduler.c
===================================================================
--- msh/src/scheduler.c 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/scheduler.c 2013-07-11 15:06:37 UTC (rev 27930)
@@ -6,6 +6,10 @@
 
 #include "scheduler.h"
 
+/**
+ * Zero-length timeout: a file-scope object without initializer, so all of
+ * its fields are 0.  Externalised in scheduler.h
+ */
+struct timeval tv_immediate;
 
 struct Task
 {
@@ -32,10 +36,25 @@
  */
 static struct Task *ttail;
 
-
+/**
+ * Our event base
+ */
 static struct event_base *ebase;
 
 
+/**
+ * Adds a task which is to be executed when one of the given events are
+ * triggered on the given socket or upon the expiry of the given timeout
+ *
+ * @param sock the sock to wait for
+ * @param flags EV_* events to wait for on the sock
+ * @param cb the callback to call when one of the events marked in flags is
+ *          triggered on the sock
+ * @param cls closure for the callback
+ * @param tv how long we should wait for the events.  Upon expiry of this
+ *          timeout the cb is called with EV_TIMEOUT flag.  Use NULL to wait
+ *          forever.
+ * @return handle for the task; NULL upon error
+ */
 struct Task *
 scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb, 
                       void *cls, const struct timeval *tv)
@@ -55,6 +74,15 @@
 }
 
 
+/**
+ * Adds a task which is to be executed after given interval
+ *
+ * @param cb the callback to call for executing the task
+ * @param cls closure for the above callback
+ * @param tv the interval after which the task has to be executed; NULL to
+ *          denote infinite delay
+ * @return handle for task; NULL upon error
+ */
 struct Task *
 scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv)
 {
@@ -62,6 +90,17 @@
 }
 
 
+/**
+ * Add a task to be executed upon reception of a signal or upon the expiry of a
+ * given timeout
+ *
+ * @param signal the signal to wait for
+ * @param cb the callback to call upon reception of the signal
+ * @param cls closure for the above callback
+ * @param tv how long we should wait for the signal.  Upon expiry of this
+ *          timeout the cb is called with EV_TIMEOUT flag.  Use NULL to wait
+ *          forever.
+ * @return handle for the task; NULL upon error
+ */
 struct Task *
 scheduler_add_signal (int signal, event_callback_fn cb, void *cls, 
                       const struct timeval *tv)
@@ -70,6 +109,12 @@
 }
 
 
+/**
+ * Remove a task.  All tasks are to be removed (even after their respective
+ * callbacks are executed)
+ *
+ * @param task the task handle to remove
+ */
 void
 scheduler_remove (struct Task *task)
 {
@@ -79,6 +124,14 @@
   free (task);
 }
 
+/**
+ * Shuts down the scheduler.  All pending tasks are executed (their respective
+ * callbacks will be called).  Use IS_SHUTDOWN_EVENT() to check if the
+ * callbacks are called upon scheduler's shutdown.  It is not possible to add
+ * any tasks after this function is called.
+ *
+ * @see IS_SHUTDOWN_EVENT
+ */
 void
 scheduler_shutdown ()
 {
@@ -91,6 +144,15 @@
 }
 
 
+/**
+ * Run the scheduler loop by calling the given callback.  This function returns
+ * once all tasks are finished or after a call to scheduler_shutdown() (which
+ * causes all waiting tasks to be executed)
+ *
+ * @param cb the callback to call when the scheduler is ready.  Further tasks
+ *          can be added through this callback.
+ * @return MSH_OK if all tasks are successfully executed; MSH_SYSERR upon error
+ */
 int
 scheduler_run (event_callback_fn cb, void *cls)
 {
@@ -114,17 +176,41 @@
 }
 
 
+/**
+ * Handle returned by scheduler_open_socket()
+ */
 struct SocketOpenHandle
 {
+  /**
+   * The function to call once the connection succeeds or fails
+   */
   socket_open_fn cb;

+  /**
+   * The closure for the above callback
+   */
   void *cls;

+  /**
+   * The task associated with the socket; executed when the connection on the
+   * socket is ready
+   */
   struct Task *task;

+  /**
+   * The file descriptor of the socket
+   */
   int sock;
 };
 
+
+/**
+ * Callback that will be called when the socket is ready for reading
+ *
+ * @param sock the file descriptor of the socket
+ * @param flags EV_* flags
+ * @param cls the closure
+ */
 static void
 open_socket_cb (evutil_socket_t sock, short flags, void *cls)
 {
@@ -163,6 +249,17 @@
 }
 
 
+/**
+ * Open a socket, connect it to the target address and schedule a task to be
+ * executed when the connection is ready.
+ *
+ * @param addr the target address to connect
+ * @param addrlen the length of the addr
+ * @param cb the callback to call to signal success or failure
+ * @param cls the closure for the above callback
+ * @return a handle which can be passed to scheduler_open_socket_cancel() to
+ *           cancel the pending connection task; NULL upon error
+ */
 struct SocketOpenHandle *
 scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
                        socket_open_fn cb, void *cls)
@@ -181,6 +278,12 @@
   return h;
 }
 
+
+/**
+ * Cancel a handle created with scheduler_open_socket()
+ *
+ * @param h the handle to cancel
+ */
 void
 scheduler_open_socket_cancel (struct SocketOpenHandle *h)
 {

Modified: msh/src/scheduler.h
===================================================================
--- msh/src/scheduler.h 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/scheduler.h 2013-07-11 15:06:37 UTC (rev 27930)
@@ -1,37 +1,151 @@
+/**
+ * @file scheduler.h
+ * @brief interface for task scheduler based on libevent
+ * @author Sree Harsha Totakura <address@hidden> 
+ */
+
+#ifndef SCHEDULER_H_
+#define SCHEDULER_H_
+
 #include "common.h"
 #include "event2/event.h"
 
-static struct timeval tv_immediate;
+extern struct timeval tv_immediate;
 
+/**
+ * Use this for scheduling tasks immediately
+ */
 #define TV_IMMEDIATE &tv_immediate
 
+/**
+ * Returns true if the flags denote a shutdown event
+ */
 #define IS_SHUTDOWN_EVENT(flags) ((flags & (EV_READ | EV_WRITE | EV_TIMEOUT)) 
== (EV_READ | EV_WRITE | EV_TIMEOUT))
 
+
+/**
+ * Opaque handle for a task, as returned by the scheduler_add*() functions
+ */
 struct Task;
 
+
+/**
+ * Adds a task which is to be executed when one of the given events is
+ * triggered on the given socket or upon the expiry of the given timeout
+ *
+ * @param sock the socket to wait on
+ * @param flags the EV_* events on sock for which cb should be called
+ * @param cb the callback to call when one of the events marked in flags is
+ *          triggered on sock
+ * @param cls closure for the callback
+ * @param tv how long to wait for the events; once it expires cb is called
+ *          with the EV_TIMEOUT flag.  Use NULL to wait forever.
+ * @return handle for the task; NULL upon error
+ */
 struct Task *
 scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb, 
                       void *cls, const struct timeval *tv);
 
+
+/**
+ * Adds a task which is to be executed after the given interval
+ *
+ * @param cb the callback to call for executing the task
+ * @param cls closure for the above callback
+ * @param tv the interval after which the task has to be executed; NULL to
+ *          denote infinite delay; TV_IMMEDIATE to run it immediately
+ * @return handle for the task; NULL upon error
+ */
 struct Task *
 scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv);
 
 
+/**
+ * Add a task to be executed upon reception of a signal or upon the expiry of a
+ * given timeout
+ *
+ * @param signal the signal to wait for
+ * @param cb the callback to call upon reception of the signal
+ * @param cls closure for the above callback
+ * @param tv how long to wait for the signal; once it expires cb is called
+ *          with the EV_TIMEOUT flag.  Use NULL to wait forever.
+ * @return handle for the task; NULL upon error
+ */
 struct Task *
 scheduler_add_signal (int signal, event_callback_fn cb, void *cls, 
                       const struct timeval *tv);
 
+
+/**
+ * Remove a task and release its handle.  Every task has to be removed this
+ * way, even after its callback has been executed
+ *
+ * @param task the task handle to remove; it must not be used afterwards
+ */
 void
 scheduler_remove (struct Task *task);
 
+
+/**
+ * Shuts down the scheduler.  All pending tasks are executed (their respective
+ * callbacks will be called).  Use IS_SHUTDOWN_EVENT() to check whether a
+ * callback is called due to the scheduler's shutdown.  No tasks can be added
+ * after this function is called.
+ *
+ * @see IS_SHUTDOWN_EVENT
+ */
 void
 scheduler_shutdown ();
 
+
+/**
+ * Run the scheduler loop by calling the given callback.  This function returns
+ * once all tasks are finished or after a call to scheduler_shutdown() (which
+ * causes all waiting tasks to be executed)
+ *
+ * @param cb the callback to call, with closure cls, when the scheduler is
+ *          ready.  Further tasks can be added through this callback.
+ * @return MSH_OK if all tasks are successfully executed; MSH_SYSERR upon error
+ */
 int
 scheduler_run (event_callback_fn cb, void *cls);
 
+
+/**
+ * The type of the function which is used as a callback argument to
+ * scheduler_open_socket().  The callback is called once the socket
+ * connection has either been established successfully or has failed
+ *
+ * @param sockfd the socket file descriptor; upon failure its value is -1
+ * @param cls the closure for this callback as passed to scheduler_open_socket()
+ */
 typedef void (* socket_open_fn) (int sockfd, void *cls);
 
+
+/**
+ * Open a socket, connect it to the target address and schedule a task to be
+ * executed when the connection is ready.
+ *
+ * @param addr the target address to connect to
+ * @param addrlen the length of addr
+ * @param cb the callback to call to signal success or failure
+ * @param cls the closure for the above callback
+ * @return a handle which can be passed to scheduler_open_socket_cancel() to
+ *           cancel the pending connection task; NULL upon error
+ */
 struct SocketOpenHandle *
 scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
                        socket_open_fn cb, void *cls);
+
+
+/**
+ * Cancel the pending task of a handle returned by scheduler_open_socket()
+ *
+ * @param h the handle to cancel
+ */
+void
+scheduler_open_socket_cancel (struct SocketOpenHandle *h);
+
+#endif  /* SCHEDULER_H_ */
+
+/* End of scheduler.h */

Added: msh/src/test_bitmap.c
===================================================================
--- msh/src/test_bitmap.c                               (rev 0)
+++ msh/src/test_bitmap.c       2013-07-11 15:06:37 UTC (rev 27930)
@@ -0,0 +1,64 @@
+/**
+ * @file test_bitmap.c
+ * @brief testcase for the bitmap implementation
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#include "common.h"
+#include "bitmap.h"
+
+/**
+ * Check a condition; if it does not hold, report the location of the check
+ * on stderr and execute the statement given as ret (e.g. "goto fail").
+ *
+ * @param cond the condition expected to hold; parenthesized so that any
+ *          expression (e.g. "0 == x") is negated as a whole
+ * @param ret the statement to execute when cond is false
+ */
+#define FAIL_TEST(cond,ret) \
+  do { \
+    if (!(cond)) \
+    { \
+      fprintf (stderr, "Assertion failed at %s:%d\n", __FILE__, __LINE__); \
+      ret; \
+    } \
+  } while (0)
+
+int
+main (void)
+{
+  struct BitMap *bm;
+  unsigned int len;
+  unsigned int cnt;
+
+  /* A single set bit must be visible and must not affect any other bit */
+  len = 80;
+  bm = bitmap_create (len);
+  FAIL_TEST (NULL != bm, return 1);
+  bitmap_set (bm, 13, 1);
+  FAIL_TEST (1 == bitmap_isbitset (bm, 13), goto fail);
+  for (cnt = 0; cnt < len; cnt++)
+  {
+    if (13 == cnt)
+      continue;
+    FAIL_TEST (0 == bitmap_isbitset (bm, cnt), goto fail);
+  }
+  /* After bitmap_clear() no bit may remain set */
+  bitmap_clear (bm);
+  for (cnt = 0; cnt < len; cnt++)
+    FAIL_TEST (0 == bitmap_isbitset (bm, cnt), goto fail);
+  bitmap_destroy (bm);
+  /* Setting every bit must make bitmap_allset() report 1 */
+  len = 9;
+  bm = bitmap_create (len);
+  FAIL_TEST (NULL != bm, return 1);
+  for (cnt = 0; cnt < len; cnt++)
+    bitmap_set (bm, cnt, 1);
+  FAIL_TEST (1 == bitmap_allset (bm), goto fail);
+  bitmap_destroy (bm);
+  return 0;
+
+ fail:
+  bitmap_destroy (bm);
+  return 1;
+}




reply via email to

[Prev in Thread] Current Thread [Next in Thread]