gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27365 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r27365 - gnunet/src/fs
Date: Tue, 4 Jun 2013 15:14:21 +0200

Author: grothoff
Date: 2013-06-04 15:14:21 +0200 (Tue, 04 Jun 2013)
New Revision: 27365

Added:
   gnunet/src/fs/gnunet-service-fs_mesh.c
   gnunet/src/fs/gnunet-service-fs_mesh.h
   gnunet/src/fs/test_fs_download_mesh.conf
   gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf
Removed:
   gnunet/src/fs/gnunet-service-fs_stream.c
   gnunet/src/fs/gnunet-service-fs_stream.h
   gnunet/src/fs/test_fs_download_stream.conf
   gnunet/src/fs/test_gnunet_service_fs_p2p_stream.conf
Modified:
   gnunet/src/fs/Makefile.am
   gnunet/src/fs/gnunet-service-fs.c
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/test_fs_download.c
   gnunet/src/fs/test_gnunet_service_fs_p2p.c
Log:
-file renaming stream->mesh

Modified: gnunet/src/fs/Makefile.am
===================================================================
--- gnunet/src/fs/Makefile.am   2013-06-04 13:10:47 UTC (rev 27364)
+++ gnunet/src/fs/Makefile.am   2013-06-04 13:14:21 UTC (rev 27365)
@@ -186,7 +186,7 @@
  gnunet-service-fs_pr.c gnunet-service-fs_pr.h \
  gnunet-service-fs_push.c gnunet-service-fs_push.h \
  gnunet-service-fs_put.c gnunet-service-fs_put.h \
- gnunet-service-fs_stream.c gnunet-service-fs_stream.h 
+ gnunet-service-fs_mesh.c gnunet-service-fs_mesh.h 
 gnunet_service_fs_LDADD =  \
  $(top_builddir)/src/fs/libgnunetfs.la \
  $(top_builddir)/src/dht/libgnunetdht.la \
@@ -239,7 +239,7 @@
  test_plugin_block_fs \
  test_fs_directory \
  test_fs_download \
- test_fs_download_stream \
+ test_fs_download_mesh \
  test_fs_download_indexed \
  test_fs_download_persistence \
  test_fs_file_information \
@@ -260,7 +260,7 @@
  test_fs_uri \
  test_gnunet_service_fs_migration \
  test_gnunet_service_fs_p2p \
- test_gnunet_service_fs_p2p_stream \
+ test_gnunet_service_fs_p2p_mesh \
  $(FS_BENCHMARKS)
 
 test_plugin_block_fs_SOURCES = \
@@ -306,7 +306,7 @@
  test_fs_test_lib \
  test_gnunet_service_fs_migration \
  test_gnunet_service_fs_p2p \
- test_gnunet_service_fs_p2p_stream \
+ test_gnunet_service_fs_p2p_mesh \
  perf_gnunet_service_fs_p2p \
  perf_gnunet_service_fs_p2p_index \
  perf_gnunet_service_fs_p2p_respect \
@@ -343,9 +343,9 @@
   $(top_builddir)/src/fs/libgnunetfs.la  \
   $(top_builddir)/src/util/libgnunetutil.la  
 
-test_fs_download_stream_SOURCES = \
+test_fs_download_mesh_SOURCES = \
  test_fs_download.c
-test_fs_download_stream_LDADD = \
+test_fs_download_mesh_LDADD = \
   $(top_builddir)/src/testing/libgnunettesting.la  \
   $(top_builddir)/src/fs/libgnunetfs.la  \
   $(top_builddir)/src/util/libgnunetutil.la  
@@ -469,9 +469,9 @@
   $(top_builddir)/src/fs/libgnunetfs.la  \
   $(top_builddir)/src/util/libgnunetutil.la  
 
-test_gnunet_service_fs_p2p_stream_SOURCES = \
+test_gnunet_service_fs_p2p_mesh_SOURCES = \
  test_gnunet_service_fs_p2p.c
-test_gnunet_service_fs_p2p_stream_LDADD = \
+test_gnunet_service_fs_p2p_mesh_LDADD = \
   $(top_builddir)/src/fs/libgnunetfstest.a \
   $(top_builddir)/src/testbed/libgnunettestbed.la \
   $(top_builddir)/src/fs/libgnunetfs.la  \
@@ -547,7 +547,7 @@
   test_fs_data.conf \
   test_fs_download_data.conf \
   test_fs_download_indexed.conf \
-  test_fs_download_stream.conf \
+  test_fs_download_mesh.conf \
   test_fs_file_information_data.conf \
   fs_test_lib_data.conf \
   test_fs_list_indexed_data.conf \
@@ -557,7 +557,7 @@
   test_fs_unindex_data.conf \
   test_fs_uri_data.conf \
   test_gnunet_service_fs_migration_data.conf \
-  test_gnunet_service_fs_p2p_stream.conf \
+  test_gnunet_service_fs_p2p_mesh.conf \
   test_gnunet_fs_idx_data.conf \
   test_gnunet_fs_ns_data.conf \
   test_gnunet_fs_psd_data.conf \

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2013-06-04 13:10:47 UTC (rev 27364)
+++ gnunet/src/fs/gnunet-service-fs.c   2013-06-04 13:14:21 UTC (rev 27365)
@@ -43,7 +43,7 @@
 #include "gnunet-service-fs_pr.h"
 #include "gnunet-service-fs_push.h"
 #include "gnunet-service-fs_put.h"
-#include "gnunet-service-fs_stream.h"
+#include "gnunet-service-fs_mesh.h"
 #include "fs.h"
 
 /**

Copied: gnunet/src/fs/gnunet-service-fs_mesh.c (from rev 27364, 
gnunet/src/fs/gnunet-service-fs_stream.c)
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh.c                              (rev 0)
+++ gnunet/src/fs/gnunet-service-fs_mesh.c      2013-06-04 13:14:21 UTC (rev 
27365)
@@ -0,0 +1,1243 @@
+/*
+     This file is part of GNUnet.
+     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_mesh.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - update comments on functions (still matches 'stream')
+ * - MESH2 API doesn't allow flow control for server yet (needed!)
+ * - likely need to register clean up handler with mesh to handle
+ *   client disconnect (likely leaky right now)
+ * - server is optional, currently client code will NPE if we have
+ *   no server, again MESH2 API requirement forcing this for now
+ * - message handlers are symmetric for client/server, should be
+ *   separated (currently clients can get requests and servers can
+ *   handle answers, not good)
+ * - code is entirely untested
+ * - might have overlooked a few possible simplifications
+ * - PORT is set to old application type, unsure if we should keep
+ *   it that way (fine for now)
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh2_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_mesh.h"
+
+/**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 
2)
+
+/**
+ * After how long do we reset connections without replies?
+ */
+#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 30)
+
+
+/**
+ * A message in the queue to be written to the stream.
+ */
+struct WriteQueueItem
+{
+  /**
+   * Kept in a DLL.
+   */
+  struct WriteQueueItem *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct WriteQueueItem *prev;
+
+  /**
+   * Number of bytes of payload, allocated at the end of this struct.
+   */
+  size_t msize;
+};
+
+
+/**
+ * Information we keep around for each active streaming client.
+ */
+struct StreamClient
+{
+  /**
+   * DLL
+   */ 
+  struct StreamClient *next;
+
+  /**
+   * DLL
+   */ 
+  struct StreamClient *prev;
+
+  /**
+   * Socket for communication.
+   */ 
+  struct GNUNET_MESH_Tunnel *socket;
+
+  /**
+   * Handle for active write operation, or NULL.
+   */ 
+  struct GNUNET_MESH_TransmitHandle *wh;
+
+  /**
+   * Head of write queue.
+   */
+  struct WriteQueueItem *wqi_head;
+
+  /**
+   * Tail of write queue.
+   */
+  struct WriteQueueItem *wqi_tail;
+  
+  /**
+   * Current active request to the datastore, if we have one pending.
+   */
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  /**
+   * Task that is scheduled to asynchronously terminate the connection.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
+  /**
+   * Task that is scheduled to terminate idle connections.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * Size of the last write that was initiated.
+   */ 
+  size_t reply_size;
+
+};
+
+
+/**
+ * Query from one peer, asking the other for CHK-data.
+ */
+struct StreamQueryMessage
+{
+
+  /**
+   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Block type must be DBLOCK or IBLOCK.
+   */
+  uint32_t type;
+
+  /**
+   * Query hash from CHK (hash of encrypted block).
+   */
+  struct GNUNET_HashCode query;
+
+};
+
+
+/**
+ * Reply to a StreamQueryMessage.
+ */
+struct StreamReplyMessage
+{
+
+  /**
+   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Block type must be DBLOCK or IBLOCK.
+   */
+  uint32_t type;
+
+  /**
+   * Expiration time for the block.
+   */
+  struct GNUNET_TIME_AbsoluteNBO expiration;
+
+  /* followed by the encrypted block */
+
+};
+
+
+/** 
+ * Handle for a stream to another peer.
+ */
+struct StreamHandle;
+
+
+/**
+ * Handle for a request that is going out via stream API.
+ */
+struct GSF_StreamRequest
+{
+
+  /**
+   * DLL.
+   */
+  struct GSF_StreamRequest *next;
+
+  /**
+   * DLL.
+   */
+  struct GSF_StreamRequest *prev;
+
+  /**
+   * Which stream is this request associated with?
+   */
+  struct StreamHandle *sh;
+
+  /**
+   * Function to call with the result.
+   */
+  GSF_StreamReplyProcessor proc;
+
+  /**
+   * Closure for 'proc'
+   */
+  void *proc_cls;
+
+  /**
+   * Query to transmit to the other peer.
+   */
+  struct GNUNET_HashCode query;
+
+  /**
+   * Desired type for the reply.
+   */
+  enum GNUNET_BLOCK_Type type;
+
+  /**
+   * Did we transmit this request already? YES if we are
+   * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
+   */
+  int was_transmitted;
+};
+
+
+/** 
+ * Handle for a stream to another peer.
+ */
+struct StreamHandle
+{
+  /**
+   * Head of DLL of pending requests on this stream.
+   */
+  struct GSF_StreamRequest *pending_head;
+
+  /**
+   * Tail of DLL of pending requests on this stream.
+   */
+  struct GSF_StreamRequest *pending_tail;
+
+  /**
+   * Map from query to 'struct GSF_StreamRequest's waiting for
+   * a reply.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
+
+  /**
+   * Connection to the other peer.
+   */
+  struct GNUNET_MESH_Tunnel *stream;
+
+  /**
+   * Handle for active write operation, or NULL.
+   */ 
+  struct GNUNET_MESH_TransmitHandle *wh;
+
+  /**
+   * Which peer does this stream go to?
+   */ 
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * Task to kill inactive streams (we keep them around for
+   * a few seconds to give the application a chance to give
+   * us another query).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * Task to reset streams that had errors (asynchronously,
+   * as we may not be able to do it immediately during a
+   * callback from the stream API).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+  /**
+   * Is this stream ready for transmission?
+   */
+  int is_ready;
+
+};
+
+
+/**
+ * Listen socket for incoming requests.
+ */
+static struct GNUNET_MESH_Handle *listen_socket;
+
+/**
+ * Head of DLL of stream clients.
+ */ 
+static struct StreamClient *sc_head;
+
+/**
+ * Tail of DLL of stream clients.
+ */ 
+static struct StreamClient *sc_tail;
+
+/**
+ * Number of active stream clients in the 'sc_*'-DLL.
+ */
+static unsigned int sc_count;
+
+/**
+ * Maximum allowed number of stream clients.
+ */
+static unsigned long long sc_count_max;
+
+/**
+ * Map from peer identities to 'struct StreamHandles' with streams to
+ * those peers.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
+
+
+/* ********************* client-side code ************************* */
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
+{
+  struct GSF_StreamRequest *sr = value;
+
+  sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+           GNUNET_TIME_UNIT_FOREVER_ABS,
+           0, NULL);
+  GSF_stream_query_cancel (sr);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Destroy a stream handle.
+ *
+ * @param sh stream to process
+ */
+static void
+destroy_stream_handle (struct StreamHandle *sh)
+{
+  struct GSF_StreamRequest *sr;
+
+  while (NULL != (sr = sh->pending_head))
+  {
+    sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+             GNUNET_TIME_UNIT_FOREVER_ABS,
+             0, NULL);
+    GSF_stream_query_cancel (sr);
+  }
+  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+                                        &free_waiting_entry,
+                                        sh);
+  if (NULL != sh->wh)
+    GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
+  if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
+    GNUNET_SCHEDULER_cancel (sh->timeout_task);
+  if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
+    GNUNET_SCHEDULER_cancel (sh->reset_task);
+  GNUNET_MESH_tunnel_destroy (sh->stream);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_remove (stream_map,
+                                                      &sh->target.hashPubKey,
+                                                      sh));
+  GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
+  GNUNET_free (sh);
+}
+
+
+/**
+ * Transmit pending requests via the stream.
+ *
+ * @param sh stream to process
+ */
+static void
+transmit_pending (struct StreamHandle *sh);
+
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+                const struct GNUNET_HashCode *key,
+                void *value)
+{
+  struct StreamHandle *sh = cls;
+  struct GSF_StreamRequest *sr = value;
+  
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+                                                      key,
+                                                      value));
+  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+                              sh->pending_tail,
+                              sr);
+  sr->was_transmitted = GNUNET_NO;
+  return GNUNET_YES;
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream (struct StreamHandle *sh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Resetting stream to %s\n",
+             GNUNET_i2s (&sh->target));
+  GNUNET_MESH_tunnel_destroy (sh->stream);
+  sh->is_ready = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+                                        &move_to_pending,
+                                        sh);
+  sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+                                         sh,                               
+                                         &sh->target,
+                                         
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+stream_timeout (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamHandle *sh = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Timeout on stream to %s\n",
+             GNUNET_i2s (&sh->target));
+  sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  destroy_stream_handle (sh);
+}
+
+
+/**
+ * Task called when it is time to reset an stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_stream_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamHandle *sh = cls;
+
+  sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+  reset_stream (sh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch,
+ * but do so asynchronously.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream_async (struct StreamHandle *sh)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
+    GNUNET_SCHEDULER_cancel (sh->reset_task);
+  sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
+                                            sh);
+}
+
+
+/**
+ * Functions of this signature are called whenever we are ready to transmit
+ * query via a stream.
+ *
+ * @param cls the struct StreamHandle for which we did the write call
+ * @param size the number of bytes that can be written to 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
+ */
+static size_t
+transmit_sqm (void *cls,
+             size_t size,
+             void *buf)
+{
+  struct StreamHandle *sh = cls;
+  struct StreamQueryMessage sqm;
+  struct GSF_StreamRequest *sr;
+
+  sh->wh = NULL;
+  if (NULL == buf)
+  {
+    reset_stream (sh);
+    return 0;
+  }
+  sr = sh->pending_head;
+  if (NULL == sr)
+    return 0;
+  GNUNET_assert (size >= sizeof (struct StreamQueryMessage));
+  GNUNET_CONTAINER_DLL_remove (sh->pending_head,
+                              sh->pending_tail,
+                              sr);
+  GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
+                                    &sr->query,
+                                    sr,
+                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending query via stream to %s\n",
+             GNUNET_i2s (&sh->target));
+  sr->was_transmitted = GNUNET_YES;
+  sqm.header.size = htons (sizeof (sqm));
+  sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
+  sqm.type = htonl (sr->type);
+  sqm.query = sr->query;
+  memcpy (buf, &sqm, sizeof (sqm));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Successfully transmitted %u bytes via mesh to %s\n",
+             (unsigned int) size,
+             GNUNET_i2s (&sh->target));
+  transmit_pending (sh);
+  return sizeof (sqm);
+}
+         
+
+/**
+ * Transmit pending requests via the stream.
+ *
+ * @param sh stream to process
+ */
+static void
+transmit_pending (struct StreamHandle *sh)
+{
+  if (NULL != sh->wh)
+    return;
+  sh->wh = GNUNET_MESH_notify_transmit_ready (sh->stream, GNUNET_YES /* allow 
cork */,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             sizeof (struct 
StreamQueryMessage),
+                                             &transmit_sqm, sh);
+}
+
+
+/**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+  /**
+   * Reply payload.
+   */ 
+  const void *data;
+
+  /**
+   * Expiration time for the block.
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Number of bytes in 'data'.
+   */
+  size_t data_size;
+
+  /** 
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
+  
+  /**
+   * Did we have a matching query?
+   */
+  int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+             const struct GNUNET_HashCode *key,
+             void *value)
+{
+  struct HandleReplyClosure *hrc = cls;
+  struct GSF_StreamRequest *sr = value;
+  
+  sr->proc (sr->proc_cls,
+           hrc->type,
+           hrc->expiration,
+           hrc->data_size,
+           hrc->data);
+  GSF_stream_query_cancel (sr);
+  hrc->found = GNUNET_YES;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete reply is received.
+ *
+ * @param cls closure with the 'struct StreamHandle'
+ * @param client identification of the client, NULL
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+reply_cb (void *cls,
+         struct GNUNET_MESH_Tunnel *tunnel,
+         void **tunnel_ctx,
+         const struct GNUNET_PeerIdentity *sender,
+         const struct GNUNET_MessageHeader *message)
+{
+  struct StreamHandle *sh = *tunnel_ctx;
+  const struct StreamReplyMessage *srm;
+  struct HandleReplyClosure hrc;
+  uint16_t msize;
+  enum GNUNET_BLOCK_Type type;
+  struct GNUNET_HashCode query;
+
+  msize = ntohs (message->size);
+  if (sizeof (struct StreamReplyMessage) > msize)
+  {
+    GNUNET_break_op (0);
+    reset_stream_async (sh);
+    return GNUNET_SYSERR;
+  }
+  srm = (const struct StreamReplyMessage *) message;
+  msize -= sizeof (struct StreamReplyMessage);
+  type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
+  if (GNUNET_YES !=
+      GNUNET_BLOCK_get_key (GSF_block_ctx,
+                           type,
+                           &srm[1], msize, &query))
+  {
+    GNUNET_break_op (0); 
+    reset_stream_async (sh);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received reply `%s' via stream\n",
+             GNUNET_h2s (&query));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# replies received via stream"), 1,
+                           GNUNET_NO);
+  hrc.data = &srm[1];
+  hrc.data_size = msize;
+  hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+  hrc.type = type;
+  hrc.found = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+                                             &query,
+                                             &handle_reply,
+                                             &hrc);
+  if (GNUNET_NO == hrc.found)
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                             gettext_noop ("# replies received via stream 
dropped"), 1,
+                             GNUNET_NO);
+    return GNUNET_OK;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Get (or create) a stream to talk to the given peer.
+ *
+ * @param target peer we want to communicate with
+ */
+static struct StreamHandle *
+get_stream (const struct GNUNET_PeerIdentity *target)
+{
+  struct StreamHandle *sh;
+
+  sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
+                                         &target->hashPubKey);
+  if (NULL != sh)
+  {
+    if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
+    {
+      GNUNET_SCHEDULER_cancel (sh->timeout_task);
+      sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+    return sh;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Creating stream to %s\n",
+             GNUNET_i2s (target));
+  sh = GNUNET_malloc (sizeof (struct StreamHandle));
+  sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
+                                                &reset_stream_task,
+                                                sh);
+  sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
+  sh->target = *target;
+  sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+                                         sh,
+                                         &sh->target,
+                                         
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (stream_map,
+                                                   &sh->target.hashPubKey,
+                                                   sh,
+                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  return sh;
+}
+
+
+/**
+ * Look for a block by directly contacting a particular peer.
+ *
+ * @param target peer that should have the block
+ * @param query hash to query for the block
+ * @param type desired type for the block
+ * @param proc function to call with result
+ * @param proc_cls closure for 'proc'
+ * @return handle to cancel the operation
+ */
+struct GSF_StreamRequest *
+GSF_stream_query (const struct GNUNET_PeerIdentity *target,
+                 const struct GNUNET_HashCode *query,
+                 enum GNUNET_BLOCK_Type type,
+                 GSF_StreamReplyProcessor proc, void *proc_cls)
+{
+  struct StreamHandle *sh;
+  struct GSF_StreamRequest *sr;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Preparing to send query for %s via stream to %s\n",
+             GNUNET_h2s (query),
+             GNUNET_i2s (target));
+  sh = get_stream (target);
+  sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
+  sr->sh = sh;
+  sr->proc = proc;
+  sr->proc_cls = proc_cls;
+  sr->type = type;
+  sr->query = *query;
+  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+                              sh->pending_tail,
+                              sr);
+  if (GNUNET_YES == sh->is_ready)
+    transmit_pending (sh);
+  return sr;
+}
+
+
+/**
+ * Cancel an active request; must not be called after 'proc'
+ * was calld.
+ *
+ * @param sr request to cancel
+ */
+void
+GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
+{
+  struct StreamHandle *sh = sr->sh;
+
+  if (GNUNET_YES == sr->was_transmitted)
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+                                                        &sr->query,
+                                                        sr));
+  else
+    GNUNET_CONTAINER_DLL_remove (sh->pending_head,
+                                sh->pending_tail,
+                                sr);
+  GNUNET_free (sr);
+  if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
+       (NULL == sh->pending_head) )
+    sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+                                                    &stream_timeout,
+                                                    sh);
+}
+
+
+/* ********************* server-side code ************************* */
+
+
+/**
+ * We're done with a particular client, clean up.
+ *
+ * @param sc client to clean up
+ */
+static void
+terminate_stream (struct StreamClient *sc)
+{
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# stream connections active"), -1,
+                           GNUNET_NO);
+  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
+  if (NULL != sc->wh)
+    GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
+  if (NULL != sc->qe)
+    GNUNET_DATASTORE_cancel (sc->qe);
+  GNUNET_MESH_tunnel_destroy (sc->socket);
+  struct WriteQueueItem *wqi;
+  while (NULL != (wqi = sc->wqi_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                                sc->wqi_tail,
+                                wqi);
+    GNUNET_free (wqi);
+  }
+  GNUNET_CONTAINER_DLL_remove (sc_head,
+                              sc_tail,
+                              sc);
+  sc_count--;
+  GNUNET_free (sc);
+}
+
+
+/**
+ * Task run to asynchronously terminate the stream due to timeout.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */ 
+static void
+timeout_stream_task (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamClient *sc = cls;
+
+  sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  terminate_stream (sc);
+}
+
+
+/**
+ * Reset the timeout for the stream client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct StreamClient *sc)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
+  sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+                                                  &timeout_stream_task,
+                                                  sc);
+}
+
+
+/**
+ * We're done handling a request from a client, read the next one.
+ *
+ * @param sc client to continue reading requests from
+ */
+static void
+continue_reading (struct StreamClient *sc)
+{
+  refresh_timeout_task (sc);
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc);
+
+
+/**
+ * Send a reply now, mesh is ready.
+ *
+ * @param cls closure with the struct StreamClient which sent the query
+ * @param size number of bytes available in 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
+ */
+static size_t
+write_continuation (void *cls,
+                   size_t size,
+                   void *buf)
+{
+  struct StreamClient *sc = cls;
+  struct WriteQueueItem *wqi;
+  size_t ret;
+
+  sc->wh = NULL;
+  if (NULL == (wqi = sc->wqi_head))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write queue empty, reading more requests\n");
+    return 0;
+  }
+  if (0 == size)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Transmission of reply failed, terminating stream\n");
+    terminate_stream (sc);    
+    return 0;
+  }
+  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                              sc->wqi_tail,
+                              wqi);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitted %u byte reply via stream\n",
+             (unsigned int) size);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# Blocks transferred via stream"), 1,
+                           GNUNET_NO);
+  memcpy (buf, &wqi[1], ret = wqi->msize);
+  GNUNET_free (wqi);
+  continue_writing (sc);
+  return ret;
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc)
+{
+  struct WriteQueueItem *wqi;
+
+  if (NULL != sc->wh)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write pending, waiting for it to complete\n");
+    return; /* write already pending */
+  }
+  if (NULL == (wqi = sc->wqi_head))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write queue empty, reading more requests\n");
+    continue_reading (sc);
+    return;
+  }
+  sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             wqi->msize,                       
              
+                                             &write_continuation,
+                                             sc);
+  if (NULL == sc->wh)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write failed; terminating stream\n");
+    terminate_stream (sc);
+    return;
+  }
+}
+
+
+/**
+ * Process a datum that was stored in the datastore.
+ *
+ * @param cls closure with the struct StreamClient which sent the query
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void 
+handle_datastore_reply (void *cls,
+                       const struct GNUNET_HashCode * key,
+                       size_t size, const void *data,
+                       enum GNUNET_BLOCK_Type type,
+                       uint32_t priority,
+                       uint32_t anonymity,
+                       struct GNUNET_TIME_Absolute
+                       expiration, uint64_t uid)
+{
+  struct StreamClient *sc = cls;
+  size_t msize = size + sizeof (struct StreamReplyMessage);
+  struct WriteQueueItem *wqi;
+  struct StreamReplyMessage *srm;
+
+  sc->qe = NULL;
+  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Performing on-demand encoding\n");
+    if (GNUNET_OK !=
+       GNUNET_FS_handle_on_demand_block (key,
+                                         size, data, type,
+                                         priority, anonymity,
+                                         expiration, uid,
+                                         &handle_datastore_reply,
+                                         sc))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "On-demand encoding request failed\n");
+      continue_writing (sc);
+    }
+    return;
+  }
+  if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    continue_writing (sc);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Starting transmission of %u byte reply for query `%s' via 
stream\n",
+             (unsigned int) size,
+             GNUNET_h2s (key));
+  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
+  wqi->msize = msize;
+  srm = (struct StreamReplyMessage *) &wqi[1];
+  srm->header.size = htons ((uint16_t) msize);
+  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
+  srm->type = htonl (type);
+  srm->expiration = GNUNET_TIME_absolute_hton (expiration);
+  memcpy (&srm[1], data, size);
+  sc->reply_size = msize;
+  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
+                              sc->wqi_tail,
+                              wqi);
+  continue_writing (sc);
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete query message is received.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure with the 'struct StreamClient'
+ * @param client identification of the client, NULL
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+request_cb (void *cls,
+           struct GNUNET_MESH_Tunnel *tunnel,
+           void **tunnel_ctx,
+           const struct GNUNET_PeerIdentity *sender,
+           const struct GNUNET_MessageHeader *message)
+{
+  struct StreamClient *sc = *tunnel_ctx;
+  const struct StreamQueryMessage *sqm;
+
+  sqm = (const struct StreamQueryMessage *) message;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received query for `%s' via stream\n",
+             GNUNET_h2s (&sqm->query));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# queries received via stream"), 1,
+                           GNUNET_NO);
+  refresh_timeout_task (sc);
+  sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                    0,
+                                    &sqm->query,
+                                    ntohl (sqm->type),
+                                    0 /* priority */, 
+                                    GSF_datastore_queue_size,
+                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &handle_datastore_reply, sc);
+  if (NULL == sc->qe)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Queueing request with datastore failed (queue full?)\n");
+    continue_writing (sc);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other 
peers
+ * or upon binding error which happen when the app_port given in
+ * GNUNET_STREAM_listen() is already taken.
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream
+ * @param initiator the identity of the peer who wants to establish a stream
+ *            with us; NULL on binding error
+ * @return initial tunnel context (our 'struct StreamClient')
+ */
+static void *
+accept_cb (void *cls,
+          struct GNUNET_MESH_Tunnel *socket,
+          const struct GNUNET_PeerIdentity *initiator,
+          uint32_t port)
+{
+  struct StreamClient *sc;
+
+  GNUNET_assert (NULL != socket);
+  if (sc_count >= sc_count_max)
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                             gettext_noop ("# stream client connections 
rejected"), 1,
+                             GNUNET_NO);
+    GNUNET_MESH_tunnel_destroy (socket);
+    return NULL;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Accepting inbound stream connection from `%s'\n",
+             GNUNET_i2s (initiator));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# stream connections active"), 1,
+                           GNUNET_NO);
+  sc = GNUNET_malloc (sizeof (struct StreamClient));
+  sc->socket = socket;
+  GNUNET_CONTAINER_DLL_insert (sc_head,
+                              sc_tail,
+                              sc);
+  sc_count++;
+  refresh_timeout_task (sc);
+  return sc;
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_start ()
+{
+  static const struct GNUNET_MESH_MessageHandler handlers[] = {
+    { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct 
StreamQueryMessage)},
+    { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 },
+    { NULL, 0, 0 }
+  };
+  static const uint32_t ports[] = {
+    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+    0
+  };
+
+  stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+  if (GNUNET_YES ==
+      GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
+                                            "fs",
+                                            "MAX_STREAM_CLIENTS",
+                                            &sc_count_max))
+  {
+    listen_socket = GNUNET_MESH_connect (GSF_cfg,
+                                        NULL,
+                                        &accept_cb,
+                                        NULL /* FIXME: have a cleanup 
callback? */,
+                                        handlers,
+                                        ports);
+  } 
+}
+
+
+/**
+ * Function called on each active streams to shut them down.
+ *
+ * @param cls NULL
+ * @param key target peer, unused
+ * @param value the 'struct StreamHandle' to destroy
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+release_streams (void *cls,
+                const struct GNUNET_HashCode *key,
+                void *value)
+{
+  struct StreamHandle *sh = value;
+
+  destroy_stream_handle (sh);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_stop ()
+{
+  struct StreamClient *sc;
+
+  while (NULL != (sc = sc_head))
+    terminate_stream (sc);
+  if (NULL != listen_socket)
+  {
+    GNUNET_MESH_disconnect (listen_socket);
+    listen_socket = NULL;
+  }
+  GNUNET_CONTAINER_multihashmap_iterate (stream_map,
+                                        &release_streams,
+                                        NULL);
+  GNUNET_CONTAINER_multihashmap_destroy (stream_map);
+  stream_map = NULL;
+}
+
+/* end of gnunet-service-fs_stream.c */

Copied: gnunet/src/fs/gnunet-service-fs_mesh.h (from rev 27357, 
gnunet/src/fs/gnunet-service-fs_stream.h)
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh.h                              (rev 0)
+++ gnunet/src/fs/gnunet-service-fs_mesh.h      2013-06-04 13:14:21 UTC (rev 
27365)
@@ -0,0 +1,91 @@
+/*
+     This file is part of GNUnet.
+     (C) 2012 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_mesh.h
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ */
+#ifndef GNUNET_SERVICE_FS_MESH_H
+#define GNUNET_SERVICE_FS_MESH_H
+
+/**
+ * Handle for a request that is going out via stream API.
+ */
+struct GSF_StreamRequest;
+
+
+/**
+ * Function called with a reply from the stream.
+ * 
+ * @param cls closure
+ * @param type type of the block, ANY on error
+ * @param expiration expiration time for the block
+ * @param data_size number of bytes in 'data', 0 on error
+ * @param data reply block data, NULL on error
+ */
+typedef void (*GSF_StreamReplyProcessor)(void *cls,
+                                        enum GNUNET_BLOCK_Type type,
+                                        struct GNUNET_TIME_Absolute expiration,
+                                        size_t data_size,
+                                        const void *data);
+
+
+/**
+ * Look for a block by directly contacting a particular peer.
+ *
+ * @param target peer that should have the block
+ * @param query hash to query for the block
+ * @param type desired type for the block
+ * @param proc function to call with result
+ * @param proc_cls closure for 'proc'
+ * @return handle to cancel the operation
+ */
+struct GSF_StreamRequest *
+GSF_stream_query (const struct GNUNET_PeerIdentity *target,
+                 const struct GNUNET_HashCode *query,
+                 enum GNUNET_BLOCK_Type type,
+                 GSF_StreamReplyProcessor proc, void *proc_cls);
+
+
+/**
+ * Cancel an active request; must not be called after 'proc'
+ * was calld.
+ *
+ * @param sr request to cancel
+ */
+void
+GSF_stream_query_cancel (struct GSF_StreamRequest *sr);
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_start (void);
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_stop (void);
+
+#endif

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2013-06-04 13:10:47 UTC (rev 
27364)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2013-06-04 13:14:21 UTC (rev 
27365)
@@ -30,7 +30,7 @@
 #include "gnunet-service-fs_indexing.h"
 #include "gnunet-service-fs_pe.h"
 #include "gnunet-service-fs_pr.h"
-#include "gnunet-service-fs_stream.h"
+#include "gnunet-service-fs_mesh.h"
 
 
 /**

Deleted: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c    2013-06-04 13:10:47 UTC (rev 
27364)
+++ gnunet/src/fs/gnunet-service-fs_stream.c    2013-06-04 13:14:21 UTC (rev 
27365)
@@ -1,1243 +0,0 @@
-/*
-     This file is part of GNUnet.
-     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file fs/gnunet-service-fs_stream.c
- * @brief non-anonymous file-transfer
- * @author Christian Grothoff
- *
- * TODO:
- * - update comments on functions (still matches 'stream')
- * - MESH2 API doesn't allow flow control for server yet (needed!)
- * - likely need to register clean up handler with mesh to handle
- *   client disconnect (likely leaky right now)
- * - server is optional, currently client code will NPE if we have
- *   no server, again MESH2 API requirement forcing this for now
- * - message handlers are symmetric for client/server, should be
- *   separated (currently clients can get requests and servers can
- *   handle answers, not good)
- * - code is entirely untested
- * - might have overlooked a few possible simplifications
- * - PORT is set to old application type, unsure if we should keep
- *   it that way (fine for now)
- */
-#include "platform.h"
-#include "gnunet_constants.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_mesh2_service.h"
-#include "gnunet_protocols.h"
-#include "gnunet_applications.h"
-#include "gnunet-service-fs.h"
-#include "gnunet-service-fs_indexing.h"
-#include "gnunet-service-fs_stream.h"
-
-/**
- * After how long do we termiante idle connections?
- */
-#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 
2)
-
-/**
- * After how long do we reset connections without replies?
- */
-#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 30)
-
-
-/**
- * A message in the queue to be written to the stream.
- */
-struct WriteQueueItem
-{
-  /**
-   * Kept in a DLL.
-   */
-  struct WriteQueueItem *next;
-
-  /**
-   * Kept in a DLL.
-   */
-  struct WriteQueueItem *prev;
-
-  /**
-   * Number of bytes of payload, allocated at the end of this struct.
-   */
-  size_t msize;
-};
-
-
-/**
- * Information we keep around for each active streaming client.
- */
-struct StreamClient
-{
-  /**
-   * DLL
-   */ 
-  struct StreamClient *next;
-
-  /**
-   * DLL
-   */ 
-  struct StreamClient *prev;
-
-  /**
-   * Socket for communication.
-   */ 
-  struct GNUNET_MESH_Tunnel *socket;
-
-  /**
-   * Handle for active write operation, or NULL.
-   */ 
-  struct GNUNET_MESH_TransmitHandle *wh;
-
-  /**
-   * Head of write queue.
-   */
-  struct WriteQueueItem *wqi_head;
-
-  /**
-   * Tail of write queue.
-   */
-  struct WriteQueueItem *wqi_tail;
-  
-  /**
-   * Current active request to the datastore, if we have one pending.
-   */
-  struct GNUNET_DATASTORE_QueueEntry *qe;
-
-  /**
-   * Task that is scheduled to asynchronously terminate the connection.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier terminate_task;
-
-  /**
-   * Task that is scheduled to terminate idle connections.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
-  /**
-   * Size of the last write that was initiated.
-   */ 
-  size_t reply_size;
-
-};
-
-
-/**
- * Query from one peer, asking the other for CHK-data.
- */
-struct StreamQueryMessage
-{
-
-  /**
-   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Block type must be DBLOCK or IBLOCK.
-   */
-  uint32_t type;
-
-  /**
-   * Query hash from CHK (hash of encrypted block).
-   */
-  struct GNUNET_HashCode query;
-
-};
-
-
-/**
- * Reply to a StreamQueryMessage.
- */
-struct StreamReplyMessage
-{
-
-  /**
-   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Block type must be DBLOCK or IBLOCK.
-   */
-  uint32_t type;
-
-  /**
-   * Expiration time for the block.
-   */
-  struct GNUNET_TIME_AbsoluteNBO expiration;
-
-  /* followed by the encrypted block */
-
-};
-
-
-/** 
- * Handle for a stream to another peer.
- */
-struct StreamHandle;
-
-
-/**
- * Handle for a request that is going out via stream API.
- */
-struct GSF_StreamRequest
-{
-
-  /**
-   * DLL.
-   */
-  struct GSF_StreamRequest *next;
-
-  /**
-   * DLL.
-   */
-  struct GSF_StreamRequest *prev;
-
-  /**
-   * Which stream is this request associated with?
-   */
-  struct StreamHandle *sh;
-
-  /**
-   * Function to call with the result.
-   */
-  GSF_StreamReplyProcessor proc;
-
-  /**
-   * Closure for 'proc'
-   */
-  void *proc_cls;
-
-  /**
-   * Query to transmit to the other peer.
-   */
-  struct GNUNET_HashCode query;
-
-  /**
-   * Desired type for the reply.
-   */
-  enum GNUNET_BLOCK_Type type;
-
-  /**
-   * Did we transmit this request already? YES if we are
-   * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
-   */
-  int was_transmitted;
-};
-
-
-/** 
- * Handle for a stream to another peer.
- */
-struct StreamHandle
-{
-  /**
-   * Head of DLL of pending requests on this stream.
-   */
-  struct GSF_StreamRequest *pending_head;
-
-  /**
-   * Tail of DLL of pending requests on this stream.
-   */
-  struct GSF_StreamRequest *pending_tail;
-
-  /**
-   * Map from query to 'struct GSF_StreamRequest's waiting for
-   * a reply.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
-
-  /**
-   * Connection to the other peer.
-   */
-  struct GNUNET_MESH_Tunnel *stream;
-
-  /**
-   * Handle for active write operation, or NULL.
-   */ 
-  struct GNUNET_MESH_TransmitHandle *wh;
-
-  /**
-   * Which peer does this stream go to?
-   */ 
-  struct GNUNET_PeerIdentity target;
-
-  /**
-   * Task to kill inactive streams (we keep them around for
-   * a few seconds to give the application a chance to give
-   * us another query).
-   */
-  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
-  /**
-   * Task to reset streams that had errors (asynchronously,
-   * as we may not be able to do it immediately during a
-   * callback from the stream API).
-   */
-  GNUNET_SCHEDULER_TaskIdentifier reset_task;
-
-  /**
-   * Is this stream ready for transmission?
-   */
-  int is_ready;
-
-};
-
-
-/**
- * Listen socket for incoming requests.
- */
-static struct GNUNET_MESH_Handle *listen_socket;
-
-/**
- * Head of DLL of stream clients.
- */ 
-static struct StreamClient *sc_head;
-
-/**
- * Tail of DLL of stream clients.
- */ 
-static struct StreamClient *sc_tail;
-
-/**
- * Number of active stream clients in the 'sc_*'-DLL.
- */
-static unsigned int sc_count;
-
-/**
- * Maximum allowed number of stream clients.
- */
-static unsigned long long sc_count_max;
-
-/**
- * Map from peer identities to 'struct StreamHandles' with streams to
- * those peers.
- */
-static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
-
-
-/* ********************* client-side code ************************* */
-
-/**
- * Iterator called on each entry in a waiting map to 
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the 'struct StreamHandle'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_StreamRequest' to clean up
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
-                   const struct GNUNET_HashCode *key,
-                   void *value)
-{
-  struct GSF_StreamRequest *sr = value;
-
-  sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
-           GNUNET_TIME_UNIT_FOREVER_ABS,
-           0, NULL);
-  GSF_stream_query_cancel (sr);
-  return GNUNET_YES;
-}
-
-
-/**
- * Destroy a stream handle.
- *
- * @param sh stream to process
- */
-static void
-destroy_stream_handle (struct StreamHandle *sh)
-{
-  struct GSF_StreamRequest *sr;
-
-  while (NULL != (sr = sh->pending_head))
-  {
-    sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
-             GNUNET_TIME_UNIT_FOREVER_ABS,
-             0, NULL);
-    GSF_stream_query_cancel (sr);
-  }
-  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
-                                        &free_waiting_entry,
-                                        sh);
-  if (NULL != sh->wh)
-    GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
-  if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
-    GNUNET_SCHEDULER_cancel (sh->timeout_task);
-  if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
-    GNUNET_SCHEDULER_cancel (sh->reset_task);
-  GNUNET_MESH_tunnel_destroy (sh->stream);
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CONTAINER_multihashmap_remove (stream_map,
-                                                      &sh->target.hashPubKey,
-                                                      sh));
-  GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
-  GNUNET_free (sh);
-}
-
-
-/**
- * Transmit pending requests via the stream.
- *
- * @param sh stream to process
- */
-static void
-transmit_pending (struct StreamHandle *sh);
-
-
-/**
- * Iterator called on each entry in a waiting map to 
- * move it back to the pending list.
- *
- * @param cls the 'struct StreamHandle'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_StreamRequest' to move to pending
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-move_to_pending (void *cls,
-                const struct GNUNET_HashCode *key,
-                void *value)
-{
-  struct StreamHandle *sh = cls;
-  struct GSF_StreamRequest *sr = value;
-  
-  GNUNET_assert (GNUNET_YES ==
-                GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
-                                                      key,
-                                                      value));
-  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
-                              sh->pending_tail,
-                              sr);
-  sr->was_transmitted = GNUNET_NO;
-  return GNUNET_YES;
-}
-
-
-/**
- * We had a serious error, tear down and re-create stream from scratch.
- *
- * @param sh stream to reset
- */
-static void
-reset_stream (struct StreamHandle *sh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Resetting stream to %s\n",
-             GNUNET_i2s (&sh->target));
-  GNUNET_MESH_tunnel_destroy (sh->stream);
-  sh->is_ready = GNUNET_NO;
-  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
-                                        &move_to_pending,
-                                        sh);
-  sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
-                                         sh,                               
-                                         &sh->target,
-                                         
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive stream.
- *
- * @param cls the 'struct StreamHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-stream_timeout (void *cls,
-               const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct StreamHandle *sh = cls;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Timeout on stream to %s\n",
-             GNUNET_i2s (&sh->target));
-  sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  destroy_stream_handle (sh);
-}
-
-
-/**
- * Task called when it is time to reset an stream.
- *
- * @param cls the 'struct StreamHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-reset_stream_task (void *cls,
-                  const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct StreamHandle *sh = cls;
-
-  sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
-  reset_stream (sh);
-}
-
-
-/**
- * We had a serious error, tear down and re-create stream from scratch,
- * but do so asynchronously.
- *
- * @param sh stream to reset
- */
-static void
-reset_stream_async (struct StreamHandle *sh)
-{
-  if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
-    GNUNET_SCHEDULER_cancel (sh->reset_task);
-  sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
-                                            sh);
-}
-
-
-/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a stream.
- *
- * @param cls the struct StreamHandle for which we did the write call
- * @param size the number of bytes that can be written to 'buf'
- * @param buf where to write the message
- * @return number of bytes written to 'buf'
- */
-static size_t
-transmit_sqm (void *cls,
-             size_t size,
-             void *buf)
-{
-  struct StreamHandle *sh = cls;
-  struct StreamQueryMessage sqm;
-  struct GSF_StreamRequest *sr;
-
-  sh->wh = NULL;
-  if (NULL == buf)
-  {
-    reset_stream (sh);
-    return 0;
-  }
-  sr = sh->pending_head;
-  if (NULL == sr)
-    return 0;
-  GNUNET_assert (size >= sizeof (struct StreamQueryMessage));
-  GNUNET_CONTAINER_DLL_remove (sh->pending_head,
-                              sh->pending_tail,
-                              sr);
-  GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
-                                    &sr->query,
-                                    sr,
-                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Sending query via stream to %s\n",
-             GNUNET_i2s (&sh->target));
-  sr->was_transmitted = GNUNET_YES;
-  sqm.header.size = htons (sizeof (sqm));
-  sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
-  sqm.type = htonl (sr->type);
-  sqm.query = sr->query;
-  memcpy (buf, &sqm, sizeof (sqm));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Successfully transmitted %u bytes via mesh to %s\n",
-             (unsigned int) size,
-             GNUNET_i2s (&sh->target));
-  transmit_pending (sh);
-  return sizeof (sqm);
-}
-         
-
-/**
- * Transmit pending requests via the stream.
- *
- * @param sh stream to process
- */
-static void
-transmit_pending (struct StreamHandle *sh)
-{
-  if (NULL != sh->wh)
-    return;
-  sh->wh = GNUNET_MESH_notify_transmit_ready (sh->stream, GNUNET_YES /* allow 
cork */,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             sizeof (struct 
StreamQueryMessage),
-                                             &transmit_sqm, sh);
-}
-
-
-/**
- * Closure for 'handle_reply'.
- */
-struct HandleReplyClosure
-{
-
-  /**
-   * Reply payload.
-   */ 
-  const void *data;
-
-  /**
-   * Expiration time for the block.
-   */
-  struct GNUNET_TIME_Absolute expiration;
-
-  /**
-   * Number of bytes in 'data'.
-   */
-  size_t data_size;
-
-  /** 
-   * Type of the block.
-   */
-  enum GNUNET_BLOCK_Type type;
-  
-  /**
-   * Did we have a matching query?
-   */
-  int found;
-};
-
-
-/**
- * Iterator called on each entry in a waiting map to 
- * process a result.
- *
- * @param cls the 'struct HandleReplyClosure'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_StreamRequest' to handle result for
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-handle_reply (void *cls,
-             const struct GNUNET_HashCode *key,
-             void *value)
-{
-  struct HandleReplyClosure *hrc = cls;
-  struct GSF_StreamRequest *sr = value;
-  
-  sr->proc (sr->proc_cls,
-           hrc->type,
-           hrc->expiration,
-           hrc->data_size,
-           hrc->data);
-  GSF_stream_query_cancel (sr);
-  hrc->found = GNUNET_YES;
-  return GNUNET_YES;
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete reply is received.
- *
- * @param cls closure with the 'struct StreamHandle'
- * @param client identification of the client, NULL
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-reply_cb (void *cls,
-         struct GNUNET_MESH_Tunnel *tunnel,
-         void **tunnel_ctx,
-         const struct GNUNET_PeerIdentity *sender,
-         const struct GNUNET_MessageHeader *message)
-{
-  struct StreamHandle *sh = *tunnel_ctx;
-  const struct StreamReplyMessage *srm;
-  struct HandleReplyClosure hrc;
-  uint16_t msize;
-  enum GNUNET_BLOCK_Type type;
-  struct GNUNET_HashCode query;
-
-  msize = ntohs (message->size);
-  if (sizeof (struct StreamReplyMessage) > msize)
-  {
-    GNUNET_break_op (0);
-    reset_stream_async (sh);
-    return GNUNET_SYSERR;
-  }
-  srm = (const struct StreamReplyMessage *) message;
-  msize -= sizeof (struct StreamReplyMessage);
-  type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
-  if (GNUNET_YES !=
-      GNUNET_BLOCK_get_key (GSF_block_ctx,
-                           type,
-                           &srm[1], msize, &query))
-  {
-    GNUNET_break_op (0); 
-    reset_stream_async (sh);
-    return GNUNET_SYSERR;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received reply `%s' via stream\n",
-             GNUNET_h2s (&query));
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# replies received via stream"), 1,
-                           GNUNET_NO);
-  hrc.data = &srm[1];
-  hrc.data_size = msize;
-  hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
-  hrc.type = type;
-  hrc.found = GNUNET_NO;
-  GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
-                                             &query,
-                                             &handle_reply,
-                                             &hrc);
-  if (GNUNET_NO == hrc.found)
-  {
-    GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# replies received via stream 
dropped"), 1,
-                             GNUNET_NO);
-    return GNUNET_OK;
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Get (or create) a stream to talk to the given peer.
- *
- * @param target peer we want to communicate with
- */
-static struct StreamHandle *
-get_stream (const struct GNUNET_PeerIdentity *target)
-{
-  struct StreamHandle *sh;
-
-  sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
-                                         &target->hashPubKey);
-  if (NULL != sh)
-  {
-    if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
-    {
-      GNUNET_SCHEDULER_cancel (sh->timeout_task);
-      sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-    }
-    return sh;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Creating stream to %s\n",
-             GNUNET_i2s (target));
-  sh = GNUNET_malloc (sizeof (struct StreamHandle));
-  sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
-                                                &reset_stream_task,
-                                                sh);
-  sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
-  sh->target = *target;
-  sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
-                                         sh,
-                                         &sh->target,
-                                         
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CONTAINER_multihashmap_put (stream_map,
-                                                   &sh->target.hashPubKey,
-                                                   sh,
-                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  return sh;
-}
-
-
-/**
- * Look for a block by directly contacting a particular peer.
- *
- * @param target peer that should have the block
- * @param query hash to query for the block
- * @param type desired type for the block
- * @param proc function to call with result
- * @param proc_cls closure for 'proc'
- * @return handle to cancel the operation
- */
-struct GSF_StreamRequest *
-GSF_stream_query (const struct GNUNET_PeerIdentity *target,
-                 const struct GNUNET_HashCode *query,
-                 enum GNUNET_BLOCK_Type type,
-                 GSF_StreamReplyProcessor proc, void *proc_cls)
-{
-  struct StreamHandle *sh;
-  struct GSF_StreamRequest *sr;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Preparing to send query for %s via stream to %s\n",
-             GNUNET_h2s (query),
-             GNUNET_i2s (target));
-  sh = get_stream (target);
-  sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
-  sr->sh = sh;
-  sr->proc = proc;
-  sr->proc_cls = proc_cls;
-  sr->type = type;
-  sr->query = *query;
-  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
-                              sh->pending_tail,
-                              sr);
-  if (GNUNET_YES == sh->is_ready)
-    transmit_pending (sh);
-  return sr;
-}
-
-
-/**
- * Cancel an active request; must not be called after 'proc'
- * was calld.
- *
- * @param sr request to cancel
- */
-void
-GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
-{
-  struct StreamHandle *sh = sr->sh;
-
-  if (GNUNET_YES == sr->was_transmitted)
-    GNUNET_assert (GNUNET_OK ==
-                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
-                                                        &sr->query,
-                                                        sr));
-  else
-    GNUNET_CONTAINER_DLL_remove (sh->pending_head,
-                                sh->pending_tail,
-                                sr);
-  GNUNET_free (sr);
-  if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
-       (NULL == sh->pending_head) )
-    sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-                                                    &stream_timeout,
-                                                    sh);
-}
-
-
-/* ********************* server-side code ************************* */
-
-
-/**
- * We're done with a particular client, clean up.
- *
- * @param sc client to clean up
- */
-static void
-terminate_stream (struct StreamClient *sc)
-{
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# stream connections active"), -1,
-                           GNUNET_NO);
-  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
-    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
-  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
-    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
-  if (NULL != sc->wh)
-    GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
-  if (NULL != sc->qe)
-    GNUNET_DATASTORE_cancel (sc->qe);
-  GNUNET_MESH_tunnel_destroy (sc->socket);
-  struct WriteQueueItem *wqi;
-  while (NULL != (wqi = sc->wqi_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                                sc->wqi_tail,
-                                wqi);
-    GNUNET_free (wqi);
-  }
-  GNUNET_CONTAINER_DLL_remove (sc_head,
-                              sc_tail,
-                              sc);
-  sc_count--;
-  GNUNET_free (sc);
-}
-
-
-/**
- * Task run to asynchronously terminate the stream due to timeout.
- *
- * @param cls the 'struct StreamClient'
- * @param tc scheduler context
- */ 
-static void
-timeout_stream_task (void *cls,
-                    const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct StreamClient *sc = cls;
-
-  sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  terminate_stream (sc);
-}
-
-
-/**
- * Reset the timeout for the stream client (due to activity).
- *
- * @param sc client handle to reset timeout for
- */
-static void
-refresh_timeout_task (struct StreamClient *sc)
-{
-  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
-    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
-  sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
-                                                  &timeout_stream_task,
-                                                  sc);
-}
-
-
-/**
- * We're done handling a request from a client, read the next one.
- *
- * @param sc client to continue reading requests from
- */
-static void
-continue_reading (struct StreamClient *sc)
-{
-  refresh_timeout_task (sc);
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct StreamClient *sc);
-
-
-/**
- * Send a reply now, mesh is ready.
- *
- * @param cls closure with the struct StreamClient which sent the query
- * @param size number of bytes available in 'buf'
- * @param buf where to write the message
- * @return number of bytes written to 'buf'
- */
-static size_t
-write_continuation (void *cls,
-                   size_t size,
-                   void *buf)
-{
-  struct StreamClient *sc = cls;
-  struct WriteQueueItem *wqi;
-  size_t ret;
-
-  sc->wh = NULL;
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    return 0;
-  }
-  if (0 == size)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Transmission of reply failed, terminating stream\n");
-    terminate_stream (sc);    
-    return 0;
-  }
-  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitted %u byte reply via stream\n",
-             (unsigned int) size);
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# Blocks transferred via stream"), 1,
-                           GNUNET_NO);
-  memcpy (buf, &wqi[1], ret = wqi->msize);
-  GNUNET_free (wqi);
-  continue_writing (sc);
-  return ret;
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct StreamClient *sc)
-{
-  struct WriteQueueItem *wqi;
-
-  if (NULL != sc->wh)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write pending, waiting for it to complete\n");
-    return; /* write already pending */
-  }
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    continue_reading (sc);
-    return;
-  }
-  sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             wqi->msize,                       
              
-                                             &write_continuation,
-                                             sc);
-  if (NULL == sc->wh)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write failed; terminating stream\n");
-    terminate_stream (sc);
-    return;
-  }
-}
-
-
-/**
- * Process a datum that was stored in the datastore.
- *
- * @param cls closure with the struct StreamClient which sent the query
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
- */
-static void 
-handle_datastore_reply (void *cls,
-                       const struct GNUNET_HashCode * key,
-                       size_t size, const void *data,
-                       enum GNUNET_BLOCK_Type type,
-                       uint32_t priority,
-                       uint32_t anonymity,
-                       struct GNUNET_TIME_Absolute
-                       expiration, uint64_t uid)
-{
-  struct StreamClient *sc = cls;
-  size_t msize = size + sizeof (struct StreamReplyMessage);
-  struct WriteQueueItem *wqi;
-  struct StreamReplyMessage *srm;
-
-  sc->qe = NULL;
-  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Performing on-demand encoding\n");
-    if (GNUNET_OK !=
-       GNUNET_FS_handle_on_demand_block (key,
-                                         size, data, type,
-                                         priority, anonymity,
-                                         expiration, uid,
-                                         &handle_datastore_reply,
-                                         sc))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "On-demand encoding request failed\n");
-      continue_writing (sc);
-    }
-    return;
-  }
-  if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
-  {
-    GNUNET_break (0);
-    continue_writing (sc);
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Starting transmission of %u byte reply for query `%s' via 
stream\n",
-             (unsigned int) size,
-             GNUNET_h2s (key));
-  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
-  wqi->msize = msize;
-  srm = (struct StreamReplyMessage *) &wqi[1];
-  srm->header.size = htons ((uint16_t) msize);
-  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
-  srm->type = htonl (type);
-  srm->expiration = GNUNET_TIME_absolute_hton (expiration);
-  memcpy (&srm[1], data, size);
-  sc->reply_size = msize;
-  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  continue_writing (sc);
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete query message is received.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure with the 'struct StreamClient'
- * @param client identification of the client, NULL
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-request_cb (void *cls,
-           struct GNUNET_MESH_Tunnel *tunnel,
-           void **tunnel_ctx,
-           const struct GNUNET_PeerIdentity *sender,
-           const struct GNUNET_MessageHeader *message)
-{
-  struct StreamClient *sc = *tunnel_ctx;
-  const struct StreamQueryMessage *sqm;
-
-  sqm = (const struct StreamQueryMessage *) message;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received query for `%s' via stream\n",
-             GNUNET_h2s (&sqm->query));
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# queries received via stream"), 1,
-                           GNUNET_NO);
-  refresh_timeout_task (sc);
-  sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
-                                    0,
-                                    &sqm->query,
-                                    ntohl (sqm->type),
-                                    0 /* priority */, 
-                                    GSF_datastore_queue_size,
-                                    GNUNET_TIME_UNIT_FOREVER_REL,
-                                    &handle_datastore_reply, sc);
-  if (NULL == sc->qe)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Queueing request with datastore failed (queue full?)\n");
-    continue_writing (sc);
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Functions of this type are called upon new stream connection from other 
peers
- * or upon binding error which happen when the app_port given in
- * GNUNET_STREAM_listen() is already taken.
- *
- * @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream
- * @param initiator the identity of the peer who wants to establish a stream
- *            with us; NULL on binding error
- * @return initial tunnel context (our 'struct StreamClient')
- */
-static void *
-accept_cb (void *cls,
-          struct GNUNET_MESH_Tunnel *socket,
-          const struct GNUNET_PeerIdentity *initiator,
-          uint32_t port)
-{
-  struct StreamClient *sc;
-
-  GNUNET_assert (NULL != socket);
-  if (sc_count >= sc_count_max)
-  {
-    GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# stream client connections 
rejected"), 1,
-                             GNUNET_NO);
-    GNUNET_MESH_tunnel_destroy (socket);
-    return NULL;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Accepting inbound stream connection from `%s'\n",
-             GNUNET_i2s (initiator));
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# stream connections active"), 1,
-                           GNUNET_NO);
-  sc = GNUNET_malloc (sizeof (struct StreamClient));
-  sc->socket = socket;
-  GNUNET_CONTAINER_DLL_insert (sc_head,
-                              sc_tail,
-                              sc);
-  sc_count++;
-  refresh_timeout_task (sc);
-  return sc;
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_stream_start ()
-{
-  static const struct GNUNET_MESH_MessageHandler handlers[] = {
-    { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct 
StreamQueryMessage)},
-    { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 },
-    { NULL, 0, 0 }
-  };
-  static const uint32_t ports[] = {
-    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-    0
-  };
-
-  stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
-  if (GNUNET_YES ==
-      GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
-                                            "fs",
-                                            "MAX_STREAM_CLIENTS",
-                                            &sc_count_max))
-  {
-    listen_socket = GNUNET_MESH_connect (GSF_cfg,
-                                        NULL,
-                                        &accept_cb,
-                                        NULL /* FIXME: have a cleanup 
callback? */,
-                                        handlers,
-                                        ports);
-  } 
-}
-
-
-/**
- * Function called on each active streams to shut them down.
- *
- * @param cls NULL
- * @param key target peer, unused
- * @param value the 'struct StreamHandle' to destroy
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-release_streams (void *cls,
-                const struct GNUNET_HashCode *key,
-                void *value)
-{
-  struct StreamHandle *sh = value;
-
-  destroy_stream_handle (sh);
-  return GNUNET_YES;
-}
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_stream_stop ()
-{
-  struct StreamClient *sc;
-
-  while (NULL != (sc = sc_head))
-    terminate_stream (sc);
-  if (NULL != listen_socket)
-  {
-    GNUNET_MESH_disconnect (listen_socket);
-    listen_socket = NULL;
-  }
-  GNUNET_CONTAINER_multihashmap_iterate (stream_map,
-                                        &release_streams,
-                                        NULL);
-  GNUNET_CONTAINER_multihashmap_destroy (stream_map);
-  stream_map = NULL;
-}
-
-/* end of gnunet-service-fs_stream.c */

Deleted: gnunet/src/fs/gnunet-service-fs_stream.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.h    2013-06-04 13:10:47 UTC (rev 
27364)
+++ gnunet/src/fs/gnunet-service-fs_stream.h    2013-06-04 13:14:21 UTC (rev 
27365)
@@ -1,91 +0,0 @@
-/*
-     This file is part of GNUnet.
-     (C) 2012 Christian Grothoff (and other contributing authors)
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file fs/gnunet-service-fs_stream.h
- * @brief non-anonymous file-transfer
- * @author Christian Grothoff
- */
-#ifndef GNUNET_SERVICE_FS_STREAM_H
-#define GNUNET_SERVICE_FS_STREAM_H
-
-/**
- * Handle for a request that is going out via stream API.
- */
-struct GSF_StreamRequest;
-
-
-/**
- * Function called with a reply from the stream.
- * 
- * @param cls closure
- * @param type type of the block, ANY on error
- * @param expiration expiration time for the block
- * @param data_size number of bytes in 'data', 0 on error
- * @param data reply block data, NULL on error
- */
-typedef void (*GSF_StreamReplyProcessor)(void *cls,
-                                        enum GNUNET_BLOCK_Type type,
-                                        struct GNUNET_TIME_Absolute expiration,
-                                        size_t data_size,
-                                        const void *data);
-
-
-/**
- * Look for a block by directly contacting a particular peer.
- *
- * @param target peer that should have the block
- * @param query hash to query for the block
- * @param type desired type for the block
- * @param proc function to call with result
- * @param proc_cls closure for 'proc'
- * @return handle to cancel the operation
- */
-struct GSF_StreamRequest *
-GSF_stream_query (const struct GNUNET_PeerIdentity *target,
-                 const struct GNUNET_HashCode *query,
-                 enum GNUNET_BLOCK_Type type,
-                 GSF_StreamReplyProcessor proc, void *proc_cls);
-
-
-/**
- * Cancel an active request; must not be called after 'proc'
- * was calld.
- *
- * @param sr request to cancel
- */
-void
-GSF_stream_query_cancel (struct GSF_StreamRequest *sr);
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_stream_start (void);
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_stream_stop (void);
-
-#endif

Modified: gnunet/src/fs/test_fs_download.c
===================================================================
--- gnunet/src/fs/test_fs_download.c    2013-06-04 13:10:47 UTC (rev 27364)
+++ gnunet/src/fs/test_fs_download.c    2013-06-04 13:14:21 UTC (rev 27365)
@@ -329,10 +329,10 @@
     binary_name = "test-fs-download-indexed";
     config_name = "test_fs_download_indexed.conf";
   }
-  if (NULL != strstr (argv[0], "stream"))
+  if (NULL != strstr (argv[0], "mesh"))
   {
-    binary_name = "test-fs-download-stream";
-    config_name = "test_fs_download_stream.conf";
+    binary_name = "test-fs-download-mesh";
+    config_name = "test_fs_download_mesh.conf";
   }
   if (0 != GNUNET_TESTING_peer_run (binary_name,
                                    config_name,

Copied: gnunet/src/fs/test_fs_download_mesh.conf (from rev 27357, 
gnunet/src/fs/test_fs_download_stream.conf)
===================================================================
--- gnunet/src/fs/test_fs_download_mesh.conf                            (rev 0)
+++ gnunet/src/fs/test_fs_download_mesh.conf    2013-06-04 13:14:21 UTC (rev 
27365)
@@ -0,0 +1,10 @@
address@hidden@ test_fs_defaults.conf
+[PATHS]
+SERVICEHOME = /tmp/gnunet-test-fs-download/
+
+[download-test]
+# set to 'YES' to test non-anonymous download
+USE_STREAM = NO
+
+# set to 'YES' to use indexing
+USE_INDEX = NO

Deleted: gnunet/src/fs/test_fs_download_stream.conf
===================================================================
--- gnunet/src/fs/test_fs_download_stream.conf  2013-06-04 13:10:47 UTC (rev 
27364)
+++ gnunet/src/fs/test_fs_download_stream.conf  2013-06-04 13:14:21 UTC (rev 
27365)
@@ -1,10 +0,0 @@
address@hidden@ test_fs_defaults.conf
-[PATHS]
-SERVICEHOME = /tmp/gnunet-test-fs-download/
-
-[download-test]
-# set to 'YES' to test non-anonymous download
-USE_STREAM = NO
-
-# set to 'YES' to use indexing
-USE_INDEX = NO

Modified: gnunet/src/fs/test_gnunet_service_fs_p2p.c
===================================================================
--- gnunet/src/fs/test_gnunet_service_fs_p2p.c  2013-06-04 13:10:47 UTC (rev 
27364)
+++ gnunet/src/fs/test_gnunet_service_fs_p2p.c  2013-06-04 13:14:21 UTC (rev 
27365)
@@ -144,7 +144,7 @@
 
   progname = argv[0];
   if (NULL != strstr (progname, "stream"))
-    config = "test_gnunet_service_fs_p2p_stream.conf";
+    config = "test_gnunet_service_fs_p2p_mesh.conf";
   else
     config = "fs_test_lib_data.conf";
   (void) GNUNET_TESTBED_test_run ("test-gnunet-service-fs-p2p",

Copied: gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf (from rev 27357, 
gnunet/src/fs/test_gnunet_service_fs_p2p_stream.conf)
===================================================================
--- gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf                          
(rev 0)
+++ gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf  2013-06-04 13:14:21 UTC 
(rev 27365)
@@ -0,0 +1,16 @@
address@hidden@ fs_test_lib_data.conf
+
+[fs]
+# FIXME: this option needs to be set for the
+# testcase to truly work; however, as the code
+# is not finished, not setting the option should
+# allow the test to at least pass for now...
+DISABLE_ANON_TRANSFER = YES
+
+# Do we cache content from other nodes? (may improve anonymity)
+CONTENT_CACHING = NO
+
+# Do we send unsolicited data to other nodes if we have excess bandwidth?
+# (may improve anonymity, probably not a good idea if content_caching is NO)
+CONTENT_PUSHING = NO
+

Deleted: gnunet/src/fs/test_gnunet_service_fs_p2p_stream.conf
===================================================================
--- gnunet/src/fs/test_gnunet_service_fs_p2p_stream.conf        2013-06-04 
13:10:47 UTC (rev 27364)
+++ gnunet/src/fs/test_gnunet_service_fs_p2p_stream.conf        2013-06-04 
13:14:21 UTC (rev 27365)
@@ -1,16 +0,0 @@
address@hidden@ fs_test_lib_data.conf
-
-[fs]
-# FIXME: this option needs to be set for the
-# testcase to truly work; however, as the code
-# is not finished, not setting the option should
-# allow the test to at least pass for now...
-DISABLE_ANON_TRANSFER = YES
-
-# Do we cache content from other nodes? (may improve anonymity)
-CONTENT_CACHING = NO
-
-# Do we send unsolicited data to other nodes if we have excess bandwidth?
-# (may improve anonymity, probably not a good idea if content_caching is NO)
-CONTENT_PUSHING = NO
-




reply via email to

[Prev in Thread] Current Thread [Next in Thread]