gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28950 - in gnunet/src: include scalarproduct


From: gnunet
Subject: [GNUnet-SVN] r28950 - in gnunet/src: include scalarproduct
Date: Mon, 2 Sep 2013 16:46:52 +0200

Author: cfuchs
Date: 2013-09-02 16:46:52 +0200 (Mon, 02 Sep 2013)
New Revision: 28950

Modified:
   gnunet/src/include/gnunet_scalarproduct_service.h
   gnunet/src/scalarproduct/scalarproduct_api.c
Log:
removed much of the excess logic in the scalar product API
finished the alice/bob API initiation functions in the SP API
merged structs in the SP API
reorganized SP bookkeeping of computations



Modified: gnunet/src/include/gnunet_scalarproduct_service.h
===================================================================
--- gnunet/src/include/gnunet_scalarproduct_service.h   2013-09-02 14:25:54 UTC 
(rev 28949)
+++ gnunet/src/include/gnunet_scalarproduct_service.h   2013-09-02 14:46:52 UTC 
(rev 28950)
@@ -50,68 +50,8 @@
   GNUNET_SCALARPRODUCT_Status_ServiceDisconnected
 };
 
-struct GNUNET_SCALARPRODUCT_Handle
-{
-  /**
-   * Our configuration.
-   */
-  const struct GNUNET_CONFIGURATION_Handle *cfg;
+struct GNUNET_SCALARPRODUCT_Handle;
 
-  /**
-   * Current connection to the scalarproduct service.
-   */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * Handle for statistics.
-   */
-  struct GNUNET_STATISTICS_Handle *stats;
-
-  /**
-   * Current transmit handle.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-  
-  /**
-   * Handle to the master context.
-   */
-  struct GNUNET_SCALARPRODUCT_Handle *h;
-  
-  /**
-   * The shared session key identifying this computation
-   */
-  struct GNUNET_HashCode * key;
-  
-  /**
-   * The message to be transmitted
-   */
-  void * msg;
-
-  union
-  {
-    /**
-     * Function to call after transmission of the request.
-     */
-    GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
-
-    /**
-     * Function to call after transmission of the request.
-     */
-    GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
-  };
-
-  /**
-   * Closure for 'cont'.
-   */
-  void *cont_cls;
-
-  /**
-   * Response Processor for response from the service. This function calls the
-   * continuation function provided by the client.
-   */
-  GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
-};
-
 typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls,
                                                              const struct 
GNUNET_MessageHeader *msg,
                                                              enum 
GNUNET_SCALARPRODUCT_ResponseStatus status);

Modified: gnunet/src/scalarproduct/scalarproduct_api.c
===================================================================
--- gnunet/src/scalarproduct/scalarproduct_api.c        2013-09-02 14:25:54 UTC 
(rev 28949)
+++ gnunet/src/scalarproduct/scalarproduct_api.c        2013-09-02 14:46:52 UTC 
(rev 28950)
@@ -41,24 +41,44 @@
 /**
  * Entry in the request queue per client
  */
-struct GNUNET_SCALARPRODUCT_QueueEntry
+struct GNUNET_SCALARPRODUCT_ComputationHandle
 {
   /**
    * This is a linked list.
    */
-  struct GNUNET_SCALARPRODUCT_QueueEntry *next;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
 
   /**
    * This is a linked list.
    */
-  struct GNUNET_SCALARPRODUCT_QueueEntry *prev;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
+  
+  /**
+   * Our configuration.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Handle to the master context.
+   * Current connection to the scalarproduct service.
    */
-  struct GNUNET_SCALARPRODUCT_Handle *h;
+  struct GNUNET_CLIENT_Connection *client;
 
   /**
+   * Handle for statistics.
+   */
+  struct GNUNET_STATISTICS_Handle *stats;
+
+  /**
+   * The shared session key identifying this computation
+   */
+  struct GNUNET_HashCode * key;
+    
+  /**
+   * Current transmit handle.
+   */
+  struct GNUNET_CLIENT_TransmitHandle *th;
+
+  /**
    * Size of the message
    */
   uint16_t message_size;
@@ -66,7 +86,7 @@
   /**
    * Message to be sent to the scalarproduct service
    */
-  struct GNUNET_SCALARPRODUCT_client_request* msg;
+  struct GNUNET_SCALARPRODUCT_client_request * msg;
 
   union
   {
@@ -87,14 +107,6 @@
   void *cont_cls;
 
   /**
-   * Has this message been transmitted to the service?
-   * Only ever GNUNET_YES for the head of the queue.
-   * Note that the overall struct should end at a
-   * multiple of 64 bits.
-   */
-  int16_t was_transmitted;
-
-  /**
    * Response Processor for response from the service. This function calls the
    * continuation function provided by the client.
    */
@@ -102,35 +114,20 @@
 };
 
 /**************************************************************
- ***  Function Declarations                          **********
+ ***  Global Variables                               **********
  **************************************************************/
-
 /**
- * Creates a new entry at the tail of the DLL
- * 
- * @param h handle to the master context
- * 
- * @return pointer to the entry
+ * Head of the active sessions queue
  */
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
-
+struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
 /**
- * Removes the head entry from the queue
- * 
- * @param h Handle to the master context
+ * Tail of the active sessions queue
  */
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
+struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
 
-/**
- * Triggered when timeout occurs for a request in queue
- * 
- * @param cls The pointer to the QueueEntry
- * @param tc Task Context
- */
-static void
-timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+/**************************************************************
+ ***  Function Declarations                          **********
+ **************************************************************/
 
 /**
  * Called when a response is received from the service. After basic check
@@ -155,121 +152,11 @@
 static size_t transmit_request (void *cls, size_t size,
                                 void *buf);
 
-/**
- * Issues transmit request for the new entries in the queue
- * 
- * @param h handle to the master context
- */
-static void
-process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
-
 /**************************************************************
  ***  Static Function Declarations                   **********
  **************************************************************/
 
-
 /**
- * Creates a new entry at the tail of the DLL
- * 
- * @param h handle to the master context
- * 
- * @return pointer to the entry
- */
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
-{
-  struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
-
-  qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
-
-  // if queue empty
-  if (NULL == h->queue_head && NULL == h->queue_tail)
-    {
-      qe->next = NULL;
-      qe->prev = NULL;
-      h->queue_head = qe;
-      h->queue_tail = qe;
-    }
-  else
-    {
-      qe->prev = h->queue_tail;
-      h->queue_tail->next = qe;
-      h->queue_tail = qe;
-    }
-
-  return qe;
-}
-
-
-/**
- * Removes the head entry from the queue
- * 
- * @param h Handle to the master context
- */
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
-{
-  struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
-
-  GNUNET_assert (NULL != h);
-  if (NULL == h->queue_head && NULL == h->queue_tail)
-    {
-      // The queue is empty. Just return.
-      qe = NULL;
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when 
free_queue_head_entry was called.\n");
-    }
-  else if (h->queue_head == h->queue_tail) //only one entry
-    {
-      qe = h->queue_head;
-      qe->next = NULL;
-      qe->prev = NULL;
-      h->queue_head = NULL;
-      h->queue_tail = NULL;
-    }
-  else
-    {
-      qe = h->queue_head;
-      h->queue_head = h->queue_head->next;
-      h->queue_head->prev = NULL;
-      qe->next = NULL;
-      qe->prev = NULL;
-    }
-  return qe;
-}
-
-
-/**
- * Triggered when timeout occurs for a request in queue
- * 
- * @param cls The pointer to the QueueEntry
- * @param tc Task Context
- */
-static void
-timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
-
-  // Update Statistics
-  GNUNET_STATISTICS_update (qe->h->stats,
-                            gettext_noop ("# queue entry timeouts"), 1,
-                            GNUNET_NO);
-
-  // Clear the timeout_task
-  qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-
-  // transmit_request is supposed to cancel timeout task.
-  // If message was not transmitted, there is definitely an error.
-  GNUNET_assert (GNUNET_NO == qe->was_transmitted);
-
-  LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
-
-  // remove the queue_entry for the queue
-  GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
-  qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
-}
-
-
-/**
  * Handles the RESULT received in reply of prepare_response from the 
  * service
  * 
@@ -281,7 +168,7 @@
                         const struct GNUNET_MessageHeader *msg,
                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
 {
-  struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
 
   GNUNET_assert (qe != NULL);
 
@@ -302,7 +189,7 @@
                         const struct GNUNET_MessageHeader *msg,
                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
 {
-  struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
 
   GNUNET_assert (qe != NULL);
 
@@ -328,8 +215,8 @@
 static void
 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_SCALARPRODUCT_Handle *h = cls;
-  struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *qe;
   int16_t was_transmitted;
   struct GNUNET_SCALARPRODUCT_client_response *message =
           (struct GNUNET_SCALARPRODUCT_client_response *) msg;
@@ -414,36 +301,18 @@
 transmit_request (void *cls, size_t size,
                   void *buf)
 {
-  struct GNUNET_SCALARPRODUCT_Handle *h = cls;
-  struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
   size_t msize;
   
-  if (NULL == (qe = h->queue_head))
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
-      return 0;
-    }
-
-  GNUNET_SCHEDULER_cancel (qe->timeout_task);
-  qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-
-  h->th = NULL;
-  if (NULL == (qe = h->queue_head))
-    return 0; /* no entry in queue */
   if (buf == NULL)
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to 
SCALARPRODUCT.\n");
-      GNUNET_STATISTICS_update (h->stats,
+      GNUNET_STATISTICS_update (qe->stats,
                                 gettext_noop ("# transmission request 
failures"),
                                 1, GNUNET_NO);
-      GNUNET_SCALARPRODUCT_disconnect (h);
+      GNUNET_SCALARPRODUCT_disconnect (qe);
       return 0;
     }
-  if (size < (msize = qe->message_size))
-    {
-      process_queue (h);
-      return 0;
-    }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to 
SCALARPRODUCT\n",
        msize);
 
@@ -451,8 +320,7 @@
   GNUNET_free (qe->msg);
   qe->was_transmitted = GNUNET_YES;
 
-  GNUNET_assert (GNUNET_NO == h->in_receive);
-  h->in_receive = GNUNET_YES;
+  qe->th = NULL;
 
   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
                          GNUNET_TIME_UNIT_FOREVER_REL);
@@ -466,61 +334,6 @@
 }
 
 
-/**
- * Issues transmit request for the new entries in the queue
- * 
- * @param h handle to the master context
- */
-static void
-process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
-{
-  struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
-  
-  if (NULL == (qe = h->queue_head))
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
-      return; /* no entry in queue */
-    }
-  if (qe->was_transmitted == GNUNET_YES)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
-      return; /* waiting for replies */
-    }
-  if (h->th != NULL)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
-      return; /* request pending */
-    }
-  if (h->client == NULL)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
-      return; /* waiting for reconnect */
-    }
-  if (GNUNET_YES == h->in_receive)
-    {
-      /* wait for response to previous query */
-      return;
-    }
-
-  h->th =
-          GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
-                                               GNUNET_TIME_UNIT_FOREVER_REL,
-                                               GNUNET_YES,
-                                               &transmit_request, h);
-
-  if (h->th == NULL)
-    {
-      LOG (GNUNET_ERROR_TYPE_ERROR,
-           _ ("Failed to send a message to the scalarproduct service\n"));
-      return;
-    }
-
-  GNUNET_assert (GNUNET_NO == h->in_receive);
-  GNUNET_break (NULL != h->th);
-}
-
-
-
 /**************************************************************
  ***  API                                            **********
  **************************************************************/
@@ -536,7 +349,7 @@
  * @param cont Callback function
  * @param cont_cls Closure for the callback function
  */
-struct GNUNET_SCALARPRODUCT_Handle *
+struct GNUNET_SCALARPRODUCT_ComputationHandle *
 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                const struct GNUNET_HashCode * key,
                                const int32_t * elements,
@@ -544,7 +357,7 @@
                                GNUNET_SCALARPRODUCT_ContinuationWithStatus 
cont,
                                void *cont_cls)
 {
-  struct GNUNET_SCALARPRODUCT_Handle *h;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
   struct GNUNET_SCALARPRODUCT_client_request *msg;
   int32_t * vector;
   uint16_t size;
@@ -556,7 +369,7 @@
   GNUNET_assert(element_count > 1);
   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct 
GNUNET_SCALARPRODUCT_client_request)
                                                    + element_count * sizeof 
(int32_t));
-  h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
+  h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
   if (!h->client)
     {
@@ -595,7 +408,6 @@
 
   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
   
-  
   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
                                                GNUNET_TIME_UNIT_FOREVER_REL,
                                                GNUNET_YES, // retry is OK in 
the initial stage
@@ -610,6 +422,7 @@
       GNUNET_free(h);
       return NULL;
     }
+  GNUNET_CONTAINER_DLL_insert (head, tail, h);
   return h;
 }
 
@@ -627,7 +440,7 @@
  * @param cont Callback function
  * @param cont_cls Closure for the callback function
  */
-struct GNUNET_SCALARPRODUCT_Handle *
+struct GNUNET_SCALARPRODUCT_ComputationHandle *
 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
                               const struct GNUNET_HashCode * key,
                               const struct GNUNET_PeerIdentity *peer,
@@ -638,38 +451,39 @@
                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
                               void *cont_cls)
 {
-  struct GNUNET_CLIENT_Connection *client;
-  struct GNUNET_SCALARPRODUCT_Handle *h;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
   struct GNUNET_SCALARPRODUCT_client_request *msg;
   int32_t * vector;
   uint16_t size;
   uint64_t i;
   
-  GNUNET_assert(key);
-  GNUNET_assert(peer);
-  GNUNET_assert(elements);
-  GNUNET_assert(mask);
-  GNUNET_assert(cont);
-  GNUNET_assert(element_count > 1);
-  GNUNET_assert(mask_bytes != 0);
   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct 
GNUNET_SCALARPRODUCT_client_request)
                                                    + element_count * sizeof 
(int32_t)
                                                    + mask_length);
-  client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
-
-  if (!client)
+  
+  h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
+  h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
+  if (!h->client)
     {
       LOG (GNUNET_ERROR_TYPE_ERROR,
            _ ("Failed to connect to the scalarproduct service\n"));
+      GNUNET_free(h);
       return NULL;
     }
+  h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
+  if (!h->th){
+      LOG (GNUNET_ERROR_TYPE_ERROR,
+           _("Failed to send a message to the statistics service\n"));
+      GNUNET_CLIENT_disconnect(h->client);
+      GNUNET_free(h);
+      return NULL;
+  }
+  
   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * 
sizeof (int32_t) + mask_length;
   
-  h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
   h->cont_datum = cont;
   h->cont_cls = cont_cls;
   h->response_proc = &process_status_message;
-  h->client = client;
   h->cfg = cfg;
   h->msg = GNUNET_malloc (size);
   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
@@ -689,17 +503,21 @@
   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
   memcpy (&vector[element_count], mask, mask_length);
   
-  h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
                                                GNUNET_TIME_UNIT_FOREVER_REL,
                                                GNUNET_YES, // retry is OK in 
the initial stage
                                                &transmit_request, h);
-  if ( !h->th)
+  if (!h->th)
     {
       LOG (GNUNET_ERROR_TYPE_ERROR,
            _ ("Failed to send a message to the scalarproduct service\n"));
+      GNUNET_STATISTICS_destroy(h->GNUNET_YES);
+      GNUNET_CLIENT_disconnect(h->client);
+      GNUNET_free(h->msg);
+      GNUNET_free(h);
       return NULL;
     }
+  GNUNET_CONTAINER_DLL_insert (head, tail, h);
   return h;
 }
 
@@ -709,27 +527,24 @@
  * @param h handle to the scalarproduct
  */
 void
-GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h)
+GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle 
* h)
 {
-  struct GNUNET_SCALARPRODUCT_QueueEntry * qe;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
 
   LOG (GNUNET_ERROR_TYPE_INFO,
        "Disconnecting from VectorProduct\n");
 
-  while (NULL != h->queue_head)
+  for (qe = head; head != NULL; qe = head)
     {
-      GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
+      GNUNET_CONTAINER_DLL_remove (head, tail, qe);
+      if (NULL == qe->th)
+        GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th);
+      GNUNET_CLIENT_disconnect (h->client);
+      GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
       qe->response_proc (qe, NULL, 
GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
+      GNUNET_free(qe->msg);
+      GNUNET_free(qe);
     }
-
-  if (h->client != NULL)
-    {
-      GNUNET_CLIENT_disconnect (h->client);
-      h->client = NULL;
-    }
-
-  GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
-  h->stats = NULL;
 }
 
 /* end of ext_api.c */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]