gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r10146 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r10146 - gnunet/src/fs
Date: Tue, 26 Jan 2010 14:29:45 +0100

Author: grothoff
Date: 2010-01-26 14:29:45 +0100 (Tue, 26 Jan 2010)
New Revision: 10146

Modified:
   gnunet/src/fs/gnunet-service-fs.c
Log:
stuff

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2010-01-26 09:53:40 UTC (rev 10145)
+++ gnunet/src/fs/gnunet-service-fs.c   2010-01-26 13:29:45 UTC (rev 10146)
@@ -50,8 +50,6 @@
 
 #define DEBUG_FS GNUNET_NO
 
-
-
 /**
  * Signature of a function that is called whenever a datastore
  * request can be processed (or an entry put on the queue times out).
@@ -398,6 +396,13 @@
   struct GNUNET_CORE_InformationRequestContext *irc;
 
   /**
+   * Handle for an active request for transmission to this peer, or
+   * NULL.  Only used for replies that we are trying to send to a peer
+   * that we are not yet connected to.
+   */
+  struct GNUNET_CORE_TransmitHandle *cth;
+
+  /**
    * Replies that we have received but were unable to forward yet
    * (typically non-null only if we have a pending transmission
    * request with the client or the respective peer).
@@ -405,15 +410,6 @@
   struct PendingMessage *replies_pending;
 
   /**
-   * Pending transmission request with the core service for the target
-   * peer (for processing of 'replies_pending') or Handle for a
-   * pending query-request for P2P-transmission with the core service.
-   * If non-NULL, this request must be cancelled should this struct be
-   * destroyed!
-   */
-  struct GNUNET_CORE_TransmitHandle *cth;
-
-  /**
    * Pending transmission request for the target client (for processing of
    * 'replies_pending').
    */
@@ -558,8 +554,8 @@
 
 
 /**
- * Linked list of all clients that we are 
- * currently processing requests for.
+ * Linked list of all clients that we are currently processing
+ * requests for.
  */
 struct ClientList
 {
@@ -658,7 +654,7 @@
    * Handle for an active request for transmission to this
    * peer, or NULL.
    */
-  struct GNUNET_CORE_PeerRequestHandle *prh;
+  struct GNUNET_CORE_TransmitHandle *cth;
 
   /**
    * Messages (replies, queries, content migration) we would like to
@@ -779,9 +775,6 @@
 
 
 
-
-
-
 /**
  * Run the next DS request in our
  * queue, we're done with the current one.
@@ -950,8 +943,7 @@
 
 
 /**
- * Mingle hash with the mingle_number to
- * produce different bits.
+ * Mingle hash with the mingle_number to produce different bits.
  */
 static void
 mingle_hash (const GNUNET_HashCode * in,
@@ -1114,12 +1106,10 @@
 
 
 /**
- * Task that is run for each request with the
- * goal of forwarding the associated query to
- * other peers.  The task should re-schedule
- * itself to be re-run once the TTL has expired.
- * (or at a later time if more peers should
- * be queried earlier).
+ * Task that is run for each request with the goal of forwarding the
+ * associated query to other peers.  The task should re-schedule
+ * itself to be re-run once the TTL has expired.  (or at a later time
+ * if more peers should be queried earlier).
  *
  * @param cls the requests "struct PendingRequest*"
  * @param tc task context (unused)
@@ -1130,9 +1120,9 @@
 
 
 /**
- * We've selected a peer for forwarding of a query.
- * Construct the message and then re-schedule the
- * task to forward again to (other) peers.
+ * We've selected a peer for forwarding of a query.  Construct the
+ * message and then re-schedule the task to forward again to (other)
+ * peers.
  *
  * @param cls closure
  * @param size number of bytes available in buf
@@ -1144,29 +1134,102 @@
                     size_t size, 
                     void *buf)
 {
+  struct ConnectedPeer *cp = cls;
+  char *cbuf = buf;
+  struct GNUNET_PeerIdentity target;
+  struct PendingMessage *pr;
+  size_t tot;
+
+  cp->cth = NULL;
+  tot = 0;
+  while ( (NULL != (pr = cp->pending_messages)) &&
+         (pr->msize < size - tot) )
+    {
+      memcpy (&cbuf[tot],
+             &pr[1],
+             pr->msize);
+      tot += pr->msize;
+      cp->pending_messages = pr->next;
+      GNUNET_free (pr);
+    }
+  if (NULL != pr)
+    {
+      GNUNET_PEER_resolve (cp->pid,
+                          &target);    
+      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                                  pr->priority,
+                                                  GNUNET_TIME_UNIT_FOREVER_REL,
+                                                  &target,
+                                                  pr->msize,
+                                                  &transmit_request_cb,
+                                                  cp);
+    }
+  return tot;
+}
+
+
+/**
+ * Function called after we've tried to reserve a certain amount of
+ * bandwidth for a reply.  Check if we succeeded and if so send our
+ * query.
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param peer identifies the peer
+ * @param bpm_in set to the current bandwidth limit (receiving) for this peer
+ * @param bpm_out set to the current bandwidth limit (sending) for this peer
+ * @param amount set to the amount that was actually reserved or unreserved
+ * @param preference current traffic preference for the given peer
+ */
+static void
+target_reservation_cb (void *cls,
+                      const struct
+                      GNUNET_PeerIdentity * peer,
+                      unsigned int bpm_in,
+                      unsigned int bpm_out,
+                      int amount,
+                      uint64_t preference)
+{
   struct PendingRequest *pr = cls;
+  struct ConnectedPeer *cp;
+  struct PendingMessage *pm;
+  struct PendingMessage *pos;
+  struct PendingMessage *prev;
   struct GetMessage *gm;
   GNUNET_HashCode *ext;
   char *bfdata;
   size_t msize;
   unsigned int k;
 
-  pr->cth = NULL;
-  /* (1) check for timeout */
-  if (NULL == buf)
+  pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                          get_processing_delay (), // FIXME: 
longer?
+                                          &forward_request_task,
+                                          pr);
+  pr->irc = NULL;
+  GNUNET_assert (peer != NULL);
+  if (amount != DBLOCK_SIZE) 
     {
-      /* timeout, try another peer immediately again */
-      pr->task = GNUNET_SCHEDULER_add_with_priority (sched,
-                                                    
GNUNET_SCHEDULER_PRIORITY_IDLE,
-                                                    &forward_request_task,
-                                                    pr);
-      return 0;
+      /* FIXME: call stats... */
+      return; /* this target round failed */    
     }
-  /* (2) build query message */
+  // (2) transmit, update ttl/priority
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  if (cp == NULL)
+    {
+      /* Peer must have just left; try again immediately */
+      pr->task = GNUNET_SCHEDULER_add_now (sched,
+                                          &forward_request_task,
+                                          pr);
+      return;
+    }
+  /* build message and insert message into priority queue */
   k = 0; // FIXME: count hash codes!
   msize = sizeof (struct GetMessage) + pr->bf_size + k * 
sizeof(GNUNET_HashCode);
   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  gm = (struct GetMessage*) buf;
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+  pm->msize = msize;
+  pm->priority = 0; // FIXME: calculate priority properly!
+  gm = (struct GetMessage*) &pm[1];
   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
   gm->header.size = htons (msize);
   gm->type = htonl (pr->type);
@@ -1177,91 +1240,52 @@
   gm->hash_bitmap = htonl (42);
   gm->query = pr->query;
   ext = (GNUNET_HashCode*) &gm[1];
+
   // FIXME: setup "ext[0]..[k-1]"
   bfdata = (char *) &ext[k];
   if (pr->bf != NULL)
     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
                                               bfdata,
                                               pr->bf_size);
-  
-  /* (3) schedule job to do it again (or another peer, etc.) */
-  pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                          get_processing_delay (), // FIXME!
-                                          &forward_request_task,
-                                          pr);
 
-  return msize;
-}
 
-
-/**
- * Function called after we've tried to reserve
- * a certain amount of bandwidth for a reply.
- * Check if we succeeded and if so send our query.
- *
- * @param cls the requests "struct PendingRequest*"
- * @param peer identifies the peer
- * @param bpm_in set to the current bandwidth limit (receiving) for this peer
- * @param bpm_out set to the current bandwidth limit (sending) for this peer
- * @param amount set to the amount that was actually reserved or unreserved
- * @param preference current traffic preference for the given peer
- */
-static void
-target_reservation_cb (void *cls,
-                      const struct
-                      GNUNET_PeerIdentity * peer,
-                      unsigned int bpm_in,
-                      unsigned int bpm_out,
-                      int amount,
-                      uint64_t preference)
-{
-  struct PendingRequest *pr = cls;
-  uint32_t priority;
-  uint16_t size;
-  struct GNUNET_TIME_Relative maxdelay;
-
-  pr->irc = NULL;
-  GNUNET_assert (peer != NULL);
-  if ( (amount != DBLOCK_SIZE) ||
-       (pr->cth != NULL) )
+  prev = NULL;
+  pos = cp->pending_messages;
+  while ( (pos != NULL) &&
+         (pm->priority < pos->priority) )
     {
-      /* try again later; FIXME: we may need to un-reserve "amount"? */
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              get_processing_delay (), // 
FIXME: longer?
-                                              &forward_request_task,
-                                              pr);
-      return;
+      prev = pos;
+      pos = pos->next;
     }
-  // (2) transmit, update ttl/priority
-  // FIXME: calculate priority, maxdelay, size properly!
-  priority = 0;
-  size = 60000;
-  maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
-  pr->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                              priority,
-                                              maxdelay,
-                                              peer,
-                                              size,
-                                              &transmit_request_cb,
-                                              pr);
-  if (pr->cth == NULL)
+  if (prev == NULL)
+    cp->pending_messages = pm;
+  else
+    prev->next = pm;
+  pm->next = pos;
+  if (cp->cth == NULL)
+    cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                                cp->pending_messages->priority,
+                                                GNUNET_TIME_UNIT_FOREVER_REL,
+                                                peer,
+                                                msize,
+                                                &transmit_request_cb,
+                                                cp);
+  if (cp->cth == NULL)
     {
-      /* try again later */
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              get_processing_delay (), // 
FIXME: longer?
-                                              &forward_request_task,
-                                              pr);
+      /* technically, this should not be a 'break'; but
+        we don't handle this (rare) case yet, so let's warn
+        about it... */
+      GNUNET_break (0);
+      // FIXME: now what?
     }
 }
 
 
 /**
- * Task that is run for each request with the
- * goal of forwarding the associated query to
- * other peers.  The task should re-schedule
- * itself to be re-run once the TTL has expired.
- * (or at a later time if more peers should
- * be queried earlier).
+ * Task that is run for each request with the goal of forwarding the
+ * associated query to other peers.  The task should re-schedule
+ * itself to be re-run once the TTL has expired.  (or at a later time
+ * if more peers should be queried earlier).
  *
  * @param cls the requests "struct PendingRequest*"
  * @param tc task context (unused)
@@ -1274,15 +1298,6 @@
   struct PeerSelectionContext psc;
 
   pr->task = GNUNET_SCHEDULER_NO_TASK;
-  if (pr->cth != NULL) 
-    {
-      /* we're busy transmitting a result, wait a bit */
-      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
-                                              get_processing_delay (), 
-                                              &forward_request_task,
-                                              pr);
-      return;
-    }
   /* (1) select target */
   psc.pr = pr;
   psc.target_score = DBL_MIN;
@@ -1301,13 +1316,13 @@
   /* (2) reserve reply bandwidth */
   GNUNET_assert (NULL == pr->irc);
   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
-                                      &psc.target,
-                                      GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                      -1,
-                                      DBLOCK_SIZE, // FIXME: make dependent on 
type?
-                                      0,
-                                      &target_reservation_cb,
-                                      pr);
+                                               &psc.target,
+                                               
GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
+                                               -1,
+                                               DBLOCK_SIZE, // FIXME: make 
dependent on type?
+                                               0,
+                                               &target_reservation_cb,
+                                               pr);
 }
 
 
@@ -1668,8 +1683,6 @@
     }
   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
     GNUNET_SCHEDULER_cancel (sched, pr->task);
-  if (NULL != pr->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
   if (NULL != pr->bf)
     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
   if (NULL != pr->th)
@@ -1679,6 +1692,8 @@
       pr->replies_pending = reply->next;
       GNUNET_free (reply);
     }
+  if (NULL != pr->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
   GNUNET_PEER_change_rc (pr->source_pid, -1);
   GNUNET_PEER_change_rc (pr->target_pid, -1);
   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
@@ -1862,12 +1877,23 @@
                         GNUNET_PeerIdentity * peer)
 {
   struct ConnectedPeer *cp;
+  struct PendingMessage *pm;
 
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                          &peer->hashPubKey);
-  GNUNET_PEER_change_rc (cp->pid, -1);
-  GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
-  GNUNET_free (cp);
+  if (cp != NULL)
+    {
+      GNUNET_PEER_change_rc (cp->pid, -1);
+      GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+      if (NULL != cp->cth)
+       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+      while (NULL != (pm = cp->pending_messages))
+       {
+         cp->pending_messages = pm->next;
+         GNUNET_free (pm);
+       }
+      GNUNET_free (cp);
+    }
   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
                                              &peer->hashPubKey,
                                              &destroy_request,
@@ -1876,9 +1902,8 @@
 
 
 /**
- * We're processing a GET request from
- * another peer and have decided to forward
- * it to other peers.
+ * We're processing a GET request from another peer and have decided
+ * to forward it to other peers.
  *
  * @param cls our "struct ProcessGetContext *"
  * @param tc unused
@@ -2407,7 +2432,7 @@
 
 
 /**
- * Iterator over pending requests.
+ * We have received a reply; handle it!
  *
  * @param cls response (struct ProcessReplyClosure)
  * @param key our query
@@ -2425,6 +2450,7 @@
   struct PendingMessage *reply;
   struct PutMessage *pm;
   struct ContentMessage *cm;
+  struct ConnectedPeer *cp;
   GNUNET_HashCode chash;
   GNUNET_HashCode mhash;
   struct GNUNET_PeerIdentity target;
@@ -2472,19 +2498,20 @@
     }
   if (pr->client == NULL)
     {
+      GNUNET_PEER_resolve (pr->source_pid,
+                          &target);
+      cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                             &target.hashPubKey);
       msize = sizeof (struct ContentMessage) + prq->size;
       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
       reply->msize = msize;
+      reply->priority = (uint32_t) -1; /* send replies first! */
       cm = (struct ContentMessage*) &reply[1];
       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
       cm->header.size = htons (msize);
       cm->type = htonl (prq->type);
       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
-      reply->next = pr->replies_pending;
-      pr->replies_pending = reply;
       memcpy (&reply[1], prq->data, prq->size);
-      if (pr->cth != NULL)
-       return GNUNET_YES;
       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= 
max_pending_requests)
        {
@@ -2494,19 +2521,38 @@
          max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
                                                           eer->start_time);
        }
-      GNUNET_PEER_resolve (pr->source_pid,
-                          &target);
-      pr->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  prio,
-                                                  max_delay,
-                                                  &target,
-                                                  msize,
-                                                  &transmit_result,
-                                                  pr);
-      if (NULL == pr->cth)
+
+      if (cp == NULL)
        {
-         // FIXME: now what? discard?
+         /* FIXME: bound queue size! */
+         reply->next = pr->replies_pending;
+         pr->replies_pending = reply;
+         if (pr->cth == NULL)
+           {
+             /* implicitly tries to connect */
+             pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                                          prio,
+                                                          max_delay,
+                                                          &target,
+                                                          msize,
+                                                          &transmit_result,
+                                                          pr);
+           }
        }
+      else
+       {
+         /* insert replies always at the head */
+         reply->next = cp->pending_messages;
+         cp->pending_messages = reply;
+         if (cp->cth == NULL)
+           cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                                        reply->priority,
+                                                        
GNUNET_TIME_UNIT_FOREVER_REL,
+                                                        &target,
+                                                        msize,
+                                                        &transmit_request_cb,
+                                                        cp);
+       }
     }
   else
     {
@@ -2746,7 +2792,6 @@
   sched = s;
   cfg = c;
 
-
   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: 
get size from config
   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get 
size from config
   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]