gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r26983 - gnunet/src/set


From: gnunet
Subject: [GNUnet-SVN] r26983 - gnunet/src/set
Date: Wed, 24 Apr 2013 14:05:33 +0200

Author: dold
Date: 2013-04-24 14:05:33 +0200 (Wed, 24 Apr 2013)
New Revision: 26983

Added:
   gnunet/src/set/gnunet-service-set.h
   gnunet/src/set/gnunet-service-set_union.c
   gnunet/src/set/set_protocol.h
Log:
forgot to commit new files


Added: gnunet/src/set/gnunet-service-set.h
===================================================================
--- gnunet/src/set/gnunet-service-set.h                         (rev 0)
+++ gnunet/src/set/gnunet-service-set.h 2013-04-24 12:05:33 UTC (rev 26983)
@@ -0,0 +1,293 @@
+/*
+      This file is part of GNUnet
+      (C) 2013 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+      Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @brief common stuff for the set service
+ */
+
+#ifndef GNUNET_SERVICE_SET_H_PRIVATE
+#define GNUNET_SERVICE_SET_H_PRIVATE
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_core_service.h"
+#include "gnunet_stream_lib.h"
+#include "gnunet_set_service.h"
+#include "set.h"
+#include "mq.h"
+
+
+/* FIXME: cfuchs */
+struct IntersectionState;
+
+
+/**
+ * Extra state required for set union.
+ */
+struct UnionState;
+
+
+/**
+ * A set that supports a specific operation
+ * with other peers.
+ */
+struct Set
+{
+  /**
+   * Client that owns the set.
+   * Only one client may own a set.
+   */
+  struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Message queue for the client
+   */
+  struct GNUNET_MQ_MessageQueue *client_mq;
+
+  /**
+   * Type of operation supported for this set
+   */
+  uint32_t operation;
+
+  /**
+   * Sets are held in a doubly linked list.
+   */
+  struct Set *next;
+
+  /**
+   * Sets are held in a doubly linked list.
+   */
+  struct Set *prev;
+
+  /**
+   * Appropriate state for each type of
+   * operation.
+   */
+  union {
+    struct IntersectionState *i;
+    struct UnionState *u;
+  } extra;
+};
+
+
+/**
+ * State for an evaluate operation for a set that
+ * supports set union.
+ */
+struct UnionEvaluateOperation;
+
+
+/* FIXME: cfuchs */
+struct IntersectionEvaluateOperation
+{
+  /* FIXME: cfuchs */
+};
+
+
+/**
+ * State of evaluation a set operation with
+ * another peer
+ */
+struct EvaluateOperation
+{
+  /**
+   * Local set the operation is evaluated on
+   */
+  struct Set *set;
+
+  /**
+   * Peer with the remote set
+   */
+  struct GNUNET_PeerIdentity peer;
+
+  /**
+   * Application-specific identifier
+   */
+  struct GNUNET_HashCode app_id;
+
+  /**
+   * Context message, given to us
+   * by the client, may be NULL.
+   */
+  struct GNUNET_MessageHeader *context_msg;
+
+  /**
+   * Stream socket connected to the other peer
+   */
+  struct GNUNET_STREAM_Socket *socket;
+
+  /**
+   * Message queue for the peer on the other
+   * end
+   */
+  struct GNUNET_MQ_MessageQueue *mq;
+
+  /**
+   * Type of this operation
+   */
+  enum GNUNET_SET_OperationType operation;
+
+  /**
+   * GNUNET_YES if we started the operation,
+   * GNUNET_NO if the other peer started it.
+   */
+  int is_outgoing;
+
+  /**
+   * Request id, so we can use one client handle
+   * for multiple operations
+   */
+  uint32_t request_id;
+
+  union {
+    struct UnionEvaluateOperation *u;
+    struct IntersectionEvaluateOperation *i;
+  } extra;
+};
+
+
+struct Listener
+{
+  /**
+   * Listeners are held in a doubly linked list.
+   */
+  struct Listener *next;
+
+  /**
+   * Listeners are held in a doubly linked list.
+   */
+  struct Listener *prev;
+
+  /**
+   * Client that owns the set.
+   * Only one client may own a set.
+   */
+  struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Message queue for the client
+   */
+  struct GNUNET_MQ_MessageQueue *client_mq;
+
+  /**
+   * Type of operation supported for this set
+   */
+  enum GNUNET_SET_OperationType operation;
+
+  /**
+   * Application id of intereset for this listener.
+   */
+  struct GNUNET_HashCode app_id;
+};
+
+
+/**
+ * Peer that has connected to us, but is not yet evaluating a set operation.
+ * Once the peer has sent a request, and the client has
+ * accepted or rejected it, this information will be deleted.
+ */
+struct Incoming
+{
+  /**
+   * Incoming peers are held in a linked list
+   */
+  struct Incoming *next;
+
+  /**
+   * Incoming peers are held in a linked list
+   */
+  struct Incoming *prev;
+
+  /**
+   * Identity of the peer that connected to us
+   */
+  struct GNUNET_PeerIdentity peer;
+
+  /**
+   * Socket connected to the peer
+   */
+  struct GNUNET_STREAM_Socket *socket;
+
+  /**
+   * Message queue for the peer
+   */
+  struct GNUNET_MQ_MessageQueue *mq;
+
+  /**
+   * App code, set once the peer has
+   * requested an operation
+   */
+  struct GNUNET_HashCode app_id;
+
+  /**
+   * Context message, set once the peer
+   * has requested an operation.
+   */
+  struct GNUNET_MessageHeader *context_msg;
+
+  /**
+   * Operation the other peer wants to do
+   */
+  enum GNUNET_SET_OperationType operation;
+
+  /**
+   * Request id associated with the
+   * request coming from this client
+   */
+  uint32_t request_id;
+};
+
+
+/**
+ * Configuration of the local peer
+ */
+extern const struct GNUNET_CONFIGURATION_Handle *configuration;
+
+
+/**
+ * Disconnect a client and free all resources
+ * that the client allocated (e.g. Sets or Listeners)
+ *
+ * @param client the client to disconnect
+ */
+void
+client_disconnect (struct GNUNET_SERVER_Client *client);
+
+
+struct Set *
+union_set_create ();
+
+
+void
+union_evaluate (struct EvaluateOperation *eo);
+
+
+void
+union_add (struct Set *set, struct ElementMessage *m);
+
+
+void
+union_accept (struct EvaluateOperation *eo, struct Incoming *incoming);
+
+
+#endif

Added: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c                           (rev 0)
+++ gnunet/src/set/gnunet-service-set_union.c   2013-04-24 12:05:33 UTC (rev 
26983)
@@ -0,0 +1,648 @@
+/*
+      This file is part of GNUnet
+      (C) 2013 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+      Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file set/gnunet-service-set.c
+ * @brief two-peer set operations
+ * @author Florian Dold
+ */
+
+
+#include "gnunet-service-set.h"
+#include "set_protocol.h"
+#include "ibf.h"
+#include "strata_estimator.h"
+
+
+/**
+ * Number of IBFs in a strata estimator.
+ */
+#define SE_STRATA_COUNT 32
+/**
+ * Size of the IBFs in the strata estimator.
+ */
+#define SE_IBF_SIZE 80
+/**
+ * hash num parameter for the difference digests and strata estimators
+ */
+#define SE_IBF_HASH_NUM 3
+
+/**
+ * Number of buckets that can be transmitted in one message.
+ */
+#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
+
+
+/**
+ * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
+ * Choose this value so that computing the IBF is still cheaper
+ * than transmitting all values.
+ */
+#define MAX_IBF_ORDER (16)
+
+
+enum UnionOperationState
+{
+  STATE_EXPECT_SE,
+  STATE_EXPECT_IBF,
+  STATE_EXPECT_IBF_CONT,
+  STATE_EXPECT_ELEMENTS,
+  STATE_EXPECT_ELEMENTS_AND_REQUESTS,
+  STATE_WAIT_SENT_DONE,
+  STATE_FINISHED
+};
+
+
+/**
+ * State of an evaluate operation
+ * with another peer.
+ */
+struct UnionEvaluateOperation
+{
+  /* last difference estimate */
+  unsigned int diff;
+
+  /**
+   * Number of ibf buckets received
+   */
+  unsigned int ibf_buckets_received;
+
+  /**
+   * Current salt in use, zero unless
+   * we detected a collision
+   */
+  uint8_t salt;
+
+  /**
+   * order of the ibf we receive
+   */
+  unsigned int ibf_order;
+
+  /**
+   * The ibf we currently receive
+   */
+  struct InvertibleBloomFilter *ibf_received;
+
+  struct StrataEstimator *se;
+
+  /**
+   * Current state of the operation
+   */
+  enum UnionOperationState state;
+};
+
+
+/**
+ * Element entry, stored in the hash maps from
+ * partial IBF keys to elements.
+ */
+struct ElementEntry
+{
+  /**
+   * The actual element
+   */
+  struct GNUNET_SET_Element *element;
+
+  /**
+   * Actual ibf key of the element entry
+   */
+  struct IBF_Key ibf_key;
+
+  /**
+   * Linked list, note that the next element
+   * has to have an ibf_key that is lexicographically
+   * equal or larger.
+   */
+  struct ElementEntry *next;
+
+  /**
+   * GNUNET_YES if the element was received from
+   * the remote peer, and the local peer did not previously
+   * have it
+   */
+  int remote;
+};
+
+
+/**
+ * Extra state required for efficient set union.
+ */
+struct UnionState
+{
+  /**
+   * Strate estimator of the set we currently have,
+   * used for estimation of the symmetric difference
+   */
+  struct StrataEstimator *se;
+
+  /**
+   * Array of IBFs, some of them pre-allocated
+   */
+  struct InvertibleBloomFilter **ibfs;
+
+  /**
+   * Maps the first 32 bits of the ibf-key to
+   * elements.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap32 *elements;
+};
+
+
+static void
+send_operation_request (struct EvaluateOperation *eo)
+{
+  struct GNUNET_MQ_Message *mqm;
+  struct OperationRequestMessage *msg;
+
+  mqm = GNUNET_MQ_msg_concat (msg, eo->context_msg,
+                              GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
+  if (NULL == mqm)
+  {
+    /* the context message is too large */
+    client_disconnect (eo->set->client);
+    GNUNET_break (0);
+    return;
+  }
+  msg->operation = eo->operation;
+  msg->app_id = eo->app_id;
+  GNUNET_MQ_send (eo->mq, mqm);
+}
+
+
+/**
+ * Iterator to insert values into an ibf.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int
+ibf_insert_iterator (void *cls,
+                     uint32_t key,
+                     void *value)
+{
+  struct InvertibleBloomFilter *ibf = cls;
+  struct ElementEntry *e = value;
+  struct IBF_Key ibf_key;
+
+  GNUNET_assert (NULL != e);
+  ibf_key = e->ibf_key;
+  ibf_insert (ibf, ibf_key);
+  e = e->next;
+
+  while (NULL != e)
+  {
+    /* only insert keys we haven't seen yet */
+    if (0 != memcmp (&e->ibf_key, &ibf_key, sizeof ibf_key))
+    {
+      ibf_key = e->ibf_key;
+      ibf_insert (ibf, ibf_key);
+    }
+    e = e->next; 
+  }
+
+  return GNUNET_YES;
+}
+
+
+/**
+ * Create and populate an IBF for the specified peer,
+ * if it does not already exist.
+ *
+ * @param cpi peer to create the ibf for
+ */
+static struct InvertibleBloomFilter *
+prepare_ibf (struct EvaluateOperation *eo, uint16_t order)
+{
+  struct UnionState *us = eo->set->extra.u;
+
+  GNUNET_assert (order <= MAX_IBF_ORDER);
+  if (NULL == us->ibfs)
+    us->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct 
InvertibleBloomFilter *));
+  if (NULL == us->ibfs[order])
+  {
+    us->ibfs[order] = ibf_create (1 << order, SE_IBF_HASH_NUM);
+    GNUNET_CONTAINER_multihashmap32_iterate (us->elements, 
ibf_insert_iterator, us->ibfs[order]);
+  }
+  return us->ibfs[order];
+}
+
+
+/**
+ * Send an ibf of appropriate size.
+ *
+ * @param cpi the peer
+ */
+static void
+send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order)
+{
+  unsigned int buckets_sent = 0;
+  struct InvertibleBloomFilter *ibf;
+
+  ibf = prepare_ibf (eo, ibf_order);
+
+  while (buckets_sent < (1 << ibf_order))
+  {
+    unsigned int buckets_in_message;
+    struct GNUNET_MQ_Message *mqm;
+    struct IBFMessage *msg;
+
+    buckets_in_message = (1 << ibf_order) - buckets_sent;
+    /* limit to maximum */
+    if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
+      buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
+
+    mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
+                               GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
+    msg->order = htons (ibf_order);
+    msg->offset = htons (buckets_sent);
+    ibf_write_slice (ibf, buckets_sent,
+                     buckets_in_message, &msg[1]);
+    buckets_sent += buckets_in_message;
+    GNUNET_MQ_send (eo->mq, mqm);
+  }
+
+  eo->extra.u->state = STATE_EXPECT_ELEMENTS_AND_REQUESTS;
+}
+
+
+/**
+ * Send a strata estimator.
+ *
+ * @param cpi the peer
+ */
+static void
+send_strata_estimator (struct EvaluateOperation *eo)
+{
+  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MessageHeader *strata_msg;
+
+  mqm = GNUNET_MQ_msg_header_extra (strata_msg,
+                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * 
SE_IBF_SIZE,
+                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
+  strata_estimator_write (eo->set->extra.u->se, &strata_msg[1]);
+  GNUNET_MQ_send (eo->mq, mqm);
+
+  eo->extra.u->state = STATE_EXPECT_IBF;
+}
+
+
+static void
+handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
+{
+  struct EvaluateOperation *eo = cls;
+  int ibf_order;
+  int diff;
+
+  if (eo->extra.u->state != STATE_EXPECT_SE)
+  {
+    /* FIXME: handle */
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_assert (NULL == eo->extra.u->se);
+  eo->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, 
SE_IBF_HASH_NUM);
+  strata_estimator_read (&mh[1], eo->extra.u->se);
+  GNUNET_assert (NULL != eo->set->extra.u->se);
+  diff = strata_estimator_difference (eo->set->extra.u->se, eo->extra.u->se);
+  /* minimum order */
+  ibf_order = 2;
+  while ((1<<ibf_order) < (2 * diff))
+    ibf_order++;
+  if (ibf_order > MAX_IBF_ORDER)
+    ibf_order = MAX_IBF_ORDER;
+  send_ibf (eo, ibf_order);
+}
+
+
+/**
+ * FIXME
+ *
+ * @param
+ */
+static void
+decode (struct EvaluateOperation *eo)
+{
+  struct IBF_Key key;
+  int side;
+  struct InvertibleBloomFilter *diff_ibf;
+
+  GNUNET_assert (STATE_EXPECT_ELEMENTS == eo->extra.u->state);
+
+  diff_ibf = ibf_dup (prepare_ibf (eo, eo->extra.u->ibf_order));
+  ibf_subtract (diff_ibf, eo->extra.u->ibf_received);
+
+  while (1)
+  {
+    int res;
+
+    res = ibf_decode (diff_ibf, &side, &key);
+    if (GNUNET_SYSERR == res)
+    {
+      /* decoding failed, we tell the other peer by sending our ibf with a 
larger order */
+      GNUNET_assert (0);
+      return;
+    }
+    if (GNUNET_NO == res)
+    {
+      struct GNUNET_MQ_Message *mqm;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending 
DONE\n");
+      mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
+      GNUNET_MQ_send (eo->mq, mqm);
+      return;
+    }
+    if (1 == side)
+    {
+      struct ElementEntry *e;
+      /* we have the element(s), send it to the other peer */
+      e = GNUNET_CONTAINER_multihashmap32_get (eo->set->extra.u->elements, 
(uint32_t) key.key_val);
+      if (NULL == e)
+      {
+        /* FIXME */
+        GNUNET_assert (0);
+        return;
+      }
+      while (NULL != e)
+      {
+        /* FIXME: send element */
+        e = e->next;
+      }
+    }
+    else
+    {
+      struct GNUNET_MQ_Message *mqm;
+      struct GNUNET_MessageHeader *msg;
+
+      /* FIXME: before sending the request, check if we may just have the 
element */
+      /* FIXME: merge multiple requests */
+      mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), 
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
+      *(struct IBF_Key *) &msg[1] = key;
+      GNUNET_MQ_send (eo->mq, mqm);
+    }
+  }
+}
+
+
+
+static void
+handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
+{
+  struct EvaluateOperation *eo = cls;
+  struct UnionEvaluateOperation *ueo = eo->extra.u;
+  struct IBFMessage *msg = (struct IBFMessage *) mh;
+  unsigned int buckets_in_message;
+
+  if (ueo->state == STATE_EXPECT_ELEMENTS_AND_REQUESTS)
+  {
+    /* check that the ibf is a new one / first part */
+    /* clear outgoing messages */
+    GNUNET_assert (0);
+  }
+  else if (ueo->state == STATE_EXPECT_IBF)
+  {
+    ueo->state = STATE_EXPECT_IBF_CONT;
+    ueo->ibf_order = msg->order;
+    GNUNET_assert (NULL == ueo->ibf_received);
+    ueo->ibf_received = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    if (ntohs (msg->offset) != 0)
+    {
+      /* FIXME: handle */
+      GNUNET_assert (0);
+    }
+  }
+  else if (ueo->state == STATE_EXPECT_IBF_CONT)
+  {
+    if ( (ntohs (msg->offset) != ueo->ibf_buckets_received) ||
+         (msg->order != ueo->ibf_order) )
+    {
+      /* FIXME: handle */
+      GNUNET_assert (0);
+    }
+  }
+
+  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / 
IBF_BUCKET_SIZE;
+
+  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * 
IBF_BUCKET_SIZE)
+  {
+    /* FIXME: handle, message was malformed */
+    GNUNET_assert (0);
+  }
+
+  ibf_read_slice (&msg[1], ueo->ibf_buckets_received, buckets_in_message, 
ueo->ibf_received);
+  ueo->ibf_buckets_received += buckets_in_message;
+
+  if (ueo->ibf_buckets_received == (1<<ueo->ibf_order))
+  {
+    ueo->state = STATE_EXPECT_ELEMENTS;
+    decode (eo);
+  }
+}
+
+
+static void
+handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
+{
+  struct EvaluateOperation *eo = cls;
+
+  if ( (eo->extra.u->state != STATE_EXPECT_ELEMENTS) &&
+       (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) )
+  {
+    /* FIXME: handle */
+    GNUNET_break (0);
+    return;
+  }
+}
+
+
+static void
+handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
+{
+  struct EvaluateOperation *eo = cls;
+
+  /* look up elements and send them */
+  if (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS)
+  {
+    /* FIXME: handle */
+    GNUNET_break (0);
+    return;
+  }
+}
+
+
+static void
+handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
+{
+  GNUNET_break (0);
+}
+
+
+static const struct GNUNET_MQ_Handler union_handlers[] = {
+  {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
+  {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
+  {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
+  {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
+  {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
+  GNUNET_MQ_HANDLERS_END
+};
+
+
+/**
+ * Functions of this type will be called when a stream is established
+ *
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
+ */
+static void
+stream_open_cb (void *cls,
+                struct GNUNET_STREAM_Socket *socket)
+{
+  struct EvaluateOperation *eo = cls;
+
+  GNUNET_assert (NULL == eo->mq);
+  GNUNET_assert (socket == eo->socket);
+
+  eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket, union_handlers, eo);
+  send_operation_request (eo);
+}
+       
+
+void
+union_evaluate (struct EvaluateOperation *eo)
+{
+  GNUNET_assert (GNUNET_SET_OPERATION_UNION == eo->set->operation);
+  eo->socket = 
+      GNUNET_STREAM_open (configuration, &eo->peer, 
GNUNET_APPLICATION_TYPE_SET,
+                          stream_open_cb, GNUNET_STREAM_OPTION_END);
+}
+
+
+static void
+insert_ibf_key_unchecked (struct UnionState *us, struct IBF_Key ibf_key)
+{
+  int i;
+
+  strata_estimator_insert (us->se, ibf_key);
+  for (i = 0; i <= MAX_IBF_ORDER; i++)
+  {
+    if (NULL == us->ibfs)
+      break;
+    if (NULL == us->ibfs[i])
+      continue;
+    ibf_insert (us->ibfs[i], ibf_key);
+  }
+}
+
+
+/**
+ * Insert an element into the consensus set of the specified session.
+ * The element will not be copied, and freed when destroying the session.
+ *
+ * @param session session for new element
+ * @param element element to insert
+ */
+static void
+insert_element (struct Set *set, struct GNUNET_SET_Element *element)
+{
+  struct UnionState *us = set->extra.u;
+  struct GNUNET_HashCode hash;
+  struct ElementEntry *e;
+  struct ElementEntry *e_old;
+
+  e = GNUNET_new (struct ElementEntry);
+  e->element = element;
+  GNUNET_CRYPTO_hash (e->element->data, e->element->size, &hash);
+  e->ibf_key = ibf_key_from_hashcode (&hash);
+
+  e_old = GNUNET_CONTAINER_multihashmap32_get (us->elements, (uint32_t) 
e->ibf_key.key_val);
+  if (NULL == e_old)
+  {
+    GNUNET_CONTAINER_multihashmap32_put (us->elements, (uint32_t) 
e->ibf_key.key_val, e,
+                                         
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+    return;
+  }
+
+  while (NULL != e_old)
+  {
+    int cmp = memcmp (&e->ibf_key, &e_old->ibf_key, sizeof (struct IBF_Key));
+    if (cmp < 0)
+    {
+      if (NULL == e_old->next)
+      {
+        e_old->next = e;
+        insert_ibf_key_unchecked (us, e->ibf_key);
+        return;
+      }
+      e_old = e_old->next;
+    }
+    else if (cmp == 0)
+    {
+      e->next = e_old->next;
+      e_old->next = e;
+      return;
+    }
+    else
+    {
+      e->next = e_old;
+      insert_ibf_key_unchecked (us, e->ibf_key);
+      return;
+    }
+  } 
+}
+
+
+void
+union_accept (struct EvaluateOperation *eo, struct Incoming *incoming)
+{
+  GNUNET_assert (NULL != incoming->mq); 
+  eo->mq = incoming->mq;
+  GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
+
+  send_strata_estimator (eo);
+}
+
+
+struct Set *
+union_set_create ()
+{
+  struct Set *set;
+  set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
+  set->extra.u = (struct UnionState *) &set[1];
+  set->operation = GNUNET_SET_OPERATION_UNION;
+  set->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, 
SE_IBF_HASH_NUM);
+  return set;
+}
+
+
+void
+union_add (struct Set *set, struct ElementMessage *m)
+{
+  struct GNUNET_SET_Element *element;
+  uint16_t element_size;
+  element_size = ntohs (m->header.size) - sizeof *m;
+  element = GNUNET_malloc (sizeof *element + element_size);
+  element->size = element_size;
+  element->data = &element[1];
+  memcpy (element->data, &m[1], element_size);
+  insert_element (set, element);
+}
+

Added: gnunet/src/set/set_protocol.h
===================================================================
--- gnunet/src/set/set_protocol.h                               (rev 0)
+++ gnunet/src/set/set_protocol.h       2013-04-24 12:05:33 UTC (rev 26983)
@@ -0,0 +1,98 @@
+/*
+     This file is part of GNUnet.
+     (C) 2013 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 2, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @author Florian Dold
+ * @file set/set_protocol.h
+ * @brief Peer-to-Peer messages for gnunet set
+ */
+#ifndef SET_PROTOCOL_H
+#define SET_PROTOCOL_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+struct OperationRequestMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Operation to request, values from 'enum GNUNET_SET_OperationType'
+   */
+  uint32_t operation;
+
+  /**
+   * Application-specific identifier of the request.
+   */
+  struct GNUNET_HashCode app_id;
+
+  /* rest: optional message */
+};
+
+struct ElementRequestMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Salt the keys in the body use
+   */
+  uint8_t salt;
+};
+
+
+struct IBFMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_SET_P2P_IBF
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Order of the whole ibf, where
+   * num_buckets = 2^order
+   */
+  uint8_t order;
+
+  /**
+   * Salt used when hashing elements for this IBF.
+   */
+  uint8_t salt;
+
+  /**
+   * Offset of the strata in the rest of the message
+   */
+  uint16_t offset GNUNET_PACKED;
+
+  /* rest: strata */
+};
+
+
+GNUNET_NETWORK_STRUCT_END
+
+#endif




reply via email to

[Prev in Thread] Current Thread [Next in Thread]