gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28145 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r28145 - gnunet/src/fs
Date: Thu, 18 Jul 2013 13:31:34 +0200

Author: grothoff
Date: 2013-07-18 13:31:34 +0200 (Thu, 18 Jul 2013)
New Revision: 28145

Added:
   gnunet/src/fs/gnunet-service-fs_mesh_client.c
Modified:
   gnunet/src/fs/gnunet-service-fs_mesh_server.c
Log:
-missing file, more cleanup

Added: gnunet/src/fs/gnunet-service-fs_mesh_client.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_client.c                               
(rev 0)
+++ gnunet/src/fs/gnunet-service-fs_mesh_client.c       2013-07-18 11:31:34 UTC 
(rev 28145)
@@ -0,0 +1,743 @@
+/*
+     This file is part of GNUnet.
+     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_mesh_client.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - PORT is set to old application type, unsure if we should keep
+ *   it that way (fine for now)
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_mesh.h"
+
+
+/**
+ * After how long do we reset connections without replies?
+ */
+#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 30)
+
+
+/** 
+ * Handle for a mesh to another peer.
+ */
+struct MeshHandle;
+
+
+/**
+ * Handle for a request that is going out via mesh API.
+ */
+struct GSF_MeshRequest
+{
+
+  /**
+   * DLL.
+   */
+  struct GSF_MeshRequest *next;
+
+  /**
+   * DLL.
+   */
+  struct GSF_MeshRequest *prev;
+
+  /**
+   * Which mesh is this request associated with?
+   */
+  struct MeshHandle *mh;
+
+  /**
+   * Function to call with the result.
+   */
+  GSF_MeshReplyProcessor proc;
+
+  /**
+   * Closure for 'proc'
+   */
+  void *proc_cls;
+
+  /**
+   * Query to transmit to the other peer.
+   */
+  struct GNUNET_HashCode query;
+
+  /**
+   * Desired type for the reply.
+   */
+  enum GNUNET_BLOCK_Type type;
+
+  /**
+   * Did we transmit this request already? YES if we are
+   * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
+   */
+  int was_transmitted;
+};
+
+
+/** 
+ * Handle for a mesh to another peer.
+ */
+struct MeshHandle
+{
+  /**
+   * Head of DLL of pending requests on this mesh.
+   */
+  struct GSF_MeshRequest *pending_head;
+
+  /**
+   * Tail of DLL of pending requests on this mesh.
+   */
+  struct GSF_MeshRequest *pending_tail;
+
+  /**
+   * Map from query to 'struct GSF_MeshRequest's waiting for
+   * a reply.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
+
+  /**
+   * Tunnel to the other peer.
+   */
+  struct GNUNET_MESH_Tunnel *tunnel;
+
+  /**
+   * Handle for active write operation, or NULL.
+   */ 
+  struct GNUNET_MESH_TransmitHandle *wh;
+
+  /**
+   * Which peer does this mesh go to?
+   */ 
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * Task to kill inactive meshs (we keep them around for
+   * a few seconds to give the application a chance to give
+   * us another query).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * Task to reset meshs that had errors (asynchronously,
+   * as we may not be able to do it immediately during a
+   * callback from the mesh API).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+};
+
+
+/**
+ * Mesh tunnel for creating outbound tunnels.
+ */
+static struct GNUNET_MESH_Handle *mesh_tunnel;
+
+/**
+ * Map from peer identities to 'struct MeshHandles' with mesh
+ * tunnels to those peers.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
+
+
+/* ********************* client-side code ************************* */
+
+
+/**
+ * Transmit pending requests via the mesh.
+ *
+ * @param mh mesh to process
+ */
+static void
+transmit_pending (struct MeshHandle *mh);
+
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct MeshHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_MeshRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+                const struct GNUNET_HashCode *key,
+                void *value)
+{
+  struct MeshHandle *mh = cls;
+  struct GSF_MeshRequest *sr = value;
+  
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
+                                                      key,
+                                                      value));
+  GNUNET_CONTAINER_DLL_insert (mh->pending_head,
+                              mh->pending_tail,
+                              sr);
+  sr->was_transmitted = GNUNET_NO;
+  return GNUNET_YES;
+}
+
+
+/**
+ * We had a serious error, tear down and re-create mesh from scratch.
+ *
+ * @param mh mesh to reset
+ */
+static void
+reset_mesh (struct MeshHandle *mh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Resetting mesh tunnel to %s\n",
+             GNUNET_i2s (&mh->target));
+  GNUNET_MESH_tunnel_destroy (mh->tunnel);
+  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+                                        &move_to_pending,
+                                        mh);
+  mh->tunnel = GNUNET_MESH_tunnel_create (mesh_tunnel,
+                                       mh,
+                                       &mh->target,
+                                       
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+                                       GNUNET_YES,
+                                       GNUNET_YES);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive mesh tunnel.
+ *
+ * @param cls the 'struct MeshHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+mesh_timeout (void *cls,
+             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct MeshHandle *mh = cls;
+  struct GNUNET_MESH_Tunnel *tun;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Timeout on mesh tunnel to %s\n",
+             GNUNET_i2s (&mh->target));
+  mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  tun = mh->tunnel;
+  mh->tunnel = NULL;
+  GNUNET_MESH_tunnel_destroy (tun);
+}
+
+
+/**
+ * Task called when it is time to reset an mesh.
+ *
+ * @param cls the 'struct MeshHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_mesh_task (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct MeshHandle *mh = cls;
+
+  mh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+  reset_mesh (mh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create mesh from scratch,
+ * but do so asynchronously.
+ *
+ * @param mh mesh to reset
+ */
+static void
+reset_mesh_async (struct MeshHandle *mh)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
+    GNUNET_SCHEDULER_cancel (mh->reset_task);
+  mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
+                                            mh);
+}
+
+
+/**
+ * Functions of this signature are called whenever we are ready to transmit
+ * query via a mesh.
+ *
+ * @param cls the struct MeshHandle for which we did the write call
+ * @param size the number of bytes that can be written to 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
+ */
+static size_t
+transmit_sqm (void *cls,
+             size_t size,
+             void *buf)
+{
+  struct MeshHandle *mh = cls;
+  struct MeshQueryMessage sqm;
+  struct GSF_MeshRequest *sr;
+
+  mh->wh = NULL;
+  if (NULL == buf)
+  {
+    reset_mesh (mh);
+    return 0;
+  }
+  sr = mh->pending_head;
+  if (NULL == sr)
+    return 0;
+  GNUNET_assert (size >= sizeof (struct MeshQueryMessage));
+  GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+                              mh->pending_tail,
+                              sr);
+  GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+                                    &sr->query,
+                                    sr,
+                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending query for %s via mesh to %s\n",
+             GNUNET_h2s (&sr->query),
+             GNUNET_i2s (&mh->target));
+  sr->was_transmitted = GNUNET_YES;
+  sqm.header.size = htons (sizeof (sqm));
+  sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY);
+  sqm.type = htonl (sr->type);
+  sqm.query = sr->query;
+  memcpy (buf, &sqm, sizeof (sqm));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Successfully transmitted %u bytes via mesh to %s\n",
+             (unsigned int) size,
+             GNUNET_i2s (&mh->target));
+  transmit_pending (mh);
+  return sizeof (sqm);
+}
+         
+
+/**
+ * Transmit pending requests via the mesh.
+ *
+ * @param mh mesh to process
+ */
+static void
+transmit_pending (struct MeshHandle *mh)
+{
+  if (NULL != mh->wh)
+    return;
+  mh->wh = GNUNET_MESH_notify_transmit_ready (mh->tunnel, GNUNET_YES /* allow 
cork */,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             sizeof (struct MeshQueryMessage),
+                                             &transmit_sqm, mh);
+}
+
+
+/**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+  /**
+   * Reply payload.
+   */ 
+  const void *data;
+
+  /**
+   * Expiration time for the block.
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Number of bytes in 'data'.
+   */
+  size_t data_size;
+
+  /** 
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
+  
+  /**
+   * Did we have a matching query?
+   */
+  int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_MeshRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+             const struct GNUNET_HashCode *key,
+             void *value)
+{
+  struct HandleReplyClosure *hrc = cls;
+  struct GSF_MeshRequest *sr = value;
+  
+  sr->proc (sr->proc_cls,
+           hrc->type,
+           hrc->expiration,
+           hrc->data_size,
+           hrc->data);
+  GSF_mesh_query_cancel (sr);
+  hrc->found = GNUNET_YES;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Functions with this signature are called whenever a complete reply
+ * is received.
+ *
+ * @param cls closure with the 'struct MeshHandle'
+ * @param tunnel tunnel handle
+ * @param tunnel_ctx tunnel context
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+reply_cb (void *cls,
+         struct GNUNET_MESH_Tunnel *tunnel,
+         void **tunnel_ctx,
+          const struct GNUNET_MessageHeader *message)
+{
+  struct MeshHandle *mh = *tunnel_ctx;
+  const struct MeshReplyMessage *srm;
+  struct HandleReplyClosure hrc;
+  uint16_t msize;
+  enum GNUNET_BLOCK_Type type;
+  struct GNUNET_HashCode query;
+
+  msize = ntohs (message->size);
+  if (sizeof (struct MeshReplyMessage) > msize)
+  {
+    GNUNET_break_op (0);
+    reset_mesh_async (mh);
+    return GNUNET_SYSERR;
+  }
+  srm = (const struct MeshReplyMessage *) message;
+  msize -= sizeof (struct MeshReplyMessage);
+  type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
+  if (GNUNET_YES !=
+      GNUNET_BLOCK_get_key (GSF_block_ctx,
+                           type,
+                           &srm[1], msize, &query))
+  {
+    GNUNET_break_op (0); 
+    reset_mesh_async (mh);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received reply `%s' via mesh from peer %s\n",
+             GNUNET_h2s (&query),
+             GNUNET_i2s (&mh->target));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# replies received via mesh"), 1,
+                           GNUNET_NO);
+  hrc.data = &srm[1];
+  hrc.data_size = msize;
+  hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+  hrc.type = type;
+  hrc.found = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
+                                             &query,
+                                             &handle_reply,
+                                             &hrc);
+  if (GNUNET_NO == hrc.found)
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                             gettext_noop ("# replies received via mesh 
dropped"), 1,
+                             GNUNET_NO);
+    return GNUNET_OK;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Get (or create) a mesh to talk to the given peer.
+ *
+ * @param target peer we want to communicate with
+ */
+static struct MeshHandle *
+get_mesh (const struct GNUNET_PeerIdentity *target)
+{
+  struct MeshHandle *mh;
+
+  mh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
+                                         &target->hashPubKey);
+  if (NULL != mh)
+  {
+    if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
+    {
+      GNUNET_SCHEDULER_cancel (mh->timeout_task);
+      mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+    return mh;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Creating mesh tunnel to %s\n",
+             GNUNET_i2s (target));
+  mh = GNUNET_new (struct MeshHandle);
+  mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
+                                                &reset_mesh_task,
+                                                mh);
+  mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+  mh->target = *target;
+  mh->tunnel = GNUNET_MESH_tunnel_create (mesh_tunnel,
+                                       mh,
+                                       &mh->target,
+                                       
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+                                       GNUNET_NO,
+                                       GNUNET_YES);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (mesh_map,
+                                                   &mh->target.hashPubKey,
+                                                   mh,
+                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  return mh;
+}
+
+
+/**
+ * Look for a block by directly contacting a particular peer.
+ *
+ * @param target peer that should have the block
+ * @param query hash to query for the block
+ * @param type desired type for the block
+ * @param proc function to call with result
+ * @param proc_cls closure for 'proc'
+ * @return handle to cancel the operation
+ */
+struct GSF_MeshRequest *
+GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
+               const struct GNUNET_HashCode *query,
+               enum GNUNET_BLOCK_Type type,
+               GSF_MeshReplyProcessor proc, void *proc_cls)
+{
+  struct MeshHandle *mh;
+  struct GSF_MeshRequest *sr;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Preparing to send query for %s via mesh to %s\n",
+             GNUNET_h2s (query),
+             GNUNET_i2s (target));
+  mh = get_mesh (target);
+  sr = GNUNET_new (struct GSF_MeshRequest);
+  sr->mh = mh;
+  sr->proc = proc;
+  sr->proc_cls = proc_cls;
+  sr->type = type;
+  sr->query = *query;
+  GNUNET_CONTAINER_DLL_insert (mh->pending_head,
+                              mh->pending_tail,
+                              sr);
+  transmit_pending (mh);
+  return sr;
+}
+
+
+/**
+ * Cancel an active request; must not be called after 'proc'
+ * was calld.
+ *
+ * @param sr request to cancel
+ */
+void
+GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
+{
+  struct MeshHandle *mh = sr->mh;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Cancelled query for %s via mesh to %s\n",
+             GNUNET_h2s (&sr->query),
+             GNUNET_i2s (&sr->mh->target));
+  if (GNUNET_YES == sr->was_transmitted)
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
+                                                        &sr->query,
+                                                        sr));
+  else
+    GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+                                mh->pending_tail,
+                                sr);
+  GNUNET_free (sr);
+  if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
+       (NULL == mh->pending_head) )
+    mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+                                                    &mesh_timeout,
+                                                    mh);
+}
+
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct MeshHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_MeshRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
+{
+  struct GSF_MeshRequest *sr = value;
+
+  sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+           GNUNET_TIME_UNIT_FOREVER_ABS,
+           0, NULL);
+  GSF_mesh_query_cancel (sr);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Function called by mesh when a client disconnects.
+ * Cleans up our 'struct MeshClient' of that tunnel.
+ *
+ * @param cls NULL
+ * @param tunnel tunnel of the disconnecting client
+ * @param tunnel_ctx our 'struct MeshClient' 
+ */
+static void
+cleaner_cb (void *cls,
+           const struct GNUNET_MESH_Tunnel *tunnel,
+           void *tunnel_ctx)
+{
+  struct MeshHandle *mh = tunnel_ctx;
+  struct GSF_MeshRequest *sr;
+
+  mh->tunnel = NULL;
+  while (NULL != (sr = mh->pending_head))
+  {
+    sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+             GNUNET_TIME_UNIT_FOREVER_ABS,
+             0, NULL);
+    GSF_mesh_query_cancel (sr);
+  }
+  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+                                        &free_waiting_entry,
+                                        mh);
+  if (NULL != mh->wh)
+    GNUNET_MESH_notify_transmit_ready_cancel (mh->wh);
+  if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
+    GNUNET_SCHEDULER_cancel (mh->timeout_task);
+  if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
+    GNUNET_SCHEDULER_cancel (mh->reset_task);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_remove (mesh_map,
+                                                      &mh->target.hashPubKey,
+                                                      mh));
+  GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
+  GNUNET_free (mh);
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_start_client ()
+{
+  static const struct GNUNET_MESH_MessageHandler handlers[] = {
+    { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 },
+    { NULL, 0, 0 }
+  };
+  static const uint32_t ports[] = {
+    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+    0
+  };
+
+  mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+  mesh_tunnel = GNUNET_MESH_connect (GSF_cfg,
+                                        NULL,
+                                        NULL,
+                                        &cleaner_cb,
+                                        handlers,
+                                        ports);
+}
+
+
+/**
+ * Function called on each active meshs to shut them down.
+ *
+ * @param cls NULL
+ * @param key target peer, unused
+ * @param value the 'struct MeshHandle' to destroy
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+release_meshs (void *cls,
+              const struct GNUNET_HashCode *key,
+              void *value)
+{
+  struct MeshHandle *mh = value;
+  struct GNUNET_MESH_Tunnel *tun;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Timeout on mesh tunnel to %s\n",
+             GNUNET_i2s (&mh->target));
+  tun = mh->tunnel;
+  mh->tunnel = NULL;
+  if (NULL != tun)
+    GNUNET_MESH_tunnel_destroy (tun);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_stop_client ()
+{
+  GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
+                                        &release_meshs,
+                                        NULL);
+  GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
+  mesh_map = NULL;
+  if (NULL != mesh_tunnel)
+  {
+    GNUNET_MESH_disconnect (mesh_tunnel);
+    mesh_tunnel = NULL;
+  }
+}
+
+
+/* end of gnunet-service-fs_mesh_client.c */

Modified: gnunet/src/fs/gnunet-service-fs_mesh_server.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_server.c       2013-07-18 11:19:51 UTC 
(rev 28144)
+++ gnunet/src/fs/gnunet-service-fs_mesh_server.c       2013-07-18 11:31:34 UTC 
(rev 28145)
@@ -24,16 +24,6 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - MESH2 API doesn't allow flow control for server yet (needed!)
- * - likely need to register clean up handler with mesh to handle
- *   client disconnect (likely leaky right now)
- * - server is optional, currently client code will NPE if we have
- *   no server, again MESH2 API requirement forcing this for now
- * - message handlers are symmetric for client/server, should be
- *   separated (currently clients can get requests and servers can
- *   handle answers, not good)
- * - code is entirely untested
- * - might have overlooked a few possible simplifications
  * - PORT is set to old application type, unsure if we should keep
  *   it that way (fine for now)
  */
@@ -91,9 +81,9 @@
   struct MeshClient *prev;
 
   /**
-   * Socket for communication.
+   * Tunnel for communication.
    */ 
-  struct GNUNET_MESH_Tunnel *socket;
+  struct GNUNET_MESH_Tunnel *tunnel;
 
   /**
    * Handle for active write operation, or NULL.
@@ -134,9 +124,9 @@
 
 
 /**
- * Listen socket for incoming requests.
+ * Listen tunnel for incoming requests.
  */
-static struct GNUNET_MESH_Handle *listen_socket;
+static struct GNUNET_MESH_Handle *listen_tunnel;
 
 /**
  * Head of DLL of mesh clients.
@@ -160,49 +150,7 @@
 
 
 
-/* ********************* server-side code ************************* */
-
-
 /**
- * We're done with a particular client, clean up.
- *
- * @param sc client to clean up
- */
-static void
-terminate_mesh (struct MeshClient *sc)
-{
-  struct WriteQueueItem *wqi;
-
-  fprintf (stderr,
-          "terminate mesh called for %p\n",
-          sc);
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# mesh connections active"), -1,
-                           GNUNET_NO);
-  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
-    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
-  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
-    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
-  if (NULL != sc->wh)
-    GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
-  if (NULL != sc->qe)
-    GNUNET_DATASTORE_cancel (sc->qe);
-  while (NULL != (wqi = sc->wqi_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                                sc->wqi_tail,
-                                wqi);
-    GNUNET_free (wqi);
-  }
-  GNUNET_CONTAINER_DLL_remove (sc_head,
-                              sc_tail,
-                              sc);
-  sc_count--;
-  GNUNET_free (sc);
-}
-
-
-/**
  * Task run to asynchronously terminate the mesh due to timeout.
  *
  * @param cls the 'struct MeshClient'
@@ -216,8 +164,8 @@
   struct GNUNET_MESH_Tunnel *tun;
 
   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  tun = sc->socket;
-  sc->socket = NULL;
+  tun = sc->tunnel;
+  sc->tunnel = NULL;
   GNUNET_MESH_tunnel_destroy (tun);
 }
 
@@ -247,7 +195,7 @@
 continue_reading (struct MeshClient *sc)
 {
   refresh_timeout_task (sc);
-  GNUNET_MESH_receive_done (sc->socket);
+  GNUNET_MESH_receive_done (sc->tunnel);
 }
 
 
@@ -274,6 +222,7 @@
                    void *buf)
 {
   struct MeshClient *sc = cls;
+  struct GNUNET_MESH_Tunnel *tun;
   struct WriteQueueItem *wqi;
   size_t ret;
 
@@ -288,7 +237,9 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Transmission of reply failed, terminating mesh\n");
-    terminate_mesh (sc);    
+    tun = sc->tunnel;
+    sc->tunnel = NULL;
+    GNUNET_MESH_tunnel_destroy (tun);
     return 0;
   }
   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
@@ -316,6 +267,7 @@
 continue_writing (struct MeshClient *sc)
 {
   struct WriteQueueItem *wqi;
+  struct GNUNET_MESH_Tunnel *tun;
 
   if (NULL != sc->wh)
   {
@@ -330,7 +282,7 @@
     continue_reading (sc);
     return;
   }
-  sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
+  sc->wh = GNUNET_MESH_notify_transmit_ready (sc->tunnel, GNUNET_NO,
                                              GNUNET_TIME_UNIT_FOREVER_REL,
                                              wqi->msize,                       
              
                                              &write_continuation,
@@ -339,7 +291,9 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Write failed; terminating mesh\n");
-    terminate_mesh (sc);
+    tun = sc->tunnel;
+    sc->tunnel = NULL;
+    GNUNET_MESH_tunnel_destroy (tun);
     return;
   }
 }
@@ -361,7 +315,7 @@
  */
 static void 
 handle_datastore_reply (void *cls,
-                       const struct GNUNET_HashCode * key,
+                       const struct GNUNET_HashCode *key,
                        size_t size, const void *data,
                        enum GNUNET_BLOCK_Type type,
                        uint32_t priority,
@@ -378,7 +332,8 @@
   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Performing on-demand encoding\n");
+               "Performing on-demand encoding for query %s\n",
+               GNUNET_h2s (key));
     if (GNUNET_OK !=
        GNUNET_FS_handle_on_demand_block (key,
                                          size, data, type,
@@ -440,13 +395,11 @@
   struct MeshClient *sc = *tunnel_ctx;
   const struct MeshQueryMessage *sqm;
 
-  fprintf (stderr,
-          "Request gets %p\n", 
-          sc);
   sqm = (const struct MeshQueryMessage *) message;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received query for `%s' via mesh\n",
-             GNUNET_h2s (&sqm->query));
+             "Received query for `%s' via mesh from client %p\n",
+             GNUNET_h2s (&sqm->query),
+             sc);
   GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# queries received via mesh"), 1,
                            GNUNET_NO);
@@ -473,7 +426,7 @@
  * Functions of this type are called upon new mesh connection from other peers.
  *
  * @param cls the closure from GNUNET_MESH_connect
- * @param socket the socket representing the mesh
+ * @param tunnel the tunnel representing the mesh
  * @param initiator the identity of the peer who wants to establish a mesh
  *            with us; NULL on binding error
  * @param port mesh port used for the incoming connection
@@ -481,37 +434,35 @@
  */
 static void *
 accept_cb (void *cls,
-          struct GNUNET_MESH_Tunnel *socket,
+          struct GNUNET_MESH_Tunnel *tunnel,
           const struct GNUNET_PeerIdentity *initiator,
           uint32_t port)
 {
   struct MeshClient *sc;
 
-  GNUNET_assert (NULL != socket);
+  GNUNET_assert (NULL != tunnel);
   if (sc_count >= sc_count_max)
   {
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# mesh client connections 
rejected"), 1,
                              GNUNET_NO);
-    GNUNET_MESH_tunnel_destroy (socket);
+    GNUNET_MESH_tunnel_destroy (tunnel);
     return NULL;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Accepting inbound mesh connection from `%s'\n",
-             GNUNET_i2s (initiator));
   GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# mesh connections active"), 1,
                            GNUNET_NO);
   sc = GNUNET_new (struct MeshClient);
-  sc->socket = socket;
+  sc->tunnel = tunnel;
   GNUNET_CONTAINER_DLL_insert (sc_head,
                               sc_tail,
                               sc);
   sc_count++;
   refresh_timeout_task (sc);
-  fprintf (stderr,
-          "Accept returns %p\n", 
-          sc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Accepting inbound mesh connection from `%s' as client %p\n",
+             GNUNET_i2s (initiator),
+             sc);
   return sc;
 }
 
@@ -530,12 +481,37 @@
            void *tunnel_ctx)
 {
   struct MeshClient *sc = tunnel_ctx;
+  struct WriteQueueItem *wqi;
 
-  fprintf (stderr,
-          "Cleaner called with %p\n", 
-          sc);
-  if (NULL != sc)
-    terminate_mesh (sc);
+  if (NULL == sc)
+    return;
+  sc->tunnel = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Terminating mesh connection with client %p\n",
+             sc);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# mesh connections active"), -1,
+                           GNUNET_NO);
+  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
+  if (NULL != sc->wh)
+    GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
+  if (NULL != sc->qe)
+    GNUNET_DATASTORE_cancel (sc->qe);
+  while (NULL != (wqi = sc->wqi_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                                sc->wqi_tail,
+                                wqi);
+    GNUNET_free (wqi);
+  }
+  GNUNET_CONTAINER_DLL_remove (sc_head,
+                              sc_tail,
+                              sc);
+  sc_count--;
+  GNUNET_free (sc);
 }
 
 
@@ -560,7 +536,7 @@
                                             "MAX_MESH_CLIENTS",
                                             &sc_count_max))
     return;
-  listen_socket = GNUNET_MESH_connect (GSF_cfg,
+  listen_tunnel = GNUNET_MESH_connect (GSF_cfg,
                                       NULL,
                                       &accept_cb,
                                       &cleaner_cb,
@@ -575,15 +551,13 @@
 void
 GSF_mesh_stop_server ()
 {
-  struct MeshClient *sc;
-
-  while (NULL != (sc = sc_head))
-    terminate_mesh (sc);
-  if (NULL != listen_socket)
+  if (NULL != listen_tunnel)
   {
-    GNUNET_MESH_disconnect (listen_socket);
-    listen_socket = NULL;
+    GNUNET_MESH_disconnect (listen_tunnel);
+    listen_tunnel = NULL;
   }
+  GNUNET_assert (NULL == sc_head);
+  GNUNET_assert (0 == sc_count);
 }
 
 /* end of gnunet-service-fs_mesh.c */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]