gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r11387 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r11387 - gnunet/src/fs
Date: Sat, 15 May 2010 21:07:15 +0200

Author: grothoff
Date: 2010-05-15 21:07:15 +0200 (Sat, 15 May 2010)
New Revision: 11387

Modified:
   gnunet/src/fs/fs.h
   gnunet/src/fs/gnunet-service-fs.c
Log:
towards migration

Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h  2010-05-15 17:50:41 UTC (rev 11386)
+++ gnunet/src/fs/fs.h  2010-05-15 19:07:15 UTC (rev 11387)
@@ -37,6 +37,11 @@
 #define MAX_DATASTORE_QUEUE 16
 
 /**
+ * Maximum number of blocks we keep in memory for migration.
+ */
+#define MAX_MIGRATION_QUEUE 32
+
+/**
  * Size of the individual blocks used for file-sharing.
  */
 #define DBLOCK_SIZE (32*1024)

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2010-05-15 17:50:41 UTC (rev 11386)
+++ gnunet/src/fs/gnunet-service-fs.c   2010-05-15 19:07:15 UTC (rev 11387)
@@ -560,6 +560,44 @@
 
 
 /**
+ * Block that is ready for migration to other peers.  Actual data is at the 
end of the block.
+ */
+struct MigrationReadyBlock
+{
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct MigrationReadyBlock *next;
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct MigrationReadyBlock *prev;
+
+  /**
+   * Query for the block.
+   */
+  GNUNET_HashCode query;
+
+  /**
+   * When does this block expire? 
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Size of the block.
+   */
+  size_t size;
+
+  /**
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
+};
+
+
+/**
  * Our scheduler.
  */
 static struct GNUNET_SCHEDULER_Handle *sched;
@@ -611,6 +649,37 @@
 static struct GNUNET_CORE_Handle *core;
 
 /**
+ * Head of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_head;
+
+/**
+ * Tail of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_tail;
+
+/**
+ * Request to datastore for migration (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+
+/**
+ * ID of task that collects blocks for migration.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+
+/**
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
+ */
+static struct GNUNET_TIME_Relative min_migration_delay;
+
+/**
+ * Size of the doubly-linked list of migration blocks.
+ */
+static unsigned int mig_size;
+
+/**
  * Are we allowed to migrate content to this peer.
  */
 static int active_migration;
@@ -619,6 +688,141 @@
 
 
 /**
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
+static void
+delete_migration_block (struct MigrationReadyBlock *mb)
+{
+  GNUNET_CONTAINER_DLL_remove (mig_head,
+                              mig_tail,
+                              mb);
+  mig_size--;
+  GNUNET_free (mb);
+}
+
+
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls not used
+ * @param key ID of the peer (not used)
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)2
+ */
+static int
+consider_migration (void *cls,
+                   const GNUNET_HashCode *key,
+                   void *value)
+{
+  struct ConnectedPeer *cp = value;
+  
+  if (cp->cth != NULL)
+    return GNUNET_YES; /* or what? */
+  /* FIXME: not implemented! */
+  return GNUNET_YES;
+}
+
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Process content offered for migration.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void
+process_migration_content (void *cls,
+                          const GNUNET_HashCode * key,
+                          uint32_t size,
+                          const void *data,
+                          enum GNUNET_BLOCK_Type type,
+                          uint32_t priority,
+                          uint32_t anonymity,
+                          struct GNUNET_TIME_Absolute
+                          expiration, uint64_t uid)
+{
+  struct MigrationReadyBlock *mb;
+  struct GNUNET_TIME_Relative delay;
+  
+  if (key == NULL)
+    {
+      mig_qe = NULL;
+      if (mig_size < MAX_MIGRATION_QUEUE)  
+       {
+         delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                                mig_size);
+         delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
+                                              MAX_MIGRATION_QUEUE);
+         delay = GNUNET_TIME_relative_max (delay,
+                                           min_migration_delay);
+         mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                  delay,
+                                                  &gather_migration_blocks,
+                                                  NULL);
+       }
+      return;
+    }
+  mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+  mb->query = *key;
+  mb->expiration = expiration;
+  mb->size = size;
+  mb->type = type;
+  memcpy (&mb[1], data, size);
+  GNUNET_CONTAINER_DLL_insert_after (mig_head,
+                                    mig_tail,
+                                    mig_tail,
+                                    mb);
+  mig_size++;
+  if (mig_size == 1)
+    GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                          &consider_migration,
+                                          NULL);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  mig_task = GNUNET_SCHEDULER_NO_TASK;
+  mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                       &process_migration_content, NULL);
+}
+
+
+/**
  * We're done with a particular message list entry.
  * Free all associated resources.
  * 
@@ -782,9 +986,12 @@
                                                   &peer->hashPubKey,
                                                   cp,
                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  if (mig_size > 0)
+    (void) consider_migration (NULL, &peer->hashPubKey, cp);
 }
 
 
+
 /**
  * Free (each) request made by the peer.
  *
@@ -974,6 +1181,16 @@
 shutdown_task (void *cls,
               const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  if (mig_qe != NULL)
+    {
+      GNUNET_DATASTORE_cancel (mig_qe);
+      mig_qe = NULL;
+    }
+  if (GNUNET_SCHEDULER_NO_TASK != mig_task)
+    {
+      GNUNET_SCHEDULER_cancel (sched, mig_task);
+      mig_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   while (client_list != NULL)
     handle_client_disconnect (NULL,
                              client_list->client);
@@ -1001,6 +1218,9 @@
     }
   GNUNET_DATASTORE_disconnect (dsh,
                               GNUNET_NO);
+  while (mig_head != NULL)
+    delete_migration_block (mig_head);
+  GNUNET_assert (0 == mig_size);
   dsh = NULL;
   sched = NULL;
   cfg = NULL;  
@@ -1065,7 +1285,7 @@
     }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u bytes to peer %u.\n",
+             "Transmitting %u bytes to peer %u\n",
              msize,
              cp->pid);
 #endif
@@ -2930,6 +3150,7 @@
   sched = s;
   cfg = c;
   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
+  min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get 
size from config
   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: 
get size from config
   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get 
size from config
@@ -2964,6 +3185,13 @@
        }
       return GNUNET_SYSERR;
     }
+  /* FIXME: distinguish between sending and storing in options? */
+  if (active_migration) 
+    {
+      mig_task = GNUNET_SCHEDULER_add_now (sched,
+                                          &gather_migration_blocks,
+                                          NULL);
+    }
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,
                                   NULL);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]