gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r6605 - in GNUnet/src: applications/fs/gap include server u


From: gnunet
Subject: [GNUnet-SVN] r6605 - in GNUnet/src: applications/fs/gap include server util/network
Date: Sun, 23 Mar 2008 01:22:19 -0600 (MDT)

Author: grothoff
Date: 2008-03-23 01:22:19 -0600 (Sun, 23 Mar 2008)
New Revision: 6605

Modified:
   GNUnet/src/applications/fs/gap/fs.c
   GNUnet/src/applications/fs/gap/querymanager.c
   GNUnet/src/include/gnunet_core.h
   GNUnet/src/server/core.c
   GNUnet/src/server/tcpserver.c
   GNUnet/src/server/tcpserver.h
   GNUnet/src/util/network/select.c
Log:
fixing gnunet-download hang-bug (full TCP queues)

Modified: GNUnet/src/applications/fs/gap/fs.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs.c 2008-03-23 02:57:58 UTC (rev 6604)
+++ GNUnet/src/applications/fs/gap/fs.c 2008-03-23 07:22:19 UTC (rev 6605)
@@ -445,9 +445,7 @@
   type = ntohl (dblock->type);
   GNUNET_free_non_null (enc);
   ret = coreAPI->cs_send_to_client (sock, &msg->header,
-                                    type !=
-                                    GNUNET_ECRS_BLOCKTYPE_DATA ? GNUNET_NO :
-                                    GNUNET_YES);
+                                    GNUNET_NO);
   GNUNET_free (msg);
   if (ret == GNUNET_NO)
     cls->have_more = GNUNET_YES; /* switch to async processing */
@@ -500,17 +498,26 @@
   fpp.seen = NULL;
   fpp.have_more = GNUNET_NO;
   fpp.processed = 0;
-  if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
+  if (GNUNET_OK == 
+      coreAPI->cs_test_send_to_client_now(sock,
+                                         GNUNET_GAP_ESTIMATED_DATA_SIZE,
+                                         GNUNET_NO)) 
     {
-      if ((1 == datastore->get (&rs->query[0],
-                                type, &fast_path_processor, &fpp)) ||
-          (1 == datastore->get (&rs->query[0],
-                                GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
-                                &fast_path_processor, &fpp)))
-        goto CLEANUP;
-    }
+      if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
+       {
+         if ( ((1 == datastore->get (&rs->query[0],
+                                     type, &fast_path_processor, &fpp)) ||
+               (1 == datastore->get (&rs->query[0],
+                                     GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
+                                     &fast_path_processor, &fpp))) &&
+              (fpp.have_more == GNUNET_NO) )
+           goto CLEANUP;
+       }
+      else
+       datastore->get (&rs->query[0], type, &fast_path_processor, &fpp);
+    } 
   else
-    datastore->get (&rs->query[0], type, &fast_path_processor, &fpp);
+    fpp.have_more = GNUNET_YES;
   anonymityLevel = ntohl (rs->anonymityLevel);
   keyCount =
     1 + (ntohs (req->size) -

Modified: GNUnet/src/applications/fs/gap/querymanager.c
===================================================================
--- GNUnet/src/applications/fs/gap/querymanager.c       2008-03-23 02:57:58 UTC 
(rev 6604)
+++ GNUnet/src/applications/fs/gap/querymanager.c       2008-03-23 07:22:19 UTC 
(rev 6605)
@@ -39,7 +39,7 @@
 #include "pid_table.h"
 #include "shared.h"
 
-#define CHECK_REPEAT_FREQUENCY (2 * GNUNET_CRON_SECONDS)
+#define CHECK_REPEAT_FREQUENCY (150 * GNUNET_CRON_MILLISECONDS)
 
 /**
  * Linked list with information for each client.
@@ -62,6 +62,11 @@
    */
   struct RequestList *requests;
 
+  /**
+   * Tail of the requests list.
+   */
+  struct RequestList *request_tail;
+
 };
 
 /**
@@ -70,6 +75,8 @@
  */
 static struct ClientDataList *clients;
 
+static struct ClientDataList *clients_tail;
+
 static GNUNET_CoreAPIForPlugins *coreAPI;
 
 static GNUNET_Stats_ServiceAPI *stats;
@@ -193,9 +200,13 @@
       cl->client = client;
       cl->next = clients;
       clients = cl;
+      if (clients_tail == NULL)
+       clients_tail = cl;
     }
   request->next = cl->requests;
   cl->requests = request;
+  if (cl->request_tail == NULL)
+    cl->request_tail = request;
   if ((GNUNET_YES == GNUNET_FS_PLAN_request (client, 0, request)) &&
       (stats != NULL))
     stats->change (stat_gap_client_query_injected, 1);
@@ -375,6 +386,8 @@
                 prev->next = rl->next;
               else
                 cl->requests = rl->next;
+             if (rl == cl->request_tail)
+               cl->request_tail = prev;
               GNUNET_FS_SHARED_free_request_list (rl);
               if (stats != NULL)
                 stats->change (stat_gap_client_query_tracked, -1);
@@ -416,6 +429,8 @@
       prev = cl;
       cl = cl->next;
     }
+  if (cl == clients_tail)
+    clients_tail = prev;
   if (cl != NULL)
     {
       while (cl->requests != NULL)
@@ -503,52 +518,81 @@
   GNUNET_CronTime now;
 
   GNUNET_mutex_lock (GNUNET_FS_lock);
+  if (clients == NULL)   
+    {  
+      GNUNET_mutex_lock (GNUNET_FS_lock);
+      return;
+    }
   now = GNUNET_get_time ();
-  client = clients;
-  while (client != NULL)
+  client = clients;  
+  if (clients_tail != client)
     {
-      request = client->requests;
-      while (request != NULL)
-        {
-          if (request->have_more > 0)
-            {
-              request->have_more--;
-              hmc.request = request;
-              hmc.processed = 0;
-              hmc.have_more = GNUNET_NO;
-              datastore->get (&request->queries[0], request->type,
-                              &have_more_processor, &hmc);
-              if (hmc.have_more)
-                request->have_more += HAVE_MORE_INCREMENT;
-            }
-          else
-            {
-              if ((NULL == request->plan_entries) &&
-                  ((client->client != NULL) ||
-                   (request->expiration > now)) &&
-                  (request->last_ttl_used * GNUNET_CRON_SECONDS +
-                   request->last_request_time < now))
-                {
-                  if ((GNUNET_OK ==
-                       GNUNET_FS_PLAN_request (client->client, 0, request))
-                      && (stats != NULL))
-                    stats->change (stat_gap_client_query_injected, 1);
-                }
-
-              if ((request->anonymityLevel == 0) &&
-                  (request->last_dht_get + request->dht_back_off < now))
-                {
-                  if (request->dht_back_off * 2 > request->dht_back_off)
-                    request->dht_back_off *= 2;
-                  request->last_dht_get = now;
-                  GNUNET_FS_DHT_execute_query (request->type,
-                                               &request->queries[0]);
-                }
-            }
-          request = request->next;
-        }
-      client = client->next;
+      /* move client to tail of list */
+      GNUNET_GE_ASSERT(NULL, clients_tail->next == NULL);
+      clients = clients->next;
+      clients_tail->next = client;
+      clients_tail = client;
+      client->next = NULL;
     }
+  request = client->requests;
+  if (request == NULL)
+    {
+      GNUNET_mutex_lock (GNUNET_FS_lock);
+      return;
+    }
+  if (client->request_tail != request)
+    {
+      /* move request to tail of list */
+      GNUNET_GE_ASSERT(NULL, client->request_tail->next == NULL);
+      client->requests = request->next;
+      client->request_tail->next = request;
+      client->request_tail = request;
+      request->next = NULL;
+    }
+  if ( (client->client != NULL) &&
+       (GNUNET_OK != 
+       coreAPI->cs_test_send_to_client_now(client->client,
+                                           GNUNET_GAP_ESTIMATED_DATA_SIZE,
+                                           GNUNET_NO)))
+    {
+      GNUNET_mutex_lock (GNUNET_FS_lock);
+      return;
+    }
+  if (request->have_more > 0)
+    {
+      request->have_more--;
+      hmc.request = request;
+      hmc.processed = 0;
+      hmc.have_more = GNUNET_NO;
+      datastore->get (&request->queries[0], request->type,
+                     &have_more_processor, &hmc);
+      if (hmc.have_more)
+       request->have_more += HAVE_MORE_INCREMENT;
+    }
+  else
+    {
+      if ((NULL == request->plan_entries) &&
+         ((client->client != NULL) ||
+          (request->expiration > now)) &&
+         (request->last_ttl_used * GNUNET_CRON_SECONDS +
+          request->last_request_time < now))
+       {
+         if ((GNUNET_OK ==
+              GNUNET_FS_PLAN_request (client->client, 0, request))
+             && (stats != NULL))
+           stats->change (stat_gap_client_query_injected, 1);
+       }
+      
+      if ((request->anonymityLevel == 0) &&
+         (request->last_dht_get + request->dht_back_off < now))
+       {
+         if (request->dht_back_off * 2 > request->dht_back_off)
+           request->dht_back_off *= 2;
+         request->last_dht_get = now;
+         GNUNET_FS_DHT_execute_query (request->type,
+                                      &request->queries[0]);
+       }
+    }
   GNUNET_mutex_unlock (GNUNET_FS_lock);
 }
 

Modified: GNUnet/src/include/gnunet_core.h
===================================================================
--- GNUnet/src/include/gnunet_core.h    2008-03-23 02:57:58 UTC (rev 6604)
+++ GNUnet/src/include/gnunet_core.h    2008-03-23 07:22:19 UTC (rev 6605)
@@ -564,6 +564,15 @@
                             const GNUNET_MessageHeader * message, int force);
 
   /**
+   * Test if we could send a message to the client right now.
+   * @param client
+   * @return GNUNET_OK if we could, GNUNET_NO if not, GNUNET_SYSERR on error
+   */
+  int (*cs_test_send_to_client_now) (struct GNUNET_ClientHandle * handle,
+                                    unsigned int size,
+                                    int would_force);
+
+  /**
    * Send a message to the client identified by the handle.  Note that
    * the core will typically buffer these messages as much as possible
    * and only return GNUNET_SYSERR if it runs out of buffers.  Returning 
GNUNET_OK
@@ -627,7 +636,6 @@
   struct GNUNET_GE_Context
     *(*cs_create_client_log_context) (struct GNUNET_ClientHandle * handle);
 
-
   /* ************************ MISC ************************ */
 
   /**

Modified: GNUnet/src/server/core.c
===================================================================
--- GNUnet/src/server/core.c    2008-03-23 02:57:58 UTC (rev 6604)
+++ GNUnet/src/server/core.c    2008-03-23 07:22:19 UTC (rev 6605)
@@ -565,6 +565,7 @@
 
   applicationCore.sendValueToClient = &GNUNET_CORE_cs_send_result_to_client;   
 /* tcpserver.c */
   applicationCore.cs_send_to_client = &GNUNET_CORE_cs_send_to_client;   /* 
tcpserver.c */
+  applicationCore.cs_test_send_to_client_now = 
&GNUNET_CORE_cs_test_send_to_client_now;   /* tcpserver.c */
   applicationCore.registerClientHandler = &GNUNET_CORE_register_handler;       
 /* tcpserver.c */
   applicationCore.unregisterClientHandler = &GNUNET_CORE_unregister_handler;   
 /* tcpserver.c */
   applicationCore.cs_exit_handler_register = 
&GNUNET_CORE_cs_register_exit_handler;     /* tcpserver.c */

Modified: GNUnet/src/server/tcpserver.c
===================================================================
--- GNUnet/src/server/tcpserver.c       2008-03-23 02:57:58 UTC (rev 6604)
+++ GNUnet/src/server/tcpserver.c       2008-03-23 07:22:19 UTC (rev 6605)
@@ -241,10 +241,20 @@
                  GNUNET_GE_DEBUG | GNUNET_GE_DEVELOPER | GNUNET_GE_REQUEST,
                  "%s: sending reply to client\n", __FUNCTION__);
 #endif
-  return GNUNET_select_write (selector, handle->sock, message, GNUNET_YES,
+  return GNUNET_select_write (selector, handle->sock, message, GNUNET_NO,
                               force);
 }
 
+int
+GNUNET_CORE_cs_test_send_to_client_now (struct GNUNET_ClientHandle *handle,
+                                       unsigned int size,
+                                       int force)
+{
+  return GNUNET_select_test_write_now (selector, handle->sock, 
+                                      size, GNUNET_NO, 
+                                      force);
+}
+
 void
 GNUNET_CORE_cs_terminate_client_connection (struct GNUNET_ClientHandle *sock)
 {

Modified: GNUnet/src/server/tcpserver.h
===================================================================
--- GNUnet/src/server/tcpserver.h       2008-03-23 02:57:58 UTC (rev 6604)
+++ GNUnet/src/server/tcpserver.h       2008-03-23 07:22:19 UTC (rev 6605)
@@ -93,6 +93,15 @@
 
 
 /**
+ * Test if we would be able to queue a message for delivery to this
+ * client right now.
+ */
+int GNUNET_CORE_cs_test_send_to_client_now (struct GNUNET_ClientHandle *handle,
+                                           unsigned int size,
+                                           int would_force);
+
+
+/**
  * Send a return value to the caller of a remote call via
  * TCP.
  * @param sock the TCP socket

Modified: GNUnet/src/util/network/select.c
===================================================================
--- GNUnet/src/util/network/select.c    2008-03-23 02:57:58 UTC (rev 6604)
+++ GNUnet/src/util/network/select.c    2008-03-23 07:22:19 UTC (rev 6605)
@@ -1041,10 +1041,7 @@
   GNUNET_GE_ASSERT (NULL, session->wapos >= session->wspos);
   if ((force == GNUNET_NO) &&
       (((sh->memory_quota > 0) &&
-        (session->wapos - session->wspos + len > sh->memory_quota)) ||
-       ((sh->memory_quota == 0) &&
-        (session->wapos - session->wspos + len >
-         GNUNET_MAX_GNUNET_malloc_CHECKED / 4))))
+        (session->wapos - session->wspos + len > sh->memory_quota)) ))
     {
       /* not enough free space, not allowed to grow that much */
       GNUNET_mutex_unlock (sh->lock);
@@ -1077,15 +1074,32 @@
           if ((sh->memory_quota > 0) &&
               (newBufferSize > sh->memory_quota) && (force == GNUNET_NO))
             newBufferSize = sh->memory_quota;
+         if (newBufferSize > GNUNET_MAX_GNUNET_malloc_CHECKED)
+           {
+             /* not enough free space, not allowed to grow that much,
+                even with forcing! */
+             GNUNET_mutex_unlock (sh->lock);
+             return GNUNET_NO;
+           }
           GNUNET_GE_ASSERT (NULL,
                             newBufferSize >=
                             len + session->wapos - session->wspos);
-          newBuffer = GNUNET_malloc (newBufferSize);
-          memcpy (newBuffer,
-                  &session->wbuff[session->wspos],
-                  session->wapos - session->wspos);
-          GNUNET_free_non_null (session->wbuff);
-          session->wbuff = newBuffer;
+         if (newBufferSize != session->wsize)
+           {
+             newBuffer = GNUNET_malloc (newBufferSize);
+             memcpy (newBuffer,
+                     &session->wbuff[session->wspos],
+                     session->wapos - session->wspos);
+             GNUNET_free_non_null (session->wbuff);
+             session->wbuff = newBuffer;
+           }
+         else
+           {
+             if (session->wspos != 0)
+               memmove(session->wbuff,
+                       &session->wbuff[session->wspos],
+                       session->wapos - session->wspos);
+           }
           session->wsize = newBufferSize;
           session->wapos = session->wapos - session->wspos;
           session->wspos = 0;





reply via email to

[Prev in Thread] Current Thread [Next in Thread]