[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r29564 - in gnunet/src: include psyc
From: |
gnunet |
Subject: |
[GNUnet-SVN] r29564 - in gnunet/src: include psyc |
Date: |
Wed, 25 Sep 2013 19:46:06 +0200 |
Author: tg
Date: 2013-09-25 19:46:06 +0200 (Wed, 25 Sep 2013)
New Revision: 29564
Added:
gnunet/src/psyc/psyc.conf
Modified:
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/psyc/Makefile.am
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc.h
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/test_psyc.c
Log:
psyc service: start/stop, join/part, message transmission: lib -> psyc ->
mcast; psyc API: stop/resume transmission
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2013-09-25 17:46:03 UTC (rev
29563)
+++ gnunet/src/include/gnunet_protocols.h 2013-09-25 17:46:06 UTC (rev
29564)
@@ -29,7 +29,7 @@
/*******************************************************************************
* TODO: we need a way to register message types centrally (via some webpage).
* For now: unofficial extensions should start at 48k, internal extensions
- * define here should leave some room (4-10 additional messages to the previous
+ * defined here should leave some room (4-10 additional messages to the
previous
* extension).
******************************************************************************/
@@ -1985,9 +1985,9 @@
#define GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT 655
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER 656
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET 656
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE 657
+/* 657 */
#define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY 658
@@ -2008,14 +2008,88 @@
#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT 665
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER 666
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS 666
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE 667
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE 667
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE 668
+/*******************************************************************************
+ * PSYC message types
+
******************************************************************************/
+
+#define GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE 680
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MASTER_START 681
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK 682
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MASTER_STOP 683
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN 684
+
+#define GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK 685
+
+#define GNUNET_MESSAGE_TYPE_PSYC_SLAVE_PART 686
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST 687
+
+#define GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION 688
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD 689
+
+#define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM 690
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD 691
+
+#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER 692
+
+#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MOD_CONT 693
+
+#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA 694
+
+#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 695
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 696
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 697
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 698
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 699
+
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 700
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_METHOD 702
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MODIFIER 703
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MOD_CONT 704
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_DATA 705
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_ACK 706
+
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 707
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 708
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 709
+
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 710
+
+
/**
- * Next available: 680
+ * Next available: 730
*/
@@ -2029,7 +2103,7 @@
/**
* Multicast message from the origin to all members.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 700
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 730
/**
* A unicast message from a group member to the origin.
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2013-09-25 17:46:03 UTC (rev
29563)
+++ gnunet/src/include/gnunet_psyc_service.h 2013-09-25 17:46:06 UTC (rev
29564)
@@ -184,6 +184,96 @@
};
+/**
+ * Method part of a PSYC message.
+ *
+ * Fixed-size header carrying the modifier count, the transmit flags and the
+ * sender's key; the NUL-terminated method name follows the struct.
+ */
+struct GNUNET_PSYC_MessageMethod
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Reserved; presumably padding kept for future use -- TODO confirm.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * Number of modifiers in the message (network byte order).
+ */
+ uint32_t mod_count GNUNET_PACKED;
+
+ /**
+ * OR'ed GNUNET_PSYC_MasterTransmitFlags
+ */
+ uint32_t flags GNUNET_PACKED;
+
+ /**
+ * Sending slave's public key. "NULL" if the message is from the master, or
+ * when transmitting a message.
+ * NOTE(review): the key is embedded by value, so "NULL" presumably means an
+ * all-zero key -- confirm.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey slave_key;
+
+ /* Followed by NUL-terminated method name. */
+};
+
+
+/**
+ * Modifier part of a PSYC message: an operator applied to a named variable.
+ *
+ * The NUL-terminated name and then the value follow the struct.
+ */
+struct GNUNET_PSYC_MessageModifier
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Size of value.
+ */
+ uint32_t value_size GNUNET_PACKED;
+
+ /**
+ * Size of name, including NUL terminator.
+ */
+ uint16_t name_size GNUNET_PACKED;
+
+ /**
+ * enum GNUNET_ENV_Operator
+ */
+ uint8_t oper;
+
+ /* Followed by NUL-terminated name, then the value. */
+};
+
+
+/**
+ * Status of a data fragment, carried in the @e status field of
+ * struct GNUNET_PSYC_MessageData.
+ */
+enum GNUNET_PSYC_DataStatus
+{
+ /**
+ * To be continued.
+ */
+ GNUNET_PSYC_DATA_CONT = 0,
+
+ /**
+ * Reached the end of message.
+ */
+ GNUNET_PSYC_DATA_END = 1,
+
+ /**
+ * Cancelled before the end.
+ */
+ GNUNET_PSYC_DATA_CANCEL = 2
+};
+
+
+/**
+ * Data part of a PSYC message.
+ *
+ * NOTE(review): the payload presumably follows the struct -- confirm.
+ */
+struct GNUNET_PSYC_MessageData
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * enum GNUNET_PSYC_DataStatus
+ */
+ uint8_t status;
+};
+
/**
* Handle that identifies a join request.
*
@@ -194,14 +284,14 @@
/**
- * Method called from PSYC upon receiving a message indicating a call
- * to a @e method.
+ * Method called from PSYC upon receiving a message indicating a call to a
+ * @e method.
*
* @param cls Closure.
* @param slave_key Who transmitted the message.
* - NULL for multicast messages from the master.
- * - The hash of the sending slave's public key for unicast requests
from
- * one of the slaves to the master.
+ * - The sending slave's public key for unicast requests from one of the
+ * slaves to the master.
* @param message_id Unique message counter for this message.
* Unique only in combination with the given sender for this channel.
* @param method_name Method name from PSYC.
@@ -241,7 +331,8 @@
*/
typedef int
(*GNUNET_PSYC_JoinCallback) (void *cls,
- const struct GNUNET_CRYPTO_EccPublicSignKey
*slave_key,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
+ *slave_key,
const char *method_name,
size_t variable_count,
const struct GNUNET_ENV_Modifier *variables,
@@ -268,7 +359,8 @@
* multicast tree. Note that it is unnecessary to specify our own
* peer identity in this array.
* @param method_name Method name for the message transmitted with the
response.
- * @param env Environment containing transient variables for the message, or
NULL.
+ * @param env Environment containing transient variables for the message,
+ * or NULL.
* @param data Data of the message.
* @param data_size Size of @a data.
*/
@@ -290,6 +382,16 @@
 /**
+ * Function called after the channel master started.
+ *
+ * @param cls Closure.
+ * @param max_message_id Last message ID sent to the channel.
+ */
+typedef void
+(*GNUNET_PSYC_MasterStartCallback) (void *cls, uint64_t max_message_id);
+
+
+/**
* Start a PSYC master channel.
*
* Will start a multicast group identified by the given ECC key. Messages
@@ -313,6 +415,7 @@
* Used to automate join decisions.
* @param method Function to invoke on messages received from slaves.
* @param join_cb Function to invoke when a peer wants to join.
+ * @param start_cb Function to invoke after the channel master started.
* @param cls Closure for @a method and @a join_cb.
* @return Handle for the channel master, NULL on error.
*/
@@ -322,6 +425,7 @@
enum GNUNET_PSYC_Policy policy,
GNUNET_PSYC_Method method,
GNUNET_PSYC_JoinCallback join_cb,
+ GNUNET_PSYC_MasterStartCallback start_cb,
void *cls);
@@ -339,8 +443,10 @@
* @param[out] data Where to write the body of the message to give to the
* method. The function must copy at most @a data_size bytes to @a
data.
* @return #GNUNET_SYSERR on error (fatal, aborts transmission)
- * #GNUNET_NO on success, if more data is to be transmitted later
- * (should be used if @a data_size was not big enough to take all the
data)
+ * #GNUNET_NO on success, if more data is to be transmitted later.
+ * Should be used if @a data_size was not big enough to take all the
+ * data. If 0 is returned in @a data_size the transmission is paused,
+ * and can be resumed with GNUNET_PSYC_master_transmit_resume().
* #GNUNET_YES if this completes the transmission (all data supplied)
*/
typedef int
@@ -403,6 +509,15 @@
 /**
+ * Resume transmission to the channel.
+ *
+ * A transmission is paused when the transmit notify callback returns
+ * #GNUNET_NO with 0 in @a data_size; call this function to continue it.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle
*th);
+
+
+/**
* Abort transmission request to channel.
*
* @param th Handle of the request that is being aborted.
@@ -427,10 +542,20 @@
 /**
+ * Function called after the slave joined.
+ *
+ * NOTE(review): presumably invoked once the service has acknowledged the
+ * join (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK) -- confirm in the library.
+ *
+ * @param cls Closure.
+ * @param max_message_id Last message ID sent to the channel.
+ */
+typedef void
+(*GNUNET_PSYC_SlaveJoinCallback) (void *cls, uint64_t max_message_id);
+
+
+/**
* Join a PSYC channel.
*
* The entity joining is always the local peer. The user must immediately use
- * the GNUNET_PSYC_slave_to_master() functions to transmit a @e join_msg to the
+ * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the
* channel; if the join request succeeds, the channel state (and @e recent
* method calls) will be replayed to the joining member. There is no explicit
* notification on failure (as the channel may simply take days to approve,
@@ -464,6 +589,7 @@
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_Method method,
GNUNET_PSYC_JoinCallback join_cb,
+ GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
void *cls,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
@@ -475,7 +601,7 @@
* Part a PSYC channel.
*
* Will terminate the connection to the PSYC service. Polite clients should
- * first explicitly send a @e part request (via GNUNET_PSYC_slave_to_master()).
+ * first explicitly send a @e part request (via GNUNET_PSYC_slave_transmit()).
*
* @param slave Slave handle.
*/
@@ -484,15 +610,16 @@
/**
- * Function called to provide data for a transmission to the channel
- * master (aka the @e host of the channel).
+ * Function called to provide data for a transmission to the channel master
+ * (a.k.a. the @e host of the channel).
*
* Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO)
* invalidates the respective transmission handle.
*
* @param cls Closure.
- * @param[in,out] data_size Initially set to the number of bytes available in
@a data,
- * should be set to the number of bytes written to data (IN/OUT).
+ * @param[in,out] data_size Initially set to the number of bytes available in
+ * @a data, should be set to the number of bytes written to data
+ * (IN/OUT).
* @param[out] data Where to write the body of the message to give to the
method;
* function must copy at most @a *data_size bytes to @a data.
* @return #GNUNET_SYSERR on error (fatal, aborts transmission).
@@ -541,6 +668,15 @@
 /**
+ * Resume transmission to the master.
+ *
+ * NOTE(review): this slave-side function takes a
+ * struct GNUNET_PSYC_MasterTransmitHandle; presumably
+ * struct GNUNET_PSYC_SlaveTransmitHandle was intended -- confirm and fix
+ * before implementations depend on the prototype.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle
*th);
+
+
+/**
* Abort transmission request to master.
*
* @param th Handle of the request that is being aborted.
@@ -556,7 +692,8 @@
/**
- * Convert a channel @a master to a @e channel handle to access the @e channel
APIs.
+ * Convert a channel @a master to a @e channel handle to access the @e channel
+ * APIs.
*
* @param master Channel master handle.
* @return Channel handle, valid for as long as @a master is valid.
@@ -598,7 +735,8 @@
*/
void
GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
- const struct GNUNET_CRYPTO_EccPublicSignKey
*slave_key,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
+ *slave_key,
uint64_t announced_at,
uint64_t effective_since);
@@ -626,7 +764,8 @@
*/
void
GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
- const struct GNUNET_CRYPTO_EccPublicSignKey
*slave_key,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
+ *slave_key,
uint64_t announced_at);
@@ -702,6 +841,10 @@
void
GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story);
+
+/**
+ * Handle for a state query operation.
+ */
struct GNUNET_PSYC_StateQuery;
@@ -730,9 +873,9 @@
/**
* Return all channel state variables whose name matches a given prefix.
*
- * A name matches if it starts with the given @a name_prefix, thus requesting
the
- * empty prefix ("") will match all values; requesting "_a_b" will also return
- * values stored under "_a_b_c".
+ * A name matches if it starts with the given @a name_prefix, thus requesting
+ * the empty prefix ("") will match all values; requesting "_a_b" will also
+ * return values stored under "_a_b_c".
*
* The @a state_cb is invoked on all matching state variables asynchronously,
as
* the state is stored in and retrieved from the PSYCstore,
Modified: gnunet/src/psyc/Makefile.am
===================================================================
--- gnunet/src/psyc/Makefile.am 2013-09-25 17:46:03 UTC (rev 29563)
+++ gnunet/src/psyc/Makefile.am 2013-09-25 17:46:06 UTC (rev 29564)
@@ -24,12 +24,14 @@
psyc.h
libgnunetpsyc_la_LIBADD = \
$(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/env/libgnunetenv.la \
$(GN_LIBINTL) $(XLIB)
libgnunetpsyc_la_LDFLAGS = \
$(GN_LIB_LDFLAGS) $(WINFLAGS) \
-version-info 0:0:0
libgnunetpsyc_la_DEPENDENCIES = \
- $(top_builddir)/src/util/libgnunetutil.la
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/env/libgnunetenv.la
bin_PROGRAMS =
@@ -41,10 +43,14 @@
gnunet_service_psyc_LDADD = \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/multicast/libgnunetmulticast.la \
+ $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
$(GN_LIBINTL)
gnunet_service_psyc_DEPENDENCIES = \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
- $(top_builddir)/src/util/libgnunetutil.la
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/multicast/libgnunetmulticast.la \
+ $(top_builddir)/src/psycstore/libgnunetpsycstore.la
if HAVE_TESTING
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2013-09-25 17:46:03 UTC (rev
29563)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2013-09-25 17:46:06 UTC (rev
29564)
@@ -29,10 +29,14 @@
#include "gnunet_constants.h"
#include "gnunet_protocols.h"
#include "gnunet_statistics_service.h"
+#include "gnunet_multicast_service.h"
+#include "gnunet_psycstore_service.h"
#include "gnunet_psyc_service.h"
#include "psyc.h"
+/* Log under the service's own component name; "psyc-api" is the client library. */
+#define LOG(kind,...) GNUNET_log_from (kind, "psyc",__VA_ARGS__)
+
/**
* Handle to our current configuration.
*/
@@ -48,8 +52,85 @@
*/
static struct GNUNET_SERVER_NotificationContext *nc;
+/**
+ * Handle to the PSYCstore.
+ */
+static struct GNUNET_PSYCSTORE_Handle *store;
 /**
+ * Message in the transmission queue.
+ *
+ * Entries are linked into the tmit_head/tmit_tail list of a struct Channel
+ * and consumed by transmit_notify().
+ */
+struct TransmitMessage
+{
+ /* Doubly-linked list pointers (GNUNET_CONTAINER_DLL_*). */
+ struct TransmitMessage *prev;
+ struct TransmitMessage *next;
+
+ /* Bytes to transmit; transmit_notify() copies them to multicast. */
+ char *buf;
+ /* Number of bytes in buf. */
+ uint16_t size;
+ /* Channel's tmit_status at the time of queueing (enum GNUNET_PSYC_DataStatus). */
+ uint8_t status;
+};
+
+/**
+ * Common part of the client context for both a master and slave channel.
+ *
+ * Embedded as the first member of struct Master and struct Slave, so a
+ * pointer to either can be used as a struct Channel pointer and cast back
+ * according to is_master.
+ */
+struct Channel
+{
+ /* Client that started/joined the channel. */
+ struct GNUNET_SERVER_Client *client;
+
+ /* Head and tail of the queue of messages waiting for multicast. */
+ struct TransmitMessage *tmit_head;
+ struct TransmitMessage *tmit_tail;
+
+ /* Buffer in which buffer_message() accumulates messages from the client. */
+ char *tmit_buf;
+ /* Number of modifiers announced by the last method message. */
+ uint32_t tmit_mod_count;
+ /* Number of modifiers received since the last method message. */
+ uint32_t tmit_mod_recvd;
+ /* Number of bytes currently in tmit_buf. */
+ uint16_t tmit_size;
+ /* enum GNUNET_PSYC_DataStatus of the message being buffered. */
+ uint8_t tmit_status;
+
+ /*
+ * Read by handle_transmit_method() and transmit_notify().
+ * NOTE(review): never assigned in the code of this revision -- confirm.
+ */
+ uint8_t in_transmit;
+ /* GNUNET_YES if this is part of a struct Master, GNUNET_NO for a struct Slave. */
+ uint8_t is_master;
+};
+
+/**
+ * Client context for a channel master.
+ */
+struct Master
+{
+ /* Common channel state; must stay the first member (see struct Channel). */
+ struct Channel channel;
+ /* Channel private key received in the MasterStartRequest. */
+ struct GNUNET_CRYPTO_EccPrivateKey private_key;
+ /* Public key derived from private_key; used to look up PSYCstore counters. */
+ struct GNUNET_CRYPTO_EccPublicSignKey public_key;
+
+ /* Multicast origin handle. NOTE(review): not assigned in this revision. */
+ struct GNUNET_MULTICAST_Origin *origin;
+ /* Multicast transmission in progress, NULL before the first transmission. */
+ struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
+
+ /* Counters reported by the PSYCstore in counters_cb(). */
+ uint64_t max_message_id;
+ uint64_t max_state_message_id;
+ uint64_t max_group_generation;
+
+ /**
+ * enum GNUNET_PSYC_Policy
+ */
+ uint32_t policy;
+};
+
+
+/**
+ * Client context for a channel slave.
+ */
+struct Slave
+{
+ /* Common channel state; must stay the first member (see struct Channel). */
+ struct Channel channel;
+ /* Private key of the slave, received in the SlaveJoinRequest. */
+ struct GNUNET_CRYPTO_EccPrivateKey slave_key;
+ /* Public key of the channel; used to look up PSYCstore counters. */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ /* Multicast member handle. NOTE(review): not assigned in this revision. */
+ struct GNUNET_MULTICAST_Member *member;
+ /* Request to the origin in progress, NULL before the first transmission. */
+ struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
+
+ /* Last message ID of the channel as reported by the PSYCstore. */
+ uint64_t max_message_id;
+ /* Passed to multicast as the request ID. NOTE(review): never updated here. */
+ uint64_t max_request_id;
+};
+
+
+/**
* Task run during shutdown.
*
* @param cls unused
@@ -70,7 +151,280 @@
}
}
+/**
+ * Called whenever a client is disconnected.
+ * Frees our resources associated with that client.
+ *
+ * Clients that disconnect before starting a master or joining as a slave
+ * have no channel context; they are ignored instead of aborting the service.
+ *
+ * NOTE(review): messages still linked in tmit_head/tmit_tail are not
+ * released here, and the multicast origin/member is not stopped.
+ *
+ * @param cls Closure.
+ * @param client Identification of the client.
+ */
+static void
+client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+  struct Channel *ch;
+
+  if (NULL == client)
+    return;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
+
+  ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  if (NULL == ch)
+    return; /* no master started / slave joined on this connection */
+
+  if (NULL != ch->tmit_buf)
+  {
+    GNUNET_free (ch->tmit_buf);
+    ch->tmit_buf = NULL;
+  }
+  /* The Channel is the first member of Master/Slave, so this frees either. */
+  GNUNET_free (ch);
+}
+
+
+/**
+ * Result callback of GNUNET_PSYCSTORE_counters_get().
+ *
+ * Stores the counters in the master/slave context and acknowledges the
+ * start/join request to the client, reporting the maximum message ID.
+ *
+ * NOTE(review): if the client disconnects before this callback fires, @a cls
+ * has already been freed by client_disconnect() -- the pending PSYCstore
+ * operation is not cancelled anywhere in this revision.
+ *
+ * @param cls The struct Master or struct Slave of the requesting client.
+ * @param max_fragment_id Unused.
+ * @param max_message_id Stored in the context and sent to the client.
+ * @param max_group_generation Stored for masters only.
+ * @param max_state_message_id Stored for masters only.
+ */
+static void
+counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id,
+             uint64_t max_group_generation, uint64_t max_state_message_id)
+{
+  struct Channel *ch = cls;
+  struct CountersResult res;
+
+  /* The reply is small and fixed-size: build it on the stack. */
+  memset (&res, 0, sizeof (res));
+  res.header.size = htons (sizeof (res));
+  res.max_message_id = GNUNET_htonll (max_message_id);
+
+  if (ch->is_master)
+  {
+    struct Master *mst = cls;
+    mst->max_message_id = max_message_id;
+    mst->max_state_message_id = max_state_message_id;
+    mst->max_group_generation = max_group_generation;
+    res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+  }
+  else
+  {
+    struct Slave *slv = cls;
+    slv->max_message_id = max_message_id;
+    res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+  }
+
+  GNUNET_SERVER_notification_context_add (nc, ch->client);
+  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res.header,
+                                              GNUNET_NO);
+}
+
+
+/**
+ * Handle a GNUNET_MESSAGE_TYPE_PSYC_MASTER_START request from a client.
+ *
+ * Creates the master context, derives the channel's public key and asks the
+ * PSYCstore for the channel counters; counters_cb() sends the reply.
+ *
+ * @param cls Closure (unused).
+ * @param client Client that sent the request.
+ * @param msg The struct MasterStartRequest.
+ */
+static void
+handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
+                     const struct GNUNET_MessageHeader *msg)
+{
+  const struct MasterStartRequest *req
+    = (const struct MasterStartRequest *) msg;
+  struct Master *mst;
+
+  /* The handler is registered for variable-size messages, so check here. */
+  if (ntohs (msg->size) < sizeof (*req))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  mst = GNUNET_new (struct Master);
+  mst->channel.client = client;
+  mst->channel.is_master = GNUNET_YES;
+  mst->policy = ntohl (req->policy);
+  mst->private_key = req->channel_key;
+  GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key,
+                                                  &mst->public_key);
+
+  GNUNET_PSYCSTORE_counters_get (store, &mst->public_key,
+                                 counters_cb, mst);
+
+  /* Register the context as a struct Channel: every other handler fetches it
+     with that type, and the server checks the size given at registration. */
+  GNUNET_SERVER_client_set_user_context (client, &mst->channel);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Handle a GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN request from a client.
+ *
+ * Creates the slave context and asks the PSYCstore for the channel counters;
+ * counters_cb() sends the reply.  The relay list that may follow the request
+ * is not used yet.
+ *
+ * @param cls Closure (unused).
+ * @param client Client that sent the request.
+ * @param msg The struct SlaveJoinRequest.
+ */
+static void
+handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
+                   const struct GNUNET_MessageHeader *msg)
+{
+  const struct SlaveJoinRequest *req
+    = (const struct SlaveJoinRequest *) msg;
+  struct Slave *slv;
+
+  /* The handler is registered for variable-size messages, so check here. */
+  if (ntohs (msg->size) < sizeof (*req))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  slv = GNUNET_new (struct Slave);
+  slv->channel.client = client;
+  slv->channel.is_master = GNUNET_NO;
+  slv->channel_key = req->channel_key;
+  slv->slave_key = req->slave_key;
+
+  GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key,
+                                 counters_cb, slv);
+
+  /* Register the context as a struct Channel: every other handler fetches it
+     with that type, and the server checks the size given at registration. */
+  GNUNET_SERVER_client_set_user_context (client, &slv->channel);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Send a GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK to the client of @a ch,
+ * telling it how much room is left in the current fragment buffer.
+ *
+ * @param ch Channel whose client is to be notified.
+ */
+static void
+send_transmit_ack (struct Channel *ch)
+{
+  struct TransmitAck ack;
+
+  memset (&ack, 0, sizeof (ack));
+  ack.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
+  ack.header.size = htons (sizeof (ack));
+  ack.buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
+
+  GNUNET_SERVER_notification_context_add (nc, ch->client);
+  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &ack.header,
+                                              GNUNET_NO);
+}
+
+
+/**
+ * Multicast transmit callback: hand over the next queued fragment.
+ *
+ * @param cls The struct Channel the transmission belongs to.
+ * @param[in,out] data_size On entry the space available in @a data, on return
+ *        the number of bytes written (0 if nothing could be supplied).
+ * @param[out] data Where to copy the fragment.
+ * @return #GNUNET_NO if nothing was written or the channel's in_transmit flag
+ *         is set, #GNUNET_YES otherwise.
+ */
+static int
+transmit_notify (void *cls, size_t *data_size, void *data)
+{
+  struct Channel *ch = cls;
+  struct TransmitMessage *msg = ch->tmit_head;
+
+  if (NULL == msg || *data_size < msg->size)
+  {
+    *data_size = 0;
+    return GNUNET_NO;
+  }
+
+  memcpy (data, msg->buf, msg->size);
+  *data_size = msg->size;
+
+  /* The queue entry owns its buffer: release both, and leave the channel's
+     accumulation buffer (ch->tmit_buf) alone. */
+  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
+  GNUNET_free (msg->buf);
+  GNUNET_free (msg);
+
+  return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
+}
+
+
+/**
+ * Start a multicast transmission for the master, or resume the one that is
+ * already in progress.
+ *
+ * @param mst Master context.
+ * @return Always #GNUNET_OK.
+ */
+static int
+master_transmit_message (struct Master *mst)
+{
+  if (NULL == mst->tmit_handle)
+  {
+    mst->tmit_handle
+      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+                                        mst->max_group_generation,
+                                        transmit_notify, mst);
+  }
+  else
+  {
+    GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Start a request to the origin for the slave, or resume the one that is
+ * already in progress.
+ *
+ * @param slv Slave context.
+ * @return Always #GNUNET_OK.
+ */
+static int
+slave_transmit_message (struct Slave *slv)
+{
+  if (NULL == slv->tmit_handle)
+  {
+    slv->tmit_handle
+      = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id,
+                                          transmit_notify, slv);
+  }
+  else
+  {
+    GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Move the channel's accumulation buffer to the transmission queue and kick
+ * off (or resume) the multicast transmission.
+ *
+ * Ownership of ch->tmit_buf passes to the queue entry; the channel buffer is
+ * reset so the next message starts a new fragment.
+ *
+ * @param ch Channel with a non-empty tmit_buf.
+ */
+static void
+queue_message (struct Channel *ch)
+{
+  struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
+
+  tmit_msg->buf = ch->tmit_buf;
+  tmit_msg->size = ch->tmit_size;
+  tmit_msg->status = ch->tmit_status;
+  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+
+  ch->tmit_buf = NULL;
+  ch->tmit_size = 0;
+
+  if (ch->is_master)
+    master_transmit_message ((struct Master *) ch);
+  else
+    slave_transmit_message ((struct Slave *) ch);
+}
+
+
+/**
+ * Append a message received from the client to the channel's fragment buffer
+ * and queue the buffer for multicast once it is complete.
+ *
+ * The buffer is queued when the message being assembled has ended
+ * (tmit_status is no longer GNUNET_PSYC_DATA_CONT) or when not even another
+ * data message header would fit.  If @a msg does not fit into the current
+ * buffer, the buffer is queued first and @a msg starts a new one, so no
+ * message is dropped.
+ *
+ * @param ch Channel the message belongs to.
+ * @param msg Message to buffer; it is copied.
+ * @return #GNUNET_OK on success,
+ *         #GNUNET_SYSERR if @a msg is larger than a multicast fragment.
+ */
+static int
+buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
+{
+  uint16_t size = ntohs (msg->size);
+
+  if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
+    return GNUNET_SYSERR;
+
+  /* No room left for this message: flush what we have first. */
+  if (0 != ch->tmit_size
+      && GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < ch->tmit_size + size)
+    queue_message (ch);
+
+  if (0 == ch->tmit_size)
+  {
+    ch->tmit_buf = GNUNET_malloc (size);
+    memcpy (ch->tmit_buf, msg, size);
+    ch->tmit_size = size;
+  }
+  else
+  {
+    ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
+    memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
+    ch->tmit_size += size;
+  }
+
+  if (GNUNET_PSYC_DATA_CONT != ch->tmit_status
+      || (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
+          < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData)))
+    queue_message (ch);
+
+  return GNUNET_OK;
+}
+
+/**
+ * Handle a GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD message: the start of a
+ * new message from the client.
+ *
+ * Resets the per-message transmit state, buffers the method and, if no
+ * modifiers were announced, acknowledges right away so the client can send
+ * data.
+ *
+ * @param cls Closure (unused).
+ * @param client Client that sent the message.
+ * @param msg The struct GNUNET_PSYC_MessageMethod.
+ */
+static void
+handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
+                        const struct GNUNET_MessageHeader *msg)
+{
+  const struct GNUNET_PSYC_MessageMethod *meth
+    = (const struct GNUNET_PSYC_MessageMethod *) msg;
+  struct Channel *ch
+    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+
+  /* A client must start/join a channel first, and the handler is registered
+     for variable-size messages: reject instead of aborting the service. */
+  if (NULL == ch || ntohs (msg->size) < sizeof (*meth))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  if (GNUNET_NO != ch->in_transmit)
+  {
+    // FIXME: already transmitting a message, send back error message.
+    /* Keep reading from the client; without this call it would stall. */
+    GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    return;
+  }
+
+  /* NOTE(review): a partially filled tmit_buf of an unfinished message is
+     dropped here without being freed. */
+  ch->tmit_buf = NULL;
+  ch->tmit_size = 0;
+  ch->tmit_mod_recvd = 0;
+  ch->tmit_mod_count = ntohl (meth->mod_count);
+  ch->tmit_status = GNUNET_PSYC_DATA_CONT;
+
+  if (GNUNET_OK != buffer_message (ch, msg))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  if (0 == ch->tmit_mod_count)
+    send_transmit_ack (ch);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Handle a GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER message.
+ *
+ * Buffers the modifier and acknowledges once all modifiers announced by the
+ * method message have arrived.
+ *
+ * @param cls Closure (unused).
+ * @param client Client that sent the message.
+ * @param msg The struct GNUNET_PSYC_MessageModifier.
+ */
+static void
+handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
+                          const struct GNUNET_MessageHeader *msg)
+{
+  struct Channel *ch
+    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+
+  /* Reject clients without a channel and truncated messages instead of
+     aborting the service. */
+  if (NULL == ch
+      || ntohs (msg->size) < sizeof (struct GNUNET_PSYC_MessageModifier))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  ch->tmit_mod_recvd++;
+  if (GNUNET_OK != buffer_message (ch, msg))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  if (ch->tmit_mod_recvd == ch->tmit_mod_count)
+    send_transmit_ack (ch);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Handle a GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA message.
+ *
+ * Records the data status, buffers the fragment and acknowledges it so the
+ * client can send the next one.
+ *
+ * @param cls Closure (unused).
+ * @param client Client that sent the message.
+ * @param msg The struct GNUNET_PSYC_MessageData.
+ */
+static void
+handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
+                      const struct GNUNET_MessageHeader *msg)
+{
+  const struct GNUNET_PSYC_MessageData *data
+    = (const struct GNUNET_PSYC_MessageData *) msg;
+  struct Channel *ch
+    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+
+  /* Reject clients without a channel and truncated messages instead of
+     aborting the service. */
+  if (NULL == ch || ntohs (msg->size) < sizeof (*data))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  ch->tmit_status = data->status;
+  if (GNUNET_OK != buffer_message (ch, msg))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  send_transmit_ack (ch);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
/**
* Initialize the PSYC service.
*
@@ -83,14 +437,30 @@
const struct GNUNET_CONFIGURATION_Handle *c)
{
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ { &handle_master_start, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
+
+ { &handle_slave_join, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
+
+ { &handle_transmit_method, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 },
+
+ { &handle_transmit_modifier, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 },
+
+ { &handle_transmit_data, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 },
+
{ NULL, NULL, 0, 0 }
};
cfg = c;
-
+ store = GNUNET_PSYCSTORE_connect (cfg);
stats = GNUNET_STATISTICS_create ("psyc", cfg);
+ nc = GNUNET_SERVER_notification_context_create (server, 1);
GNUNET_SERVER_add_handlers (server, handlers);
- nc = GNUNET_SERVER_notification_context_create (server, 1);
+ GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
NULL);
}
Added: gnunet/src/psyc/psyc.conf
===================================================================
--- gnunet/src/psyc/psyc.conf (rev 0)
+++ gnunet/src/psyc/psyc.conf 2013-09-25 17:46:06 UTC (rev 29564)
@@ -0,0 +1,7 @@
+[psyc]
+AUTOSTART = YES
+HOME = $SERVICEHOME
+BINARY = gnunet-service-psyc
+UNIXPATH = /tmp/gnunet-service-psyc.sock
+UNIX_MATCH_UID = NO
+UNIX_MATCH_GID = YES
Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h 2013-09-25 17:46:03 UTC (rev 29563)
+++ gnunet/src/psyc/psyc.h 2013-09-25 17:46:06 UTC (rev 29564)
@@ -24,15 +24,188 @@
* @author Gabor X Toth
*/
-#ifndef GNUNET_PSYC_H
-#define GNUNET_PSYC_H
+#ifndef PSYC_H
+#define PSYC_H
#include "gnunet_common.h"
GNUNET_NETWORK_STRUCT_BEGIN
+/**** service -> library ****/
+/**
+ * Answer from service to client about last operation.
+ */
+struct OperationResult
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE
+ * NOTE(review): the comment originally named the PSYCSTORE constant;
+ * presumably the PSYC one is meant in this header -- confirm.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Operation ID.
+ */
+ uint32_t op_id GNUNET_PACKED;
+
+ /**
+ * Status code for the operation.
+ */
+ int64_t result_code GNUNET_PACKED;
+
+ /* followed by 0-terminated error message (on error) */
+
+};
+
+
+/**
+ * Reply to a master start or slave join request, sent by the service once
+ * the PSYCstore has returned the channel counters.
+ */
+struct CountersResult
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK or
+ * GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Maximum message ID of the channel, in network byte order.
+ */
+ uint64_t max_message_id;
+};
+
+
+/**
+ * Transmit acknowledgment.
+ *
+ * Sent after the last GNUNET_PSYC_MessageModifier and after each
+ * GNUNET_PSYC_MessageData.
+ *
+ * This message acknowledges previously received messages and asks for the next
+ * fragment of data.
+ */
+struct TransmitAck
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Buffer space available for the next data fragment, in network byte order.
+ */
+ uint16_t buf_avail;
+};
+
+
+/**** library -> service ****/
+
+
+/**
+ * Request from the library to start a channel master.
+ */
+struct MasterStartRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_MASTER_START
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Private key of the channel; the service derives the public key from it.
+ */
+ struct GNUNET_CRYPTO_EccPrivateKey channel_key;
+
+ /**
+ * enum GNUNET_PSYC_Policy, in network byte order.
+ */
+ uint32_t policy GNUNET_PACKED;
+};
+
+
+/**
+ * Request from the library to join a channel as a slave.
+ */
+struct SlaveJoinRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of relays following the struct.
+ * NOTE(review): presumably in network byte order like the other
+ * multi-byte fields -- confirm.
+ */
+ uint32_t relay_count GNUNET_PACKED;
+
+ /**
+ * Public key of the channel to join.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ /**
+ * Private key of the joining slave.
+ */
+ struct GNUNET_CRYPTO_EccPrivateKey slave_key;
+
+ /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
+};
+
+
+/**
+ * Request from the library to add a slave to the channel.
+ */
+struct ChannelSlaveAdd
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint32_t reserved;
+
+ /*
+ * NOTE(review): this is a pointer inside a message exchanged between
+ * processes; the key presumably has to be embedded by value -- confirm.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey *slave_key;
+
+ uint64_t announced_at;
+
+ uint64_t effective_since;
+};
+
+
+/**
+ * Request from the library to remove a slave from the channel.
+ */
+struct ChannelSlaveRemove
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint32_t reserved;
+
+ /*
+ * NOTE(review): this is a pointer inside a message exchanged between
+ * processes; the key presumably has to be embedded by value -- confirm.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey *slave_key;
+
+ uint64_t announced_at;
+};
+
+
+/**
+ * Request from the library for a range of past messages of the channel.
+ */
+struct StoryRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST
+ */
+ struct GNUNET_MessageHeader header;
+
+ /* ID of the operation. */
+ uint64_t op_id;
+
+ /* First message ID of the requested range. */
+ uint64_t start_message_id;
+
+ /* Last message ID of the requested range. */
+ uint64_t end_message_id;
+};
+
+
+/**
+ * Request from the library to query channel state by name.
+ */
+struct StateQuery
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_QUERY
+ * NOTE(review): no such constant is defined in gnunet_protocols.h;
+ * presumably GNUNET_MESSAGE_TYPE_PSYC_STATE_GET or
+ * GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX is meant -- confirm.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /* ID of the operation. */
+ uint64_t op_id;
+
+ /* Followed by NUL-terminated name. */
+};
+
+
+/**
+ * Result of a state query: one state variable.
+ */
+struct StateResult
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT
+ * NOTE(review): no such constant is defined in gnunet_protocols.h --
+ * confirm which PSYC_STATE_* type is meant.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Size of name, including NUL terminator.
+ */
+ uint16_t name_size GNUNET_PACKED;
+
+ /**
+ * OR'd StateOpFlags
+ */
+ uint8_t flags;
+
+ /* Followed by NUL-terminated name, then the value. */
+};
+
+
GNUNET_NETWORK_STRUCT_END
#endif
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2013-09-25 17:46:03 UTC (rev 29563)
+++ gnunet/src/psyc/psyc_api.c 2013-09-25 17:46:06 UTC (rev 29564)
@@ -36,15 +36,76 @@
#include "gnunet_psyc_service.h"
#include "psyc.h"
+#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
+
+
+/**
+ * Entry in a channel's list of operations to transmit
+ * (transmit_head/transmit_tail of struct GNUNET_PSYC_Channel).
+ */
+struct OperationHandle
+{
+ /* Doubly-linked list pointers. */
+ struct OperationHandle *prev;
+ struct OperationHandle *next;
+ /* Message of the operation. NOTE(review): ownership is not visible here. */
+ const struct GNUNET_MessageHeader *msg;
+};
+
/**
- * Handle that identifies a join request.
- *
- * Used to match calls to #GNUNET_PSYC_JoinCallback to the
- * corresponding calls to GNUNET_PSYC_join_decision().
+ * Handle to access PSYC channel operations for both the master and slaves.
*/
-struct GNUNET_PSYC_JoinHandle
+struct GNUNET_PSYC_Channel
{
+ /**
+ * Configuration to use.
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
+ /**
+ * Socket (if available).
+ */
+ struct GNUNET_CLIENT_Connection *client;
+
+ /**
+ * Currently pending transmission request, or NULL for none.
+ */
+ struct GNUNET_CLIENT_TransmitHandle *th;
+
+ /**
+ * Head of operations to transmit.
+ */
+ struct OperationHandle *transmit_head;
+
+ /**
+ * Tail of operations to transmit.
+ */
+ struct OperationHandle *transmit_tail;
+
+ /**
+ * Message to send on reconnect.
+ */
+ struct GNUNET_MessageHeader *reconnect_msg;
+
+ /**
+ * Task doing exponential back-off trying to reconnect.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+
+ /**
+ * Time for next connect retry.
+ */
+ struct GNUNET_TIME_Relative reconnect_delay;
+
+ GNUNET_PSYC_Method method_cb;
+
+ GNUNET_PSYC_JoinCallback join_cb;
+
+ void *cb_cls;
+
+ /**
+ * Are we polling for incoming messages right now?
+ */
+ int in_receive;
+
+ /**
+ * Are we currently transmitting a message?
+ */
+ int in_transmit;
};
@@ -53,23 +114,30 @@
*/
struct GNUNET_PSYC_Master
{
+ struct GNUNET_PSYC_Channel ch;
+ GNUNET_PSYC_MasterStartCallback start_cb;
+
+ uint64_t max_message_id;
};
/**
- * Handle for a pending PSYC transmission operation.
+ * Handle for a PSYC channel slave.
*/
-struct GNUNET_PSYC_MasterTransmitHandle
+struct GNUNET_PSYC_Slave
{
-
+ struct GNUNET_PSYC_Channel ch;
};
/**
- * Handle for a PSYC channel slave.
+ * Handle that identifies a join request.
+ *
+ * Used to match calls to #GNUNET_PSYC_JoinCallback to the
+ * corresponding calls to GNUNET_PSYC_join_decision().
*/
-struct GNUNET_PSYC_Slave
+struct GNUNET_PSYC_JoinHandle
{
};
@@ -78,16 +146,20 @@
/**
* Handle for a pending PSYC transmission operation.
*/
-struct GNUNET_PSYC_SlaveTransmitHandle
+struct GNUNET_PSYC_MasterTransmitHandle
{
-
+ struct GNUNET_PSYC_Master *master;
+ const struct GNUNET_ENV_Environment *env;
+ GNUNET_PSYC_MasterTransmitNotify notify;
+ void *notify_cls;
+ enum GNUNET_PSYC_MasterTransmitFlags flags;
};
/**
- * Handle to access PSYC channel operations for both the master and slaves.
+ * Handle for a pending PSYC transmission operation.
*/
-struct GNUNET_PSYC_Channel
+struct GNUNET_PSYC_SlaveTransmitHandle
{
};
@@ -102,48 +174,267 @@
};
+/**
+ * Handle for a state query operation.
+ */
struct GNUNET_PSYC_StateQuery
{
};
-/**
- * Function to call with the decision made for a join request.
+/**
+ * Try again to connect to the PSYC service.
*
- * Must be called once and only once in response to an invocation of the
- * #GNUNET_PSYC_JoinCallback.
+ * @param cls Channel handle.
+ * @param tc scheduler context
+ */
+static void
+reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Reschedule a connect attempt to the service.
*
- * @param jh Join request handle.
- * @param is_admitted #GNUNET_YES if joining is approved,
- * #GNUNET_NO if it is disapproved.
- * @param relay_count Number of relays given.
- * @param relays Array of suggested peers that might be useful relays to use
- * when joining the multicast group (essentially a list of peers that
- * are already part of the multicast group and might thus be willing
- * to help with routing). If empty, only this local peer (which must
- * be the multicast origin) is a good candidate for building the
- * multicast tree. Note that it is unnecessary to specify our own
- * peer identity in this array.
- * @param method_name Method name for the message transmitted with the
response.
- * @param env Environment containing transient variables for the message, or
NULL.
- * @param data Data of the message.
- * @param data_size Size of @a data.
+ * @param c Channel handle to reconnect.
*/
-void
-GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
- int is_admitted,
- unsigned int relay_count,
- const struct GNUNET_PeerIdentity *relays,
- const char *method_name,
- const struct GNUNET_ENV_Environment *env,
- const void *data,
- size_t data_size)
+static void
+reschedule_connect (struct GNUNET_PSYC_Channel *c)
{
+ /* Only one reconnect task may be pending at a time. */
+ GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+ /* Drop the pending transmission request and the old connection;
+ * reconnect() asserts that no client connection is left over. */
+ if (NULL != c->th)
+ {
+ GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
+ c->th = NULL;
+ }
+ if (NULL != c->client)
+ {
+ GNUNET_CLIENT_disconnect (c->client);
+ c->client = NULL;
+ }
+ c->in_receive = GNUNET_NO;
+ /* This client talks to the PSYC service; the message used to name the
+ * PSYCstore service. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Scheduling task to reconnect to PSYC service in %s.\n",
+ GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
+ c->reconnect_task =
+ GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
+ /* Apply the standard back-off to the delay of the following attempt. */
+ c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay);
}
+/**
+ * Schedule transmission of the next message from our queue.
+ *
+ * @param c Channel handle.
+ */
+static void
+transmit_next (struct GNUNET_PSYC_Channel *c);
+
+
+/**
+ * Handle a message received from the PSYC service.
+ *
+ * A NULL @a msg, an unknown message type or an unexpected message size
+ * drops the connection and schedules a reconnect; otherwise the message is
+ * processed and the handler re-arms itself for the next message.
+ *
+ * @param cls Channel handle (master and slave handles embed the channel as
+ * their first member, so the same pointer is viewed as all three).
+ * @param msg Message received, NULL on timeout or fatal error.
+ */
+static void
+message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PSYC_Channel *ch = cls;
+ struct GNUNET_PSYC_Master *mst = cls;
+ struct GNUNET_PSYC_Slave *slv = cls; /* only referenced by the disabled TODO code below */
+
+ if (NULL == msg)
+ {
+ reschedule_connect (ch);
+ return;
+ }
+ uint16_t size_eq = 0; /* exact size required for this type, 0 if none */
+ uint16_t size_min = 0; /* minimum size required for this type, 0 if none */
+ const uint16_t size = ntohs (msg->size);
+ const uint16_t type = ntohs (msg->type);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %d from PSYC service\n", type);
+
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
+ case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
+ size_eq = sizeof (struct CountersResult);
+ break;
+ }
+
+ if (! ((0 < size_eq && size == size_eq) /* unknown types leave both limits at 0 and fail here */
+ || (0 < size_min && size >= size_min)))
+ {
+ GNUNET_break (0);
+ reschedule_connect (ch);
+ return;
+ }
+
+ struct CountersResult *cres;
+
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
+ cres = (struct CountersResult *) msg;
+ mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
+ if (NULL != mst->start_cb)
+ mst->start_cb (ch->cb_cls, mst->max_message_id);
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
+ cres = (struct CountersResult *) msg;
+#if TODO /* slave handles have no max_message_id / join_ack_cb members yet */
+ slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
+ if (NULL != slv->join_ack_cb)
+ mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
+#endif
+ break;
+ }
+
+ GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Transmit next message to service.
+ *
+ * Copies the operation at the head of the transmit queue into @a buf,
+ * removes it from the queue, schedules the following operation (if any)
+ * and starts receiving replies if that has not happened yet.
+ *
+ * @param cls The 'struct GNUNET_PSYC_Channel'.
+ * @param size Number of bytes available in buf.
+ * @param buf Where to copy the message.
+ * @return Number of bytes copied to buf.
+ */
+static size_t
+send_next_message (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_PSYC_Channel *ch = cls;
+ struct OperationHandle *op = ch->transmit_head;
+ size_t ret;
+
+ ch->th = NULL; /* the pending request is consumed by this call */
+ if (NULL == op->msg)
+ return 0;
+ ret = ntohs (op->msg->size);
+ if (ret > size) /* message does not fit: give up on this connection and retry later */
+ {
+ reschedule_connect (ch);
+ return 0;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message of type %d to PSYCstore service\n", /* NOTE(review): this client connects to "psyc", not PSYCstore */
+ ntohs (op->msg->type));
+ memcpy (buf, op->msg, ret);
+
+ GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_free (op); /* the message lives in the same allocation as the operation */
+
+ if (NULL != ch->transmit_head)
+ transmit_next (ch);
+
+ if (GNUNET_NO == ch->in_receive)
+ {
+ ch->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+ return ret;
+}
+
+
+/**
+ * Schedule transmission of the next message from our queue.
+ *
+ * Does nothing if a transmission request is already pending, if there is
+ * no connection to the service, or if the queue is empty.
+ *
+ * @param ch Channel handle.
+ */
+static void
+transmit_next (struct GNUNET_PSYC_Channel *ch)
+{
+ if (NULL != ch->th || NULL == ch->client)
+ return;
+
+ struct OperationHandle *op = ch->transmit_head;
+ if (NULL == op)
+ return;
+
+ ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
+ ntohs (op->msg->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_NO,
+ &send_next_message,
+ ch);
+}
+
+
+/**
+ * Try again to connect to the PSYC service.
+ *
+ * Once connected, a copy of the stored reconnect message is added to the
+ * transmit queue (unless the operation at the head of the queue already
+ * carries a message of the same type) and transmission is started.
+ *
+ * @param cls Channel handle.
+ * @param tc Scheduler context.
+ */
+static void
+reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_PSYC_Channel *ch = cls;
+
+ ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Connecting to PSYC service.\n");
+ GNUNET_assert (NULL == ch->client);
+ ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
+ GNUNET_assert (NULL != ch->client);
+
+ if (NULL == ch->transmit_head ||
+ ch->transmit_head->msg->type != ch->reconnect_msg->type) /* both sides are in network byte order */
+ {
+ struct OperationHandle *op
+ = GNUNET_malloc (sizeof (struct OperationHandle)
+ + ntohs (ch->reconnect_msg->size));
+ memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size)); /* the copy trails the operation in one allocation */
+ op->msg = (struct GNUNET_MessageHeader *) &op[1];
+ GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ }
+
+ transmit_next (ch);
+}
+
+
+/**
+ * Disconnect from the PSYC service and release what the channel owns: the
+ * pending reconnect task, the pending transmission request, the client
+ * connection, a still-queued operation (if any) and the reconnect message.
+ *
+ * The master/slave handle that embeds the channel is not freed here.
+ *
+ * @param c Channel handle (or a master/slave handle, which embeds the
+ * channel as its first member).
+ */
+static void
+disconnect (void *c)
+{
+ struct GNUNET_PSYC_Channel *ch = c;
+ GNUNET_assert (NULL != ch);
+ /* At most one operation may still be queued at this point. */
+ GNUNET_assert (ch->transmit_head == ch->transmit_tail);
+ if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (ch->reconnect_task);
+ ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL != ch->th)
+ {
+ GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
+ ch->th = NULL;
+ }
+ if (NULL != ch->client)
+ {
+ GNUNET_CLIENT_disconnect (ch->client);
+ ch->client = NULL;
+ }
+ if (NULL != ch->transmit_head)
+ {
+ /* The assertion above lets a single queued operation through; it used
+ * to be leaked. Its message lives in the same allocation. */
+ struct OperationHandle *op = ch->transmit_head;
+ GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_free (op);
+ }
+ if (NULL != ch->reconnect_msg)
+ {
+ /* Heap-allocated when the channel was set up; merely clearing the
+ * pointer leaked the message. */
+ GNUNET_free (ch->reconnect_msg);
+ ch->reconnect_msg = NULL;
+ }
+}
+
+
/**
* Start a PSYC master channel.
*
@@ -177,57 +468,172 @@
enum GNUNET_PSYC_Policy policy,
GNUNET_PSYC_Method method,
GNUNET_PSYC_JoinCallback join_cb,
+ GNUNET_PSYC_MasterStartCallback master_started_cb,
void *cls)
{
- return NULL;
+ struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
+ struct GNUNET_PSYC_Channel *ch = &mst->ch;
+ /* reconnect() copies header.size bytes out of the stored request, so the
+ * buffer must be as large as the announced size; it used to be only
+ * sizeof (*req) bytes, which made that copy read past its end.
+ * NOTE(review): the announced size adds one key although the key is
+ * already a field of the request -- confirm against the service. */
+ const size_t req_size
+ = sizeof (struct MasterStartRequest) + sizeof (*channel_key);
+ struct MasterStartRequest *req = GNUNET_malloc (req_size);
+
+ req->header.size = htons (req_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
+ req->channel_key = *channel_key;
+ req->policy = policy;
+
+ /* The request doubles as the message that is (re)sent on every connect;
+ * the first connection attempt is scheduled immediately. */
+ ch->cfg = cfg;
+ ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
+ ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+ ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
+
+ ch->method_cb = method;
+ ch->join_cb = join_cb;
+ ch->cb_cls = cls;
+ mst->start_cb = master_started_cb;
+
+ return mst;
}
/**
+ * Stop a PSYC master channel.
+ *
+ * @param mst PSYC channel master to stop; the handle is freed by this call.
+ */
+void
+GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
+{
+ disconnect (mst);
+ GNUNET_free (mst);
+}
+
+
+/**
+ * Function to call with the decision made for a join request.
+ *
+ * Must be called once and only once in response to an invocation of the
+ * #GNUNET_PSYC_JoinCallback.
+ *
+ * @param jh Join request handle.
+ * @param is_admitted #GNUNET_YES if joining is approved,
+ * #GNUNET_NO if it is disapproved.
+ * @param relay_count Number of relays given.
+ * @param relays Array of suggested peers that might be useful relays to use
+ * when joining the multicast group (essentially a list of peers that
+ * are already part of the multicast group and might thus be willing
+ * to help with routing). If empty, only this local peer (which must
+ * be the multicast origin) is a good candidate for building the
+ * multicast tree. Note that it is unnecessary to specify our own
+ * peer identity in this array.
+ * @param method_name Method name for the message transmitted with the
+ * response.
+ * @param env Environment containing transient variables for the message, or
+ * NULL.
+ * @param data Data of the message.
+ * @param data_size Size of @a data.
+ */
+void
+GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
+ int is_admitted,
+ unsigned int relay_count,
+ const struct GNUNET_PeerIdentity *relays,
+ const char *method_name,
+ const struct GNUNET_ENV_Environment *env,
+ const void *data,
+ size_t data_size)
+{ /* not implemented yet: every argument is ignored */
+
+}
+
+
+/**
+ * Queue one environment modifier for transmission to the PSYC service.
+ *
+ * Used as iterator callback for GNUNET_ENV_environment_iterate().
+ *
+ * FIXME: split up value into chunks smaller than 64K and transmit the
+ * continuations in MOD_CONT msgs
+ *
+ * @param cls Channel handle (or a master handle, which embeds the channel
+ * as its first member).
+ * @param mod Modifier to queue; its name and value are copied.
+ * @return #GNUNET_YES, always.
+ */
+int
+send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
+{
+ struct GNUNET_PSYC_Channel *ch = cls;
+ size_t name_size = strlen (mod->name) + 1;
+ struct GNUNET_PSYC_MessageModifier *pmod;
+ struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod)
+ + name_size + mod->value_size);
+ pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) pmod;
+
+ /* Header fields travel in network byte order; the type used to be stored
+ * in host byte order. */
+ pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER);
+ pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
+ pmod->name_size = htons (name_size);
+ memcpy (&pmod[1], mod->name, name_size);
+ /* Byte offset via char *: arithmetic on void * is a GNU extension. */
+ memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
+
+ /* Append: send_next_message() consumes the queue from its head, so
+ * inserting at the head would send modifiers ahead of their method. */
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ return GNUNET_YES;
+}
+
+
+/**
* Send a message to call a method to all members in the PSYC channel.
*
- * @param master Handle to the PSYC channel.
+ * @param mst Handle to the PSYC channel.
* @param method_name Which method should be invoked.
* @param env Environment containing state operations and transient variables
* for the message, or NULL.
* @param notify Function to call to obtain the arguments.
* @param notify_cls Closure for @a notify.
* @param flags Flags for the message being transmitted.
- * @return Transmission handle, NULL on error (i.e. more than one request
queued).
+ * @return Transmission handle, NULL on error (i.e. more than one request
+ * queued).
*/
struct GNUNET_PSYC_MasterTransmitHandle *
-GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
+GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
GNUNET_PSYC_MasterTransmitNotify notify,
void *notify_cls,
enum GNUNET_PSYC_MasterTransmitFlags flags)
{
- return NULL;
-}
+ GNUNET_assert (NULL != mst);
+ struct GNUNET_PSYC_Channel *ch = &mst->ch;
+ /* Only one transmission may be in progress per channel. */
+ if (GNUNET_NO != ch->in_transmit)
+ return NULL;
+ ch->in_transmit = GNUNET_YES;
+
+ /* The method name (including its 0-terminator) trails the fixed-size
+ * header and therefore has to be part of the allocation; it used to be
+ * left out, so the memcpy() below wrote past the end of the buffer. */
+ size_t size = strlen (method_name) + 1;
+ struct GNUNET_PSYC_MessageMethod *pmeth;
+ struct OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
+ pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) pmeth;
-/**
- * Abort transmission request to channel.
- *
- * @param th Handle of the request that is being aborted.
- */
-void
-GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle
*th)
-{
+ /* All header fields go out in network byte order. */
+ pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD);
+ pmeth->header.size = htons (sizeof (*pmeth) + size);
+ pmeth->flags = htonl (flags);
+ /* @a env is documented as optional. */
+ pmeth->mod_count = (NULL == env) ? 0
+ : GNUNET_htonll (GNUNET_ENV_environment_get_mod_count (env));
+ memcpy (&pmeth[1], method_name, size);
+
+ /* Append, so that the method is queued ahead of its modifiers. */
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ if (NULL != env)
+ GNUNET_ENV_environment_iterate (env, send_modifier, mst);
+ /* Start sending; this is a no-op while disconnected or while another
+ * transmission request is pending. */
+ transmit_next (ch);
+
+ /* NOTE(review): in_transmit is never reset in the visible code, so a
+ * second call on the same channel always returns NULL -- confirm. */
+ struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th));
+ th->master = mst;
+ th->env = env;
+ th->notify = notify;
+ th->notify_cls = notify_cls;
+ th->flags = flags;
+ return th;
}
/**
- * Stop a PSYC master channel.
+ * Abort transmission request to the channel.
*
- * @param master PSYC channel master to stop.
+ * @param th Handle of the request that is being aborted.
*/
void
-GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
+GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle
*th)
{
+ struct GNUNET_PSYC_Master *mst = th->master;
+ struct GNUNET_PSYC_Channel *ch = &mst->ch;
+ if (GNUNET_NO != ch->in_transmit) /* NOTE(review): bails out while a transmission IS in progress -- condition looks inverted; confirm */
+ return;
+ /* nothing is cancelled yet: no queued operation is removed and th is not freed */
}
@@ -235,7 +641,7 @@
* Join a PSYC channel.
*
* The entity joining is always the local peer. The user must immediately use
- * the GNUNET_PSYC_slave_to_master() functions to transmit a @e join_msg to the
+ * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the
* channel; if the join request succeeds, the channel state (and @e recent
* method calls) will be replayed to the joining member. There is no explicit
* notification on failure (as the channel may simply take days to approve,
@@ -269,13 +675,32 @@
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_Method method,
GNUNET_PSYC_JoinCallback join_cb,
+ GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
void *cls,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
const void *data,
size_t data_size)
{
- return NULL;
+ struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
+ struct GNUNET_PSYC_Channel *ch = &slv->ch;
+ /* The relay list trails the fixed-size request, so the buffer has to be
+ * as large as the announced message size; allocating only sizeof (*req)
+ * made the memcpy() below overflow the heap buffer and made reconnect()
+ * read past its end.
+ * NOTE(review): the announced size adds both keys although they are
+ * already fields of the request -- confirm against the service. */
+ const size_t relays_size = relay_count * sizeof (*relays);
+ const size_t req_size = sizeof (struct SlaveJoinRequest)
+ + sizeof (*channel_key) + sizeof (*slave_key) + relays_size;
+ struct SlaveJoinRequest *req = GNUNET_malloc (req_size);
+
+ req->header.size = htons (req_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
+ req->channel_key = *channel_key;
+ req->slave_key = *slave_key;
+ req->relay_count = relay_count;
+ /* relays may be NULL when no relay is given. */
+ if (0 < relay_count)
+ memcpy (&req[1], relays, relays_size);
+
+ /* The request doubles as the message that is (re)sent on every connect;
+ * the first connection attempt is scheduled immediately. */
+ ch->cfg = cfg;
+ ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
+ ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+ ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
+
+ /* Keep the application callbacks in the channel, as is done for master
+ * handles; they used to be dropped.
+ * NOTE(review): slave_joined_cb and the join message (method_name, env,
+ * data, data_size) are still unused -- the slave handle has no place to
+ * store them yet. */
+ ch->method_cb = method;
+ ch->join_cb = join_cb;
+ ch->cb_cls = cls;
+
+ return slv;
}
@@ -283,14 +708,15 @@
* Part a PSYC channel.
*
* Will terminate the connection to the PSYC service. Polite clients should
- * first explicitly send a @e part request (via GNUNET_PSYC_slave_to_master()).
+ * first explicitly send a @e part request (via GNUNET_PSYC_slave_transmit()).
*
- * @param slave Slave handle.
+ * @param slv Slave handle.
*/
void
-GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
+GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv)
{
-
+ disconnect (slv); /* tears down the connection state of the embedded channel */
+ GNUNET_free (slv); /* the handle must not be used after this call */
}
@@ -299,11 +725,13 @@
*
* @param slave Slave handle.
* @param method_name Which (PSYC) method should be invoked (on host).
- * @param env Environment containing transient variables for the message, or
NULL.
+ * @param env Environment containing transient variables for the message, or
+ * NULL.
* @param notify Function to call when we are allowed to transmit (to get
data).
* @param notify_cls Closure for @a notify.
* @param flags Flags for the message being transmitted.
- * @return Transmission handle, NULL on error (i.e. more than one request
queued).
+ * @return Transmission handle, NULL on error (i.e. more than one request
+ * queued).
*/
struct GNUNET_PSYC_SlaveTransmitHandle *
GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
@@ -330,7 +758,8 @@
/**
- * Convert a channel @a master to a @e channel handle to access the @e channel
APIs.
+ * Convert a channel @a master to a @e channel handle to access the @e channel
+ * APIs.
*
* @param master Channel master handle.
* @return Channel handle, valid for as long as @a master is valid.
@@ -338,7 +767,7 @@
struct GNUNET_PSYC_Channel *
GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
{
- return NULL;
+ return (struct GNUNET_PSYC_Channel *) master; /* valid because the channel is the first member of the master handle */
}
@@ -351,7 +780,7 @@
struct GNUNET_PSYC_Channel *
GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
{
- return NULL;
+ return (struct GNUNET_PSYC_Channel *) slave; /* valid because the channel is the first member of the slave handle */
}
@@ -371,18 +800,30 @@
* correctly; not doing so correctly will result in either denying other slaves
* access or offering access to channel data to non-members.
*
- * @param channel Channel handle.
+ * @param ch Channel handle.
* @param slave_key Identity of channel slave to add.
* @param announced_at ID of the message that announced the membership change.
* @param effective_since Addition of slave is in effect since this message ID.
*/
void
-GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
- const struct GNUNET_CRYPTO_EccPublicSignKey
*slave_key,
+GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
+ *slave_key,
uint64_t announced_at,
uint64_t effective_since)
{
+ struct ChannelSlaveAdd *slvadd;
+ struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd));
+ slvadd = (struct ChannelSlaveAdd *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) slvadd;
+ /* Header fields travel in network byte order; the type used to be stored
+ * in host byte order. */
+ slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
+ slvadd->header.size = htons (sizeof (*slvadd));
+ /* NOTE(review): slave_key is not copied into the message -- confirm
+ * whether struct ChannelSlaveAdd is meant to carry it. */
+ slvadd->announced_at = GNUNET_htonll (announced_at);
+ slvadd->effective_since = GNUNET_htonll (effective_since);
+
+ /* Append: the queue is consumed from its head, so inserting at the head
+ * would let this request overtake operations queued earlier. */
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ transmit_next (ch);
}
@@ -403,16 +844,27 @@
* denying members access or offering access to channel data to
* non-members.
*
- * @param channel Channel handle.
+ * @param ch Channel handle.
* @param slave_key Identity of channel slave to remove.
* @param announced_at ID of the message that announced the membership change.
*/
void
-GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
- const struct GNUNET_CRYPTO_EccPublicSignKey
*slave_key,
+GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
+ *slave_key,
uint64_t announced_at)
{
+ struct ChannelSlaveRemove *slvrm;
+ struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm));
+ slvrm = (struct ChannelSlaveRemove *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) slvrm;
+ /* Header fields travel in network byte order; the type used to be stored
+ * in host byte order. */
+ slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
+ slvrm->header.size = htons (sizeof (*slvrm));
+ /* NOTE(review): slave_key is not copied into the message -- confirm
+ * whether struct ChannelSlaveRemove is meant to carry it. */
+ slvrm->announced_at = GNUNET_htonll (announced_at);
+
+ /* Append: the queue is consumed from its head, so inserting at the head
+ * would let this request overtake operations queued earlier. */
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ transmit_next (ch);
}
@@ -424,7 +876,7 @@
*
* To get the latest message, use 0 for both the start and end message ID.
*
- * @param channel Which channel should be replayed?
+ * @param ch Which channel should be replayed?
* @param start_message_id Earliest interesting point in history.
* @param end_message_id Last (exclusive) interesting point in history.
* @param method Function to invoke on messages received from the story.
@@ -441,7 +893,7 @@
* @return Handle to cancel story telling operation.
*/
struct GNUNET_PSYC_Story *
-GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
+GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *ch,
uint64_t start_message_id,
uint64_t end_message_id,
GNUNET_PSYC_Method method,
@@ -495,9 +947,9 @@
/**
* Return all channel state variables whose name matches a given prefix.
*
- * A name matches if it starts with the given @a name_prefix, thus requesting
the
- * empty prefix ("") will match all values; requesting "_a_b" will also return
- * values stored under "_a_b_c".
+ * A name matches if it starts with the given @a name_prefix, thus requesting
+ * the empty prefix ("") will match all values; requesting "_a_b" will also
+ * return values stored under "_a_b_c".
*
* The @a state_cb is invoked on all matching state variables asynchronously,
as
* the state is stored in and retrieved from the PSYCstore,
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2013-09-25 17:46:03 UTC (rev 29563)
+++ gnunet/src/psyc/test_psyc.c 2013-09-25 17:46:06 UTC (rev 29564)
@@ -28,12 +28,12 @@
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_util_lib.h"
-#include "gnunet_psycstore_service.h"
#include "gnunet_testing_lib.h"
+#include "gnunet_psyc_service.h"
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
-#define DEBUG_SERVICE 0
+#define DEBUG_SERVICE 1
/**
@@ -41,18 +41,34 @@
*/
static int res;
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
+
/**
* Handle for task for timeout termination.
*/
static GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
+static struct GNUNET_PSYC_Master *mst;
+static struct GNUNET_PSYC_Slave *slv;
+static struct GNUNET_PSYC_Channel *ch;
+static struct GNUNET_CRYPTO_EccPrivateKey *channel_key;
+static struct GNUNET_CRYPTO_EccPrivateKey *slave_key;
+
+static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key;
+static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key;
+
/**
* Clean up all resources used.
*/
static void
cleanup ()
{
+ /* The master handle lives in the file-scope variable mst; the name
+ * master used here before is not declared anywhere in this file. */
+ if (mst != NULL)
+ {
+ GNUNET_PSYC_master_stop (mst);
+ mst = NULL;
+ }
GNUNET_SCHEDULER_shutdown ();
}
@@ -100,6 +116,42 @@
&end_normally, NULL);
}
+
+static int
+method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
+ uint64_t message_id, const char *method_name,
+ size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
+ uint64_t data_offset, const void *data, size_t data_size,
+ enum GNUNET_PSYC_MessageFlags flags)
+{
+ return GNUNET_OK; /* test stub: ignores every argument */
+}
+
+
+static int
+join (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
+ const char *method_name,
+ size_t variable_count, const struct GNUNET_ENV_Modifier *variables,
+ const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh)
+{
+ return GNUNET_OK; /* test stub: ignores every argument, including the join handle */
+}
+
+
+/**
+ * Callback passed to the master start call; logs the reported message ID.
+ *
+ * @param cls Closure (unused).
+ * @param max_message_id Maximum message ID reported for the channel.
+ */
+void
+master_started (void *cls, uint64_t max_message_id)
+{
+ /* uint64_t is not unsigned long on every platform; cast so that the
+ * argument matches the conversion specifier. */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %llu\n",
+ (unsigned long long) max_message_id);
+}
+
+
+/**
+ * Callback passed to the slave join call; logs the reported message ID.
+ *
+ * @param cls Closure (unused).
+ * @param max_message_id Maximum message ID reported for the channel.
+ */
+void
+slave_joined (void *cls, uint64_t max_message_id)
+{
+ /* uint64_t is not unsigned long on every platform; cast so that the
+ * argument matches the conversion specifier. */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %llu\n",
+ (unsigned long long) max_message_id);
+}
+
+
/**
* Main function of the test, run from scheduler.
*
@@ -110,14 +162,28 @@
static void
#if DEBUG_SERVICE
run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
+ const struct GNUNET_CONFIGURATION_Handle *c)
#else
run (void *cls,
- const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const struct GNUNET_CONFIGURATION_Handle *c,
struct GNUNET_TESTING_Peer *peer)
#endif
{
+ /* Keep the configuration in the file-scope variable of the same name. */
+ cfg = c;
end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, NULL);
+
+ channel_key = GNUNET_CRYPTO_ecc_key_create ();
+ slave_key = GNUNET_CRYPTO_ecc_key_create ();
+
+ GNUNET_CRYPTO_ecc_key_get_public_for_signature (channel_key,
+ &channel_pub_key);
+ GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key);
+
+ mst = GNUNET_PSYC_master_start (cfg, channel_key,
+ GNUNET_PSYC_CHANNEL_PRIVATE,
+ &method, &join, &master_started, NULL);
+
+ /* The join function also takes the relay list (count and array) ahead of
+ * the callbacks and the join message (method name, environment, data,
+ * data size) after the closure; the call used to omit all six.
+ * NOTE(review): no relays and an empty join message are passed here --
+ * adjust once the test exercises them. */
+ slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key,
+ 0, NULL,
+ &method, &join, &slave_joined, NULL,
+ NULL, NULL, NULL, 0);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r29564 - in gnunet/src: include psyc,
gnunet <=