gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28142 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r28142 - gnunet/src/fs
Date: Thu, 18 Jul 2013 13:06:23 +0200

Author: grothoff
Date: 2013-07-18 13:06:23 +0200 (Thu, 18 Jul 2013)
New Revision: 28142

Added:
   gnunet/src/fs/gnunet-service-fs_mesh_server.c
Removed:
   gnunet/src/fs/gnunet-service-fs_mesh.c
Modified:
   gnunet/src/fs/Makefile.am
   gnunet/src/fs/fs.conf.in
   gnunet/src/fs/gnunet-service-fs.c
   gnunet/src/fs/gnunet-service-fs_mesh.h
   gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf
Log:
-splitting mesh into server and client parts

Modified: gnunet/src/fs/Makefile.am
===================================================================
--- gnunet/src/fs/Makefile.am   2013-07-18 11:04:11 UTC (rev 28141)
+++ gnunet/src/fs/Makefile.am   2013-07-18 11:06:23 UTC (rev 28142)
@@ -190,7 +190,8 @@
  gnunet-service-fs_pr.c gnunet-service-fs_pr.h \
  gnunet-service-fs_push.c gnunet-service-fs_push.h \
  gnunet-service-fs_put.c gnunet-service-fs_put.h \
- gnunet-service-fs_mesh.c gnunet-service-fs_mesh.h 
+ gnunet-service-fs_mesh_client.c gnunet-service-fs_mesh.h \
+ gnunet-service-fs_mesh_server.c
 gnunet_service_fs_LDADD =  \
  $(top_builddir)/src/fs/libgnunetfs.la \
  $(top_builddir)/src/dht/libgnunetdht.la \

Modified: gnunet/src/fs/fs.conf.in
===================================================================
--- gnunet/src/fs/fs.conf.in    2013-07-18 11:04:11 UTC (rev 28141)
+++ gnunet/src/fs/fs.conf.in    2013-07-18 11:06:23 UTC (rev 28142)
@@ -60,7 +60,7 @@
 # some reasonable level.  And if we have a very, very large
 # number, we probably won't have enough bandwidth to suppor them
 # well anyway, so better have a moderate cap.
-MAX_STREAM_CLIENTS = 128
+MAX_MESH_CLIENTS = 128
 
 
 [gnunet-auto-share]

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2013-07-18 11:04:11 UTC (rev 28141)
+++ gnunet/src/fs/gnunet-service-fs.c   2013-07-18 11:06:23 UTC (rev 28142)
@@ -475,7 +475,8 @@
 static void
 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  GSF_mesh_stop ();
+  GSF_mesh_stop_client ();
+  GSF_mesh_stop_server ();
   if (NULL != GSF_core)
   {
     GNUNET_CORE_disconnect (GSF_core);
@@ -646,7 +647,8 @@
       GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters,
                                     NULL);
   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
-  GSF_mesh_start ();
+  GSF_mesh_start_server ();
+  GSF_mesh_start_client ();
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
                                 NULL);
   return GNUNET_OK;

Deleted: gnunet/src/fs/gnunet-service-fs_mesh.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh.c      2013-07-18 11:04:11 UTC (rev 
28141)
+++ gnunet/src/fs/gnunet-service-fs_mesh.c      2013-07-18 11:06:23 UTC (rev 
28142)
@@ -1,1245 +0,0 @@
-/*
-     This file is part of GNUnet.
-     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file fs/gnunet-service-fs_mesh.c
- * @brief non-anonymous file-transfer
- * @author Christian Grothoff
- *
- * TODO:
- * - MESH2 API doesn't allow flow control for server yet (needed!)
- * - likely need to register clean up handler with mesh to handle
- *   client disconnect (likely leaky right now)
- * - server is optional, currently client code will NPE if we have
- *   no server, again MESH2 API requirement forcing this for now
- * - message handlers are symmetric for client/server, should be
- *   separated (currently clients can get requests and servers can
- *   handle answers, not good)
- * - code is entirely untested
- * - might have overlooked a few possible simplifications
- * - PORT is set to old application type, unsure if we should keep
- *   it that way (fine for now)
- */
-#include "platform.h"
-#include "gnunet_constants.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_mesh_service.h"
-#include "gnunet_protocols.h"
-#include "gnunet_applications.h"
-#include "gnunet-service-fs.h"
-#include "gnunet-service-fs_indexing.h"
-#include "gnunet-service-fs_mesh.h"
-
-/**
- * After how long do we termiante idle connections?
- */
-#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 
2)
-
-/**
- * After how long do we reset connections without replies?
- */
-#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 30)
-
-
-/**
- * A message in the queue to be written to the mesh.
- */
-struct WriteQueueItem
-{
-  /**
-   * Kept in a DLL.
-   */
-  struct WriteQueueItem *next;
-
-  /**
-   * Kept in a DLL.
-   */
-  struct WriteQueueItem *prev;
-
-  /**
-   * Number of bytes of payload, allocated at the end of this struct.
-   */
-  size_t msize;
-};
-
-
-/**
- * Information we keep around for each active meshing client.
- */
-struct MeshClient
-{
-  /**
-   * DLL
-   */ 
-  struct MeshClient *next;
-
-  /**
-   * DLL
-   */ 
-  struct MeshClient *prev;
-
-  /**
-   * Socket for communication.
-   */ 
-  struct GNUNET_MESH_Tunnel *socket;
-
-  /**
-   * Handle for active write operation, or NULL.
-   */ 
-  struct GNUNET_MESH_TransmitHandle *wh;
-
-  /**
-   * Head of write queue.
-   */
-  struct WriteQueueItem *wqi_head;
-
-  /**
-   * Tail of write queue.
-   */
-  struct WriteQueueItem *wqi_tail;
-  
-  /**
-   * Current active request to the datastore, if we have one pending.
-   */
-  struct GNUNET_DATASTORE_QueueEntry *qe;
-
-  /**
-   * Task that is scheduled to asynchronously terminate the connection.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier terminate_task;
-
-  /**
-   * Task that is scheduled to terminate idle connections.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
-  /**
-   * Size of the last write that was initiated.
-   */ 
-  size_t reply_size;
-
-};
-
-
-/**
- * Query from one peer, asking the other for CHK-data.
- */
-struct MeshQueryMessage
-{
-
-  /**
-   * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY.
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Block type must be DBLOCK or IBLOCK.
-   */
-  uint32_t type;
-
-  /**
-   * Query hash from CHK (hash of encrypted block).
-   */
-  struct GNUNET_HashCode query;
-
-};
-
-
-/**
- * Reply to a MeshQueryMessage.
- */
-struct MeshReplyMessage
-{
-
-  /**
-   * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY.
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Block type must be DBLOCK or IBLOCK.
-   */
-  uint32_t type;
-
-  /**
-   * Expiration time for the block.
-   */
-  struct GNUNET_TIME_AbsoluteNBO expiration;
-
-  /* followed by the encrypted block */
-
-};
-
-
-/** 
- * Handle for a mesh to another peer.
- */
-struct MeshHandle;
-
-
-/**
- * Handle for a request that is going out via mesh API.
- */
-struct GSF_MeshRequest
-{
-
-  /**
-   * DLL.
-   */
-  struct GSF_MeshRequest *next;
-
-  /**
-   * DLL.
-   */
-  struct GSF_MeshRequest *prev;
-
-  /**
-   * Which mesh is this request associated with?
-   */
-  struct MeshHandle *sh;
-
-  /**
-   * Function to call with the result.
-   */
-  GSF_MeshReplyProcessor proc;
-
-  /**
-   * Closure for 'proc'
-   */
-  void *proc_cls;
-
-  /**
-   * Query to transmit to the other peer.
-   */
-  struct GNUNET_HashCode query;
-
-  /**
-   * Desired type for the reply.
-   */
-  enum GNUNET_BLOCK_Type type;
-
-  /**
-   * Did we transmit this request already? YES if we are
-   * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
-   */
-  int was_transmitted;
-};
-
-
-/** 
- * Handle for a mesh to another peer.
- */
-struct MeshHandle
-{
-  /**
-   * Head of DLL of pending requests on this mesh.
-   */
-  struct GSF_MeshRequest *pending_head;
-
-  /**
-   * Tail of DLL of pending requests on this mesh.
-   */
-  struct GSF_MeshRequest *pending_tail;
-
-  /**
-   * Map from query to 'struct GSF_MeshRequest's waiting for
-   * a reply.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
-
-  /**
-   * Connection to the other peer.
-   */
-  struct GNUNET_MESH_Tunnel *mesh;
-
-  /**
-   * Handle for active write operation, or NULL.
-   */ 
-  struct GNUNET_MESH_TransmitHandle *wh;
-
-  /**
-   * Which peer does this mesh go to?
-   */ 
-  struct GNUNET_PeerIdentity target;
-
-  /**
-   * Task to kill inactive meshs (we keep them around for
-   * a few seconds to give the application a chance to give
-   * us another query).
-   */
-  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
-  /**
-   * Task to reset meshs that had errors (asynchronously,
-   * as we may not be able to do it immediately during a
-   * callback from the mesh API).
-   */
-  GNUNET_SCHEDULER_TaskIdentifier reset_task;
-
-  /**
-   * Is this mesh ready for transmission?
-   */
-  int is_ready;
-
-};
-
-
-/**
- * Listen socket for incoming requests.
- */
-static struct GNUNET_MESH_Handle *listen_socket;
-
-/**
- * Head of DLL of mesh clients.
- */ 
-static struct MeshClient *sc_head;
-
-/**
- * Tail of DLL of mesh clients.
- */ 
-static struct MeshClient *sc_tail;
-
-/**
- * Number of active mesh clients in the 'sc_*'-DLL.
- */
-static unsigned int sc_count;
-
-/**
- * Maximum allowed number of mesh clients.
- */
-static unsigned long long sc_count_max;
-
-/**
- * Map from peer identities to 'struct MeshHandles' with meshs to
- * those peers.
- */
-static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
-
-
-/* ********************* client-side code ************************* */
-
-/**
- * Iterator called on each entry in a waiting map to 
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the 'struct MeshHandle'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to clean up
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
-                   const struct GNUNET_HashCode *key,
-                   void *value)
-{
-  struct GSF_MeshRequest *sr = value;
-
-  sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
-           GNUNET_TIME_UNIT_FOREVER_ABS,
-           0, NULL);
-  GSF_mesh_query_cancel (sr);
-  return GNUNET_YES;
-}
-
-
-/**
- * Destroy a mesh handle.
- *
- * @param sh mesh to process
- */
-static void
-destroy_mesh_handle (struct MeshHandle *sh)
-{
-  struct GSF_MeshRequest *sr;
-
-  while (NULL != (sr = sh->pending_head))
-  {
-    sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
-             GNUNET_TIME_UNIT_FOREVER_ABS,
-             0, NULL);
-    GSF_mesh_query_cancel (sr);
-  }
-  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
-                                        &free_waiting_entry,
-                                        sh);
-  if (NULL != sh->wh)
-    GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
-  if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
-    GNUNET_SCHEDULER_cancel (sh->timeout_task);
-  if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
-    GNUNET_SCHEDULER_cancel (sh->reset_task);
-  GNUNET_MESH_tunnel_destroy (sh->mesh);
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CONTAINER_multihashmap_remove (mesh_map,
-                                                      &sh->target.hashPubKey,
-                                                      sh));
-  GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
-  GNUNET_free (sh);
-}
-
-
-/**
- * Transmit pending requests via the mesh.
- *
- * @param sh mesh to process
- */
-static void
-transmit_pending (struct MeshHandle *sh);
-
-
-/**
- * Iterator called on each entry in a waiting map to 
- * move it back to the pending list.
- *
- * @param cls the 'struct MeshHandle'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to move to pending
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-move_to_pending (void *cls,
-                const struct GNUNET_HashCode *key,
-                void *value)
-{
-  struct MeshHandle *sh = cls;
-  struct GSF_MeshRequest *sr = value;
-  
-  GNUNET_assert (GNUNET_YES ==
-                GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
-                                                      key,
-                                                      value));
-  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
-                              sh->pending_tail,
-                              sr);
-  sr->was_transmitted = GNUNET_NO;
-  return GNUNET_YES;
-}
-
-
-/**
- * We had a serious error, tear down and re-create mesh from scratch.
- *
- * @param sh mesh to reset
- */
-static void
-reset_mesh (struct MeshHandle *sh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Resetting mesh to %s\n",
-             GNUNET_i2s (&sh->target));
-  GNUNET_MESH_tunnel_destroy (sh->mesh);
-  sh->is_ready = GNUNET_NO;
-  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
-                                        &move_to_pending,
-                                        sh);
-  sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
-                                         sh,
-                                         &sh->target,
-                                         
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-                      GNUNET_YES,
-                      GNUNET_YES);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive mesh.
- *
- * @param cls the 'struct MeshHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-mesh_timeout (void *cls,
-               const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct MeshHandle *sh = cls;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Timeout on mesh to %s\n",
-             GNUNET_i2s (&sh->target));
-  sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  destroy_mesh_handle (sh);
-}
-
-
-/**
- * Task called when it is time to reset an mesh.
- *
- * @param cls the 'struct MeshHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-reset_mesh_task (void *cls,
-                  const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct MeshHandle *sh = cls;
-
-  sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
-  reset_mesh (sh);
-}
-
-
-/**
- * We had a serious error, tear down and re-create mesh from scratch,
- * but do so asynchronously.
- *
- * @param sh mesh to reset
- */
-static void
-reset_mesh_async (struct MeshHandle *sh)
-{
-  if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
-    GNUNET_SCHEDULER_cancel (sh->reset_task);
-  sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
-                                            sh);
-}
-
-
-/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a mesh.
- *
- * @param cls the struct MeshHandle for which we did the write call
- * @param size the number of bytes that can be written to 'buf'
- * @param buf where to write the message
- * @return number of bytes written to 'buf'
- */
-static size_t
-transmit_sqm (void *cls,
-             size_t size,
-             void *buf)
-{
-  struct MeshHandle *sh = cls;
-  struct MeshQueryMessage sqm;
-  struct GSF_MeshRequest *sr;
-
-  sh->wh = NULL;
-  if (NULL == buf)
-  {
-    reset_mesh (sh);
-    return 0;
-  }
-  sr = sh->pending_head;
-  if (NULL == sr)
-    return 0;
-  GNUNET_assert (size >= sizeof (struct MeshQueryMessage));
-  GNUNET_CONTAINER_DLL_remove (sh->pending_head,
-                              sh->pending_tail,
-                              sr);
-  GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
-                                    &sr->query,
-                                    sr,
-                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Sending query via mesh to %s\n",
-             GNUNET_i2s (&sh->target));
-  sr->was_transmitted = GNUNET_YES;
-  sqm.header.size = htons (sizeof (sqm));
-  sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY);
-  sqm.type = htonl (sr->type);
-  sqm.query = sr->query;
-  memcpy (buf, &sqm, sizeof (sqm));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Successfully transmitted %u bytes via mesh to %s\n",
-             (unsigned int) size,
-             GNUNET_i2s (&sh->target));
-  transmit_pending (sh);
-  return sizeof (sqm);
-}
-         
-
-/**
- * Transmit pending requests via the mesh.
- *
- * @param sh mesh to process
- */
-static void
-transmit_pending (struct MeshHandle *sh)
-{
-  if (NULL != sh->wh)
-    return;
-  sh->wh = GNUNET_MESH_notify_transmit_ready (sh->mesh, GNUNET_YES /* allow 
cork */,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             sizeof (struct MeshQueryMessage),
-                                             &transmit_sqm, sh);
-}
-
-
-/**
- * Closure for 'handle_reply'.
- */
-struct HandleReplyClosure
-{
-
-  /**
-   * Reply payload.
-   */ 
-  const void *data;
-
-  /**
-   * Expiration time for the block.
-   */
-  struct GNUNET_TIME_Absolute expiration;
-
-  /**
-   * Number of bytes in 'data'.
-   */
-  size_t data_size;
-
-  /** 
-   * Type of the block.
-   */
-  enum GNUNET_BLOCK_Type type;
-  
-  /**
-   * Did we have a matching query?
-   */
-  int found;
-};
-
-
-/**
- * Iterator called on each entry in a waiting map to 
- * process a result.
- *
- * @param cls the 'struct HandleReplyClosure'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to handle result for
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-handle_reply (void *cls,
-             const struct GNUNET_HashCode *key,
-             void *value)
-{
-  struct HandleReplyClosure *hrc = cls;
-  struct GSF_MeshRequest *sr = value;
-  
-  sr->proc (sr->proc_cls,
-           hrc->type,
-           hrc->expiration,
-           hrc->data_size,
-           hrc->data);
-  GSF_mesh_query_cancel (sr);
-  hrc->found = GNUNET_YES;
-  return GNUNET_YES;
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete reply is received.
- *
- * @param cls closure with the 'struct MeshHandle'
- * @param tunnel tunnel handle
- * @param tunnel_ctx tunnel context
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-reply_cb (void *cls,
-         struct GNUNET_MESH_Tunnel *tunnel,
-         void **tunnel_ctx,
-          const struct GNUNET_MessageHeader *message)
-{
-  struct MeshHandle *sh = *tunnel_ctx;
-  const struct MeshReplyMessage *srm;
-  struct HandleReplyClosure hrc;
-  uint16_t msize;
-  enum GNUNET_BLOCK_Type type;
-  struct GNUNET_HashCode query;
-
-  msize = ntohs (message->size);
-  if (sizeof (struct MeshReplyMessage) > msize)
-  {
-    GNUNET_break_op (0);
-    reset_mesh_async (sh);
-    return GNUNET_SYSERR;
-  }
-  srm = (const struct MeshReplyMessage *) message;
-  msize -= sizeof (struct MeshReplyMessage);
-  type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
-  if (GNUNET_YES !=
-      GNUNET_BLOCK_get_key (GSF_block_ctx,
-                           type,
-                           &srm[1], msize, &query))
-  {
-    GNUNET_break_op (0); 
-    reset_mesh_async (sh);
-    return GNUNET_SYSERR;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received reply `%s' via mesh\n",
-             GNUNET_h2s (&query));
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# replies received via mesh"), 1,
-                           GNUNET_NO);
-  hrc.data = &srm[1];
-  hrc.data_size = msize;
-  hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
-  hrc.type = type;
-  hrc.found = GNUNET_NO;
-  GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
-                                             &query,
-                                             &handle_reply,
-                                             &hrc);
-  if (GNUNET_NO == hrc.found)
-  {
-    GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# replies received via mesh 
dropped"), 1,
-                             GNUNET_NO);
-    return GNUNET_OK;
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Get (or create) a mesh to talk to the given peer.
- *
- * @param target peer we want to communicate with
- */
-static struct MeshHandle *
-get_mesh (const struct GNUNET_PeerIdentity *target)
-{
-  struct MeshHandle *sh;
-
-  sh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
-                                         &target->hashPubKey);
-  if (NULL != sh)
-  {
-    if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
-    {
-      GNUNET_SCHEDULER_cancel (sh->timeout_task);
-      sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-    }
-    return sh;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Creating mesh to %s\n",
-             GNUNET_i2s (target));
-  sh = GNUNET_malloc (sizeof (struct MeshHandle));
-  sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
-                                                &reset_mesh_task,
-                                                sh);
-  sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
-  sh->target = *target;
-  sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
-                                         sh,
-                                         &sh->target,
-                                         
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-                      GNUNET_NO,
-                      GNUNET_YES);
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CONTAINER_multihashmap_put (mesh_map,
-                                                   &sh->target.hashPubKey,
-                                                   sh,
-                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  return sh;
-}
-
-
-/**
- * Look for a block by directly contacting a particular peer.
- *
- * @param target peer that should have the block
- * @param query hash to query for the block
- * @param type desired type for the block
- * @param proc function to call with result
- * @param proc_cls closure for 'proc'
- * @return handle to cancel the operation
- */
-struct GSF_MeshRequest *
-GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
-                 const struct GNUNET_HashCode *query,
-                 enum GNUNET_BLOCK_Type type,
-                 GSF_MeshReplyProcessor proc, void *proc_cls)
-{
-  struct MeshHandle *sh;
-  struct GSF_MeshRequest *sr;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Preparing to send query for %s via mesh to %s\n",
-             GNUNET_h2s (query),
-             GNUNET_i2s (target));
-  sh = get_mesh (target);
-  sr = GNUNET_malloc (sizeof (struct GSF_MeshRequest));
-  sr->sh = sh;
-  sr->proc = proc;
-  sr->proc_cls = proc_cls;
-  sr->type = type;
-  sr->query = *query;
-  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
-                              sh->pending_tail,
-                              sr);
-  if (GNUNET_YES == sh->is_ready)
-    transmit_pending (sh);
-  return sr;
-}
-
-
-/**
- * Cancel an active request; must not be called after 'proc'
- * was calld.
- *
- * @param sr request to cancel
- */
-void
-GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
-{
-  struct MeshHandle *sh = sr->sh;
-
-  if (GNUNET_YES == sr->was_transmitted)
-    GNUNET_assert (GNUNET_OK ==
-                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
-                                                        &sr->query,
-                                                        sr));
-  else
-    GNUNET_CONTAINER_DLL_remove (sh->pending_head,
-                                sh->pending_tail,
-                                sr);
-  GNUNET_free (sr);
-  if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
-       (NULL == sh->pending_head) )
-    sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-                                                    &mesh_timeout,
-                                                    sh);
-}
-
-
-/* ********************* server-side code ************************* */
-
-
-/**
- * We're done with a particular client, clean up.
- *
- * @param sc client to clean up
- */
-static void
-terminate_mesh (struct MeshClient *sc)
-{
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# mesh connections active"), -1,
-                           GNUNET_NO);
-  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
-    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
-  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
-    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
-  if (NULL != sc->wh)
-    GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
-  if (NULL != sc->qe)
-    GNUNET_DATASTORE_cancel (sc->qe);
-  GNUNET_MESH_tunnel_destroy (sc->socket);
-  struct WriteQueueItem *wqi;
-  while (NULL != (wqi = sc->wqi_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                                sc->wqi_tail,
-                                wqi);
-    GNUNET_free (wqi);
-  }
-  GNUNET_CONTAINER_DLL_remove (sc_head,
-                              sc_tail,
-                              sc);
-  sc_count--;
-  GNUNET_free (sc);
-}
-
-
-/**
- * Task run to asynchronously terminate the mesh due to timeout.
- *
- * @param cls the 'struct MeshClient'
- * @param tc scheduler context
- */ 
-static void
-timeout_mesh_task (void *cls,
-                    const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct MeshClient *sc = cls;
-
-  sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  terminate_mesh (sc);
-}
-
-
-/**
- * Reset the timeout for the mesh client (due to activity).
- *
- * @param sc client handle to reset timeout for
- */
-static void
-refresh_timeout_task (struct MeshClient *sc)
-{
-  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
-    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
-  sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
-                                                  &timeout_mesh_task,
-                                                  sc);
-}
-
-
-/**
- * We're done handling a request from a client, read the next one.
- *
- * @param sc client to continue reading requests from
- */
-static void
-continue_reading (struct MeshClient *sc)
-{
-  refresh_timeout_task (sc);
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct MeshClient *sc);
-
-
-/**
- * Send a reply now, mesh is ready.
- *
- * @param cls closure with the struct MeshClient which sent the query
- * @param size number of bytes available in 'buf'
- * @param buf where to write the message
- * @return number of bytes written to 'buf'
- */
-static size_t
-write_continuation (void *cls,
-                   size_t size,
-                   void *buf)
-{
-  struct MeshClient *sc = cls;
-  struct WriteQueueItem *wqi;
-  size_t ret;
-
-  sc->wh = NULL;
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    return 0;
-  }
-  if (0 == size)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Transmission of reply failed, terminating mesh\n");
-    terminate_mesh (sc);    
-    return 0;
-  }
-  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitted %u byte reply via mesh\n",
-             (unsigned int) size);
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# Blocks transferred via mesh"), 1,
-                           GNUNET_NO);
-  memcpy (buf, &wqi[1], ret = wqi->msize);
-  GNUNET_free (wqi);
-  continue_writing (sc);
-  return ret;
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct MeshClient *sc)
-{
-  struct WriteQueueItem *wqi;
-
-  if (NULL != sc->wh)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write pending, waiting for it to complete\n");
-    return; /* write already pending */
-  }
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    continue_reading (sc);
-    return;
-  }
-  sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             wqi->msize,                       
              
-                                             &write_continuation,
-                                             sc);
-  if (NULL == sc->wh)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write failed; terminating mesh\n");
-    terminate_mesh (sc);
-    return;
-  }
-}
-
-
-/**
- * Process a datum that was stored in the datastore.
- *
- * @param cls closure with the struct MeshClient which sent the query
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
- */
-static void 
-handle_datastore_reply (void *cls,
-                       const struct GNUNET_HashCode * key,
-                       size_t size, const void *data,
-                       enum GNUNET_BLOCK_Type type,
-                       uint32_t priority,
-                       uint32_t anonymity,
-                       struct GNUNET_TIME_Absolute
-                       expiration, uint64_t uid)
-{
-  struct MeshClient *sc = cls;
-  size_t msize = size + sizeof (struct MeshReplyMessage);
-  struct WriteQueueItem *wqi;
-  struct MeshReplyMessage *srm;
-
-  sc->qe = NULL;
-  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Performing on-demand encoding\n");
-    if (GNUNET_OK !=
-       GNUNET_FS_handle_on_demand_block (key,
-                                         size, data, type,
-                                         priority, anonymity,
-                                         expiration, uid,
-                                         &handle_datastore_reply,
-                                         sc))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "On-demand encoding request failed\n");
-      continue_writing (sc);
-    }
-    return;
-  }
-  if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
-  {
-    GNUNET_break (0);
-    continue_writing (sc);
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Starting transmission of %u byte reply for query `%s' via 
mesh\n",
-             (unsigned int) size,
-             GNUNET_h2s (key));
-  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
-  wqi->msize = msize;
-  srm = (struct MeshReplyMessage *) &wqi[1];
-  srm->header.size = htons ((uint16_t) msize);
-  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
-  srm->type = htonl (type);
-  srm->expiration = GNUNET_TIME_absolute_hton (expiration);
-  memcpy (&srm[1], data, size);
-  sc->reply_size = msize;
-  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  continue_writing (sc);
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete query message is received.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure with the 'struct MeshClient'
- * @param tunnel tunnel handle
- * @param tunnel_ctx tunnel context
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-request_cb (void *cls,
-           struct GNUNET_MESH_Tunnel *tunnel,
-           void **tunnel_ctx,
-           const struct GNUNET_MessageHeader *message)
-{
-  struct MeshClient *sc = *tunnel_ctx;
-  const struct MeshQueryMessage *sqm;
-
-  sqm = (const struct MeshQueryMessage *) message;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received query for `%s' via mesh\n",
-             GNUNET_h2s (&sqm->query));
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# queries received via mesh"), 1,
-                           GNUNET_NO);
-  refresh_timeout_task (sc);
-  sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
-                                    0,
-                                    &sqm->query,
-                                    ntohl (sqm->type),
-                                    0 /* priority */, 
-                                    GSF_datastore_queue_size,
-                                    GNUNET_TIME_UNIT_FOREVER_REL,
-                                    &handle_datastore_reply, sc);
-  if (NULL == sc->qe)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Queueing request with datastore failed (queue full?)\n");
-    continue_writing (sc);
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Functions of this type are called upon new mesh connection from other peers.
- *
- * @param cls the closure from GNUNET_MESH_connect
- * @param socket the socket representing the mesh
- * @param initiator the identity of the peer who wants to establish a mesh
- *            with us; NULL on binding error
- * @param port mesh port used for the incoming connection
- * @return initial tunnel context (our 'struct MeshClient')
- */
-static void *
-accept_cb (void *cls,
-          struct GNUNET_MESH_Tunnel *socket,
-          const struct GNUNET_PeerIdentity *initiator,
-          uint32_t port)
-{
-  struct MeshClient *sc;
-
-  GNUNET_assert (NULL != socket);
-  if (sc_count >= sc_count_max)
-  {
-    GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# mesh client connections 
rejected"), 1,
-                             GNUNET_NO);
-    GNUNET_MESH_tunnel_destroy (socket);
-    return NULL;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Accepting inbound mesh connection from `%s'\n",
-             GNUNET_i2s (initiator));
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# mesh connections active"), 1,
-                           GNUNET_NO);
-  sc = GNUNET_malloc (sizeof (struct MeshClient));
-  sc->socket = socket;
-  GNUNET_CONTAINER_DLL_insert (sc_head,
-                              sc_tail,
-                              sc);
-  sc_count++;
-  refresh_timeout_task (sc);
-  return sc;
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_mesh_start ()
-{
-  static const struct GNUNET_MESH_MessageHandler handlers[] = {
-    { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct 
MeshQueryMessage)},
-    { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 },
-    { NULL, 0, 0 }
-  };
-  static const uint32_t ports[] = {
-    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-    0
-  };
-
-  mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
-  if (GNUNET_YES ==
-      GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
-                                            "fs",
-                                            "MAX_MESH_CLIENTS",
-                                            &sc_count_max))
-  {
-    listen_socket = GNUNET_MESH_connect (GSF_cfg,
-                                        NULL,
-                                        &accept_cb,
-                                        NULL /* FIXME: have a cleanup 
callback? */,
-                                        handlers,
-                                        ports);
-  } 
-}
-
-
-/**
- * Function called on each active meshs to shut them down.
- *
- * @param cls NULL
- * @param key target peer, unused
- * @param value the 'struct MeshHandle' to destroy
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-release_meshs (void *cls,
-                const struct GNUNET_HashCode *key,
-                void *value)
-{
-  struct MeshHandle *sh = value;
-
-  destroy_mesh_handle (sh);
-  return GNUNET_YES;
-}
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_mesh_stop ()
-{
-  struct MeshClient *sc;
-
-  while (NULL != (sc = sc_head))
-    terminate_mesh (sc);
-  GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
-                                        &release_meshs,
-                                        NULL);
-  GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
-  mesh_map = NULL;
-  if (NULL != listen_socket)
-  {
-    GNUNET_MESH_disconnect (listen_socket);
-    listen_socket = NULL;
-  }
-}
-
-/* end of gnunet-service-fs_mesh.c */

Modified: gnunet/src/fs/gnunet-service-fs_mesh.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh.h      2013-07-18 11:04:11 UTC (rev 
28141)
+++ gnunet/src/fs/gnunet-service-fs_mesh.h      2013-07-18 11:06:23 UTC (rev 
28142)
@@ -79,13 +79,81 @@
  * Initialize subsystem for non-anonymous file-sharing.
  */
 void
-GSF_mesh_start (void);
+GSF_mesh_start_server (void);
 
 
 /**
  * Shutdown subsystem for non-anonymous file-sharing.
  */
 void
-GSF_mesh_stop (void);
+GSF_mesh_stop_server (void);
 
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_start_client (void);
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_stop_client (void);
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+/**
+ * Query from one peer, asking the other for CHK-data.
+ */
+struct MeshQueryMessage
+{
+
+  /**
+   * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Block type must be DBLOCK or IBLOCK.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Query hash from CHK (hash of encrypted block).
+   */
+  struct GNUNET_HashCode query;
+
+};
+
+
+/**
+ * Reply to a MeshQueryMessage.
+ */
+struct MeshReplyMessage
+{
+
+  /**
+   * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Block type must be DBLOCK or IBLOCK.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Expiration time for the block.
+   */
+  struct GNUNET_TIME_AbsoluteNBO expiration;
+
+  /* followed by the encrypted block */
+
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+
 #endif

Copied: gnunet/src/fs/gnunet-service-fs_mesh_server.c (from rev 28132, 
gnunet/src/fs/gnunet-service-fs_mesh.c)
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_server.c                               
(rev 0)
+++ gnunet/src/fs/gnunet-service-fs_mesh_server.c       2013-07-18 11:06:23 UTC 
(rev 28142)
@@ -0,0 +1,589 @@
+/*
+     This file is part of GNUnet.
+     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_mesh.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - MESH2 API doesn't allow flow control for server yet (needed!)
+ * - likely need to register clean up handler with mesh to handle
+ *   client disconnect (likely leaky right now)
+ * - server is optional, currently client code will NPE if we have
+ *   no server, again MESH2 API requirement forcing this for now
+ * - message handlers are symmetric for client/server, should be
+ *   separated (currently clients can get requests and servers can
+ *   handle answers, not good)
+ * - code is entirely untested
+ * - might have overlooked a few possible simplifications
+ * - PORT is set to old application type, unsure if we should keep
+ *   it that way (fine for now)
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_mesh.h"
+
+/**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 
2)
+
+
+/**
+ * A message in the queue to be written to the mesh.
+ */
+struct WriteQueueItem
+{
+  /**
+   * Kept in a DLL.
+   */
+  struct WriteQueueItem *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct WriteQueueItem *prev;
+
+  /**
+   * Number of bytes of payload, allocated at the end of this struct.
+   */
+  size_t msize;
+};
+
+
+/**
+ * Information we keep around for each active meshing client.
+ */
+struct MeshClient
+{
+  /**
+   * DLL
+   */ 
+  struct MeshClient *next;
+
+  /**
+   * DLL
+   */ 
+  struct MeshClient *prev;
+
+  /**
+   * Socket for communication.
+   */ 
+  struct GNUNET_MESH_Tunnel *socket;
+
+  /**
+   * Handle for active write operation, or NULL.
+   */ 
+  struct GNUNET_MESH_TransmitHandle *wh;
+
+  /**
+   * Head of write queue.
+   */
+  struct WriteQueueItem *wqi_head;
+
+  /**
+   * Tail of write queue.
+   */
+  struct WriteQueueItem *wqi_tail;
+  
+  /**
+   * Current active request to the datastore, if we have one pending.
+   */
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  /**
+   * Task that is scheduled to asynchronously terminate the connection.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
+  /**
+   * Task that is scheduled to terminate idle connections.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * Size of the last write that was initiated.
+   */ 
+  size_t reply_size;
+
+};
+
+
+/**
+ * Listen socket for incoming requests.
+ */
+static struct GNUNET_MESH_Handle *listen_socket;
+
+/**
+ * Head of DLL of mesh clients.
+ */ 
+static struct MeshClient *sc_head;
+
+/**
+ * Tail of DLL of mesh clients.
+ */ 
+static struct MeshClient *sc_tail;
+
+/**
+ * Number of active mesh clients in the 'sc_*'-DLL.
+ */
+static unsigned int sc_count;
+
+/**
+ * Maximum allowed number of mesh clients.
+ */
+static unsigned long long sc_count_max;
+
+
+
+/* ********************* server-side code ************************* */
+
+
+/**
+ * We're done with a particular client, clean up.
+ *
+ * @param sc client to clean up
+ */
+static void
+terminate_mesh (struct MeshClient *sc)
+{
+  struct WriteQueueItem *wqi;
+
+  fprintf (stderr,
+          "terminate mesh called for %p\n",
+          sc);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# mesh connections active"), -1,
+                           GNUNET_NO);
+  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
+  if (NULL != sc->wh)
+    GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
+  if (NULL != sc->qe)
+    GNUNET_DATASTORE_cancel (sc->qe);
+  while (NULL != (wqi = sc->wqi_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                                sc->wqi_tail,
+                                wqi);
+    GNUNET_free (wqi);
+  }
+  GNUNET_CONTAINER_DLL_remove (sc_head,
+                              sc_tail,
+                              sc);
+  sc_count--;
+  GNUNET_free (sc);
+}
+
+
+/**
+ * Task run to asynchronously terminate the mesh due to timeout.
+ *
+ * @param cls the 'struct MeshClient'
+ * @param tc scheduler context
+ */ 
+static void
+timeout_mesh_task (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct MeshClient *sc = cls;
+  struct GNUNET_MESH_Tunnel *tun;
+
+  sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  tun = sc->socket;
+  sc->socket = NULL;
+  GNUNET_MESH_tunnel_destroy (tun);
+}
+
+
+/**
+ * Reset the timeout for the mesh client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct MeshClient *sc)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
+  sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+                                                  &timeout_mesh_task,
+                                                  sc);
+}
+
+
+/**
+ * We're done handling a request from a client, read the next one.
+ *
+ * @param sc client to continue reading requests from
+ */
+static void
+continue_reading (struct MeshClient *sc)
+{
+  refresh_timeout_task (sc);
+  GNUNET_MESH_receive_done (sc->socket);
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct MeshClient *sc);
+
+
+/**
+ * Send a reply now, mesh is ready.
+ *
+ * @param cls closure with the struct MeshClient which sent the query
+ * @param size number of bytes available in 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
+ */
+static size_t
+write_continuation (void *cls,
+                   size_t size,
+                   void *buf)
+{
+  struct MeshClient *sc = cls;
+  struct WriteQueueItem *wqi;
+  size_t ret;
+
+  sc->wh = NULL;
+  if (NULL == (wqi = sc->wqi_head))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write queue empty, reading more requests\n");
+    return 0;
+  }
+  if (0 == size)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Transmission of reply failed, terminating mesh\n");
+    terminate_mesh (sc);    
+    return 0;
+  }
+  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                              sc->wqi_tail,
+                              wqi);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitted %u byte reply via mesh\n",
+             (unsigned int) size);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# Blocks transferred via mesh"), 1,
+                           GNUNET_NO);
+  memcpy (buf, &wqi[1], ret = wqi->msize);
+  GNUNET_free (wqi);
+  continue_writing (sc);
+  return ret;
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct MeshClient *sc)
+{
+  struct WriteQueueItem *wqi;
+
+  if (NULL != sc->wh)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write pending, waiting for it to complete\n");
+    return; /* write already pending */
+  }
+  if (NULL == (wqi = sc->wqi_head))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write queue empty, reading more requests\n");
+    continue_reading (sc);
+    return;
+  }
+  sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             wqi->msize,                       
              
+                                             &write_continuation,
+                                             sc);
+  if (NULL == sc->wh)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write failed; terminating mesh\n");
+    terminate_mesh (sc);
+    return;
+  }
+}
+
+
+/**
+ * Process a datum that was stored in the datastore.
+ *
+ * @param cls closure with the struct MeshClient which sent the query
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void 
+handle_datastore_reply (void *cls,
+                       const struct GNUNET_HashCode * key,
+                       size_t size, const void *data,
+                       enum GNUNET_BLOCK_Type type,
+                       uint32_t priority,
+                       uint32_t anonymity,
+                       struct GNUNET_TIME_Absolute
+                       expiration, uint64_t uid)
+{
+  struct MeshClient *sc = cls;
+  size_t msize = size + sizeof (struct MeshReplyMessage);
+  struct WriteQueueItem *wqi;
+  struct MeshReplyMessage *srm;
+
+  sc->qe = NULL;
+  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Performing on-demand encoding\n");
+    if (GNUNET_OK !=
+       GNUNET_FS_handle_on_demand_block (key,
+                                         size, data, type,
+                                         priority, anonymity,
+                                         expiration, uid,
+                                         &handle_datastore_reply,
+                                         sc))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "On-demand encoding request failed\n");
+      continue_writing (sc);
+    }
+    return;
+  }
+  if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    continue_writing (sc);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Starting transmission of %u byte reply for query `%s' via 
mesh\n",
+             (unsigned int) size,
+             GNUNET_h2s (key));
+  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
+  wqi->msize = msize;
+  srm = (struct MeshReplyMessage *) &wqi[1];
+  srm->header.size = htons ((uint16_t) msize);
+  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
+  srm->type = htonl (type);
+  srm->expiration = GNUNET_TIME_absolute_hton (expiration);
+  memcpy (&srm[1], data, size);
+  sc->reply_size = msize;
+  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
+                              sc->wqi_tail,
+                              wqi);
+  continue_writing (sc);
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete query message is received.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure with the 'struct MeshClient'
+ * @param tunnel tunnel handle
+ * @param tunnel_ctx tunnel context
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+request_cb (void *cls,
+           struct GNUNET_MESH_Tunnel *tunnel,
+           void **tunnel_ctx,
+           const struct GNUNET_MessageHeader *message)
+{
+  struct MeshClient *sc = *tunnel_ctx;
+  const struct MeshQueryMessage *sqm;
+
+  fprintf (stderr,
+          "Request gets %p\n", 
+          sc);
+  sqm = (const struct MeshQueryMessage *) message;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received query for `%s' via mesh\n",
+             GNUNET_h2s (&sqm->query));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# queries received via mesh"), 1,
+                           GNUNET_NO);
+  refresh_timeout_task (sc);
+  sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                    0,
+                                    &sqm->query,
+                                    ntohl (sqm->type),
+                                    0 /* priority */, 
+                                    GSF_datastore_queue_size,
+                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &handle_datastore_reply, sc);
+  if (NULL == sc->qe)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Queueing request with datastore failed (queue full?)\n");
+    continue_writing (sc);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this type are called upon new mesh connection from other peers.
+ *
+ * @param cls the closure from GNUNET_MESH_connect
+ * @param socket the socket representing the mesh
+ * @param initiator the identity of the peer who wants to establish a mesh
+ *            with us; NULL on binding error
+ * @param port mesh port used for the incoming connection
+ * @return initial tunnel context (our 'struct MeshClient')
+ */
+static void *
+accept_cb (void *cls,
+          struct GNUNET_MESH_Tunnel *socket,
+          const struct GNUNET_PeerIdentity *initiator,
+          uint32_t port)
+{
+  struct MeshClient *sc;
+
+  GNUNET_assert (NULL != socket);
+  if (sc_count >= sc_count_max)
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                             gettext_noop ("# mesh client connections 
rejected"), 1,
+                             GNUNET_NO);
+    GNUNET_MESH_tunnel_destroy (socket);
+    return NULL;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Accepting inbound mesh connection from `%s'\n",
+             GNUNET_i2s (initiator));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# mesh connections active"), 1,
+                           GNUNET_NO);
+  sc = GNUNET_new (struct MeshClient);
+  sc->socket = socket;
+  GNUNET_CONTAINER_DLL_insert (sc_head,
+                              sc_tail,
+                              sc);
+  sc_count++;
+  refresh_timeout_task (sc);
+  fprintf (stderr,
+          "Accept returns %p\n", 
+          sc);
+  return sc;
+}
+
+
+/**
+ * Function called by mesh when a client disconnects.
+ * Cleans up our 'struct MeshClient' of that tunnel.
+ *
+ * @param cls NULL
+ * @param tunnel tunnel of the disconnecting client
+ * @param tunnel_ctx our 'struct MeshClient' 
+ */
+static void
+cleaner_cb (void *cls,
+           const struct GNUNET_MESH_Tunnel *tunnel,
+           void *tunnel_ctx)
+{
+  struct MeshClient *sc = tunnel_ctx;
+
+  fprintf (stderr,
+          "Cleaner called with %p\n", 
+          sc);
+  if (NULL != sc)
+    terminate_mesh (sc);
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_start_server ()
+{
+  static const struct GNUNET_MESH_MessageHandler handlers[] = {
+    { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct 
MeshQueryMessage)},
+    { NULL, 0, 0 }
+  };
+  static const uint32_t ports[] = {
+    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+    0
+  };
+
+  if (GNUNET_YES !=
+      GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
+                                            "fs",
+                                            "MAX_MESH_CLIENTS",
+                                            &sc_count_max))
+    return;
+  listen_socket = GNUNET_MESH_connect (GSF_cfg,
+                                      NULL,
+                                      &accept_cb,
+                                      &cleaner_cb,
+                                      handlers,
+                                      ports);
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_stop_server ()
+{
+  struct MeshClient *sc;
+
+  while (NULL != (sc = sc_head))
+    terminate_mesh (sc);
+  if (NULL != listen_socket)
+  {
+    GNUNET_MESH_disconnect (listen_socket);
+    listen_socket = NULL;
+  }
+}
+
+/* end of gnunet-service-fs_mesh.c */

Modified: gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf
===================================================================
--- gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf  2013-07-18 11:04:11 UTC 
(rev 28141)
+++ gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf  2013-07-18 11:06:23 UTC 
(rev 28142)
@@ -14,3 +14,7 @@
 # (may improve anonymity, probably not a good idea if content_caching is NO)
 CONTENT_PUSHING = NO
 
+#PREFIX = valgrind
+
+[mesh]
+#PREFIX = valgrind




reply via email to

[Prev in Thread] Current Thread [Next in Thread]