[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r35419 - in gnunet/src: datastore include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r35419 - in gnunet/src: datastore include |
Date: |
Sat, 21 Mar 2015 04:38:29 +0100 |
Author: amatus
Date: 2015-03-21 04:38:29 +0100 (Sat, 21 Mar 2015)
New Revision: 35419
Modified:
gnunet/src/datastore/gnunet-service-datastore.c
gnunet/src/datastore/perf_plugin_datastore.c
gnunet/src/datastore/plugin_datastore_heap.c
gnunet/src/datastore/plugin_datastore_mysql.c
gnunet/src/datastore/plugin_datastore_postgres.c
gnunet/src/datastore/plugin_datastore_sqlite.c
gnunet/src/datastore/plugin_datastore_template.c
gnunet/src/datastore/test_plugin_datastore.c
gnunet/src/include/gnunet_datastore_plugin.h
Log:
Convert datastore plugin API to asynchronous
Modified: gnunet/src/datastore/gnunet-service-datastore.c
===================================================================
--- gnunet/src/datastore/gnunet-service-datastore.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/gnunet-service-datastore.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -829,38 +829,24 @@
};
-/**
- * Actually put the data message.
- *
- * @param client sender of the message
- * @param dm message with the data to store
- */
static void
-execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
+put_continuation (void *cls, const struct GNUNET_HashCode *key, uint32_t size,
+ int status, char *msg)
{
- uint32_t size;
- char *msg;
- int ret;
+ struct GNUNET_SERVER_Client *client = cls;
- size = ntohl (dm->size);
- msg = NULL;
- ret =
- plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
- ntohl (dm->type), ntohl (dm->priority),
- ntohl (dm->anonymity), ntohl (dm->replication),
- GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
- if (GNUNET_OK == ret)
+ if (GNUNET_OK == status)
{
GNUNET_STATISTICS_update (stats,
gettext_noop ("# bytes stored"), size,
GNUNET_YES);
- GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
+ GNUNET_CONTAINER_bloomfilter_add (filter, key);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully stored %u bytes of type %u under key `%s'\n",
- size, ntohl (dm->type), GNUNET_h2s (&dm->key));
+ "Successfully stored %u bytes under key `%s'\n",
+ size, GNUNET_h2s (key));
}
- transmit_status (client, ret, msg);
- GNUNET_free_non_null (msg);
+ transmit_status (client, status, msg);
+ GNUNET_SERVER_client_drop (client);
if (quota - reserved - cache_size < payload)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -872,7 +858,34 @@
}
}
+/**
+ * Actually put the data message.
+ *
+ * @param client sender of the message
+ * @param dm message with the data to store
+ */
+static void
+execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
+{
+ GNUNET_SERVER_client_keep (client);
+ plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1],
+ ntohl (dm->type), ntohl (dm->priority),
+ ntohl (dm->anonymity), ntohl (dm->replication),
+ GNUNET_TIME_absolute_ntoh (dm->expiration),
+ &put_continuation, client);
+}
+
+static void
+check_present_continuation (void *cls, int status, char *msg)
+{
+ struct GNUNET_SERVER_Client *client = cls;
+
+ transmit_status (client, GNUNET_NO, NULL);
+ GNUNET_SERVER_client_drop (client);
+}
+
+
/**
* Function that will check if the given datastore entry
* matches the put and if none match executes the put.
@@ -921,9 +934,13 @@
expiration.abs_value_us))
plugin->api->update (plugin->api->cls, uid,
(int32_t) ntohl (dm->priority),
- GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
- transmit_status (pc->client, GNUNET_NO, NULL);
- GNUNET_SERVER_client_drop (pc->client);
+ GNUNET_TIME_absolute_ntoh (dm->expiration),
+ check_present_continuation, pc->client);
+ else
+ {
+ transmit_status (pc->client, GNUNET_NO, NULL);
+ GNUNET_SERVER_client_drop (pc->client);
+ }
GNUNET_free (pc);
}
else
@@ -1051,6 +1068,16 @@
}
+static void
+update_continuation (void *cls, int status, char *msg)
+{
+ struct GNUNET_SERVER_Client *client = cls;
+
+ transmit_status (client, status, msg);
+ GNUNET_SERVER_client_drop (client);
+}
+
+
/**
* Handle UPDATE-message.
*
@@ -1063,21 +1090,17 @@
const struct GNUNET_MessageHeader *message)
{
const struct UpdateMessage *msg;
- int ret;
- char *emsg;
GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
1, GNUNET_NO);
msg = (const struct UpdateMessage *) message;
- emsg = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
"UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
- ret =
- plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
- (int32_t) ntohl (msg->priority),
- GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg);
- transmit_status (client, ret, emsg);
- GNUNET_free_non_null (emsg);
+ GNUNET_SERVER_client_keep (client);
+ plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
+ (int32_t) ntohl (msg->priority),
+ GNUNET_TIME_absolute_ntoh (msg->expiration),
+ update_continuation, client);
}
@@ -1336,6 +1359,29 @@
}
+static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
+ sizeof (struct ReserveMessage)},
+ {&handle_release_reserve, NULL,
+ GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
+ sizeof (struct ReleaseReserveMessage)},
+ {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
+ {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
+ sizeof (struct UpdateMessage)},
+ {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
+ {&handle_get_replication, NULL,
+ GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
+ sizeof (struct GNUNET_MessageHeader)},
+ {&handle_get_zero_anonymity, NULL,
+ GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
+ sizeof (struct GetZeroAnonymityMessage)},
+ {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
+ {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
+ sizeof (struct GNUNET_MessageHeader)},
+ {NULL, NULL, 0, 0}
+};
+
+
/**
* Adds a given @a key to the bloomfilter in @a cls @a count times.
*
@@ -1350,6 +1396,19 @@
{
struct GNUNET_CONTAINER_BloomFilter *bf = cls;
+ if (NULL == key)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Bloomfilter construction complete.\n"));
+ GNUNET_SERVER_add_handlers (server, handlers);
+ GNUNET_SERVER_resume (server);
+ expired_kill_task
+ = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
+ &delete_expired,
+ NULL);
+ return;
+ }
+
while (0 < count--)
GNUNET_CONTAINER_bloomfilter_add (bf, key);
}
@@ -1365,27 +1424,6 @@
static void
process_stat_done (void *cls, int success)
{
- static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
- sizeof (struct ReserveMessage)},
- {&handle_release_reserve, NULL,
- GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
- sizeof (struct ReleaseReserveMessage)},
- {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
- {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
- sizeof (struct UpdateMessage)},
- {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
- {&handle_get_replication, NULL,
- GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
- sizeof (struct GNUNET_MessageHeader)},
- {&handle_get_zero_anonymity, NULL,
- GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
- sizeof (struct GetZeroAnonymityMessage)},
- {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
- {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
- sizeof (struct GNUNET_MessageHeader)},
- {NULL, NULL, 0, 0}
- };
stat_get = NULL;
plugin = load_plugin ();
@@ -1411,9 +1449,12 @@
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Rebuilding bloomfilter. Please be patient.\n"));
if (NULL != plugin->api->get_keys)
+ {
plugin->api->get_keys (plugin->api->cls,
&add_key_to_bloomfilter,
filter);
+ return;
+ }
else
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Plugin does not support get_keys function. Please
fix!\n"));
Modified: gnunet/src/datastore/perf_plugin_datastore.c
===================================================================
--- gnunet/src/datastore/perf_plugin_datastore.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/perf_plugin_datastore.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -99,15 +99,60 @@
static void
-putValue (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k)
+test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+static void
+put_continuation (void *cls, const struct GNUNET_HashCode *key,
+ uint32_t size, int status, char *msg)
{
+ struct CpsRunContext *crc = cls;
+
+ if (GNUNET_OK != status)
+ {
+ FPRINTF (stderr, "ERROR: `%s'\n", msg);
+ }
+ else
+ {
+ stored_bytes += size;
+ stored_ops++;
+ stored_entries++;
+ }
+ GNUNET_SCHEDULER_add_now (&test, crc);
+}
+
+static void
+do_put (struct CpsRunContext *crc)
+{
char value[65536];
size_t size;
static struct GNUNET_HashCode key;
- static int ic;
- char *msg;
+ static int i;
unsigned int prio;
+ if (0 == i)
+ crc->start = GNUNET_TIME_absolute_get ();
+ if (PUT_10 == i)
+ {
+ i = 0;
+ crc->end = GNUNET_TIME_absolute_get ();
+ {
+ printf ("%s took %s for %llu items\n", "Storing an item",
+ GNUNET_STRINGS_relative_time_to_string
(GNUNET_TIME_absolute_get_difference (crc->start,
+
crc->end),
+ GNUNET_YES),
+ PUT_10);
+ if (PUT_10 > 0)
+ GAUGER (category, "Storing an item",
+ (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL /
PUT_10,
+ "ms/item");
+ }
+ crc->i++;
+ crc->start = GNUNET_TIME_absolute_get ();
+ crc->phase++;
+ GNUNET_SCHEDULER_add_now (&test, crc);
+ return;
+ }
/* most content is 32k */
size = 32 * 1024;
if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16) == 0) /* but
some of it is less! */
@@ -120,34 +165,23 @@
memset (value, i, size);
if (i > 255)
memset (value, i - 255, size / 2);
- value[0] = k;
+ value[0] = crc->i;
memcpy (&value[4], &i, sizeof (i));
- msg = NULL;
prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
- if (GNUNET_OK != api->put (api->cls, &key, size, value, 1 + i % 4 /* type */
,
- prio, i % 4 /* anonymity */ ,
- 0 /* replication */ ,
- GNUNET_TIME_relative_to_absolute
- (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_MILLISECONDS,
- 60 * 60 * 60 * 1000 +
- GNUNET_CRYPTO_random_u32
- (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg))
- {
- FPRINTF (stderr, "ERROR: `%s'\n", msg);
- GNUNET_free_non_null (msg);
- return;
- }
- ic++;
- stored_bytes += size;
- stored_ops++;
- stored_entries++;
+ crc->api->put (crc->api->cls, &key, size, value, 1 + i % 4 /* type */ ,
+ prio, i % 4 /* anonymity */ ,
+ 0 /* replication */ ,
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_MILLISECONDS,
+ 60 * 60 * 60 * 1000 +
+ GNUNET_CRYPTO_random_u32
+ (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
+ put_continuation, crc);
+ i++;
}
-static void
-test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
static int
iterate_zeros (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
const void *data, enum GNUNET_BLOCK_Type type, uint32_t
priority,
@@ -342,7 +376,6 @@
test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct CpsRunContext *crc = cls;
- int j;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
{
@@ -361,25 +394,7 @@
&cleaning_task, crc);
break;
case RP_PUT:
- crc->start = GNUNET_TIME_absolute_get ();
- for (j = 0; j < PUT_10; j++)
- putValue (crc->api, j, crc->i);
- crc->end = GNUNET_TIME_absolute_get ();
- {
- printf ("%s took %s for %llu items\n", "Storing an item",
- GNUNET_STRINGS_relative_time_to_string
(GNUNET_TIME_absolute_get_difference (crc->start,
-
crc->end),
- GNUNET_YES),
- PUT_10);
- if (PUT_10 > 0)
- GAUGER (category, "Storing an item",
- (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL /
PUT_10,
- "ms/item");
- }
- crc->i++;
- crc->start = GNUNET_TIME_absolute_get ();
- crc->phase++;
- GNUNET_SCHEDULER_add_now (&test, crc);
+ do_put (crc);
break;
case RP_REP_GET:
crc->api->get_replication (crc->api->cls, &replication_get, crc);
Modified: gnunet/src/datastore/plugin_datastore_heap.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_heap.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_heap.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -206,10 +206,10 @@
* @param anonymity anonymity-level for the content
* @param replication replication-level for the content
* @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
*/
-static int
+static void
heap_plugin_put (void *cls,
const struct GNUNET_HashCode * key,
uint32_t size,
@@ -217,7 +217,9 @@
enum GNUNET_BLOCK_Type type,
uint32_t priority, uint32_t anonymity,
uint32_t replication,
- struct GNUNET_TIME_Absolute expiration, char **msg)
+ struct GNUNET_TIME_Absolute expiration,
+ PluginPutCont cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
struct Value *value;
@@ -267,7 +269,7 @@
value,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
plugin->size += size;
- return GNUNET_OK;
+ cont (cont_cls, key, size, GNUNET_OK, NULL);
}
@@ -615,14 +617,16 @@
* @param expire new expiration time should be the
* MAX of any existing expiration time and
* this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
*/
-static int
+static void
heap_plugin_update (void *cls,
uint64_t uid,
int delta,
- struct GNUNET_TIME_Absolute expire, char **msg)
+ struct GNUNET_TIME_Absolute expire,
+ PluginUpdateCont cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
struct Value *value;
@@ -640,7 +644,7 @@
value->priority = 0;
else
value->priority += delta;
- return GNUNET_OK;
+ cont (cont_cls, GNUNET_OK, NULL);
}
@@ -778,6 +782,7 @@
GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue,
&return_value,
&gac);
+ proc (proc_cls, NULL, 0);
}
Modified: gnunet/src/datastore/plugin_datastore_mysql.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_mysql.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_mysql.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -280,14 +280,15 @@
* @param anonymity anonymity-level for the content
* @param replication replication-level for the content
* @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
*/
-static int
+static void
mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
const void *data, enum GNUNET_BLOCK_Type type,
uint32_t priority, uint32_t anonymity, uint32_t replication,
- struct GNUNET_TIME_Absolute expiration, char **msg)
+ struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
unsigned int irepl = replication;
@@ -305,7 +306,8 @@
if (size > MAX_DATUM_SIZE)
{
GNUNET_break (0);
- return GNUNET_SYSERR;
+ cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
+ return;
}
hashSize = sizeof (struct GNUNET_HashCode);
hashSize2 = sizeof (struct GNUNET_HashCode);
@@ -322,13 +324,16 @@
MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
- return GNUNET_SYSERR;
+ {
+ cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run
failure"));
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Inserted value `%s' with size %u into gn090 table\n",
GNUNET_h2s (key), (unsigned int) size);
if (size > 0)
plugin->env->duc (plugin->env->cls, size);
- return GNUNET_OK;
+ cont (cont_cls, key, size, GNUNET_OK, NULL);
}
@@ -352,12 +357,13 @@
* @param expire new expiration time should be the
* MAX of any existing expiration time and
* this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
*/
-static int
+static void
mysql_plugin_update (void *cls, uint64_t uid, int delta,
- struct GNUNET_TIME_Absolute expire, char **msg)
+ struct GNUNET_TIME_Absolute expire,
+ PluginUpdateCont cont, void *cont_cls)
{
struct Plugin *plugin = cls;
unsigned long long vkey = uid;
@@ -379,7 +385,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
vkey);
}
- return ret;
+ cont (cont_cls, ret, NULL);
}
@@ -778,6 +784,7 @@
if (statement == NULL)
{
GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ proc (proc_cls, NULL, 0);
return;
}
if (mysql_stmt_prepare (statement, query, strlen (query)))
@@ -785,6 +792,7 @@
GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
_("Failed to prepare statement `%s'\n"), query);
GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ proc (proc_cls, NULL, 0);
return;
}
GNUNET_assert (proc != NULL);
@@ -795,6 +803,7 @@
"mysql_stmt_execute", query, __FILE__, __LINE__,
mysql_stmt_error (statement));
GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ proc (proc_cls, NULL, 0);
return;
}
memset (cbind, 0, sizeof (cbind));
@@ -810,6 +819,7 @@
"mysql_stmt_bind_result", __FILE__, __LINE__,
mysql_stmt_error (statement));
GNUNET_MYSQL_statements_invalidate (plugin->mc);
+ proc (proc_cls, NULL, 0);
return;
}
while (0 == (ret = mysql_stmt_fetch (statement)))
@@ -817,6 +827,7 @@
if (sizeof (struct GNUNET_HashCode) == length)
proc (proc_cls, &key, 1);
}
+ proc (proc_cls, NULL, 0);
if (ret != MYSQL_NO_DATA)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
Modified: gnunet/src/datastore/plugin_datastore_postgres.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_postgres.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_postgres.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -279,15 +279,16 @@
* @param anonymity anonymity-level for the content
* @param replication replication-level for the content
* @param expiration expiration time for the content
- * @param msg set to error message
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
*/
-static int
+static void
postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t
size,
const void *data, enum GNUNET_BLOCK_Type type,
uint32_t priority, uint32_t anonymity,
uint32_t replication,
- struct GNUNET_TIME_Absolute expiration, char **msg)
+ struct GNUNET_TIME_Absolute expiration, PluginPutCont
cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
struct GNUNET_HashCode vhash;
@@ -326,12 +327,15 @@
paramFormats, 1);
if (GNUNET_OK !=
GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK,
"PQexecPrepared", "put"))
- return GNUNET_SYSERR;
+ {
+ cont (cont_cls, key, size, GNUNET_SYSERR, _("Postgress exec failure"));
+ return;
+ }
PQclear (ret);
plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
"Stored %u bytes in database\n", (unsigned int) size);
- return GNUNET_OK;
+ cont (cont_cls, key, size, GNUNET_OK, NULL);
}
@@ -753,12 +757,13 @@
* @param expire new expiration time should be the
* MAX of any existing expiration time and
* this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
*/
-static int
+static void
postgres_plugin_update (void *cls, uint64_t uid, int delta,
- struct GNUNET_TIME_Absolute expire, char **msg)
+ struct GNUNET_TIME_Absolute expire,
+ PluginUpdateCont cont, void *cont_cls)
{
struct Plugin *plugin = cls;
PGresult *ret;
@@ -783,9 +788,12 @@
paramFormats, 1);
if (GNUNET_OK !=
GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK,
"PQexecPrepared", "update"))
- return GNUNET_SYSERR;
+ {
+ cont (cont_cls, GNUNET_SYSERR, NULL);
+ return;
+ }
PQclear (ret);
- return GNUNET_OK;
+ cont (cont_cls, GNUNET_OK, NULL);
}
@@ -819,6 +827,7 @@
}
}
PQclear (res);
+ proc (proc_cls, NULL, 0);
}
Modified: gnunet/src/datastore/plugin_datastore_sqlite.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_sqlite.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_sqlite.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -470,10 +470,10 @@
* @param anonymity anonymity-level for the content
* @param replication replication-level for the content
* @param expiration expiration time for the content
- * @param msg set to an error message
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
*/
-static int
+static void
sqlite_plugin_put (void *cls,
const struct GNUNET_HashCode *key,
uint32_t size,
@@ -483,7 +483,8 @@
uint32_t anonymity,
uint32_t replication,
struct GNUNET_TIME_Absolute expiration,
- char **msg)
+ PluginPutCont cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
int n;
@@ -491,9 +492,13 @@
sqlite3_stmt *stmt;
struct GNUNET_HashCode vhash;
uint64_t rvalue;
+ char *msg = NULL;
if (size > MAX_ITEM_SIZE)
- return GNUNET_SYSERR;
+ {
+ cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
+ return;
+ }
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite",
"Storing in database block with type %u/key `%s'/priority
%u/expiration in %s (%s).\n",
type,
@@ -519,13 +524,15 @@
SQLITE_TRANSIENT)) ||
(SQLITE_OK != sqlite3_bind_blob (stmt, 9, data, size, SQLITE_TRANSIENT)))
{
- LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_bind_XXXX");
if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_reset");
- return GNUNET_SYSERR;
+ cont (cont_cls, key, size, GNUNET_SYSERR, msg);
+ GNUNET_free_non_null(msg);
+ return;
}
n = sqlite3_step (stmt);
switch (n)
@@ -539,12 +546,12 @@
break;
case SQLITE_BUSY:
GNUNET_break (0);
- LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
ret = GNUNET_SYSERR;
break;
default:
- LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
@@ -552,12 +559,15 @@
"sqlite3_reset");
database_shutdown (plugin);
database_setup (plugin->env->cfg, plugin);
- return GNUNET_SYSERR;
+ cont (cont_cls, key, size, GNUNET_SYSERR, msg);
+ GNUNET_free_non_null(msg);
+ return;
}
if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_reset");
- return ret;
+ cont (cont_cls, key, size, ret, msg);
+ GNUNET_free_non_null(msg);
}
@@ -581,31 +591,34 @@
* @param expire new expiration time should be the
* MAX of any existing expiration time and
* this value
- * @param msg set to an error message
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
*/
-static int
+static void
sqlite_plugin_update (void *cls,
uint64_t uid,
int delta,
struct GNUNET_TIME_Absolute expire,
- char **msg)
+ PluginUpdateCont cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
int n;
+ char *msg = NULL;
if ((SQLITE_OK != sqlite3_bind_int (plugin->updPrio, 1, delta)) ||
(SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 2,
expire.abs_value_us))
|| (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 3, uid)))
{
- LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_bind_XXXX");
if (SQLITE_OK != sqlite3_reset (plugin->updPrio))
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_reset");
- return GNUNET_SYSERR;
-
+ cont (cont_cls, GNUNET_SYSERR, msg);
+ GNUNET_free_non_null(msg);
+ return;
}
n = sqlite3_step (plugin->updPrio);
if (SQLITE_OK != sqlite3_reset (plugin->updPrio))
@@ -615,15 +628,21 @@
{
case SQLITE_DONE:
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Block updated\n");
- return GNUNET_OK;
+ cont (cont_cls, GNUNET_OK, NULL);
+ return;
case SQLITE_BUSY:
- LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_WARNING |
GNUNET_ERROR_TYPE_BULK,
+ LOG_SQLITE (plugin, &msg,
+ GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
- return GNUNET_NO;
+ cont (cont_cls, GNUNET_NO, msg);
+ GNUNET_free_non_null(msg);
+ return;
default:
- LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
- return GNUNET_SYSERR;
+ cont (cont_cls, GNUNET_SYSERR, msg);
+ GNUNET_free_non_null(msg);
+ return;
}
}
@@ -1098,6 +1117,7 @@
{
LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite_prepare");
+ proc (proc_cls, NULL, 0);
return;
}
while (SQLITE_ROW == (ret = sqlite3_step (stmt)))
@@ -1111,6 +1131,7 @@
if (SQLITE_DONE != ret)
LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR, "sqlite_step");
sqlite3_finalize (stmt);
+ proc (proc_cls, NULL, 0);
}
Modified: gnunet/src/datastore/plugin_datastore_template.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_template.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_template.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -69,19 +69,19 @@
* @param anonymity anonymity-level for the content
* @param replication replication-level for the content
* @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
*/
-static int
+static void
template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t
size,
const void *data, enum GNUNET_BLOCK_Type type,
uint32_t priority, uint32_t anonymity,
uint32_t replication,
- struct GNUNET_TIME_Absolute expiration, char **msg)
+ struct GNUNET_TIME_Absolute expiration, PluginPutCont
cont,
+ void *cont_cls)
{
GNUNET_break (0);
- *msg = GNUNET_strdup ("not implemented");
- return GNUNET_SYSERR;
+ cont (cont_cls, key, size, GNUNET_SYSERR, "not implemented");
}
@@ -170,16 +170,16 @@
* @param expire new expiration time should be the
* MAX of any existing expiration time and
* this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
*/
-static int
+static void
template_plugin_update (void *cls, uint64_t uid, int delta,
- struct GNUNET_TIME_Absolute expire, char **msg)
+ struct GNUNET_TIME_Absolute expire,
+ PluginUpdateCont cont, void *cont_cls)
{
GNUNET_break (0);
- *msg = GNUNET_strdup ("not implemented");
- return GNUNET_SYSERR;
+ cont (cont_cls, GNUNET_SYSERR, "not implemented");
}
@@ -226,6 +226,7 @@
PluginKeyProcessor proc,
void *proc_cls)
{
+ proc (proc_cls, NULL, 0);
}
Modified: gnunet/src/datastore/test_plugin_datastore.c
===================================================================
--- gnunet/src/datastore/test_plugin_datastore.c 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/datastore/test_plugin_datastore.c 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -84,6 +84,35 @@
static void
+test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+static void
+put_continuation (void *cls, const struct GNUNET_HashCode *key,
+ uint32_t size, int status, char *msg)
+{
+ struct CpsRunContext *crc = cls;
+ static unsigned long long os;
+ unsigned long long cs;
+
+ if (GNUNET_OK != status)
+ {
+ FPRINTF (stderr, "ERROR: `%s'\n", msg);
+ }
+ else
+ {
+ crc->api->estimate_size (crc->api->cls, &cs);
+ GNUNET_assert (os <= cs);
+ os = cs;
+ stored_bytes += size;
+ stored_ops++;
+ stored_entries++;
+ }
+ GNUNET_SCHEDULER_add_now (&test, crc);
+}
+
+
+static void
gen_key (int i, struct GNUNET_HashCode * key)
{
memset (key, 0, sizeof (struct GNUNET_HashCode));
@@ -93,14 +122,21 @@
static void
-put_value (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k)
+do_put (struct CpsRunContext *crc)
{
char value[65536];
size_t size;
struct GNUNET_HashCode key;
- char *msg;
unsigned int prio;
+ static int i;
+ if (PUT_10 == i)
+ {
+ i = 0;
+ crc->phase++;
+ GNUNET_SCHEDULER_add_now (&test, crc);
+ return;
+ }
/* most content is 32k */
size = 32 * 1024;
@@ -113,36 +149,25 @@
memset (value, i, size);
if (i > 255)
memset (value, i - 255, size / 2);
- value[0] = k;
- msg = NULL;
+ value[0] = crc->i;
prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"putting type %u, anon %u under key %s\n", i + 1, i,
GNUNET_h2s (&key));
- if (GNUNET_OK != api->put (api->cls, &key, size, value, i + 1 /* type */ ,
- prio, i /* anonymity */ ,
- 0 /* replication */ ,
- GNUNET_TIME_relative_to_absolute
- (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_MILLISECONDS,
- 60 * 60 * 60 * 1000 +
- GNUNET_CRYPTO_random_u32
- (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg))
- {
- FPRINTF (stderr, "ERROR: `%s'\n", msg);
- GNUNET_free_non_null (msg);
- return;
- }
- stored_bytes += size;
- stored_ops++;
- stored_entries++;
+ crc->api->put (crc->api->cls, &key, size, value, i + 1 /* type */ ,
+ prio, i /* anonymity */ ,
+ 0 /* replication */ ,
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_MILLISECONDS,
+ 60 * 60 * 60 * 1000 +
+ GNUNET_CRYPTO_random_u32
+ (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
+ put_continuation, crc);
+ i++;
}
-static void
-test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
static uint64_t guid;
@@ -213,12 +238,20 @@
static void
+update_continuation (void *cls, int status, char *msg)
+{
+ struct CpsRunContext *crc = cls;
+
+ GNUNET_assert (GNUNET_OK == status);
+ crc->phase++;
+ GNUNET_SCHEDULER_add_now (&test, crc);
+}
+
+
+static void
test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct CpsRunContext *crc = cls;
- int j;
- unsigned long long os;
- unsigned long long cs;
struct GNUNET_HashCode key;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
@@ -237,16 +270,7 @@
GNUNET_SCHEDULER_add_now (&cleaning_task, crc);
break;
case RP_PUT:
- os = 0;
- for (j = 0; j < PUT_10; j++)
- {
- put_value (crc->api, j, crc->i);
- crc->api->estimate_size (crc->api->cls, &cs);
- GNUNET_assert (os <= cs);
- os = cs;
- }
- crc->phase++;
- GNUNET_SCHEDULER_add_now (&test, crc);
+ do_put (crc);
break;
case RP_GET:
if (crc->cnt == 1)
@@ -261,11 +285,8 @@
GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc);
break;
case RP_UPDATE:
- GNUNET_assert (GNUNET_OK ==
- crc->api->update (crc->api->cls, guid, 1,
- GNUNET_TIME_UNIT_ZERO_ABS, NULL));
- crc->phase++;
- GNUNET_SCHEDULER_add_now (&test, crc);
+ crc->api->update (crc->api->cls, guid, 1, GNUNET_TIME_UNIT_ZERO_ABS,
+ update_continuation, crc);
break;
case RP_ITER_ZERO:
Modified: gnunet/src/include/gnunet_datastore_plugin.h
===================================================================
--- gnunet/src/include/gnunet_datastore_plugin.h 2015-03-20 23:44:57 UTC
(rev 35418)
+++ gnunet/src/include/gnunet_datastore_plugin.h 2015-03-21 03:38:29 UTC
(rev 35419)
@@ -113,6 +113,19 @@
/**
+ * Put continuation.
+ *
+ * @param cls closure
+ * @param key key for the item stored
+ * @param size size of the item stored
+ * @param status GNUNET_OK or GNUNET_SYSERROR
+ * @param msg error message on error
+ */
+typedef void (*PluginPutCont) (void *cls, const struct GNUNET_HashCode *key,
+ uint32_t size, int status, char *msg);
+
+
+/**
* Store an item in the datastore. If the item is already present,
* the priorities and replication levels are summed up and the higher
* expiration time and lower anonymity level is used.
@@ -126,22 +139,23 @@
* @param anonymity anonymity-level for the content
* @param replication replication-level for the content
* @param expiration expiration time for the content
- * @param msg set to an error message (on failure)
- * @return #GNUNET_OK on success,
- * #GNUNET_SYSERR on failure
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
*/
-typedef int (*PluginPut) (void *cls, const struct GNUNET_HashCode * key,
uint32_t size,
- const void *data, enum GNUNET_BLOCK_Type type,
- uint32_t priority, uint32_t anonymity,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expiration, char **msg);
+typedef void (*PluginPut) (void *cls, const struct GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data, enum GNUNET_BLOCK_Type type,
+ uint32_t priority, uint32_t anonymity,
+ uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration,
+ PluginPutCont cont, void *cont_cls);
/**
* An processor over a set of keys stored in the datastore.
*
* @param cls closure
- * @param key key in the data store
+ * @param key key in the data store, if NULL iteration is finished
* @param count how many values are stored under this key in the datastore
*/
typedef void (*PluginKeyProcessor) (void *cls,
@@ -174,8 +188,6 @@
* there may be!
* @param type entries of which type are relevant?
* Use 0 for any type.
- * @param min find the smallest key that is larger than the given min,
- * NULL for no minimum (return smallest key)
* @param proc function to call on the matching value;
* proc should be called with NULL if there is no result
* @param proc_cls closure for @a proc
@@ -201,6 +213,14 @@
void *proc_cls);
+/**
+ * Update continuation.
+ *
+ * @param cls closure
+ * @param status GNUNET_OK or GNUNET_SYSERROR
+ * @param msg error message on error
+ */
+typedef void (*PluginUpdateCont) (void *cls, int status, char *msg);
/**
@@ -220,11 +240,12 @@
* @param expire new expiration time should be the
* MAX of any existing expiration time and
* this value
- * @param msg set to an error message (on error)
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
*/
-typedef int (*PluginUpdate) (void *cls, uint64_t uid, int delta,
- struct GNUNET_TIME_Absolute expire, char **msg);
+typedef void (*PluginUpdate) (void *cls, uint64_t uid, int delta,
+ struct GNUNET_TIME_Absolute expire,
+ PluginUpdateCont cont, void *cont_cls);
/**
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r35419 - in gnunet/src: datastore include,
gnunet <=