[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25132 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r25132 - gnunet/src/fs |
Date: |
Sun, 25 Nov 2012 20:18:09 +0100 |
Author: grothoff
Date: 2012-11-25 20:18:09 +0100 (Sun, 25 Nov 2012)
New Revision: 25132
Modified:
gnunet/src/fs/gnunet-service-fs_pr.c
gnunet/src/fs/gnunet-service-fs_stream.c
Log:
-switch to hash map for replies to avoid linear scan, add timeout for inactive
clients
Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c 2012-11-25 13:12:04 UTC (rev
25131)
+++ gnunet/src/fs/gnunet-service-fs_pr.c 2012-11-25 19:18:09 UTC (rev
25132)
@@ -1181,7 +1181,11 @@
GNUNET_break (0 == data_size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Error retrieiving block via stream\n");
- /* FIXME: should re-try a few times... */
+ /* FIXME: maybe we should re-try a few times; but then
+ we MUST bound the number of re-tries to not keep
+ asking indefinitely with fresh streams; this should
+ be implemented if/when the stream code gets its
+ timeout/parallel-session limits */
return;
}
if (GNUNET_YES !=
Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c 2012-11-25 13:12:04 UTC (rev
25131)
+++ gnunet/src/fs/gnunet-service-fs_stream.c 2012-11-25 19:18:09 UTC (rev
25132)
@@ -24,7 +24,7 @@
* @author Christian Grothoff
*
* TODO:
- * - limit # concurrent clients, have timeouts for server-side
+ * - limit # concurrent clients
*/
#include "platform.h"
#include "gnunet_constants.h"
@@ -37,6 +37,12 @@
#include "gnunet-service-fs_stream.h"
/**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES,
2)
+
+
+/**
* Information we keep around for each active streaming client.
*/
struct StreamClient
@@ -82,6 +88,11 @@
GNUNET_SCHEDULER_TaskIdentifier terminate_task;
/**
+ * Task that is scheduled to terminate idle connections.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
* Size of the last write that was initiated.
*/
size_t reply_size;
@@ -210,16 +221,12 @@
struct GSF_StreamRequest *pending_tail;
/**
- * Head of DLL of requests waiting for a reply on this stream.
+ * Map from query to 'struct GSF_StreamRequest's waiting for
+ * a reply.
*/
- struct GSF_StreamRequest *waiting_head;
+ struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
/**
- * Tail of DLL of requests waiting for a reply on this stream.
- */
- struct GSF_StreamRequest *waiting_tail;
-
- /**
* Connection to the other peer.
*/
struct GNUNET_STREAM_Socket *stream;
@@ -290,7 +297,31 @@
/* ********************* client-side code ************************* */
+/**
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GSF_StreamRequest *sr = value;
+ sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ 0, NULL);
+ GSF_stream_query_cancel (sr);
+ return GNUNET_YES;
+}
+
+
/**
* Destroy a stream handle.
*
@@ -308,13 +339,9 @@
0, NULL);
GSF_stream_query_cancel (sr);
}
- while (NULL != (sr = sh->waiting_head))
- {
- sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0, NULL);
- GSF_stream_query_cancel (sr);
- }
+ GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+ &free_waiting_entry,
+ sh);
if (NULL != sh->wh)
GNUNET_STREAM_io_write_cancel (sh->wh);
if (NULL != sh->rh)
@@ -326,6 +353,7 @@
GNUNET_CONTAINER_multihashmap_remove (stream_map,
&sh->target.hashPubKey,
sh));
+ GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
GNUNET_free (sh);
}
@@ -357,6 +385,35 @@
/**
+ * Iterator called on each entry in a waiting map to
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct StreamHandle *sh = cls;
+ struct GSF_StreamRequest *sr = value;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+ key,
+ value));
+ GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+ sh->pending_tail,
+ sr);
+ sr->was_transmitted = GNUNET_NO;
+ return GNUNET_YES;
+}
+
+
+/**
* We had a serious error, tear down and re-create stream from scratch.
*
* @param sh stream to reset
@@ -364,8 +421,6 @@
static void
reset_stream (struct StreamHandle *sh)
{
- struct GSF_StreamRequest *sr;
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Resetting stream to %s\n",
GNUNET_i2s (&sh->target));
@@ -373,16 +428,9 @@
GNUNET_STREAM_io_read_cancel (sh->rh);
GNUNET_STREAM_close (sh->stream);
sh->is_ready = GNUNET_NO;
- while (NULL != (sr = sh->waiting_tail))
- {
- GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
- sh->waiting_tail,
- sr);
- GNUNET_CONTAINER_DLL_insert (sh->pending_head,
- sh->pending_tail,
- sr);
- sr->was_transmitted = GNUNET_NO;
- }
+ GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+ &move_to_pending,
+ sh);
sh->stream = GNUNET_STREAM_open (GSF_cfg,
&sh->target,
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
@@ -541,9 +589,10 @@
GNUNET_CONTAINER_DLL_remove (sh->pending_head,
sh->pending_tail,
sr);
- GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
- sh->waiting_tail,
- sr);
+ GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
+ &sr->query,
+ sr,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending query via stream to %s\n",
GNUNET_i2s (&sh->target));
@@ -561,6 +610,67 @@
/**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+ /**
+ * Reply payload.
+ */
+ const void *data;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Number of bytes in 'data'.
+ */
+ size_t data_size;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we have a matching query?
+ */
+ int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct HandleReplyClosure *hrc = cls;
+ struct GSF_StreamRequest *sr = value;
+
+ sr->proc (sr->proc_cls,
+ hrc->type,
+ hrc->expiration,
+ hrc->data_size,
+ hrc->data);
+ GSF_stream_query_cancel (sr);
+ hrc->found = GNUNET_YES;
+ return GNUNET_YES;
+}
+
+
+/**
* Functions with this signature are called whenever a
* complete reply is received.
*
@@ -578,10 +688,10 @@
{
struct StreamHandle *sh = cls;
const struct StreamReplyMessage *srm;
+ struct HandleReplyClosure hrc;
uint16_t msize;
enum GNUNET_BLOCK_Type type;
struct GNUNET_HashCode query;
- struct GSF_StreamRequest *sr;
msize = ntohs (message->size);
switch (ntohs (message->type))
@@ -611,24 +721,22 @@
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via stream"), 1,
GNUNET_NO);
- for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
- if (0 == memcmp (&query,
- &sr->query,
- sizeof (struct GNUNET_HashCode)))
- break;
- if (NULL == sr)
+ hrc.data = &srm[1];
+ hrc.data_size = msize;
+ hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+ hrc.type = type;
+ hrc.found = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+ &query,
+ &handle_reply,
+ &hrc);
+ if (GNUNET_NO == hrc.found)
{
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via stream
dropped"), 1,
GNUNET_NO);
return GNUNET_OK;
}
- sr->proc (sr->proc_cls,
- type,
- GNUNET_TIME_absolute_ntoh (srm->expiration),
- msize,
- &srm[1]);
- GSF_stream_query_cancel (sr);
return GNUNET_OK;
default:
GNUNET_break_op (0);
@@ -665,6 +773,7 @@
sh = GNUNET_malloc (sizeof (struct StreamHandle));
sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
sh);
+ sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
sh->target = *target;
sh->stream = GNUNET_STREAM_open (GSF_cfg,
&sh->target,
@@ -731,15 +840,15 @@
struct StreamHandle *sh = sr->sh;
if (GNUNET_YES == sr->was_transmitted)
- GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
- sh->waiting_tail,
- sr);
+ GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+ &sr->query,
+ sr);
else
GNUNET_CONTAINER_DLL_remove (sh->pending_head,
sh->pending_tail,
sr);
GNUNET_free (sr);
- if ( (NULL == sh->waiting_head) &&
+ if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
(NULL == sh->pending_head) )
sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
&stream_timeout,
@@ -763,6 +872,8 @@
GNUNET_NO);
if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
GNUNET_SCHEDULER_cancel (sc->terminate_task);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
if (NULL != sc->rh)
GNUNET_STREAM_io_read_cancel (sc->rh);
if (NULL != sc->wh)
@@ -796,6 +907,39 @@
/**
+ * Task run to asynchronously terminate the stream due to timeout.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */
+static void
+timeout_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamClient *sc = cls;
+
+ sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ terminate_stream (sc);
+}
+
+
+/**
+ * Reset the timeout for the stream client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct StreamClient *sc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
+ sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+ &timeout_stream_task,
+ sc);
+}
+
+
+/**
* We had a serious error, termiante stream,
* but do so asynchronously.
*
@@ -845,6 +989,7 @@
GNUNET_NO, GNUNET_YES);
if (GNUNET_NO == ret)
return;
+ refresh_timeout_task (sc);
sc->rh = GNUNET_STREAM_read (sc->socket,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_request,
@@ -1049,6 +1194,7 @@
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# queries received via stream"), 1,
GNUNET_NO);
+ refresh_timeout_task (sc);
sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
0,
&sqm->query,
@@ -1106,6 +1252,7 @@
GNUNET_CONTAINER_DLL_insert (sc_head,
sc_tail,
sc);
+ refresh_timeout_task (sc);
return GNUNET_OK;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r25132 - gnunet/src/fs,
gnunet <=