gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r26333 - gnunet/src/testbed


From: gnunet
Subject: [GNUnet-SVN] r26333 - gnunet/src/testbed
Date: Wed, 6 Mar 2013 11:52:53 +0100

Author: harsha
Date: 2013-03-06 11:52:53 +0100 (Wed, 06 Mar 2013)
New Revision: 26333

Modified:
   gnunet/src/testbed/gnunet_testbed_mpi_spawn.c
Log:
Remove MPI code
Starts the child process only in one instance (checks for lowest host ip; and
that host ensures all instances co-ordinate via explicit lock file creation)
Implemented basic ARM-like functionality for dealing with child processes



Modified: gnunet/src/testbed/gnunet_testbed_mpi_spawn.c
===================================================================
--- gnunet/src/testbed/gnunet_testbed_mpi_spawn.c       2013-03-06 10:38:19 UTC 
(rev 26332)
+++ gnunet/src/testbed/gnunet_testbed_mpi_spawn.c       2013-03-06 10:52:53 UTC 
(rev 26333)
@@ -1,12 +1,13 @@
 #include "platform.h"
 #include "gnunet_util_lib.h"
-#include <mpi.h>
+#include "gnunet_testbed_service.h"
 
+
 /**
  * Generic logging shorthand
  */
 #define LOG(kind,...)                           \
-  fprintf (stderr, __VA_ARGS__)
+  GNUNET_log (kind, __VA_ARGS__)
 
 /**
  * Debug logging shorthand
@@ -19,84 +20,362 @@
  */
 static int ret;
 
+/**
+ * The child process we spawn
+ */
+static struct GNUNET_OS_Process *child;
 
 /**
+ * The arguments including the binary to spawn 
+ */
+static char **argv2;
+
+/**
+ * All our IP addresses
+ */
+static char **our_addrs;
+
+/**
+ * Pipe used to communicate shutdown via signal.
+ */
+static struct GNUNET_DISK_PipeHandle *sigpipe;
+
+/**
+ * Filename of the unique file
+ */
+static char *fn;
+
+/**
+ * Handle to the unique file
+ */
+static int fh;
+
+/**
+ * The return code of the binary
+ */
+static unsigned long child_exit_code;
+
+/**
+ * The process status of the child
+ */
+static enum GNUNET_OS_ProcessStatusType child_status;
+
+/**
+ * how many IP addresses are currently assigned to us
+ */
+static unsigned int num_addrs;
+
+/**
+ * The shutdown task
+ */
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_task_id;
+
+/**
+ * Task to kill the child
+ */
+static GNUNET_SCHEDULER_TaskIdentifier terminate_task_id;
+
+/**
+ * Task to kill the child
+ */
+static GNUNET_SCHEDULER_TaskIdentifier child_death_task_id;
+
+/**
+ * The shutdown task
+ */
+static void
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  shutdown_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != child_exit_code)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Child exited with error code: %lu\n", 
+         child_exit_code);
+    ret = 128 + (int) child_exit_code;
+  }
+  if (0 != fh)
+  {
+    close (fh);
+  }
+  if ((NULL != fn) && (0 != unlink (fn)))
+  {
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "open");
+    ret = GNUNET_SYSERR;
+  }
+}
+
+
+static void
+terminate_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  static int hard_kill;
+
+  GNUNET_assert (NULL != child);
+  terminate_task_id = 
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &terminate_task, NULL);
+  if (0 != hard_kill)
+  {
+    switch (hard_kill)
+    {
+    case 1:
+    case 2:
+      LOG (GNUNET_ERROR_TYPE_WARNING, 
+           "%d more interrupts needed to send SIGKILL to the child\n",
+           3 - hard_kill);
+      hard_kill++;
+      return;
+    case 3:
+      GNUNET_break (0 == GNUNET_OS_process_kill (child, SIGKILL));
+      return;
+    }
+  }
+  hard_kill++;
+  GNUNET_break (0 == GNUNET_OS_process_kill (child, SIGTERM));
+  LOG (GNUNET_ERROR_TYPE_INFO, _("Waiting for child to exit.\n"));
+}
+
+
+/**
+ * Task triggered whenever we receive a SIGCHLD (child
+ * process died).
+ *
+ * @param cls closure, NULL if we need to self-restart
+ * @param tc context
+ */
+static void
+child_death_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  const struct GNUNET_DISK_FileHandle *pr;
+  char c[16];
+
+  pr = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
+  child_death_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
+  {
+    child_death_task_id =
+       GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+                                       pr, &child_death_task, NULL);
+    return;
+  }
+  /* consume the signal */
+  GNUNET_break (0 < GNUNET_DISK_file_read (pr, &c, sizeof (c)));
+  LOG_DEBUG ("Child died\n");
+  GNUNET_SCHEDULER_cancel (terminate_task_id);
+  terminate_task_id = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (GNUNET_OK == GNUNET_OS_process_status (child, &child_status,
+                                                        &child_exit_code));
+  GNUNET_OS_process_destroy (child);
+  child = NULL;
+  shutdown_task_id = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+}
+
+
+/**
+ * Callback function invoked for each interface found.
+ *
+ * @param cls NULL
+ * @param name name of the interface (can be NULL for unknown)
+ * @param isDefault is this presumably the default interface
+ * @param addr address of this interface (can be NULL for unknown or 
unassigned)
+ * @param broadcast_addr the broadcast address (can be NULL for unknown or 
unassigned)
+ * @param netmask the network mask (can be NULL for unknown or unassigned))
+ * @param addrlen length of the address
+ * @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort
+ */
+static int 
+addr_proc (void *cls, const char *name, int isDefault,
+           const struct sockaddr *addr,
+           const struct sockaddr *broadcast_addr,
+           const struct sockaddr *netmask, socklen_t addrlen)
+{
+  const struct sockaddr_in *in_addr;
+  char *ipaddr;
+
+  if (sizeof (struct sockaddr_in) != addrlen)
+    return GNUNET_OK;
+  in_addr = (const struct sockaddr_in *) addr;
+  if (NULL == (ipaddr = inet_ntoa (in_addr->sin_addr)))
+    return GNUNET_OK;
+  GNUNET_array_append (our_addrs, num_addrs, GNUNET_strdup (ipaddr));
+  return GNUNET_OK;
+}
+
+
+static void
+destroy_hosts(struct GNUNET_TESTBED_Host **hosts, unsigned int nhosts)
+{
+  unsigned int host;
+
+  GNUNET_assert (NULL != hosts);
+  for (host = 0; host < nhosts; host++)
+    if (NULL != hosts[host])
+      GNUNET_TESTBED_host_destroy (hosts[host]);
+  GNUNET_free (hosts);
+  hosts = NULL;
+}
+
+
+/**
+ * The main scheduler run task
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_TESTBED_Host **hosts;
+  const struct GNUNET_CONFIGURATION_Handle *null_cfg;
+  const char *host_ip;
+  char *tmpdir;
+  unsigned int nhosts;
+  unsigned int host_cnt;
+  unsigned int addr_cnt;
+
+  GNUNET_OS_network_interfaces_list (&addr_proc, NULL);
+  if (0 == num_addrs)
+  {
+    GNUNET_break (0);
+    ret = GNUNET_SYSERR;
+    return;
+  }
+  null_cfg = GNUNET_CONFIGURATION_create ();
+  nhosts = GNUNET_TESTBED_hosts_load_from_loadleveler (null_cfg, &hosts);
+  if (0 == nhosts)
+  {
+    GNUNET_break (0);
+    ret = GNUNET_SYSERR;
+    return;
+  }
+  for (host_cnt = 0; host_cnt < nhosts; host_cnt++)
+  {
+    host_ip = GNUNET_TESTBED_host_get_hostname (hosts[host_cnt]);
+    for (addr_cnt = 0; addr_cnt < num_addrs; addr_cnt++)
+      if (0 == strcmp (host_ip, our_addrs[addr_cnt]))
+        goto proceed;
+  }
+  GNUNET_break (0);
+  ret = GNUNET_SYSERR;
+  destroy_hosts (hosts, nhosts);
+  return;
+
+ proceed:
+  destroy_hosts (hosts, nhosts);
+  if (0 != host_cnt)
+  {
+    LOG_DEBUG ("Exiting as we are not the lowest host\n");
+    ret = GNUNET_OK;
+    return;
+  }
+  tmpdir = getenv ("TMPDIR");
+  if (NULL == tmpdir)
+    tmpdir = getenv ("TMP");
+  if (NULL == tmpdir)
+    tmpdir = getenv ("TEMP");  
+  if (NULL == tmpdir)
+    tmpdir = "/tmp";
+  (void) GNUNET_asprintf (&fn, "%s/gnunet-testbed-spawn.lock", tmpdir);
+  /* Open the unique file; we can create it then we can spawn the child process
+     else we exit */
+  fh = open (fn, O_CREAT | O_EXCL | O_CLOEXEC,
+             S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+  if (-1 == fh)
+  {
+    if (EEXIST == errno)
+    {
+      LOG_DEBUG ("Lock file already created by other process.  Exiting\n");
+      ret = GNUNET_OK;
+      return;
+    }
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "open");
+    ret = GNUNET_SYSERR;
+    return;
+  }
+  /* Spawn the new process here */
+  LOG (GNUNET_ERROR_TYPE_INFO, _("Spawning process `%s'\n"), argv2[0]);
+  child = GNUNET_OS_start_process_vap (GNUNET_NO, GNUNET_OS_INHERIT_STD_ALL, 
NULL,
+                                       NULL,
+                                       argv2[0], argv2);
+  if (NULL == child)
+  {
+    GNUNET_break (0);
+    ret = GNUNET_SYSERR;
+    return;
+  }
+  ret = GNUNET_OK;
+  terminate_task_id =
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &terminate_task, NULL);
+  child_death_task_id =
+    GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+                                   GNUNET_DISK_pipe_handle (sigpipe,
+                                                            
GNUNET_DISK_PIPE_END_READ),
+                                   &child_death_task, NULL);
+}
+
+
+/**
+ * Signal handler called for SIGCHLD.
+ */
+static void
+sighandler_child_death ()
+{
+  static char c;
+  int old_errno = errno;       /* back-up errno */
+
+  GNUNET_break (1 ==
+               GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
+                                       (sigpipe, GNUNET_DISK_PIPE_END_WRITE),
+                                       &c, sizeof (c)));
+  errno = old_errno;           /* restore errno */
+}
+
+
+/**
  * Execution start point
  */
 int
 main (int argc, char *argv[])
 {
-  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
-    GNUNET_GETOPT_OPTION_END
-  };
-  struct GNUNET_OS_Process *proc;
-  char **argv2;
-  unsigned long code;
-  enum GNUNET_OS_ProcessStatusType proc_status;
-  int rank;
-  int chstat;
-  unsigned int host;
+  struct GNUNET_SIGNAL_Context *shc_chld;
   unsigned int cnt;
-  
+
   ret = -1;
   if (argc < 2)
   {
     printf ("Need arguments: gnunet-testbed-mpi-spawn <cmd> <cmd_args>");
     return 1;
   }
-  if (MPI_SUCCESS != MPI_Init (&argc, &argv))
+  if (GNUNET_OK != GNUNET_log_setup ("gnunet-testbed-spawn", NULL, NULL))
   {
     GNUNET_break (0);
-    return 2;
+    return 1;
   }
-  if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
+  if (NULL == (sigpipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO, 
+                                           GNUNET_NO, GNUNET_NO)))
   {
     GNUNET_break (0);
-    ret = 3;
-    (void) MPI_Finalize ();
-    goto end;
+    ret = GNUNET_SYSERR;
+    return 1;
   }
-  if (0 != rank)
+  shc_chld =
+      GNUNET_SIGNAL_handler_install (GNUNET_SIGCHLD, &sighandler_child_death);
+  if (NULL == shc_chld)
   {
-    ret = 0;
-    (void) MPI_Finalize ();
-    goto end;
+    LOG (GNUNET_ERROR_TYPE_ERROR, "Cannot install a signal handler\n");
+    return 1;
   }
-  (void) MPI_Finalize ();
-  PRINTF ("Spawning process\n");
   argv2 = GNUNET_malloc (sizeof (char *) * argc);
   for (cnt = 1; cnt < argc; cnt++)
     argv2[cnt - 1] = argv[cnt];
-  proc =
-      GNUNET_OS_start_process_vap (GNUNET_NO, GNUNET_OS_INHERIT_STD_ALL, NULL,
-                                   NULL, argv2[0], argv2);
-  if (NULL == proc)
-  {
-    LOG (GNUNET_ERROR_TYPE_ERROR, "Cannot exec\n");
-    ret = 5;
-    goto end;
-  }
-  do
-  {
-    (void) sleep (1);
-    chstat = GNUNET_OS_process_status (proc, &proc_status, &code);
-  }
-  while (GNUNET_NO == chstat);
-  if (GNUNET_OK != chstat)
-  { 
-    ret = 6;
-    goto end;
-  }
-  if (0 != code)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING, "Child terminated abnormally\n");
-    ret = 50 + (int) code;
-    goto end;
-  }
-  ret = 0;
-  
- end:  
-  if (0 != ret)
-    LOG (GNUNET_ERROR_TYPE_ERROR, "Something went wrong. Error: %d\n", ret);
-  return ret;
+  GNUNET_SCHEDULER_run (run, NULL);  
+  GNUNET_free (argv2);
+  GNUNET_SIGNAL_handler_uninstall (shc_chld);
+  shc_chld = NULL;
+  GNUNET_DISK_pipe_close (sigpipe);
+  GNUNET_free_non_null (fn);
+  if (GNUNET_OK != ret)
+    return ret;
+  return 0;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]