[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28950 - in gnunet/src: include scalarproduct
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28950 - in gnunet/src: include scalarproduct |
Date: |
Mon, 2 Sep 2013 16:46:52 +0200 |
Author: cfuchs
Date: 2013-09-02 16:46:52 +0200 (Mon, 02 Sep 2013)
New Revision: 28950
Modified:
gnunet/src/include/gnunet_scalarproduct_service.h
gnunet/src/scalarproduct/scalarproduct_api.c
Log:
removed much of the excell logics in the scalar product API
finished the the alice/bob API initiation functions in the SP API
merged structes in SP API
reorganized SP bookkeeping of computations
Modified: gnunet/src/include/gnunet_scalarproduct_service.h
===================================================================
--- gnunet/src/include/gnunet_scalarproduct_service.h 2013-09-02 14:25:54 UTC
(rev 28949)
+++ gnunet/src/include/gnunet_scalarproduct_service.h 2013-09-02 14:46:52 UTC
(rev 28950)
@@ -50,68 +50,8 @@
GNUNET_SCALARPRODUCT_Status_ServiceDisconnected
};
-struct GNUNET_SCALARPRODUCT_Handle
-{
- /**
- * Our configuration.
- */
- const struct GNUNET_CONFIGURATION_Handle *cfg;
+struct GNUNET_SCALARPRODUCT_Handle;
- /**
- * Current connection to the scalarproduct service.
- */
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Handle for statistics.
- */
- struct GNUNET_STATISTICS_Handle *stats;
-
- /**
- * Current transmit handle.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
- * Handle to the master context.
- */
- struct GNUNET_SCALARPRODUCT_Handle *h;
-
- /**
- * The shared session key identifying this computation
- */
- struct GNUNET_HashCode * key;
-
- /**
- * The message to be transmitted
- */
- void * msg;
-
- union
- {
- /**
- * Function to call after transmission of the request.
- */
- GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
-
- /**
- * Function to call after transmission of the request.
- */
- GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
- };
-
- /**
- * Closure for 'cont'.
- */
- void *cont_cls;
-
- /**
- * Response Processor for response from the service. This function calls the
- * continuation function provided by the client.
- */
- GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
-};
-
typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls,
const struct
GNUNET_MessageHeader *msg,
enum
GNUNET_SCALARPRODUCT_ResponseStatus status);
Modified: gnunet/src/scalarproduct/scalarproduct_api.c
===================================================================
--- gnunet/src/scalarproduct/scalarproduct_api.c 2013-09-02 14:25:54 UTC
(rev 28949)
+++ gnunet/src/scalarproduct/scalarproduct_api.c 2013-09-02 14:46:52 UTC
(rev 28950)
@@ -41,24 +41,44 @@
/**
* Entry in the request queue per client
*/
-struct GNUNET_SCALARPRODUCT_QueueEntry
+struct GNUNET_SCALARPRODUCT_ComputationHandle
{
/**
* This is a linked list.
*/
- struct GNUNET_SCALARPRODUCT_QueueEntry *next;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
/**
* This is a linked list.
*/
- struct GNUNET_SCALARPRODUCT_QueueEntry *prev;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
+
+ /**
+ * Our configuration.
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Handle to the master context.
+ * Current connection to the scalarproduct service.
*/
- struct GNUNET_SCALARPRODUCT_Handle *h;
+ struct GNUNET_CLIENT_Connection *client;
/**
+ * Handle for statistics.
+ */
+ struct GNUNET_STATISTICS_Handle *stats;
+
+ /**
+ * The shared session key identifying this computation
+ */
+ struct GNUNET_HashCode * key;
+
+ /**
+ * Current transmit handle.
+ */
+ struct GNUNET_CLIENT_TransmitHandle *th;
+
+ /**
* Size of the message
*/
uint16_t message_size;
@@ -66,7 +86,7 @@
/**
* Message to be sent to the scalarproduct service
*/
- struct GNUNET_SCALARPRODUCT_client_request* msg;
+ struct GNUNET_SCALARPRODUCT_client_request * msg;
union
{
@@ -87,14 +107,6 @@
void *cont_cls;
/**
- * Has this message been transmitted to the service?
- * Only ever GNUNET_YES for the head of the queue.
- * Note that the overall struct should end at a
- * multiple of 64 bits.
- */
- int16_t was_transmitted;
-
- /**
* Response Processor for response from the service. This function calls the
* continuation function provided by the client.
*/
@@ -102,35 +114,20 @@
};
/**************************************************************
- *** Function Declarations **********
+ *** Global Variables **********
**************************************************************/
-
/**
- * Creates a new entry at the tail of the DLL
- *
- * @param h handle to the master context
- *
- * @return pointer to the entry
+ * Head of the active sessions queue
*/
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
-
+struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
/**
- * Removes the head entry from the queue
- *
- * @param h Handle to the master context
+ * Tail of the active sessions queue
*/
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
+struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
-/**
- * Triggered when timeout occurs for a request in queue
- *
- * @param cls The pointer to the QueueEntry
- * @param tc Task Context
- */
-static void
-timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+/**************************************************************
+ *** Function Declarations **********
+ **************************************************************/
/**
* Called when a response is received from the service. After basic check
@@ -155,121 +152,11 @@
static size_t transmit_request (void *cls, size_t size,
void *buf);
-/**
- * Issues transmit request for the new entries in the queue
- *
- * @param h handle to the master context
- */
-static void
-process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
-
/**************************************************************
*** Static Function Declarations **********
**************************************************************/
-
/**
- * Creates a new entry at the tail of the DLL
- *
- * @param h handle to the master context
- *
- * @return pointer to the entry
- */
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
-{
- struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
-
- qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
-
- // if queue empty
- if (NULL == h->queue_head && NULL == h->queue_tail)
- {
- qe->next = NULL;
- qe->prev = NULL;
- h->queue_head = qe;
- h->queue_tail = qe;
- }
- else
- {
- qe->prev = h->queue_tail;
- h->queue_tail->next = qe;
- h->queue_tail = qe;
- }
-
- return qe;
-}
-
-
-/**
- * Removes the head entry from the queue
- *
- * @param h Handle to the master context
- */
-static struct GNUNET_SCALARPRODUCT_QueueEntry *
-free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
-{
- struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
-
- GNUNET_assert (NULL != h);
- if (NULL == h->queue_head && NULL == h->queue_tail)
- {
- // The queue is empty. Just return.
- qe = NULL;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when
free_queue_head_entry was called.\n");
- }
- else if (h->queue_head == h->queue_tail) //only one entry
- {
- qe = h->queue_head;
- qe->next = NULL;
- qe->prev = NULL;
- h->queue_head = NULL;
- h->queue_tail = NULL;
- }
- else
- {
- qe = h->queue_head;
- h->queue_head = h->queue_head->next;
- h->queue_head->prev = NULL;
- qe->next = NULL;
- qe->prev = NULL;
- }
- return qe;
-}
-
-
-/**
- * Triggered when timeout occurs for a request in queue
- *
- * @param cls The pointer to the QueueEntry
- * @param tc Task Context
- */
-static void
-timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
-
- // Update Statistics
- GNUNET_STATISTICS_update (qe->h->stats,
- gettext_noop ("# queue entry timeouts"), 1,
- GNUNET_NO);
-
- // Clear the timeout_task
- qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-
- // transmit_request is supposed to cancel timeout task.
- // If message was not transmitted, there is definitely an error.
- GNUNET_assert (GNUNET_NO == qe->was_transmitted);
-
- LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
-
- // remove the queue_entry for the queue
- GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
- qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
-}
-
-
-/**
* Handles the RESULT received in reply of prepare_response from the
* service
*
@@ -281,7 +168,7 @@
const struct GNUNET_MessageHeader *msg,
enum GNUNET_SCALARPRODUCT_ResponseStatus status)
{
- struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
GNUNET_assert (qe != NULL);
@@ -302,7 +189,7 @@
const struct GNUNET_MessageHeader *msg,
enum GNUNET_SCALARPRODUCT_ResponseStatus status)
{
- struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
GNUNET_assert (qe != NULL);
@@ -328,8 +215,8 @@
static void
receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_SCALARPRODUCT_Handle *h = cls;
- struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *qe;
int16_t was_transmitted;
struct GNUNET_SCALARPRODUCT_client_response *message =
(struct GNUNET_SCALARPRODUCT_client_response *) msg;
@@ -414,36 +301,18 @@
transmit_request (void *cls, size_t size,
void *buf)
{
- struct GNUNET_SCALARPRODUCT_Handle *h = cls;
- struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
size_t msize;
- if (NULL == (qe = h->queue_head))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
- return 0;
- }
-
- GNUNET_SCHEDULER_cancel (qe->timeout_task);
- qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-
- h->th = NULL;
- if (NULL == (qe = h->queue_head))
- return 0; /* no entry in queue */
if (buf == NULL)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to
SCALARPRODUCT.\n");
- GNUNET_STATISTICS_update (h->stats,
+ GNUNET_STATISTICS_update (qe->stats,
gettext_noop ("# transmission request
failures"),
1, GNUNET_NO);
- GNUNET_SCALARPRODUCT_disconnect (h);
+ GNUNET_SCALARPRODUCT_disconnect (qe);
return 0;
}
- if (size < (msize = qe->message_size))
- {
- process_queue (h);
- return 0;
- }
LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to
SCALARPRODUCT\n",
msize);
@@ -451,8 +320,7 @@
GNUNET_free (qe->msg);
qe->was_transmitted = GNUNET_YES;
- GNUNET_assert (GNUNET_NO == h->in_receive);
- h->in_receive = GNUNET_YES;
+ qe->th = NULL;
GNUNET_CLIENT_receive (h->client, &receive_cb, h,
GNUNET_TIME_UNIT_FOREVER_REL);
@@ -466,61 +334,6 @@
}
-/**
- * Issues transmit request for the new entries in the queue
- *
- * @param h handle to the master context
- */
-static void
-process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
-{
- struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
-
- if (NULL == (qe = h->queue_head))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
- return; /* no entry in queue */
- }
- if (qe->was_transmitted == GNUNET_YES)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
- return; /* waiting for replies */
- }
- if (h->th != NULL)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
- return; /* request pending */
- }
- if (h->client == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
- return; /* waiting for reconnect */
- }
- if (GNUNET_YES == h->in_receive)
- {
- /* wait for response to previous query */
- return;
- }
-
- h->th =
- GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_request, h);
-
- if (h->th == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _ ("Failed to send a message to the scalarproduct service\n"));
- return;
- }
-
- GNUNET_assert (GNUNET_NO == h->in_receive);
- GNUNET_break (NULL != h->th);
-}
-
-
-
/**************************************************************
*** API **********
**************************************************************/
@@ -536,7 +349,7 @@
* @param cont Callback function
* @param cont_cls Closure for the callback function
*/
-struct GNUNET_SCALARPRODUCT_Handle *
+struct GNUNET_SCALARPRODUCT_ComputationHandle *
GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_HashCode * key,
const int32_t * elements,
@@ -544,7 +357,7 @@
GNUNET_SCALARPRODUCT_ContinuationWithStatus
cont,
void *cont_cls)
{
- struct GNUNET_SCALARPRODUCT_Handle *h;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
struct GNUNET_SCALARPRODUCT_client_request *msg;
int32_t * vector;
uint16_t size;
@@ -556,7 +369,7 @@
GNUNET_assert(element_count > 1);
GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct
GNUNET_SCALARPRODUCT_client_request)
+ element_count * sizeof
(int32_t));
- h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
+ h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
if (!h->client)
{
@@ -595,7 +408,6 @@
memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
-
h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_YES, // retry is OK in
the initial stage
@@ -610,6 +422,7 @@
GNUNET_free(h);
return NULL;
}
+ GNUNET_CONTAINER_DLL_insert (head, tail, h);
return h;
}
@@ -627,7 +440,7 @@
* @param cont Callback function
* @param cont_cls Closure for the callback function
*/
-struct GNUNET_SCALARPRODUCT_Handle *
+struct GNUNET_SCALARPRODUCT_ComputationHandle *
GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_HashCode * key,
const struct GNUNET_PeerIdentity *peer,
@@ -638,38 +451,39 @@
GNUNET_SCALARPRODUCT_DatumProcessor cont,
void *cont_cls)
{
- struct GNUNET_CLIENT_Connection *client;
- struct GNUNET_SCALARPRODUCT_Handle *h;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
struct GNUNET_SCALARPRODUCT_client_request *msg;
int32_t * vector;
uint16_t size;
uint64_t i;
- GNUNET_assert(key);
- GNUNET_assert(peer);
- GNUNET_assert(elements);
- GNUNET_assert(mask);
- GNUNET_assert(cont);
- GNUNET_assert(element_count > 1);
- GNUNET_assert(mask_bytes != 0);
GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct
GNUNET_SCALARPRODUCT_client_request)
+ element_count * sizeof
(int32_t)
+ mask_length);
- client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
-
- if (!client)
+
+ h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
+ h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
+ if (!h->client)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
_ ("Failed to connect to the scalarproduct service\n"));
+ GNUNET_free(h);
return NULL;
}
+ h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
+ if (!h->th){
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to send a message to the statistics service\n"));
+ GNUNET_CLIENT_disconnect(h->client);
+ GNUNET_free(h);
+ return NULL;
+ }
+
size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count *
sizeof (int32_t) + mask_length;
- h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
h->cont_datum = cont;
h->cont_cls = cont_cls;
h->response_proc = &process_status_message;
- h->client = client;
h->cfg = cfg;
h->msg = GNUNET_malloc (size);
memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
@@ -689,17 +503,21 @@
memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
memcpy (&vector[element_count], mask, mask_length);
- h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_YES, // retry is OK in
the initial stage
&transmit_request, h);
- if ( !h->th)
+ if (!h->th)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
_ ("Failed to send a message to the scalarproduct service\n"));
+ GNUNET_STATISTICS_destroy(h->GNUNET_YES);
+ GNUNET_CLIENT_disconnect(h->client);
+ GNUNET_free(h->msg);
+ GNUNET_free(h);
return NULL;
}
+ GNUNET_CONTAINER_DLL_insert (head, tail, h);
return h;
}
@@ -709,27 +527,24 @@
* @param h handle to the scalarproduct
*/
void
-GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h)
+GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle
* h)
{
- struct GNUNET_SCALARPRODUCT_QueueEntry * qe;
+ struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
LOG (GNUNET_ERROR_TYPE_INFO,
"Disconnecting from VectorProduct\n");
- while (NULL != h->queue_head)
+ for (qe = head; head != NULL; qe = head)
{
- GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
+ GNUNET_CONTAINER_DLL_remove (head, tail, qe);
+ if (NULL == qe->th)
+ GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th);
+ GNUNET_CLIENT_disconnect (h->client);
+ GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
qe->response_proc (qe, NULL,
GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
+ GNUNET_free(qe->msg);
+ GNUNET_free(qe);
}
-
- if (h->client != NULL)
- {
- GNUNET_CLIENT_disconnect (h->client);
- h->client = NULL;
- }
-
- GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
- h->stats = NULL;
}
/* end of ext_api.c */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28950 - in gnunet/src: include scalarproduct,
gnunet <=