gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r30003 - msh/src


From: gnunet
Subject: [GNUnet-SVN] r30003 - msh/src
Date: Tue, 8 Oct 2013 19:33:28 +0200

Author: harsha
Date: 2013-10-08 19:33:28 +0200 (Tue, 08 Oct 2013)
New Revision: 30003

Added:
   msh/src/mshd2.c
Modified:
   msh/src/mshd-server.c
   msh/src/mshd.c
   msh/src/mshd_pmonitor.c
Log:
- fixes and new mshd2


Modified: msh/src/mshd-server.c
===================================================================
--- msh/src/mshd-server.c       2013-10-08 17:18:31 UTC (rev 30002)
+++ msh/src/mshd-server.c       2013-10-08 17:33:28 UTC (rev 30003)
@@ -118,10 +118,6 @@
    */
   GNUNET_SCHEDULER_TaskIdentifier fin_task;
 
-  /**
-   * the file descriptor associated with the client connection
-   */
-  int conn_fd;
 };
 
 
@@ -805,14 +801,12 @@
  * @param conn the connection to derive the client from
  */
 void
-daemon_server_add_connection (struct GNUNET_CONNECTION_Handle *conn, 
-                              int conn_fd)
+daemon_server_add_connection (struct GNUNET_CONNECTION_Handle *conn)
 {  
   struct ExecCtx *exec_ctx;
   struct GNUNET_SERVER_Client *client;
 
   exec_ctx = GNUNET_malloc (sizeof (struct ExecCtx));
-  exec_ctx->conn_fd = conn_fd;
   client = GNUNET_SERVER_connect_socket (daemon_serv, conn);
   exec_ctx->client = client;
   GNUNET_SERVER_client_set_user_context (client, exec_ctx);

Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c      2013-10-08 17:18:31 UTC (rev 30002)
+++ msh/src/mshd.c      2013-10-08 17:33:28 UTC (rev 30003)
@@ -408,13 +408,11 @@
     {
       struct GNUNET_NETWORK_Handle *client_sock;
       struct GNUNET_CONNECTION_Handle *client_conn;
-      int client_sock_fd;
       
       LOG_DEBUG ("Got a command execution connection\n");
       client_sock = GNUNET_NETWORK_socket_accept (listen_socket, NULL, NULL);
-      client_sock_fd = GNUNET_NETWORK_get_fd (client_sock);
       client_conn = GNUNET_CONNECTION_create_from_existing (client_sock);
-      daemon_server_add_connection (client_conn, client_sock_fd);
+      daemon_server_add_connection (client_conn);
     }
     break;
   default:

Copied: msh/src/mshd2.c (from rev 29987, msh/src/mshd.c)
===================================================================
--- msh/src/mshd2.c                             (rev 0)
+++ msh/src/mshd2.c     2013-10-08 17:33:28 UTC (rev 30003)
@@ -0,0 +1,1287 @@
+/**
+ * @file mshd.c
+ * @brief implementation of the MSH Daemon
+ * @author Sree Harsha Totakura <address@hidden> 
+ */
+
+#include "common.h"
+#include <gnunet/gnunet_util_lib.h>
+#include <mpi.h>
+#include "util.h"
+#include "mtypes.h"
+#include "bitmap.h"
+#include "addressmap.h"
+
+#define LOG(kind,...)                           \
+  GNUNET_log (kind, __VA_ARGS__)
+
+#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
+
+#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
+
+#define LOG_STRERROR(kind,cmd)                  \
+  GNUNET_log_from_strerror (kind, "mshd", cmd)
+
+/**
+ * Polling interval for checking termination signal
+ */
+#define POLL_SHUTDOWN_INTERVAL                  \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500)
+
+/**
+ * Context for verifying one address of another instance.  One context is
+ * created per outgoing probe connection and kept in the vactx_head/vactx_tail
+ * DLL until it is cleaned up.
+ */
+struct VerifyAddressesCtx
+{
+  /**
+   * The DLL next ptr
+   */
+  struct VerifyAddressesCtx *next;
+
+  /**
+   * The DLL prev ptr
+   */
+  struct VerifyAddressesCtx *prev;
+
+  /**
+   * Address information of the instance being probed; not owned by this
+   * context (the context cleanup does not free it)
+   */
+  struct InstanceAddrInfo *iainfo;
+
+  /**
+   * The outgoing connection to the address being verified; NULL once it has
+   * been destroyed
+   */
+  struct GNUNET_CONNECTION_Handle *conn;
+
+  /**
+   * The pending transmit request on the above connection; reset to NULL when
+   * the write callback runs
+   */
+  struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
+
+  /**
+   * Task which records the address as verified and releases this context
+   */
+  GNUNET_SCHEDULER_TaskIdentifier close_task;
+
+  /**
+   * State of the context; selects what the write callback does next
+   */
+  enum {
+    /* our rank still has to be written to the connection */
+    VERIFY_ADDRESS_CTX_WRITE,
+
+    /* our rank has been queued; the connection is closed on the next call */
+    VERIFY_ADDRESS_CTX_CLOSE
+  } state;
+
+  /**
+   * the ip address being probed, in host byte order (converted with htonl()
+   * when the socket address is built)
+   */
+  in_addr_t ip;
+
+  /**
+   * the port number being probed, in host byte order
+   */
+  uint16_t port;
+
+};
+
+
+/**
+ * Context information for reading from incoming connections.  Instances are
+ * kept in the rhead/rtail DLL while a read is outstanding.
+ */
+struct ReadContext
+{
+  /**
+   * next pointer for DLL
+   */
+  struct ReadContext *next;
+
+  /**
+   * prev pointer for DLL
+   */
+  struct ReadContext *prev;
+
+  /**
+   * The accepted connection we read from
+   */
+  struct GNUNET_CONNECTION_Handle *conn;
+
+  /**
+   * are we waiting for a read on the above connection (GNUNET_YES once the
+   * receive has been requested)
+   */
+  int in_receive;
+};
+
+
+/**
+ * The mode of the current listen socket.  The definition below also declares
+ * the global variable `mode' holding the mode of this process.
+ */
+enum ListenMode
+{
+  /**
+   * Mode in which the listen socket accepts connections from other instances
+   * and closes them immediately after reading some data.  The incoming
+   * connections are used to verify which IP addresses of this instance are
+   * reachable from other instances
+   */
+  MODE_PROBE,
+
+  /**
+   * In this mode the listen socket accepts requests for starting remote
+   * processes
+   */
+  MODE_SERV,
+
+  /**
+   * Simple worker mode.  No listening is done.
+   */
+  MODE_WORKER,
+
+  /**
+   * Worker mode with protocol.
+   */
+  MODE_PROTOWORKER
+
+} mode;
+
+
+/**
+ * Mapping for instance addresses
+ */
+AddressMap *addrmap;
+
+/**
+ * Reverse mapping of the address map
+ */
+struct ReverseAddressMap *rmap;
+
+/**
+ * Rank of this process
+ */
+int rank;
+
+/**
+ * width of the round -- how many other mshd instances verify our IP addresses
+ * in a round
+ */
+unsigned int rwidth;
+
+/**
+ * The number of total mshd processes 
+ */
+int nproc;
+
+
+/****************************/
+/*     static variables     */
+/****************************/
+
+/**
+ * DLL head for address verification contexts
+ */
+static struct VerifyAddressesCtx *vactx_head;
+
+/**
+ * DLL tail for address verification contexts
+ */
+static struct VerifyAddressesCtx *vactx_tail;
+
+/**
+ * Array of our IP addresses in host byte order (converted with ntohl() when
+ * stored and with htonl() when sent to other instances)
+ */
+static in_addr_t *s_addrs;
+
+/**
+ * network handle for the listen socket
+ */
+static struct GNUNET_NETWORK_Handle *listen_socket;
+
+/**
+ * The process handle of the process started by instance running with rank 0
+ */
+static struct GNUNET_OS_Process *proc;
+
+/**
+ * Task for running a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier rtask;
+
+/**
+ * Task for asynchronous accept on the socket
+ */
+static GNUNET_SCHEDULER_TaskIdentifier atask;
+
+/**
+ * Task for finalising a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier finalise_task;
+
+/**
+ * Task for waiting for a shutdown signal
+ */
+static GNUNET_SCHEDULER_TaskIdentifier sigread_task;
+
+/**
+ * Bitmap for checking which MPI processes have verified our addresses in the
+ * current round
+ */
+static struct BitMap *bitmap;
+
+/**
+ * Instances addresses learnt in the current round
+ */
+struct InstanceAddrInfo **riainfos;
+
+/**
+ * head for read context DLL
+ */
+static struct ReadContext *rhead;
+
+/**
+ * tail for read context DLL
+ */
+static struct ReadContext *rtail;
+
+/**
+ * arguments representing the command to run and its arguments
+ */
+static char **run_args;
+
+/**
+ * the process handle for the command to run
+ */
+static struct GNUNET_OS_Process *process;
+
+/**
+ * The path of the unix domain socket we use for communication with local MSH
+ * clients
+ */
+static char *unixpath;
+
+/**
+ * The file where the addresses of available hosts are written to
+ */
+static char *hostsfile;
+
+/**
+ * shutdown task
+ */
+GNUNET_SCHEDULER_TaskIdentifier shutdown_task;
+
+/**
+ * Shutdown polling task
+ */
+GNUNET_SCHEDULER_TaskIdentifier poll_shutdown_task;
+
+/**
+ * Random hashcode for authentication
+ */
+struct GNUNET_HashCode shash;
+
+/**
+ * Number of IP addresses
+ */
+static unsigned int nips;
+
+/**
+ * Current IP verification round 
+ */
+static unsigned int current_round;
+
+/**
+ * Do we have to create a pty
+ */
+static int need_pty;
+
+/**
+ * The port number of our local socket
+ */
+uint16_t listen_port;
+
+
+/**
+ * Perform cleanup for shutdown.  Stops the mode specific services, cancels
+ * the pending accept task, closes the listen socket and releases the global
+ * state (bitmap, address maps, IP array, command arguments, socket path and
+ * hosts file).  Every released global is reset so that calling this function
+ * a second time is harmless.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context; not used, may be NULL when this function
+ *          is called directly instead of through the scheduler
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+  switch (mode)
+  {
+  case MODE_PROBE:
+    break;
+  case MODE_SERV:
+    shutdown_local_server ();
+    MSH_pmonitor_shutdown ();
+    break;
+  case MODE_WORKER:
+    break;
+  case MODE_PROTOWORKER:
+    shutdown_daemon_server ();
+    break;
+  }
+  /* `atask' is the identifier of the scheduled accept task; accept_task is
+     the task function itself and must not be compared or cancelled */
+  if (GNUNET_SCHEDULER_NO_TASK != atask)
+  {
+    GNUNET_SCHEDULER_cancel (atask);
+    atask = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (NULL != listen_socket)
+  {
+    GNUNET_NETWORK_socket_close (listen_socket);
+    listen_socket = NULL;
+  }
+  if (NULL != bitmap)
+  {
+    bitmap_destroy (bitmap);
+    bitmap = NULL;
+  }
+  if (NULL != addrmap)
+  {
+    addressmap_destroy (addrmap);
+    addrmap = NULL;
+  }
+  if (NULL != rmap)
+  {
+    reverse_map_destroy (rmap);
+    rmap = NULL;
+  }
+  GNUNET_free_non_null (s_addrs);
+  s_addrs = NULL;
+  if (NULL != run_args)
+  {
+    free_argv (run_args);
+    run_args = NULL;
+  }
+  GNUNET_free_non_null (unixpath);
+  unixpath = NULL;
+  if (NULL != hostsfile)
+  {
+    /* the hosts file is a temporary file; remove it from disk as well */
+    (void) unlink (hostsfile);
+    GNUNET_free (hostsfile);
+    hostsfile = NULL;
+  }
+}
+
+
+/**
+ * Callback function invoked for each interface found.  Every non-loopback
+ * IPv4 address is appended to s_addrs (in host byte order) and registered in
+ * the address map under our own rank and listen port.
+ *
+ * @param cls closure
+ * @param name name of the interface (can be NULL for unknown)
+ * @param isDefault is this presumably the default interface
+ * @param addr address of this interface (can be NULL for unknown or
+ *          unassigned)
+ * @param broadcast_addr the broadcast address (can be NULL for unknown or
+ *          unassigned)
+ * @param netmask the network mask (can be NULL for unknown or unassigned)
+ * @param addrlen length of the address
+ * @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort; this
+ *           implementation always returns GNUNET_OK
+ */
+static int net_if_processor (void *cls, const char *name,
+                             int isDefault,
+                             const struct sockaddr *addr,
+                             const struct sockaddr *broadcast_addr,
+                             const struct sockaddr *netmask, 
+                             socklen_t addrlen)
+{
+  in_addr_t ip;  
+  const struct sockaddr_in *inaddr;
+
+  if (NULL == addr)
+    return GNUNET_OK;           /* interface without an address */
+  if ( (sizeof (struct sockaddr_in) != addrlen) ||
+       (AF_INET != addr->sa_family) )
+    return GNUNET_OK;           /* Only consider IPv4 for now */
+  inaddr = (const struct sockaddr_in *) addr;
+  ip = ntohl (inaddr->sin_addr.s_addr);
+  if (127 == ip >> 24)          /* ignore loopback addresses */
+    return GNUNET_OK;
+  GNUNET_array_append (s_addrs, nips, ip);
+  LOG_DEBUG ("%d: Found IP: %s\n", rank, ip2str (ip));
+  addressmap_add (addrmap, rank, listen_port, ip);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Callback function for data received from the network.  Note that
+ * both "available" and "err" would be 0 if the read simply timed out.
+ *
+ * Reads the 32-bit id (network byte order) sent by the probing instance, logs
+ * it and then always tears down the connection and its read context.
+ *
+ * @param cls the read context
+ * @param buf pointer to received data
+ * @param available number of bytes available in "buf",
+ *        possibly 0 (on errors)
+ * @param addr address of the sender
+ * @param addrlen size of addr
+ * @param errCode value of errno (on receiving errors)
+ */
+static void
+conn_reader(void *cls, const void *buf, size_t available,
+            const struct sockaddr * addr, socklen_t addrlen, int errCode)
+{
+  struct ReadContext *rc = cls;
+  uint32_t cid;
+
+  /* never copy more than what has actually been received */
+  if ((NULL == buf) || (available < sizeof (uint32_t)))
+  {
+    GNUNET_break (0);
+    goto clo_ret;
+  }
+  (void) memcpy (&cid, buf, sizeof (uint32_t));
+  cid = ntohl (cid);
+  LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
+
+ clo_ret:
+  GNUNET_CONTAINER_DLL_remove (rhead, rtail, rc);
+  GNUNET_CONNECTION_destroy (rc->conn);
+  GNUNET_free (rc);
+}
+
+
+/**
+ * Fork a worker process.  The parent returns immediately.  The child
+ * optionally accepts one pending connection from the listen socket, releases
+ * the state inherited from the daemon through do_shutdown() and switches to
+ * one of the worker modes.
+ *
+ * NOTE(review): do_shutdown() also frees run_args, so the child can no longer
+ * use the command line after this function returns -- confirm that callers
+ * do not rely on it.
+ *
+ * @param do_protocol non-zero if the child has to accept a connection from
+ *          the listen socket and hand it to the daemon server
+ *          (MODE_PROTOWORKER); zero for a plain worker (MODE_WORKER)
+ * @return in the parent the value returned by fork(): the pid of the child or
+ *           -1 upon failure; 0 in the child
+ */
+static pid_t
+spawn_worker (int do_protocol)
+{
+  struct GNUNET_NETWORK_Handle *sock;
+  struct GNUNET_CONNECTION_Handle *conn;
+  pid_t ret;
+
+  ret = fork ();
+  if (0 != ret)
+    return ret;
+  /* Child process continues here */
+  conn = NULL;
+  if (do_protocol)
+  {
+    GNUNET_assert (MODE_SERV == mode);
+    GNUNET_assert (NULL != listen_socket);
+    /* the connection has to be accepted before do_shutdown() closes the
+       listen socket */
+    sock = GNUNET_NETWORK_socket_accept (listen_socket, NULL, NULL);
+    GNUNET_assert (NULL != sock);
+    conn = GNUNET_CONNECTION_create_from_existing (sock);
+  }
+  if (GNUNET_SCHEDULER_NO_TASK != shutdown_task)
+  {
+    GNUNET_SCHEDULER_cancel (shutdown_task);
+    shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  do_shutdown (NULL, NULL);
+  mode = MODE_WORKER;
+  if (do_protocol)
+  {
+    mode = MODE_PROTOWORKER;
+    init_daemon_server ();
+    daemon_server_add_connection (conn);
+  }
+  return 0;
+}
+
+
+/**
+ * Task run when the listen socket becomes readable.  In probe mode the
+ * connection is accepted and a read for the peer's id is queued; in server
+ * mode a worker process is forked.  The task re-schedules itself unless it
+ * runs because of a shutdown or an error, in which case the listen socket is
+ * closed.
+ *
+ * @param cls NULL
+ * @param tc the scheduler task context
+ */
+static void
+accept_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct ReadContext *rctx;
+  struct GNUNET_CONNECTION_Handle *conn;
+  pid_t pid;
+
+  atask = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+  {
+    goto clo_ret;
+  }
+  switch (mode)
+  {
+  case MODE_PROBE:
+    LOG_DEBUG ("%d: Got a probe connect\n", rank);
+    conn = GNUNET_CONNECTION_create_from_accept (NULL, NULL, listen_socket);
+    if (NULL == conn)
+    {
+      GNUNET_break (0);
+      goto clo_ret;
+    }
+    rctx = GNUNET_malloc (sizeof (struct ReadContext));
+    rctx->conn = conn;
+    rctx->in_receive = GNUNET_YES;
+    GNUNET_CONNECTION_receive (rctx->conn, sizeof (unsigned int),
+                               GNUNET_TIME_UNIT_FOREVER_REL, conn_reader,
+                               rctx);
+    GNUNET_CONTAINER_DLL_insert_tail (rhead, rtail, rctx);    
+    break;
+  case MODE_SERV:
+    /* NOTE(review): the worker is spawned without the protocol, i.e. the
+       pending connection is not accepted by the child.  spawn_worker() only
+       accepts from the listen socket when do_protocol is set, so a non-zero
+       argument may have been intended here -- confirm. */
+    pid = spawn_worker (0);
+    if (-1 == pid)
+    {
+      GNUNET_break (0);
+      GNUNET_SCHEDULER_shutdown ();
+      goto clo_ret;
+    }
+    if (0 == pid)               /* state is cleared and hence we return */
+      return;
+    break;
+  case MODE_WORKER:
+  case MODE_PROTOWORKER:
+  default:
+    /* workers do not listen; this task must never run for them */
+    GNUNET_assert (0);
+  }
+  atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                         listen_socket, &accept_task, NULL);
+  return;
+
+ clo_ret:
+  GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+  listen_socket = NULL;
+}
+
+
+/**
+ * Periodic task which probes (without receiving) for a shutdown message sent
+ * over MPI by instance 0.  If such a message is pending the scheduler is shut
+ * down; otherwise the task re-arms itself after POLL_SHUTDOWN_INTERVAL.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+poll_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  int pending;
+
+  poll_shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
+  pending = 0;
+  if (MPI_SUCCESS != MPI_Iprobe (0, MSH_MTYPE_SHUTDOWN, MPI_COMM_WORLD,
+                                 &pending, MPI_STATUS_IGNORE))
+  {
+    /* a failed probe is reported but polling continues */
+    GNUNET_break (0);
+  }
+  else if (0 != pending)
+  {
+    LOG_DEBUG ("Got termination signal.  Shutting down\n");
+    GNUNET_SCHEDULER_shutdown (); /* We terminate */
+    return;
+  }
+  poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
+                                                     &poll_shutdown, NULL);
+}
+
+
+/**
+ * Sends termination signal to all other instances through MPI messaging.
+ * One non-blocking send is posted per instance with rank 1 .. nproc - 1 and
+ * the function returns once all of them have completed.
+ */
+static void
+send_term_signal ()
+{
+  MPI_Request *req;
+  int *payload;
+  int npeers;
+  int dest;
+
+  npeers = nproc - 1;
+  if (npeers <= 0)
+    return;                     /* nobody to notify */
+  /* We broadcast termination signal.  Can't use MPI_Bcast here... */
+  req = GNUNET_malloc (sizeof (MPI_Request) * npeers);
+  /* every pending send needs a buffer of its own which stays untouched until
+     the send completes; the loop counter cannot serve as that buffer */
+  payload = GNUNET_malloc (sizeof (int) * npeers);
+  for (dest = 1; dest < nproc; dest++)
+  {
+    payload[dest - 1] = dest;
+    GNUNET_assert (MPI_SUCCESS ==
+                   MPI_Isend (&payload[dest - 1], 1, MPI_INT, dest,
+                              MSH_MTYPE_SHUTDOWN, MPI_COMM_WORLD,
+                              &req[dest - 1]));
+  }
+  GNUNET_assert (MPI_SUCCESS == MPI_Waitall (npeers, req,
+                                             MPI_STATUSES_IGNORE));
+  GNUNET_free (payload);
+  GNUNET_free (req);
+}
+
+
+/**
+ * Process monitor callback invoked when the main process has exited.
+ * Releases the process handle, initiates our own shutdown and tells all other
+ * instances to terminate.
+ *
+ * @param cls the closure passed when the callback was registered
+ * @param type the process status type
+ * @param code the return/exit code of the process
+ */
+static void
+proc_exit_cb (void *cls, enum GNUNET_OS_ProcessStatusType type, int code)
+{
+  (void) cls;
+  (void) type;
+  (void) code;
+  GNUNET_OS_process_destroy (proc);
+  proc = NULL;
+  LOG (GNUNET_ERROR_TYPE_INFO, "Main process died.  Exiting.\n"); 
+  GNUNET_SCHEDULER_shutdown ();
+  send_term_signal ();
+}
+
+
+/**
+ * Task for running a round
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedules next round.  If all the rounds are completed, the instances are
+ * synchronised, the address maps are reduced, the local server and process
+ * monitor are started and the listen socket is switched to MODE_SERV.  The
+ * instance with rank 0 additionally forks the worker which runs the command.
+ */
+static void
+schedule_next_round ()
+{
+  pid_t pid;
+  unsigned int total_rounds;
+
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
+  /* Number of rounds required to contact all processes except ourselves
+     (rwidth in parallel in each round) */
+  total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+  if (current_round < total_rounds)
+  {
+    rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
+    return;
+  }
+  if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+  {
+    GNUNET_break (0);
+    return;
+  }
+  LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
+  GNUNET_break (GNUNET_OK == reduce_ntree ());
+  addressmap_print (addrmap);
+  rmap = addressmap_create_reverse_mapping (addrmap);
+  pid = getpid ();
+  /* unixpath is released with GNUNET_free_non_null() during shutdown, hence
+     it has to come from the GNUnet allocator */
+  GNUNET_assert (0 < GNUNET_asprintf (&unixpath, "%ju.sock", (uintmax_t) pid));
+  setenv (MSHD_SOCK_NAME, unixpath, 1);
+  hostsfile = GNUNET_DISK_mktemp ("MSHD_HOSTS");
+  if (NULL == hostsfile)
+  {
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  if (GNUNET_OK != addressmap_write_hosts (addrmap, hostsfile))
+  {
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  setenv (MSHD_HOSTSFILE, hostsfile, 1);
+  init_local_server (unixpath);
+  MSH_pmonitor_init ();
+  mode = MODE_SERV;
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == atask);
+  atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                           listen_socket, &accept_task, NULL);
+  if (0 == rank)
+  {
+    pid = spawn_worker (0);
+    if (-1 == pid)
+    {
+      GNUNET_break (0);
+      GNUNET_SCHEDULER_shutdown ();
+      return;
+    }
+    if (0 != pid)
+    {
+      /* NOTE(review): `proc' is not assigned anywhere in the visible code;
+         presumably the child's pid should be registered here -- confirm
+         against the process monitor API */
+      MSH_monitor_process_pid (proc, &proc_exit_cb, NULL);
+      goto end;
+    }
+    /* Child process.  NOTE(review): spawn_worker() runs do_shutdown() in the
+       child, which frees run_args and resets it to NULL, so run_args[0]
+       below dereferences NULL -- needs fixing together with the worker
+       setup. */
+    if (reverse_connect)
+      do_reverse_connect ();
+    if (need_pty)
+      create_pty ();
+    fork_and_exec (run_args[0]);
+    return;
+  }
+ end:
+  poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
+                                                     &poll_shutdown, NULL);
+}
+
+
+/**
+ * Cleans up the address verification context: cancels its close task and any
+ * pending transmit request, destroys the connection, removes the context from
+ * the DLL and frees it.  The instance address information referenced by the
+ * context is left untouched.
+ *
+ * @param ctx the context; invalid after this call
+ */
+static void
+cleanup_verifyaddressctx (struct VerifyAddressesCtx *ctx)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != ctx->close_task)
+  {
+    GNUNET_SCHEDULER_cancel (ctx->close_task);
+    ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  /* a transmit request which is still pending has to be cancelled before the
+     connection it belongs to is destroyed */
+  if (NULL != ctx->transmit_handle)
+  {
+    GNUNET_CONNECTION_notify_transmit_ready_cancel (ctx->transmit_handle);
+    ctx->transmit_handle = NULL;
+  }
+  if (NULL != ctx->conn)
+  {
+    GNUNET_CONNECTION_destroy (ctx->conn);
+    ctx->conn = NULL;
+  }
+  GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);  
+  GNUNET_free (ctx);  
+}
+
+
+/**
+ * Finalise a round by freeing the resources used by it, cancel the accept task
+ * and schedule next round.  If not every expected instance has verified our
+ * addresses in this round, a shutdown is initiated instead.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+finalise_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct VerifyAddressesCtx *ctx;
+  unsigned int cnt;
+
+  finalise_task = GNUNET_SCHEDULER_NO_TASK;
+  /* the accept task clears `atask' and does not re-arm itself after an
+     error, so it may no longer be scheduled */
+  if (GNUNET_SCHEDULER_NO_TASK != atask)
+  {
+    GNUNET_SCHEDULER_cancel (atask);
+    atask = GNUNET_SCHEDULER_NO_TASK;
+  }
+  while (NULL != (ctx = vactx_head))
+  {
+    cleanup_verifyaddressctx (ctx);
+  }
+  if (NULL != riainfos)
+  {
+    for (cnt = 0; cnt < rwidth; cnt++)
+    {
+      if (NULL != riainfos[cnt])
+        instance_address_info_destroy (riainfos[cnt]);
+    }
+    /* the array itself is allocated anew for every round */
+    GNUNET_free (riainfos);
+    riainfos = NULL;
+  }
+  if (1 != bitmap_allset (bitmap))
+  {
+    LOG_ERROR ("Could not verify addresses of all hosts\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  current_round++;
+  schedule_next_round ();
+}
+
+
+/**
+ * Task run after a probe connection has been closed.  Marks the probed
+ * instance in the bitmap (at its offset from the lowest rank handled in the
+ * current round), records the probed address in the address map and releases
+ * the context.
+ *
+ * @param cls the verify address context
+ * @param tc the scheduler task context
+ */
+static void
+conn_close_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct VerifyAddressesCtx *vctx = cls;
+  int lower;
+  int peer;
+  int slot;
+
+  vctx->close_task = GNUNET_SCHEDULER_NO_TASK;
+  lower = rank - (current_round * rwidth) - rwidth + nproc;
+  GNUNET_assert (0 <= lower);
+  lower %= nproc;
+  peer = instance_address_info_get_rank (vctx->iainfo);
+  /* distance from the lower bound, wrapping around at nproc */
+  slot = (lower <= peer) ? (peer - lower) : (nproc - lower + peer);
+  bitmap_set (bitmap, slot, 1);
+  addressmap_add (addrmap, instance_address_info_get_rank (vctx->iainfo),
+                  vctx->port, vctx->ip);
+  cleanup_verifyaddressctx (vctx);
+}
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * On the first call our rank is written (32 bit, network byte order) and a
+ * second notification is requested; on the second call the connection is
+ * destroyed and the close task is scheduled.  On failure the context is
+ * released.
+ *
+ * @param cls the verify address context
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t 
+conn_write_cb (void *cls, size_t size, void *buf)
+{
+  struct VerifyAddressesCtx *ctx = cls;
+  uint32_t rank_;
+
+  ctx->transmit_handle = NULL;
+  if ((NULL == buf) || (0 == size))
+  {
+    goto clo_ret;
+  }
+  if (size < sizeof (uint32_t))
+  {
+    GNUNET_break (0);
+    goto clo_ret;
+  }
+  switch (ctx->state)
+  {
+  case VERIFY_ADDRESS_CTX_WRITE:
+    rank_ = htonl (rank);
+    (void) memcpy (buf, &rank_, sizeof (uint32_t));
+    ctx->transmit_handle =
+        GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, 0,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 &conn_write_cb, ctx);
+    ctx->state = VERIFY_ADDRESS_CTX_CLOSE;
+    return sizeof (uint32_t);
+  case VERIFY_ADDRESS_CTX_CLOSE:
+    ctx->close_task = GNUNET_SCHEDULER_add_now (&conn_close_task, ctx);
+    GNUNET_CONNECTION_destroy (ctx->conn);
+    ctx->conn = NULL;
+    return 0;
+  default:
+    GNUNET_assert (0);
+  }
+
+ clo_ret:
+  cleanup_verifyaddressctx (ctx);
+  /* nothing has been written to buf on this path */
+  return 0;
+}
+
+
+/**
+ * Counter of the addresses probed for the current instance; only used for
+ * debug output
+ */
+static unsigned int bmx;
+
+/**
+ * Iterator callback which opens a connection to one address of an instance
+ * and queues a request to write our rank to it.
+ *
+ * @param cls the address information of the instance the address belongs to
+ * @param port the port to connect to, in host byte order
+ * @param ip the IPv4 address to connect to, in host byte order
+ * @return GNUNET_OK if the connection attempt was started; GNUNET_SYSERR if
+ *           the connection could not be created
+ */
+static int
+address_iterator_cb (void *cls, uint16_t port, in_addr_t ip)
+{
+  struct VerifyAddressesCtx *ctx;
+  struct InstanceAddrInfo *iainfo = cls;
+  struct sockaddr_in in_addr;
+
+  LOG_DEBUG ("%d: \t %d Opening connection to: %s\n", rank, bmx++,
+             ip2str ((uint32_t) ip));
+  /* clear the fields which are not set explicitly below */
+  (void) memset (&in_addr, 0, sizeof (in_addr));
+  in_addr.sin_family = AF_INET;
+  in_addr.sin_port = htons (port);
+  in_addr.sin_addr.s_addr = htonl ((uint32_t) ip);
+  ctx = GNUNET_malloc (sizeof (struct VerifyAddressesCtx));
+  ctx->conn = 
+      GNUNET_CONNECTION_create_from_sockaddr (AF_INET, 
+                                              (const struct sockaddr *)
+                                              &in_addr, 
+                                              sizeof (struct sockaddr_in));
+  if (NULL == ctx->conn)
+  {
+    GNUNET_break (0);
+    GNUNET_free (ctx);
+    return GNUNET_SYSERR;
+  }
+  ctx->port = port;
+  ctx->ip = ip;
+  ctx->iainfo = iainfo;
+  ctx->state = VERIFY_ADDRESS_CTX_WRITE;
+  GNUNET_CONTAINER_DLL_insert_tail (vactx_head, vactx_tail, ctx);
+  ctx->transmit_handle = 
+      GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, sizeof (uint32_t),
+                                               GNUNET_TIME_UNIT_FOREVER_REL,
+                                               &conn_write_cb, ctx);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Verify the addresses of an instance by connecting to the instance's listen
+ * socket
+ *
+ * @param iainfo the instance's address information
+ * @return GNUNET_OK upon success initialisation of the connection to
+ *           instance's listen socket (this does not mean that the connection
+ *           is established or an address is verified); GNUNET_SYSERR upon
+ *           error
+ */
+static int
+verify_addresses (struct InstanceAddrInfo *iainfo)
+{
+  bmx = 0;                      /* restart the debug counter per instance */
+  if (GNUNET_OK !=
+      instance_address_info_iterate_addresses (iainfo, &address_iterator_cb,
+                                               iainfo))
+    return GNUNET_SYSERR;
+  return GNUNET_OK;
+}
+
+
+/**
+ * Parse a verification message from a source for its address information.
+ * The size announced in the message header has to match the size implied by
+ * the number of addresses the message claims to carry.
+ *
+ * @param msg the message to parse
+ * @param source the MPI id of the instance which has sent this message
+ * @return the instance's address information; NULL if the sizes do not match
+ */
+static struct InstanceAddrInfo *
+parse_verify_address_msg (struct MSH_MSG_VerifyAddress *msg, int source)
+{
+  struct InstanceAddrInfo *info;
+  struct InstanceAddr *entry;
+  size_t claimed;
+  size_t expected;
+  uint16_t count;
+  uint16_t idx;
+
+  claimed = ntohs (msg->header.size);
+  count = ntohs (msg->nips);
+  expected = sizeof (struct MSH_MSG_VerifyAddress) + (sizeof (uint32_t) * count);
+  if (claimed != expected)
+  {
+    LOG_ERROR ("Parsing failed\n");
+    return NULL;
+  }
+  info = instance_address_info_create (source);
+  idx = 0;
+  while (idx < count)
+  {
+    in_addr_t ip = (in_addr_t) ntohl (msg->ipaddrs[idx]);
+
+    LOG_DEBUG ("%d: Parsed address: %s\n", rank, ip2str (ip));
+    entry = instance_address_create_sockaddr_in (ntohs (msg->port), ip);
+    GNUNET_break (GNUNET_OK ==
+                  instance_address_info_add_address (info, entry));
+    idx++;
+  }
+  return info;
+}
+
+
+/**
+ * Receives the IP addresses to verify in the current round from instances.
+ * Exactly rwidth messages are received.  Each of them has to come from one of
+ * the rwidth ranks starting (modulo nproc) at
+ * (rank - current_round * rwidth - rwidth).
+ *
+ * @return an array of rwidth instance addresses, to be freed by the caller;
+ *           NULL upon a receive or parse error
+ */
+static struct InstanceAddrInfo **
+receive_addresses ()
+{
+  struct InstanceAddrInfo **iainfos;  
+  struct MSH_MSG_VerifyAddress *msg;
+  MPI_Status status;
+  unsigned int cnt;
+  int rsize;
+  int lb;
+  int up;
+  int source;
+  int off;
+
+  iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
+  msg = NULL;
+  /* We expect a message from peers with id p in the range:
+     (rank - current_round * rwidth - rwidth) <= p
+     <= (rank - (current_round * rwidth) - 1), taken modulo nproc */
+  lb = rank - current_round * rwidth - rwidth + nproc;
+  up = rank - (current_round * rwidth) - 1 + nproc;
+  GNUNET_assert (lb >= 0);
+  GNUNET_assert (up >= 0);
+  lb %= nproc;
+  up %= nproc;
+  for (cnt = 0; cnt < rwidth; cnt++)
+  {
+    if (MPI_SUCCESS !=
+        MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES,
+                   MPI_COMM_WORLD, &status))
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    if (MPI_SUCCESS != MPI_Get_elements (&status, MPI_BYTE, &rsize))
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    source = status.MPI_SOURCE;
+    /* The range may wrap around at nproc (lb > up), hence the check is done
+       on the distance of the source from the lower bound */
+    off = (source - lb + nproc) % nproc;
+    if ((unsigned int) off >= rwidth)
+    {
+      GNUNET_break (0);
+      LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb,
+                 up);
+      goto err_ret;
+    }
+    /* the fixed part of the message is read while parsing; it has to be
+       there */
+    if ( (rsize < 0) ||
+         ((size_t) rsize < sizeof (struct MSH_MSG_VerifyAddress)) )
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    msg = GNUNET_malloc (rsize);
+    if (MPI_SUCCESS != MPI_Recv (msg, rsize, MPI_BYTE, source,
+                                 MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
+                                 MPI_STATUS_IGNORE))
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    LOG_DEBUG ("%d: Received message of size %d from %d\n", rank, rsize,
+               source);
+    /* the parser trusts the size given in the header; make sure it does not
+       exceed what has actually been received */
+    if (rsize != (int) ntohs (msg->header.size))
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    if (NULL == (iainfos[cnt] = parse_verify_address_msg (msg, source)))
+      goto err_ret;
+    GNUNET_free (msg);
+    msg = NULL;
+  }
+  return iainfos;
+  
+ err_ret:
+  GNUNET_free_non_null (msg);
+  for (cnt = 0; cnt < rwidth; cnt++)
+  {
+    if (NULL != iainfos[cnt])
+      instance_address_info_destroy (iainfos[cnt]);
+  }
+  GNUNET_free (iainfos);
+  return NULL;
+}
+
+
+/**
+ * Send our addresses to the MPI processes which have to verify them in the
+ * current round.  The targets are the processes with ranks
+ * (rank + current_round * rwidth + 1 + i) % nproc for i in [0, width), where
+ * width is rwidth or, in a final partial round, (nproc - 1) % rwidth.
+ *
+ * @return GNUNET_OK on success; GNUNET_SYSERR upon error
+ */
+static int
+send_addresses ()
+{
+  struct MSH_MSG_VerifyAddress *msg;
+  char *cpys;
+  MPI_Request *sreqs;
+  size_t msize;
+  unsigned int width;
+  unsigned int sent;
+  unsigned int cnt;
+  int target;
+  int ret;
+
+  msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
+  if (msize > UINT16_MAX)
+  {
+    /* the size field of the message header has only 16 bits */
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  msg = GNUNET_malloc (msize);
+  msg->header.size = htons (msize);
+  msg->port = htons (listen_port);
+  msg->nips = htons (nips);
+  for (cnt = 0; cnt < nips; cnt++)
+  {    
+    msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
+  }
+  width = rwidth;  
+  if ( (0 != ( (nproc - 1) % rwidth)) &&
+       (current_round == ( (nproc - 1) / rwidth)) )
+    width = (nproc - 1) % rwidth;
+  /* One private copy of the message per pending send.  The copies are laid
+     out msize bytes apart; the messages are longer than the message struct
+     itself, so indexing by struct would make them overlap */
+  cpys = GNUNET_malloc (msize * width);
+  sreqs = GNUNET_malloc (width * sizeof (MPI_Request));
+  ret = MPI_SUCCESS;
+  for (sent = 0; sent < width; sent++)
+  {    
+    void *cpy = &cpys[sent * msize];
+
+    (void) memcpy (cpy, msg, msize);
+    target = (current_round * rwidth) + sent + 1;
+    GNUNET_assert (target < nproc);
+    target = (rank + target) % nproc;
+    LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
+    ret = MPI_Isend (cpy, (int) msize, MPI_BYTE, target, 
+                     MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD, &sreqs[sent]);
+    if (MPI_SUCCESS != ret)
+      break;
+  }
+  GNUNET_free (msg);
+  msg = NULL;
+  if (sent != width)
+  {
+    /* a send could not be started; withdraw those already posted */
+    for (cnt = 0; cnt < sent; cnt++)
+    {
+      GNUNET_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
+      GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+    }
+  }
+  else
+  {
+    for (cnt = 0; cnt < width; cnt++)
+    {
+      GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+    }
+    LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank,
+               current_round);
+  }
+  /* the buffers may only be released once no send refers to them any more */
+  GNUNET_free (cpys);
+  GNUNET_free (sreqs);  
+  return (MPI_SUCCESS == ret) ? GNUNET_OK : GNUNET_SYSERR;
+}
+
+
+/**
+ * Runs one verification round: sends this instance's IP addresses to other
+ * instances, receives the addresses to verify in this round, starts accepting
+ * connections on the listen socket, synchronises with all instances and then
+ * starts connecting to the received addresses.  The round is finalised by a
+ * task scheduled to run one second later.
+ *
+ * @return GNUNET_OK if the round has been started; GNUNET_SYSERR upon error
+ */
+static int
+run_round_ ()
+{
+  unsigned int idx;
+
+  if (GNUNET_SYSERR == send_addresses ())
+    return GNUNET_SYSERR;
+  riainfos = receive_addresses ();
+  if (NULL == riainfos)
+    return GNUNET_SYSERR;
+  atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                         listen_socket, &accept_task, NULL);
+
+  if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  idx = 0;
+  while (idx < rwidth)
+  {
+    verify_addresses (riainfos[idx]);
+    idx++;
+  }
+  finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+                                                &finalise_round, NULL);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Scheduler task which runs a round and initiates a shutdown if the round
+ * could not be started.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  int rc;
+
+  rtask = GNUNET_SCHEDULER_NO_TASK;
+  rc = run_round_ ();
+  if (GNUNET_OK == rc)
+    return;
+  GNUNET_SCHEDULER_shutdown ();
+}
+
+
+/**
+ * Function to copy NULL terminated list of arguments.  The copy is made with
+ * the GNUnet allocator and has to be released with free_argv().
+ *
+ * @param argv the NULL terminated list of arguments. Cannot be NULL.
+ * @return the copied NULL terminated arguments
+ */
+static char **
+copy_argv (char *const *argv)
+{
+  char **argv_dup;
+  unsigned int argp;
+
+  GNUNET_assert (NULL != argv);
+  for (argp = 0; NULL != argv[argp]; argp++) ;
+  /* the allocation is zeroed, which provides the terminating NULL entry */
+  argv_dup = GNUNET_malloc (sizeof (char *) * (argp + 1));
+  for (argp = 0; NULL != argv[argp]; argp++)
+    argv_dup[argp] = GNUNET_strdup (argv[argp]);
+  return argv_dup;
+}
+
+
+/**
+ * Releases a NULL terminated argument list: first every argument string, then
+ * the array holding them.
+ *
+ * @param argv the NULL terminated list of arguments
+ */
+static void
+free_argv (char **argv)
+{
+  char **cursor;
+
+  cursor = argv;
+  while (NULL != *cursor)
+  {
+    GNUNET_free (*cursor);
+    cursor++;
+  }
+  GNUNET_free (argv);
+}
+
+
+/**
+ * Main function that will be run.  Checks the round width and that a command
+ * was given, copies the command arguments, creates the bitmap and the address
+ * map, opens the listen socket, enumerates the network interfaces and then
+ * calls schedule_next_round() and registers the shutdown task.
+ *
+ * @param cls closure
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be NULL!)
+ * @param cfg configuration
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct sockaddr_in addr;
+  socklen_t addrlen;
+  unsigned int cnt;
+
+  LOG_DEBUG ("Running main task\n");
+  if (0 == rwidth)
+  {
+    LOG_ERROR ("Round width cannot be 0.  Exiting\n");
+    return;
+  }
+  if (nproc <= rwidth)
+  {
+    LOG_ERROR ("Round width should be less than the number of processes\n");
+    return;
+  }
+  /* count the remaining arguments; they form the command to execute */
+  for (cnt = 0; NULL != args[cnt]; cnt++);
+  if (0 == cnt)
+  {
+    LOG_ERROR ("Need a command to execute\n");
+    return;
+  }
+  run_args = copy_argv (args);
+  bitmap = bitmap_create (rwidth);
+  addrmap = addressmap_create (nproc);
+  addrlen = sizeof (struct sockaddr_in);
+  (void) memset (&addr, 0, addrlen);
+  addr.sin_addr.s_addr = INADDR_ANY;   /* bind to all available addresses */
+  listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen,
+                                      rwidth);
+  /* NOTE(review): open_listen_socket() presumably stores the bound port in
+     addr -- confirm against its definition */
+  listen_port = ntohs (addr.sin_port);
+  if (NULL == listen_socket)
+    return;
+  if (0 == listen_port)
+  {
+    GNUNET_break (0);
+    goto clo_ret;
+  }
+  LOG_DEBUG ("Listening on port %u\n", listen_port);
+  GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
+  if (0 == nips)
+  {
+    LOG_ERROR ("No IP addresses found\n");
+    /* the listen socket is already open here; close it like the other
+       failure path does instead of leaking it */
+    goto clo_ret;
+  }
+  schedule_next_round ();
+  shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                                &do_shutdown, NULL);
+  return;
+
+ clo_ret:
+  GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+  listen_socket = NULL;
+}
+
+
+/**
+ * The execution start point.  Initialises MPI, queries the number of mshd
+ * processes and our rank, and then runs the GNUnet program with run() as the
+ * main task.
+ *
+ * @param argc the number of arguments
+ * @param argv the argument strings
+ * @return 0 for successful termination; 1 for termination upon error
+ */
+int
+main (int argc, char **argv)
+{
+  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+    {'w', "round-width", "COUNT",
+     "set the size of each round to COUNT",
+     GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
+    GNUNET_GETOPT_OPTION_END
+  };
+  int ret;
+
+  ret = 1;
+  rwidth = 1;
+  GNUNET_log_setup ("mshd", NULL, NULL);
+  if (MPI_SUCCESS != MPI_Init(&argc, &argv))
+  {
+    LOG_ERROR ("Failed to initialise MPI\n");
+    return 1;
+  }
+  if (MPI_SUCCESS != MPI_Comm_size (MPI_COMM_WORLD, &nproc))
+  {
+    LOG_ERROR ("Cannot determine the number of mshd processes\n");
+    goto fail;
+  }
+  /* the options are not parsed yet, so rwidth still has its default of 1
+     here; run() repeats this check with the parsed value */
+  if (nproc <= rwidth)
+  {
+    LOG_ERROR ("Given round width is greater than or equal to number of mshd "
+               "processes\n");
+    goto fail;
+  }
+  if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
+  {
+    LOG_ERROR ("Cannot determine our MPI rank\n");
+    goto fail;
+  }
+  if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "mshd", "mshd: MSH daemon",
+                                       options, &run, NULL))
+  {
+    GNUNET_break (0);
+    goto fail;
+  }
+  ret = 0;
+
+ fail:
+  /* NOTE(review): processes in worker mode (or beyond) skip MPI_Finalize() --
+     presumably on purpose; confirm.  They must still return a defined exit
+     status. */
+  if (MODE_WORKER <= mode)
+    return ret;
+  LOG_DEBUG ("Returning\n");
+  GNUNET_break (MPI_SUCCESS == MPI_Finalize());
+  return ret;
+}

Modified: msh/src/mshd_pmonitor.c
===================================================================
--- msh/src/mshd_pmonitor.c     2013-10-08 17:18:31 UTC (rev 30002)
+++ msh/src/mshd_pmonitor.c     2013-10-08 17:33:28 UTC (rev 30003)
@@ -24,11 +24,6 @@
 struct MonitorCtx
 {
   /**
-   * The process to monitor
-   */
-  struct GNUNET_OS_Process *proc;
-
-  /**
    * Termination notification callback
    */
   MSH_ProcExitCallback cb;
@@ -243,16 +238,13 @@
  * @param cls the closure for the above callback
  */
 void
-MSH_monitor_process (struct GNUNET_OS_Process *proc,
-                     MSH_ProcExitCallback cb, void *cls)
+MSH_monitor_process_pid (pid_t pid,
+                         MSH_ProcExitCallback cb, void *cls)
 {
   struct MonitorCtx *ctx;
-  pid_t pid;
 
   GNUNET_assert (NULL != map);
-  pid = GNUNET_OS_process_get_pid (proc);
   ctx = GNUNET_malloc (sizeof (struct MonitorCtx));
-  ctx->proc = proc;
   ctx->cb = cb;
   ctx->cls = cls;
   GNUNET_assert 
@@ -263,6 +255,25 @@
 
 
 /**
+ * Monitors a process for its termination.
+ *
+ * @param proc the process to monitor; only its PID is used here
+ * @param cb the callback to be called for notifying the termination of the
+ *    process
+ * @param cls the closure for the above callback
+ */
+void
+MSH_monitor_process (struct GNUNET_OS_Process *proc,
+                     MSH_ProcExitCallback cb, void *cls)
+{
+  /* resolve the PID and delegate to the PID based variant */
+  const pid_t target = GNUNET_OS_process_get_pid (proc);
+
+  MSH_monitor_process_pid (target, cb, cls);
+}
+
+
+/**
  * Stop monitoring a process
  *
  * @param proc
@@ -270,13 +281,11 @@
  *   monitored earlier
  */
 int
-MSH_monitor_process_cancel (struct GNUNET_OS_Process *proc)
+MSH_monitor_process_pid_cancel (pid_t pid)
 {
   struct MonitorCtx *ctx;
-  pid_t pid;
-  
-  GNUNET_assert (NULL != map);
-  pid = GNUNET_OS_process_get_pid (proc);
+
+  GNUNET_assert (NULL != map);  
   ctx = GNUNET_CONTAINER_multihashmap32_get (map, (uint32_t) pid);
   if (NULL == ctx)
     return GNUNET_SYSERR;
@@ -286,3 +295,20 @@
   GNUNET_free (ctx);
   return GNUNET_OK;
 }
+
+
+/**
+ * Stop monitoring a process
+ *
+ * @param proc the process whose monitoring is to be stopped
+ * @return GNUNET_OK upon success; GNUNET_SYSERR if the process was not being
+ *   monitored
+ */
+int
+MSH_monitor_process_cancel (struct GNUNET_OS_Process *proc)
+{
+  const pid_t target = GNUNET_OS_process_get_pid (proc);
+
+  /* delegate to the PID based variant */
+  return MSH_monitor_process_pid_cancel (target);
+}




reply via email to

[Prev in Thread] Current Thread [Next in Thread]