gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37657 - gnunet/src/multicast


From: gnunet
Subject: [GNUnet-SVN] r37657 - gnunet/src/multicast
Date: Thu, 4 Aug 2016 01:20:37 +0200

Author: tg
Date: 2016-08-04 01:20:37 +0200 (Thu, 04 Aug 2016)
New Revision: 37657

Modified:
   gnunet/src/multicast/multicast_api.c
Log:
multicast: switch to MQ

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2016-08-03 23:20:34 UTC (rev 37656)
+++ gnunet/src/multicast/multicast_api.c        2016-08-03 23:20:37 UTC (rev 37657)
@@ -27,6 +27,7 @@
 
 #include "platform.h"
 #include "gnunet_util_lib.h"
+#include "gnunet_mq_lib.h"
 #include "gnunet_multicast_service.h"
 #include "multicast.h"
 
@@ -73,13 +74,23 @@
   /**
    * Client connection to the service.
    */
-  struct GNUNET_CLIENT_MANAGER_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
-   * Message to send on reconnect.
+   * Time to wait until we try to reconnect on failure.
    */
-  struct GNUNET_MessageHeader *connect_msg;
+  struct GNUNET_TIME_Relative reconnect_backoff;
 
+  /**
+   * Task for reconnecting when the listener fails.
+   */
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+  /**
+   * Message to send on connect.
+   */
+  struct GNUNET_MQ_Envelope *connect_env;
+
   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
@@ -198,31 +209,21 @@
 
 
 /**
- * Send first message to the service after connecting.
+ * Check join request message.
  */
-static void
-group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
+static int
+check_group_join_request (void *cls,
+                          const struct MulticastJoinRequestMessage *jreq)
 {
-  uint16_t cmsg_size = ntohs (grp->connect_msg->size);
-  struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
-  GNUNET_memcpy (cmsg, grp->connect_msg, cmsg_size);
-  GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
-  GNUNET_free (cmsg);
-}
+  uint16_t size = ntohs (jreq->header.size);
 
+  if (sizeof (*jreq) == size)
+    return GNUNET_OK;
 
-/**
- * Got disconnected from service.  Reconnect.
- */
-static void
-group_recv_disconnect (void *cls,
-                        struct GNUNET_CLIENT_MANAGER_Connection *client,
-                        const struct GNUNET_MessageHeader *msg)
-{
-  struct GNUNET_MULTICAST_Group *
-    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
-  GNUNET_CLIENT_MANAGER_reconnect (client);
-  group_send_connect_msg (grp);
+  if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
+    return GNUNET_OK;
+
+  return GNUNET_SYSERR;
 }
 
 
@@ -230,16 +231,13 @@
  * Receive join request from service.
  */
 static void
-group_recv_join_request (void *cls,
-                          struct GNUNET_CLIENT_MANAGER_Connection *client,
-                          const struct GNUNET_MessageHeader *msg)
+handle_group_join_request (void *cls,
+                           const struct MulticastJoinRequestMessage *jreq)
 {
-  struct GNUNET_MULTICAST_Group *grp;
-  const struct MulticastJoinRequestMessage *jreq;
+  struct GNUNET_MULTICAST_Group *grp = cls;
   struct GNUNET_MULTICAST_JoinHandle *jh;
-  const struct GNUNET_MessageHeader *jmsg;
+  const struct GNUNET_MessageHeader *jmsg = NULL;
 
-  grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
   if (NULL == grp)
   {
     GNUNET_break (0);
@@ -247,32 +245,39 @@
   }
   if (NULL == grp->join_req_cb)
     return;
-  /* FIXME: this fails to check that 'msg' is well-formed! */
-  jreq = (const struct MulticastJoinRequestMessage *) msg;
+
   if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
     jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
-  else
-    jmsg = NULL;
+
   jh = GNUNET_malloc (sizeof (*jh));
   jh->group = grp;
   jh->member_pub_key = jreq->member_pub_key;
   jh->peer = jreq->peer;
   grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
+
+  grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
 /**
+ * Check multicast message.
+ */
+static int
+check_group_message (void *cls,
+                     const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+  return GNUNET_OK;
+}
+
+
+/**
  * Receive multicast message from service.
  */
 static void
-group_recv_message (void *cls,
-                    struct GNUNET_CLIENT_MANAGER_Connection *client,
-                    const struct GNUNET_MessageHeader *msg)
+handle_group_message (void *cls,
+                      const struct GNUNET_MULTICAST_MessageHeader *mmsg)
 {
-  struct GNUNET_MULTICAST_Group *
-    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
-  struct GNUNET_MULTICAST_MessageHeader *
-    mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
+  struct GNUNET_MULTICAST_Group *grp = cls;
 
   if (GNUNET_YES == grp->is_disconnecting)
     return;
@@ -283,6 +288,8 @@
 
   if (NULL != grp->message_cb)
     grp->message_cb (grp->cb_cls, mmsg);
+
+  grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
@@ -290,12 +297,10 @@
  * Receive message/request fragment acknowledgement from service.
  */
 static void
-group_recv_fragment_ack (void *cls,
-                         struct GNUNET_CLIENT_MANAGER_Connection *client,
-                         const struct GNUNET_MessageHeader *msg)
+handle_group_fragment_ack (void *cls,
+                           const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_MULTICAST_Group *
-    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_Group *grp = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
@@ -316,22 +321,32 @@
     origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
   else
     member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+
+  grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
+
 /**
- * Origin receives uniquest request from a member.
+ * Check unicast request.
  */
+static int
+check_origin_request (void *cls,
+                      const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  return GNUNET_OK;
+}
+
+
+/**
+ * Origin receives unicast request from a member.
+ */
 static void
-origin_recv_request (void *cls,
-                     struct GNUNET_CLIENT_MANAGER_Connection *client,
-                     const struct GNUNET_MessageHeader *msg)
+handle_origin_request (void *cls,
+                       const struct GNUNET_MULTICAST_RequestHeader *req)
 {
   struct GNUNET_MULTICAST_Group *grp;
-  struct GNUNET_MULTICAST_Origin *
-    orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_Origin *orig = cls;
   grp = &orig->grp;
-  struct GNUNET_MULTICAST_RequestHeader *
-    req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Calling request callback with a request of size %u.\n",
@@ -339,6 +354,8 @@
 
   if (NULL != orig->request_cb)
     orig->request_cb (grp->cb_cls, req);
+
+  grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
@@ -346,14 +363,11 @@
  * Receive multicast replay request from service.
  */
 static void
-group_recv_replay_request (void *cls,
-                           struct GNUNET_CLIENT_MANAGER_Connection *client,
-                           const struct GNUNET_MessageHeader *msg)
+handle_group_replay_request (void *cls,
+                             const struct MulticastReplayRequestMessage *rep)
+
 {
-  struct GNUNET_MULTICAST_Group *
-    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
-  struct MulticastReplayRequestMessage *
-    rep = (struct MulticastReplayRequestMessage *) msg;
+  struct GNUNET_MULTICAST_Group *grp = cls;
 
   if (GNUNET_YES == grp->is_disconnecting)
     return;
@@ -385,45 +399,72 @@
                           GNUNET_ntohll (rep->flags), rh);
     }
   }
+
+  grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
 /**
- * Receive multicast replay request from service.
+ * Check replay response.
  */
+static int
+check_member_replay_response (void *cls,
+                              const struct MulticastReplayResponseMessage *res)
+{
+  uint16_t size = ntohs (res->header.size);
+
+  if (sizeof (*res) == size)
+    return GNUNET_OK;
+
+  if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
+    return GNUNET_OK;
+
+  return GNUNET_SYSERR;
+}
+
+
+/**
+ * Receive replay response from service.
+ */
 static void
-member_recv_replay_response (void *cls,
-                            struct GNUNET_CLIENT_MANAGER_Connection *client,
-                            const struct GNUNET_MessageHeader *msg)
+handle_member_replay_response (void *cls,
+                               const struct MulticastReplayResponseMessage *res)
 {
   struct GNUNET_MULTICAST_Group *grp;
-  struct GNUNET_MULTICAST_Member *
-    mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_Member *mem = cls;
   grp = &mem->grp;
-  // FIXME: Something is missing here for the code to make sense
-  //struct MulticastReplayResponseMessage *
-  //  res = (struct MulticastReplayResponseMessage *) msg;
+
   if (GNUNET_YES == grp->is_disconnecting)
     return;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
+
+  // FIXME: return result
 }
 
+
 /**
+ * Check join decision.
+ */
+static int
+check_member_join_decision (void *cls,
+                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+  return GNUNET_OK; // checked in handle below
+}
+
+
+/**
  * Member receives join decision.
  */
 static void
-member_recv_join_decision (void *cls,
-                           struct GNUNET_CLIENT_MANAGER_Connection *client,
-                           const struct GNUNET_MessageHeader *msg)
+handle_member_join_decision (void *cls,
+                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
 {
   struct GNUNET_MULTICAST_Group *grp;
-  struct GNUNET_MULTICAST_Member *
-    mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_Member *mem = cls;
   grp = &mem->grp;
 
-  const struct MulticastJoinDecisionMessageHeader *
-    hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
   const struct MulticastJoinDecisionMessage *
     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
 
@@ -474,79 +515,15 @@
   // FIXME:
   //if (GNUNET_YES != is_admitted)
   //  GNUNET_MULTICAST_member_part (mem);
+
+  grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
 }
 
 
-/**
- * Message handlers for an origin.
- */
-static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
-{
-  { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
-
-  { group_recv_message, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
-    sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
-
-  { origin_recv_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
-    sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
-
-  { group_recv_fragment_ack, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
-    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
-
-  { group_recv_join_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
-    sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
-
-  { group_recv_replay_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
-    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
-  { NULL, NULL, 0, 0, GNUNET_NO }
-};
-
-
-/**
- * Message handlers for a member.
- */
-static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
-{
-  { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
-
-  { group_recv_message, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
-    sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
-
-  { group_recv_fragment_ack, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
-    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
-
-  { group_recv_join_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
-    sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
-
-  { member_recv_join_decision, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
-    sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
-
-  { group_recv_replay_request, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
-    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
-  { member_recv_replay_response, NULL,
-    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
-    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
-  { NULL, NULL, 0, 0, GNUNET_NO }
-};
-
-
 static void
 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
 {
-  GNUNET_free (grp->connect_msg);
+  GNUNET_free (grp->connect_env);
   if (NULL != grp->disconnect_cb)
     grp->disconnect_cb (grp->disconnect_cls);
 }
@@ -609,13 +586,11 @@
   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
   uint16_t relay_size = relay_count * sizeof (*relays);
 
-  struct MulticastJoinDecisionMessageHeader * hdcsn;
+  struct MulticastJoinDecisionMessageHeader *hdcsn;
   struct MulticastJoinDecisionMessage *dcsn;
-  hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
-                         + relay_size + join_resp_size);
-  hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
-                              + relay_size + join_resp_size);
-  hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
+                               GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
   hdcsn->member_pub_key = join->member_pub_key;
   hdcsn->peer = join->peer;
 
@@ -629,8 +604,7 @@
   if (0 < join_resp_size)
     GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
 
-  GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
-  GNUNET_free (hdcsn);
+  GNUNET_MQ_send (grp->mq, env);
   GNUNET_free (join);
   return NULL;
 }
@@ -653,19 +627,15 @@
                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
 {
   uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
-  struct MulticastReplayResponseMessage *
-    res = GNUNET_malloc (sizeof (*res) + msg_size);
-  *res = (struct MulticastReplayResponseMessage) {
-    .header = {
-      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
-      .size = htons (sizeof (*res) + msg_size),
-    },
-    .fragment_id = rh->req.fragment_id,
-    .message_id = rh->req.message_id,
-    .fragment_offset = rh->req.fragment_offset,
-    .flags = rh->req.flags,
-    .error_code = htonl (ec),
-  };
+  struct MulticastReplayResponseMessage *res;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (res, msg_size,
+                               GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
+  res->fragment_id = rh->req.fragment_id;
+  res->message_id = rh->req.message_id;
+  res->fragment_offset = rh->req.fragment_offset;
+  res->flags = rh->req.flags;
+  res->error_code = htonl (ec);
 
   if (GNUNET_MULTICAST_REC_OK == ec)
   {
@@ -673,8 +643,7 @@
     GNUNET_memcpy (&res[1], msg, msg_size);
   }
 
-  GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
-  GNUNET_free (res);
+  GNUNET_MQ_send (rh->grp->mq, env);
 
   if (GNUNET_MULTICAST_REC_OK != ec)
     GNUNET_free (rh);
@@ -692,18 +661,16 @@
 void
 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-  struct MulticastReplayResponseMessage end = {
-    .header = {
-      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
-      .size = htons (sizeof (end)),
-    },
-    .fragment_id = rh->req.fragment_id,
-    .message_id = rh->req.message_id,
-    .fragment_offset = rh->req.fragment_offset,
-    .flags = rh->req.flags,
-  };
+  struct MulticastReplayResponseMessage *end;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
 
-  GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
+  end->fragment_id = rh->req.fragment_id;
+  end->message_id = rh->req.message_id;
+  end->fragment_offset = rh->req.fragment_offset;
+  end->flags = rh->req.flags;
+
+  GNUNET_MQ_send (rh->grp->mq, env);
   GNUNET_free (rh);
 }
 
@@ -726,7 +693,93 @@
 }
 
 
+void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig);
+
+
+static void
+origin_reconnect (void *cls)
+{
+  origin_connect (cls);
+}
+
+
 /**
+ * Origin client disconnected from service.
+ *
+ * Reconnect after backoff period.
+ */
+void
+origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_MULTICAST_Origin *orig = cls;
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Origin client disconnected (%d), re-connecting\n",
+       (int) error);
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
+
+  grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
+                                                      &origin_reconnect,
+                                                      orig);
+  grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
+}
+
+
+/**
+ * Connect to service as origin.
+ */
+void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig)
+{
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+  GNUNET_MQ_hd_var_size (group_message,
+                         GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                         struct GNUNET_MULTICAST_MessageHeader);
+
+  GNUNET_MQ_hd_var_size (origin_request,
+                         GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+                         struct GNUNET_MULTICAST_RequestHeader);
+
+  GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+                           struct GNUNET_MessageHeader);
+
+  GNUNET_MQ_hd_var_size (group_join_request,
+                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                         struct MulticastJoinRequestMessage);
+
+  GNUNET_MQ_hd_fixed_size (group_replay_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                           struct MulticastReplayRequestMessage);
+
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_group_message_handler (grp),
+    make_origin_request_handler (orig),
+    make_group_fragment_ack_handler (grp),
+    make_group_join_request_handler (grp),
+    make_group_replay_request_handler (grp),
+    GNUNET_MQ_handler_end ()
+  };
+
+  grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+                                   handlers, origin_disconnected, orig);
+  if (NULL == grp->mq)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
+}
+
+
+/**
  * Start a multicast group.
  *
  * Will advertise the origin in the P2P overlay network under the respective
@@ -776,14 +829,13 @@
 {
   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
-  struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
 
-  start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
-  start->header.size = htons (sizeof (*start));
+  struct MulticastOriginStartMessage *start;
+  grp->connect_env = GNUNET_MQ_msg (start,
+                                    GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
   start->max_fragment_id = max_fragment_id;
   GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
 
-  grp->connect_msg = (struct GNUNET_MessageHeader *) start;
   grp->is_origin = GNUNET_YES;
   grp->cfg = cfg;
 
@@ -795,10 +847,7 @@
 
   orig->request_cb = request_cb;
 
-  grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers);
-  GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
-  group_send_connect_msg (grp);
-
+  origin_connect (orig);
   return orig;
 }
 
@@ -820,8 +869,13 @@
   grp->disconnect_cb = stop_cb;
   grp->disconnect_cls = stop_cls;
 
-  GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES,
-                                    &origin_cleanup, orig);
+  // FIXME: wait till queued messages are sent
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
+  origin_cleanup (orig);
 }
 
 
@@ -834,7 +888,11 @@
   GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
-  struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
+  struct GNUNET_MULTICAST_MessageHeader *msg;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
+                               GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
+
   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -844,7 +902,7 @@
          "%p OriginTransmitNotify() returned error or invalid message size.\n",
          orig);
     /* FIXME: handle error */
-    GNUNET_free (msg);
+    GNUNET_free (env);
     return;
   }
 
@@ -852,11 +910,10 @@
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%p OriginTransmitNotify() - transmission paused.\n", orig);
-    GNUNET_free (msg);
+    GNUNET_free (env);
     return; /* Transmission paused. */
   }
 
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
   msg->header.size = htons (sizeof (*msg) + buf_size);
   msg->message_id = GNUNET_htonll (tmit->message_id);
   msg->group_generation = tmit->group_generation;
@@ -864,8 +921,7 @@
   tmit->fragment_offset += sizeof (*msg) + buf_size;
 
   grp->acks_pending++;
-  GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
-  GNUNET_free (msg);
+  GNUNET_MQ_send (grp->mq, env);
 
   if (GNUNET_YES == ret)
     grp->in_transmit = GNUNET_NO;
@@ -944,7 +1000,95 @@
 }
 
 
+ void
+member_connect (struct GNUNET_MULTICAST_Member *mem);
+
+
+static void
+member_reconnect (void *cls)
+{
+  member_connect (cls);
+}
+
+
 /**
+ * Member client disconnected from service.
+ *
+ * Reconnect after backoff period.
+ */
+void
+member_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_MULTICAST_Member *mem = cls;
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Member client disconnected (%d), re-connecting\n",
+       (int) error);
+  GNUNET_MQ_destroy (grp->mq);
+  grp->mq = NULL;
+
+  grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
+                                                      &member_reconnect,
+                                                      mem);
+  grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
+}
+
+
+/**
+ * Connect to service as member.
+ */
+void
+member_connect (struct GNUNET_MULTICAST_Member *mem)
+{
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  GNUNET_MQ_hd_var_size (group_message,
+                         GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+                         struct GNUNET_MULTICAST_MessageHeader);
+
+  GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+                           struct GNUNET_MessageHeader);
+
+  GNUNET_MQ_hd_var_size (group_join_request,
+                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+                         struct MulticastJoinRequestMessage);
+
+  GNUNET_MQ_hd_var_size (member_join_decision,
+                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+                         struct MulticastJoinDecisionMessageHeader);
+
+  GNUNET_MQ_hd_fixed_size (group_replay_request,
+                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+                           struct MulticastReplayRequestMessage);
+
+  GNUNET_MQ_hd_var_size (member_replay_response,
+                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+                         struct MulticastReplayResponseMessage);
+
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_group_message_handler (grp),
+    make_group_fragment_ack_handler (grp),
+    make_group_join_request_handler (grp),
+    make_member_join_decision_handler (mem),
+    make_group_replay_request_handler (grp),
+    make_member_replay_response_handler (mem),
+    GNUNET_MQ_handler_end ()
+  };
+
+  grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+                                   handlers, member_disconnected, mem);
+  if (NULL == grp->mq)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
+}
+
+
+/**
  * Join a multicast group.
  *
  * The entity joining is always the local peer.  Further information about the
@@ -1015,10 +1159,9 @@
 
   uint16_t relay_size = relay_count * sizeof (*relays);
   uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
-  struct MulticastMemberJoinMessage *
-    join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
-  join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
-  join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
+  struct MulticastMemberJoinMessage *join;
+  grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
+                                          GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
   join->group_pub_key = *group_pub_key;
   join->member_key = *member_key;
   join->origin = *origin;
@@ -1028,7 +1171,7 @@
   if (0 < join_msg_size)
     GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
 
-  grp->connect_msg = (struct GNUNET_MessageHeader *) join;
+  grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
   grp->is_origin = GNUNET_NO;
   grp->cfg = cfg;
 
@@ -1039,10 +1182,7 @@
   grp->message_cb = message_cb;
   grp->cb_cls = cls;
 
-  grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers);
-  GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
-  group_send_connect_msg (grp);
-
+  member_connect (mem);
   return mem;
 }
 
@@ -1076,8 +1216,13 @@
   grp->replay_msg_cb = NULL;
   grp->replay_frag_cb = NULL;
 
-  GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES,
-                                    member_cleanup, mem);
+  // FIXME: wait till queued messages are sent
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
+  member_cleanup (mem);
 }
 
 
@@ -1088,17 +1233,16 @@
                        uint64_t fragment_offset,
                        uint64_t flags)
 {
-  struct MulticastReplayRequestMessage rep = {
-    .header = {
-      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
-      .size = htons (sizeof (rep)),
-    },
-    .fragment_id = GNUNET_htonll (fragment_id),
-    .message_id = GNUNET_htonll (message_id),
-    .fragment_offset = GNUNET_htonll (fragment_offset),
-    .flags = GNUNET_htonll (flags),
-  };
-  GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
+  struct MulticastReplayRequestMessage *rep;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
+
+  rep->fragment_id = GNUNET_htonll (fragment_id);
+  rep->message_id = GNUNET_htonll (message_id);
+  rep->fragment_offset = GNUNET_htonll (fragment_offset);
+  rep->flags = GNUNET_htonll (flags);
+
+  GNUNET_MQ_send (mem->grp.mq, env);
 }
 
 
@@ -1168,7 +1312,11 @@
   GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
-  struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
+  struct GNUNET_MULTICAST_RequestHeader *req;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
+                               GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
+
   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -1189,14 +1337,12 @@
     return;
   }
 
-  req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
   req->header.size = htons (sizeof (*req) + buf_size);
   req->request_id = GNUNET_htonll (tmit->request_id);
   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*req) + buf_size;
 
-  GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
-  GNUNET_free (req);
+  GNUNET_MQ_send (grp->mq, env);
 
   if (GNUNET_YES == ret)
     grp->in_transmit = GNUNET_NO;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]