[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28352 - msh/src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28352 - msh/src |
Date: |
Thu, 1 Aug 2013 16:22:54 +0200 |
Author: harsha
Date: 2013-08-01 16:22:53 +0200 (Thu, 01 Aug 2013)
New Revision: 28352
Added:
msh/src/msh.c
Modified:
msh/src/
msh/src/Makefile.am
msh/src/mshd.c
msh/src/mtypes.h
Log:
MSH command line utility
Index: msh/src
===================================================================
--- msh/src 2013-08-01 14:11:28 UTC (rev 28351)
+++ msh/src 2013-08-01 14:22:53 UTC (rev 28352)
Property changes on: msh/src
___________________________________________________________________
Modified: svn:ignore
## -2,6 +2,7 ##
Makefile
.deps
mshd
+msh
mping
test-suite.log
test-scheduler*
Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-08-01 14:11:28 UTC (rev 28351)
+++ msh/src/Makefile.am 2013-08-01 14:22:53 UTC (rev 28352)
@@ -1,11 +1,14 @@
-bin_PROGRAMS = mping mshd
+bin_PROGRAMS = mping mshd msh
mping_SOURCES = mping.c
-mshd_SOURCES = mshd.c mshd.h util.c util.h \
+mshd_SOURCES = mshd.c mshd.h util.c util.h mtypes.h \
common.h bitmap.c bitmap.h addressmap.c addressmap.h reduce.h reduce.c
mshd_LDADD = -lgnunetutil -lm
+msh_SOURCES = msh.c mtypes.h
+msh_LDADD = -lgnunetutil
+
check_PROGRAMS = \
test-bitmap \
test-addressmap
Added: msh/src/msh.c
===================================================================
--- msh/src/msh.c (rev 0)
+++ msh/src/msh.c 2013-08-01 14:22:53 UTC (rev 28352)
@@ -0,0 +1,488 @@
+/**
+ * @file msh.c
+ * @brief program to launch remote processes through MSH. This program is
+ * to be started by the MSH Daemon or by any process in the process
+ * subtree having the MSH Daemon as a parent process.
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#include "common.h"
+#include <gnunet/gnunet_util_lib.h>
+#include "util.h"
+#include "mtypes.h"
+
+#define LOG(kind,...) \
+ GNUNET_log (kind, __VA_ARGS__)
+
+#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
+
+#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
+
+/**
+ * One entry of the queue of messages waiting to be sent over the connection
+ * to MSHD.  Entries are kept in a doubly-linked list (mq_head / mq_tail).
+ */
+struct MessageQueue
+{
+ /**
+ * next pointer for DLL
+ */
+ struct MessageQueue *next;
+
+ /**
+ * prev pointer for DLL
+ */
+ struct MessageQueue *prev;
+
+ /**
+ * The message to be sent.  The queue entry owns it: it is released with
+ * GNUNET_free() in conn_transmit_ready() once copied out, or in
+ * cleanup_mq() when the queue is dropped.
+ */
+ struct GNUNET_MessageHeader *msg;
+};
+
+/**
+ * DLL head for the message queue; conn_transmit_ready() always sends the
+ * entry found here
+ */
+static struct MessageQueue *mq_head;
+
+/**
+ * DLL tail for the message queue; queue_message() appends here
+ */
+static struct MessageQueue *mq_tail;
+
+/**
+ * Our connection to the parent MSHD; opened in result_cb()
+ */
+static struct GNUNET_CLIENT_Connection *client;
+
+/**
+ * Handle for the pending transmission request on client; NULL when no
+ * request is outstanding
+ */
+static struct GNUNET_CLIENT_TransmitHandle *th;
+
+/**
+ * file handle for the stdin; created in delivery_status()
+ */
+static struct GNUNET_DISK_FileHandle *fh_stdin;
+
+/**
+ * file handle for stdout; created in delivery_status().
+ * NOTE(review): forward_output() writes through fileno (stdout) and does not
+ * use this handle.
+ */
+static struct GNUNET_DISK_FileHandle *fh_stdout;
+
+/**
+ * The command string (cmd + parameters).  run() stores the arguments back
+ * to back, each including its terminating NUL byte.
+ */
+static char *cmdstr;
+
+/**
+ * Program help text
+ */
+static char *help_text =
+ "MSH utility to lauch remote programs\n"
+ "Usage: msh <ip-address> <cmd> [arg] ... \n";
+
+/**
+ * Buffer used to read data from input streams (stdin, in fwd_stdin())
+ */
+static char recv_buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+
+/**
+ * Handle for a test to check if MSHD is running in a mode accepting remote
+ * commands.  Assigned in run() and reset to NULL in result_cb().
+ */
+struct GNUNET_CLIENT_TestHandle *test;
+
+/**
+ * our configuration for accessing GNUnet's client API functions; a
+ * modifiable duplicate of the configuration handed to run()
+ */
+struct GNUNET_CONFIGURATION_Handle *cfg;
+
+/**
+ * length of the cmdstr in bytes, counting the NUL terminator of every
+ * argument
+ */
+static size_t cmdstr_len;
+
+/**
+ * Task to forward stdin to MSHD
+ */
+static GNUNET_SCHEDULER_TaskIdentifier task_fwd_stdin;
+
+/**
+ * Task to forward remote command output from MSHD to stdout.
+ * NOTE(review): not referenced by any function in this file.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier task_fwd_stdout;
+
+/**
+ * retry backoff time used as timeout for transmission requests
+ */
+static struct GNUNET_TIME_Relative retry_backoff;
+
+/**
+ * flag to indicate successful completion by setting it to GNUNET_OK.
+ * NOTE(review): main() initialises it to GNUNET_SYSERR and nothing in this
+ * file ever sets it to GNUNET_OK (result_cb() has a parameter of the same
+ * name that shadows it), so main() cannot currently return 0.
+ */
+static int result;
+
+/**
+ * are we waiting for receiving something on the connection client.
+ * NOTE(review): not referenced by any function in this file.
+ */
+static int in_receive;
+
+/**
+ * The different states of the program; the current one is kept in the
+ * variable state
+ */
+enum State
+{
+ /**
+ * Initial state where the remote command is delivered to the MSHD
+ * (set in run())
+ */
+ STATE_DELIVER_CMD,
+
+ /**
+ * State where the stdin and stdout streams are forwarded to and from the
+ * MSHD (entered in delivery_status())
+ */
+ STATE_FORWARD_STREAMS
+} state;
+
+/**
+ * The ip address of the remote node, as parsed by inet_network() in run();
+ * result_cb() passes it through htonl() before sending it
+ */
+static in_addr_t target;
+
+
+/**
+ * Drop every message still waiting in the send queue: each entry is
+ * unlinked from the DLL and both the queued message and the entry itself
+ * are released.  Takes no arguments and returns nothing; the loop ends once
+ * mq_head is NULL.
+ */
+static void
+cleanup_mq ()
+{
+  struct MessageQueue *entry;
+
+  for (entry = mq_head; NULL != entry; entry = mq_head)
+  {
+    GNUNET_CONTAINER_DLL_remove (mq_head, mq_tail, entry);
+    GNUNET_free (entry->msg);
+    GNUNET_free (entry);
+  }
+}
+
+
+/**
+ * Function called to notify us that the connection to MSHD is ready to
+ * queue more data.  Copies the message at the head of the send queue into
+ * the supplied buffer, releases it, and asks for another transmission slot
+ * if further messages are queued.  "buf" will be NULL and "size" zero if
+ * the connection was closed for writing in the meantime; the whole queue is
+ * dropped in that case.
+ *
+ * @param cls closure (not used)
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+conn_transmit_ready (void *cls, size_t size, void *buf)
+{
+  struct MessageQueue *head;
+  uint16_t len;
+
+  /* the request that led to this call is used up */
+  th = NULL;
+  if ((NULL == buf) && (0 == size))
+  {
+    LOG_ERROR ("Connection to MSHD broken\n");
+    cleanup_mq ();
+    return 0;
+  }
+  head = mq_head;
+  GNUNET_assert (NULL != head);
+  if ((NULL == buf) || (0 == size))
+  {
+    /* nothing can be written this time: ask again with a longer timeout */
+    LOG_DEBUG ("Message sending timed out -- retrying\n");
+    retry_backoff = GNUNET_TIME_STD_BACKOFF (retry_backoff);
+    th = GNUNET_CLIENT_notify_transmit_ready (client,
+                                              ntohs (head->msg->size),
+                                              retry_backoff, GNUNET_NO,
+                                              &conn_transmit_ready, NULL);
+    return 0;
+  }
+  retry_backoff = GNUNET_TIME_UNIT_ZERO;
+  len = ntohs (head->msg->size);
+  GNUNET_assert (len <= size);
+  (void) memcpy (buf, head->msg, len);
+  GNUNET_free (head->msg);
+  GNUNET_CONTAINER_DLL_remove (mq_head, mq_tail, head);
+  GNUNET_free (head);
+  head = mq_head;
+  if (NULL == head)
+    return len;
+  /* more messages are waiting: request the next slot right away */
+  retry_backoff = GNUNET_TIME_STD_BACKOFF (retry_backoff);
+  th = GNUNET_CLIENT_notify_transmit_ready (client,
+                                            ntohs (head->msg->size),
+                                            retry_backoff, GNUNET_NO,
+                                            &conn_transmit_ready, NULL);
+  return len;
+}
+
+
+/**
+ * Appends a message to the send queue associated with the connection to
+ * MSHD and, unless a transmission request is already pending, asks the
+ * connection to call conn_transmit_ready().  The queue takes ownership of
+ * the message.
+ *
+ * @param msg the message to queue
+ */
+static void
+queue_message (struct GNUNET_MessageHeader *msg)
+{
+  const uint16_t msg_type = ntohs (msg->type);
+  const uint16_t msg_len = ntohs (msg->size);
+  struct MessageQueue *entry;
+
+  entry = GNUNET_malloc (sizeof (struct MessageQueue));
+  entry->msg = msg;
+  LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
+             msg_type, msg_len);
+  GNUNET_CONTAINER_DLL_insert_tail (mq_head, mq_tail, entry);
+  if (NULL != th)
+    return;                     /* a request is already outstanding */
+  retry_backoff = GNUNET_TIME_STD_BACKOFF (retry_backoff);
+  th = GNUNET_CLIENT_notify_transmit_ready (client, msg_len,
+                                            retry_backoff, GNUNET_NO,
+                                            &conn_transmit_ready, NULL);
+}
+
+
+/**
+ * Scheduler task forwarding stdin data to MSHD: reads what is available on
+ * stdin, wraps it into a MSH_MTYPE_CMD_STREAM_STDIN message and queues that
+ * message for sending.  The task re-arms itself after every successful
+ * read; it stops on shutdown, on a read error and at end-of-file.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+fwd_stdin (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct MSH_MSG_CmdIO *msg;
+  ssize_t size;
+  uint16_t msg_size;
+
+  task_fwd_stdin = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+  {
+    GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh_stdin));
+    return;
+  }
+  /* The size field of a message header is only 16 bits wide, so header plus
+     payload have to stay strictly below GNUNET_SERVER_MAX_MESSAGE_SIZE;
+     without the "- 1" a full read would make msg_size wrap around. */
+  size = GNUNET_DISK_file_read (fh_stdin, recv_buf,
+                                GNUNET_SERVER_MAX_MESSAGE_SIZE - 1
+                                - sizeof (struct MSH_MSG_CmdIO));
+  if (GNUNET_SYSERR == size)
+  {
+    GNUNET_break (0);
+    GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh_stdin));
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  if (0 == size)
+  {
+    /* End of file.  A descriptor at EOF stays readable forever, so
+       re-arming the task would spin; close stdin and stop forwarding.  As
+       the task is no longer scheduled, the shutdown branch above cannot
+       close the handle a second time. */
+    LOG_DEBUG ("stdin closed; no more input to forward\n");
+    GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh_stdin));
+    fh_stdin = NULL;
+    return;
+  }
+  msg_size = sizeof (struct MSH_MSG_CmdIO) + size;
+  /* ownership of the message passes to the send queue */
+  msg = GNUNET_malloc (msg_size);
+  msg->header.size = htons (msg_size);
+  msg->header.type = htons (MSH_MTYPE_CMD_STREAM_STDIN);
+  (void) memcpy (msg->data, recv_buf, size);
+  queue_message (&msg->header);
+  task_fwd_stdin =
+      GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+                                      fh_stdin, &fwd_stdin, NULL);
+}
+
+
+/**
+ * Called when a message arrives from MSHD while streams are being
+ * forwarded: writes the payload of a MSH_MTYPE_CMD_STREAM_STDOUT message to
+ * our stdout and waits for the next message.
+ *
+ * @param cls closure (not used)
+ * @param msg_ message received, NULL on timeout or fatal error
+ */
+static void
+forward_output (void *cls, const struct GNUNET_MessageHeader *msg_)
+{
+  const struct MSH_MSG_CmdIO *msg;
+  const char *data;
+  size_t remaining;
+  ssize_t wrote;
+  int fd_stdout;
+  uint16_t size;
+
+  if (NULL == msg_)
+  {
+    /* nothing more will arrive on this connection */
+    LOG_ERROR ("Connection to MSHD broken\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  size = ntohs (msg_->size);
+  if ((MSH_MTYPE_CMD_STREAM_STDOUT != ntohs (msg_->type)) ||
+      (size < sizeof (struct MSH_MSG_CmdIO)))
+  {
+    /* wrong type, or too short to even hold the header */
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  msg = (const struct MSH_MSG_CmdIO *) msg_;
+  fd_stdout = fileno (stdout);
+  GNUNET_assert (-1 < fd_stdout);
+  /* only the payload following the header is written; write() may accept
+     fewer bytes than requested, so continue until everything is out */
+  data = msg->data;
+  remaining = size - sizeof (struct MSH_MSG_CmdIO);
+  while (0 < remaining)
+  {
+    wrote = write (fd_stdout, data, remaining);
+    if (wrote <= 0)
+    {
+      /* report the failure, drop the rest of this message, keep receiving */
+      GNUNET_break (0);
+      break;
+    }
+    data += wrote;
+    remaining -= (size_t) wrote;
+  }
+  GNUNET_CLIENT_receive (client, &forward_output, NULL,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Called with MSHD's answer to the run-command request.  On a
+ * MSH_MTYPE_SERVER_RUNCMDSTATUS reply the program switches to
+ * STATE_FORWARD_STREAMS: it starts receiving the remote command's output
+ * and schedules the task that forwards our stdin.
+ *
+ * @param cls closure (not used)
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+delivery_status (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  if (NULL == msg)
+  {
+    /* timeout or fatal error: there is no header to inspect */
+    LOG_ERROR ("Did not receive a command status from MSHD\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  if (MSH_MTYPE_SERVER_RUNCMDSTATUS != ntohs (msg->type))
+  {
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  fh_stdin = GNUNET_DISK_get_handle_from_native (stdin);
+  fh_stdout = GNUNET_DISK_get_handle_from_native (stdout);
+  state = STATE_FORWARD_STREAMS;
+  GNUNET_CLIENT_receive (client, &forward_output, NULL,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
+  task_fwd_stdin =
+      GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+                                      fh_stdin, &fwd_stdin, NULL);
+}
+
+
+/**
+ * Function called with the result on the service test.  If MSHD is
+ * running, connects to it and sends the MSH_MTYPE_CLIENT_RUNCMD request
+ * carrying the target address and the command string; the answer is
+ * handled by delivery_status().
+ *
+ * @param cls closure (not used)
+ * @param result GNUNET_YES if the service is running,
+ *          GNUNET_NO if the service is not running
+ *          GNUNET_SYSERR if the configuration is invalid
+ */
+static void
+result_cb (void *cls, int result)
+{
+  struct MSH_MSG_RunCmd *msg;
+  size_t total;
+  uint16_t size;
+  int ret;
+
+  test = NULL;
+  if (GNUNET_YES != result)
+  {
+    LOG_ERROR ("Service not running\n");
+    return;
+  }
+  GNUNET_assert (0 != cmdstr_len);
+  /* The message size travels in a 16-bit field.  Check the real length
+     before narrowing it, otherwise a long command would allocate a
+     truncated buffer and the memcpy below would overflow it. */
+  total = sizeof (struct MSH_MSG_RunCmd) + cmdstr_len;
+  if (total >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    LOG_ERROR ("Command is too long to be sent to MSHD\n");
+    return;
+  }
+  size = (uint16_t) total;
+  client = GNUNET_CLIENT_connect ("mshd", cfg);
+  GNUNET_assert (NULL != client);
+  msg = GNUNET_malloc (size);
+  msg->header.size = htons (size);
+  msg->header.type = htons (MSH_MTYPE_CLIENT_RUNCMD);
+  msg->ip = htonl ((uint32_t) target);
+  (void) memcpy (msg->cmd, cmdstr, cmdstr_len);
+  ret = GNUNET_CLIENT_transmit_and_get_response (client, &msg->header,
+                                                 GNUNET_TIME_UNIT_SECONDS,
+                                                 GNUNET_NO,
+                                                 &delivery_status,
+                                                 NULL);
+  /* The client library is expected to keep its own copy of the request
+     (see the GNUnet client API), so ours can be released here instead of
+     being leaked. */
+  GNUNET_free (msg);
+  GNUNET_assert (GNUNET_OK == ret);
+}
+
+
+/**
+ * Main function that will be run.  Parses the target IP address and the
+ * remote command from the command line, points the configuration at the
+ * socket of the local MSHD (taken from the MSHD_SOCK environment variable)
+ * and starts a test whether the "mshd" service is running.  The connection
+ * to MSHD itself is opened later, in result_cb().
+ *
+ * @param cls closure
+ * @param args remaining command-line arguments: ip-address cmd [arg] ...
+ * @param cfgfile name of the configuration file used (for saving, can be
+ *          NULL!)
+ * @param cfg_ configuration
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg_)
+{
+  char *ipstr;
+  char *psock_path;
+  size_t arg_len;
+  unsigned int cnt;
+
+  state = STATE_DELIVER_CMD;
+  if (NULL == (ipstr = args[0]))
+  {
+    LOG_ERROR ("Require an IP address\n");
+    fprintf (stderr, "%s", help_text);
+    return;
+  }
+  target = inet_network (ipstr);
+  if ((uint32_t) -1 == target)
+  {
+    LOG_ERROR ("Invalid IP address given");
+    return;
+  }
+  /* Build the command string: the command and its arguments back to back,
+     each one keeping its terminating NUL byte as separator. */
+  for (cnt = 1; NULL != args[cnt]; cnt++)
+  {
+    arg_len = strlen (args[cnt]) + 1;
+    cmdstr = GNUNET_realloc (cmdstr, cmdstr_len + arg_len);
+    (void) memcpy (cmdstr + cmdstr_len, args[cnt], arg_len);
+    cmdstr_len += arg_len;
+  }
+  if (0 == cmdstr_len)
+  {
+    LOG_ERROR ("Require command to execute remotely\n");
+    fprintf (stderr, "%s", help_text);
+    return;
+  }
+  if (NULL == (psock_path = getenv ("MSHD_SOCK")))
+  {
+    LOG_ERROR ("Could not find socket for local MSHD. Are we running under "
+               "MSHD?\n");
+    return;
+  }
+  cfg = GNUNET_CONFIGURATION_dup (cfg_);
+  GNUNET_CONFIGURATION_set_value_string (cfg, "MSHD", "UNIXPATH", psock_path);
+  /* No check of `client' here: it is still NULL at this point because the
+     connection is only opened in result_cb().  Testing it for NULL made
+     this function bail out every time, before the service test could be
+     started. */
+  test = GNUNET_CLIENT_service_test ("mshd", cfg, GNUNET_TIME_UNIT_SECONDS,
+                                     &result_cb, NULL);
+}
+
+
+/**
+ * Program entry point.  Hands control to GNUNET_PROGRAM_run() with run() as
+ * the main task.
+ *
+ * @param argc number of command-line arguments
+ * @param argv the command-line arguments
+ * @return 1 if GNUNET_PROGRAM_run() did not return GNUNET_OK; otherwise 0
+ *         if the result flag is GNUNET_OK and 2 if it is not
+ */
+int
+main (int argc, char * const argv[])
+{
+  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_OPTION_END
+  };
+  int run_status;
+
+  result = GNUNET_SYSERR;
+  run_status = GNUNET_PROGRAM_run (argc, argv, "msh", help_text,
+                                   options, &run, NULL);
+  if (GNUNET_OK != run_status)
+  {
+    GNUNET_break (0);
+    return 1;
+  }
+  if (GNUNET_OK == result)
+    return 0;
+  return 2;
+}
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-08-01 14:11:28 UTC (rev 28351)
+++ msh/src/mshd.c 2013-08-01 14:22:53 UTC (rev 28352)
@@ -1,3 +1,9 @@
+/**
+ * @file mshd.c
+ * @brief implementation of the MSH Daemon
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
#include "common.h"
#include <gnunet/gnunet_util_lib.h>
#include <mpi.h>
Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h 2013-08-01 14:11:28 UTC (rev 28351)
+++ msh/src/mtypes.h 2013-08-01 14:22:53 UTC (rev 28352)
@@ -11,18 +11,6 @@
/**
- * Message header that will be included for all messages
- */
-struct MSH_MessageHeader
-{
- /**
- * The size of the message
- */
- uint16_t size GNUNET_PACKED;
-};
-
-
-/**
* Message for sending addresses for verification
*/
struct MSH_MSG_VerifyAddress
@@ -30,7 +18,7 @@
/**
* Message header
*/
- struct MSH_MessageHeader header;
+ struct GNUNET_MessageHeader header;
/**
* Randomly chosen port number
@@ -80,7 +68,7 @@
/**
* Header for this message
*/
- struct MSH_MessageHeader header;
+ struct GNUNET_MessageHeader header;
/**
* The rank of the instance
@@ -114,4 +102,58 @@
#define MSH_MTYPE_INSTANCE_ADDRESS 101
+/****************************************************************/
+/* MSH Daemon and MSH command line tool communication messages  */
+/****************************************************************/
+
+#define MSH_MTYPE_CLIENT_RUNCMD 200
+
+#define MSH_MTYPE_SERVER_RUNCMDSTATUS 201
+
+#define MSH_MTYPE_CMD_STREAM_STDIN 202
+
+#define MSH_MTYPE_CMD_STREAM_STDOUT 203
+
+
+/**
+ * Message for sending a remote command and its arguments to MSHD
+ */
+struct MSH_MSG_RunCmd
+{
+ /**
+ * header; msh.c sets the type to MSH_MTYPE_CLIENT_RUNCMD and the size to
+ * the size of this struct plus the length of the command data
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * The IP of the remote host; msh.c fills it in via htonl()
+ */
+ uint32_t ip;
+
+ /**
+ * The command and all its arguments follow the fixed part of the message.
+ * Each string is to be terminated by a NUL byte ('\0').
+ */
+ char cmd[0];
+};
+
+
+/**
+ * Message carrying stream data of the remote command between the msh tool
+ * and MSHD: stdin data towards MSHD, stdout data back from it
+ */
+struct MSH_MSG_CmdIO
+{
+ /**
+ * header; set type to MSH_MTYPE_CMD_STREAM_STDIN or
+ * MSH_MTYPE_CMD_STREAM_STDOUT according to the direction of the stream
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * the stream data (stdin or stdout, depending on the message type); its
+ * length is the message size minus the size of this struct
+ */
+ char data[0];
+};
+
+
#endif /* MTYPES_H_ */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28352 - msh/src,
gnunet <=