[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28142 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28142 - gnunet/src/fs |
Date: |
Thu, 18 Jul 2013 13:06:23 +0200 |
Author: grothoff
Date: 2013-07-18 13:06:23 +0200 (Thu, 18 Jul 2013)
New Revision: 28142
Added:
gnunet/src/fs/gnunet-service-fs_mesh_server.c
Removed:
gnunet/src/fs/gnunet-service-fs_mesh.c
Modified:
gnunet/src/fs/Makefile.am
gnunet/src/fs/fs.conf.in
gnunet/src/fs/gnunet-service-fs.c
gnunet/src/fs/gnunet-service-fs_mesh.h
gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf
Log:
-splitting mesh into server and client parts
Modified: gnunet/src/fs/Makefile.am
===================================================================
--- gnunet/src/fs/Makefile.am 2013-07-18 11:04:11 UTC (rev 28141)
+++ gnunet/src/fs/Makefile.am 2013-07-18 11:06:23 UTC (rev 28142)
@@ -190,7 +190,8 @@
gnunet-service-fs_pr.c gnunet-service-fs_pr.h \
gnunet-service-fs_push.c gnunet-service-fs_push.h \
gnunet-service-fs_put.c gnunet-service-fs_put.h \
- gnunet-service-fs_mesh.c gnunet-service-fs_mesh.h
+ gnunet-service-fs_mesh_client.c gnunet-service-fs_mesh.h \
+ gnunet-service-fs_mesh_server.c
gnunet_service_fs_LDADD = \
$(top_builddir)/src/fs/libgnunetfs.la \
$(top_builddir)/src/dht/libgnunetdht.la \
Modified: gnunet/src/fs/fs.conf.in
===================================================================
--- gnunet/src/fs/fs.conf.in 2013-07-18 11:04:11 UTC (rev 28141)
+++ gnunet/src/fs/fs.conf.in 2013-07-18 11:06:23 UTC (rev 28142)
@@ -60,7 +60,7 @@
# some reasonable level. And if we have a very, very large
# number, we probably won't have enough bandwidth to suppor them
# well anyway, so better have a moderate cap.
-MAX_STREAM_CLIENTS = 128
+MAX_MESH_CLIENTS = 128
[gnunet-auto-share]
Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c 2013-07-18 11:04:11 UTC (rev 28141)
+++ gnunet/src/fs/gnunet-service-fs.c 2013-07-18 11:06:23 UTC (rev 28142)
@@ -475,7 +475,8 @@
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- GSF_mesh_stop ();
+ GSF_mesh_stop_client ();
+ GSF_mesh_stop_server ();
if (NULL != GSF_core)
{
GNUNET_CORE_disconnect (GSF_core);
@@ -646,7 +647,8 @@
GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters,
NULL);
datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
- GSF_mesh_start ();
+ GSF_mesh_start_server ();
+ GSF_mesh_start_client ();
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
NULL);
return GNUNET_OK;
Deleted: gnunet/src/fs/gnunet-service-fs_mesh.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh.c 2013-07-18 11:04:11 UTC (rev
28141)
+++ gnunet/src/fs/gnunet-service-fs_mesh.c 2013-07-18 11:06:23 UTC (rev
28142)
@@ -1,1245 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2012, 2013 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file fs/gnunet-service-fs_mesh.c
- * @brief non-anonymous file-transfer
- * @author Christian Grothoff
- *
- * TODO:
- * - MESH2 API doesn't allow flow control for server yet (needed!)
- * - likely need to register clean up handler with mesh to handle
- * client disconnect (likely leaky right now)
- * - server is optional, currently client code will NPE if we have
- * no server, again MESH2 API requirement forcing this for now
- * - message handlers are symmetric for client/server, should be
- * separated (currently clients can get requests and servers can
- * handle answers, not good)
- * - code is entirely untested
- * - might have overlooked a few possible simplifications
- * - PORT is set to old application type, unsure if we should keep
- * it that way (fine for now)
- */
-#include "platform.h"
-#include "gnunet_constants.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_mesh_service.h"
-#include "gnunet_protocols.h"
-#include "gnunet_applications.h"
-#include "gnunet-service-fs.h"
-#include "gnunet-service-fs_indexing.h"
-#include "gnunet-service-fs_mesh.h"
-
-/**
- * After how long do we termiante idle connections?
- */
-#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES,
2)
-
-/**
- * After how long do we reset connections without replies?
- */
-#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 30)
-
-
-/**
- * A message in the queue to be written to the mesh.
- */
-struct WriteQueueItem
-{
- /**
- * Kept in a DLL.
- */
- struct WriteQueueItem *next;
-
- /**
- * Kept in a DLL.
- */
- struct WriteQueueItem *prev;
-
- /**
- * Number of bytes of payload, allocated at the end of this struct.
- */
- size_t msize;
-};
-
-
-/**
- * Information we keep around for each active meshing client.
- */
-struct MeshClient
-{
- /**
- * DLL
- */
- struct MeshClient *next;
-
- /**
- * DLL
- */
- struct MeshClient *prev;
-
- /**
- * Socket for communication.
- */
- struct GNUNET_MESH_Tunnel *socket;
-
- /**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_MESH_TransmitHandle *wh;
-
- /**
- * Head of write queue.
- */
- struct WriteQueueItem *wqi_head;
-
- /**
- * Tail of write queue.
- */
- struct WriteQueueItem *wqi_tail;
-
- /**
- * Current active request to the datastore, if we have one pending.
- */
- struct GNUNET_DATASTORE_QueueEntry *qe;
-
- /**
- * Task that is scheduled to asynchronously terminate the connection.
- */
- GNUNET_SCHEDULER_TaskIdentifier terminate_task;
-
- /**
- * Task that is scheduled to terminate idle connections.
- */
- GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
- /**
- * Size of the last write that was initiated.
- */
- size_t reply_size;
-
-};
-
-
-/**
- * Query from one peer, asking the other for CHK-data.
- */
-struct MeshQueryMessage
-{
-
- /**
- * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Block type must be DBLOCK or IBLOCK.
- */
- uint32_t type;
-
- /**
- * Query hash from CHK (hash of encrypted block).
- */
- struct GNUNET_HashCode query;
-
-};
-
-
-/**
- * Reply to a MeshQueryMessage.
- */
-struct MeshReplyMessage
-{
-
- /**
- * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Block type must be DBLOCK or IBLOCK.
- */
- uint32_t type;
-
- /**
- * Expiration time for the block.
- */
- struct GNUNET_TIME_AbsoluteNBO expiration;
-
- /* followed by the encrypted block */
-
-};
-
-
-/**
- * Handle for a mesh to another peer.
- */
-struct MeshHandle;
-
-
-/**
- * Handle for a request that is going out via mesh API.
- */
-struct GSF_MeshRequest
-{
-
- /**
- * DLL.
- */
- struct GSF_MeshRequest *next;
-
- /**
- * DLL.
- */
- struct GSF_MeshRequest *prev;
-
- /**
- * Which mesh is this request associated with?
- */
- struct MeshHandle *sh;
-
- /**
- * Function to call with the result.
- */
- GSF_MeshReplyProcessor proc;
-
- /**
- * Closure for 'proc'
- */
- void *proc_cls;
-
- /**
- * Query to transmit to the other peer.
- */
- struct GNUNET_HashCode query;
-
- /**
- * Desired type for the reply.
- */
- enum GNUNET_BLOCK_Type type;
-
- /**
- * Did we transmit this request already? YES if we are
- * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
- */
- int was_transmitted;
-};
-
-
-/**
- * Handle for a mesh to another peer.
- */
-struct MeshHandle
-{
- /**
- * Head of DLL of pending requests on this mesh.
- */
- struct GSF_MeshRequest *pending_head;
-
- /**
- * Tail of DLL of pending requests on this mesh.
- */
- struct GSF_MeshRequest *pending_tail;
-
- /**
- * Map from query to 'struct GSF_MeshRequest's waiting for
- * a reply.
- */
- struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
-
- /**
- * Connection to the other peer.
- */
- struct GNUNET_MESH_Tunnel *mesh;
-
- /**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_MESH_TransmitHandle *wh;
-
- /**
- * Which peer does this mesh go to?
- */
- struct GNUNET_PeerIdentity target;
-
- /**
- * Task to kill inactive meshs (we keep them around for
- * a few seconds to give the application a chance to give
- * us another query).
- */
- GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
- /**
- * Task to reset meshs that had errors (asynchronously,
- * as we may not be able to do it immediately during a
- * callback from the mesh API).
- */
- GNUNET_SCHEDULER_TaskIdentifier reset_task;
-
- /**
- * Is this mesh ready for transmission?
- */
- int is_ready;
-
-};
-
-
-/**
- * Listen socket for incoming requests.
- */
-static struct GNUNET_MESH_Handle *listen_socket;
-
-/**
- * Head of DLL of mesh clients.
- */
-static struct MeshClient *sc_head;
-
-/**
- * Tail of DLL of mesh clients.
- */
-static struct MeshClient *sc_tail;
-
-/**
- * Number of active mesh clients in the 'sc_*'-DLL.
- */
-static unsigned int sc_count;
-
-/**
- * Maximum allowed number of mesh clients.
- */
-static unsigned long long sc_count_max;
-
-/**
- * Map from peer identities to 'struct MeshHandles' with meshs to
- * those peers.
- */
-static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
-
-
-/* ********************* client-side code ************************* */
-
-/**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the 'struct MeshHandle'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to clean up
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GSF_MeshRequest *sr = value;
-
- sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0, NULL);
- GSF_mesh_query_cancel (sr);
- return GNUNET_YES;
-}
-
-
-/**
- * Destroy a mesh handle.
- *
- * @param sh mesh to process
- */
-static void
-destroy_mesh_handle (struct MeshHandle *sh)
-{
- struct GSF_MeshRequest *sr;
-
- while (NULL != (sr = sh->pending_head))
- {
- sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0, NULL);
- GSF_mesh_query_cancel (sr);
- }
- GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
- &free_waiting_entry,
- sh);
- if (NULL != sh->wh)
- GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
- if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
- GNUNET_SCHEDULER_cancel (sh->timeout_task);
- if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
- GNUNET_SCHEDULER_cancel (sh->reset_task);
- GNUNET_MESH_tunnel_destroy (sh->mesh);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (mesh_map,
- &sh->target.hashPubKey,
- sh));
- GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
- GNUNET_free (sh);
-}
-
-
-/**
- * Transmit pending requests via the mesh.
- *
- * @param sh mesh to process
- */
-static void
-transmit_pending (struct MeshHandle *sh);
-
-
-/**
- * Iterator called on each entry in a waiting map to
- * move it back to the pending list.
- *
- * @param cls the 'struct MeshHandle'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to move to pending
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-move_to_pending (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct MeshHandle *sh = cls;
- struct GSF_MeshRequest *sr = value;
-
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
- key,
- value));
- GNUNET_CONTAINER_DLL_insert (sh->pending_head,
- sh->pending_tail,
- sr);
- sr->was_transmitted = GNUNET_NO;
- return GNUNET_YES;
-}
-
-
-/**
- * We had a serious error, tear down and re-create mesh from scratch.
- *
- * @param sh mesh to reset
- */
-static void
-reset_mesh (struct MeshHandle *sh)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting mesh to %s\n",
- GNUNET_i2s (&sh->target));
- GNUNET_MESH_tunnel_destroy (sh->mesh);
- sh->is_ready = GNUNET_NO;
- GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
- &move_to_pending,
- sh);
- sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
- sh,
- &sh->target,
-
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_YES,
- GNUNET_YES);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive mesh.
- *
- * @param cls the 'struct MeshHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-mesh_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct MeshHandle *sh = cls;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on mesh to %s\n",
- GNUNET_i2s (&sh->target));
- sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- destroy_mesh_handle (sh);
-}
-
-
-/**
- * Task called when it is time to reset an mesh.
- *
- * @param cls the 'struct MeshHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-reset_mesh_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct MeshHandle *sh = cls;
-
- sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
- reset_mesh (sh);
-}
-
-
-/**
- * We had a serious error, tear down and re-create mesh from scratch,
- * but do so asynchronously.
- *
- * @param sh mesh to reset
- */
-static void
-reset_mesh_async (struct MeshHandle *sh)
-{
- if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
- GNUNET_SCHEDULER_cancel (sh->reset_task);
- sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
- sh);
-}
-
-
-/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a mesh.
- *
- * @param cls the struct MeshHandle for which we did the write call
- * @param size the number of bytes that can be written to 'buf'
- * @param buf where to write the message
- * @return number of bytes written to 'buf'
- */
-static size_t
-transmit_sqm (void *cls,
- size_t size,
- void *buf)
-{
- struct MeshHandle *sh = cls;
- struct MeshQueryMessage sqm;
- struct GSF_MeshRequest *sr;
-
- sh->wh = NULL;
- if (NULL == buf)
- {
- reset_mesh (sh);
- return 0;
- }
- sr = sh->pending_head;
- if (NULL == sr)
- return 0;
- GNUNET_assert (size >= sizeof (struct MeshQueryMessage));
- GNUNET_CONTAINER_DLL_remove (sh->pending_head,
- sh->pending_tail,
- sr);
- GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
- &sr->query,
- sr,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending query via mesh to %s\n",
- GNUNET_i2s (&sh->target));
- sr->was_transmitted = GNUNET_YES;
- sqm.header.size = htons (sizeof (sqm));
- sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY);
- sqm.type = htonl (sr->type);
- sqm.query = sr->query;
- memcpy (buf, &sqm, sizeof (sqm));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully transmitted %u bytes via mesh to %s\n",
- (unsigned int) size,
- GNUNET_i2s (&sh->target));
- transmit_pending (sh);
- return sizeof (sqm);
-}
-
-
-/**
- * Transmit pending requests via the mesh.
- *
- * @param sh mesh to process
- */
-static void
-transmit_pending (struct MeshHandle *sh)
-{
- if (NULL != sh->wh)
- return;
- sh->wh = GNUNET_MESH_notify_transmit_ready (sh->mesh, GNUNET_YES /* allow
cork */,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct MeshQueryMessage),
- &transmit_sqm, sh);
-}
-
-
-/**
- * Closure for 'handle_reply'.
- */
-struct HandleReplyClosure
-{
-
- /**
- * Reply payload.
- */
- const void *data;
-
- /**
- * Expiration time for the block.
- */
- struct GNUNET_TIME_Absolute expiration;
-
- /**
- * Number of bytes in 'data'.
- */
- size_t data_size;
-
- /**
- * Type of the block.
- */
- enum GNUNET_BLOCK_Type type;
-
- /**
- * Did we have a matching query?
- */
- int found;
-};
-
-
-/**
- * Iterator called on each entry in a waiting map to
- * process a result.
- *
- * @param cls the 'struct HandleReplyClosure'
- * @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to handle result for
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-handle_reply (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct HandleReplyClosure *hrc = cls;
- struct GSF_MeshRequest *sr = value;
-
- sr->proc (sr->proc_cls,
- hrc->type,
- hrc->expiration,
- hrc->data_size,
- hrc->data);
- GSF_mesh_query_cancel (sr);
- hrc->found = GNUNET_YES;
- return GNUNET_YES;
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete reply is received.
- *
- * @param cls closure with the 'struct MeshHandle'
- * @param tunnel tunnel handle
- * @param tunnel_ctx tunnel context
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-reply_cb (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
- const struct GNUNET_MessageHeader *message)
-{
- struct MeshHandle *sh = *tunnel_ctx;
- const struct MeshReplyMessage *srm;
- struct HandleReplyClosure hrc;
- uint16_t msize;
- enum GNUNET_BLOCK_Type type;
- struct GNUNET_HashCode query;
-
- msize = ntohs (message->size);
- if (sizeof (struct MeshReplyMessage) > msize)
- {
- GNUNET_break_op (0);
- reset_mesh_async (sh);
- return GNUNET_SYSERR;
- }
- srm = (const struct MeshReplyMessage *) message;
- msize -= sizeof (struct MeshReplyMessage);
- type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
- if (GNUNET_YES !=
- GNUNET_BLOCK_get_key (GSF_block_ctx,
- type,
- &srm[1], msize, &query))
- {
- GNUNET_break_op (0);
- reset_mesh_async (sh);
- return GNUNET_SYSERR;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received reply `%s' via mesh\n",
- GNUNET_h2s (&query));
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via mesh"), 1,
- GNUNET_NO);
- hrc.data = &srm[1];
- hrc.data_size = msize;
- hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
- hrc.type = type;
- hrc.found = GNUNET_NO;
- GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
- &query,
- &handle_reply,
- &hrc);
- if (GNUNET_NO == hrc.found)
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via mesh
dropped"), 1,
- GNUNET_NO);
- return GNUNET_OK;
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Get (or create) a mesh to talk to the given peer.
- *
- * @param target peer we want to communicate with
- */
-static struct MeshHandle *
-get_mesh (const struct GNUNET_PeerIdentity *target)
-{
- struct MeshHandle *sh;
-
- sh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
- &target->hashPubKey);
- if (NULL != sh)
- {
- if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
- {
- GNUNET_SCHEDULER_cancel (sh->timeout_task);
- sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- }
- return sh;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating mesh to %s\n",
- GNUNET_i2s (target));
- sh = GNUNET_malloc (sizeof (struct MeshHandle));
- sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
- &reset_mesh_task,
- sh);
- sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
- sh->target = *target;
- sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
- sh,
- &sh->target,
-
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_NO,
- GNUNET_YES);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (mesh_map,
- &sh->target.hashPubKey,
- sh,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- return sh;
-}
-
-
-/**
- * Look for a block by directly contacting a particular peer.
- *
- * @param target peer that should have the block
- * @param query hash to query for the block
- * @param type desired type for the block
- * @param proc function to call with result
- * @param proc_cls closure for 'proc'
- * @return handle to cancel the operation
- */
-struct GSF_MeshRequest *
-GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_HashCode *query,
- enum GNUNET_BLOCK_Type type,
- GSF_MeshReplyProcessor proc, void *proc_cls)
-{
- struct MeshHandle *sh;
- struct GSF_MeshRequest *sr;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Preparing to send query for %s via mesh to %s\n",
- GNUNET_h2s (query),
- GNUNET_i2s (target));
- sh = get_mesh (target);
- sr = GNUNET_malloc (sizeof (struct GSF_MeshRequest));
- sr->sh = sh;
- sr->proc = proc;
- sr->proc_cls = proc_cls;
- sr->type = type;
- sr->query = *query;
- GNUNET_CONTAINER_DLL_insert (sh->pending_head,
- sh->pending_tail,
- sr);
- if (GNUNET_YES == sh->is_ready)
- transmit_pending (sh);
- return sr;
-}
-
-
-/**
- * Cancel an active request; must not be called after 'proc'
- * was calld.
- *
- * @param sr request to cancel
- */
-void
-GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
-{
- struct MeshHandle *sh = sr->sh;
-
- if (GNUNET_YES == sr->was_transmitted)
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
- &sr->query,
- sr));
- else
- GNUNET_CONTAINER_DLL_remove (sh->pending_head,
- sh->pending_tail,
- sr);
- GNUNET_free (sr);
- if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
- (NULL == sh->pending_head) )
- sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &mesh_timeout,
- sh);
-}
-
-
-/* ********************* server-side code ************************* */
-
-
-/**
- * We're done with a particular client, clean up.
- *
- * @param sc client to clean up
- */
-static void
-terminate_mesh (struct MeshClient *sc)
-{
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# mesh connections active"), -1,
- GNUNET_NO);
- if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
- GNUNET_SCHEDULER_cancel (sc->terminate_task);
- if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
- GNUNET_SCHEDULER_cancel (sc->timeout_task);
- if (NULL != sc->wh)
- GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
- if (NULL != sc->qe)
- GNUNET_DATASTORE_cancel (sc->qe);
- GNUNET_MESH_tunnel_destroy (sc->socket);
- struct WriteQueueItem *wqi;
- while (NULL != (wqi = sc->wqi_head))
- {
- GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- GNUNET_free (wqi);
- }
- GNUNET_CONTAINER_DLL_remove (sc_head,
- sc_tail,
- sc);
- sc_count--;
- GNUNET_free (sc);
-}
-
-
-/**
- * Task run to asynchronously terminate the mesh due to timeout.
- *
- * @param cls the 'struct MeshClient'
- * @param tc scheduler context
- */
-static void
-timeout_mesh_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct MeshClient *sc = cls;
-
- sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- terminate_mesh (sc);
-}
-
-
-/**
- * Reset the timeout for the mesh client (due to activity).
- *
- * @param sc client handle to reset timeout for
- */
-static void
-refresh_timeout_task (struct MeshClient *sc)
-{
- if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
- GNUNET_SCHEDULER_cancel (sc->timeout_task);
- sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
- &timeout_mesh_task,
- sc);
-}
-
-
-/**
- * We're done handling a request from a client, read the next one.
- *
- * @param sc client to continue reading requests from
- */
-static void
-continue_reading (struct MeshClient *sc)
-{
- refresh_timeout_task (sc);
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct MeshClient *sc);
-
-
-/**
- * Send a reply now, mesh is ready.
- *
- * @param cls closure with the struct MeshClient which sent the query
- * @param size number of bytes available in 'buf'
- * @param buf where to write the message
- * @return number of bytes written to 'buf'
- */
-static size_t
-write_continuation (void *cls,
- size_t size,
- void *buf)
-{
- struct MeshClient *sc = cls;
- struct WriteQueueItem *wqi;
- size_t ret;
-
- sc->wh = NULL;
- if (NULL == (wqi = sc->wqi_head))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write queue empty, reading more requests\n");
- return 0;
- }
- if (0 == size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission of reply failed, terminating mesh\n");
- terminate_mesh (sc);
- return 0;
- }
- GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitted %u byte reply via mesh\n",
- (unsigned int) size);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# Blocks transferred via mesh"), 1,
- GNUNET_NO);
- memcpy (buf, &wqi[1], ret = wqi->msize);
- GNUNET_free (wqi);
- continue_writing (sc);
- return ret;
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct MeshClient *sc)
-{
- struct WriteQueueItem *wqi;
-
- if (NULL != sc->wh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write pending, waiting for it to complete\n");
- return; /* write already pending */
- }
- if (NULL == (wqi = sc->wqi_head))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write queue empty, reading more requests\n");
- continue_reading (sc);
- return;
- }
- sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
- GNUNET_TIME_UNIT_FOREVER_REL,
- wqi->msize,
- &write_continuation,
- sc);
- if (NULL == sc->wh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write failed; terminating mesh\n");
- terminate_mesh (sc);
- return;
- }
-}
-
-
-/**
- * Process a datum that was stored in the datastore.
- *
- * @param cls closure with the struct MeshClient which sent the query
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- * maybe 0 if no unique identifier is available
- */
-static void
-handle_datastore_reply (void *cls,
- const struct GNUNET_HashCode * key,
- size_t size, const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration, uint64_t uid)
-{
- struct MeshClient *sc = cls;
- size_t msize = size + sizeof (struct MeshReplyMessage);
- struct WriteQueueItem *wqi;
- struct MeshReplyMessage *srm;
-
- sc->qe = NULL;
- if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Performing on-demand encoding\n");
- if (GNUNET_OK !=
- GNUNET_FS_handle_on_demand_block (key,
- size, data, type,
- priority, anonymity,
- expiration, uid,
- &handle_datastore_reply,
- sc))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "On-demand encoding request failed\n");
- continue_writing (sc);
- }
- return;
- }
- if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
- {
- GNUNET_break (0);
- continue_writing (sc);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Starting transmission of %u byte reply for query `%s' via
mesh\n",
- (unsigned int) size,
- GNUNET_h2s (key));
- wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
- wqi->msize = msize;
- srm = (struct MeshReplyMessage *) &wqi[1];
- srm->header.size = htons ((uint16_t) msize);
- srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
- srm->type = htonl (type);
- srm->expiration = GNUNET_TIME_absolute_hton (expiration);
- memcpy (&srm[1], data, size);
- sc->reply_size = msize;
- GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- continue_writing (sc);
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete query message is received.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure with the 'struct MeshClient'
- * @param tunnel tunnel handle
- * @param tunnel_ctx tunnel context
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-request_cb (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
- const struct GNUNET_MessageHeader *message)
-{
- struct MeshClient *sc = *tunnel_ctx;
- const struct MeshQueryMessage *sqm;
-
- sqm = (const struct MeshQueryMessage *) message;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received query for `%s' via mesh\n",
- GNUNET_h2s (&sqm->query));
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# queries received via mesh"), 1,
- GNUNET_NO);
- refresh_timeout_task (sc);
- sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
- 0,
- &sqm->query,
- ntohl (sqm->type),
- 0 /* priority */,
- GSF_datastore_queue_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &handle_datastore_reply, sc);
- if (NULL == sc->qe)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing request with datastore failed (queue full?)\n");
- continue_writing (sc);
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Functions of this type are called upon new mesh connection from other peers.
- *
- * @param cls the closure from GNUNET_MESH_connect
- * @param socket the socket representing the mesh
- * @param initiator the identity of the peer who wants to establish a mesh
- * with us; NULL on binding error
- * @param port mesh port used for the incoming connection
- * @return initial tunnel context (our 'struct MeshClient')
- */
-static void *
-accept_cb (void *cls,
- struct GNUNET_MESH_Tunnel *socket,
- const struct GNUNET_PeerIdentity *initiator,
- uint32_t port)
-{
- struct MeshClient *sc;
-
- GNUNET_assert (NULL != socket);
- if (sc_count >= sc_count_max)
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# mesh client connections
rejected"), 1,
- GNUNET_NO);
- GNUNET_MESH_tunnel_destroy (socket);
- return NULL;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Accepting inbound mesh connection from `%s'\n",
- GNUNET_i2s (initiator));
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# mesh connections active"), 1,
- GNUNET_NO);
- sc = GNUNET_malloc (sizeof (struct MeshClient));
- sc->socket = socket;
- GNUNET_CONTAINER_DLL_insert (sc_head,
- sc_tail,
- sc);
- sc_count++;
- refresh_timeout_task (sc);
- return sc;
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_mesh_start ()
-{
- static const struct GNUNET_MESH_MessageHandler handlers[] = {
- { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct
MeshQueryMessage)},
- { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 },
- { NULL, 0, 0 }
- };
- static const uint32_t ports[] = {
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- 0
- };
-
- mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
- if (GNUNET_YES ==
- GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
- "fs",
- "MAX_MESH_CLIENTS",
- &sc_count_max))
- {
- listen_socket = GNUNET_MESH_connect (GSF_cfg,
- NULL,
- &accept_cb,
- NULL /* FIXME: have a cleanup
callback? */,
- handlers,
- ports);
- }
-}
-
-
-/**
- * Function called on each active meshs to shut them down.
- *
- * @param cls NULL
- * @param key target peer, unused
- * @param value the 'struct MeshHandle' to destroy
- * @return GNUNET_YES (continue to iterate)
- */
-static int
-release_meshs (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct MeshHandle *sh = value;
-
- destroy_mesh_handle (sh);
- return GNUNET_YES;
-}
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_mesh_stop ()
-{
- struct MeshClient *sc;
-
- while (NULL != (sc = sc_head))
- terminate_mesh (sc);
- GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
- &release_meshs,
- NULL);
- GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
- mesh_map = NULL;
- if (NULL != listen_socket)
- {
- GNUNET_MESH_disconnect (listen_socket);
- listen_socket = NULL;
- }
-}
-
-/* end of gnunet-service-fs_mesh.c */
Modified: gnunet/src/fs/gnunet-service-fs_mesh.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh.h 2013-07-18 11:04:11 UTC (rev
28141)
+++ gnunet/src/fs/gnunet-service-fs_mesh.h 2013-07-18 11:06:23 UTC (rev
28142)
@@ -79,13 +79,81 @@
* Initialize subsystem for non-anonymous file-sharing.
*/
void
-GSF_mesh_start (void);
+GSF_mesh_start_server (void);
/**
* Shutdown subsystem for non-anonymous file-sharing.
*/
void
-GSF_mesh_stop (void);
+GSF_mesh_stop_server (void);
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_start_client (void);
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_stop_client (void);
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+/**
+ * Query from one peer, asking the other for CHK-data.
+ */
+struct MeshQueryMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Query hash from CHK (hash of encrypted block).
+ */
+ struct GNUNET_HashCode query;
+
+};
+
+
+/**
+ * Reply to a MeshQueryMessage.
+ */
+struct MeshReplyMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_AbsoluteNBO expiration;
+
+ /* followed by the encrypted block */
+
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+
#endif
Copied: gnunet/src/fs/gnunet-service-fs_mesh_server.c (from rev 28132,
gnunet/src/fs/gnunet-service-fs_mesh.c)
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_server.c
(rev 0)
+++ gnunet/src/fs/gnunet-service-fs_mesh_server.c 2013-07-18 11:06:23 UTC
(rev 28142)
@@ -0,0 +1,589 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_mesh.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - MESH2 API doesn't allow flow control for server yet (needed!)
+ * - likely need to register clean up handler with mesh to handle
+ * client disconnect (likely leaky right now)
+ * - server is optional, currently client code will NPE if we have
+ * no server, again MESH2 API requirement forcing this for now
+ * - message handlers are symmetric for client/server, should be
+ * separated (currently clients can get requests and servers can
+ * handle answers, not good)
+ * - code is entirely untested
+ * - might have overlooked a few possible simplifications
+ * - PORT is set to old application type, unsure if we should keep
+ * it that way (fine for now)
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_mesh.h"
+
+/**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES,
2)
+
+
+/**
+ * A message in the queue to be written to the mesh.
+ */
+struct WriteQueueItem
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct WriteQueueItem *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct WriteQueueItem *prev;
+
+ /**
+ * Number of bytes of payload, allocated at the end of this struct.
+ */
+ size_t msize;
+};
+
+
+/**
+ * Information we keep around for each active meshing client.
+ */
+struct MeshClient
+{
+ /**
+ * DLL
+ */
+ struct MeshClient *next;
+
+ /**
+ * DLL
+ */
+ struct MeshClient *prev;
+
+ /**
+ * Socket for communication.
+ */
+ struct GNUNET_MESH_Tunnel *socket;
+
+ /**
+ * Handle for active write operation, or NULL.
+ */
+ struct GNUNET_MESH_TransmitHandle *wh;
+
+ /**
+ * Head of write queue.
+ */
+ struct WriteQueueItem *wqi_head;
+
+ /**
+ * Tail of write queue.
+ */
+ struct WriteQueueItem *wqi_tail;
+
+ /**
+ * Current active request to the datastore, if we have one pending.
+ */
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ /**
+ * Task that is scheduled to asynchronously terminate the connection.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
+ /**
+ * Task that is scheduled to terminate idle connections.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Size of the last write that was initiated.
+ */
+ size_t reply_size;
+
+};
+
+
+/**
+ * Listen socket for incoming requests.
+ */
+static struct GNUNET_MESH_Handle *listen_socket;
+
+/**
+ * Head of DLL of mesh clients.
+ */
+static struct MeshClient *sc_head;
+
+/**
+ * Tail of DLL of mesh clients.
+ */
+static struct MeshClient *sc_tail;
+
+/**
+ * Number of active mesh clients in the 'sc_*'-DLL.
+ */
+static unsigned int sc_count;
+
+/**
+ * Maximum allowed number of mesh clients.
+ */
+static unsigned long long sc_count_max;
+
+
+
+/* ********************* server-side code ************************* */
+
+
+/**
+ * We're done with a particular client, clean up.
+ *
+ * @param sc client to clean up
+ */
+static void
+terminate_mesh (struct MeshClient *sc)
+{
+ struct WriteQueueItem *wqi;
+
+ fprintf (stderr,
+ "terminate mesh called for %p\n",
+ sc);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# mesh connections active"), -1,
+ GNUNET_NO);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+ GNUNET_SCHEDULER_cancel (sc->terminate_task);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
+ if (NULL != sc->wh)
+ GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
+ if (NULL != sc->qe)
+ GNUNET_DATASTORE_cancel (sc->qe);
+ while (NULL != (wqi = sc->wqi_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ GNUNET_free (wqi);
+ }
+ GNUNET_CONTAINER_DLL_remove (sc_head,
+ sc_tail,
+ sc);
+ sc_count--;
+ GNUNET_free (sc);
+}
+
+
+/**
+ * Task run to asynchronously terminate the mesh due to timeout.
+ *
+ * @param cls the 'struct MeshClient'
+ * @param tc scheduler context
+ */
+static void
+timeout_mesh_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MeshClient *sc = cls;
+ struct GNUNET_MESH_Tunnel *tun;
+
+ sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ tun = sc->socket;
+ sc->socket = NULL;
+ GNUNET_MESH_tunnel_destroy (tun);
+}
+
+
+/**
+ * Reset the timeout for the mesh client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct MeshClient *sc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
+ sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+ &timeout_mesh_task,
+ sc);
+}
+
+
+/**
+ * We're done handling a request from a client, read the next one.
+ *
+ * @param sc client to continue reading requests from
+ */
+static void
+continue_reading (struct MeshClient *sc)
+{
+ refresh_timeout_task (sc);
+ GNUNET_MESH_receive_done (sc->socket);
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct MeshClient *sc);
+
+
+/**
+ * Send a reply now, mesh is ready.
+ *
+ * @param cls closure with the struct MeshClient which sent the query
+ * @param size number of bytes available in 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
+ */
+static size_t
+write_continuation (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct MeshClient *sc = cls;
+ struct WriteQueueItem *wqi;
+ size_t ret;
+
+ sc->wh = NULL;
+ if (NULL == (wqi = sc->wqi_head))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write queue empty, reading more requests\n");
+ return 0;
+ }
+ if (0 == size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission of reply failed, terminating mesh\n");
+ terminate_mesh (sc);
+ return 0;
+ }
+ GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitted %u byte reply via mesh\n",
+ (unsigned int) size);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Blocks transferred via mesh"), 1,
+ GNUNET_NO);
+ memcpy (buf, &wqi[1], ret = wqi->msize);
+ GNUNET_free (wqi);
+ continue_writing (sc);
+ return ret;
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct MeshClient *sc)
+{
+ struct WriteQueueItem *wqi;
+
+ if (NULL != sc->wh)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write pending, waiting for it to complete\n");
+ return; /* write already pending */
+ }
+ if (NULL == (wqi = sc->wqi_head))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write queue empty, reading more requests\n");
+ continue_reading (sc);
+ return;
+ }
+ sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ wqi->msize,
+ &write_continuation,
+ sc);
+ if (NULL == sc->wh)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write failed; terminating mesh\n");
+ terminate_mesh (sc);
+ return;
+ }
+}
+
+
+/**
+ * Process a datum that was stored in the datastore.
+ *
+ * @param cls closure with the struct MeshClient which sent the query
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+handle_datastore_reply (void *cls,
+ const struct GNUNET_HashCode * key,
+ size_t size, const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct MeshClient *sc = cls;
+ size_t msize = size + sizeof (struct MeshReplyMessage);
+ struct WriteQueueItem *wqi;
+ struct MeshReplyMessage *srm;
+
+ sc->qe = NULL;
+ if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Performing on-demand encoding\n");
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key,
+ size, data, type,
+ priority, anonymity,
+ expiration, uid,
+ &handle_datastore_reply,
+ sc))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "On-demand encoding request failed\n");
+ continue_writing (sc);
+ }
+ return;
+ }
+ if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ continue_writing (sc);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting transmission of %u byte reply for query `%s' via
mesh\n",
+ (unsigned int) size,
+ GNUNET_h2s (key));
+ wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
+ wqi->msize = msize;
+ srm = (struct MeshReplyMessage *) &wqi[1];
+ srm->header.size = htons ((uint16_t) msize);
+ srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
+ srm->type = htonl (type);
+ srm->expiration = GNUNET_TIME_absolute_hton (expiration);
+ memcpy (&srm[1], data, size);
+ sc->reply_size = msize;
+ GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ continue_writing (sc);
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete query message is received.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure with the 'struct MeshClient'
+ * @param tunnel tunnel handle
+ * @param tunnel_ctx tunnel context
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+request_cb (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct MeshClient *sc = *tunnel_ctx;
+ const struct MeshQueryMessage *sqm;
+
+ fprintf (stderr,
+ "Request gets %p\n",
+ sc);
+ sqm = (const struct MeshQueryMessage *) message;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received query for `%s' via mesh\n",
+ GNUNET_h2s (&sqm->query));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# queries received via mesh"), 1,
+ GNUNET_NO);
+ refresh_timeout_task (sc);
+ sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ 0,
+ &sqm->query,
+ ntohl (sqm->type),
+ 0 /* priority */,
+ GSF_datastore_queue_size,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &handle_datastore_reply, sc);
+ if (NULL == sc->qe)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing request with datastore failed (queue full?)\n");
+ continue_writing (sc);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this type are called upon new mesh connection from other peers.
+ *
+ * @param cls the closure from GNUNET_MESH_connect
+ * @param socket the socket representing the mesh
+ * @param initiator the identity of the peer who wants to establish a mesh
+ * with us; NULL on binding error
+ * @param port mesh port used for the incoming connection
+ * @return initial tunnel context (our 'struct MeshClient')
+ */
+static void *
+accept_cb (void *cls,
+ struct GNUNET_MESH_Tunnel *socket,
+ const struct GNUNET_PeerIdentity *initiator,
+ uint32_t port)
+{
+ struct MeshClient *sc;
+
+ GNUNET_assert (NULL != socket);
+ if (sc_count >= sc_count_max)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# mesh client connections
rejected"), 1,
+ GNUNET_NO);
+ GNUNET_MESH_tunnel_destroy (socket);
+ return NULL;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Accepting inbound mesh connection from `%s'\n",
+ GNUNET_i2s (initiator));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# mesh connections active"), 1,
+ GNUNET_NO);
+ sc = GNUNET_new (struct MeshClient);
+ sc->socket = socket;
+ GNUNET_CONTAINER_DLL_insert (sc_head,
+ sc_tail,
+ sc);
+ sc_count++;
+ refresh_timeout_task (sc);
+ fprintf (stderr,
+ "Accept returns %p\n",
+ sc);
+ return sc;
+}
+
+
+/**
+ * Function called by mesh when a client disconnects.
+ * Cleans up our 'struct MeshClient' of that tunnel.
+ *
+ * @param cls NULL
+ * @param tunnel tunnel of the disconnecting client
+ * @param tunnel_ctx our 'struct MeshClient'
+ */
+static void
+cleaner_cb (void *cls,
+ const struct GNUNET_MESH_Tunnel *tunnel,
+ void *tunnel_ctx)
+{
+ struct MeshClient *sc = tunnel_ctx;
+
+ fprintf (stderr,
+ "Cleaner called with %p\n",
+ sc);
+ if (NULL != sc)
+ terminate_mesh (sc);
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_start_server ()
+{
+ static const struct GNUNET_MESH_MessageHandler handlers[] = {
+ { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct
MeshQueryMessage)},
+ { NULL, 0, 0 }
+ };
+ static const uint32_t ports[] = {
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ 0
+ };
+
+ if (GNUNET_YES !=
+ GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
+ "fs",
+ "MAX_MESH_CLIENTS",
+ &sc_count_max))
+ return;
+ listen_socket = GNUNET_MESH_connect (GSF_cfg,
+ NULL,
+ &accept_cb,
+ &cleaner_cb,
+ handlers,
+ ports);
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_stop_server ()
+{
+ struct MeshClient *sc;
+
+ while (NULL != (sc = sc_head))
+ terminate_mesh (sc);
+ if (NULL != listen_socket)
+ {
+ GNUNET_MESH_disconnect (listen_socket);
+ listen_socket = NULL;
+ }
+}
+
+/* end of gnunet-service-fs_mesh.c */
Modified: gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf
===================================================================
--- gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf 2013-07-18 11:04:11 UTC
(rev 28141)
+++ gnunet/src/fs/test_gnunet_service_fs_p2p_mesh.conf 2013-07-18 11:06:23 UTC
(rev 28142)
@@ -14,3 +14,7 @@
# (may improve anonymity, probably not a good idea if content_caching is NO)
CONTENT_PUSHING = NO
+#PREFIX = valgrind
+
+[mesh]
+#PREFIX = valgrind
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28142 - gnunet/src/fs,
gnunet <=