gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: -DHT: deduplicate monitor matching logic


From: gnunet
Subject: [gnunet] branch master updated: -DHT: deduplicate monitor matching logic
Date: Sun, 02 Jan 2022 20:46:09 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new c891e4d29 -DHT: deduplicate monitor matching logic
c891e4d29 is described below

commit c891e4d29ca772b6b246b928a1bda8d8c9ef499f
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sun Jan 2 20:46:06 2022 +0100

    -DHT: deduplicate monitor matching logic
---
 src/dht/gnunet-service-dht.h         |   2 +-
 src/dht/gnunet-service-dht_clients.c | 539 +++++++++++++++++++++--------------
 2 files changed, 323 insertions(+), 218 deletions(-)

diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h
index d520cc905..e9b1ff63a 100644
--- a/src/dht/gnunet-service-dht.h
+++ b/src/dht/gnunet-service-dht.h
@@ -89,7 +89,7 @@ GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd,
  * @param key Key of the requested data.
  */
 void
-GDS_CLIENTS_process_get (uint32_t options,
+GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
                          enum GNUNET_BLOCK_Type type,
                          uint32_t hop_count,
                          uint32_t desired_replication_level,
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index b520cda41..8acde2fe7 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -855,78 +855,6 @@ handle_dht_local_get_stop (
 }
 
 
-/**
- * Handler for monitor start messages
- *
- * @param cls the client we received this message from
- * @param msg the actual message received
- *
- */
-static void
-handle_dht_local_monitor (void *cls,
-                          const struct GNUNET_DHT_MonitorStartStopMessage *msg)
-{
-  struct ClientHandle *ch = cls;
-  struct ClientMonitorRecord *r;
-
-  r = GNUNET_new (struct ClientMonitorRecord);
-  r->ch = ch;
-  r->type = ntohl (msg->type);
-  r->get = ntohs (msg->get);
-  r->get_resp = ntohs (msg->get_resp);
-  r->put = ntohs (msg->put);
-  if (0 != ntohs (msg->filter_key))
-    r->key = msg->key;
-  GNUNET_CONTAINER_DLL_insert (monitor_head,
-                               monitor_tail,
-                               r);
-  GNUNET_SERVICE_client_continue (ch->client);
-}
-
-
-/**
- * Handler for monitor stop messages
- *
- * @param cls the client we received this message from
- * @param msg the actual message received
- */
-static void
-handle_dht_local_monitor_stop (
-  void *cls,
-  const struct GNUNET_DHT_MonitorStartStopMessage *msg)
-{
-  struct ClientHandle *ch = cls;
-
-  GNUNET_SERVICE_client_continue (ch->client);
-  for (struct ClientMonitorRecord *r = monitor_head;
-       NULL != r;
-       r = r->next)
-  {
-    bool keys_match;
-
-    keys_match =
-      (GNUNET_is_zero (&r->key))
-      ? (0 == ntohs (msg->filter_key))
-      : ( (0 != ntohs (msg->filter_key)) &&
-          (! GNUNET_memcmp (&r->key,
-                            &msg->key)) );
-    if ( (ch == r->ch) &&
-         (ntohl (msg->type) == r->type) &&
-         (r->get == msg->get) &&
-         (r->get_resp == msg->get_resp) &&
-         (r->put == msg->put) &&
-         keys_match)
-    {
-      GNUNET_CONTAINER_DLL_remove (monitor_head,
-                                   monitor_tail,
-                                   r);
-      GNUNET_free (r);
-      return;     /* Delete only ONE entry */
-    }
-  }
-}
-
-
 /**
  * Closure for #forward_reply()
  */
@@ -1132,26 +1060,106 @@ GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd,
 }
 
 
+/* ************* logic for monitors ************** */
+
+
 /**
- * Check if some client is monitoring GET messages and notify
- * them in that case.  If tracked, @a path should include the local peer.
+ * Handler for monitor start messages
+ *
+ * @param cls the client we received this message from
+ * @param msg the actual message received
  *
- * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
- * @param type The type of data in the request.
- * @param hop_count Hop count so far.
- * @param path_length number of entries in path (or 0 if not recorded).
- * @param path peers on the GET path (or NULL if not recorded).
- * @param desired_replication_level Desired replication level.
- * @param key Key of the requested data.
  */
-void
-GDS_CLIENTS_process_get (uint32_t options,
-                         enum GNUNET_BLOCK_Type type,
-                         uint32_t hop_count,
-                         uint32_t desired_replication_level,
-                         unsigned int path_length,
-                         const struct GNUNET_PeerIdentity *path,
-                         const struct GNUNET_HashCode *key)
+static void
+handle_dht_local_monitor (void *cls,
+                          const struct GNUNET_DHT_MonitorStartStopMessage *msg)
+{
+  struct ClientHandle *ch = cls;
+  struct ClientMonitorRecord *r;
+
+  r = GNUNET_new (struct ClientMonitorRecord);
+  r->ch = ch;
+  r->type = ntohl (msg->type);
+  r->get = ntohs (msg->get);
+  r->get_resp = ntohs (msg->get_resp);
+  r->put = ntohs (msg->put);
+  if (0 != ntohs (msg->filter_key))
+    r->key = msg->key;
+  GNUNET_CONTAINER_DLL_insert (monitor_head,
+                               monitor_tail,
+                               r);
+  GNUNET_SERVICE_client_continue (ch->client);
+}
+
+
+/**
+ * Handler for monitor stop messages
+ *
+ * @param cls the client we received this message from
+ * @param msg the actual message received
+ */
+static void
+handle_dht_local_monitor_stop (
+  void *cls,
+  const struct GNUNET_DHT_MonitorStartStopMessage *msg)
+{
+  struct ClientHandle *ch = cls;
+
+  GNUNET_SERVICE_client_continue (ch->client);
+  for (struct ClientMonitorRecord *r = monitor_head;
+       NULL != r;
+       r = r->next)
+  {
+    bool keys_match;
+
+    keys_match =
+      (GNUNET_is_zero (&r->key))
+      ? (0 == ntohs (msg->filter_key))
+      : ( (0 != ntohs (msg->filter_key)) &&
+          (! GNUNET_memcmp (&r->key,
+                            &msg->key)) );
+    if ( (ch == r->ch) &&
+         (ntohl (msg->type) == r->type) &&
+         (r->get == msg->get) &&
+         (r->get_resp == msg->get_resp) &&
+         (r->put == msg->put) &&
+         keys_match)
+    {
+      GNUNET_CONTAINER_DLL_remove (monitor_head,
+                                   monitor_tail,
+                                   r);
+      GNUNET_free (r);
+      return;     /* Delete only ONE entry */
+    }
+  }
+}
+
+
+/**
+ * Function to call by #for_matching_monitors().
+ *
+ * @param cls closure
+ * @param m a matching monitor
+ */
+typedef void
+(*MonitorAction)(void *cls,
+                 struct ClientMonitorRecord *m);
+
+
+/**
+ * Call @a cb on all monitors that watch for blocks of @a type
+ * and key @a key.
+ *
+ * @param type the type to match
+ * @param key the key to match
+ * @param cb function to call
+ * @param cb_cls closure for @a cb
+ */
+static void
+for_matching_monitors (enum GNUNET_BLOCK_Type type,
+                       const struct GNUNET_HashCode *key,
+                       MonitorAction cb,
+                       void *cb_cls)
 {
   struct ClientHandle **cl = NULL;
   unsigned int cl_size = 0;
@@ -1161,16 +1169,12 @@ GDS_CLIENTS_process_get (uint32_t options,
        m = m->next)
   {
     if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
-           (m->type == type)) &&
+           (m->type == type) ) &&
          ( (GNUNET_is_zero (&m->key)) ||
            (0 ==
             GNUNET_memcmp (key,
-                           &m->key))))
+                           &m->key)) ) )
     {
-      struct GNUNET_MQ_Envelope *env;
-      struct GNUNET_DHT_MonitorGetMessage *mmsg;
-      struct GNUNET_PeerIdentity *msg_path;
-      size_t msize;
       unsigned int i;
 
       /* Don't send duplicates */
@@ -1182,87 +1186,230 @@ GDS_CLIENTS_process_get (uint32_t options,
       GNUNET_array_append (cl,
                            cl_size,
                            m->ch);
-      msize = path_length * sizeof(struct GNUNET_PeerIdentity);
-      env = GNUNET_MQ_msg_extra (mmsg,
-                                 msize,
-                                 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
-      mmsg->options = htonl (options);
-      mmsg->type = htonl (type);
-      mmsg->hop_count = htonl (hop_count);
-      mmsg->desired_replication_level = htonl (desired_replication_level);
-      mmsg->get_path_length = htonl (path_length);
-      mmsg->key = *key;
-      msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
-      GNUNET_memcpy (msg_path,
-                     path,
-                     path_length * sizeof(struct GNUNET_PeerIdentity));
-      GNUNET_MQ_send (m->ch->mq,
-                      env);
+      cb (cb_cls,
+          m);
     }
   }
   GNUNET_free (cl);
 }
 
 
+/**
+ * Closure for #get_action();
+ */
+struct GetActionContext
+{
+  enum GNUNET_DHT_RouteOption options;
+  enum GNUNET_BLOCK_Type type;
+  uint32_t hop_count;
+  uint32_t desired_replication_level;
+  unsigned int get_path_length;
+  const struct GNUNET_PeerIdentity *get_path;
+  const struct GNUNET_HashCode *key;
+};
+
+
+/**
+ * Function called on monitors that match a GET.
+ * Sends the GET notification to the monitor.
+ *
+ * @param cls a `struct GetActionContext`
+ * @param m a matching monitor
+ */
+static void
+get_action (void *cls,
+            struct ClientMonitorRecord *m)
+{
+  struct GetActionContext *gac = cls;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_DHT_MonitorGetMessage *mmsg;
+  struct GNUNET_PeerIdentity *msg_path;
+  size_t msize;
+
+  msize = gac->get_path_length * sizeof(struct GNUNET_PeerIdentity);
+  env = GNUNET_MQ_msg_extra (mmsg,
+                             msize,
+                             GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
+  mmsg->options = htonl (gac->options);
+  mmsg->type = htonl (gac->type);
+  mmsg->hop_count = htonl (gac->hop_count);
+  mmsg->desired_replication_level = htonl (gac->desired_replication_level);
+  mmsg->get_path_length = htonl (gac->get_path_length);
+  mmsg->key = *gac->key;
+  msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+  GNUNET_memcpy (msg_path,
+                 gac->get_path,
+                 gac->get_path_length * sizeof(struct GNUNET_PeerIdentity));
+  GNUNET_MQ_send (m->ch->mq,
+                  env);
+}
+
+
+/**
+ * Check if some client is monitoring GET messages and notify
+ * them in that case.  If tracked, @a path should include the local peer.
+ *
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param key Key of the requested data.
+ */
+void
+GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
+                         enum GNUNET_BLOCK_Type type,
+                         uint32_t hop_count,
+                         uint32_t desired_replication_level,
+                         unsigned int path_length,
+                         const struct GNUNET_PeerIdentity *path,
+                         const struct GNUNET_HashCode *key)
+{
+  struct GetActionContext gac = {
+    .options = options,
+    .type = type,
+    .hop_count = hop_count,
+    .desired_replication_level = desired_replication_level,
+    .get_path_length = path_length,
+    .get_path = path,
+    .key = key
+  };
+
+  for_matching_monitors (type,
+                         key,
+                         &get_action,
+                         &gac);
+}
+
+
+/**
+ * Closure for response_action().
+ */
+struct ResponseActionContext
+{
+  const struct GDS_DATACACHE_BlockData *bd;
+  const struct GNUNET_PeerIdentity *get_path;
+  unsigned int get_path_length;
+};
+
+
+/**
+ * Function called on monitors that match a response.
+ * Sends the response notification to the monitor.
+ *
+ * @param cls a `struct ResponseActionContext`
+ * @param m a matching monitor
+ */
+static void
+response_action (void *cls,
+                 struct ClientMonitorRecord *m)
+{
+  const struct ResponseActionContext *resp_ctx = cls;
+  const struct GDS_DATACACHE_BlockData *bd = resp_ctx->bd;
+
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
+  struct GNUNET_PeerIdentity *path;
+  size_t msize;
+
+  msize = bd->data_size;
+  msize += (resp_ctx->get_path_length + bd->put_path_length)
+           * sizeof(struct GNUNET_PeerIdentity);
+  env = GNUNET_MQ_msg_extra (mmsg,
+                             msize,
+                             GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
+  mmsg->type = htonl (bd->type);
+  mmsg->put_path_length = htonl (bd->put_path_length);
+  mmsg->get_path_length = htonl (resp_ctx->get_path_length);
+  mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
+  mmsg->key = bd->key;
+  path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+  GNUNET_memcpy (path,
+                 bd->put_path,
+                 bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
+  GNUNET_memcpy (path,
+                 resp_ctx->get_path,
+                 resp_ctx->get_path_length * sizeof(struct
+                                                    GNUNET_PeerIdentity));
+  GNUNET_memcpy (&path[resp_ctx->get_path_length],
+                 bd->data,
+                 bd->data_size);
+  GNUNET_MQ_send (m->ch->mq,
+                  env);
+}
+
+
 void
 GDS_CLIENTS_process_get_resp (const struct GDS_DATACACHE_BlockData *bd,
                               const struct GNUNET_PeerIdentity *get_path,
                               unsigned int get_path_length)
 {
-  struct ClientHandle **cl = NULL;
-  unsigned int cl_size = 0;
+  struct ResponseActionContext rac = {
+    .bd = bd,
+    .get_path = get_path,
+    .get_path_length = get_path_length
+  };
 
-  for (struct ClientMonitorRecord *m = monitor_head;
-       NULL != m;
-       m = m->next)
-  {
-    if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
-           (m->type == bd->type) ) &&
-         ( (GNUNET_is_zero (&m->key)) ||
-           (0 == GNUNET_memcmp (&bd->key,
-                                &m->key)) ) )
-    {
-      struct GNUNET_MQ_Envelope *env;
-      struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
-      struct GNUNET_PeerIdentity *path;
-      size_t msize;
-      unsigned int i;
+  for_matching_monitors (bd->type,
+                         &bd->key,
+                         &response_action,
+                         &rac);
+}
 
-      /* Don't send duplicates */
-      for (i = 0; i < cl_size; i++)
-        if (cl[i] == m->ch)
-          break;
-      if (i < cl_size)
-        continue;
-      GNUNET_array_append (cl,
-                           cl_size,
-                           m->ch);
-      msize = bd->data_size;
-      msize += (get_path_length + bd->put_path_length)
-               * sizeof(struct GNUNET_PeerIdentity);
-      env = GNUNET_MQ_msg_extra (mmsg,
-                                 msize,
-                                 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
-      mmsg->type = htonl (bd->type);
-      mmsg->put_path_length = htonl (bd->put_path_length);
-      mmsg->get_path_length = htonl (get_path_length);
-      mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
-      mmsg->key = bd->key;
-      path = (struct GNUNET_PeerIdentity *) &mmsg[1];
-      GNUNET_memcpy (path,
-                     bd->put_path,
-                     bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
-      GNUNET_memcpy (path,
-                     get_path,
-                     get_path_length * sizeof(struct GNUNET_PeerIdentity));
-      GNUNET_memcpy (&path[get_path_length],
-                     bd->data,
-                     bd->data_size);
-      GNUNET_MQ_send (m->ch->mq,
-                      env);
-    }
-  }
-  GNUNET_free (cl);
+
+/**
+ * Closure for put_action().
+ */
+struct PutActionContext
+{
+  const struct GDS_DATACACHE_BlockData *bd;
+  enum GNUNET_DHT_RouteOption options;
+  uint32_t hop_count;
+  uint32_t desired_replication_level;
+};
+
+
+/**
+ * Function called on monitors that match a PUT.
+ * Sends the PUT notification to the monitor.
+ *
+ * @param cls a `struct PutActionContext`
+ * @param m a matching monitor
+ */
+static void
+put_action (void *cls,
+            struct ClientMonitorRecord *m)
+{
+  const struct PutActionContext *put_ctx = cls;
+  const struct GDS_DATACACHE_BlockData *bd = put_ctx->bd;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_DHT_MonitorPutMessage *mmsg;
+  struct GNUNET_PeerIdentity *msg_path;
+  size_t msize;
+
+  msize = bd->data_size
+          + bd->put_path_length
+          * sizeof(struct GNUNET_PeerIdentity);
+  env = GNUNET_MQ_msg_extra (mmsg,
+                             msize,
+                             GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
+  mmsg->options = htonl (put_ctx->options);
+  mmsg->type = htonl (bd->type);
+  mmsg->hop_count = htonl (put_ctx->hop_count);
+  mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level);
+  mmsg->put_path_length = htonl (bd->put_path_length);
+  mmsg->key = bd->key;
+  mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
+  msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+  GNUNET_memcpy (msg_path,
+                 bd->put_path,
+                 bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
+  GNUNET_memcpy (&msg_path[bd->put_path_length],
+                 bd->data,
+                 bd->data_size);
+  GNUNET_MQ_send (m->ch->mq,
+                  env);
 }
 
 
@@ -1272,59 +1419,17 @@ GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options,
                          uint32_t hop_count,
                          uint32_t desired_replication_level)
 {
-  struct ClientHandle **cl = NULL;
-  unsigned int cl_size = 0;
-
-  for (struct ClientMonitorRecord *m = monitor_head;
-       NULL != m;
-       m = m->next)
-  {
-    if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
-           (m->type == bd->type) ) &&
-         ( (GNUNET_is_zero (&m->key)) ||
-           (0 ==
-            GNUNET_memcmp (&bd->key,
-                           &m->key)) ) )
-    {
-      struct GNUNET_MQ_Envelope *env;
-      struct GNUNET_DHT_MonitorPutMessage *mmsg;
-      struct GNUNET_PeerIdentity *msg_path;
-      size_t msize;
-      unsigned int i;
+  struct PutActionContext put_ctx = {
+    .bd = bd,
+    .hop_count = hop_count,
+    .desired_replication_level = desired_replication_level,
+    .options = options
+  };
 
-      /* Don't send duplicates */
-      for (i = 0; i < cl_size; i++)
-        if (cl[i] == m->ch)
-          break;
-      if (i < cl_size)
-        continue;
-      GNUNET_array_append (cl,
-                           cl_size,
-                           m->ch);
-      msize = bd->data_size;
-      msize += bd->put_path_length * sizeof(struct GNUNET_PeerIdentity);
-      env = GNUNET_MQ_msg_extra (mmsg,
-                                 msize,
-                                 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
-      mmsg->options = htonl (options);
-      mmsg->type = htonl (bd->type);
-      mmsg->hop_count = htonl (hop_count);
-      mmsg->desired_replication_level = htonl (desired_replication_level);
-      mmsg->put_path_length = htonl (bd->put_path_length);
-      mmsg->key = bd->key;
-      mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
-      msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
-      GNUNET_memcpy (msg_path,
-                     bd->put_path,
-                     bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
-      GNUNET_memcpy (&msg_path[bd->put_path_length],
-                     bd->data,
-                     bd->data_size);
-      GNUNET_MQ_send (m->ch->mq,
-                      env);
-    }
-  }
-  GNUNET_free (cl);
+  for_matching_monitors (bd->type,
+                         &bd->key,
+                         &put_action,
+                         &put_ctx);
 }
 
 
@@ -1334,7 +1439,7 @@ GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options,
  * @param server the initialized server
  */
 static void
-GDS_CLIENTS_init ()
+GDS_CLIENTS_init (void)
 {
   forward_map
     = GNUNET_CONTAINER_multihashmap_create (1024,
@@ -1348,7 +1453,7 @@ GDS_CLIENTS_init ()
  * Shutdown client subsystem.
  */
 static void
-GDS_CLIENTS_stop ()
+GDS_CLIENTS_stop (void)
 {
   if (NULL != retry_task)
   {

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]