gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33258 - in gnunet/src: include multicast psyc


From: gnunet
Subject: [GNUnet-SVN] r33258 - in gnunet/src: include multicast psyc
Date: Tue, 13 May 2014 14:08:14 +0200

Author: tg
Date: 2014-05-13 14:08:14 +0200 (Tue, 13 May 2014)
New Revision: 33258

Modified:
   gnunet/src/include/gnunet_multicast_service.h
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/multicast/Makefile.am
   gnunet/src/multicast/gnunet-service-multicast.c
   gnunet/src/multicast/multicast.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/Makefile.am
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/test_psyc.conf
Log:
multicast: send messages between client lib & service

Modified: gnunet/src/include/gnunet_multicast_service.h
===================================================================
--- gnunet/src/include/gnunet_multicast_service.h       2014-05-13 12:06:08 UTC 
(rev 33257)
+++ gnunet/src/include/gnunet_multicast_service.h       2014-05-13 12:08:14 UTC 
(rev 33258)
@@ -598,7 +598,7 @@
  * Handle for a request to send a message to all multicast group members
  * (from the origin).
  */
-struct GNUNET_MULTICAST_OriginMessageHandle;
+struct GNUNET_MULTICAST_OriginTransmitHandle;
 
 
 /**
@@ -612,7 +612,7 @@
  * @param notify_cls Closure for @a notify.
  * @return NULL on error (i.e. request already pending).
  */
-struct GNUNET_MULTICAST_OriginMessageHandle *
+struct GNUNET_MULTICAST_OriginTransmitHandle *
 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
                                 uint64_t message_id,
                                 uint64_t group_generation,
@@ -624,19 +624,19 @@
 /**
  * Resume message transmission to multicast group.
  *
- * @param mh Request to cancel.
+ * @param th  Transmission to resume.
  */
 void
-GNUNET_MULTICAST_origin_to_all_resume (struct 
GNUNET_MULTICAST_OriginMessageHandle *mh);
+GNUNET_MULTICAST_origin_to_all_resume (struct 
GNUNET_MULTICAST_OriginTransmitHandle *th);
 
 
 /**
  * Cancel request for message transmission to multicast group.
  *
- * @param mh Request to cancel.
+ * @param th  Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_origin_to_all_cancel (struct 
GNUNET_MULTICAST_OriginMessageHandle *mh);
+GNUNET_MULTICAST_origin_to_all_cancel (struct 
GNUNET_MULTICAST_OriginTransmitHandle *th);
 
 
 /**
@@ -788,6 +788,7 @@
  *        @a data, should be set to the number of bytes written to data.
  * @param[out] data Where to write the body of the message to give to the
  *         method. The function must copy at most @a data_size bytes to @a 
data.
+ *
  * @return #GNUNET_SYSERR on error (fatal, aborts transmission)
  *         #GNUNET_NO on success, if more data is to be transmitted later.
  *         Should be used if @a data_size was not big enough to take all the
@@ -804,7 +805,7 @@
 /**
  * Handle for a message to be delivered from a member to the origin.
  */
-struct GNUNET_MULTICAST_MemberRequestHandle;
+struct GNUNET_MULTICAST_MemberTransmitHandle;
 
 
 /**
@@ -814,9 +815,10 @@
  * @param request_id Application layer ID for the request.  Opaque to 
multicast.
  * @param notify Callback to call to get the message.
  * @param notify_cls Closure for @a notify.
+ *
  * @return Handle to cancel request, NULL on error (i.e. request already 
pending).
  */
-struct GNUNET_MULTICAST_MemberRequestHandle *
+struct GNUNET_MULTICAST_MemberTransmitHandle *
 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
                                    uint64_t request_id,
                                    GNUNET_MULTICAST_MemberTransmitNotify 
notify,
@@ -826,19 +828,19 @@
 /**
  * Resume message transmission to origin.
  *
- * @param rh Request to cancel.
+ * @param th  Transmission to resume.
  */
 void
-GNUNET_MULTICAST_member_to_origin_resume (struct 
GNUNET_MULTICAST_MemberRequestHandle *rh);
+GNUNET_MULTICAST_member_to_origin_resume (struct 
GNUNET_MULTICAST_MemberTransmitHandle *th);
 
 
 /**
  * Cancel request for message transmission to origin.
  *
- * @param rh Request to cancel.
+ * @param th  Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_member_to_origin_cancel (struct 
GNUNET_MULTICAST_MemberRequestHandle *rh);
+GNUNET_MULTICAST_member_to_origin_cancel (struct 
GNUNET_MULTICAST_MemberTransmitHandle *th);
 
 
 #if 0                           /* keep Emacsens' auto-indent happy */

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2014-05-13 12:06:08 UTC (rev 
33257)
+++ gnunet/src/include/gnunet_protocols.h       2014-05-13 12:08:14 UTC (rev 
33258)
@@ -2326,48 +2326,51 @@
  * MULTICAST message types
  
******************************************************************************/
 
+/**
+ * C: client
+ * S: service
+ * T: cadet
+ */
 
-/* WIP: no numbers assigned yet */
-
 /**
- * Start an origin.
+ * C->S: Start the origin.
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
 
 /**
- * Stop an origin.
+ * C->S: Stop the origin.
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
 
 /**
- * Join a group as a member.
+ * C->S: Join group as a member.
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752
 
 /**
- * Leave a group.
+ * C->S: Part the group.
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
 
 /**
- * Multicast message from the origin to all members.
+ * C<->S<->T: Multicast message from the origin to all members.
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
 
 /**
- * A unicast message from a group member to the origin.
+ * C<->S<->T: Unicast request from a group member to the origin.
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
 
 /**
- * A peer wants to join the group.
+ * C<--S<->T: A peer wants to join the group.
  *
  * Unicast message to the origin or another group member.
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST 756
 
 /**
- * Response to a join request.
+ * C<->S<->T: Response to a join request.
  *
  * Unicast message from a group member to the peer wanting to join.
  */

Modified: gnunet/src/multicast/Makefile.am
===================================================================
--- gnunet/src/multicast/Makefile.am    2014-05-13 12:06:08 UTC (rev 33257)
+++ gnunet/src/multicast/Makefile.am    2014-05-13 12:08:14 UTC (rev 33258)
@@ -39,12 +39,18 @@
 gnunet_multicast_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(GN_LIBINTL)
+gnunet_multicast_DEPENDENCIES = \
+  $(top_builddir)/src/util/libgnunetutil.la
 
 gnunet_service_multicast_SOURCES = \
  gnunet-service-multicast.c
 gnunet_service_multicast_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la \
   $(GN_LIBINTL)
+gnunet_service_multicast_DEPENDENCIES = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la
 
 
 check_PROGRAMS = \

Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c     2014-05-13 12:06:08 UTC 
(rev 33257)
+++ gnunet/src/multicast/gnunet-service-multicast.c     2014-05-13 12:08:14 UTC 
(rev 33258)
@@ -25,9 +25,105 @@
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
+#include "gnunet_signatures.h"
+#include "gnunet_statistics_service.h"
+#include "gnunet_multicast_service.h"
+#include "multicast.h"
 
+/**
+ * Handle to our current configuration.
+ */
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
+ * Handle to the statistics service.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+/**
+ * Notification context, simplifies client broadcasts.
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
+
+/**
+ * All connected origins.
+ * Group's pub_key_hash -> struct Group
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *origins;
+
+/**
+ * All connected members.
+ * Group's pub_key_hash -> struct Group
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *members;
+
+/**
+ * Common part of the client context for both an origin and member.
+ */
+struct Group
+{
+  struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Public key of the group.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+
+  /**
+   * Hash of @a pub_key.
+   */
+  struct GNUNET_HashCode pub_key_hash;
+
+  /**
+   * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
+   */
+  uint8_t is_origin;
+
+  /**
+   * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
+   */
+  uint8_t disconnected;
+};
+
+
+/**
+ * Client context for a group's origin.
+ */
+struct Origin
+{
+  struct Group grp;
+
+  /**
+   * Private key of the group.
+   */
+  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
+
+  /**
+   * Last message fragment ID sent to the group.
+   */
+  uint64_t max_fragment_id;
+};
+
+
+/**
+ * Client context for a group member.
+ */
+struct Member
+{
+  struct Group grp;
+
+  /**
+   * Private key of the member.
+   */
+  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
+
+  /**
+   * Last request fragment ID sent to the origin.
+   */
+  uint64_t max_fragment_id;
+};
+
+
+/**
  * Task run during shutdown.
  *
  * @param cls unused
@@ -41,13 +137,86 @@
 
 
 /**
+ * Iterator callback for sending a message to clients.
+ */
+static int
+message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+                  void *group)
+{
+  const struct GNUNET_MessageHeader *msg = cls;
+  struct Group *grp = group;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "%p Sending message to client.\n", grp);
+
+  GNUNET_SERVER_notification_context_add (nc, grp->client);
+  GNUNET_SERVER_notification_context_unicast (nc, grp->client, msg, GNUNET_NO);
+
+  return GNUNET_YES;
+}
+
+
+/**
+ * Send message to all origin and member clients connected to the group.
+ *
+ * @param grp  The group to send @a msg to.
+ * @param msg  Message to send.
+ */
+static void
+message_to_group (struct Group *grp, const struct GNUNET_MessageHeader *msg)
+{
+  if (origins != NULL)
+    GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+                                                message_callback, (void *) 
msg);
+  if (members != NULL)
+    GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
+                                                message_callback, (void *) 
msg);
+}
+
+
+/**
+ * Send message to all origin clients connected to the group.
+ *
+ * @param grp  The group to send @a msg to.
+ * @param msg  Message to send.
+ */
+static void
+message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg)
+{
+  if (origins != NULL)
+    GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+                                                message_callback, (void *) 
msg);
+}
+
+
+/**
  * Handle a connecting client starting an origin.
  */
 static void
 handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
-                     const struct GNUNET_MessageHeader *msg)
+                     const struct GNUNET_MessageHeader *m)
 {
+  const struct MulticastOriginStartMessage *
+    msg = (const struct MulticastOriginStartMessage *) m;
 
+  struct Origin *orig = GNUNET_new (struct Origin);
+  orig->priv_key = msg->group_key;
+
+  struct Group *grp = &orig->grp;
+  grp->is_origin = GNUNET_YES;
+  grp->client = client;
+
+  GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
+  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), 
&grp->pub_key_hash);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client connected as origin to group %s.\n",
+              orig, GNUNET_h2s (&grp->pub_key_hash));
+
+  GNUNET_SERVER_client_set_user_context (client, grp);
+  GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
@@ -58,7 +227,6 @@
 handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
                     const struct GNUNET_MessageHeader *msg)
 {
-
 }
 
 
@@ -67,9 +235,28 @@
  */
 static void
 handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
-                     const struct GNUNET_MessageHeader *msg)
+                     const struct GNUNET_MessageHeader *m)
 {
+  struct MulticastMemberJoinMessage *
+    msg = (struct MulticastMemberJoinMessage *) m;
 
+  struct Member *mem = GNUNET_new (struct Member);
+  mem->priv_key = msg->member_key;
+
+  struct Group *grp = &mem->grp;
+  grp->is_origin = GNUNET_NO;
+  grp->client = client;
+  grp->pub_key = msg->group_key;
+  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), 
&grp->pub_key_hash);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client connected as member to group %s.\n",
+              mem, GNUNET_h2s (&grp->pub_key_hash));
+
+  GNUNET_SERVER_client_set_user_context (client, grp);
+  GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
@@ -89,9 +276,33 @@
  */
 static void
 handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
-                         const struct GNUNET_MessageHeader *msg)
+                          const struct GNUNET_MessageHeader *m)
 {
+  struct Group *
+    grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
+  GNUNET_assert (GNUNET_YES == grp->is_origin);
+  struct Origin *orig = (struct Origin *) grp;
+  struct GNUNET_MULTICAST_MessageHeader *
+    msg = (struct GNUNET_MULTICAST_MessageHeader *) m;
 
+  msg->fragment_id = GNUNET_htonll (orig->max_fragment_id++);
+  msg->purpose.size = htonl (sizeof (*msg) + ntohs (m->size)
+                             - sizeof (msg->header)
+                             - sizeof (msg->hop_counter)
+                             - sizeof (msg->signature));
+  msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
+
+  if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose,
+                                             &msg->signature))
+  {
+    /* FIXME: handle error */
+    return;
+  }
+
+  /* FIXME: send to remote members */
+
+  message_to_group (grp, m);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
@@ -100,9 +311,35 @@
  */
 static void
 handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
-                         const struct GNUNET_MessageHeader *msg)
+                          const struct GNUNET_MessageHeader *m)
 {
+  struct Group *
+    grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
+  GNUNET_assert (GNUNET_NO == grp->is_origin);
+  struct Member *mem = (struct Member *) grp;
 
+  struct GNUNET_MULTICAST_RequestHeader *
+    req = (struct GNUNET_MULTICAST_RequestHeader *) m;
+
+  req->fragment_id = GNUNET_ntohll (mem->max_fragment_id++);
+
+  req->purpose.size = htonl (sizeof (*req) + ntohs (m->size)
+                             - sizeof (req->header)
+                             - sizeof (req->member_key)
+                             - sizeof (req->signature));
+  req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
+
+  if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
+                                             &req->signature))
+  {
+    /* FIXME: handle error */
+    return;
+  }
+
+  /* FIXME: send to remote origin */
+
+  message_to_origin (grp, m);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 /**
@@ -114,7 +351,7 @@
  */
 static void
 run (void *cls, struct GNUNET_SERVER_Handle *server,
-     const struct GNUNET_CONFIGURATION_Handle *cfg)
+     const struct GNUNET_CONFIGURATION_Handle *c)
 {
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
     { &handle_origin_start, NULL,
@@ -137,7 +374,13 @@
 
     {NULL, NULL, 0, 0}
   };
-  /* FIXME: do setup here */
+
+  cfg = c;
+  stats = GNUNET_STATISTICS_create ("multicast", cfg);
+  origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  nc = GNUNET_SERVER_notification_context_create (server, 1);
+
   GNUNET_SERVER_add_handlers (server, handlers);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
                                 NULL);

Modified: gnunet/src/multicast/multicast.h
===================================================================
--- gnunet/src/multicast/multicast.h    2014-05-13 12:06:08 UTC (rev 33257)
+++ gnunet/src/multicast/multicast.h    2014-05-13 12:08:14 UTC (rev 33258)
@@ -183,9 +183,8 @@
  */
 struct MulticastOriginStartMessage
 {
-
   /**
-   *
+   * Type: GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START
    */
   struct GNUNET_MessageHeader header;
 
@@ -195,19 +194,39 @@
   uint32_t reserved;
 
   /**
-   * Private, non-ephemeral key for the mutlicast group.
+   * Private, non-ephemeral key for the multicast group.
    */
   struct GNUNET_CRYPTO_EddsaPrivateKey group_key;
 
   /**
-   * Last fragment ID, used to continue counting fragments if we resume 
operating
-   * a group.
+   * Last fragment ID sent to the group, used to continue counting fragments if
+   * we resume operating a group.
    */
-  uint64_t last_fragment_id;
+  uint64_t max_fragment_id;
 
 };
 
 
+struct MulticastMemberJoinMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN
+   */
+  struct GNUNET_MessageHeader header;
+
+  uint32_t relay_count GNUNET_PACKED;
+
+  struct GNUNET_CRYPTO_EddsaPublicKey group_key;
+
+  struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
+
+  struct GNUNET_PeerIdentity origin;
+
+  /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
+  /* Followed by struct GNUNET_MessageHeader join_request */
+};
+
+
 /**
  * Message sent from the client to the service to broadcast to all group
  * members.

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2014-05-13 12:06:08 UTC (rev 
33257)
+++ gnunet/src/multicast/multicast_api.c        2014-05-13 12:08:14 UTC (rev 
33258)
@@ -26,7 +26,6 @@
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
-#include "gnunet_signatures.h"
 #include "gnunet_multicast_service.h"
 #include "multicast.h"
 
@@ -46,11 +45,18 @@
 static struct GNUNET_CONTAINER_MultiHashMap *members;
 
 
+struct MessageQueue
+{
+  struct MessageQueue *prev;
+  struct MessageQueue *next;
+};
+
+
 /**
  * Handle for a request to send a message to all multicast group members
  * (from the origin).
  */
-struct GNUNET_MULTICAST_OriginMessageHandle
+struct GNUNET_MULTICAST_OriginTransmitHandle
 {
   GNUNET_MULTICAST_OriginTransmitNotify notify;
   void *notify_cls;
@@ -62,47 +68,104 @@
 };
 
 
-struct GNUNET_MULTICAST_Group
+/**
+ * Handle for a message to be delivered from a member to the origin.
+ */
+struct GNUNET_MULTICAST_MemberTransmitHandle
 {
-  uint8_t is_origin;
+  GNUNET_MULTICAST_MemberTransmitNotify notify;
+  void *notify_cls;
+  struct GNUNET_MULTICAST_Member *member;
+
+  uint64_t request_id;
+  uint64_t fragment_offset;
 };
 
-/**
- * Handle for the origin of a multicast group.
- */
-struct GNUNET_MULTICAST_Origin
+
+struct GNUNET_MULTICAST_Group
 {
-  struct GNUNET_MULTICAST_Group grp;
+  /**
+   * Configuration to use.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
 
-  struct GNUNET_MULTICAST_OriginMessageHandle msg_handle;
-  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
+  /**
+   * Socket (if available).
+   */
+  struct GNUNET_CLIENT_Connection *client;
 
+  /**
+   * Currently pending transmission request, or NULL for none.
+   */
+  struct GNUNET_CLIENT_TransmitHandle *th;
+
+  /**
+   * Head of operations to transmit.
+   */
+  struct MessageQueue *tmit_head;
+
+  /**
+   * Tail of operations to transmit.
+   */
+  struct MessageQueue *tmit_tail;
+
+  /**
+   * Message being transmitted to the Multicast service.
+   */
+  struct MessageQueue *tmit_msg;
+
+  /**
+   * Message to send on reconnect.
+   */
+  struct GNUNET_MessageHeader *reconnect_msg;
+
+  /**
+   * Task doing exponential back-off trying to reconnect.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+
+  /**
+   * Time for next connect retry.
+   */
+  struct GNUNET_TIME_Relative reconnect_delay;
+
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+  struct GNUNET_HashCode pub_key_hash;
+
   GNUNET_MULTICAST_JoinCallback join_cb;
-  GNUNET_MULTICAST_MembershipTestCallback mem_test_cb;
+  GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
-  GNUNET_MULTICAST_RequestCallback request_cb;
   GNUNET_MULTICAST_MessageCallback message_cb;
-  void *cls;
+  void *cb_cls;
 
-  uint64_t next_fragment_id;
+  /**
+   * Are we polling for incoming messages right now?
+   */
+  uint8_t in_receive;
 
-  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
-  struct GNUNET_HashCode pub_key_hash;
+  /**
+   * Are we currently transmitting a message?
+   */
+  uint8_t in_transmit;
+
+  /**
+   * Is this the origin or a member?
+   */
+  uint8_t is_origin;
 };
 
 
 /**
- * Handle for a message to be delivered from a member to the origin.
+ * Handle for the origin of a multicast group.
  */
-struct GNUNET_MULTICAST_MemberRequestHandle
+struct GNUNET_MULTICAST_Origin
 {
-  GNUNET_MULTICAST_MemberTransmitNotify notify;
-  void *notify_cls;
-  struct GNUNET_MULTICAST_Member *member;
+  struct GNUNET_MULTICAST_Group grp;
+  struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
+  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
 
-  uint64_t request_id;
-  uint64_t fragment_offset;
+  GNUNET_MULTICAST_RequestCallback request_cb;
 };
 
 
@@ -112,24 +175,16 @@
 struct GNUNET_MULTICAST_Member
 {
   struct GNUNET_MULTICAST_Group grp;
+  struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
 
-  struct GNUNET_MULTICAST_MemberRequestHandle req_handle;
-
-  struct GNUNET_CRYPTO_EddsaPublicKey group_key;
-  struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
+  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
   struct GNUNET_PeerIdentity origin;
   struct GNUNET_PeerIdentity relays;
   uint32_t relay_count;
+
   struct GNUNET_MessageHeader *join_request;
-  GNUNET_MULTICAST_JoinCallback join_cb;
-  GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
-  GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
-  GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
-  GNUNET_MULTICAST_MessageCallback message_cb;
-  void *cls;
 
   uint64_t next_fragment_id;
-  struct GNUNET_HashCode group_key_hash;
 };
 
 
@@ -168,35 +223,234 @@
 };
 
 
+static void
+reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+static void
+reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
+
+
 /**
+ * Schedule transmission of the next message from our queue.
+ *
+ * @param grp  Multicast group handle.
+ */
+static void
+transmit_next (struct GNUNET_MULTICAST_Group *grp);
+
+
+static void
+message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
+
+
+/**
+ * Reschedule a connect attempt to the service.
+ *
+ * @param grp  Multicast group handle to reconnect.
+ */
+static void
+reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
+{
+  GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+
+  if (NULL != grp->th)
+  {
+    GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
+    grp->th = NULL;
+  }
+  if (NULL != grp->client)
+  {
+    GNUNET_CLIENT_disconnect (grp->client);
+    grp->client = NULL;
+  }
+  grp->in_receive = GNUNET_NO;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Scheduling task to reconnect to Multicast service in %s.\n",
+       GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, 
GNUNET_YES));
+  grp->reconnect_task =
+      GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
+  grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+}
+
+
+/**
+ * Reset stored data related to the last received message.
+ */
+static void
+recv_reset (struct GNUNET_MULTICAST_Group *grp)
+{
+}
+
+
+static void
+recv_error (struct GNUNET_MULTICAST_Group *grp)
+{
+  if (NULL != grp->message_cb)
+    grp->message_cb (grp->cb_cls, NULL);
+
+  recv_reset (grp);
+}
+
+
+/**
+ * Transmit next message to service.
+ *
+ * @param cls  The struct GNUNET_MULTICAST_Group.
+ * @param size Number of bytes available in @a buf.
+ * @param buf  Where to copy the message.
+ *
+ * @return Number of bytes copied to @a buf.
+ */
+static size_t
+send_next_message (void *cls, size_t size, void *buf)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
+  struct GNUNET_MULTICAST_Group *grp = cls;
+  struct MessageQueue *mq = grp->tmit_head;
+  if (NULL == mq)
+    return 0;
+  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+  size_t ret = ntohs (qmsg->size);
+  grp->th = NULL;
+  if (ret > size)
+  {
+    reschedule_connect (grp);
+    return 0;
+  }
+  memcpy (buf, qmsg, ret);
+
+  GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
+  GNUNET_free (mq);
+
+  if (NULL != grp->tmit_head)
+    transmit_next (grp);
+
+  if (GNUNET_NO == grp->in_receive)
+  {
+    grp->in_receive = GNUNET_YES;
+    GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
+                           GNUNET_TIME_UNIT_FOREVER_REL);
+  }
+  return ret;
+}
+
+
+/**
+ * Schedule transmission of the next message from our queue.
+ *
+ * @param grp  Multicast group handle.
+ */
+static void
+transmit_next (struct GNUNET_MULTICAST_Group *grp)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
+  if (NULL != grp->th || NULL == grp->client)
+    return;
+
+  struct MessageQueue *mq = grp->tmit_head;
+  if (NULL == mq)
+    return;
+  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+
+  grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
+                                                 ntohs (qmsg->size),
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 GNUNET_NO,
+                                                 &send_next_message,
+                                                 grp);
+}
+
+
+/**
+ * Try again to connect to the Multicast service.
+ *
+ * @param cls  The struct GNUNET_MULTICAST_Group to reconnect.
+ * @param tc Scheduler context.
+ */
+static void
+reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_MULTICAST_Group *grp = cls;
+
+  recv_reset (grp);
+  grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Connecting to Multicast service.\n");
+  GNUNET_assert (NULL == grp->client);
+  grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
+  GNUNET_assert (NULL != grp->client);
+  uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
+
+  if (NULL == grp->tmit_head ||
+      0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
+  {
+    struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
+    memcpy (&mq[1], grp->reconnect_msg, reconn_size);
+    GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
+  }
+  transmit_next (grp);
+}
+
+
+/**
+ * Disconnect from the Multicast service.
+ *
+ * @param g  Group handle to disconnect.
+ */
+static void
+disconnect (void *g)
+{
+  struct GNUNET_MULTICAST_Group *grp = g;
+
+  GNUNET_assert (NULL != grp);
+  if (grp->tmit_head != grp->tmit_tail)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "Disconnecting while there are still outstanding messages!\n");
+    GNUNET_break (0);
+  }
+  if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (grp->reconnect_task);
+    grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (NULL != grp->th)
+  {
+    GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
+    grp->th = NULL;
+  }
+  if (NULL != grp->client)
+  {
+    GNUNET_CLIENT_disconnect (grp->client);
+    grp->client = NULL;
+  }
+  if (NULL != grp->reconnect_msg)
+  {
+    GNUNET_free (grp->reconnect_msg);
+    grp->reconnect_msg = NULL;
+  }
+}
+
+
+/**
  * Iterator callback for calling message callbacks for all groups.
  */
 static int
-message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
-                   void *group)
+message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+                  void *group)
 {
   const struct GNUNET_MessageHeader *msg = cls;
   struct GNUNET_MULTICAST_Group *grp = group;
 
-  if (GNUNET_YES == grp->is_origin)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Calling origin's message callback "
-                "with a message of type %u and size %u.\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Calling message callback with a message "
+              "of type %u and size %u.\n",
               ntohs (msg->type), ntohs (msg->size));
-    struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) 
grp;
-    orig->message_cb (orig->cls, msg);
-  }
-  else
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Calling member's message callback "
-                "with a message of type %u and size %u.\n",
-                ntohs (msg->type), ntohs (msg->size));
-    struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) 
grp;
-    mem->message_cb (mem->cls, msg);
-  }
 
+  if (NULL != grp->message_cb)
+    grp->message_cb (grp->cb_cls, msg);
+
   return GNUNET_YES;
 }
 
@@ -213,25 +467,12 @@
 handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
                           const struct GNUNET_MULTICAST_MessageHeader *msg)
 {
-  struct GNUNET_HashCode *hash;
-
-  if (GNUNET_YES == grp->is_origin)
-  {
-    struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) 
grp;
-    hash = &orig->pub_key_hash;
-  }
-  else
-  {
-    struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) 
grp;
-    hash = &mem->group_key_hash;
-  }
-
   if (origins != NULL)
-    GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash, 
message_callback,
-                                                (void *) msg);
+    GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+                                                message_callback, (void *) 
msg);
   if (members != NULL)
-    GNUNET_CONTAINER_multihashmap_get_multiple (members, hash, 
message_callback,
-                                                (void *) msg);
+    GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
+                                                message_callback, (void *) 
msg);
 }
 
 
@@ -249,7 +490,7 @@
               "Calling request callback for a request of type %u and size 
%u.\n",
               ntohs (req->header.type), ntohs (req->header.size));
 
-  orig->request_cb (orig->cls, &req->member_key,
+  orig->request_cb (orig->grp.cb_cls, &req->member_key,
                     (const struct GNUNET_MessageHeader *) req, 0);
   return GNUNET_YES;
 }
@@ -264,16 +505,94 @@
  * @param msg The message.
  */
 static void
-handle_multicast_request (const struct GNUNET_HashCode *group_key_hash,
+handle_multicast_request (struct GNUNET_MULTICAST_Group *grp,
                           const struct GNUNET_MULTICAST_RequestHeader *req)
 {
   if (NULL != origins)
-    GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash,
+    GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
                                                 request_callback, (void *) 
req);
 }
 
 
 /**
+ * Function called when we receive a message from the service.
+ *
+ * @param cls  struct GNUNET_MULTICAST_Group
+ * @param msg  Message received, NULL on timeout or fatal error.
+ */
+static void
+message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *grp = cls;
+
+  if (NULL == msg)
+  {
+    // timeout / disconnected from service, reconnect
+    reschedule_connect (grp);
+    return;
+  }
+
+  uint16_t size_eq = 0;
+  uint16_t size_min = 0;
+  uint16_t size = ntohs (msg->size);
+  uint16_t type = ntohs (msg->type);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received message of type %d and size %u from Multicast service\n",
+       type, size);
+
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+    size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
+    break;
+
+  case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
+    size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
+    break;
+
+  default:
+    GNUNET_break_op (0);
+    return;
+  }
+
+  if (! ((0 < size_eq && size == size_eq)
+         || (0 < size_min && size_min <= size)))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+    handle_multicast_message (grp, (struct GNUNET_MULTICAST_MessageHeader *) 
msg);
+    break;
+
+  case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
+    if (GNUNET_YES != grp->is_origin)
+    {
+      GNUNET_break (0);
+      break;
+    }
+
+    handle_multicast_request (grp, (struct GNUNET_MULTICAST_RequestHeader *) 
msg);
+    break;
+
+  default:
+    GNUNET_break_op (0);
+    return;
+  }
+
+  if (NULL != grp->client)
+  {
+    GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
+                           GNUNET_TIME_UNIT_FOREVER_REL);
+  }
+}
+
+
+/**
  * Function to call with the decision made for a join request.
  *
  * Must be called once and only once in response to an invocation of the
@@ -375,28 +694,29 @@
  * candidate will be given a response.  Members in the group can send messages
  * to the origin (one at a time).
  *
- * @param cfg Configuration to use.
- * @param priv_key ECC key that will be used to sign messages for this
+ * @param cfg  Configuration to use.
+ * @param priv_key  ECC key that will be used to sign messages for this
  *        multicast session; public key is used to identify the multicast 
group;
- * @param next_fragment_id Next fragment ID to continue counting fragments from
- *        when restarting the origin.  0 for a new group.
- * @param join_cb Function called to approve / disapprove joining of a peer.
- * @param mem_test_cb Function multicast can use to test group membership.
- * @param replay_frag_cb Function that can be called to replay a message 
fragment.
- * @param replay_msg_cb Function that can be called to replay a message.
- * @param request_cb Function called with message fragments from group members.
- * @param message_cb Function called with the message fragments sent to the
+ * @param max_fragment_id  Maximum fragment ID already sent to the group.
+ *        0 for a new group.
+ * @param join_cb  Function called to approve / disapprove joining of a peer.
+ * @param member_test_cb  Function multicast can use to test group membership.
+ * @param replay_frag_cb  Function that can be called to replay a message 
fragment.
+ * @param replay_msg_cb  Function that can be called to replay a message.
+ * @param request_cb  Function called with message fragments from group 
members.
+ * @param message_cb  Function called with the message fragments sent to the
  *        network by GNUNET_MULTICAST_origin_to_all().  These message fragments
  *        should be stored for answering replay requests later.
- * @param cls Closure for the various callbacks that follow.
+ * @param cls  Closure for the various callbacks that follow.
+ *
  * @return Handle for the origin, NULL on error.
  */
 struct GNUNET_MULTICAST_Origin *
 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                const struct GNUNET_CRYPTO_EddsaPrivateKey 
*priv_key,
-                               uint64_t next_fragment_id,
+                               uint64_t max_fragment_id,
                                GNUNET_MULTICAST_JoinCallback join_cb,
-                               GNUNET_MULTICAST_MembershipTestCallback 
mem_test_cb,
+                               GNUNET_MULTICAST_MembershipTestCallback 
member_test_cb,
                                GNUNET_MULTICAST_ReplayFragmentCallback 
replay_frag_cb,
                                GNUNET_MULTICAST_ReplayMessageCallback 
replay_msg_cb,
                                GNUNET_MULTICAST_RequestCallback request_cb,
@@ -404,28 +724,40 @@
                                void *cls)
 {
   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
-  orig->grp.is_origin = GNUNET_YES;
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
+
+  start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
+  start->header.size = htons (sizeof (*start));
+  start->max_fragment_id = max_fragment_id;
+  memcpy (&start->group_key, priv_key, sizeof (*priv_key));
+
+  grp->reconnect_msg = (struct GNUNET_MessageHeader *) start;
+  grp->is_origin = GNUNET_YES;
+  grp->cfg = cfg;
+
+  grp->cb_cls = cls;
+  grp->join_cb = join_cb;
+  grp->member_test_cb = member_test_cb;
+  grp->replay_frag_cb = replay_frag_cb;
+  grp->replay_msg_cb = replay_msg_cb;
+  grp->message_cb = message_cb;
+
+  orig->request_cb = request_cb;
   orig->priv_key = *priv_key;
-  orig->next_fragment_id = next_fragment_id;
-  orig->join_cb = join_cb;
-  orig->mem_test_cb = mem_test_cb;
-  orig->replay_frag_cb = replay_frag_cb;
-  orig->replay_msg_cb = replay_msg_cb;
-  orig->request_cb = request_cb;
-  orig->message_cb = message_cb;
-  orig->cls = cls;
 
-  GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key);
-  GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key),
-                      &orig->pub_key_hash);
+  GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
+  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
+                      &grp->pub_key_hash);
 
   if (NULL == origins)
     origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
 
-  GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig,
+  GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 
-  /* FIXME: send ORIGIN_START to service */
+  grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+  grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
 
   return orig;
 }
@@ -439,124 +771,105 @@
 void
 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
 {
-  GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig);
+  disconnect (&orig->grp);
+  GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, 
orig);
   GNUNET_free (orig);
 }
 
 
-/* FIXME: for now just call clients' callbacks
- *        without sending anything to multicast. */
 static void
-schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext 
*tc)
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n");
-  struct GNUNET_MULTICAST_Origin *orig = cls;
-  struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
-  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_SIZE] = "";
-  struct GNUNET_MULTICAST_MessageHeader *msg
-    = (struct GNUNET_MULTICAST_MessageHeader *) buf;
-  int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
+  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
+  GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
 
+  struct GNUNET_MULTICAST_MessageHeader *
+    msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
+  int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
+
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
       || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "OriginTransmitNotify() returned error or invalid message size.\n");
     /* FIXME: handle error */
+    GNUNET_free (mq);
     return;
   }
 
   if (GNUNET_NO == ret && 0 == buf_size)
+  {
+    GNUNET_free (mq);
     return; /* Transmission paused. */
+  }
 
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
   msg->header.size = htons (sizeof (*msg) + buf_size);
-  msg->message_id = GNUNET_htonll (mh->message_id);
-  msg->group_generation = mh->group_generation;
+  msg->message_id = GNUNET_htonll (tmit->message_id);
+  msg->group_generation = tmit->group_generation;
+  msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
+  tmit->fragment_offset += sizeof (*msg) + buf_size;
 
-  /* FIXME: add fragment ID and signature in the service instead of here */
-  msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++);
-  msg->fragment_offset = GNUNET_htonll (mh->fragment_offset);
-  mh->fragment_offset += sizeof (*msg) + buf_size;
-  msg->purpose.size = htonl (sizeof (*msg) + buf_size
-                             - sizeof (msg->header)
-                             - sizeof (msg->hop_counter)
-                             - sizeof (msg->signature));
-  msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
-
-  if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose,
-                                           &msg->signature))
-  {
-    /* FIXME: handle error */
-    return;
-  }
-
-  /* FIXME: send msg to the service and only then call handle_multicast_message
-   *        with the returned signed message.
-   */
-  handle_multicast_message (&orig->grp, msg);
-
-  if (GNUNET_NO == ret)
-    GNUNET_SCHEDULER_add_delayed (
-      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
-      schedule_origin_to_all, orig);
+  transmit_next (grp);
 }
 
 
 /**
  * Send a message to the multicast group.
  *
- * @param origin Handle to the multicast group.
- * @param message_id Application layer ID for the message.  Opaque to 
multicast.
- * @param group_generation Group generation of the message.  Documented in
- *             `struct GNUNET_MULTICAST_MessageHeader`.
- * @param notify Function to call to get the message.
- * @param notify_cls Closure for @a notify.
- * @return NULL on error (i.e. request already pending).
+ * @param orig  Handle to the multicast group.
+ * @param message_id  Application layer ID for the message.  Opaque to 
multicast.
+ * @param group_generation  Group generation of the message.
+ *                          Documented in struct 
GNUNET_MULTICAST_MessageHeader.
+ * @param notify  Function to call to get the message.
+ * @param notify_cls  Closure for @a notify.
+ *
+ * @return Message handle on success,
+ *         NULL on error (i.e. another request is already pending).
  */
-struct GNUNET_MULTICAST_OriginMessageHandle *
-GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
+struct GNUNET_MULTICAST_OriginTransmitHandle *
+GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
                                 uint64_t message_id,
                                 uint64_t group_generation,
                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
                                 void *notify_cls)
 {
-  struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle;
-  mh->origin = origin;
-  mh->message_id = message_id;
-  mh->group_generation = group_generation;
-  mh->notify = notify;
-  mh->notify_cls = notify_cls;
+  struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+  tmit->origin = orig;
+  tmit->message_id = message_id;
+  tmit->group_generation = group_generation;
+  tmit->notify = notify;
+  tmit->notify_cls = notify_cls;
 
-  /* add some delay for testing */
-  GNUNET_SCHEDULER_add_delayed (
-    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
-    schedule_origin_to_all, origin);
-  return &origin->msg_handle;
+  origin_to_all (orig);
+  return tmit;
 }
 
 
 /**
  * Resume message transmission to multicast group.
  *
- * @param mh Request to cancel.
+ * @param th  Transmission to resume.
  */
 void
-GNUNET_MULTICAST_origin_to_all_resume (struct 
GNUNET_MULTICAST_OriginMessageHandle *mh)
+GNUNET_MULTICAST_origin_to_all_resume (struct 
GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
-  GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin);
+  origin_to_all (th->origin);
 }
 
 
 /**
  * Cancel request for message transmission to multicast group.
  *
- * @param mh Request to cancel.
+ * @param th  Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_origin_to_all_cancel (struct 
GNUNET_MULTICAST_OriginMessageHandle *mh)
+GNUNET_MULTICAST_origin_to_all_cancel (struct 
GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
 }
 
@@ -584,12 +897,12 @@
  * @param relays Peer identities of members of the group, which serve as relays
  *        and can be used to join the group at. and send the @a join_request 
to.
  *        If empty, the @a join_request is sent directly to the @a origin.
- * @param join_request  Application-dependent join request to be passed to the 
peer
+ * @param join_req  Application-dependent join request to be passed to the peer
  *        @a relay (might, for example, contain a user, bind user
  *        identity/pseudonym to peer identity, application-level message to
  *        origin, etc.).
  * @param join_cb Function called to approve / disapprove joining of a peer.
- * @param mem_test_cb Function multicast can use to test group membership.
+ * @param member_test_cb Function multicast can use to test group membership.
  * @param replay_frag_cb Function that can be called to replay message 
fragments
  *        this peer already knows from this group. NULL if this
  *        client is unable to support replay.
@@ -609,7 +922,7 @@
                               const struct GNUNET_PeerIdentity *origin,
                               uint32_t relay_count,
                               const struct GNUNET_PeerIdentity *relays,
-                              const struct GNUNET_MessageHeader *join_request,
+                              const struct GNUNET_MessageHeader *join_req,
                               GNUNET_MULTICAST_JoinCallback join_cb,
                               GNUNET_MULTICAST_MembershipTestCallback 
member_test_cb,
                               GNUNET_MULTICAST_ReplayFragmentCallback 
replay_frag_cb,
@@ -618,33 +931,47 @@
                               void *cls)
 {
   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
-  mem->group_key = *group_key;
-  mem->member_key = *member_key;
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+  uint16_t relay_size = relay_count * sizeof (*relays);
+  uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
+  struct MulticastMemberJoinMessage *
+    join = GNUNET_malloc (sizeof (*join) + relay_size + join_req_size);
+  join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
+  join->header.size = htons (sizeof (*join) + relay_size + join_req_size);
+  join->group_key = *group_key;
+  join->member_key = *member_key;
+  join->origin = *origin;
+  memcpy (&join[1], relays, relay_size);
+  memcpy (((char *) &join[1]) + relay_size, join_req, join_req_size);
+
+  grp->reconnect_msg = (struct GNUNET_MessageHeader *) join;
+  grp->is_origin = GNUNET_NO;
+  grp->cfg = cfg;
+  grp->pub_key = *group_key;
+
+  grp->join_cb = join_cb;
+  grp->member_test_cb = member_test_cb;
+  grp->replay_frag_cb = replay_frag_cb;
+  grp->message_cb = message_cb;
+  grp->cb_cls = cls;
+
   mem->origin = *origin;
   mem->relay_count = relay_count;
   mem->relays = *relays;
-  mem->join_cb = join_cb;
-  mem->member_test_cb = member_test_cb;
-  mem->replay_frag_cb = replay_frag_cb;
-  mem->message_cb = message_cb;
-  mem->cls = cls;
+  mem->priv_key = *member_key;
 
-  if (NULL != join_request)
-  {
-    uint16_t size = ntohs (join_request->size);
-    mem->join_request = GNUNET_malloc (size);
-    memcpy (mem->join_request, join_request, size);
-  }
+  GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &grp->pub_key);
+  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), 
&grp->pub_key_hash);
 
-  GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key), 
&mem->group_key_hash);
-
   if (NULL == members)
     members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
 
-  GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem,
+  GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 
-  /* FIXME: send MEMBER_JOIN to service */
+  grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+  grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
 
   return mem;
 }
@@ -663,7 +990,8 @@
 void
 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
 {
-  GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem);
+  disconnect (&mem->grp);
+  GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
   GNUNET_free (mem);
 }
 
@@ -729,20 +1057,21 @@
 }
 
 
-/* FIXME: for now just send back to the client what it sent. */
 static void
-schedule_member_to_origin (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext *tc)
+member_to_origin (struct GNUNET_MULTICAST_Member *mem)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n");
-  struct GNUNET_MULTICAST_Member *mem = cls;
-  struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
+  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+  struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
-  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
-  struct GNUNET_MULTICAST_RequestHeader *req
-    = (struct GNUNET_MULTICAST_RequestHeader *) buf;
-  int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]);
+  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
+  GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
 
+  struct GNUNET_MULTICAST_RequestHeader *
+    req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
+  int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
+
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
       || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
   {
@@ -757,73 +1086,47 @@
 
   req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
   req->header.size = htons (sizeof (*req) + buf_size);
-  req->request_id = GNUNET_htonll (rh->request_id);
+  req->request_id = GNUNET_htonll (tmit->request_id);
+  req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
+  tmit->fragment_offset += sizeof (*req) + buf_size;
 
-  /* FIXME: add fragment ID and signature in the service instead of here */
-  req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++);
-  req->fragment_offset = GNUNET_ntohll (rh->fragment_offset);
-  rh->fragment_offset += sizeof (*req) + buf_size;
-  req->purpose.size = htonl (sizeof (*req) + buf_size
-                             - sizeof (req->header)
-                             - sizeof (req->member_key)
-                             - sizeof (req->signature));
-  req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
-
-  if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose,
-                                           &req->signature))
-  {
-    /* FIXME: handle error */
-    return;
-  }
-
-  /* FIXME: send req to the service and only then call handle_multicast_request
-   *        with the returned request.
-   */
-  handle_multicast_request (&mem->group_key_hash, req);
-
-  if (GNUNET_NO == ret)
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                  (GNUNET_TIME_UNIT_SECONDS, 1),
-                                  schedule_member_to_origin, mem);
+  transmit_next (grp);
 }
 
 
 /**
  * Send a message to the origin of the multicast group.
  *
- * @param member Membership handle.
+ * @param mem Membership handle.
  * @param request_id Application layer ID for the request.  Opaque to 
multicast.
  * @param notify Callback to call to get the message.
  * @param notify_cls Closure for @a notify.
  * @return Handle to cancel request, NULL on error (i.e. request already 
pending).
  */
-struct GNUNET_MULTICAST_MemberRequestHandle *
-GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
+struct GNUNET_MULTICAST_MemberTransmitHandle *
+GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
                                    uint64_t request_id,
                                    GNUNET_MULTICAST_MemberTransmitNotify 
notify,
                                    void *notify_cls)
 {
-  struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle;
-  rh->member = member;
-  rh->request_id = request_id;
-  rh->notify = notify;
-  rh->notify_cls = notify_cls;
+  struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+  tmit->member = mem;
+  tmit->request_id = request_id;
+  tmit->notify = notify;
+  tmit->notify_cls = notify_cls;
 
-  /* FIXME: remove delay, it's there only for testing */
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                (GNUNET_TIME_UNIT_SECONDS, 1),
-                                schedule_member_to_origin, member);
-  return &member->req_handle;
+  member_to_origin (mem);
+  return tmit;
 }
 
 
 /**
  * Resume message transmission to origin.
  *
- * @param rh Request to cancel.
+ * @param th  Transmission to resume.
  */
 void
-GNUNET_MULTICAST_member_to_origin_resume (struct 
GNUNET_MULTICAST_MemberRequestHandle *rh)
+GNUNET_MULTICAST_member_to_origin_resume (struct 
GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
 
 }
@@ -832,10 +1135,10 @@
 /**
  * Cancel request for message transmission to origin.
  *
- * @param rh Request to cancel.
+ * @param th  Transmission to cancel.
  */
 void
-GNUNET_MULTICAST_member_to_origin_cancel (struct 
GNUNET_MULTICAST_MemberRequestHandle *rh)
+GNUNET_MULTICAST_member_to_origin_cancel (struct 
GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
 }
 

Modified: gnunet/src/psyc/Makefile.am
===================================================================
--- gnunet/src/psyc/Makefile.am 2014-05-13 12:06:08 UTC (rev 33257)
+++ gnunet/src/psyc/Makefile.am 2014-05-13 12:08:14 UTC (rev 33258)
@@ -42,14 +42,14 @@
  gnunet-service-psyc.c \
  psyc_common.c
 gnunet_service_psyc_LDADD = \
+  $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
-  $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/multicast/libgnunetmulticast.la \
   $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
   $(GN_LIBINTL)
 gnunet_service_psyc_DEPENDENCIES = \
+  $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
-  $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/multicast/libgnunetmulticast.la \
   $(top_builddir)/src/psycstore/libgnunetpsycstore.la
 gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2014-05-13 12:06:08 UTC (rev 
33257)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2014-05-13 12:08:14 UTC (rev 
33258)
@@ -58,12 +58,18 @@
 static struct GNUNET_PSYCSTORE_Handle *store;
 
 /**
- * All connected masters and slaves.
+ * All connected masters.
  * Channel's pub_key_hash -> struct Channel
  */
-static struct GNUNET_CONTAINER_MultiHashMap *clients;
+static struct GNUNET_CONTAINER_MultiHashMap *masters;
 
+/**
+ * All connected slaves.
+ * Channel's pub_key_hash -> struct Channel
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *slaves;
 
+
 /**
  * Message in the transmission queue.
  */
@@ -158,7 +164,7 @@
 
 
 /**
- * Common part of the client context for both a master and slave channel.
+ * Common part of the client context for both a channel master and slave.
  */
 struct Channel
 {
@@ -266,7 +272,7 @@
   /**
    * Transmit handle for multicast.
    */
-  struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
+  struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
 
   /**
    * Last message ID transmitted to this channel.
@@ -307,7 +313,7 @@
   /**
    * Private key of the slave.
    */
-  struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
+  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
 
   /**
    * Handle for the multicast member.
@@ -317,7 +323,7 @@
   /**
    * Transmit handle for multicast.
    */
-  struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
+  struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
 
   /**
    * Peer identity of the origin.
@@ -382,7 +388,7 @@
     struct Master *mst = (struct Master *) ch;
     if (NULL != mst->origin)
       GNUNET_MULTICAST_origin_stop (mst->origin);
-    GNUNET_CONTAINER_multihashmap_remove (clients, &ch->pub_key_hash, mst);
+    GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
   }
   else
   {
@@ -393,6 +399,7 @@
       GNUNET_free (slv->relays);
     if (NULL != slv->member)
       GNUNET_MULTICAST_member_part (slv->member);
+    GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
   }
 
   GNUNET_free (ch);
@@ -975,7 +982,7 @@
                                                           NULL, NULL))
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "%p Dropping message with invalid parts "
+                  "%p Dropping request with invalid parts "
                   "received from multicast.\n", ch);
       GNUNET_break_op (0);
       break;
@@ -1017,6 +1024,7 @@
 {
   struct Master *mst = cls;
   struct Channel *ch = &mst->channel;
+
   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
   res->header.size = htons (sizeof (*res));
@@ -1031,12 +1039,20 @@
     mst->max_group_generation = max_group_generation;
     mst->origin
       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
-                                       max_fragment_id + 1,
+                                       max_fragment_id,
                                        join_cb, membership_test_cb,
                                        replay_fragment_cb, replay_message_cb,
                                        request_cb, message_cb, ch);
     ch->ready = GNUNET_YES;
   }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p GNUNET_PSYCSTORE_counters_get() "
+                "returned %d for channel %s.\n",
+                ch, result, GNUNET_h2s (&ch->pub_key_hash));
+  }
+
   GNUNET_SERVER_notification_context_add (nc, ch->client);
   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
                                               GNUNET_NO);
@@ -1054,6 +1070,7 @@
 {
   struct Slave *slv = cls;
   struct Channel *ch = &slv->channel;
+
   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
   res->header.size = htons (sizeof (*res));
@@ -1065,7 +1082,7 @@
     ch->max_message_id = max_message_id;
     ch->max_state_message_id = max_state_message_id;
     slv->member
-      = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
+      = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
                                       &slv->origin,
                                       slv->relay_count, slv->relays,
                                       slv->join_req, join_cb,
@@ -1074,6 +1091,13 @@
                                       message_cb, ch);
     ch->ready = GNUNET_YES;
   }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p GNUNET_PSYCSTORE_counters_get() "
+                "returned %d for channel %s.\n",
+                ch, result, GNUNET_h2s (&ch->pub_key_hash));
+  }
 
   GNUNET_SERVER_notification_context_add (nc, ch->client);
   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -1118,9 +1142,9 @@
 
   GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
 
-  GNUNET_SERVER_client_set_user_context (client, &mst->channel);
-  GNUNET_CONTAINER_multihashmap_put (clients, &ch->pub_key_hash, mst,
+  GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_SERVER_client_set_user_context (client, ch);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -1135,7 +1159,7 @@
   const struct SlaveJoinRequest *req
     = (const struct SlaveJoinRequest *) msg;
   struct Slave *slv = GNUNET_new (struct Slave);
-  slv->slave_key = req->slave_key;
+  slv->priv_key = req->slave_key;
   slv->origin = req->origin;
   slv->relay_count = ntohl (req->relay_count);
   if (0 < slv->relay_count)
@@ -1163,6 +1187,8 @@
 
   GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
 
+  GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -1183,8 +1209,7 @@
   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
 
   GNUNET_SERVER_notification_context_add (nc, ch->client);
-  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
-                                              GNUNET_NO);
+  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO);
 }
 
 
@@ -1554,7 +1579,8 @@
   cfg = c;
   store = GNUNET_PSYCSTORE_connect (cfg);
   stats = GNUNET_STATISTICS_create ("psyc", cfg);
-  clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   nc = GNUNET_SERVER_notification_context_create (server, 1);
   GNUNET_SERVER_add_handlers (server, handlers);

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2014-05-13 12:06:08 UTC (rev 33257)
+++ gnunet/src/psyc/psyc_api.c  2014-05-13 12:08:14 UTC (rev 33258)
@@ -41,11 +41,10 @@
 
 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
 
-struct OperationHandle
+struct MessageQueue
 {
-  struct OperationHandle *prev;
-  struct OperationHandle *next;
-  struct GNUNET_MessageHeader *msg;
+  struct MessageQueue *prev;
+  struct MessageQueue *next;
 };
 
 
@@ -87,19 +86,19 @@
   struct GNUNET_CLIENT_TransmitHandle *th;
 
   /**
-   * Head of operations to transmit.
+   * Head of messages to transmit to the service.
    */
-  struct OperationHandle *tmit_head;
+  struct MessageQueue *tmit_head;
 
   /**
-   * Tail of operations to transmit.
+   * Tail of operations to transmit to the service.
    */
-  struct OperationHandle *tmit_tail;
+  struct MessageQueue *tmit_tail;
 
   /**
-   * Message being transmitted to the PSYC service.
+   * Message currently being transmitted to the service.
    */
-  struct OperationHandle *tmit_msg;
+  struct MessageQueue *tmit_msg;
 
   /**
    * Message to send on reconnect.
@@ -201,8 +200,6 @@
   struct GNUNET_PSYC_Channel ch;
 
   GNUNET_PSYC_MasterStartCallback start_cb;
-
-  uint64_t max_message_id;
 };
 
 
@@ -214,8 +211,6 @@
   struct GNUNET_PSYC_Channel ch;
 
   GNUNET_PSYC_SlaveJoinCallback join_cb;
-
-  uint64_t max_message_id;
 };
 
 
@@ -269,30 +264,30 @@
 /**
  * Reschedule a connect attempt to the service.
  *
- * @param c channel to reconnect
+ * @param ch  Channel to reconnect.
  */
 static void
-reschedule_connect (struct GNUNET_PSYC_Channel *c)
+reschedule_connect (struct GNUNET_PSYC_Channel *ch)
 {
-  GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+  GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
 
-  if (NULL != c->th)
+  if (NULL != ch->th)
   {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
-    c->th = NULL;
+    GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
+    ch->th = NULL;
   }
-  if (NULL != c->client)
+  if (NULL != ch->client)
   {
-    GNUNET_CLIENT_disconnect (c->client);
-    c->client = NULL;
+    GNUNET_CLIENT_disconnect (ch->client);
+    ch->client = NULL;
   }
-  c->in_receive = GNUNET_NO;
+  ch->in_receive = GNUNET_NO;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Scheduling task to reconnect to PSYC service in %s.\n",
-       GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
-  c->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
-  c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay);
+       GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES));
+  ch->reconnect_task =
+      GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
+  ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
 }
 
 
@@ -306,7 +301,7 @@
 
 
 /**
- * Reset data stored related to the last received message.
+ * Reset stored data related to the last received message.
  */
 static void
 recv_reset (struct GNUNET_PSYC_Channel *ch)
@@ -356,51 +351,53 @@
        "Queueing message of type %u and size %u (end: %u)).\n",
        ntohs (msg->type), size, end);
 
-  struct OperationHandle *op = ch->tmit_msg;
-  if (NULL != op)
+  struct MessageQueue *mq = ch->tmit_msg;
+  struct GNUNET_MessageHeader *qmsg = NULL;
+  if (NULL != mq)
   {
+    qmsg = (struct GNUNET_MessageHeader *) &mq[1];
     if (NULL == msg
-        || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size)
+        || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
     {
       /* End of message or buffer is full, add it to transmission queue
        * and start with empty buffer */
-      op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-      op->msg->size = htons (op->msg->size);
-      GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-      ch->tmit_msg = op = NULL;
+      qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+      qmsg->size = htons (qmsg->size);
+      GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
+      ch->tmit_msg = mq = NULL;
       ch->tmit_ack_pending++;
     }
     else
     {
       /* Message fits in current buffer, append */
-      ch->tmit_msg = op
-        = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size);
-      op->msg = (struct GNUNET_MessageHeader *) &op[1];
-      memcpy ((char *) op->msg + op->msg->size, msg, size);
-      op->msg->size += size;
+      ch->tmit_msg
+        = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
+      qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+      memcpy ((char *) qmsg + qmsg->size, msg, size);
+      qmsg->size += size;
     }
   }
 
-  if (NULL == op && NULL != msg)
+  if (NULL == mq && NULL != msg)
   {
     /* Empty buffer, copy over message. */
-    ch->tmit_msg = op
-      = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size);
-    op->msg = (struct GNUNET_MessageHeader *) &op[1];
-    op->msg->size = sizeof (*op->msg) + size;
-    memcpy (&op->msg[1], msg, size);
+    ch->tmit_msg
+      = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
+    qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+    qmsg->size = sizeof (*qmsg) + size;
+    memcpy (&qmsg[1], msg, size);
   }
 
-  if (NULL != op
+  if (NULL != mq
       && (GNUNET_YES == end
           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
-              < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
+              < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
   {
     /* End of message or buffer is full, add it to transmission queue. */
-    op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-    op->msg->size = htons (op->msg->size);
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-    ch->tmit_msg = op = NULL;
+    qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+    qmsg->size = htons (qmsg->size);
+    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
+    ch->tmit_msg = mq = NULL;
     ch->tmit_ack_pending++;
   }
 
@@ -577,6 +574,7 @@
  * @param notify_data Function to call to obtain fragments of the data.
  * @param notify_cls Closure for @a notify_mod and @a notify_data.
  * @param flags Flags for the message being transmitted.
+ *
  * @return Transmission handle, NULL on error (i.e. more than one request queued).
  */
 static struct GNUNET_PSYC_ChannelTransmitHandle *
@@ -593,14 +591,14 @@
 
   size_t size = strlen (method_name) + 1;
   struct GNUNET_PSYC_MessageMethod *pmeth;
-  struct OperationHandle *op;
+  struct GNUNET_MessageHeader *qmsg;
+  struct MessageQueue *
+    mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
+                                       + sizeof (*pmeth) + size);
+  qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+  qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
 
-  ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
-                                     + sizeof (*pmeth) + size);
-  op->msg = (struct GNUNET_MessageHeader *) &op[1];
-  op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
-
-  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
+  pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
   pmeth->header.size = htons (sizeof (*pmeth) + size);
   pmeth->flags = htonl (flags);
@@ -928,7 +926,7 @@
 
   if (NULL == msg)
   {
-    // timeout / disconnected from server, reconnect
+    // timeout / disconnected from service, reconnect
     reschedule_connect (ch);
     return;
   }
@@ -970,17 +968,15 @@
   case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
   {
     struct CountersResult *cres = (struct CountersResult *) msg;
-    mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
     if (NULL != mst->start_cb)
-      mst->start_cb (ch->cb_cls, mst->max_message_id);
+      mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
   {
     struct CountersResult *cres = (struct CountersResult *) msg;
-    slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
     if (NULL != slv->join_cb)
-      slv->join_cb (ch->cb_cls, slv->max_message_id);
+      slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
@@ -1005,31 +1001,32 @@
 /**
  * Transmit next message to service.
  *
- * @param cls The 'struct GNUNET_PSYC_Channel'.
- * @param size Number of bytes available in buf.
- * @param buf Where to copy the message.
- * @return Number of bytes copied to buf.
+ * @param cls  The struct GNUNET_PSYC_Channel.
+ * @param size Number of bytes available in @a buf.
+ * @param buf  Where to copy the message.
+ *
+ * @return Number of bytes copied to @a buf.
  */
 static size_t
 send_next_message (void *cls, size_t size, void *buf)
 {
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
   struct GNUNET_PSYC_Channel *ch = cls;
-  struct OperationHandle *op = ch->tmit_head;
-  size_t ret;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
+  struct MessageQueue *mq = ch->tmit_head;
+  if (NULL == mq)
+    return 0;
+  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+  size_t ret = ntohs (qmsg->size);
   ch->th = NULL;
-  if (NULL == op->msg)
-    return 0;
-  ret = ntohs (op->msg->size);
   if (ret > size)
   {
     reschedule_connect (ch);
     return 0;
   }
-  memcpy (buf, op->msg, ret);
+  memcpy (buf, qmsg, ret);
 
-  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op);
-  GNUNET_free (op);
+  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
+  GNUNET_free (mq);
 
   if (NULL != ch->tmit_head)
     transmit_next (ch);
@@ -1056,12 +1053,13 @@
   if (NULL != ch->th || NULL == ch->client)
     return;
 
-  struct OperationHandle *op = ch->tmit_head;
-  if (NULL == op)
+  struct MessageQueue *mq = ch->tmit_head;
+  if (NULL == mq)
     return;
+  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
 
   ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
-                                                ntohs (op->msg->size),
+                                                ntohs (qmsg->size),
                                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                                 GNUNET_NO,
                                                 &send_next_message,
@@ -1087,15 +1085,14 @@
   GNUNET_assert (NULL == ch->client);
   ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
   GNUNET_assert (NULL != ch->client);
+  uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
 
   if (NULL == ch->tmit_head ||
-      ch->tmit_head->msg->type != ch->reconnect_msg->type)
+      0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
   {
-    uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
-    struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
-    memcpy (&op[1], ch->reconnect_msg, reconn_size);
-    op->msg = (struct GNUNET_MessageHeader *) &op[1];
-    GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
+    struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
+    memcpy (&mq[1], ch->reconnect_msg, reconn_size);
+    GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
   }
   transmit_next (ch);
 }
@@ -1104,7 +1101,7 @@
 /**
  * Disconnect from the PSYC service.
  *
- * @param c Channel handle to disconnect
+ * @param c  Channel handle to disconnect.
  */
 static void
 disconnect (void *c)
@@ -1167,6 +1164,7 @@
  * @param join_cb Function to invoke when a peer wants to join.
  * @param master_started_cb Function to invoke after the channel master started.
  * @param cls Closure for @a master_started_cb and @a join_cb.
+ *
  * @return Handle for the channel master, NULL on error.
  */
 struct GNUNET_PSYC_Master *
@@ -1187,17 +1185,16 @@
   req->channel_key = *channel_key;
   req->policy = policy;
 
+  mst->start_cb = master_started_cb;
+  ch->message_cb = message_cb;
+  ch->join_cb = join_cb;
+  ch->cb_cls = cls;
   ch->cfg = cfg;
   ch->is_master = GNUNET_YES;
   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
 
-  ch->message_cb = message_cb;
-  ch->join_cb = join_cb;
-  ch->cb_cls = cls;
-  mst->start_cb = master_started_cb;
-
   return mst;
 }
 
@@ -1260,6 +1257,7 @@
  * @param notify_data Function to call to obtain fragments of the data.
  * @param notify_cls Closure for @a notify_mod and @a notify_data.
  * @param flags Flags for the message being transmitted.
+ *
  * @return Transmission handle, NULL on error (i.e. more than one request queued).
  */
 struct GNUNET_PSYC_MasterTransmitHandle *
@@ -1330,6 +1328,7 @@
  * @param env Environment containing transient variables for the request, or NULL.
  * @param data Payload for the join message.
  * @param data_size Number of bytes in @a data.
+ *
  * @return Handle for the slave, NULL on error.
  */
 struct GNUNET_PSYC_Slave *
@@ -1361,6 +1360,7 @@
   req->relay_count = htonl (relay_count);
   memcpy (&req[1], relays, relay_count * sizeof (*relays));
 
+  slv->join_cb = slave_joined_cb;
   ch->message_cb = message_cb;
   ch->join_cb = join_cb;
   ch->cb_cls = cls;
@@ -1371,7 +1371,6 @@
   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
 
-  slv->join_cb = slave_joined_cb;
   return slv;
 }
 
@@ -1401,6 +1400,7 @@
  * @param notify_data Function to call to obtain fragments of the data.
  * @param notify_cls Closure for @a notify.
  * @param flags Flags for the message being transmitted.
+ *
  * @return Transmission handle, NULL on error (i.e. more than one request
  *         queued).
  */
@@ -1447,6 +1447,7 @@
  * APIs.
  *
  * @param master Channel master handle.
+ *
  * @return Channel handle, valid for as long as @a master is valid.
  */
 struct GNUNET_PSYC_Channel *
@@ -1460,6 +1461,7 @@
  * Convert @a slave to a @e channel handle to access the @e channel APIs.
  *
  * @param slave Slave handle.
+ *
  * @return Channel handle, valid for as long as @a slave is valid.
  */
 struct GNUNET_PSYC_Channel *
@@ -1497,18 +1499,16 @@
                                uint64_t effective_since)
 {
   struct ChannelSlaveAdd *slvadd;
-  struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd));
+  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd));
 
-  slvadd = (struct ChannelSlaveAdd *) &op[1];
-  op->msg = (struct GNUNET_MessageHeader *) slvadd;
-
+  slvadd = (struct ChannelSlaveAdd *) &mq[1];
   slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
   slvadd->header.size = htons (sizeof (*slvadd));
   slvadd->announced_at = GNUNET_htonll (announced_at);
   slvadd->effective_since = GNUNET_htonll (effective_since);
   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
                                     channel->tmit_tail,
-                                    op);
+                                    mq);
   transmit_next (channel);
 }
 
@@ -1540,16 +1540,15 @@
                                   uint64_t announced_at)
 {
   struct ChannelSlaveRemove *slvrm;
-  struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm));
+  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm));
 
-  slvrm = (struct ChannelSlaveRemove *) &op[1];
-  op->msg = (struct GNUNET_MessageHeader *) slvrm;
+  slvrm = (struct ChannelSlaveRemove *) &mq[1];
   slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
   slvrm->header.size = htons (sizeof (*slvrm));
   slvrm->announced_at = GNUNET_htonll (announced_at);
   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
                                     channel->tmit_tail,
-                                    op);
+                                    mq);
   transmit_next (channel);
 }
 
@@ -1573,6 +1572,7 @@
  *        has been called, the client must not call
  *        GNUNET_PSYC_channel_story_tell_cancel() anymore.
  * @param cls Closure for the callbacks.
+ *
  * @return Handle to cancel story telling operation.
  */
 struct GNUNET_PSYC_Story *
@@ -1615,6 +1615,7 @@
  * @param cb Function called once when a matching state variable is found.
  *        Not called if there's no matching state variable.
  * @param cb_cls Closure for the callbacks.
+ *
  * @return Handle that can be used to cancel the query operation.
  */
 struct GNUNET_PSYC_StateQuery *
@@ -1641,6 +1642,7 @@
  * @param name_prefix Prefix of the state variable name to match.
  * @param cb Function to call with the matching state variables.
  * @param cb_cls Closure for the callbacks.
+ *
  * @return Handle that can be used to cancel the query operation.
  */
 struct GNUNET_PSYC_StateQuery *

Modified: gnunet/src/psyc/test_psyc.conf
===================================================================
--- gnunet/src/psyc/test_psyc.conf      2014-05-13 12:06:08 UTC (rev 33257)
+++ gnunet/src/psyc/test_psyc.conf      2014-05-13 12:08:14 UTC (rev 33258)
@@ -1,17 +1,2 @@
 [arm]
-UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-arm.sock
-DEFAULTSERVICES = psyc
-
-[psyc]
-AUTOSTART = YES
-BINARY = gnunet-service-psyc
-UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock
-UNIX_MATCH_UID = NO
-UNIX_MATCH_GID = YES
-
-[psycstore]
-AUTOSTART = YES
-BINARY = gnunet-service-psycstore
-UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psycstore.sock
-UNIX_MATCH_UID = NO
-UNIX_MATCH_GID = YES
+DEFAULTSERVICES = psyc psycstore multicast




reply via email to

[Prev in Thread] Current Thread [Next in Thread]