[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28335 - in msh: . src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28335 - in msh: . src |
Date: |
Mon, 29 Jul 2013 09:10:22 +0200 |
Author: harsha
Date: 2013-07-29 09:10:22 +0200 (Mon, 29 Jul 2013)
New Revision: 28335
Removed:
msh/src/scheduler.c
msh/src/scheduler.h
Modified:
msh/configure.ac
msh/src/Makefile.am
msh/src/addressmap.c
msh/src/mshd.c
msh/src/mtypes.h
msh/src/reduce.c
msh/src/util.c
msh/src/util.h
Log:
- use gnunetutil
Modified: msh/configure.ac
===================================================================
--- msh/configure.ac 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/configure.ac 2013-07-29 07:10:22 UTC (rev 28335)
@@ -46,39 +46,34 @@
AC_MSG_ERROR([MSH requires MPI libraries])
fi
-# test for libevent
-libevent=0
-AC_MSG_CHECKING(for libevent)
-AC_ARG_WITH(libevent,
- [AS_HELP_STRING([--with-libevent=PFX],
- [base of libevent installation])],
- [AC_MSG_RESULT([$with_libevent])
- case $with_libevent in
+# test for libgnunetutil
+libgnunetutil=0
+AC_MSG_CHECKING(for libgnunetutil)
+AC_ARG_WITH(gnunet,
+ [AS_HELP_STRING([--with-gnunet=PFX], [base of gnunet installation])],
+ [AC_MSG_RESULT([$with_gnunet])
+ case $with_gnunet in
no)
;;
yes)
- AC_CHECK_HEADERS([event2/event.h], [AC_CHECK_LIB([event],
[event_base_new], libevent=1)])
+ AC_CHECK_HEADERS([gnunet/gnunet_util_lib.h],
[AC_CHECK_LIB([gnunetutil], [GNUNET_SCHEDULER_run], libgnunetutil=1)])
;;
*)
- SAVE_LDFLAGS=$LDFLAGS
- SAVE_CPPFLAGS=$CPPFLAGS
- LDFLAGS="-L$with_libevent/lib $LDFLAGS"
- CPPFLAGS="-I$with_libevent/include $CPPFLAGS"
- AC_CHECK_HEADERS([event2/event.h],
- [AC_CHECK_LIB([event], [event_base_new],
- [LIBEVENT_LDFLAGS="-L$with_libevent/lib"
- LIBEVENT_CPPFLAGS="-I$with_libevent/include"
- libevent=1])])
- LDFLAGS=$SAVE_LDFLAGS
- CPPFLAGS=$SAVE_CPPFLAGS
+ LDFLAGS="-L$with_gnunet/lib $LDFLAGS"
+ CPPFLAGS="-I$with_gnunet/include $CPPFLAGS"
+ AC_CHECK_HEADERS([gnunet/gnunet_util_lib.h],
+ [AC_CHECK_LIB([gnunetutil], [GNUNET_SCHEDULER_run],
+ [GNUNET_LDFLAGS="-L$with_gnunet/lib"
+ GNUNET_CPPFLAGS="-I$with_gnunet/include"
+ libgnunetutil=1])])
;;
esac
],
- [AC_MSG_RESULT([--with-libevent not specified])
- AC_CHECK_HEADERS([event2/event.h], [AC_CHECK_LIB([event],
[event_base_new], libevent=1)])])
-if test "$libevent" != 1
+ [AC_MSG_RESULT([--with-gnunet not specified])
+ AC_CHECK_HEADERS([gnunet/gnunet_util_lib.h], [AC_CHECK_LIB([gnunetutil],
[GNUNET_SCHEDULER_run], libgnunetutil=1)])])
+if test "$libgnunetutil" != 1
then
- AC_MSG_ERROR([MSH requires libevent])
+ AC_MSG_ERROR([MSH requires gnunet])
fi
dnl have all messages as of now
Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/Makefile.am 2013-07-29 07:10:22 UTC (rev 28335)
@@ -2,30 +2,14 @@
mping_SOURCES = mping.c
-mshd_SOURCES = mshd.c mshd.h util.c util.h scheduler.c scheduler.h \
+mshd_SOURCES = mshd.c mshd.h util.c util.h \
common.h bitmap.c bitmap.h addressmap.c addressmap.h reduce.h reduce.c
-mshd_LDADD = -lgnunetutil -levent -lm
-mshd_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
-mshd_LDFLAGS = $(LIBEVENT_LDFLAGS)
+mshd_LDADD = -lgnunetutil -lm
check_PROGRAMS = \
- test-scheduler \
- test-scheduler-socket \
test-bitmap \
test-addressmap
-test_scheduler_SOURCES = test_scheduler.c scheduler.c scheduler.h common.h \
- util.c util.h common.h
-test_scheduler_LDADD = -lgnunetutil -levent
-test_scheduler_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
-test_scheduler_LDFLAGS = $(LIBEVENT_LDFLAGS)
-
-test_scheduler_socket_SOURCES = test_scheduler_socket.c scheduler.c
scheduler.h \
- common.h util.c util.h
-test_scheduler_socket_LDADD = -lgnunetutil -levent
-test_scheduler_socket_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
-test_scheduler_socket_LDFLAGS = $(LIBEVENT_LDFLAGS)
-
test_bitmap_SOURCES = test_bitmap.c bitmap.c bitmap.h
test_bitmap_LDADD = -lgnunetutil
@@ -38,6 +22,3 @@
test-bitmap \
test-addressmap
-noinst_PROGRAMS = test
-test_SOURCES = test.c
-test_LDADD = -lgnunetutil
\ No newline at end of file
Modified: msh/src/addressmap.c
===================================================================
--- msh/src/addressmap.c 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/addressmap.c 2013-07-29 07:10:22 UTC (rev 28335)
@@ -75,7 +75,7 @@
* Get the 32bit IP addresses from an instance address
*/
#define instance_address_ip(iaddr) \
- ((uint32_t) iaddr->ip)
+ (NULL == iaddr ? 0 : ((uint32_t) iaddr->ip))
/**
@@ -323,11 +323,17 @@
old_ia = old->addr_head;
new_ia = new->addr_head;
n = 0;
+ LOG_DEBUG ("Intersecting %u old address with %u new addresses for instance
%u\n",
+ old->naddrs, new->naddrs, rank);
while (NULL != old_ia)
{
if ((NULL == new_ia)
|| (instance_address_ip (old_ia) < instance_address_ip (new_ia)) )
{
+ LOG_DEBUG ("Removing old address %u < new address %u\n",
+ instance_address_ip (old_ia), instance_address_ip (new_ia));
+ LOG_DEBUG ("\t old address: %s\n", ip2str (instance_address_ip(old_ia)));
+ LOG_DEBUG ("\t new address: %s\n", ip2str (instance_address_ip(new_ia)));
tmp = old_ia->next;
GNUNET_CONTAINER_DLL_remove (old->addr_head, old->addr_tail, old_ia);
free (old_ia);
@@ -345,6 +351,8 @@
new_ia = new_ia->next;
n++;
}
+ LOG_DEBUG ("Number of addresses for instance %u after intersection: %u\n",
+ rank, n);
return n;
}
@@ -472,7 +480,8 @@
for (nip = 0; nip < iainfo->naddrs; nip++, iaddr = iaddr->next)
{
GNUNET_assert (NULL != iaddr);
- _iaddr_msgs[cnt]->ipaddrs[nip] = instance_address_ip (iaddr);
+ GNUNET_assert (0 != htonl (instance_address_ip (iaddr)));
+ _iaddr_msgs[cnt]->ipaddrs[nip] = htonl (instance_address_ip (iaddr));
}
}
*iaddr_msgs = _iaddr_msgs;
@@ -519,7 +528,7 @@
for (cnt = 0; cnt < n; cnt++)
{
iaddr = instance_address_create_sockaddr_in
- (0, (in_addr_t) iaddr_msg->ipaddrs[cnt]);
+ (0, (in_addr_t) ntohl (iaddr_msg->ipaddrs[cnt]));
instance_address_info_add_address (iainfo, iaddr);
}
n = addressmap_intersect (m, iainfo);
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/mshd.c 2013-07-29 07:10:22 UTC (rev 28335)
@@ -2,7 +2,6 @@
#include <gnunet/gnunet_util_lib.h>
#include <mpi.h>
#include "util.h"
-#include "scheduler.h"
#include "mtypes.h"
#include "bitmap.h"
#include "addressmap.h"
@@ -38,46 +37,66 @@
struct InstanceAddrInfo *iainfo;
/**
- * The socket open handle to the instance address
+ * The connection handle to the received instance address
*/
- struct SocketOpenHandle *soh;
+ struct GNUNET_CONNECTION_Handle *conn;
/**
- * close task handle
+ * The transmit handle for the above connection
*/
- struct Task *close_task;
+ struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
/**
- * the port number
+ * task to close the connection
*/
- uint16_t port;
+ GNUNET_SCHEDULER_TaskIdentifier close_task;
/**
+ * state for the context
+ */
+ enum {
+ VERIFY_ADDRESS_CTX_WRITE,
+
+ VERIFY_ADDRESS_CTX_CLOSE
+ } state;
+
+ /**
* the ip address
*/
in_addr_t ip;
/**
- * The socket file descriptor associated with the connection used to verify
- * the address
+ * the port number
*/
- int sock;
+ uint16_t port;
+
};
struct ReadContext
{
+ /**
+ * next pointer for DLL
+ */
struct ReadContext *next;
+ /**
+ * prev pointer for DLL
+ */
struct ReadContext *prev;
- /* struct sockaddr_in addr; */
-
- /* socklen_t addrlen; */
-
- struct Task *task;
+ /**
+ * The connection
+ */
+ struct GNUNET_CONNECTION_Handle *conn;
+
+ /**
+ * are we waiting for a read on the above connection
+ */
+ int in_receive;
};
+
/**
* Mapping for instance addresses
*/
@@ -115,31 +134,51 @@
static struct VerifyAddressesCtx *vactx_tail;
/**
- * Task for finalising a round
+ * Array of our IP addresses in network-byte format
*/
-static struct Task *finalise_task;
+static in_addr_t *s_addrs;
/**
- * Array of our IP addresses in network-byte format
+ * Signal handler for SIGINT
*/
-static in_addr_t *s_addrs;
+static struct GNUNET_SIGNAL_Context *shc_int;
/**
- * Tasks for handling SIGINT and SIGTERM
+ * Signal handler for SIGTERM
*/
-static struct Task *sigshut_tasks[2];
+static struct GNUNET_SIGNAL_Context *shc_term;
/**
+ * Pipe used to communicate shutdown via signal.
+ */
+static struct GNUNET_DISK_PipeHandle *sigpipe;
+
+/**
+ * network handle for the listen socket
+ */
+static struct GNUNET_NETWORK_Handle *listen_socket;
+
+/**
* Task for running a round
*/
-static struct Task *rtask;
+static GNUNET_SCHEDULER_TaskIdentifier rtask;
/**
* Task for asynchronous accept on the socket
*/
-static struct Task *atask;
+static GNUNET_SCHEDULER_TaskIdentifier atask;
/**
+ * Task for finalising a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier finalise_task;
+
+/**
+ * Task for waiting for a shutdown signal
+ */
+static GNUNET_SCHEDULER_TaskIdentifier sigread_task;
+
+/**
* Bitmap for checking which MPI processes have verified our addresses in the
* current round
*/
@@ -161,11 +200,6 @@
static struct ReadContext *rtail;
/**
- * The listen socket for the current round
- */
-static int listen_sock;
-
-/**
* Number of IP addresses
*/
static unsigned int nips;
@@ -178,7 +212,7 @@
/**
* The port number of our local socket
*/
-uint16_t lport;
+uint16_t listen_port;
static char *
@@ -196,7 +230,7 @@
}
-static char *
+char *
ip2str (const in_addr_t ip)
{
static char hostip[NI_MAXHOST];
@@ -223,7 +257,7 @@
* @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort
*/
static int net_if_processor (void *cls, const char *name,
- int isDefault,
+ int isDefault,
const struct sockaddr *addr,
const struct sockaddr *broadcast_addr,
const struct sockaddr *netmask,
@@ -234,108 +268,104 @@
if (sizeof (struct sockaddr_in) != addrlen)
return GNUNET_OK; /* Only consider IPv4 for now */
- hostip = saddr2str (addr, addrlen);
- if (NULL == hostip)
- return GNUNET_OK;
inaddr = (const struct sockaddr_in *) addr;
- GNUNET_array_append (s_addrs, nips, inaddr->sin_addr.s_addr);
- LOG_DEBUG ("%d: Found IP: %s\n", rank, hostip);
+ GNUNET_array_append (s_addrs, nips, ntohl (inaddr->sin_addr.s_addr));
+ LOG_DEBUG ("%d: Found IP: %s\n", rank,
+ ip2str (ntohl (inaddr->sin_addr.s_addr)));
+ addressmap_add (addrmap, rank, listen_port,
+ ntohl (inaddr->sin_addr.s_addr));
return GNUNET_OK;
}
/**
- * Task to read from socket
- *
- * @param sock the socket
- * @param flags EV_* flags
- * @param cls &atask
+ * Callback function for data received from the network. Note that
+ * both "available" and "err" would be 0 if the read simply timed out.
+ *inaddr->sin_addr.s_addrinaddr->sin_addr.s_addr
+ * @param cls the read context
+ * @param buf pointer to received data
+ * @param available number of bytes availabe in "buf",
+ * possibly 0 (on errors)
+ * @param addr address of the sender
+ * @param addrlen size of addr
+ * @param errCode value of errno (on receiving errors)
*/
static void
-read_socket (evutil_socket_t sock, short flags, void *cls)
+conn_reader(void *cls, const void *buf, size_t available,
+ const struct sockaddr * addr, socklen_t addrlen, int errCode)
{
- struct ReadContext *ctx = cls;
- ssize_t rsize;
+ struct ReadContext *rc = cls;
uint32_t cid;
- scheduler_remove (ctx->task);
- GNUNET_CONTAINER_DLL_remove (rhead, rtail, ctx);
- free (ctx);
- if (IS_SHUTDOWN_EVENT (flags))
+ if (0 == available)
{
- MSH_close (sock);
- return;
- }
- rsize = read (sock, &cid, sizeof (uint32_t));
- if (rsize < 0)
- {
- LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "read");
- goto err_ret;
- }
- if (rsize == 0)
- {
GNUNET_break (0);
- goto err_ret;
+ goto clo_ret;
}
+ if ((NULL == buf) || (0 == available))
+ goto clo_ret;
+ (void) memcpy (&cid, buf, sizeof (uint32_t));
cid = ntohl (cid);
LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
- /* if (!barray_isset (cid)) */
- /* barray_set (cid); */
- MSH_close (sock);
- return;
-
- err_ret:
- MSH_close (sock);
- scheduler_shutdown ();
- return;
+
+ clo_ret:
+ GNUNET_CONTAINER_DLL_remove (rhead, rtail, rc);
+ GNUNET_CONNECTION_destroy (rc->conn);
+ GNUNET_free (rc);
}
/**
* Task to call accept and close on a listening socket
*
- * @param sock the socket
- * @param flags EV_* flags
- * @param cls &atask
+ * @param cls NULL
+ * @param tc the scheduler task context
*/
static void
-accept_task (evutil_socket_t sock, short flags, void *cls)
+accept_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct ReadContext *rctx;
+ struct GNUNET_CONNECTION_Handle *conn;
int csock;
- scheduler_remove (atask);
- atask = NULL;
- if (IS_SHUTDOWN_EVENT (flags))
+ atask = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
{
- (void) close (sock);
- return;
+ GNUNET_break (0);
+ goto clo_ret;
}
LOG_DEBUG ("%d: Got a connect\n", rank);
- if (0 > (csock = accept4 (sock, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)))
+ conn = GNUNET_CONNECTION_create_from_accept (NULL, NULL, listen_socket);
+ if (NULL == conn)
{
- LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "accept4");
- MSH_close (sock);
- scheduler_shutdown ();
- return;
+ GNUNET_break (0);
+ goto clo_ret;
}
rctx = GNUNET_malloc (sizeof (struct ReadContext));
+ rctx->conn = conn;
+ rctx->in_receive = GNUNET_YES;
+ GNUNET_CONNECTION_receive (rctx->conn, sizeof (unsigned int),
+ GNUNET_TIME_UNIT_FOREVER_REL, conn_reader, rctx);
GNUNET_CONTAINER_DLL_insert_tail (rhead, rtail, rctx);
- rctx->task = scheduler_add_socket (csock, EV_READ, &read_socket, rctx, NULL);
/* resume accepting connections on the listen sock */
- atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
+ atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ listen_socket, &accept_task, NULL);
+ return;
+
+ clo_ret:
+ GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+ listen_socket = NULL;
}
/**
* Task for running a round
*
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
* @param cls NULL
+ * @param tc scheduler task context
*/
static void
-run_round (evutil_socket_t nosock, short flags, void *cls);
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
@@ -346,13 +376,13 @@
{
int total_rounds;
- GNUNET_assert (NULL == rtask);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
/* Number of rounds required to contact all processes except ourselves
(rwidth
in parallel in each round) */
total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
if (current_round < total_rounds)
{
- rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+ rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
return;
}
LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
@@ -361,44 +391,45 @@
/**
+ * Cleans up the address verification context
+ *
+ * @param ctx the context
+ */
+static void
+cleanup_verifiyaddressctx (struct VerifyAddressesCtx *ctx)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != ctx->close_task)
+ GNUNET_SCHEDULER_cancel (ctx->close_task);
+ GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
+ GNUNET_free (ctx);
+}
+
+
+/**
* Callback triggered to finalise a round
*
- * @param sock -1 do not use this
- * @param flags EV_* flags
- * @param cls
+ * @param cls NULL
+ * @param tc scheduler task context
*/
static void
-finalise_round (evutil_socket_t sock, short flags, void *cls)
+finalise_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct VerifyAddressesCtx *ctx;
unsigned int cnt;
- scheduler_remove (finalise_task);
- finalise_task = NULL;
+ finalise_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (atask);
+ atask = GNUNET_SCHEDULER_NO_TASK;
while (NULL != (ctx = vactx_head))
{
- if (NULL != ctx->soh)
- scheduler_open_socket_cancel (ctx->soh);
- if (NULL != ctx->close_task)
- {
- MSH_close (ctx->sock);
- scheduler_remove (ctx->close_task);
- }
- GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
- free (ctx);
+ cleanup_verifiyaddressctx (ctx);
}
for (cnt = 0; cnt < rwidth; cnt++)
instance_address_info_destroy (riainfos[cnt]);
- if (IS_SHUTDOWN_EVENT (flags))
- return;
- MSH_close (listen_sock);
- listen_sock = -1;
- scheduler_remove (atask);
- atask = NULL;
if (1 != bitmap_allset (bitmap))
{
LOG_ERROR ("Could not verify addresses of all hosts\n");
- scheduler_shutdown ();
+ GNUNET_SCHEDULER_shutdown ();
return;
}
current_round++;
@@ -407,36 +438,21 @@
/**
- * Callback triggered when the data on the sock is written and the socket is
- * available for writing again. We close the associated socket in this
callback.
+ * Task for closing a connection
*
- * @param sock the socket file descriptor
- * @param flags EV_* flags
- * @param cls context for verifying addresses
+ * @param cls the verify address context
+ * @param tc the scheduler task context
*/
static void
-socket_write_cb (evutil_socket_t sock, short flags, void *cls)
+conn_close_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct VerifyAddressesCtx *ctx = cls;
int lb;
int source;
int off;
- scheduler_remove (ctx->close_task);
- ctx->close_task = NULL;
- if (ctx->sock == sock)
- MSH_close (sock);
- else if (-1 == sock)
- GNUNET_break (0);
- if (IS_SHUTDOWN_EVENT (flags))
- {
- GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
- free (ctx);
- return;
- }
- /* FIXME: add the addresses associated with the contex to the mapping */
-
- lb = rank - current_round * rwidth - rwidth + nproc;
+ ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
+ lb = rank - (current_round * rwidth) - rwidth + nproc;
GNUNET_assert (0 <= lb);
lb %= nproc;
source = instance_address_info_get_rank (ctx->iainfo);
@@ -447,49 +463,66 @@
bitmap_set (bitmap, off, 1);
addressmap_add (addrmap, instance_address_info_get_rank (ctx->iainfo),
ctx->port, ctx->ip);
- return;
+ cleanup_verifiyaddressctx (ctx);
}
/**
- * Callback triggered when a socket connection is ready to be written to
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
*
- * @param sockfd the file descriptor of the socket which is ready to be written
- * to
- * @param cls context information for verifying an instance address
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
*/
-static void
-socket_open_cb (int sockfd, void *cls)
+static size_t
+conn_write_cb (void *cls, size_t size, void *buf)
{
struct VerifyAddressesCtx *ctx = cls;
- uint32_t id;
+ size_t rsize;
+ uint32_t rank_;
- ctx->soh = NULL;
- if (-1 == sockfd)
+ ctx->transmit_handle = NULL;
+ rsize = 0;
+ if ((NULL == buf) || (0 == size))
{
+ goto clo_ret;
+ }
+ if (size < sizeof (uint32_t))
+ {
GNUNET_break (0);
- /* FIXME: Check if we already got a mapping for the instance */
- goto err_ret;
+ goto clo_ret;
}
- LOG_DEBUG ("%d: Opened a connection to %s:%u\n", rank,
- ip2str (ctx->ip), ctx->port);
- ctx->sock = sockfd;
- id = htonl ((uint32_t) rank);
- if (sizeof (uint32_t) != write (sockfd, &id, sizeof (uint32_t)))
+ switch (ctx->state)
{
- GNUNET_break (0); /* FIXME: handle error */
- MSH_close (sockfd);
- goto err_ret;
+ case VERIFY_ADDRESS_CTX_WRITE:
+ rank_ = htonl (rank);
+ rsize = sizeof (uint32_t);
+ (void) memcpy (buf, &rank_, rsize);
+ ctx->transmit_handle =
+ GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, 0,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &conn_write_cb, ctx);
+ ctx->state = VERIFY_ADDRESS_CTX_CLOSE;
+ return rsize;
+ case VERIFY_ADDRESS_CTX_CLOSE:
+ ctx->close_task = GNUNET_SCHEDULER_add_now (&conn_close_task, ctx);
+ return 0;
+ default:
+ GNUNET_assert (0);
}
- ctx->close_task =
- scheduler_add_socket (sockfd, EV_WRITE, &socket_write_cb, ctx, NULL);
- return;
- err_ret:
+ clo_ret:
GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
- free (ctx);
+ GNUNET_CONNECTION_destroy (ctx->conn);
+ GNUNET_free (ctx);
+ return size;
}
+
static unsigned int bmx;
static int
@@ -504,20 +537,26 @@
in_addr.sin_port = htons (port);
in_addr.sin_addr.s_addr = htonl ((uint32_t) ip);
ctx = GNUNET_malloc (sizeof (struct VerifyAddressesCtx));
- ctx->soh = scheduler_open_socket ((struct sockaddr *) &in_addr,
- sizeof (struct sockaddr_in),
- &socket_open_cb, ctx);
- ctx->port = port;
- ctx->ip = ip;
- ctx->sock = -1;
- ctx->iainfo = iainfo;
- if (NULL == ctx->soh)
+ ctx->conn =
+ GNUNET_CONNECTION_create_from_sockaddr (AF_INET,
+ (const struct sockaddr *)
+ &in_addr,
+ sizeof (struct sockaddr_in));
+ if (NULL == ctx->conn)
{
GNUNET_break (0);
free (ctx);
return GNUNET_SYSERR;
}
+ ctx->port = port;
+ ctx->ip = ip;
+ ctx->iainfo = iainfo;
+ ctx->state = VERIFY_ADDRESS_CTX_WRITE;
GNUNET_CONTAINER_DLL_insert_tail (vactx_head, vactx_tail, ctx);
+ ctx->transmit_handle =
+ GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, sizeof (uint32_t),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &conn_write_cb, ctx);
return GNUNET_OK;
}
@@ -539,8 +578,8 @@
bmx = 0;
if (GNUNET_OK != instance_address_info_iterate_addresses (iainfo,
- &address_iterator_cb,
- iainfo))
+
&address_iterator_cb,
+ iainfo))
return GNUNET_SYSERR;
return GNUNET_OK;
}
@@ -679,11 +718,11 @@
msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
msg = GNUNET_malloc (msize);
msg->header.size = htons (msize);
- msg->port = htons (lport);
+ msg->port = htons (listen_port);
msg->nips = htons (nips);
for (cnt = 0; cnt < nips; cnt++)
{
- msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
+ msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
}
width = rwidth;
if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) /
rwidth)) )
@@ -744,49 +783,30 @@
static int
run_round_ ()
{
- struct sockaddr_in addr;
- struct timeval tv;
- socklen_t addrlen;
- int sock;
unsigned int cnt;
-
- addrlen = sizeof (struct sockaddr_in);
- (void) memset (&addr, 0, addrlen);
- sock = open_listen_socket ((struct sockaddr *) &addr, addrlen, rwidth);
- if (-1 == sock)
- return GNUNET_SYSERR;
- lport = ntohs (addr.sin_port);
- if (0 == lport)
- {
- GNUNET_break (0);
- goto clo_ret;
- }
+
if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
{
GNUNET_break (0);
- goto clo_ret;
+ return GNUNET_SYSERR;
}
if (GNUNET_SYSERR == send_addresses ())
- goto clo_ret;
+ return GNUNET_SYSERR;
if (NULL == (riainfos = receive_addresses ()))
- goto clo_ret;
- atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
+ return GNUNET_SYSERR;
+ atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ listen_socket, &accept_task, NULL);
+
if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
{
GNUNET_break (0);
- goto clo_ret;
+ return GNUNET_SYSERR;
}
- tv.tv_sec = 1;
- tv.tv_usec = 0;
for (cnt = 0; cnt < rwidth; cnt++)
verify_addresses (riainfos[cnt]);
- listen_sock = sock;
- finalise_task = scheduler_add (&finalise_round, NULL, &tv);
+ finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &finalise_round, NULL);
return GNUNET_OK;
-
- clo_ret:
- (void) close (sock);
- return GNUNET_SYSERR;
}
@@ -798,75 +818,109 @@
* @param cls pointer to the corresponding Task
*/
static void
-sig_shutdown (evutil_socket_t signal, short flags, void *cls)
+sighandler_shutdown ()
{
- struct Task **task = cls;
- unsigned int cnt;
+ static char c;
+ int old_errno; /* back-up errno */
- scheduler_remove (*task);
- *task = NULL;
- if (IS_SHUTDOWN_EVENT (flags))
- return;
- LOG_DEBUG ("Got signal %d. Exiting.\n", signal);
- scheduler_shutdown ();
+ old_errno = errno;
+ GNUNET_break (1 ==
+ GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
+ (sigpipe, GNUNET_DISK_PIPE_END_WRITE),
+ &c, sizeof (c)));
+ errno = old_errno;
}
/**
* Task for running a round
*
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
* @param cls NULL
+ * @param tc scheduler task context
*/
static void
-run_round (evutil_socket_t nosock, short flags, void *cls)
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- scheduler_remove (rtask);
- rtask = NULL;
- if (IS_SHUTDOWN_EVENT (flags))
- return;
+ rtask = GNUNET_SCHEDULER_NO_TASK;
if (GNUNET_OK != run_round_ ())
- scheduler_shutdown ();
+ GNUNET_SCHEDULER_shutdown ();
}
/**
- * Event callback for the first running task
+ * Function called whenever a signal is written to the signal pipe
*
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
* @param cls NULL
+ * @param tc scheduler task context
*/
static void
-run (evutil_socket_t nosock, short flags, void *cls)
+sigpipe_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- LOG_DEBUG ("Running main task\n");
- sigshut_tasks[0] = scheduler_add_signal (SIGINT, &sig_shutdown,
- &sigshut_tasks[0], NULL);
- sigshut_tasks[1] = scheduler_add_signal (SIGTERM, &sig_shutdown,
- &sigshut_tasks[1], NULL);
- //rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
- schedule_next_round ();
+ const struct GNUNET_DISK_FileHandle *pr;
+ char c[16];
+
+ sigread_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ pr = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
+ GNUNET_break (0 < GNUNET_DISK_file_read (pr, &c, sizeof (c)));
+ LOG_DEBUG ("Caught signal. Exiting.\n");
+ GNUNET_SCHEDULER_shutdown ();
}
/**
- * Prints help message for this program
+ * Main function that will be run.
+ *
+ * @param cls closure
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be
NULL!)
+ * @param cfg configuration
*/
-static void
-print_help ()
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
- char *msg =
-"mshd: MSH daemon.\n"
-"This binary is a part of "PACKAGE_NAME "-" PACKAGE_VERSION " available from "
PACKAGE_URL ".\n"
-"This program takes the following options:\n"
-" -w num\t: \t The number of processes which verify at each round.\n"
-" -h \t: \t Print this help\n"
-"Report bugs to " PACKAGE_BUGREPORT "\n"
- ;
-
- fprintf (stderr, "%s", msg);
+ const struct GNUNET_DISK_FileHandle *fh;
+ struct sockaddr_in addr;
+ socklen_t addrlen;
+ unsigned int cnt;
+
+ LOG_DEBUG ("Running main task\n");
+ if (0 == rwidth)
+ {
+ LOG_ERROR ("Round width cannot be 0. Exiting\n");
+ return;
+ }
+ bitmap = bitmap_create (rwidth);
+ addrmap = addressmap_create (nproc);
+ fh = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
+ sigread_task =
+ GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, fh,
+ &sigpipe_read, NULL);
+ addrlen = sizeof (struct sockaddr_in);
+ (void) memset (&addr, 0, addrlen);
+ listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen,
rwidth);
+ listen_port = ntohs (addr.sin_port);
+ if (NULL == listen_socket)
+ return;
+ if (0 == listen_port)
+ {
+ GNUNET_break (0);
+ goto clo_ret;
+ }
+ GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
+ if (0 == nips)
+ {
+ LOG_ERROR ("No IP addresses found\n");
+ return;
+ }
+ schedule_next_round ();
+ return;
+
+ clo_ret:
+ GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+ listen_socket = NULL;
}
@@ -880,51 +934,36 @@
int
main (int argc, char **argv)
{
- extern char *optarg;
+ static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+ {'w', "round-width", "COUNT",
+ "set the size of each round to COUNT",
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
+ GNUNET_GETOPT_OPTION_END
+ };
int ret;
int c;
+
ret = 1;
- rwidth = 1;
-
- listen_sock = -1;
- GNUNET_log_setup ("mshd", "DEBUG", NULL);
- while (-1 != (c = getopt (argc, argv, "hw:")))
+ rwidth = 1;
+ if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv,
+ &argc, (char *const **)
&argv))
{
- switch (c)
- {
- case 'w':
- if (NULL == optarg)
- {
- LOG_ERROR ("Argument is NULL\n");
- return 1;
- }
- if (1 != sscanf (optarg, "%u", &rwidth))
- {
- LOG_ERROR ("-w option requires an unsinged number argument.\n");
- print_help ();
- return 1;
- }
- if (0 == rwidth)
- {
- LOG_ERROR ("Round width cannot be 0\n");
- return 1;
- }
- break;
- case 'h':
- print_help ();
- return 0;
- case '?':
- print_help();
- return 1;
- default:
- printf ("Unknown option: %c\n", c);
- GNUNET_assert (0);
- }
+ GNUNET_break (0);
+ return 2;
}
+ if (NULL == (sigpipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO,
+ GNUNET_NO, GNUNET_NO)))
+ {
+ GNUNET_break (0);
+ ret = GNUNET_SYSERR;
+ return 1;
+ }
+ shc_int = GNUNET_SIGNAL_handler_install (SIGINT, &sighandler_shutdown);
+ shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, &sighandler_shutdown);
if (MPI_SUCCESS != MPI_Init(&argc, &argv))
{
LOG_ERROR ("Failed to initialise MPI\n");
- return 1;
+ goto uninstall_sighandlers;
}
if (MPI_SUCCESS != MPI_Comm_size (MPI_COMM_WORLD, &nproc))
{
@@ -941,16 +980,9 @@
LOG_ERROR ("Cannot determine our MPI rank\n");
goto fail;
}
- GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
- if (0 == nips)
+ if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "mshd", "mshd: MSH daemon",
+ options, &run, NULL))
{
- LOG_ERROR ("No IP addresses found\n");
- goto fail;
- }
- bitmap = bitmap_create (rwidth);
- addrmap = addressmap_create (nproc);
- if (GNUNET_OK != scheduler_run (&run, NULL))
- {
GNUNET_break (0);
goto fail;
}
@@ -970,5 +1002,12 @@
GNUNET_break (MPI_SUCCESS == MPI_Finalize());
GNUNET_free_non_null (s_addrs);
LOG_ERROR ("Returning\n");
+
+ uninstall_sighandlers:
+ if (NULL != listen_socket)
+ GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+ GNUNET_DISK_pipe_close (sigpipe);
+ GNUNET_SIGNAL_handler_uninstall (shc_int);
+ GNUNET_SIGNAL_handler_uninstall (shc_term);
return ret;
}
Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/mtypes.h 2013-07-29 07:10:22 UTC (rev 28335)
@@ -9,11 +9,6 @@
#include "common.h"
-/**
- * gcc-ism to get packed structs. This won't work on W32 but then do we use
W32
- * for HPC? ;)
- */
-#define MSH_PACKED __attribute__((packed))
/**
* Message header that will be included for all messages
@@ -23,7 +18,7 @@
/**
* The size of the message
*/
- uint16_t size MSH_PACKED;
+ uint16_t size GNUNET_PACKED;
};
@@ -40,12 +35,12 @@
/**
* Randomly chosen port number
*/
- uint16_t port MSH_PACKED;
+ uint16_t port GNUNET_PACKED;
/**
* Number of IP addresses
*/
- uint16_t nips MSH_PACKED;
+ uint16_t nips GNUNET_PACKED;
/**
* IPv4 addresses to follow as 32 bit unsigned integeters
@@ -90,12 +85,12 @@
/**
* The rank of the instance
*/
- uint16_t rank MSH_PACKED;
+ uint16_t rank GNUNET_PACKED;
/**
* The number of addresses
*/
- uint16_t nips MSH_PACKED;
+ uint16_t nips GNUNET_PACKED;
/**
* IPv4 addresses to follow as 32 bit unsigned integers
Modified: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/reduce.c 2013-07-29 07:10:22 UTC (rev 28335)
@@ -12,7 +12,7 @@
#include "mtypes.h"
#define LOG(kind,...) \
- GNUNET_log_from (kind, "mshd-addressmap", __VA_ARGS__)
+ GNUNET_log_from (kind, "mshd-reduce", __VA_ARGS__)
#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
@@ -42,6 +42,7 @@
return ret;
}
send_reqs = GNUNET_malloc (sizeof (MPI_Request) * nmsg);
+ LOG_DEBUG ("%d: Sending addressmap to instance %d\n", rank, instance);
for (cnt = 0; cnt < nmsg; cnt++)
{
if (MPI_SUCCESS !=
@@ -60,6 +61,8 @@
GNUNET_break (0);
goto cleanup;
}
+ GNUNET_free (send_reqs);
+ send_reqs = NULL;
for (cnt = 0; cnt < nmsg; cnt++)
{
if (MPI_SUCCESS != stats[cnt].MPI_ERROR)
@@ -69,12 +72,12 @@
}
}
ret = GNUNET_OK;
-
+
cleanup:
- for (;(NULL != send_reqs) && (cnt >= 0); cnt--)
+ for (;(cnt > 0) && (NULL != send_reqs); cnt--)
{
- GNUNET_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt]));
- GNUNET_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt],
MPI_STATUS_IGNORE));
+ GNUNET_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt - 1]));
+ GNUNET_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt - 1],
MPI_STATUS_IGNORE));
}
for (cnt = 0; cnt < nmsg; cnt++)
free (iaddr_msgs[cnt]);
@@ -118,14 +121,13 @@
else
{
nrecv = ((nproc - rank) - 1) / step_width;
- if (0 != nrecv)
- {
- lb = rank + 1;
- ub = nproc - 1;
- }
+ if (0 == nrecv)
+ return GNUNET_OK;
+ lb = rank + 1;
+ ub = nproc - 1;
}
GNUNET_assert (nrecv >= 0);
- nrecv *= nproc;
+ nrecv *= nproc; /* we get a message for each instance from each instance */
for (cnt = 0; cnt < nrecv; cnt++)
{
msg = NULL;
@@ -140,6 +142,8 @@
GNUNET_break (0);
goto cleanup;
}
+ LOG_DEBUG ("%d: Receiving %d (nd/th) addressmap message from instance
%d\n",
+ rank, cnt, stat.MPI_SOURCE);
msize = 0;
if ((MPI_SUCCESS != MPI_Get_elements (&stat, MPI_BYTE, &msize))
|| (msize <= 0))
@@ -198,9 +202,12 @@
if (0 != (aggregator = (rank % step_width)))
{
aggregator = rank - aggregator;
- return send_addressmap (aggregator);
+ if (GNUNET_SYSERR == send_addressmap (aggregator))
+ return GNUNET_SYSERR;
}
/* receive address maps */
- receive_addressmap (step);
+ if (GNUNET_SYSERR == receive_addressmap (step))
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
}
Deleted: msh/src/scheduler.c
===================================================================
--- msh/src/scheduler.c 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/scheduler.c 2013-07-29 07:10:22 UTC (rev 28335)
@@ -1,305 +0,0 @@
-/**
- * @file scheduler.c
- * @brief task scheduler based on libevent
- * @author Sree Harsha Totakura <address@hidden>
- */
-
-#include "common.h"
-#include "gnunet/gnunet_util_lib.h"
-#include "scheduler.h"
-
-#define LOG(kind,...) \
- GNUNET_log_from (kind, "mshd-scheduler", __VA_ARGS__);
-
-#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__);
-
-#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__);
-
-#define LOG_STRERROR(cmd) \
- GNUNET_log_from_strerror (GNUNET_ERROR_TYPE_WARNING, "mshd", cmd)
-
-/**
- * variable for 0 time. Externalised in scheduler.h
- */
-struct timeval tv_immediate;
-
-struct Task
-{
- /**
- * DLL next
- */
- struct Task *next;
-
- /**
- * DLL prev
- */
- struct Task *prev;
-
- struct event *ev;
-};
-
-/**
- * Head for the DLL
- */
-static struct Task *thead;
-
-/**
- * Tail for the DLL
- */
-static struct Task *ttail;
-
-/**
- * Our event base
- */
-static struct event_base *ebase;
-
-
-/**
- * Adds a task which is to be executed when one of the given events are
- * triggered on the given socket or upon the expiry of the given timeout
- *
- * @param sock the sock to wait for
- * @param flags EV_* events; the callback cb
- * @param cb the callback to call when one of the events marked in flags are
- * triggered on for the sock
- * @param cls closure for the callback
- * @param tv how long should we wait for the events. Upon this value the cb is
- * called with EV_TIMEOUT flag. Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb,
- void *cls, const struct timeval *tv)
-{
- struct Task *task;
-
- GNUNET_assert (NULL != ebase);
- task = GNUNET_malloc (sizeof (struct Task));
- task->ev = event_new (ebase, sock, flags, cb, cls);
- if (0 != event_add (task->ev, tv))
- {
- free (task);
- return NULL;
- }
- GNUNET_CONTAINER_DLL_insert_tail (thead, ttail, task);
- return task;
-}
-
-
-/**
- * Adds a task which is to be executed after given interval
- *
- * @param cb the callback to call for executing the task
- * @param cls closure for the above callback
- * @param tv the interval after which the task has to be executed; NULL to
- * denote infinite delay
- * @return handle for task; NULL upon error
- */
-struct Task *
-scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv)
-{
- return scheduler_add_socket (-1, 0, cb, cls, tv);
-}
-
-
-/**
- * Add a task to be executed upon reception of a signal or upon the expiry of a
- * given timeout
- *
- * @param signal the signal to wait for
- * @param cb the callback to call upon reception of the signal
- * @param cls closure for the above callback
- * @param tv how long should we wait for the signal before. Upon this value
the cb is
- * called with EV_TIMEOUT flag. Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_signal (int signal, event_callback_fn cb, void *cls,
- const struct timeval *tv)
-{
- return scheduler_add_socket (signal, EV_SIGNAL, cb, cls, tv);
-}
-
-
-/**
- * Remove a task. All tasks are to be removed (even after their respective
- * callbacks are executed)
- *
- * @param task the task handle to remove
- */
-void
-scheduler_remove (struct Task *task)
-{
- GNUNET_CONTAINER_DLL_remove (thead, ttail, task);
- GNUNET_break (0 == event_del (task->ev));
- event_free (task->ev);
- free (task);
-}
-
-/**
- * Shutdowns the scheduler. All pending tasks are executed (their respective
- * callbacks will be called). Use IS_SHUTDOWN_EVENT() to check if the
callbacks
- * are called upon scheduler's shutdown. It is not possible to add any tasks
- * after this function is called.
- *
- * @see IS_SHUTDOWN_EVENT
- */
-void
-scheduler_shutdown ()
-{
- struct Task *task;
-
- for (task = thead; NULL != task; task = task->next)
- {
- event_active (task->ev, EV_READ | EV_WRITE | EV_TIMEOUT, 0);
- }
-}
-
-
-/**
- * Run the scheduler loop by calling the given callback. This function returns
- * once all tasks are finished or after a call to scheduler_shutdown() (which
- * causes all waiting tasks to be executed)
- *
- * @param cb the callback to call when the scheduler is ready. Further tasks
- * can be added through this callback.
- * @return GNUNET_OK if all tasks are successfully executed; GNUNET_SYSERR
upon error
- */
-int
-scheduler_run (event_callback_fn cb, void *cls)
-{
- struct Task *task;
- struct event *sev;
- int ret;
-
- ebase = event_base_new ();
- if (NULL == ebase)
- {
- LOG_ERROR ("Cannot allocate libevent event base\n");
- return GNUNET_SYSERR;
- }
- sev = evtimer_new (ebase, cb, cls);
- evtimer_add (sev, TV_IMMEDIATE);
- ret = event_base_dispatch (ebase);
- evtimer_del (sev);
- event_free (sev);
- event_base_free (ebase);
- return (1 == ret) ? GNUNET_OK : GNUNET_SYSERR;
-}
-
-
-/**
- * Handle to be returned from scheduler_open_socket()
- */
-struct SocketOpenHandle
-{
- /**
- * the function to call when the socket is ready
- */
- socket_open_fn cb;
-
- /**
- * The closure for the above callback
- */
- void *cls;
-
- /**
- * The task associated with the socket. Will be executed when the connection
- * on the socket is ready
- */
- struct Task *task;
-
- /**
- * The file descriptor of the socket
- */
- int sock;
-};
-
-
-/**
- * Callback that will be called when the socket is ready for reading
- *
- * @param sock the file descriptor of the socket
- * @param flags EV_* flags
- * @param cls the closure
- */
-static void
-open_socket_cb (evutil_socket_t sock, short flags, void *cls)
-{
- struct SocketOpenHandle *h = cls;
- socket_open_fn cb;
- void *cbcls;
- int errval;
- socklen_t optlen;
-
- scheduler_remove (h->task);
- h->task = NULL;
- cb = h->cb;
- cbcls = h->cls;
- GNUNET_assert (h->sock == sock);
- free (h);
- if (IS_SHUTDOWN_EVENT (flags))
- goto err_ret;
- errval = 1;
- optlen = sizeof (errval);
- if (0 != getsockopt (sock, SOL_SOCKET, SO_ERROR, &errval, &optlen))
- {
- LOG_STRERROR ("getsockopt");
- goto err_ret;
- }
- if (0 != errval)
- {
- LOG_ERROR ("connect() failed for a socket: %s\n", strerror (errval));
- goto err_ret;
- }
- cb (sock, cbcls);
- return;
-
- err_ret:
- MSH_close (sock);
- cb (-1, cbcls);
-}
-
-
-/**
- * Open a socket, connect it to the target address and schedule a task to be
- * executed when the connection is ready.
- *
- * @param addr the target address to connect
- * @param addrlen the length of the addr
- * @param cb the callback to call to signal success or failure
- * @param cls the closure for the above callback
- * @return a handle which can be used to cancel the task to be executed when
the
- * connection is ready; NULL upon error
- */
-struct SocketOpenHandle *
-scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
- socket_open_fn cb, void *cls)
-{
- struct SocketOpenHandle *h;
- int sock;
-
- GNUNET_assert (NULL != cb);
- if (-1 == (sock = open_socket (addr, addrlen)))
- return NULL;
- h = GNUNET_malloc (sizeof (struct SocketOpenHandle));
- h->cb = cb;
- h->cls = cls;
- h->sock = sock;
- h->task = scheduler_add_socket (sock, EV_WRITE, &open_socket_cb, h, NULL);
- return h;
-}
-
-
-/**
- * Cancel a handle created with scheduler_open_socket()
- *
- * @param h the handle to cancel
- */
-void
-scheduler_open_socket_cancel (struct SocketOpenHandle *h)
-{
- scheduler_remove (h->task);
- MSH_close (h->sock);
- free (h);
-}
Deleted: msh/src/scheduler.h
===================================================================
--- msh/src/scheduler.h 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/scheduler.h 2013-07-29 07:10:22 UTC (rev 28335)
@@ -1,151 +0,0 @@
-/**
- * @file scheduler.h
- * @brief interface for task scheduler based on libevent
- * @author Sree Harsha Totakura <address@hidden>
- */
-
-#ifndef SCHEDULER_H_
-#define SCHEDULER_H_
-
-#include "common.h"
-#include "event2/event.h"
-
-extern struct timeval tv_immediate;
-
-/**
- * Use this for scheduling tasks immediately
- */
-#define TV_IMMEDIATE &tv_immediate
-
-/**
- * Returns true if the flags denote a shutdown event
- */
-#define IS_SHUTDOWN_EVENT(flags) ((flags & (EV_READ | EV_WRITE | EV_TIMEOUT))
== (EV_READ | EV_WRITE | EV_TIMEOUT))
-
-
-/**
- * Opaque handle for a task
- */
-struct Task;
-
-
-/**
- * Adds a task which is to be executed when one of the given events are
- * triggered on the given socket or upon the expiry of the given timeout
- *
- * @param sock the sock to wait for
- * @param flags EV_* events; the callback cb
- * @param cb the callback to call when one of the events marked in flags are
- * triggered on for the sock
- * @param cls closure for the callback
- * @param tv how long should we wait for the events. Upon this value the cb is
- * called with EV_TIMEOUT flag. Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb,
- void *cls, const struct timeval *tv);
-
-
-/**
- * Adds a task which is to be executed after given interval
- *
- * @param cb the callback to call for executing the task
- * @param cls closure for the above callback
- * @param tv the interval after which the task has to be executed; NULL to
- * denote infinite delay
- * @return handle for task; NULL upon error
- */
-struct Task *
-scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv);
-
-
-/**
- * Add a task to be executed upon reception of a signal or upon the expiry of a
- * given timeout
- *
- * @param signal the signal to wait for
- * @param cb the callback to call upon reception of the signal
- * @param cls closure for the above callback
- * @param tv how long should we wait for the signal before. Upon this value
the cb is
- * called with EV_TIMEOUT flag. Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_signal (int signal, event_callback_fn cb, void *cls,
- const struct timeval *tv);
-
-
-/**
- * Remove a task. All tasks are to be removed (even after their respective
- * callbacks are executed)
- *
- * @param task the task handle to remove
- */
-void
-scheduler_remove (struct Task *task);
-
-
-/**
- * Shutdowns the scheduler. All pending tasks are executed (their respective
- * callbacks will be called). Use IS_SHUTDOWN_EVENT() to check if the
callbacks
- * are called upon scheduler's shutdown. It is not possible to add any tasks
- * after this function is called.
- *
- * @see IS_SHUTDOWN_EVENT
- */
-void
-scheduler_shutdown ();
-
-
-/**
- * Run the scheduler loop by calling the given callback. This function returns
- * once all tasks are finished or after a call to scheduler_shutdown() (which
- * causes all waiting tasks to be executed)
- *
- * @param cb the callback to call when the scheduler is ready. Further tasks
- * can be added through this callback.
- * @return MSH_OK if all tasks are successfully executed; MSH_SYSERR upon error
- */
-int
-scheduler_run (event_callback_fn cb, void *cls);
-
-
-/**
- * The type of the function which is used as a callback argument to
- * scheduler_open_socket(). The callback will be called when a socket
- * connection is either successfully established or failed
- *
- * @param sockfd the socket file descriptor; upon failure its value is -1
- * @param cls the closure for this callback as passed to
scheduler_open_socket()
- */
-typedef void (* socket_open_fn) (int sockfd, void *cls);
-
-
-/**
- * Open a socket, connect it to the target address and schedule a task to be
- * executed when the connection is ready.
- *
- * @param addr the target address to connect
- * @param addrlen the length of the addr
- * @param cb the callback to call to signal success or failure
- * @param cls the closure for the above callback
- * @return a handle which can be used to cancel the task to be executed when
the
- * connection is ready; NULL upon error
- */
-struct SocketOpenHandle *
-scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
- socket_open_fn cb, void *cls);
-
-
-/**
- * Cancel a handle created with scheduler_open_socket()
- *
- * @param h the handle to cancel
- */
-void
-scheduler_open_socket_cancel (struct SocketOpenHandle *h);
-
-#endif /* SCHEDULER_H_ */
-
-/* End of scheduler.h */
Modified: msh/src/util.c
===================================================================
--- msh/src/util.c 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/util.c 2013-07-29 07:10:22 UTC (rev 28335)
@@ -1,9 +1,12 @@
#include "common.h"
+#include <gnunet/gnunet_util_lib.h>
+#include "mshd.h"
#include "util.h"
#define LOG_STRERROR(cmd) \
GNUNET_log_from_strerror (GNUNET_ERROR_TYPE_WARNING, "mshd-util", cmd)
+
/**
* Creates a new non-blocking socket and binds it to the given address and
makes
* it a listen socket
@@ -12,46 +15,47 @@
* @param addrlen the length of the addr
* @param backlog the max length of the pending connections. This will be
* passed to listen()
- * @return the socket's fd; -1 on error
+ * @return the handler to the socket; NULL upon error
*/
-int
+struct GNUNET_NETWORK_Handle *
open_listen_socket (struct sockaddr *addr, const socklen_t addrlen, int
backlog)
{
+ struct GNUNET_NETWORK_Handle *lsock;
socklen_t newaddrlen;
- int sock;
+ int sockfd;
- sock = socket (AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
- if (-1 == sock)
+ if (NULL == (lsock = GNUNET_NETWORK_socket_create (AF_INET, SOCK_STREAM, 0)))
{
- LOG_STRERROR ("socket");
- return -1;
+ GNUNET_break (0);
+ return NULL;
}
- if (-1 == bind (sock, addr, addrlen))
+ if (GNUNET_OK != GNUNET_NETWORK_socket_bind (lsock, addr, addrlen, 0))
{
- LOG_STRERROR ("bind");
+ GNUNET_break (0);
goto clo_ret;
}
+ sockfd = GNUNET_NETWORK_get_fd (lsock);
newaddrlen = addrlen;
- if (-1 == getsockname (sock, addr, &newaddrlen))
+ if (-1 == getsockname (sockfd, addr, &newaddrlen))
{
LOG_STRERROR ("getsockname");
goto clo_ret;
- }
+ }
if (newaddrlen != addrlen)
{
GNUNET_break (0);
goto clo_ret;
}
- if (-1 == listen (sock, backlog))
+ if (GNUNET_OK != GNUNET_NETWORK_socket_listen (lsock, rwidth))
{
- LOG_STRERROR ("listen");
+ GNUNET_break (0);
goto clo_ret;
}
- return sock;
+ return lsock;
clo_ret:
- (void) close (sock);
- return -1;
+ GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (lsock));
+ return NULL;
}
Modified: msh/src/util.h
===================================================================
--- msh/src/util.h 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/util.h 2013-07-29 07:10:22 UTC (rev 28335)
@@ -3,6 +3,7 @@
#include <gnunet/gnunet_common.h>
+
/**
* Creates a new non-blocking socket and binds it to the given address and
makes
* it a listen socket
@@ -11,9 +12,9 @@
* @param addrlen the length of the addr
* @param backlog the max length of the pending connections. This will be
* passed to listen()
- * @return the socket's fd; -1 on error
+ * @return the handler to the socket; NULL upon error
*/
-int
+struct GNUNET_NETWORK_Handle *
open_listen_socket (struct sockaddr *addr, const socklen_t addrlen, int
backlog);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28335 - in msh: . src,
gnunet <=