gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27799 - msh/src


From: gnunet
Subject: [GNUnet-SVN] r27799 - msh/src
Date: Mon, 8 Jul 2013 18:02:51 +0200

Author: harsha
Date: 2013-07-08 18:02:51 +0200 (Mon, 08 Jul 2013)
New Revision: 27799

Modified:
   msh/src/mshd.c
   msh/src/mtypes.h
   msh/src/test_scheduler_socket.c
   msh/src/util.c
Log:
- round interactions


Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c      2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/mshd.c      2013-07-08 16:02:51 UTC (rev 27799)
@@ -2,6 +2,7 @@
 #include <mpi.h>
 #include "util.h"
 #include "scheduler.h"
+#include "mtypes.h"
 
 /**
  * The port number of our local socket
@@ -19,9 +20,9 @@
 static int rank;
 
 /**
- * Array of the string representation of our IP addresses
+ * Array of our IP addresses in network-byte format
  */
-static char **ip_addr_str;
+static in_addr_t *s_addrs;
 
 /**
  * Number of IP addresses
@@ -132,19 +133,88 @@
                              socklen_t addrlen)
 {
   char hostip[NI_MAXHOST];
+  const struct sockaddr_in *inaddr;
 
   if (sizeof (struct sockaddr_in) != addrlen)
     return MSH_OK;           /* Only consider IPv4 for now */
   if (0 !=
       getnameinfo (addr, addrlen, hostip, NI_MAXHOST, NULL, 0, NI_NUMERICHOST))
+  {  
     LOG_STRERROR ("getnameinfo");
-  MSH_array_append (ip_addr_str, nips, strdup (hostip));
+    return MSH_OK;
+  }
+  inaddr = (const struct sockaddr_in *) addr;
+  MSH_array_append (s_addrs, nips, inaddr->sin_addr.s_addr);
   LOG_DEBUG ("%d: Found IP: %s\n", rank, hostip);
   return MSH_OK;
 }
 
 
+struct ReadContext
+{
+  struct ReadContext *next;
+
+  struct ReadContext *prev;
+
+  /* struct sockaddr_in addr; */
+  
+  /* socklen_t addrlen; */
+  
+  struct Task *task;
+};
+
+static struct ReadContext *rhead;
+
+static struct ReadContext *rtail;
+
+
 /**
+ * Task to read from socket
+ *
+ * @param sock the socket
+ * @param flags EV_* flags
+ * @param cls &atask
+ */
+static void
+read_socket (evutil_socket_t sock, short flags, void *cls)
+{
+  struct ReadContext *ctx = cls;
+  ssize_t rsize;
+  uint32_t cid;
+
+  scheduler_remove (ctx->task);
+  DLL_remove (rhead, rtail, ctx);
+  free (ctx);
+  if (IS_SHUTDOWN_EVENT (flags))
+  {
+    MSH_close (sock);
+    return;
+  }
+  rsize = read (sock, &cid, sizeof (cid));
+  if (rsize < 0)
+  {
+    LOG_STRERROR ("read");  
+    goto err_ret;
+  }
+  if (rsize == 0)
+  {
+    MSH_break (0);
+    goto err_ret;
+  }
+  cid = ntohl (cid);
+  if (!barray_isset (cid))
+    barray_set (cid);
+  MSH_close (sock);
+  return;
+  
+ err_ret:
+  MSH_close (sock);
+  scheduler_shutdown ();
+  return;
+}
+
+
+/**
  * Task to call accept and close on a listening socket
  *
  * @param sock the socket
@@ -154,6 +224,9 @@
 static void
 accept_task (evutil_socket_t sock, short flags, void *cls)
 {
+  struct ReadContext *rctx;
+  int csock;
+
   scheduler_remove (atask);
   atask = NULL;
   if (IS_SHUTDOWN_EVENT (flags))
@@ -162,7 +235,82 @@
     return;
   }
   LOG_DEBUG ("Got a connect\n");
+  if (0 > (csock = accept4 (sock, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)))
+  {
+    LOG_STRERROR ("accept4");
+    MSH_close (sock);
+    scheduler_shutdown ();
+    return;
+  }
+  rctx = MSH_malloc (sizeof (struct ReadContext));
+  DLL_insert_tail (rhead, rtail, rctx);
+  rctx->task = scheduler_add_socket (csock, EV_READ, &read_socket, rctx, NULL);
+}
+
+
+static int
+receive_addresses ()
+{
+  struct MSH_MSG_VerifyAddress **rmsgs;
+  MPI_Status status;
+  int rsize;
+  int lb;
+  int up;
+  int source;
+  int ret;
+  int cnt;
+
+  ret = MSH_SYSERR;
+  rmsgs = MSH_malloc (sizeof (struct MSH_MSG_VerifyAddress *) * rwidth);
+  for (cnt=0; cnt < rwidth; cnt++)
+  {
+    MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
+               &status);
+    MPI_Get_elements (&status, MPI_BYTE, &rsize);
+    /* We expect a message from peers with id p in the range:       
+       (rank - round * rwidth - rwidth) <= p <= (rank - (round * rwidth) -1) */
+    lb = rank - round * rwidth - rwidth + nproc;
+    up = rank - (round * rwidth) - 1 + nproc;
+    MSH_assert (lb >= 0);
+    MSH_assert (up >= 0);
+    lb %= nproc;
+    up %= nproc;
+    source = status.MPI_SOURCE;
+    if (lb == up) 
+      if (source != lb)
+      {
+        MSH_break (0);
+        LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb, 
up);
+        goto err_ret;
+      }
+    else if ((source > up) || (source < lb))
+    {
+      MSH_break (0);
+      goto err_ret;
+    }
+    rmsgs[cnt] = MSH_malloc (rsize);
+    if (MPI_SUCCESS != MPI_Recv (rmsgs[cnt], rsize, MPI_BYTE, source,
+                                 MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
+                                 MPI_STATUS_IGNORE))
+    {
+      MSH_break (0);
+      goto err_ret;
+    }
+    LOG_DEBUG ("Received message of size %d from %d\n", rsize, source);
+  }
+  /* remove this later on and do something useful */
+  for (cnt = 0; cnt < rwidth; cnt++)
+  {
+    MSH_free_non_null (rmsgs[cnt]);
+    rmsgs[cnt] = NULL;
+  }
+  ret = MSH_OK;
   
+ err_ret:
+  for (cnt = 0; cnt < rwidth; cnt++)
+    MSH_free_non_null (rmsgs[cnt]);
+  free (rmsgs);
+  return ret;
 }
 
 
@@ -173,10 +321,65 @@
  * @return MSH_OK on success; MSH_SYSERR upon error
  */
 static int
-send_addresses (int rank)
+send_receive_addresses ()
 {
-  MSH_break (0);
-  return MSH_OK;
+  struct MSH_MSG_VerifyAddress *msg;
+  struct MSH_MSG_VerifyAddress *cpys;  
+  MPI_Request *sreqs;
+  size_t msize;
+  int cnt;
+  int ret;
+  int target;
+
+  msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
+  msg = MSH_malloc (msize);
+  msg->header.size = htons (msize);
+  msg->port = htons (lport);
+  msg->nips = htons (nips);
+  for (cnt = 0; cnt < nips; cnt++)
+    msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
+  cpys = NULL;
+  cpys = MSH_malloc (msize * rwidth);
+  sreqs = MSH_malloc (rwidth);
+  for (cnt=0; cnt < rwidth; cnt++)
+  {    
+    (void) memcpy (&cpys[cnt], msg, msize);
+    target = (round * rwidth) + cnt + 1;
+    MSH_assert (target < nproc);
+    target = (rank + target) % nproc;
+    ret = MPI_Isend (&cpys[cnt], msize, MPI_BYTE, target, 
+                     MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD, &sreqs[cnt]);
+    if (MPI_SUCCESS != ret)
+      break;
+  }
+  free (msg);
+  msg = NULL;
+  if (cnt != rwidth)
+  {
+    for (cnt--; cnt >= 0; cnt--)
+    {
+      MSH_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
+      MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+    }
+    goto end;
+  }
+  for (cnt=0; cnt < rwidth; cnt++)
+  {
+    MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));    
+  }
+  LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank, round);
+  if (NULL != cpys)
+  {    
+    free (cpys);
+    cpys = NULL;
+  }
+  if (MSH_SYSERR == receive_addresses ())
+    goto end;
+  
+ end:
+  MSH_free_non_null (cpys);
+  MSH_free_non_null (sreqs);  
+  return (MPI_SUCCESS == ret) ? MSH_OK : MSH_SYSERR;
 }
 
 
@@ -192,7 +395,6 @@
   struct sockaddr_in addr;
   socklen_t addrlen;
   int sock;
-  unsigned int cnt;
 
   addrlen = sizeof (struct sockaddr_in);
   (void) memset (&addr, 0, addrlen);
@@ -211,15 +413,14 @@
     MSH_break (0);
     goto clo_ret;
   }
-  for (cnt = 0; cnt < rwidth; cnt++)
-    if (MSH_SYSERR == send_addresses ((round * rwidth) + cnt))
-      goto clo_ret;
+  if (MSH_SYSERR == send_receive_addresses ())
+    goto clo_ret;
+  atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
   if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
   {
     MSH_break (0);
     goto clo_ret;
   }
-  atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
   return MSH_OK;
 
  clo_ret:
@@ -267,12 +468,16 @@
 static void
 schedule_next_round ()
 {
+  int trounds;
+
   MSH_assert (NULL == rtask);
-  if (round < ( (nproc + (rwidth - 1)) / rwidth) )
-  {
-    round++;
+  /* Number of rounds required to contact all processes except ourselves 
(rwidth
+     in parallel in each round) */
+  trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+  if (round < trounds)
     rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
-  }
+  else
+    LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
 }
 
 
@@ -291,7 +496,10 @@
   if (IS_SHUTDOWN_EVENT (flags))
     return;
   if (MSH_OK == verify_addresses ())
+  {
+    round++;
     schedule_next_round ();
+  }
 }
 
 
@@ -310,16 +518,58 @@
                                            &sigshut_tasks[0], NULL);
   sigshut_tasks[1] = scheduler_add_signal (SIGTERM, &sig_shutdown,
                                            &sigshut_tasks[1], NULL);
-  rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+  //rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+  schedule_next_round ();
 }
 
 
+static void
+print_help ()
+{
+  char *msg = 
+"mshd: MSH daemon.\n" 
+"This binary is a part of "PACKAGE_NAME "-" PACKAGE_VERSION " available from " 
PACKAGE_URL ".\n"
+"This program takes the following options:\n"
+" -w num\t: \t The number of processes which verify at each round.\n"
+" -h \t: \t Print this help\n"
+"Report bugs to " PACKAGE_BUGREPORT "\n"
+      ;
+  
+  fprintf (stderr, "%s", msg);
+}
+
 int 
 main (int argc, char **argv)
 {
   int ret;
+  int c;
 
   ret = 1;
+  rwidth = 1;
+  
+  while (-1 != (c = getopt (argc, argv, "hw:")))
+  {
+    switch (c)
+    {
+    case 'w':
+      if (1 != sscanf (optarg, "%u", rwidth))
+      {
+        LOG_ERROR ("-w option requires an unsinged number argument.\n");
+        print_help ();
+        return 1;
+      }
+      break;
+    case 'h':
+      print_help ();
+      return 0;
+    case '?':
+      print_help();
+      return 1;
+    default:
+      printf ("Unknown option: %c\n", c);
+      MSH_assert (0);
+    }
+  }
   if (MPI_SUCCESS != MPI_Init(&argc, &argv))
   {
     LOG_ERROR ("Failed to initialise MPI\n");
@@ -330,6 +580,11 @@
     LOG_ERROR ("Cannot determine the number of mshd processes\n");
     goto fail;
   }
+  if (nproc <= rwidth)
+  {
+    LOG_ERROR ("Given round width is greater than or equal to number of mshd 
processes\n");
+    goto fail;
+  }
   if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
   {
     LOG_ERROR ("Cannot determine our MPI rank\n");
@@ -350,10 +605,11 @@
   }
   barray_destroy ();
   ret = 0;
-
+  
  fail:
   MSH_break (MPI_SUCCESS == MPI_Finalize());
-  MSH_free_non_null (ip_addr_str);
-  //libevent_global_shutdown ();  
+  MSH_free_non_null (s_addrs);
+  //libevent_global_shutdown ();
+  LOG_ERROR ("Returning\n");
   return ret;
 }

Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h    2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/mtypes.h    2013-07-08 16:02:51 UTC (rev 27799)
@@ -38,6 +38,8 @@
   /**
    * IPv4 addresses to follow as 32 bit unsigned integeters
    */
+  uint32_t ipaddrs[0];
+
 #if 0
   /* Internet address.  */
   typedef uint32_t in_addr_t;
@@ -59,7 +61,7 @@
                           sizeof (struct in_addr)];
   };
 #endif
-}
+};
 
 
 /*********************************************************************

Modified: msh/src/test_scheduler_socket.c
===================================================================
--- msh/src/test_scheduler_socket.c     2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/test_scheduler_socket.c     2013-07-08 16:02:51 UTC (rev 27799)
@@ -64,6 +64,7 @@
     LOG_STRERROR ("accept4");
     MSH_close (lsock);
     scheduler_shutdown ();
+    return;
   }
   if (2 == ++result)
     scheduler_shutdown ();

Modified: msh/src/util.c
===================================================================
--- msh/src/util.c      2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/util.c      2013-07-08 16:02:51 UTC (rev 27799)
@@ -71,7 +71,7 @@
   }
   else
   {
-    MSH_malloc (size);
+    tmp = MSH_malloc (size);
     if (*oldCount > newCount)
       *oldCount = newCount;     /* shrink is also allowed! */
     memcpy (tmp, *old, elementSize * (*oldCount));




reply via email to

[Prev in Thread] Current Thread [Next in Thread]