[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated: -fix properly emptying queue; add parallelization to monitor
From: gnunet
Subject: [gnunet] branch master updated: -fix properly emptying queue; add parallelization to monitor
Date: Thu, 20 Oct 2022 10:31:54 +0200
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new 01b5953cb -fix properly emptying queue; add parallelization to monitor
01b5953cb is described below
commit 01b5953cb3d3c7f072115ffa7b72884c3614cbae
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Thu Oct 20 17:31:48 2022 +0900
-fix properly emptying queue; add parallelization to monitor
---
src/zonemaster/gnunet-service-zonemaster.c | 185 +++++++++++------------------
1 file changed, 68 insertions(+), 117 deletions(-)
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 42b3abf91..fb55fd718 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -423,7 +423,8 @@ shutdown_task (void *cls)
}
while (NULL != (ma = ma_head))
{
- GNUNET_DHT_put_cancel (ma->ph);
+ if (NULL != ma->ph)
+ GNUNET_DHT_put_cancel (ma->ph);
ma_queue_length--;
GNUNET_CONTAINER_DLL_remove (ma_head,
ma_tail,
@@ -818,36 +819,25 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
GNUNET_free (emsg);
}
- if (cache_keys)
- {
- GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key,
- expire_pub,
- label,
- rd_public,
- rd_public_count,
- &block));
- }
- else
- {
- GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
- expire_pub,
- label,
- rd_public,
- rd_public_count,
- &block));
- }
+ GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
+ expire_pub,
+ label,
+ rd_public,
+ rd_public_count,
+ &block));
if (NULL == block)
{
GNUNET_break (0);
return NULL; /* whoops */
}
if (rd_count != rd_public_count)
- GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
- expire,
- label,
- rd,
- rd_count,
- &block_priv));
+ GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
+ expire,
+ label,
+ rd,
+ rd_count,
+ &
+ block_priv));
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
@@ -879,22 +869,31 @@ initiate_put_from_pipe_trigger (void *cls)
{
struct GNUNET_HashCode query;
struct OpenSignJob *job;
+ const struct GNUNET_DISK_FileHandle *np_fh;
+ char buf[100];
+ ssize_t nf_count;
pipe_read_task = NULL;
GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
job = results_head;
+ np_fh = GNUNET_DISK_pipe_handle (notification_pipe,
+ GNUNET_DISK_PIPE_END_READ);
+ pipe_read_task =
+ GNUNET_SCHEDULER_add_read_file (
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ np_fh,
+ notification_pipe_cb,
+ NULL);
+ /* empty queue */
+ while (GNUNET_SYSERR !=
+ (nf_count = GNUNET_DISK_file_read (np_fh, buf, sizeof (buf))))
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Read %lld notifications from pipe\n",
+ nf_count);
if (NULL == job)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Hmm... no results. Back to sleep.\n");
GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
- const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
- notification_pipe,
- GNUNET_DISK_PIPE_END_READ);
- pipe_read_task =
- GNUNET_SCHEDULER_add_read_file (
- GNUNET_TIME_UNIT_FOREVER_REL,
- np_fh,
- notification_pipe_cb,
- NULL);
return;
}
GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
@@ -1119,8 +1118,6 @@ dht_put_monitor_continuation (void *cls)
{
struct DhtPutActivity *ma = cls;
- GNUNET_NAMESTORE_zone_monitor_next (zmon,
- 1);
ma_queue_length--;
GNUNET_CONTAINER_DLL_remove (ma_head,
ma_tail,
@@ -1172,73 +1169,39 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
GNUNET_free (emsg);
}
- if (cache_keys)
- GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key,
- expire_pub,
- label,
- rd_public,
- rd_public_count,
- &block));
- else
- GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
- expire_pub,
- label,
- rd_public,
- rd_public_count,
- &block));
+ GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
+ expire_pub,
+ label,
+ rd_public,
+ rd_public_count,
+ &block));
if (NULL == block)
{
GNUNET_break (0);
return NULL; /* whoops */
}
if (rd_count != rd_public_count)
- GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
- expire,
- label,
- rd,
- rd_count,
- &block_priv));
+ GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
+ expire,
+ label,
+ rd,
+ rd_count,
+ &
+ block_priv));
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
- job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker
- memcpy (job->block, block, block_size);
+ job->block = block;
+ job->block_size = block_size;
+ job->block_priv = block_priv;
job->zone = *key;
+ job->ma = ma;
job->label = GNUNET_strdup (label);
+ job->expire_pub = expire_pub;
GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
- GNUNET_GNSRECORD_query_from_private_key (key,
- label,
- &query);
- GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
- GNUNET_STATISTICS_update (statistics,
- "DHT put operations initiated",
- 1,
- GNUNET_NO);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Storing %u public of %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n",
- rd_public_count,
- rd_count,
- label,
- GNUNET_STRINGS_absolute_time_to_string (expire),
- GNUNET_h2s (&query));
- ret = GNUNET_DHT_put (dht_handle,
- &query,
- DHT_GNS_REPLICATION_LEVEL,
- GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
- GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
- block_size,
- block,
- expire_pub,
- &dht_put_monitor_continuation,
- ma);
- refresh_block (block_priv);
- if (block != block_priv)
- GNUNET_free (block_priv);
- GNUNET_free (block);
- return ret;
}
/**
@@ -1277,41 +1240,26 @@ handle_monitor_event (void *cls,
1);
return; /* nothing to do */
}
- ma = GNUNET_new (struct DhtPutActivity);
- ma->start_date = GNUNET_TIME_absolute_get ();
- ma->ph = perform_dht_put_monitor (zone,
- label,
- rd,
- rd_count,
- expire,
- ma);
- if (NULL == ma->ph)
+ if (dht_queue_length >= DHT_QUEUE_LIMIT)
{
- /* PUT failed, do not remember operation */
- GNUNET_free (ma);
- GNUNET_NAMESTORE_zone_monitor_next (zmon,
- 1);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "DHT PUT queue length exceeded (%u), aborting PUT\n",
+ DHT_QUEUE_LIMIT);
return;
}
+ ma = GNUNET_new (struct DhtPutActivity);
+ perform_dht_put_monitor (zone,
+ label,
+ rd,
+ rd_count,
+ expire,
+ ma);
+ GNUNET_NAMESTORE_zone_monitor_next (zmon,
+ 1);
GNUNET_CONTAINER_DLL_insert_tail (ma_head,
ma_tail,
ma);
ma_queue_length++;
- if (ma_queue_length > DHT_QUEUE_LIMIT)
- {
- ma = ma_head;
- GNUNET_CONTAINER_DLL_remove (ma_head,
- ma_tail,
- ma);
- GNUNET_DHT_put_cancel (ma->ph);
- ma_queue_length--;
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "DHT PUT unconfirmed after %s, aborting PUT\n",
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_duration (ma->start_date),
- GNUNET_YES));
- GNUNET_free (ma);
- }
}
@@ -1351,12 +1299,15 @@ sign_worker (void *)
GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
if (NULL != job)
{
+ GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block);
+ if (job->block != job->block_priv)
+ GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv);
GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job);
GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
job = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Done, notifying main thread throug pipe!\n");
+ "Done, notifying main thread through pipe!\n");
GNUNET_DISK_file_write (fh, "!", 1);
}
else {
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet] branch master updated: -fix properly emptying queue; add parallelization to monitor,
gnunet <=