gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37995 - gnunet/src/psyc


From: gnunet
Subject: [GNUnet-SVN] r37995 - gnunet/src/psyc
Date: Sat, 24 Sep 2016 00:51:13 +0200

Author: tg
Date: 2016-09-24 00:51:13 +0200 (Sat, 24 Sep 2016)
New Revision: 37995

Modified:
   gnunet/src/psyc/gnunet-service-psyc.c
Log:
psyc: switch to SERVICE API

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2016-09-23 21:17:16 UTC (rev 
37994)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2016-09-23 22:51:13 UTC (rev 
37995)
@@ -44,14 +44,14 @@
 static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
- * Handle to the statistics service.
+ * Service handle.
  */
-static struct GNUNET_STATISTICS_Handle *stats;
+struct GNUNET_SERVICE_Handle *service;
 
 /**
- * Notification context, simplifies client broadcasts.
+ * Handle to the statistics service.
  */
-static struct GNUNET_SERVER_NotificationContext *nc;
+static struct GNUNET_STATISTICS_Handle *stats;
 
 /**
  * Handle to the PSYCstore.
@@ -85,7 +85,7 @@
   struct TransmitMessage *prev;
   struct TransmitMessage *next;
 
-  struct GNUNET_SERVER_Client *client;
+  struct GNUNET_SERVICE_Client *client;
 
   /**
    * ID assigned to the message.
@@ -185,12 +185,12 @@
 /**
  * List of connected clients.
  */
-struct Client
+struct ClientList
 {
-  struct Client *prev;
-  struct Client *next;
+  struct ClientList *prev;
+  struct ClientList *next;
 
-  struct GNUNET_SERVER_Client *client;
+  struct GNUNET_SERVICE_Client *client;
 };
 
 
@@ -199,8 +199,8 @@
   struct Operation *prev;
   struct Operation *next;
 
-  struct GNUNET_SERVER_Client *client;
-  struct Channel *chn;
+  struct GNUNET_SERVICE_Client *client;
+  struct Channel *channel;
   uint64_t op_id;
   uint32_t flags;
 };
@@ -211,8 +211,8 @@
  */
 struct Channel
 {
-  struct Client *clients_head;
-  struct Client *clients_tail;
+  struct ClientList *clients_head;
+  struct ClientList *clients_tail;
 
   struct Operation *op_head;
   struct Operation *op_tail;
@@ -270,11 +270,6 @@
   uint32_t tmit_mod_value_size;
 
   /**
-   * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
-   */
-  uint8_t is_master;
-
-  /**
    * Is this channel ready to receive messages from client?
    * #GNUNET_YES or #GNUNET_NO
    */
@@ -285,6 +280,16 @@
    * #GNUNET_YES or #GNUNET_NO
    */
   uint8_t is_disconnected;
+
+  /**
+   * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
+   */
+  uint8_t is_master;
+
+  union {
+    struct Master *master;
+    struct Slave *slave;
+  };
 };
 
 
@@ -296,7 +301,7 @@
   /**
    * Channel struct common for Master and Slave
    */
-  struct Channel chn;
+  struct Channel channel;
 
   /**
    * Private key of the channel.
@@ -353,7 +358,7 @@
   /**
    * Channel struct common for Master and Slave
    */
-  struct Channel chn;
+  struct Channel channel;
 
   /**
    * Private key of the slave.
@@ -417,6 +422,24 @@
 };
 
 
+/**
+ * Client context.
+ */
+struct Client {
+  struct GNUNET_SERVICE_Client *client;
+  struct Channel *channel;
+};
+
+
+struct ReplayRequestKey
+{
+  uint64_t fragment_id;
+  uint64_t message_id;
+  uint64_t fragment_offset;
+  uint64_t flags;
+};
+
+
 static void
 transmit_message (struct Channel *chn);
 
@@ -444,11 +467,6 @@
 static void
 shutdown_task (void *cls)
 {
-  if (NULL != nc)
-  {
-    GNUNET_SERVER_notification_context_destroy (nc);
-    nc = NULL;
-  }
   if (NULL != stats)
   {
     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -458,12 +476,12 @@
 
 
 static struct Operation *
-op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
+op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
         uint64_t op_id, uint32_t flags)
 {
   struct Operation *op = GNUNET_malloc (sizeof (*op));
   op->client = client;
-  op->chn = chn;
+  op->channel = chn;
   op->op_id = op_id;
   op->flags = flags;
   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
@@ -474,7 +492,7 @@
 static void
 op_remove (struct Operation *op)
 {
-  GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
+  GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
   GNUNET_free (op);
 }
 
@@ -485,7 +503,7 @@
 static void
 cleanup_master (struct Master *mst)
 {
-  struct Channel *chn = &mst->chn;
+  struct Channel *chn = &mst->channel;
 
   if (NULL != mst->origin)
     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
@@ -500,7 +518,7 @@
 static void
 cleanup_slave (struct Slave *slv)
 {
-  struct Channel *chn = &slv->chn;
+  struct Channel *chn = &slv->channel;
   struct GNUNET_CONTAINER_MultiHashMap *
     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
                                                 &chn->pub_key_hash);
@@ -556,8 +574,8 @@
   }
 
   (GNUNET_YES == chn->is_master)
-    ? cleanup_master ((struct Master *) chn)
-    : cleanup_slave ((struct Slave *) chn);
+    ? cleanup_master (chn->master)
+    : cleanup_slave (chn->slave);
   GNUNET_free (chn);
 }
 
@@ -566,18 +584,18 @@
  * Called whenever a client is disconnected.
  * Frees our resources associated with that client.
  *
- * @param cls Closure.
- * @param client Identification of the client.
+ * @param cls closure
+ * @param client identification of the client
+ * @param app_ctx must match @a client
  */
 static void
-client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+client_notify_disconnect (void *cls,
+                          struct GNUNET_SERVICE_Client *client,
+                          void *app_ctx)
 {
-  if (NULL == client)
-    return;
+  struct Client *c = app_ctx;
+  struct Channel *chn = c->channel;
 
-  struct Channel *
-    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-
   if (NULL == chn)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -593,7 +611,7 @@
               (GNUNET_YES == chn->is_master) ? "master" : "slave",
               GNUNET_h2s (&chn->pub_key_hash));
 
-  struct Client *cli = chn->clients_head;
+  struct ClientList *cli = chn->clients_head;
   while (NULL != cli)
   {
     if (cli->client == client)
@@ -637,6 +655,28 @@
 
 
 /**
+ * A new client connected.
+ *
+ * @param cls NULL
+ * @param client client to add
+ * @param mq message queue for @a client
+ * @return @a client
+ */
+static void *
+client_notify_connect (void *cls,
+                       struct GNUNET_SERVICE_Client *client,
+                       struct GNUNET_MQ_Handle *mq)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
+
+  struct Client *c = GNUNET_malloc (sizeof (*c));
+  c->client = client;
+
+  return c;
+}
+
+
+/**
  * Send message to all clients connected to the channel.
  */
 static void
@@ -647,11 +687,15 @@
               "%p Sending message to clients.\n",
               chn);
 
-  struct Client *cli = chn->clients_head;
+  struct ClientList *cli = chn->clients_head;
   while (NULL != cli)
   {
-    GNUNET_SERVER_notification_context_add (nc, cli->client);
-    GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, 
GNUNET_NO);
+    struct GNUNET_MQ_Envelope *
+      env = GNUNET_MQ_msg_copy (msg);
+
+    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
+                    env);
+
     cli = cli->next;
   }
 }
@@ -672,14 +716,14 @@
  *        Size of @a data.
  */
 static void
-client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
                     int64_t result_code, const void *data, uint16_t data_size)
 {
   struct GNUNET_OperationResultMessage *res;
-
-  res = GNUNET_malloc (sizeof (*res) + data_size);
-  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
-  res->header.size = htons (sizeof (*res) + data_size);
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg_extra (res,
+                               data_size,
+                               GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
   res->result_code = GNUNET_htonll (result_code);
   res->op_id = op_id;
   if (0 < data_size)
@@ -692,10 +736,7 @@
               result_code,
               data_size);
 
-  GNUNET_SERVER_notification_context_add (nc, client);
-  GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
-                                              GNUNET_NO);
-  GNUNET_free (res);
+  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
 }
 
 
@@ -705,8 +746,8 @@
 struct JoinMemTestClosure
 {
   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
-  struct Channel *chn;
-  struct GNUNET_MULTICAST_JoinHandle *jh;
+  struct Channel *channel;
+  struct GNUNET_MULTICAST_JoinHandle *join_handle;
   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
 };
 
@@ -720,15 +761,15 @@
 {
   struct JoinMemTestClosure *jcls = cls;
 
-  if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
+  if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
   { /* Pass on join request to client if this is a master channel */
-    struct Master *mst = (struct Master *) jcls->chn;
+    struct Master *mst = jcls->channel->master;
     struct GNUNET_HashCode slave_pub_hash;
     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
                         &slave_pub_hash);
-    GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, 
jcls->jh,
+    GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, 
jcls->join_handle,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-    client_send_msg (jcls->chn, &jcls->join_msg->header);
+    client_send_msg (jcls->channel, &jcls->join_msg->header);
   }
   else
   {
@@ -739,7 +780,7 @@
                   err_msg_size, err_msg);
     }
     // FIXME: add relays
-    GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
+    GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
   }
   GNUNET_free (jcls->join_msg);
   GNUNET_free (jcls);
@@ -786,8 +827,8 @@
 
   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
   jcls->slave_pub_key = *slave_pub_key;
-  jcls->chn = chn;
-  jcls->jh = jh;
+  jcls->channel = chn;
+  jcls->join_handle = jh;
   jcls->join_msg = req;
 
   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
@@ -807,7 +848,7 @@
                           const struct GNUNET_MessageHeader *join_resp)
 {
   struct Slave *slv = cls;
-  struct Channel *chn = &slv->chn;
+  struct Channel *chn = &slv->channel;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Got join decision: %d\n",
               slv,
@@ -1033,7 +1074,7 @@
 client_send_mcast_req (struct Master *mst,
                        const struct GNUNET_MULTICAST_RequestHeader *req)
 {
-  struct Channel *chn = &mst->chn;
+  struct Channel *chn = &mst->channel;
 
   struct GNUNET_PSYC_MessageHeader *pmsg;
   uint16_t size = ntohs (req->header.size);
@@ -1090,7 +1131,7 @@
 
   if (NULL == fragq)
   {
-    fragq = GNUNET_new (struct FragmentQueue);
+    fragq = GNUNET_malloc (sizeof (*fragq));
     fragq->state = MSG_FRAG_STATE_HEADER;
     fragq->fragments
       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
@@ -1122,7 +1163,7 @@
                 chn,
                 fragq->header_size,
                 size);
-    cache_entry = GNUNET_new (struct RecvCacheEntry);
+    cache_entry = GNUNET_malloc (sizeof (*cache_entry));
     cache_entry->ref_count = 1;
     cache_entry->mmsg = GNUNET_malloc (size);
     GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
@@ -1305,7 +1346,7 @@
 
 struct StateModifyClosure
 {
-  struct Channel *chn;
+  struct Channel *channel;
   uint64_t msg_id;
   struct GNUNET_HashCode msg_id_hash;
 };
@@ -1316,7 +1357,7 @@
                                 const char *err_msg, uint16_t err_msg_size)
 {
   struct StateModifyClosure *mcls = cls;
-  struct Channel *chn = mcls->chn;
+  struct Channel *chn = mcls->channel;
   uint64_t msg_id = mcls->msg_id;
 
   struct FragmentQueue *
@@ -1434,7 +1475,7 @@
           }
 
           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
-          mcls->chn = chn;
+          mcls->channel = chn;
           mcls->msg_id = msg_id;
           mcls->msg_id_hash = msg_id_hash;
 
@@ -1604,7 +1645,7 @@
                             uint64_t max_state_message_id)
 {
   struct Master *mst = cls;
-  struct Channel *chn = &mst->chn;
+  struct Channel *chn = &mst->channel;
   chn->store_op = NULL;
 
   struct GNUNET_PSYC_CountersResultMessage res;
@@ -1649,7 +1690,7 @@
                            uint64_t max_state_message_id)
 {
   struct Slave *slv = cls;
-  struct Channel *chn = &slv->chn;
+  struct Channel *chn = &slv->channel;
   chn->store_op = NULL;
 
   struct GNUNET_PSYC_CountersResultMessage res;
@@ -1703,11 +1744,11 @@
  * Handle a connecting client starting a channel master.
  */
 static void
-client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
-                          const struct GNUNET_MessageHeader *msg)
+handle_client_master_start (void *cls,
+                            const struct MasterStartRequest *req)
 {
-  const struct MasterStartRequest *req
-    = (const struct MasterStartRequest *) msg;
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
 
   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
   struct GNUNET_HashCode pub_key_hash;
@@ -1721,12 +1762,13 @@
 
   if (NULL == mst)
   {
-    mst = GNUNET_new (struct Master);
+    mst = GNUNET_malloc (sizeof (*mst));
     mst->policy = ntohl (req->policy);
     mst->priv_key = req->channel_key;
     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
 
-    chn = &mst->chn;
+    chn = c->channel = &mst->channel;
+    chn->master = mst;
     chn->is_master = GNUNET_YES;
     chn->pub_key = pub_key;
     chn->pub_key_hash = pub_key_hash;
@@ -1739,17 +1781,15 @@
   }
   else
   {
-    chn = &mst->chn;
+    chn = &mst->channel;
 
-    struct GNUNET_PSYC_CountersResultMessage res;
-    res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
-    res.header.size = htons (sizeof (res));
-    res.result_code = htonl (GNUNET_OK);
-    res.max_message_id = GNUNET_htonll (mst->max_message_id);
+    struct GNUNET_PSYC_CountersResultMessage *res;
+    struct GNUNET_MQ_Envelope *
+      env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+    res->result_code = htonl (GNUNET_OK);
+    res->max_message_id = GNUNET_htonll (mst->max_message_id);
 
-    GNUNET_SERVER_notification_context_add (nc, client);
-    GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
-                                                GNUNET_NO);
+    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1756,24 +1796,32 @@
               "%p Client connected as master to channel %s.\n",
               mst, GNUNET_h2s (&chn->pub_key_hash));
 
-  struct Client *cli = GNUNET_new (struct Client);
+  struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
   cli->client = client;
   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
 
-  GNUNET_SERVER_client_set_user_context (client, chn);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
+static int
+check_client_slave_join (void *cls,
+                         const struct SlaveJoinRequest *req)
+{
+  return GNUNET_OK;
+}
+
+
 /**
  * Handle a connecting client joining as a channel slave.
  */
 static void
-client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
-                        const struct GNUNET_MessageHeader *msg)
+handle_client_slave_join (void *cls,
+                          const struct SlaveJoinRequest *req)
 {
-  const struct SlaveJoinRequest *req
-    = (const struct SlaveJoinRequest *) msg;
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+
   uint16_t req_size = ntohs (req->header.size);
 
   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
@@ -1794,7 +1842,7 @@
   }
   if (NULL == slv)
   {
-    slv = GNUNET_new (struct Slave);
+    slv = GNUNET_malloc (sizeof (*slv));
     slv->priv_key = req->slave_key;
     slv->pub_key = slv_pub_key;
     slv->pub_key_hash = slv_pub_hash;
@@ -1825,7 +1873,7 @@
                   join_msg_size,
                   req_size);
       GNUNET_break (0);
-      GNUNET_SERVER_client_disconnect (client);
+      GNUNET_SERVICE_client_drop (client);
       GNUNET_free (slv);
       return;
     }
@@ -1835,7 +1883,8 @@
       GNUNET_memcpy (slv->relays, &req[1], relay_size);
     }
 
-    chn = &slv->chn;
+    chn = c->channel = &slv->channel;
+    chn->slave = slv;
     chn->is_master = GNUNET_NO;
     chn->pub_key = req->channel_pub_key;
     chn->pub_key_hash = pub_key_hash;
@@ -1856,18 +1905,17 @@
   }
   else
   {
-    chn = &slv->chn;
+    chn = &slv->channel;
 
-    struct GNUNET_PSYC_CountersResultMessage res;
-    res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
-    res.header.size = htons (sizeof (res));
-    res.result_code = htonl (GNUNET_OK);
-    res.max_message_id = GNUNET_htonll (chn->max_message_id);
+    struct GNUNET_PSYC_CountersResultMessage *res;
 
-    GNUNET_SERVER_notification_context_add (nc, client);
-    GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
-                                                GNUNET_NO);
+    struct GNUNET_MQ_Envelope *
+      env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+    res->result_code = htonl (GNUNET_OK);
+    res->max_message_id = GNUNET_htonll (chn->max_message_id);
 
+    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
+
     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
     {
       mcast_recv_join_decision (slv, GNUNET_YES,
@@ -1893,10 +1941,9 @@
     }
     else if (NULL != slv->join_dcsn)
     {
-      GNUNET_SERVER_notification_context_add (nc, client);
-      GNUNET_SERVER_notification_context_unicast (nc, client,
-                                                  &slv->join_dcsn->header,
-                                                  GNUNET_NO);
+      struct GNUNET_MQ_Envelope *
+        env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
+      GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
     }
   }
 
@@ -1904,12 +1951,11 @@
               "%p Client connected as slave to channel %s.\n",
               slv, GNUNET_h2s (&chn->pub_key_hash));
 
-  struct Client *cli = GNUNET_new (struct Client);
+  struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
   cli->client = client;
   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
 
-  GNUNET_SERVER_client_set_user_context (client, chn);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
@@ -1935,31 +1981,37 @@
 }
 
 
+static int
+check_client_join_decision (void *cls,
+                            const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
+{
+  return GNUNET_OK;
+}
+
+
 /**
  * Join decision from client.
  */
 static void
-client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
-                           const struct GNUNET_MessageHeader *msg)
+handle_client_join_decision (void *cls,
+                             const struct GNUNET_PSYC_JoinDecisionMessage 
*dcsn)
 {
-  const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
-    = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
-  struct Channel *chn;
-  struct Master *mst;
-  struct JoinDecisionClosure jcls;
-
-  chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Channel *chn = c->channel;
   if (NULL == chn)
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
   GNUNET_assert (GNUNET_YES == chn->is_master);
-  mst = (struct Master *) chn;
+  struct Master *mst = chn->master;
+
+  struct JoinDecisionClosure jcls;
   jcls.is_admitted = ntohl (dcsn->is_admitted);
   jcls.msg
-    = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
+    = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
     ? (struct GNUNET_MessageHeader *) &dcsn[1]
     : NULL;
 
@@ -1977,7 +2029,7 @@
   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
                                               &mcast_send_join_decision, 
&jcls);
   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
@@ -1989,15 +2041,14 @@
  * @param chn The channel struct for the client.
  */
 static void
-send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
+send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
 {
-  struct GNUNET_MessageHeader res;
-  res.size = htons (sizeof (res));
-  res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
+  struct GNUNET_MessageHeader *res;
+  struct GNUNET_MQ_Envelope *
+      env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
 
-  /* FIXME */
-  GNUNET_SERVER_notification_context_add (nc, client);
-  GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
+  /* FIXME? */
+  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
 }
 
 
@@ -2093,7 +2144,7 @@
 static void
 master_transmit_message (struct Master *mst)
 {
-  struct Channel *chn = &mst->chn;
+  struct Channel *chn = &mst->channel;
   struct TransmitMessage *tmit_msg = chn->tmit_head;
   if (NULL == tmit_msg)
     return;
@@ -2120,13 +2171,13 @@
 static void
 slave_transmit_message (struct Slave *slv)
 {
-  if (NULL == slv->chn.tmit_head)
+  if (NULL == slv->channel.tmit_head)
     return;
   if (NULL == slv->tmit_handle)
   {
     slv->tmit_handle = (void *) &slv->tmit_handle;
     struct GNUNET_MULTICAST_MemberTransmitHandle *
-      tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, 
slv->chn.tmit_head->id,
+      tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, 
slv->channel.tmit_head->id,
                                                        slave_transmit_notify, 
slv);
     if (NULL != slv->tmit_handle)
       slv->tmit_handle = tmit_handle;
@@ -2142,8 +2193,8 @@
 transmit_message (struct Channel *chn)
 {
   chn->is_master
-    ? master_transmit_message ((struct Master *) chn)
-    : slave_transmit_message ((struct Slave *) chn);
+    ? master_transmit_message (chn->master)
+    : slave_transmit_message (chn->slave);
 }
 
 
@@ -2226,7 +2277,7 @@
  */
 static struct TransmitMessage *
 queue_message (struct Channel *chn,
-               struct GNUNET_SERVER_Client *client,
+               struct GNUNET_SERVICE_Client *client,
                size_t data_size,
                const void *data,
                uint16_t first_ptype, uint16_t last_ptype)
@@ -2244,8 +2295,8 @@
   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
 
   chn->is_master
-    ? master_queue_message ((struct Master *) chn, tmit_msg)
-    : slave_queue_message ((struct Slave *) chn, tmit_msg);
+    ? master_queue_message (chn->master, tmit_msg)
+    : slave_queue_message (chn->slave, tmit_msg);
   return tmit_msg;
 }
 
@@ -2257,7 +2308,7 @@
  * @param client  Client the message originates from.
  */
 static void
-transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
+transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
 {
   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
 
@@ -2272,16 +2323,30 @@
 }
 
 
+static int
+check_client_psyc_message (void *cls,
+                           const struct GNUNET_MessageHeader *msg)
+{
+  return GNUNET_OK;
+}
+
+
 /**
  * Incoming message from a master or slave client.
  */
 static void
-client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
-                          const struct GNUNET_MessageHeader *msg)
+handle_client_psyc_message (void *cls,
+                            const struct GNUNET_MessageHeader *msg)
 {
-  struct Channel *
-    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != chn);
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Channel *chn = c->channel;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (client);
+    return;
+  }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Received message from client.\n", chn);
@@ -2292,7 +2357,7 @@
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "%p Channel is not ready yet, disconnecting client.\n", chn);
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
 
@@ -2306,7 +2371,7 @@
                 (unsigned int) (size - sizeof (*msg)));
     GNUNET_break (0);
     transmit_cancel (chn, client);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
 
@@ -2320,7 +2385,7 @@
                 "%p Received invalid message part from client.\n", chn);
     GNUNET_break (0);
     transmit_cancel (chn, client);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2332,7 +2397,7 @@
   transmit_message (chn);
   /* FIXME: send a few ACKs even before transmit_notify is called */
 
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 };
 
 
@@ -2348,7 +2413,7 @@
   struct Operation *op = cls;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " 
(%.*s)\n",
-              op->chn,
+              op->channel,
               result,
               (int) err_msg_size,
               err_msg);
@@ -2363,16 +2428,19 @@
  * Client requests to add/remove a slave in the membership database.
  */
 static void
-client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
-                              const struct GNUNET_MessageHeader *msg)
+handle_client_membership_store (void *cls,
+                                const struct ChannelMembershipStoreRequest 
*req)
 {
-  struct Channel *
-    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != chn);
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Channel *chn = c->channel;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (client);
+    return;
+  }
 
-  const struct ChannelMembershipStoreRequest *
-    req = (const struct ChannelMembershipStoreRequest *) msg;
-
   struct Operation *op = op_add (chn, client, req->op_id, 0);
 
   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
@@ -2387,7 +2455,7 @@
                                      req->did_join, announced_at, 
effective_since,
                                      0, /* FIXME: group_generation */
                                      &store_recv_membership_store_result, op);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
@@ -2405,7 +2473,7 @@
   { /* Requesting client already disconnected. */
     return GNUNET_NO;
   }
-  struct Channel *chn = op->chn;
+  struct Channel *chn = op->channel;
 
   struct GNUNET_PSYC_MessageHeader *pmsg;
   uint16_t msize = ntohs (mmsg->header.size);
@@ -2447,7 +2515,7 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p History replay #%" PRIu64 ": "
               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
-              op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, 
err_msg);
+              op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, 
err_msg);
 
   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
   {
@@ -2459,20 +2527,32 @@
 }
 
 
+static int
+check_client_history_replay (void *cls,
+                             const struct GNUNET_PSYC_HistoryRequestMessage 
*req)
+{
+  return GNUNET_OK;
+}
+
+
 /**
  * Client requests channel history.
  */
 static void
-client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
-                            const struct GNUNET_MessageHeader *msg)
+handle_client_history_replay (void *cls,
+                              const struct GNUNET_PSYC_HistoryRequestMessage 
*req)
 {
-  struct Channel *
-    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != chn);
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Channel *chn = c->channel;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (client);
+    return;
+  }
 
-  const struct GNUNET_PSYC_HistoryRequestMessage *
-    req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
-  uint16_t size = ntohs (msg->size);
+  uint16_t size = ntohs (req->header.size);
   const char *method_prefix = (const char *) &req[1];
 
   if (size < sizeof (*req) + 1
@@ -2486,7 +2566,7 @@
                 size,
                 (unsigned int) sizeof (*req) + 1);
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
 
@@ -2493,6 +2573,7 @@
   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
 
   if (0 == req->message_limit)
+  {
     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
                                   GNUNET_ntohll (req->start_message_id),
                                   GNUNET_ntohll (req->end_message_id),
@@ -2499,7 +2580,9 @@
                                   0, method_prefix,
                                   &store_recv_fragment_history,
                                   &store_recv_fragment_history_result, op);
+  }
   else
+  {
     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
                                          GNUNET_ntohll (req->message_limit),
                                          method_prefix,
@@ -2506,8 +2589,8 @@
                                          &store_recv_fragment_history,
                                          &store_recv_fragment_history_result,
                                          op);
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  }
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
@@ -2520,18 +2603,19 @@
 {
   struct Operation *op = cls;
   struct GNUNET_OperationResultMessage *res;
+  struct GNUNET_MQ_Envelope *env;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
-              op->chn, GNUNET_ntohll (op->op_id), name);
+              op->channel, GNUNET_ntohll (op->op_id), name);
 
   if (NULL != name) /* First part */
   {
     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
     struct GNUNET_PSYC_MessageModifier *mod;
-    res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + 
value_size);
-    res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + 
value_size);
-    res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+    env = GNUNET_MQ_msg_extra (res,
+                               sizeof (*mod) + name_size + value_size,
+                               GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
     res->op_id = op->op_id;
 
     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
@@ -2546,9 +2630,9 @@
   else /* Continuation */
   {
     struct GNUNET_MessageHeader *mod;
-    res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
-    res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
-    res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+    env = GNUNET_MQ_msg_extra (res,
+                               sizeof (*mod) + value_size,
+                               GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
     res->op_id = op->op_id;
 
     mod = (struct GNUNET_MessageHeader *) &res[1];
@@ -2558,10 +2642,7 @@
   }
 
   // FIXME: client might have been disconnected
-  GNUNET_SERVER_notification_context_add (nc, op->client);
-  GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
-                                              GNUNET_NO);
-  GNUNET_free (res);
+  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
   return GNUNET_YES;
 }
 
@@ -2578,7 +2659,7 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p state_get #%" PRIu64 ": "
               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
-              op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, 
err_msg);
+              op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, 
err_msg);
 
   // FIXME: client might have been disconnected
   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
@@ -2586,97 +2667,94 @@
 }
 
 
-/**
- * Client requests best matching state variable from PSYCstore.
- */
-static void
-client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
-                       const struct GNUNET_MessageHeader *msg)
+static int
+check_client_state_get (void *cls,
+                         const struct StateRequest *req)
 {
-  struct Channel *
-    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != chn);
+  struct Client *c = cls;
+  struct Channel *chn = c->channel;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
 
-  const struct StateRequest *
-    req = (const struct StateRequest *) msg;
-
   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
   const char *name = (const char *) &req[1];
   if (0 == name_size || '\0' != name[name_size - 1])
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
+    return GNUNET_SYSERR;
   }
 
+  return GNUNET_OK;
+}
+
+
+/**
+ * Client requests best matching state variable from PSYCstore.
+ */
+static void
+handle_client_state_get (void *cls,
+                         const struct StateRequest *req)
+{
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Channel *chn = c->channel;
+
+  const char *name = (const char *) &req[1];
   struct Operation *op = op_add (chn, client, req->op_id, 0);
   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
                               &store_recv_state_var,
                               &store_recv_state_result, op);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
-/**
- * Client requests state variables with a given prefix from PSYCstore.
- */
-static void
-client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
-                              const struct GNUNET_MessageHeader *msg)
+static int
+check_client_state_get_prefix (void *cls,
+                               const struct StateRequest *req)
 {
-  struct Channel *
-    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != chn);
+  struct Client *c = cls;
+  struct Channel *chn = c->channel;
+  if (NULL == chn)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
 
-  const struct StateRequest *
-    req = (const struct StateRequest *) msg;
-
   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
   const char *name = (const char *) &req[1];
   if (0 == name_size || '\0' != name[name_size - 1])
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
+    return GNUNET_SYSERR;
   }
 
+  return GNUNET_OK;
+}
+
+
+/**
+ * Client requests state variables with a given prefix from PSYCstore.
+ */
+static void
+handle_client_state_get_prefix (void *cls,
+                                const struct StateRequest *req)
+{
+  struct Client *c = cls;
+  struct GNUNET_SERVICE_Client *client = c->client;
+  struct Channel *chn = c->channel;
+
+  const char *name = (const char *) &req[1];
   struct Operation *op = op_add (chn, client, req->op_id, 0);
   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
                                      &store_recv_state_var,
                                      &store_recv_state_result, op);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
-static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-  { &client_recv_master_start, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
-
-  { &client_recv_slave_join, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
-
-  { &client_recv_join_decision, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
-  { &client_recv_psyc_message, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
-
-  { &client_recv_membership_store, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
-
-  { &client_recv_history_replay, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
-
-  { &client_recv_state_get, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
-
-  { &client_recv_state_get_prefix, NULL,
-    GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
-
-  { NULL, NULL, 0, 0 }
-};
-
-
 /**
  * Initialize the PSYC service.
  *
@@ -2685,10 +2763,12 @@
  * @param c Configuration to use.
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *server,
-     const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *c,
+     struct GNUNET_SERVICE_Handle *svc)
 {
   cfg = c;
+  service = svc;
   store = GNUNET_PSYCSTORE_connect (cfg);
   stats = GNUNET_STATISTICS_create ("psyc", cfg);
   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
@@ -2695,27 +2775,51 @@
   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
-  nc = GNUNET_SERVER_notification_context_create (server, 1);
-  GNUNET_SERVER_add_handlers (server, server_handlers);
-  GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
 }
 
 
 /**
- * The main function for the service.
- *
- * @param argc number of arguments from the command line
- * @param argv command line arguments
- * @return 0 ok, 1 on error
+ * Define "main" method using service macro.
  */
-int
-main (int argc, char *const *argv)
-{
-  return (GNUNET_OK ==
-          GNUNET_SERVICE_run (argc, argv, "psyc",
-                             GNUNET_SERVICE_OPTION_NONE,
-                              &run, NULL)) ? 0 : 1;
-}
+GNUNET_SERVICE_MAIN
+("psyc",
+ GNUNET_SERVICE_OPTION_NONE,
+ run,
+ client_notify_connect,
+ client_notify_disconnect,
+ NULL,
+ GNUNET_MQ_hd_fixed_size (client_master_start,
+                          GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
+                          struct MasterStartRequest,
+                          NULL),
+ GNUNET_MQ_hd_var_size (client_slave_join,
+                        GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
+                        struct SlaveJoinRequest,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_join_decision,
+                        GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
+                        struct GNUNET_PSYC_JoinDecisionMessage,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_psyc_message,
+                        GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
+                        struct GNUNET_MessageHeader,
+                        NULL),
+ GNUNET_MQ_hd_fixed_size (client_membership_store,
+                          GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
+                          struct ChannelMembershipStoreRequest,
+                          NULL),
+ GNUNET_MQ_hd_var_size (client_history_replay,
+                        GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
+                        struct GNUNET_PSYC_HistoryRequestMessage,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_state_get,
+                        GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
+                        struct StateRequest,
+                        NULL),
+ GNUNET_MQ_hd_var_size (client_state_get_prefix,
+                        GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
+                        struct StateRequest,
+                        NULL));
 
 /* end of gnunet-service-psyc.c */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]