gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14288 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r14288 - gnunet/src/fs
Date: Mon, 31 Jan 2011 00:23:13 +0100

Author: grothoff
Date: 2011-01-31 00:23:13 +0100 (Mon, 31 Jan 2011)
New Revision: 14288

Modified:
   gnunet/src/fs/gnunet-service-fs_cp.c
   gnunet/src/fs/gnunet-service-fs_cp.h
Log:
stuff

Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c        2011-01-30 20:53:33 UTC (rev 14287)
+++ gnunet/src/fs/gnunet-service-fs_cp.c        2011-01-30 23:23:13 UTC (rev 14288)
@@ -27,16 +27,66 @@
 #include "gnunet-service-fs.h"
 #include "gnunet-service-fs_cp.h"
 
+/**
+ * How often do we flush trust values to disk?
+ * Used as the delay when re-scheduling 'cron_flush_trust'.
+ */
+#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
 
+
 struct GSF_PeerTransmitHandle
 {
 
   /**
+   * Handle for an active request for transmission to this
+   * peer, or NULL (if core queue was full).
+   */
+  struct GNUNET_CORE_TransmitHandle *cth;
+
+  /**
    * Time when this transmission request was issued.
    */
   struct GNUNET_TIME_Absolute transmission_request_start_time;
 
+  /**
+   * Timeout for this request.
+   */
+  struct GNUNET_TIME_Absolute timeout;
 
+  /**
+   * Task called on timeout, or 0 for none.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * Function to call to get the actual message.
+   */
+  GSF_GetMessageCallback gmc;
+
+  /**
+   * Peer this request targets.
+   */
+  struct GSF_ConnectedPeer *cp;
+
+  /**
+   * Closure for 'gmc'.
+   */
+  void *gmc_cls;
+
+  /**
+   * Size of the message to be transmitted.
+   */
+  size_t size;
+
+  /**
+   * GNUNET_YES if this is a query, GNUNET_NO for content.
+   */
+  int is_query;
+
+  /**
+   * Priority of this request.
+   */
+  uint32_t priority;
+
 };
 
 
@@ -58,12 +108,6 @@
   struct GNUNET_TIME_Absolute last_migration_block;
 
   /**
-   * Handle for an active request for transmission to this
-   * peer, or NULL.
-   */
-  struct GNUNET_CORE_TransmitHandle *cth;
-
-  /**
    * Messages (replies, queries, content migration) we would like to
    * send to this peer in the near future.  Sorted by priority, head.
    */
@@ -127,6 +171,74 @@
 
 
 /**
+ * Map from peer identities to 'struct GSF_ConnectedPeer' entries.
+ * Keyed by the 'hashPubKey' of the peer's identity.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
+
+
+/**
+ * Where do we store trust information?  Name of the directory that
+ * holds one trust file per peer (see 'get_trust_filename').
+ */
+static char *trustDirectory;
+
+
+/**
+ * Get the filename under which we store the trust value
+ * for the given peer.
+ *
+ * @param id identity of the peer
+ * @return newly allocated filename of the form DIRECTORY/HOSTID,
+ *         where HOSTID is the ASCII encoding of the peer's hash;
+ *         callers release it with GNUNET_free
+ */
+static char *
+get_trust_filename (const struct GNUNET_PeerIdentity *id)
+{
+  struct GNUNET_CRYPTO_HashAsciiEncoded fil;
+  char *fn;
+
+  GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
+  /* '%s' expects a 'char *', so make the conversion of the
+     encoded hash explicit instead of passing a struct pointer */
+  GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR,
+                   (const char *) &fil);
+  return fn;
+}
+
+
+/**
+ * Extract the connection latency from an array of transport
+ * performance records.
+ *
+ * @param atsi performance data (array ending with a terminator
+ *        record), may be NULL
+ * @return connection latency; one second if 'atsi' is NULL or
+ *         carries no network delay record
+ */
+static struct GNUNET_TIME_Relative
+get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  const struct GNUNET_TRANSPORT_ATS_Information *pos;
+
+  if (NULL == atsi)
+    return GNUNET_TIME_UNIT_SECONDS;
+  /* walk the records up to the terminator, stop at the first delay entry */
+  for (pos = atsi;
+       GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl (pos->type);
+       pos++)
+    {
+      if (GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY == ntohl (pos->type))
+        return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                              ntohl (pos->value));
+    }
+  /* how can we not have latency data? */
+  GNUNET_break (0);
+  return GNUNET_TIME_UNIT_SECONDS;
+}
+
+
+/**
+ * Update the performance information kept for the given peer.
+ * Currently a stub: the body is empty, so 'atsi' is not yet
+ * merged into the peer record.
+ *
+ * @param cp peer record to update
+ * @param atsi transport performance data
+ */
+static void
+update_atsi (struct GSF_ConnectedPeer *cp,
+            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  // FIXME: merge atsi into cp's performance data!
+}
+
+
+/**
  * A peer connected to us.  Setup the connected peer
  * records.
  *
@@ -138,12 +250,89 @@
 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
-  // FIXME
-  return NULL;
+  struct GSF_ConnectedPeer *cp;
+  char *fn;
+  uint32_t trust;
+  struct GNUNET_TIME_Relative latency;
+
+  /* NOTE(review): 'trust' and 'disk_trust' are only assigned when a
+     trust file exists; this relies on GNUNET_malloc returning zeroed
+     memory for the default of 0 -- confirm */
+  cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
+  /* seed the delay tracker with the latency found in 'atsi' */
+  latency = get_latency (atsi);
+  cp->transmission_delay = GNUNET_LOAD_value_init (latency);
+  cp->pid = GNUNET_PEER_intern (peer);
+  /* restore the trust value stored on disk (network byte order), if any */
+  fn = get_trust_filename (peer);
+  if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
+      (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
+    cp->disk_trust = cp->trust = ntohl (trust);
+  GNUNET_free (fn);
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (cp_map,
+                                                  &peer->hashPubKey,
+                                                  cp,
+                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  update_atsi (cp, atsi);
+
+
+  // FIXME: notify plan & migration about new peer!
+  
+  return cp;
 }
 
 
 /**
+ * Core is ready to transmit to a peer, get the message.
+ * Removes the request from the peer's queue, asks the message
+ * callback to fill core's buffer and frees the request.
+ *
+ * @param cls the 'struct GSF_PeerTransmitHandle' of the message
+ * @param size number of bytes core is willing to take
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
+static size_t
+peer_transmit_ready_cb (void *cls,
+                       size_t size,
+                       void *buf)
+{
+  struct GSF_PeerTransmitHandle *pth = cls;
+  struct GSF_ConnectedPeer *cp;
+  size_t ret;
+
+  if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
+    {
+      /* 'pth' is freed below; a pending timeout must not fire on it */
+      GNUNET_SCHEDULER_cancel (pth->timeout_task);
+      pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  cp = pth->cp;
+  GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+                              cp->pth_tail,
+                              pth);
+  // FIXME: update 'cp' counters!
+  /* hand core's buffer to the callback; a NULL buffer is reserved
+     for signalling that the transmission will not happen */
+  ret = pth->gmc (pth->gmc_cls, 
+                 size, buf);
+  GNUNET_free (pth);  
+  return ret;
+}
+
+
+/**
+ * Scheduler task run when a transmission request could not be
+ * satisfied in time.  Dequeues the request, tells the message
+ * callback (NULL buffer) that nothing will be sent, and frees
+ * the request.
+ *
+ * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
+ * @param tc scheduler context
+ */
+static void
+peer_transmit_timeout (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GSF_PeerTransmitHandle *handle = cls;
+  struct GSF_ConnectedPeer *target = handle->cp;
+  
+  /* this task is running right now, so it is no longer pending */
+  handle->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_CONTAINER_DLL_remove (target->pth_head,
+                              target->pth_tail,
+                              handle);
+  // FIXME: update 'cp' counters!
+  handle->gmc (handle->gmc_cls, 0, NULL);
+  GNUNET_free (handle);
+}
+
+
+/**
  * Transmit a message to the given peer as soon as possible.
  * If the peer disconnects before the transmission can happen,
  * the callback is invoked with a 'NULL' buffer.
@@ -166,17 +355,83 @@
                    GSF_GetMessageCallback gmc,
                    void *gmc_cls)
 {
-  // FIXME
-  return NULL;
+  struct GSF_ConnectedPeer *cp;
+  struct GSF_PeerTransmitHandle *pth;
+  struct GSF_PeerTransmitHandle *pos;
+  struct GSF_PeerTransmitHandle *prev;
+  struct GNUNET_PeerIdentity target;
+
+  cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
+                                         &peer->hashPubKey);
+  GNUNET_assert (NULL != cp);
+  pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
+  pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
+  pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+  pth->gmc = gmc;
+  pth->gmc_cls = gmc_cls;
+  pth->size = size;
+  pth->is_query = is_query;
+  pth->priority = priority;
+  pth->cp = cp;
+  /* insertion sort (by priority, descending) */
+  prev = NULL;
+  pos = cp->pth_head;
+  while ( (pos != NULL) &&
+         (pos->priority > priority) )
+    {
+      prev = pos;
+      pos = pos->next;
+    }
+  if (prev == NULL)
+    GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
+                                     cp->pth_tail,
+                                     pth);
+  else
+    GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
+                                      cp->pth_tail,
+                                      prev,
+                                      pth);
+  GNUNET_PEER_resolve (cp->pid,
+                      &target);
+  pth->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                               priority,
+                                               timeout,
+                                               &target,
+                                               size,
+                                               &peer_transmit_ready_cb,
+                                               pth);
+  /* pth->cth could be NULL here, that's OK, we'll try again
+     later... */
+  if (pth->cth == NULL)
+    pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+                                                     &peer_transmit_timeout,
+                                                     pth);
+  return pth;
 }
 
 
 /**
  * Cancel an earlier request for transmission.
+ * Stops the pending timeout task and core request (if any), removes
+ * the request from its peer's queue and frees it.  The message
+ * callback is not invoked.
+ *
+ * @param pth request to cancel; must not be used afterwards
  */
 void
 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
 {
+  struct GSF_ConnectedPeer *cp;
+
+  if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (pth->timeout_task);
+      pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  if (NULL != pth->cth)
+    {
+      /* otherwise core would later call 'peer_transmit_ready_cb'
+         with the handle that is freed below */
+      GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
+      pth->cth = NULL;
+    }
+  cp = pth->cp;
+  GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+                              cp->pth_tail,
+                              pth);
+  // FIXME: update 'cp' counters!
+  GNUNET_free (pth);
 }
 
 
@@ -196,6 +451,7 @@
                              const struct GSF_LocalClient *initiator_client,
                              const struct GSF_ConnectedPeer *initiator_peer)
 {
+  // FIXME...
 }
 
 
@@ -218,6 +474,12 @@
                          struct GNUNET_TIME_Absolute timeout,
                          const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
+  struct GSF_ConnectedPeer *cp;
+
+  cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
+                                         &peer->hashPubKey);
+  GNUNET_assert (NULL != cp);
+  update_atsi (cp, atsi);
 }
 
 
@@ -232,10 +494,61 @@
 GSF_peer_disconnect_handler_ (void *cls,
                              const struct GNUNET_PeerIdentity *peer)
 {
+  struct GSF_ConnectedPeer *cp;
+  struct GSF_PeerTransmitHandle *pth;
+
+  cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
+                                         &peer->hashPubKey);
+  GNUNET_assert (NULL != cp);
+  GNUNET_CONTAINER_multihashmap_remove (cp_map,
+                                       &peer->hashPubKey,
+                                       cp);
+  /* fail every transmission still queued for this peer: the queued
+     handles point at 'cp', which is freed below, and their callbacks
+     are to be told (NULL buffer) that nothing will be sent */
+  while (NULL != (pth = cp->pth_head))
+    {
+      if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
+        {
+          GNUNET_SCHEDULER_cancel (pth->timeout_task);
+          pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+        }
+      if (NULL != pth->cth)
+        {
+          GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
+          pth->cth = NULL;
+        }
+      GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+                                  cp->pth_tail,
+                                  pth);
+      pth->gmc (pth->gmc_cls, 
+               0, NULL);
+      GNUNET_free (pth);
+    }
+  /* release what the connect handler acquired for this record */
+  GNUNET_PEER_change_rc (cp->pid, -1);
+  GNUNET_LOAD_value_free (cp->transmission_delay);
+  // FIXME: more cleanup
+  GNUNET_free (cp);
 }
 
 
 /**
+ * Closure for 'call_iterator': bundles the callback given to
+ * 'GSF_iterate_connected_peers_' with that callback's own closure.
+ */
+struct IterationContext
+{
+  /**
+   * Function to call on each entry of the peer map.
+   */
+  GSF_ConnectedPeerIterator it;
+
+  /**
+   * Closure for 'it', passed as its first argument.
+   */
+  void *it_cls;
+};
+
+
+/**
+ * Hash map iterator that forwards one peer map entry to the
+ * callback stored in the iteration context.
+ *
+ * @param cls the 'struct IterationContext*'
+ * @param key identity of the peer
+ * @param value the 'struct GSF_ConnectedPeer*'
+ * @return GNUNET_YES to continue iteration
+ */
+static int
+call_iterator (void *cls,
+              const GNUNET_HashCode *key,
+              void *value)
+{
+  struct IterationContext *ctx = cls;
+  struct GSF_ConnectedPeer *entry = value;
+  const struct GNUNET_PeerIdentity *identity;
+  
+  /* the map is keyed by the identity's 'hashPubKey', so the key
+     is handed on as the peer identity */
+  identity = (const struct GNUNET_PeerIdentity*) key;
+  ctx->it (ctx->it_cls, identity, entry, &entry->ppd);
+  return GNUNET_YES;
+}
+
+
+/**
  * Iterate over all connected peers.
  *
  * @param it function to call for each peer
@@ -245,6 +558,13 @@
 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
                              void *it_cls)
 {
+  struct IterationContext ctx;
+
+  /* wrap callback and closure so 'call_iterator' can forward to them */
+  ctx.it_cls = it_cls;
+  ctx.it = it;
+  GNUNET_CONTAINER_multihashmap_iterate (cp_map, &call_iterator, &ctx);
 }
 
 
@@ -280,5 +600,131 @@
 }
 
 
+/**
+ * Write host-trust information to a file - flush the buffer entry!
+ * A trust of zero is stored by removing the peer's trust file; any
+ * other value is written in network byte order.
+ *
+ * @param cls closure, not used
+ * @param key host identity
+ * @param value the 'struct GSF_ConnectedPeer' to flush
+ * @return GNUNET_OK to continue iteration
+ */
+static int
+flush_trust (void *cls,
+            const GNUNET_HashCode *key,
+            void *value)
+{
+  struct GSF_ConnectedPeer *cp = value;
+  char *fn;
+  uint32_t trust;
+  struct GNUNET_PeerIdentity pid;
+
+  if (cp->trust == cp->disk_trust)
+    return GNUNET_OK;                     /* unchanged */
+  GNUNET_PEER_resolve (cp->pid,
+                      &pid);
+  fn = get_trust_filename (&pid);
+  if (cp->trust == 0)
+    {
+      if ((0 != UNLINK (fn)) && (errno != ENOENT))
+        GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
+                                  GNUNET_ERROR_TYPE_BULK, "unlink", fn);
+      else
+        cp->disk_trust = 0;               /* file is gone; do not retry */
+    }
+  else
+    {
+      trust = htonl (cp->trust);
+      /* on a short or failed write 'disk_trust' stays stale, so the
+         next flush tries again */
+      if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
+                                                   sizeof(uint32_t),
+                                                   GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
+                                                   | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
+        cp->disk_trust = cp->trust;
+    }
+  GNUNET_free (fn);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Flush the trust values of all connected peers to disk and, when
+ * run by the scheduler outside of shutdown, re-arm itself to run
+ * again after TRUST_FLUSH_FREQ.
+ *
+ * @param cls closure, not used
+ * @param tc task context; NULL when called directly, in which case
+ *        no follow-up run is scheduled
+ */
+static void
+cron_flush_trust (void *cls,
+                 const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  if (NULL == cp_map)
+    return;
+  GNUNET_CONTAINER_multihashmap_iterate (cp_map, &flush_trust, NULL);
+  if ( (NULL != tc) &&
+       (0 == (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
+    GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
+                                 &cron_flush_trust, 
+                                 NULL);
+}
+
+
+/**
+ * Initialize peer management subsystem: create the peer map, look
+ * up and create the trust directory (option "TRUST" in section
+ * "fs", which must be present) and queue the first run of the
+ * trust flushing task.
+ *
+ * @param cfg configuration to use
+ */
+void
+GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  int have_dir;
+
+  cp_map = GNUNET_CONTAINER_multihashmap_create (128);
+  have_dir = GNUNET_CONFIGURATION_get_value_filename (cfg,
+                                                      "fs",
+                                                      "TRUST",
+                                                      &trustDirectory);
+  GNUNET_assert (GNUNET_OK == have_dir);
+  GNUNET_DISK_directory_create (trustDirectory);
+  GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
+                                     &cron_flush_trust, NULL);
+}
+
+
+/**
+ * Hash map iterator that tears down one peer entry by running the
+ * disconnect handler for it.
+ *
+ * @param cls closure, unused
+ * @param key current key code, handed on as the peer's identity
+ * @param value value in the hash map (peer entry)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int 
+clean_peer (void *cls,
+           const GNUNET_HashCode * key,
+           void *value)
+{
+  const struct GNUNET_PeerIdentity *identity;
+
+  identity = (const struct GNUNET_PeerIdentity*) key;
+  GSF_peer_disconnect_handler_ (NULL, identity);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Shutdown peer management subsystem: flush trust values to disk
+ * one last time, drop every remaining peer record, then release
+ * the peer map and the trust directory name.
+ */
+void
+GSF_connected_peer_done_ (void)
+{
+  /* a NULL task context makes the flush run without re-scheduling */
+  cron_flush_trust (NULL, NULL);
+  GNUNET_CONTAINER_multihashmap_iterate (cp_map,
+                                        &clean_peer,
+                                        NULL);
+  GNUNET_CONTAINER_multihashmap_destroy (cp_map);
+  cp_map = NULL;
+  GNUNET_free (trustDirectory);
+  trustDirectory = NULL;
+}
+
+
+
 #endif
 /* end of gnunet-service-fs_cp.c */

Modified: gnunet/src/fs/gnunet-service-fs_cp.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.h        2011-01-30 20:53:33 UTC (rev 14287)
+++ gnunet/src/fs/gnunet-service-fs_cp.h        2011-01-30 23:23:13 UTC (rev 14288)
@@ -186,6 +186,8 @@
 
 /**
  * Cancel an earlier request for transmission.
+ * The request is removed from its peer's queue and freed.
+ *
+ * @param pth request to cancel; must not be used afterwards
  */
 void
 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth);
@@ -285,5 +287,21 @@
                             void *rc_cls);
 
 
+/**
+ * Initialize peer management subsystem.  Creates the map of
+ * connected peers, sets up the trust directory named by option
+ * "TRUST" in section "fs" and starts the periodic flushing of
+ * trust values to disk.
+ *
+ * @param cfg configuration to use
+ */
+void
+GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg);
+
+
+/**
+ * Shutdown peer management subsystem.  Flushes trust values to
+ * disk, drops all remaining peer records and releases the peer
+ * map and the trust directory name.
+ */
+void
+GSF_connected_peer_done_ (void);
+
+
 #endif
 /* end of gnunet-service-fs_cp.h */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]