[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r11355 - gnunet/src/datastore
From: |
gnunet |
Subject: |
[GNUnet-SVN] r11355 - gnunet/src/datastore |
Date: |
Wed, 12 May 2010 17:01:03 +0200 |
Author: grothoff
Date: 2010-05-12 17:01:03 +0200 (Wed, 12 May 2010)
New Revision: 11355
Modified:
gnunet/src/datastore/datastore_api.c
gnunet/src/datastore/plugin_datastore_sqlite.c
gnunet/src/datastore/test_datastore_api.c
gnunet/src/datastore/test_datastore_api_data.conf
Log:
adding support for request queueing to datastore API
Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c 2010-05-12 14:30:04 UTC (rev
11354)
+++ gnunet/src/datastore/datastore_api.c 2010-05-12 15:01:03 UTC (rev
11355)
@@ -67,12 +67,12 @@
/**
* Function to call after transmission of the request.
*/
- GNUNET_DATASTORE_ContinuationWithStatus cont;
+ GNUNET_DATASTORE_ContinuationWithStatus contX;
/**
* Closure for 'cont'.
*/
- void *cont_cls;
+ void *cont_clsX;
/**
* Task for timeout signalling.
@@ -296,8 +296,6 @@
GNUNET_CONTAINER_DLL_remove (h->queue_head,
h->queue_tail,
qe);
- if (qe->cont != NULL)
- qe->cont (qe->cont_cls, GNUNET_NO, _("timeout"));
if (qe->response_proc != NULL)
qe->response_proc (qe, NULL);
GNUNET_free (qe);
@@ -314,8 +312,6 @@
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
* @param timeout timeout for the operation
- * @param cont continuation to call when done with request transmission (can
be NULL)
- * @param cont_cls closure for cont
* @param response_proc function to call with replies (can be NULL)
* @param client_ctx client context (NOT a closure for response_proc)
* @return NULL if the queue is full (and this entry was dropped)
@@ -326,8 +322,6 @@
unsigned int queue_priority,
unsigned int max_queue_size,
struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls,
GNUNET_CLIENT_MessageHandler response_proc,
void *client_ctx)
{
@@ -368,8 +362,6 @@
ret->h = h;
ret->response_proc = response_proc;
ret->client_ctx = client_ctx;
- ret->cont = cont;
- ret->cont_cls = cont_cls;
ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
timeout,
&timeout_queue_entry,
@@ -391,8 +383,6 @@
pos);
GNUNET_SCHEDULER_cancel (h->sched,
pos->task);
- if (pos->cont != NULL)
- pos->cont (pos->cont_cls, GNUNET_NO, _("Message queue full"));
if (pos->response_proc != NULL)
pos->response_proc (pos, NULL);
GNUNET_free (pos);
@@ -494,26 +484,13 @@
}
memcpy (buf, &qe[1], msize);
qe->was_transmitted = GNUNET_YES;
- if (qe->cont != NULL)
- qe->cont (qe->cont_cls, GNUNET_OK, NULL);
GNUNET_SCHEDULER_cancel (h->sched,
qe->task);
qe->task = GNUNET_SCHEDULER_NO_TASK;
- if (qe->response_proc == NULL)
- {
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- GNUNET_free (qe);
- process_queue (h);
- }
- else
- {
- GNUNET_CLIENT_receive (h->client,
- qe->response_proc,
- qe,
- GNUNET_TIME_absolute_get_remaining (qe->timeout));
- }
+ GNUNET_CLIENT_receive (h->client,
+ qe->response_proc,
+ qe,
+ GNUNET_TIME_absolute_get_remaining (qe->timeout));
return msize;
}
@@ -546,77 +523,8 @@
}
-/**
- * Store an item in the datastore. If the item is already present,
- * the priorities are summed up and the higher expiration time and
- * lower anonymity level is used.
- *
- * @param h handle to the datastore
- * @param rid reservation ID to use (from "reserve"); use 0 if no
- * prior reservation was made
- * @param key key for the value
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
- * @param cont continuation to call when done
- * @param cont_cls closure for cont
- */
-void
-GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
- int rid,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute expiration,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
-{
- struct QueueEntry *qe;
- struct DataMessage *dm;
- size_t msize;
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to put %u bytes of data under key `%s'\n",
- size,
- GNUNET_h2s (key));
-#endif
- msize = sizeof(struct DataMessage) + size;
- GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
- qe = make_queue_entry (h, msize,
- queue_priority, max_queue_size, timeout,
- cont, cont_cls, NULL, NULL);
- if (qe == NULL)
- return;
- dm = (struct DataMessage* ) &qe[1];
- dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
- dm->header.size = htons(msize);
- dm->rid = htonl(rid);
- dm->size = htonl(size);
- dm->type = htonl(type);
- dm->priority = htonl(priority);
- dm->anonymity = htonl(anonymity);
- dm->uid = GNUNET_htonll(0);
- dm->expiration = GNUNET_TIME_absolute_hton(expiration);
- dm->key = *key;
- memcpy (&dm[1], data, size);
- process_queue (h);
-}
-
/**
* Context for processing status messages.
*/
@@ -636,6 +544,20 @@
/**
+ * Dummy continuation used to do nothing (but be non-zero).
+ *
+ * @param cls closure
+ * @param result result
+ * @param emsg error message
+ */
+static void
+drop_status_cont (void *cls, int result, const char *emsg)
+{
+ /* do nothing */
+}
+
+
+/**
* Type of a function to call when we receive a message
* from the service.
*
@@ -711,6 +633,81 @@
/**
+ * Store an item in the datastore. If the item is already present,
+ * the priorities are summed up and the higher expiration time and
+ * lower anonymity level is used.
+ *
+ * @param h handle to the datastore
+ * @param rid reservation ID to use (from "reserve"); use 0 if no
+ * prior reservation was made
+ * @param key key for the value
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout timeout for the operation
+ * @param cont continuation to call when done
+ * @param cont_cls closure for cont
+ */
+void
+GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
+ int rid,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_DATASTORE_ContinuationWithStatus cont,
+ void *cont_cls)
+{
+ struct StatusContext *scont;
+ struct QueueEntry *qe;
+ struct DataMessage *dm;
+ size_t msize;
+
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to put %u bytes of data under key `%s'\n",
+ size,
+ GNUNET_h2s (key));
+#endif
+ msize = sizeof(struct DataMessage) + size;
+ GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+ scont = GNUNET_malloc (sizeof (struct StatusContext));
+ scont->cont = cont;
+ scont->cont_cls = cont_cls;
+ qe = make_queue_entry (h, msize,
+ queue_priority, max_queue_size, timeout,
+ &process_status_message, scont);
+ if (qe == NULL)
+ return;
+ dm = (struct DataMessage* ) &qe[1];
+ dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
+ dm->header.size = htons(msize);
+ dm->rid = htonl(rid);
+ dm->size = htonl(size);
+ dm->type = htonl(type);
+ dm->priority = htonl(priority);
+ dm->anonymity = htonl(anonymity);
+ dm->uid = GNUNET_htonll(0);
+ dm->expiration = GNUNET_TIME_absolute_hton(expiration);
+ dm->key = *key;
+ memcpy (&dm[1], data, size);
+ process_queue (h);
+}
+
+
+/**
* Reserve space in the datastore. This function should be used
* to avoid "out of space" failures during a longer sequence of "put"
* operations (for example, when a file is being inserted).
@@ -740,6 +737,8 @@
struct ReserveMessage *rm;
struct StatusContext *scont;
+ if (cont == NULL)
+ cont = &drop_status_cont;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to reserve %llu bytes of data and %u entries'\n",
@@ -751,7 +750,7 @@
scont->cont_cls = cont_cls;
qe = make_queue_entry (h, sizeof(struct ReserveMessage),
queue_priority, max_queue_size, timeout,
- NULL, NULL, &process_status_message, scont);
+ &process_status_message, scont);
if (qe == NULL)
return;
rm = (struct ReserveMessage*) &qe[1];
@@ -794,18 +793,19 @@
struct ReleaseReserveMessage *rrm;
struct StatusContext *scont;
+ if (cont == NULL)
+ cont = &drop_status_cont;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to reserve %llu bytes of data and %u entries'\n",
- (unsigned long long) amount,
- (unsigned int) entries);
+ "Asked to release reserve %d\n",
+ rid);
#endif
scont = GNUNET_malloc (sizeof (struct StatusContext));
scont->cont = cont;
scont->cont_cls = cont_cls;
qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
queue_priority, max_queue_size, timeout,
- NULL, NULL, &process_status_message, scont);
+ &process_status_message, scont);
if (qe == NULL)
return;
rrm = (struct ReleaseReserveMessage*) &qe[1];
@@ -845,6 +845,8 @@
struct UpdateMessage *um;
struct StatusContext *scont;
+ if (cont == NULL)
+ cont = &drop_status_cont;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to update entry %llu raising priority by %u and expiration
to %llu\n",
@@ -857,7 +859,7 @@
scont->cont_cls = cont_cls;
qe = make_queue_entry (h, sizeof(struct UpdateMessage),
queue_priority, max_queue_size, timeout,
- NULL, NULL, &process_status_message, scont);
+ &process_status_message, scont);
if (qe == NULL)
return;
um = (struct UpdateMessage*) &qe[1];
@@ -904,6 +906,8 @@
size_t msize;
struct StatusContext *scont;
+ if (cont == NULL)
+ cont = &drop_status_cont;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to remove %u bytes under key `%s'\n",
@@ -917,7 +921,7 @@
GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
qe = make_queue_entry (h, msize,
queue_priority, max_queue_size, timeout,
- NULL, NULL, &process_status_message, scont);
+ &process_status_message, scont);
if (qe == NULL)
return;
dm = (struct DataMessage*) &qe[1];
@@ -952,13 +956,6 @@
*/
void *iter_cls;
- /**
- * Automatically get the next result, or wait for a call to
- * GNUNET_DATASTORE_get_next? GNUNET_YES means we automatically
- * get the next one (if there are more).
- */
- int get_next;
-
};
@@ -1048,11 +1045,6 @@
ntohl(dm->anonymity),
GNUNET_TIME_absolute_ntoh(dm->expiration),
GNUNET_ntohll(dm->uid));
- if (rc->get_next == GNUNET_YES)
- GNUNET_CLIENT_receive (h->client,
- qe->response_proc,
- qe,
- GNUNET_TIME_absolute_get_remaining (qe->timeout));
}
@@ -1089,10 +1081,9 @@
rcont = GNUNET_malloc (sizeof (struct ResultContext));
rcont->iter = iter;
rcont->iter_cls = iter_cls;
- rcont->get_next = GNUNET_YES;
qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
queue_priority, max_queue_size, timeout,
- NULL, NULL, &process_result_message, rcont);
+ &process_result_message, rcont);
if (qe == NULL)
return;
m = (struct GNUNET_MessageHeader*) &qe[1];
@@ -1144,10 +1135,9 @@
rcont = GNUNET_malloc (sizeof (struct ResultContext));
rcont->iter = iter;
rcont->iter_cls = iter_cls;
- rcont->get_next = GNUNET_NO;
qe = make_queue_entry (h, sizeof(struct GetMessage),
queue_priority, max_queue_size, timeout,
- NULL, NULL, &process_result_message, rcont);
+ &process_result_message, rcont);
if (qe == NULL)
return;
gm = (struct GetMessage*) &qe[1];
@@ -1183,7 +1173,6 @@
GNUNET_assert (NULL != qe);
GNUNET_assert (&process_result_message == qe->response_proc);
- GNUNET_assert (rc->get_next == GNUNET_NO);
if (GNUNET_YES == more)
{
GNUNET_CLIENT_receive (h->client,
Modified: gnunet/src/datastore/plugin_datastore_sqlite.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_sqlite.c 2010-05-12 14:30:04 UTC
(rev 11354)
+++ gnunet/src/datastore/plugin_datastore_sqlite.c 2010-05-12 15:01:03 UTC
(rev 11355)
@@ -258,11 +258,21 @@
"datastore-sqlite");
return GNUNET_SYSERR;
}
- if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
+ if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
{
- GNUNET_break (0);
- GNUNET_free (afsdir);
- return GNUNET_SYSERR;
+ if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
+ {
+ GNUNET_break (0);
+ GNUNET_free (afsdir);
+ return GNUNET_SYSERR;
+ }
+ /* database is new or got deleted, reset payload to zero! */
+ if (plugin->stat_get != NULL)
+ {
+ GNUNET_STATISTICS_get_cancel (plugin->stat_get);
+ plugin->stat_get = NULL;
+ }
+ plugin->payload = 0;
}
plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
#ifdef ENABLE_NLS
@@ -779,6 +789,9 @@
LOG_SQLITE (plugin, msg,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
sqlite3_reset (stmt);
+ database_shutdown (plugin);
+ database_setup (plugin->env->cfg,
+ plugin);
return GNUNET_SYSERR;
}
if (SQLITE_OK != sqlite3_reset (stmt))
Modified: gnunet/src/datastore/test_datastore_api.c
===================================================================
--- gnunet/src/datastore/test_datastore_api.c 2010-05-12 14:30:04 UTC (rev
11354)
+++ gnunet/src/datastore/test_datastore_api.c 2010-05-12 15:01:03 UTC (rev
11355)
@@ -109,7 +109,8 @@
RP_GET_MULTIPLE_DONE,
RP_UPDATE,
RP_UPDATE_VALIDATE,
- RP_UPDATE_DONE
+ RP_UPDATE_DONE,
+ RP_ERROR
};
@@ -139,9 +140,13 @@
{
struct CpsRunContext *crc = cls;
if (GNUNET_OK != success)
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%s\n", msg);
- GNUNET_assert (GNUNET_OK == success);
+ {
+ ok = 42;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%s\n", msg);
+ GNUNET_SCHEDULER_shutdown (crc->sched);
+ return;
+ }
GNUNET_free_non_null (crc->data);
crc->data = NULL;
GNUNET_SCHEDULER_add_continuation (crc->sched,
@@ -221,7 +226,16 @@
struct CpsRunContext *crc = cls;
if (key == NULL)
{
- crc->phase = RP_DO_DEL;
+ if (crc->data == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Content not found!\n");
+ crc->phase = RP_ERROR;
+ }
+ else
+ {
+ crc->phase = RP_DO_DEL;
+ }
GNUNET_SCHEDULER_add_continuation (crc->sched,
&run_continuation,
crc,
@@ -386,6 +400,7 @@
"DEL",
crc->i);
#endif
+ crc->data = NULL;
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
GNUNET_DATASTORE_get (datastore,
&crc->key,
@@ -515,6 +530,12 @@
GNUNET_DATASTORE_disconnect (datastore, GNUNET_YES);
GNUNET_free (crc);
ok = 0;
+ break;
+ case RP_ERROR:
+ GNUNET_DATASTORE_disconnect (datastore, GNUNET_YES);
+ GNUNET_free (crc);
+ ok = 43;
+ break;
}
}
Modified: gnunet/src/datastore/test_datastore_api_data.conf
===================================================================
--- gnunet/src/datastore/test_datastore_api_data.conf 2010-05-12 14:30:04 UTC
(rev 11354)
+++ gnunet/src/datastore/test_datastore_api_data.conf 2010-05-12 15:01:03 UTC
(rev 11355)
@@ -30,6 +30,7 @@
# REJECT_FROM =
# REJECT_FROM6 =
# PREFIX =
+# DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-datastore
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r11355 - gnunet/src/datastore,
gnunet <=