gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r15070 - gnunet/src/datastore


From: gnunet
Subject: [GNUnet-SVN] r15070 - gnunet/src/datastore
Date: Thu, 21 Apr 2011 17:14:03 +0200

Author: grothoff
Date: 2011-04-21 17:14:03 +0200 (Thu, 21 Apr 2011)
New Revision: 15070

Modified:
   gnunet/src/datastore/datastore.h
   gnunet/src/datastore/datastore_api.c
Log:
fixes

Modified: gnunet/src/datastore/datastore.h
===================================================================
--- gnunet/src/datastore/datastore.h    2011-04-21 13:44:50 UTC (rev 15069)
+++ gnunet/src/datastore/datastore.h    2011-04-21 15:14:03 UTC (rev 15070)
@@ -27,7 +27,7 @@
 #ifndef DATASTORE_H
 #define DATASTORE_H
 
-#define DEBUG_DATASTORE GNUNET_NO
+#define DEBUG_DATASTORE GNUNET_YES
 
 #include "gnunet_util_lib.h"
 

Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c        2011-04-21 13:44:50 UTC (rev 15069)
+++ gnunet/src/datastore/datastore_api.c        2011-04-21 15:14:03 UTC (rev 15070)
@@ -187,7 +187,6 @@
    */
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
-
   /**
    * Current connection to the datastore service.
    */
@@ -241,6 +240,12 @@
    */
   int in_receive;
 
+  /**
+   * We should either receive (and ignore) an 'END' message or force a
+   * disconnect for the next message from the service.
+   */
+  unsigned int expect_end_or_disconnect;
+
 };
 
 
@@ -600,7 +605,7 @@
   h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
                         qe->response_proc,
-                        qe,
+                        h,
                         GNUNET_TIME_absolute_get_remaining (qe->timeout));
   GNUNET_STATISTICS_update (h->stats,
                            gettext_noop ("# bytes sent to datastore"),
@@ -710,16 +715,23 @@
                        const struct
                        GNUNET_MessageHeader * msg)
 {
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-  struct StatusContext rc = qe->qc.sc;
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct StatusContext rc;
   const struct StatusMessage *sm;
   const char *emsg;
   int32_t status;
   int was_transmitted;
 
   h->in_receive = GNUNET_NO;
+  if (NULL == (qe = h->queue_head))
+    {
+      GNUNET_break (0);
+      do_disconnect (h);
+      return;
+    }
   was_transmitted = qe->was_transmitted;
+  rc = qe->qc.sc;
   if (msg == NULL)
     {      
       free_queue_entry (qe);
@@ -734,7 +746,6 @@
       return;
     }
   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
-  GNUNET_assert (h->queue_head == qe);
   free_queue_entry (qe);
   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
@@ -1169,43 +1180,54 @@
  */
 static void 
 process_result_message (void *cls,
-                       const struct GNUNET_MessageHeader * msg)
+                       const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-  struct ResultContext rc = qe->qc.rc;
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct ResultContext rc;
   const struct DataMessage *dm;
   int was_transmitted;
 
   h->in_receive = GNUNET_NO;
   if (msg == NULL)
-   {
-      was_transmitted = qe->was_transmitted;
-      free_queue_entry (qe);
-      if (was_transmitted == GNUNET_YES)
+    {
+      if (NULL != (qe = h->queue_head))
        {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Failed to receive response from database.\n"));
-         do_disconnect (h);
-       }
-      else
-       {
+         was_transmitted = qe->was_transmitted;
+         free_queue_entry (qe);
+         rc = qe->qc.rc;
+         if (was_transmitted == GNUNET_YES)
+           {
+             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                         _("Failed to receive response from database.\n"));
+             do_disconnect (h);
+           }   
+         else
+           {
 #if DEBUG_DATASTORE
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Request dropped due to finite datastore queue length.\n");
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Request dropped due to finite datastore queue length.\n");
 #endif
+           }   
+         if (rc.iter != NULL)
+           rc.iter (rc.iter_cls,
+                    NULL, 0, NULL, 0, 0, 0, 
+                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
        }
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
-                NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
       return;
     }
-  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
-  GNUNET_assert (h->queue_head == qe);
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
+      if (h->expect_end_or_disconnect > 0)
+       {
+         h->expect_end_or_disconnect--;
+         process_queue (h);
+         return;
+       }
+      qe = h->queue_head;
+      rc = qe->qc.rc;
+      GNUNET_assert (GNUNET_YES == qe->was_transmitted);
       free_queue_entry (qe);
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1221,6 +1243,16 @@
       process_queue (h);
       return;
     }
+  if (h->expect_end_or_disconnect > 0)
+    {
+      /* only 'END' allowed, must reconnect */
+      h->retry_time = GNUNET_TIME_UNIT_ZERO;
+      do_disconnect (h);
+      return;
+    }
+  qe = h->queue_head;
+  rc = qe->qc.rc;
+  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
       (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
@@ -1500,11 +1532,10 @@
 {
   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
 
-  GNUNET_assert (&process_result_message == qe->response_proc);
   h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
-                        qe->response_proc,
-                        qe,
+                        &process_result_message,
+                        h,
                         GNUNET_TIME_absolute_get_remaining (qe->timeout));
 }
 
@@ -1531,8 +1562,7 @@
   if (GNUNET_YES == qe->was_transmitted) 
     {
       free_queue_entry (qe);
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
+      h->expect_end_or_disconnect++;
       return;
     }
   free_queue_entry (qe);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]