[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r15070 - gnunet/src/datastore
From: gnunet
Subject: [GNUnet-SVN] r15070 - gnunet/src/datastore
Date: Thu, 21 Apr 2011 17:14:03 +0200
Author: grothoff
Date: 2011-04-21 17:14:03 +0200 (Thu, 21 Apr 2011)
New Revision: 15070
Modified:
gnunet/src/datastore/datastore.h
gnunet/src/datastore/datastore_api.c
Log:
fixes
Modified: gnunet/src/datastore/datastore.h
===================================================================
--- gnunet/src/datastore/datastore.h 2011-04-21 13:44:50 UTC (rev 15069)
+++ gnunet/src/datastore/datastore.h 2011-04-21 15:14:03 UTC (rev 15070)
@@ -27,7 +27,7 @@
#ifndef DATASTORE_H
#define DATASTORE_H
-#define DEBUG_DATASTORE GNUNET_NO
+#define DEBUG_DATASTORE GNUNET_YES
#include "gnunet_util_lib.h"
Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c 2011-04-21 13:44:50 UTC (rev 15069)
+++ gnunet/src/datastore/datastore_api.c 2011-04-21 15:14:03 UTC (rev 15070)
@@ -187,7 +187,6 @@
*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
-
/**
* Current connection to the datastore service.
*/
@@ -241,6 +240,12 @@
*/
int in_receive;
+ /**
+ * We should either receive (and ignore) an 'END' message or force a
+ * disconnect for the next message from the service.
+ */
+ unsigned int expect_end_or_disconnect;
+
};
@@ -600,7 +605,7 @@
h->in_receive = GNUNET_YES;
GNUNET_CLIENT_receive (h->client,
qe->response_proc,
- qe,
+ h,
GNUNET_TIME_absolute_get_remaining (qe->timeout));
GNUNET_STATISTICS_update (h->stats,
gettext_noop ("# bytes sent to datastore"),
@@ -710,16 +715,23 @@
const struct
GNUNET_MessageHeader * msg)
{
- struct GNUNET_DATASTORE_QueueEntry *qe = cls;
- struct GNUNET_DATASTORE_Handle *h = qe->h;
- struct StatusContext rc = qe->qc.sc;
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct StatusContext rc;
const struct StatusMessage *sm;
const char *emsg;
int32_t status;
int was_transmitted;
h->in_receive = GNUNET_NO;
+ if (NULL == (qe = h->queue_head))
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return;
+ }
was_transmitted = qe->was_transmitted;
+ rc = qe->qc.sc;
if (msg == NULL)
{
free_queue_entry (qe);
@@ -734,7 +746,6 @@
return;
}
GNUNET_assert (GNUNET_YES == qe->was_transmitted);
- GNUNET_assert (h->queue_head == qe);
free_queue_entry (qe);
if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
(ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
@@ -1169,43 +1180,54 @@
*/
static void
process_result_message (void *cls,
- const struct GNUNET_MessageHeader * msg)
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_DATASTORE_QueueEntry *qe = cls;
- struct GNUNET_DATASTORE_Handle *h = qe->h;
- struct ResultContext rc = qe->qc.rc;
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct ResultContext rc;
const struct DataMessage *dm;
int was_transmitted;
h->in_receive = GNUNET_NO;
if (msg == NULL)
- {
- was_transmitted = qe->was_transmitted;
- free_queue_entry (qe);
- if (was_transmitted == GNUNET_YES)
+ {
+ if (NULL != (qe = h->queue_head))
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Failed to receive response from database.\n"));
- do_disconnect (h);
- }
- else
- {
+ was_transmitted = qe->was_transmitted;
+ free_queue_entry (qe);
+ rc = qe->qc.rc;
+ if (was_transmitted == GNUNET_YES)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Failed to receive response from database.\n"));
+ do_disconnect (h);
+ }
+ else
+ {
#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Request dropped due to finite datastore queue
length.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Request dropped due to finite datastore queue
length.\n");
#endif
+ }
+ if (rc.iter != NULL)
+ rc.iter (rc.iter_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
- if (rc.iter != NULL)
- rc.iter (rc.iter_cls,
- NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- GNUNET_assert (GNUNET_YES == qe->was_transmitted);
- GNUNET_assert (h->queue_head == qe);
if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
{
GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
+ if (h->expect_end_or_disconnect > 0)
+ {
+ h->expect_end_or_disconnect--;
+ process_queue (h);
+ return;
+ }
+ qe = h->queue_head;
+ rc = qe->qc.rc;
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
free_queue_entry (qe);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1221,6 +1243,16 @@
process_queue (h);
return;
}
+ if (h->expect_end_or_disconnect > 0)
+ {
+ /* only 'END' allowed, must reconnect */
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ return;
+ }
+ qe = h->queue_head;
+ rc = qe->qc.rc;
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
(ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
(ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct
DataMessage*)msg)->size)) )
@@ -1500,11 +1532,10 @@
{
struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
- GNUNET_assert (&process_result_message == qe->response_proc);
h->in_receive = GNUNET_YES;
GNUNET_CLIENT_receive (h->client,
- qe->response_proc,
- qe,
+ &process_result_message,
+ h,
GNUNET_TIME_absolute_get_remaining (qe->timeout));
}
@@ -1531,8 +1562,7 @@
if (GNUNET_YES == qe->was_transmitted)
{
free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
+ h->expect_end_or_disconnect++;
return;
}
free_queue_entry (qe);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r15070 - gnunet/src/datastore,
gnunet <=