[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r32575 - in gnunet/src: include multicast psyc psycstore
From: |
gnunet |
Subject: |
[GNUnet-SVN] r32575 - in gnunet/src: include multicast psyc psycstore |
Date: |
Fri, 7 Mar 2014 00:46:45 +0100 |
Author: tg
Date: 2014-03-07 00:46:45 +0100 (Fri, 07 Mar 2014)
New Revision: 32575
Added:
gnunet/src/psyc/psyc_common.c
Modified:
gnunet/src/include/gnunet_multicast_service.h
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/include/gnunet_social_service.h
gnunet/src/multicast/gnunet-service-multicast.c
gnunet/src/multicast/multicast.h
gnunet/src/multicast/multicast_api.c
gnunet/src/psyc/Makefile.am
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc.h
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/test_psyc.c
gnunet/src/psycstore/plugin_psycstore_sqlite.c
Log:
PSYC: implement slave to master requests, tests, fixes, reorg
Multicast lib: handle member to origin requests.
Keep track of members and origins and call their callbacks when necessary.
Modified: gnunet/src/include/gnunet_multicast_service.h
===================================================================
--- gnunet/src/include/gnunet_multicast_service.h 2014-03-06 23:46:42 UTC
(rev 32574)
+++ gnunet/src/include/gnunet_multicast_service.h 2014-03-06 23:46:45 UTC
(rev 32575)
@@ -160,8 +160,62 @@
/* Followed by message body. */
};
+
+/**
+ * Header of a request from a member to the origin.
+ */
+struct GNUNET_MULTICAST_RequestHeader
+{
+ /**
+ * Header for all requests from a member to the origin.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Public key of the sending member.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+
+ /**
+ * ECC signature of the request fragment.
+ *
+ * Signature must match the public key of the multicast group.
+ */
+ struct GNUNET_CRYPTO_EddsaSignature signature;
+
+ /**
+ * Purpose for the signature and size of the signed data.
+ */
+ struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+
+ /**
+ * Number of the request fragment, monotonically increasing.
+ */
+ uint64_t fragment_id GNUNET_PACKED;
+
+ /**
+ * Byte offset of this @e fragment of the @e request.
+ */
+ uint64_t fragment_offset GNUNET_PACKED;
+
+ /**
+ * Number of the request this fragment belongs to.
+ *
+ * Set in GNUNET_MULTICAST_origin_to_all().
+ */
+ uint64_t request_id GNUNET_PACKED;
+
+ /**
+ * Flags for this request.
+ */
+ enum GNUNET_MULTICAST_MessageFlags flags GNUNET_PACKED;
+
+ /* Followed by request body. */
+};
+
GNUNET_NETWORK_STRUCT_END
+
/**
* Maximum size of a multicast message fragment.
*/
@@ -492,7 +546,7 @@
* @param next_fragment_id Next fragment ID to continue counting fragments from
* when restarting the origin. 1 for a new group.
* @param join_cb Function called to approve / disapprove joining of a peer.
- * @param mem_test_cb Function multicast can use to test group membership.
+ * @param member_test_cb Function multicast can use to test group membership.
* @param replay_frag_cb Function that can be called to replay a message
fragment.
* @param replay_msg_cb Function that can be called to replay a message.
* @param request_cb Function called with message fragments from group members.
@@ -507,7 +561,7 @@
const struct GNUNET_CRYPTO_EddsaPrivateKey
*priv_key,
uint64_t next_fragment_id,
GNUNET_MULTICAST_JoinCallback join_cb,
- GNUNET_MULTICAST_MembershipTestCallback
mem_test_cb,
+ GNUNET_MULTICAST_MembershipTestCallback
member_test_cb,
GNUNET_MULTICAST_ReplayFragmentCallback
replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback
replay_msg_cb,
GNUNET_MULTICAST_RequestCallback request_cb,
@@ -756,14 +810,14 @@
* Send a message to the origin of the multicast group.
*
* @param member Membership handle.
- * @param message_id Application layer ID for the message. Opaque to
multicast.
+ * @param request_id Application layer ID for the request. Opaque to
multicast.
* @param notify Callback to call to get the message.
* @param notify_cls Closure for @a notify.
* @return Handle to cancel request, NULL on error (i.e. request already
pending).
*/
struct GNUNET_MULTICAST_MemberRequestHandle *
GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
- uint64_t message_id,
+ uint64_t request_id,
GNUNET_MULTICAST_MemberTransmitNotify
notify,
void *notify_cls);
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2014-03-06 23:46:42 UTC (rev
32574)
+++ gnunet/src/include/gnunet_protocols.h 2014-03-06 23:46:45 UTC (rev
32575)
@@ -2279,14 +2279,34 @@
/* WIP: no numbers assigned yet */
/**
+ * Start an origin.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
+
+/**
+ * Stop an origin.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
+
+/**
+ * Join a group as a member.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752
+
+/**
+ * Leave a group.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
+
+/**
* Multicast message from the origin to all members.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 750
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
/**
* A unicast message from a group member to the origin.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
/**
* A peer wants to join the group.
@@ -2366,14 +2386,6 @@
/*******************************************************************************
- * PSYC message types
-
******************************************************************************/
-
-/*******************************************************************************
- * PSYCSTORE message types
-
******************************************************************************/
-
-/*******************************************************************************
* SOCIAL message types
******************************************************************************/
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2014-03-06 23:46:42 UTC (rev
32574)
+++ gnunet/src/include/gnunet_psyc_service.h 2014-03-06 23:46:45 UTC (rev
32575)
@@ -110,7 +110,7 @@
* Past messages are only available to slaves who were admitted at the time
* they were sent to the channel.
*/
- GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY = 1 << 1,
+ GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY = 1 << 1
};
/**
@@ -132,7 +132,7 @@
*/
GNUNET_PSYC_CHANNEL_PRIVATE
= GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL
- | GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY,
+ | GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY
#if IDEAS_FOR_FUTURE
/**
@@ -152,9 +152,7 @@
*/
GNUNET_PSYC_CHANNEL_CLOSED
= GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL,
-,
#endif
-
};
@@ -163,7 +161,12 @@
/**
* Historic message, retrieved from PSYCstore.
*/
- GNUNET_PSYC_MESSAGE_HISTORIC = 1
+ GNUNET_PSYC_MESSAGE_HISTORIC = 1 << 0,
+
+ /**
+ * Request from slave to master.
+ */
+ GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1
};
GNUNET_NETWORK_STRUCT_BEGIN
@@ -406,7 +409,7 @@
/**
* Function called to provide data for a transmission via PSYC.
*
- * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO)
+ * Note that returning #GNUNET_YES or #GNUNET_SYSERR (but not #GNUNET_NO)
* invalidates the respective transmission handle.
*
* @param cls Closure.
@@ -422,15 +425,43 @@
* #GNUNET_YES if this completes the transmission (all data supplied)
*/
typedef int
-(*GNUNET_PSYC_MasterTransmitNotify) (void *cls,
- uint16_t *data_size,
- void *data);
+(*GNUNET_PSYC_TransmitNotifyData) (void *cls,
+ uint16_t *data_size,
+ void *data);
+/**
+ * Function called to provide a modifier for a transmission via PSYC.
+ *
+ * Note that returning #GNUNET_YES or #GNUNET_SYSERR (but not #GNUNET_NO)
+ * invalidates the respective transmission handle.
+ *
+ * @param cls Closure.
+ * @param[in,out] data_size Initially set to the number of bytes available in
+ * @a data, should be set to the number of bytes written to data.
+ * @param[out] data Where to write the modifier's name and value.
+ * The function must copy at most @a data_size bytes to @a data.
+ * When this callback is first called for a modifier, @a data should
+ * contain: "name\0value". If the whole value does not fit, subsequent
+ * calls to this function should write continuations of the value to
+ * @a data.
+ * @param oper Where to write the operator of the modifier. Only needed
during
+ * the first call to this callback at the beginning of the modifier.
+ * In case of subsequent calls asking for value continuations @a oper
is
+ * set to #NULL.
+ * @return #GNUNET_SYSERR on error (fatal, aborts transmission)
+ * #GNUNET_NO on success, if more data is to be transmitted later.
+ * Should be used if @a data_size was not big enough to take all the
+ * data for the modifier's value (the name must be always returned
+ * during the first call to this callback).
+ * If 0 is returned in @a data_size the transmission is paused,
+ * and can be resumed with GNUNET_PSYC_master_transmit_resume().
+ * #GNUNET_YES if this completes the modifier (the whole value is
supplied).
+ */
typedef int
-(*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls,
- uint16_t *data_size,
- void *data,
- uint8_t *oper);
+(*GNUNET_PSYC_TransmitNotifyModifier) (void *cls,
+ uint16_t *data_size,
+ void *data,
+ uint8_t *oper);
/**
* Flags for transmitting messages to a channel by the master.
@@ -477,8 +508,8 @@
struct GNUNET_PSYC_MasterTransmitHandle *
GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
const char *method_name,
- GNUNET_PSYC_MasterTransmitNotifyModifier
notify_mod,
- GNUNET_PSYC_MasterTransmitNotify notify_data,
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+ GNUNET_PSYC_TransmitNotifyData notify_data,
void *notify_cls,
enum GNUNET_PSYC_MasterTransmitFlags flags);
@@ -588,29 +619,6 @@
/**
- * Function called to provide data for a transmission to the channel master
- * (a.k.a. the @e host of the channel).
- *
- * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO)
- * invalidates the respective transmission handle.
- *
- * @param cls Closure.
- * @param[in,out] data_size Initially set to the number of bytes available in
- * @a data, should be set to the number of bytes written to data
- * (IN/OUT).
- * @param[out] data Where to write the body of the message to give to the
method;
- * function must copy at most @a *data_size bytes to @a data.
- * @return #GNUNET_SYSERR on error (fatal, aborts transmission).
- * #GNUNET_NO on success, if more data is to be transmitted later.
- * #GNUNET_YES if this completes the transmission (all data supplied).
- */
-typedef int
-(*GNUNET_PSYC_SlaveTransmitNotify) (void *cls,
- size_t *data_size,
- char *data);
-
-
-/**
* Flags for transmitting messages to the channel master by a slave.
*/
enum GNUNET_PSYC_SlaveTransmitFlags
@@ -630,8 +638,8 @@
*
* @param slave Slave handle.
* @param method_name Which (PSYC) method should be invoked (on host).
- * @param env Environment containing transient variables for the message, or
NULL.
- * @param notify Function to call when we are allowed to transmit (to get
data).
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
* @param notify_cls Closure for @a notify.
* @param flags Flags for the message being transmitted.
* @return Transmission handle, NULL on error (i.e. more than one request
queued).
@@ -639,8 +647,8 @@
struct GNUNET_PSYC_SlaveTransmitHandle *
GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
const char *method_name,
- const struct GNUNET_ENV_Environment *env,
- GNUNET_PSYC_SlaveTransmitNotify notify,
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+ GNUNET_PSYC_TransmitNotifyData notify_data,
void *notify_cls,
enum GNUNET_PSYC_SlaveTransmitFlags flags);
Modified: gnunet/src/include/gnunet_social_service.h
===================================================================
--- gnunet/src/include/gnunet_social_service.h 2014-03-06 23:46:42 UTC (rev
32574)
+++ gnunet/src/include/gnunet_social_service.h 2014-03-06 23:46:45 UTC (rev
32575)
@@ -375,12 +375,11 @@
* Convert our home to a place so we can access it via the place API.
*
* @param home Handle for the home.
- * @param keep_active Keep home active after last application disconnected.
* @return Place handle for the same home, valid as long as @a home is valid;
* do NOT try to GNUNET_SOCIAL_place_leave() this place, it's your
home!
*/
struct GNUNET_SOCIAL_Place *
-GNUNET_SOCIAL_home_get_place (struct GNUNET_SOCIAL_Home *home, int
keep_active);
+GNUNET_SOCIAL_home_get_place (struct GNUNET_SOCIAL_Home *home);
/**
@@ -390,9 +389,10 @@
* Guests will be disconnected until the home is restarted.
*
* @param home Home to leave.
+ * @param keep_active Keep home active after last application disconnected.
*/
void
-GNUNET_SOCIAL_home_leave (struct GNUNET_SOCIAL_Home *home);
+GNUNET_SOCIAL_home_leave (struct GNUNET_SOCIAL_Home *home, int keep_active);
/**
* Request entry to a place (home hosted by someone else).
Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c 2014-03-06 23:46:42 UTC
(rev 32574)
+++ gnunet/src/multicast/gnunet-service-multicast.c 2014-03-06 23:46:45 UTC
(rev 32575)
@@ -41,6 +41,71 @@
/**
+ * Handle a connecting client starting an origin.
+ */
+static void
+handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Handle a client stopping an origin.
+ */
+static void
+handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Handle a connecting client joining a group.
+ */
+static void
+handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Handle a client parting a group.
+ */
+static void
+handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Incoming message from a client.
+ */
+static void
+handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Incoming request from a client.
+ */
+static void
+handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+/**
* Process multicast requests.
*
* @param cls closure
@@ -52,7 +117,24 @@
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- /* FIXME: add handlers here! */
+ { &handle_origin_start, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
+
+ { &handle_origin_stop, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP, 0 },
+
+ { &handle_member_join, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
+
+ { &handle_member_part, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART, 0 },
+
+ { &handle_multicast_message, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
+
+ { &handle_multicast_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
+
{NULL, NULL, 0, 0}
};
/* FIXME: do setup here */
Modified: gnunet/src/multicast/multicast.h
===================================================================
--- gnunet/src/multicast/multicast.h 2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/multicast/multicast.h 2014-03-06 23:46:45 UTC (rev 32575)
@@ -22,6 +22,7 @@
* @file multicast/multicast.h
* @brief multicast IPC messages
* @author Christian Grothoff
+ * @author Gabor X Toth
*/
#ifndef MULTICAST_H
#define MULTICAST_H
@@ -30,12 +31,52 @@
/**
+ * Header of a join request sent to the origin or another member.
+ */
+struct GNUNET_MULTICAST_JoinRequest
+{
+ /**
+ * Header for the join request.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * ECC signature of the rest of the fields of the join request.
+ *
+ * Signature must match the public key of the joining member.
+ */
+ struct GNUNET_CRYPTO_EddsaSignature signature;
+
+ /**
+ * Purpose for the signature and size of the signed data.
+ */
+ struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+
+ /**
+ * Public key of the target group.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey group_key;
+
+ /**
+ * Public key of the joining member.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+
+ /**
+ * Peer identity of the joining member.
+ */
+ struct GNUNET_PeerIdentity member_peer;
+
+ /* Followed by request body. */
+};
+
+
+/**
* Message sent from the client to the service to notify the service
* about a join decision.
*/
struct MulticastJoinDecisionMessage
{
-
/**
*
*/
@@ -329,9 +370,6 @@
};
-
-
-
GNUNET_NETWORK_STRUCT_END
#endif
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2014-03-06 23:46:42 UTC (rev
32574)
+++ gnunet/src/multicast/multicast_api.c 2014-03-06 23:46:45 UTC (rev
32575)
@@ -34,6 +34,19 @@
/**
+ * Started origins.
+ * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *origins;
+
+/**
+ * Joined members.
+ * group_key_hash -> struct GNUNET_MULTICAST_Member
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *members;
+
+
+/**
* Handle for a request to send a message to all multicast group members
* (from the origin).
*/
@@ -49,13 +62,20 @@
};
+struct GNUNET_MULTICAST_Group
+{
+ uint8_t is_origin;
+};
+
/**
* Handle for the origin of a multicast group.
*/
struct GNUNET_MULTICAST_Origin
{
+ struct GNUNET_MULTICAST_Group grp;
+
+ struct GNUNET_MULTICAST_OriginMessageHandle msg_handle;
struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
- struct GNUNET_MULTICAST_OriginMessageHandle msg_handle;
GNUNET_MULTICAST_JoinCallback join_cb;
GNUNET_MULTICAST_MembershipTestCallback mem_test_cb;
@@ -66,6 +86,9 @@
void *cls;
uint64_t next_fragment_id;
+
+ struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+ struct GNUNET_HashCode pub_key_hash;
};
@@ -74,123 +97,176 @@
*/
struct GNUNET_MULTICAST_MemberRequestHandle
{
+ GNUNET_MULTICAST_MemberTransmitNotify notify;
+ void *notify_cls;
+ struct GNUNET_MULTICAST_Member *member;
+
+ uint64_t request_id;
+ uint64_t fragment_offset;
};
/**
- * Opaque handle for a multicast group member.
+ * Handle for a multicast group member.
*/
struct GNUNET_MULTICAST_Member
{
+ struct GNUNET_MULTICAST_Group grp;
+
+ struct GNUNET_MULTICAST_MemberRequestHandle req_handle;
+
+ struct GNUNET_CRYPTO_EddsaPublicKey group_key;
+ struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
+ struct GNUNET_PeerIdentity origin;
+ struct GNUNET_PeerIdentity relays;
+ uint32_t relay_count;
+ struct GNUNET_MessageHeader *join_request;
+ GNUNET_MULTICAST_JoinCallback join_cb;
+ GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
+ GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
+ GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
+ GNUNET_MULTICAST_MessageCallback message_cb;
+ void *cls;
+
+ uint64_t next_fragment_id;
+ struct GNUNET_HashCode group_key_hash;
};
-GNUNET_NETWORK_STRUCT_BEGIN
+/**
+ * Handle that identifies a join request.
+ *
+ * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
+ * corresponding calls to #GNUNET_MULTICAST_join_decision().
+ */
+struct GNUNET_MULTICAST_JoinHandle
+{
+};
+
/**
- * Header of a request from a member to the origin.
+ * Handle to pass back for the answer of a membership test.
*/
-struct GNUNET_MULTICAST_RequestHeader
+struct GNUNET_MULTICAST_MembershipTestHandle
{
- /**
- * Header for all requests from a member to the origin.
- */
- struct GNUNET_MessageHeader header;
+};
- /**
- * Public key of the sending member.
- */
- struct GNUNET_CRYPTO_EddsaPublicKey member_key;
- /**
- * ECC signature of the request fragment.
- *
- * Signature must match the public key of the multicast group.
- */
- struct GNUNET_CRYPTO_EddsaSignature signature;
+/**
+ * Opaque handle to a replay request from the multicast service.
+ */
+struct GNUNET_MULTICAST_ReplayHandle
+{
+};
- /**
- * Purpose for the signature and size of the signed data.
- */
- struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
- /**
- * Number of the request fragment, monotonically increasing.
- */
- uint64_t fragment_id GNUNET_PACKED;
+/**
+ * Handle for a replay request.
+ */
+struct GNUNET_MULTICAST_MemberReplayHandle
+{
+};
- /**
- * Byte offset of this @e fragment of the @e request.
- */
- uint64_t fragment_offset GNUNET_PACKED;
- /**
- * Number of the request this fragment belongs to.
- *
- * Set in GNUNET_MULTICAST_origin_to_all().
- */
- uint64_t request_id GNUNET_PACKED;
+/**
+ * Iterator callback for calling message callbacks for all groups.
+ */
+static int
+message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
+ void *group)
+{
+ const struct GNUNET_MessageHeader *msg = cls;
+ struct GNUNET_MULTICAST_Group *grp = group;
- /**
- * Flags for this request.
- */
- enum GNUNET_MULTICAST_MessageFlags flags GNUNET_PACKED;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling message callback for a message of type %u and size
%u.\n",
+ ntohs (msg->type), ntohs (msg->size));
- /* Followed by request body. */
-};
+ if (GNUNET_YES == grp->is_origin)
+ {
+ struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *)
grp;
+ orig->message_cb (orig->cls, msg);
+ }
+ else
+ {
+ struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *)
grp;
+ mem->message_cb (mem->cls, msg);
+ }
+ return GNUNET_YES;
+}
+
+
/**
- * Header of a join request sent to the origin or another member.
+ * Handle a multicast message from the service.
+ *
+ * Call message callbacks of all origins and members of the destination group.
+ *
+ * @param grp Destination group of the message.
+ * @param msg The message.
*/
-struct GNUNET_MULTICAST_JoinRequest
+static void
+handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
+ const struct GNUNET_MULTICAST_MessageHeader *msg)
{
- /**
- * Header for the join request.
- */
- struct GNUNET_MessageHeader header;
+ struct GNUNET_HashCode *hash;
- /**
- * ECC signature of the rest of the fields of the join request.
- *
- * Signature must match the public key of the joining member.
- */
- struct GNUNET_CRYPTO_EddsaSignature signature;
+ if (GNUNET_YES == grp->is_origin)
+ {
+ struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *)
grp;
+ hash = &orig->pub_key_hash;
+ }
+ else
+ {
+ struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *)
grp;
+ hash = &mem->group_key_hash;
+ }
- /**
- * Purpose for the signature and size of the signed data.
- */
- struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+ if (origins != NULL)
+ GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash,
message_callback,
+ (void *) msg);
+ if (members != NULL)
+ GNUNET_CONTAINER_multihashmap_get_multiple (members, hash,
message_callback,
+ (void *) msg);
+}
- /**
- * Public key of the target group.
- */
- struct GNUNET_CRYPTO_EddsaPublicKey group_key;
- /**
- * Public key of the joining member.
- */
- struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+/**
+ * Iterator callback for calling request callbacks of origins.
+ */
+static int
+request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
+ void *origin)
+{
+ const struct GNUNET_MULTICAST_RequestHeader *req = cls;
+ struct GNUNET_MULTICAST_Origin *orig = origin;
- /**
- * Peer identity of the joining member.
- */
- struct GNUNET_PeerIdentity member_peer;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling request callback for a request of type %u and size
%u.\n",
+ ntohs (req->header.type), ntohs (req->header.size));
- /* Followed by request body. */
-};
+ orig->request_cb (orig->cls, &req->member_key,
+ (const struct GNUNET_MessageHeader *) req, 0);
+ return GNUNET_YES;
+}
-GNUNET_NETWORK_STRUCT_END
-
/**
- * Handle that identifies a join request.
+ * Handle a multicast request from the service.
*
- * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
- * corresponding calls to #GNUNET_MULTICAST_join_decision().
+ * Call request callbacks of all origins of the destination group.
+ *
+ * @param grp Destination group of the message.
+ * @param msg The message.
*/
-struct GNUNET_MULTICAST_JoinHandle
+static void
+handle_multicast_request (const struct GNUNET_HashCode *group_key_hash,
+ const struct GNUNET_MULTICAST_RequestHeader *req)
{
-};
+ if (NULL != origins)
+ GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash,
+ request_callback, (void *)
req);
+}
/**
@@ -227,14 +303,6 @@
/**
- * Handle to pass back for the answer of a membership test.
- */
-struct GNUNET_MULTICAST_MembershipTestHandle
-{
-};
-
-
-/**
* Call informing multicast about the decision taken for a membership test.
*
* @param mth Handle that was given for the query.
@@ -249,14 +317,6 @@
/**
- * Opaque handle to a replay request from the multicast service.
- */
-struct GNUNET_MULTICAST_ReplayHandle
-{
-};
-
-
-/**
* Replay a message fragment for the multicast group.
*
* @param rh Replay handle identifying which replay operation was requested.
@@ -340,6 +400,7 @@
void *cls)
{
struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
+ orig->grp.is_origin = GNUNET_YES;
orig->priv_key = *priv_key;
orig->next_fragment_id = next_fragment_id;
orig->join_cb = join_cb;
@@ -349,11 +410,38 @@
orig->request_cb = request_cb;
orig->message_cb = message_cb;
orig->cls = cls;
+
+ GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key);
+ GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key),
+ &orig->pub_key_hash);
+
+ if (NULL == origins)
+ origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+
+ GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+ /* FIXME: send ORIGIN_START to service */
+
return orig;
}
-/* FIXME: for now just send back to the client what it sent. */
+/**
+ * Stop a multicast group.
+ *
+ * @param origin Multicast group to stop.
+ */
+void
+GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
+{
+ GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig);
+ GNUNET_free (orig);
+}
+
+
+/* FIXME: for now just call clients' callbacks
+ * without sending anything to multicast. */
static void
schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext
*tc)
{
@@ -371,7 +459,7 @@
|| GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
- "MasterTransmitNotify() returned error or invalid message size.\n");
+ "OriginTransmitNotify() returned error or invalid message size.\n");
/* FIXME: handle error */
return;
}
@@ -401,19 +489,18 @@
return;
}
- /* FIXME: send msg to the service and only then call message_cb with the
- * returned signed message.
- * FIXME: Also send to local members in this group.
+ /* FIXME: send msg to the service and only then call handle_multicast_message
+ * with the returned signed message.
*/
- orig->message_cb (orig->cls, (const struct GNUNET_MessageHeader *) msg);
+ handle_multicast_message (&orig->grp, msg);
if (GNUNET_NO == ret)
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 1),
schedule_origin_to_all, orig);
-
}
+
/**
* Send a message to the multicast group.
*
@@ -439,6 +526,7 @@
mh->notify = notify;
mh->notify_cls = notify_cls;
+ /* FIXME: remove delay, it's there only for testing */
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 1),
schedule_origin_to_all, origin);
@@ -470,18 +558,6 @@
/**
- * Stop a multicast group.
- *
- * @param origin Multicast group to stop.
- */
-void
-GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *origin)
-{
- GNUNET_free (origin);
-}
-
-
-/**
* Join a multicast group.
*
* The entity joining is always the local peer. Further information about the
@@ -531,24 +607,61 @@
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_request,
GNUNET_MULTICAST_JoinCallback join_cb,
- GNUNET_MULTICAST_MembershipTestCallback
mem_test_cb,
+ GNUNET_MULTICAST_MembershipTestCallback
member_test_cb,
GNUNET_MULTICAST_ReplayFragmentCallback
replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback
replay_msg_cb,
GNUNET_MULTICAST_MessageCallback message_cb,
void *cls)
{
struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
+ mem->group_key = *group_key;
+ mem->member_key = *member_key;
+ mem->origin = *origin;
+ mem->relay_count = relay_count;
+ mem->relays = *relays;
+ mem->join_cb = join_cb;
+ mem->member_test_cb = member_test_cb;
+ mem->replay_frag_cb = replay_frag_cb;
+ mem->message_cb = message_cb;
+ mem->cls = cls;
+ if (NULL != join_request)
+ {
+ uint16_t size = ntohs (join_request->size);
+ mem->join_request = GNUNET_malloc (size);
+ memcpy (mem->join_request, join_request, size);
+ }
+
+ GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key),
&mem->group_key_hash);
+
+ if (NULL == members)
+ members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+
+ GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+ /* FIXME: send MEMBER_JOIN to service */
+
return mem;
}
/**
- * Handle for a replay request.
+ * Part a multicast group.
+ *
+ * Disconnects from all group members and invalidates the @a member handle.
+ *
+ * An application-dependent part message can be transmitted beforehand using
+ * #GNUNET_MULTICAST_member_to_origin())
+ *
+ * @param member Membership handle.
*/
-struct GNUNET_MULTICAST_MemberReplayHandle
+void
+GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
{
-};
+ GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem);
+ GNUNET_free (mem);
+}
/**
@@ -612,20 +725,62 @@
}
-/**
- * Part a multicast group.
- *
- * Disconnects from all group members and invalidates the @a member handle.
- *
- * An application-dependent part message can be transmitted beforehand using
- * #GNUNET_MULTICAST_member_to_origin())
- *
- * @param member Membership handle.
- */
-void
-GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *member)
+/* FIXME: for now just send back to the client what it sent. */
+static void
+schedule_member_to_origin (void *cls, const struct
GNUNET_SCHEDULER_TaskContext *tc)
{
- GNUNET_free (member);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n");
+ struct GNUNET_MULTICAST_Member *mem = cls;
+ struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle;
+
+ size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
+ char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+ struct GNUNET_MULTICAST_RequestHeader *req
+ = (struct GNUNET_MULTICAST_RequestHeader *) buf;
+ int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]);
+
+ if (! (GNUNET_YES == ret || GNUNET_NO == ret)
+ || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "MemberTransmitNotify() returned error or invalid message size.\n");
+ /* FIXME: handle error */
+ return;
+ }
+
+ if (GNUNET_NO == ret && 0 == buf_size)
+ return; /* Transmission paused. */
+
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
+ req->header.size = htons (sizeof (*req) + buf_size);
+ req->request_id = GNUNET_htonll (rh->request_id);
+
+ /* FIXME: add fragment ID and signature in the service instead of here */
+ req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++);
+ req->fragment_offset = GNUNET_ntohll (rh->fragment_offset);
+ rh->fragment_offset += sizeof (*req) + buf_size;
+ req->purpose.size = htonl (sizeof (*req) + buf_size
+ - sizeof (req->header)
+ - sizeof (req->member_key)
+ - sizeof (req->signature));
+ req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
+
+ if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose,
+ &req->signature))
+ {
+ /* FIXME: handle error */
+ return;
+ }
+
+ /* FIXME: send req to the service and only then call handle_multicast_request
+ * with the returned request.
+ */
+ handle_multicast_request (&mem->group_key_hash, req);
+
+ if (GNUNET_NO == ret)
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 1),
+ schedule_member_to_origin, mem);
}
@@ -633,18 +788,28 @@
* Send a message to the origin of the multicast group.
*
* @param member Membership handle.
- * @param message_id Application layer ID for the message. Opaque to
multicast.
+ * @param request_id Application layer ID for the request. Opaque to
multicast.
* @param notify Callback to call to get the message.
* @param notify_cls Closure for @a notify.
* @return Handle to cancel request, NULL on error (i.e. request already
pending).
*/
struct GNUNET_MULTICAST_MemberRequestHandle *
GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
- uint64_t message_id,
+ uint64_t request_id,
GNUNET_MULTICAST_MemberTransmitNotify
notify,
void *notify_cls)
{
- return NULL;
+ struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle;
+ rh->member = member;
+ rh->request_id = request_id;
+ rh->notify = notify;
+ rh->notify_cls = notify_cls;
+
+ /* FIXME: remove delay, it's there only for testing */
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 1),
+ schedule_member_to_origin, member);
+ return &member->req_handle;
}
Modified: gnunet/src/psyc/Makefile.am
===================================================================
--- gnunet/src/psyc/Makefile.am 2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/Makefile.am 2014-03-06 23:46:45 UTC (rev 32575)
@@ -21,7 +21,7 @@
libgnunetpsyc_la_SOURCES = \
psyc_api.c \
- psyc.h
+ psyc_common.c
libgnunetpsyc_la_LIBADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/env/libgnunetenv.la \
@@ -39,7 +39,8 @@
gnunet-service-psyc
gnunet_service_psyc_SOURCES = \
- gnunet-service-psyc.c
+ gnunet-service-psyc.c \
+ psyc_common.c
gnunet_service_psyc_LDADD = \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/util/libgnunetutil.la \
@@ -51,6 +52,7 @@
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/multicast/libgnunetmulticast.la \
$(top_builddir)/src/psycstore/libgnunetpsycstore.la
+gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
if HAVE_TESTING
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2014-03-06 23:46:42 UTC (rev
32574)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2014-03-06 23:46:45 UTC (rev
32575)
@@ -56,6 +56,7 @@
static struct GNUNET_PSYCSTORE_Handle *store;
/**
+ * All connected masters and slaves.
* Channel's pub_key_hash -> struct Channel
*/
static struct GNUNET_CONTAINER_MultiHashMap *clients;
@@ -105,6 +106,15 @@
uint8_t in_transmit;
uint8_t is_master;
+
+ /**
+ * Ready to receive messages from client.
+ */
+ uint8_t ready;
+
+ /**
+ * Client disconnected.
+ */
uint8_t disconnected;
};
@@ -116,7 +126,6 @@
struct Channel channel;
struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
- struct GNUNET_HashCode pub_key_hash;
struct GNUNET_MULTICAST_Origin *origin;
struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
@@ -144,6 +153,8 @@
* @see enum GNUNET_PSYC_Policy
*/
uint32_t policy;
+
+ struct GNUNET_HashCode pub_key_hash;
};
@@ -155,24 +166,26 @@
struct Channel channel;
struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
- struct GNUNET_HashCode chan_key_hash;
struct GNUNET_MULTICAST_Member *member;
struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
struct GNUNET_PeerIdentity origin;
+
+ uint32_t relay_count;
struct GNUNET_PeerIdentity *relays;
+
struct GNUNET_MessageHeader *join_req;
uint64_t max_message_id;
uint64_t max_request_id;
- uint32_t relay_count;
+ struct GNUNET_HashCode chan_key_hash;
};
static inline void
-transmit_message (struct Channel *ch);
+transmit_message (struct Channel *ch, uint8_t inc_msg_id);
/**
@@ -235,14 +248,14 @@
if (NULL == client)
return;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
-
struct Channel *ch
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
+
if (NULL == ch)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "User context is NULL in client_disconnect()\n");
+ "%p User context is NULL in client_disconnect()\n", ch);
GNUNET_break (0);
return;
}
@@ -252,7 +265,7 @@
/* Send pending messages to multicast before cleanup. */
if (NULL != ch->tmit_head)
{
- transmit_message (ch);
+ transmit_message (ch, GNUNET_NO);
}
else
{
@@ -311,54 +324,23 @@
/**
- * Iterator callback for sending a message to a client.
- *
- * @see message_cb()
- */
-static int
-message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
- void *chan)
-{
- const struct GNUNET_MessageHeader *msg = cls;
- struct Channel *ch = chan;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending message of type %u and size %u to client 0x%zx.\n",
- ntohs (msg->type), ntohs (msg->size), ch->client);
-
- GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
-
- return GNUNET_YES;
-}
-
-
-/**
* Incoming message fragment from multicast.
*
- * Store it using PSYCstore and send it to all clients of the channel.
+ * Store it using PSYCstore and send it to the client of the channel.
*/
static void
-message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+message_cb (struct Channel *ch,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *chan_key,
+ const struct GNUNET_HashCode *chan_key_hash,
+ const struct GNUNET_MessageHeader *msg)
{
uint16_t type = ntohs (msg->type);
uint16_t size = ntohs (msg->size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u and size %u from multicast.\n",
- type, size);
+ "%p Received message of type %u and size %u from multicast.\n",
+ ch, type, size);
- struct Channel *ch = cls;
- struct Master *mst = cls;
- struct Slave *slv = cls;
-
- /* const struct GNUNET_MULTICAST_MessageHeader *mmsg
- = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */
- struct GNUNET_CRYPTO_EddsaPublicKey *chan_key
- = ch->is_master ? &mst->pub_key : &slv->chan_key;
- struct GNUNET_HashCode *chan_key_hash
- = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
-
switch (type)
{
case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
@@ -378,29 +360,19 @@
const struct GNUNET_MULTICAST_MessageHeader *mmsg
= (const struct GNUNET_MULTICAST_MessageHeader *) msg;
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t size = ntohs (msg->size);
- uint16_t psize = 0;
- uint16_t pos = 0;
-
- for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
+ if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
+ (const char *)
&mmsg[1]))
{
- const struct GNUNET_MessageHeader *pmsg
- = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
- psize = ntohs (pmsg->size);
- if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Received invalid message part of type %u and size %u "
- "from multicast. Not sending to clients.\n",
- ntohs (pmsg->type), psize);
- GNUNET_break_op (0);
- return;
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Received message with invalid parts from multicast. "
+ "Dropping message.\n", ch);
+ GNUNET_break_op (0);
+ break;
}
- psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
pmsg = GNUNET_malloc (psize);
pmsg->header.size = htons (psize);
pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -408,39 +380,116 @@
memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
- GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash,
- message_to_client,
- (void *) pmsg);
+ GNUNET_SERVER_notification_context_add (nc, ch->client);
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+ (const struct
GNUNET_MessageHeader *) pmsg,
+ GNUNET_NO);
GNUNET_free (pmsg);
break;
}
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Discarding unknown message of type %u and size %u.\n",
- type, size);
+ "%p Dropping unknown message of type %u and size %u.\n",
+ ch, type, size);
}
}
/**
- * Send a request received from multicast to a client.
+ * Incoming message fragment from multicast for a master.
*/
-static int
-request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
- void *chan)
+static void
+master_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
{
- /* TODO */
+ struct Master *mst = cls;
+ GNUNET_assert (NULL != mst);
- return GNUNET_YES;
+ struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &mst->pub_key;
+ struct GNUNET_HashCode *chan_key_hash = &mst->pub_key_hash;
+
+ message_cb (&mst->channel, chan_key, chan_key_hash, msg);
}
+/**
+ * Incoming message fragment from multicast for a slave.
+ */
static void
+slave_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct Slave *slv = cls;
+ GNUNET_assert (NULL != slv);
+
+ struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &slv->chan_key;
+ struct GNUNET_HashCode *chan_key_hash = &slv->chan_key_hash;
+
+ message_cb (&slv->channel, chan_key, chan_key_hash, msg);
+}
+
+
+/**
+ * Incoming request fragment from multicast for a master.
+ *
+ * @param cls Master.
+ * @param member_key Sending member's public key.
+ * @param msg The message.
+ * @param flags Request flags.
+ */
+static void
request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
- const struct GNUNET_MessageHeader *req,
+ const struct GNUNET_MessageHeader *msg,
enum GNUNET_MULTICAST_MessageFlags flags)
{
+ struct Master *mst = cls;
+ struct Channel *ch = &mst->channel;
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received request of type %u and size %u from multicast.\n",
+ ch, type, size);
+
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
+ {
+ const struct GNUNET_MULTICAST_RequestHeader *req
+ = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
+
+ if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req),
+ (const char *) &req[1]))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping message with invalid parts "
+ "received from multicast.\n", ch);
+ GNUNET_break_op (0);
+ break;
+ }
+
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
+ pmsg = GNUNET_malloc (psize);
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = req->request_id;
+ pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+
+ memcpy (&pmsg[1], &req[1], size - sizeof (*req));
+
+ GNUNET_SERVER_notification_context_add (nc, ch->client);
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+ (const struct
GNUNET_MessageHeader *) pmsg,
+ GNUNET_NO);
+ GNUNET_free (pmsg);
+ break;
+ }
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Dropping unknown request of type %u and size %u.\n",
+ ch, type, size);
+ GNUNET_break_op (0);
+ }
}
@@ -470,7 +519,8 @@
max_fragment_id + 1,
join_cb, membership_test_cb,
replay_fragment_cb, replay_message_cb,
- request_cb, message_cb, ch);
+ request_cb, master_message_cb, ch);
+ ch->ready = GNUNET_YES;
}
GNUNET_SERVER_notification_context_add (nc, ch->client);
GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -505,7 +555,8 @@
slv->join_req, join_cb,
membership_test_cb,
replay_fragment_cb, replay_message_cb,
- message_cb, ch);
+ slave_message_cb, ch);
+ ch->ready = GNUNET_YES;
}
GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -529,9 +580,11 @@
mst->channel.is_master = GNUNET_YES;
mst->policy = ntohl (req->policy);
mst->priv_key = req->channel_key;
- GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key,
- &mst->pub_key);
+ GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &mst->pub_key);
GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key),
&mst->pub_key_hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Master connected to channel %s.\n",
+ mst, GNUNET_h2s (&mst->pub_key_hash));
GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
master_counters_cb, mst);
@@ -561,14 +614,20 @@
&slv->chan_key_hash);
slv->origin = req->origin;
slv->relay_count = ntohl (req->relay_count);
+ if (0 < slv->relay_count)
+ {
+ const struct GNUNET_PeerIdentity *relays
+ = (const struct GNUNET_PeerIdentity *) &req[1];
+ slv->relays
+ = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
+ uint32_t i;
+ for (i = 0; i < slv->relay_count; i++)
+ memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+ }
- const struct GNUNET_PeerIdentity *relays
- = (const struct GNUNET_PeerIdentity *) &req[1];
- slv->relays
- = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
- uint32_t i;
- for (i = 0; i < slv->relay_count; i++)
- memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Slave connected to channel %s.\n",
+ slv, GNUNET_h2s (&slv->chan_key_hash));
GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
slave_counters_cb, slv);
@@ -609,13 +668,14 @@
if (NULL == tmit_msg || *data_size < tmit_msg->size)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to
send.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p transmit_notify: nothing to send.\n", ch);
*data_size = 0;
return GNUNET_NO;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "transmit_notify: sending %u bytes.\n", tmit_msg->size);
+ "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
*data_size = tmit_msg->size;
memcpy (data, tmit_msg->buf, *data_size);
@@ -630,7 +690,7 @@
{
if (NULL != ch->tmit_head)
{
- transmit_message (ch);
+ transmit_message (ch, GNUNET_NO);
}
else if (ch->disconnected)
{
@@ -644,19 +704,55 @@
/**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+master_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+ int ret = transmit_notify (cls, data_size, data);
+
+ if (GNUNET_YES == ret)
+ {
+ struct Master *mst = cls;
+ mst->tmit_handle = NULL;
+ }
+ return ret;
+}
+
+
+/**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+slave_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+ int ret = transmit_notify (cls, data_size, data);
+
+ if (GNUNET_YES == ret)
+ {
+ struct Slave *slv = cls;
+ slv->tmit_handle = NULL;
+ }
+ return ret;
+}
+
+
+/**
* Transmit a message from a channel master to the multicast group.
*/
static void
-master_transmit_message (struct Master *mst)
+master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
mst->channel.tmit_task = 0;
if (NULL == mst->tmit_handle)
{
+ if (GNUNET_YES == inc_msg_id)
+ mst->max_message_id++;
mst->tmit_handle
- = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
+ = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
mst->max_group_generation,
- transmit_notify, mst);
+ master_transmit_notify, mst);
}
else
{
@@ -669,14 +765,16 @@
* Transmit a message from a channel slave to the multicast group.
*/
static void
-slave_transmit_message (struct Slave *slv)
+slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
{
slv->channel.tmit_task = 0;
if (NULL == slv->tmit_handle)
{
+ if (GNUNET_YES == inc_msg_id)
+ slv->max_message_id++;
slv->tmit_handle
- = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
- transmit_notify, slv);
+ = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+ slave_transmit_notify, slv);
}
else
{
@@ -686,11 +784,11 @@
static inline void
-transmit_message (struct Channel *ch)
+transmit_message (struct Channel *ch, uint8_t inc_msg_id)
{
ch->is_master
- ? master_transmit_message ((struct Master *) ch)
- : slave_transmit_message ((struct Slave *) ch);
+ ? master_transmit_message ((struct Master *) ch, inc_msg_id)
+ : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
}
@@ -708,10 +806,9 @@
tmit_msg->size = sizeof (*msg);
tmit_msg->state = ch->tmit_state;
GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
- transmit_message (ch);
+ transmit_message (ch, GNUNET_NO);
/* FIXME: cleanup */
- GNUNET_SERVER_client_disconnect (ch->client);
}
@@ -720,40 +817,60 @@
*/
static void
handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+ const struct GNUNET_MessageHeader *msg)
{
struct Channel *ch
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
+ if (GNUNET_YES != ch->ready)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Ignoring message from client, channel is not ready yet.\n",
+ ch);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ uint8_t inc_msg_id = GNUNET_NO;
uint16_t size = ntohs (msg->size);
- uint16_t psize = 0, pos = 0;
+ uint16_t psize = 0, ptype = 0, pos = 0;
if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
GNUNET_break (0);
transmit_error (ch);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message from client.\n", ch);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
+
for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
{
const struct GNUNET_MessageHeader *pmsg
= (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
psize = ntohs (pmsg->size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message part of type %u and size %u "
- "from client.\n", ntohs (pmsg->type), psize);
+ ptype = ntohs (pmsg->type);
if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Received invalid message part of type %u and size %u "
- "from client.\n", ntohs (pmsg->type), psize);
+ "%p Received invalid message part of type %u and size %u "
+ "from client.\n", ch, ptype, psize);
GNUNET_break (0);
transmit_error (ch);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message part from client.\n", ch);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
+ inc_msg_id = GNUNET_YES;
}
size -= sizeof (*msg);
@@ -763,7 +880,7 @@
tmit_msg->size = size;
tmit_msg->state = ch->tmit_state;
GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
- transmit_message (ch);
+ transmit_message (ch, inc_msg_id);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h 2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/psyc.h 2014-03-06 23:46:45 UTC (rev 32575)
@@ -27,9 +27,18 @@
#ifndef PSYC_H
#define PSYC_H
-#include "gnunet_common.h"
+#include "platform.h"
+#include "gnunet_psyc_service.h"
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data);
+
+void
+GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
+ const struct GNUNET_MessageHeader *msg);
+
+
enum MessageState
{
MSG_STATE_START = 0,
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/psyc_api.c 2014-03-06 23:46:45 UTC (rev 32575)
@@ -48,12 +48,30 @@
struct GNUNET_MessageHeader *msg;
};
+
/**
+ * Handle for a pending PSYC transmission operation.
+ */
+struct GNUNET_PSYC_ChannelTransmitHandle
+{
+ struct GNUNET_PSYC_Channel *ch;
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod;
+ GNUNET_PSYC_TransmitNotifyData notify_data;
+ void *notify_cls;
+ enum MessageState state;
+};
+
+/**
* Handle to access PSYC channel operations for both the master and slaves.
*/
struct GNUNET_PSYC_Channel
{
/**
+ * Transmission handle;
+ */
+ struct GNUNET_PSYC_ChannelTransmitHandle tmit;
+
+ /**
* Configuration to use.
*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -124,6 +142,11 @@
uint64_t recv_message_id;
/**
+ * Public key of the slave from which a message is being received.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
+
+ /**
* State of the currently being received message from the PSYC service.
*/
enum MessageState recv_state;
@@ -171,27 +194,12 @@
/**
- * Handle for a pending PSYC transmission operation.
- */
-struct GNUNET_PSYC_MasterTransmitHandle
-{
- struct GNUNET_PSYC_Master *master;
- GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
- GNUNET_PSYC_MasterTransmitNotify notify_data;
- void *notify_cls;
- enum MessageState state;
-};
-
-
-/**
* Handle for the master of a PSYC channel.
*/
struct GNUNET_PSYC_Master
{
struct GNUNET_PSYC_Channel ch;
- struct GNUNET_PSYC_MasterTransmitHandle *tmit;
-
GNUNET_PSYC_MasterStartCallback start_cb;
uint64_t max_message_id;
@@ -204,6 +212,10 @@
struct GNUNET_PSYC_Slave
{
struct GNUNET_PSYC_Channel ch;
+
+ GNUNET_PSYC_SlaveJoinCallback join_cb;
+
+ uint64_t max_message_id;
};
@@ -251,7 +263,7 @@
static void
-master_transmit_data (struct GNUNET_PSYC_Master *mst);
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
/**
@@ -302,7 +314,8 @@
ch->recv_state = MSG_STATE_START;
ch->recv_flags = 0;
ch->recv_message_id = 0;
- ch->recv_mod_value_size =0;
+ //FIXME: ch->recv_slave_key = { 0 };
+ ch->recv_mod_value_size = 0;
ch->recv_mod_value_size_expected = 0;
}
@@ -379,8 +392,9 @@
}
if (NULL != op
- && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
- < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
+ && (GNUNET_YES == end
+ || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+ < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
{
/* End of message or buffer is full, add it to transmission queue. */
op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -390,6 +404,9 @@
ch->tmit_ack_pending++;
}
+ if (GNUNET_YES == end)
+ ch->in_transmit = GNUNET_NO;
+
transmit_next (ch);
}
@@ -400,15 +417,14 @@
* @param mst Master handle.
*/
static void
-master_transmit_mod (struct GNUNET_PSYC_Master *mst)
+channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
{
- struct GNUNET_PSYC_Channel *ch = &mst->ch;
uint16_t max_data_size, data_size;
char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
int notify_ret;
- switch (mst->tmit->state)
+ switch (ch->tmit.state)
{
case MSG_STATE_MODIFIER:
{
@@ -417,12 +433,11 @@
max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
- notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
- &data_size, &mod[1], &mod->oper);
+ notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
+ &data_size, &mod[1], &mod->oper);
mod->name_size = strnlen ((char *) &mod[1], data_size);
if (mod->name_size < data_size)
{
- mod->oper = htons (mod->oper);
mod->value_size = htons (data_size - 1 - mod->name_size);
mod->name_size = htons (mod->name_size);
}
@@ -438,8 +453,8 @@
max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
msg->size = sizeof (struct GNUNET_MessageHeader);
- notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
- &data_size, &msg[1], NULL);
+ notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
+ &data_size, &msg[1], NULL);
break;
}
default:
@@ -454,27 +469,28 @@
ch->tmit_paused = GNUNET_YES;
return;
}
- mst->tmit->state = MSG_STATE_MOD_CONT;
+ ch->tmit.state = MSG_STATE_MOD_CONT;
break;
case GNUNET_YES:
if (0 == data_size)
{
/* End of modifiers. */
- mst->tmit->state = MSG_STATE_DATA;
+ ch->tmit.state = MSG_STATE_DATA;
if (0 == ch->tmit_ack_pending)
- master_transmit_data (mst);
+ channel_transmit_data (ch);
return;
}
- mst->tmit->state = MSG_STATE_MODIFIER;
+ ch->tmit.state = MSG_STATE_MODIFIER;
break;
default:
LOG (GNUNET_ERROR_TYPE_ERROR,
- "MasterTransmitNotify returned error when requesting a modifier.\n");
+ "MasterTransmitNotifyModifier returned error "
+ "when requesting a modifier.\n");
- mst->tmit->state = MSG_STATE_START;
+ ch->tmit.state = MSG_STATE_CANCEL;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
@@ -489,7 +505,7 @@
queue_message (ch, msg, GNUNET_NO);
}
- master_transmit_mod (mst);
+ channel_transmit_mod (ch);
}
@@ -499,17 +515,16 @@
* @param mst Master handle.
*/
static void
-master_transmit_data (struct GNUNET_PSYC_Master *mst)
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
{
- struct GNUNET_PSYC_Channel *ch = &mst->ch;
uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
- int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
- &data_size, &msg[1]);
+ int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
+ &data_size, &msg[1]);
switch (notify_ret)
{
case GNUNET_NO:
@@ -522,14 +537,14 @@
break;
case GNUNET_YES:
- mst->tmit->state = MSG_STATE_START;
+ ch->tmit.state = MSG_STATE_END;
break;
default:
LOG (GNUNET_ERROR_TYPE_ERROR,
"MasterTransmitNotify returned error when requesting data.\n");
- mst->tmit->state = MSG_STATE_START;
+ ch->tmit.state = MSG_STATE_CANCEL;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
queue_message (ch, msg, GNUNET_YES);
@@ -554,6 +569,86 @@
/**
+ * Send a message to a channel.
+ *
+ * @param ch Handle to the PSYC channel.
+ * @param method_name Which method should be invoked.
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
+ * @param notify_cls Closure for @a notify_mod and @a notify_data.
+ * @param flags Flags for the message being transmitted.
+ * @return Transmission handle, NULL on error (i.e. more than one request
queued).
+ */
+static struct GNUNET_PSYC_ChannelTransmitHandle *
+channel_transmit (struct GNUNET_PSYC_Channel *ch,
+ const char *method_name,
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+ GNUNET_PSYC_TransmitNotifyData notify_data,
+ void *notify_cls,
+ uint32_t flags)
+{
+ if (GNUNET_NO != ch->in_transmit)
+ return NULL;
+ ch->in_transmit = GNUNET_YES;
+
+ size_t size = strlen (method_name) + 1;
+ struct GNUNET_PSYC_MessageMethod *pmeth;
+ struct OperationHandle *op;
+
+ ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
+ + sizeof (*pmeth) + size);
+ op->msg = (struct GNUNET_MessageHeader *) &op[1];
+ op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
+
+ pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
+ pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
+ pmeth->header.size = htons (sizeof (*pmeth) + size);
+ pmeth->flags = htonl (flags);
+ memcpy (&pmeth[1], method_name, size);
+
+ ch->tmit.ch = ch;
+ ch->tmit.notify_mod = notify_mod;
+ ch->tmit.notify_data = notify_data;
+ ch->tmit.notify_cls = notify_cls;
+ ch->tmit.state = MSG_STATE_MODIFIER;
+
+ channel_transmit_mod (ch);
+ return &ch->tmit;
+}
+
+
+/**
+ * Resume transmission to the channel.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+static void
+channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+ struct GNUNET_PSYC_Channel *ch = th->ch;
+ if (0 == ch->tmit_ack_pending)
+ {
+ ch->tmit_paused = GNUNET_NO;
+ channel_transmit_data (ch);
+ }
+}
+
+
+/**
+ * Abort transmission request to channel.
+ *
+ * @param th Handle of the request that is being aborted.
+ */
+static void
+channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+ struct GNUNET_PSYC_Channel *ch = th->ch;
+ if (GNUNET_NO == ch->in_transmit)
+ return;
+}
+
+
+/**
* Handle incoming message from the PSYC service.
*
* @param ch The channel the message is sent to.
@@ -564,14 +659,20 @@
const struct GNUNET_PSYC_MessageHeader *msg)
{
uint16_t size = ntohs (msg->header.size);
+ uint32_t flags = ntohl (msg->flags);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
+ (struct GNUNET_MessageHeader *) msg);
+
if (MSG_STATE_START == ch->recv_state)
{
ch->recv_message_id = GNUNET_ntohll (msg->message_id);
- ch->recv_flags = ntohl (msg->flags);
+ ch->recv_flags = flags;
+ ch->recv_slave_key = msg->slave_key;
}
else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
{
+ // FIXME
LOG (GNUNET_ERROR_TYPE_WARNING,
"Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
GNUNET_ntohll (msg->message_id), ch->recv_message_id);
@@ -579,11 +680,11 @@
recv_error (ch);
return;
}
- else if (ntohl (msg->flags) != ch->recv_flags)
+ else if (flags != ch->recv_flags)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Unexpected message flags. Got: %lu, expected: %lu\n",
- ntohl (msg->flags), ch->recv_flags);
+ flags, ch->recv_flags);
GNUNET_break_op (0);
recv_error (ch);
return;
@@ -599,10 +700,6 @@
ptype = ntohs (pmsg->type);
size_eq = size_min = 0;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message part of type %u and size %u from PSYC.\n",
- ptype, psize);
-
if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -612,6 +709,10 @@
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message part from PSYC.\n");
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
switch (ptype)
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
@@ -758,6 +859,46 @@
/**
+ * Handle incoming message acknowledgement from the PSYC service.
+ *
+ * @param ch The channel the acknowledgement is sent to.
+ */
+static void
+handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
+{
+ if (0 == ch->tmit_ack_pending)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
+ GNUNET_break (0);
+ return;
+ }
+ ch->tmit_ack_pending--;
+
+ switch (ch->tmit.state)
+ {
+ case MSG_STATE_MODIFIER:
+ case MSG_STATE_MOD_CONT:
+ if (GNUNET_NO == ch->tmit_paused)
+ channel_transmit_mod (ch);
+ break;
+
+ case MSG_STATE_DATA:
+ if (GNUNET_NO == ch->tmit_paused)
+ channel_transmit_data (ch);
+ break;
+
+ case MSG_STATE_END:
+ case MSG_STATE_CANCEL:
+ break;
+
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring message ACK in state %u.\n", ch->tmit.state);
+ }
+}
+
+
+/**
* Type of a function to call when we receive a message
* from the service.
*
@@ -775,7 +916,7 @@
if (NULL == msg)
{
- GNUNET_break (0);
+ // timeout / disconnected from server, reconnect
reschedule_connect (ch);
return;
}
@@ -824,63 +965,15 @@
}
case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
{
-#if TODO
struct CountersResult *cres = (struct CountersResult *) msg;
slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
- if (NULL != slv->join_ack_cb)
- mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
-#endif
+ if (NULL != slv->join_cb)
+ slv->join_cb (ch->cb_cls, slv->max_message_id);
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
{
- if (0 == ch->tmit_ack_pending)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
- GNUNET_break (0);
- break;
- }
- ch->tmit_ack_pending--;
-
- if (ch->is_master)
- {
- GNUNET_assert (NULL != mst->tmit);
- switch (mst->tmit->state)
- {
- case MSG_STATE_MODIFIER:
- case MSG_STATE_MOD_CONT:
- if (GNUNET_NO == ch->tmit_paused)
- master_transmit_mod (mst);
- break;
-
- case MSG_STATE_DATA:
- if (GNUNET_NO == ch->tmit_paused)
- master_transmit_data (mst);
- break;
-
- case MSG_STATE_END:
- case MSG_STATE_CANCEL:
- if (NULL != mst->tmit)
- {
- GNUNET_free (mst->tmit);
- mst->tmit = NULL;
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Ignoring message ACK, there's no transmission going on.\n");
- GNUNET_break (0);
- }
- break;
- default:
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Ignoring message ACK in state %u.\n", mst->tmit->state);
- }
- }
- else
- {
- /* TODO: slave */
- }
+ handle_psyc_message_ack (ch);
break;
}
@@ -1106,8 +1199,6 @@
GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
{
disconnect (master);
- if (NULL != master->tmit)
- GNUNET_free (master->tmit);
GNUNET_free (master);
}
@@ -1162,41 +1253,14 @@
struct GNUNET_PSYC_MasterTransmitHandle *
GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
const char *method_name,
- GNUNET_PSYC_MasterTransmitNotifyModifier
notify_mod,
- GNUNET_PSYC_MasterTransmitNotify notify_data,
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+ GNUNET_PSYC_TransmitNotifyData notify_data,
void *notify_cls,
enum GNUNET_PSYC_MasterTransmitFlags flags)
{
- GNUNET_assert (NULL != master);
- struct GNUNET_PSYC_Channel *ch = &master->ch;
- if (GNUNET_NO != ch->in_transmit)
- return NULL;
- ch->in_transmit = GNUNET_YES;
-
- size_t size = strlen (method_name) + 1;
- struct GNUNET_PSYC_MessageMethod *pmeth;
- struct OperationHandle *op;
-
- ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
- + sizeof (*pmeth) + size);
- op->msg = (struct GNUNET_MessageHeader *) &op[1];
- op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
-
- pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
- pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
- pmeth->header.size = htons (sizeof (*pmeth) + size);
- pmeth->flags = htonl (flags);
- memcpy (&pmeth[1], method_name, size);
-
- master->tmit = GNUNET_malloc (sizeof (*master->tmit));
- master->tmit->master = master;
- master->tmit->notify_mod = notify_mod;
- master->tmit->notify_data = notify_data;
- master->tmit->notify_cls = notify_cls;
- master->tmit->state = MSG_STATE_MODIFIER;
-
- master_transmit_mod (master);
- return master->tmit;
+ return (struct GNUNET_PSYC_MasterTransmitHandle *)
+ channel_transmit (&master->ch, method_name, notify_mod, notify_data,
+ notify_cls, flags);
}
@@ -1208,12 +1272,7 @@
void
GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle
*th)
{
- struct GNUNET_PSYC_Channel *ch = &th->master->ch;
- if (0 == ch->tmit_ack_pending)
- {
- ch->tmit_paused = GNUNET_NO;
- master_transmit_data (th->master);
- }
+ channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
}
@@ -1225,10 +1284,7 @@
void
GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle
*th)
{
- struct GNUNET_PSYC_Master *master = th->master;
- struct GNUNET_PSYC_Channel *ch = &master->ch;
- if (GNUNET_NO != ch->in_transmit)
- return;
+ channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
}
@@ -1282,15 +1338,15 @@
{
struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
struct GNUNET_PSYC_Channel *ch = &slv->ch;
- struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req)
- + relay_count * sizeof
(*relays));
+ struct SlaveJoinRequest *req
+ = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
req->header.size = htons (sizeof (*req)
+ relay_count * sizeof (*relays));
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
req->channel_key = *channel_key;
req->slave_key = *slave_key;
req->origin = *origin;
- req->relay_count = relay_count;
+ req->relay_count = htonl (relay_count);
memcpy (&req[1], relays, relay_count * sizeof (*relays));
ch->message_cb = message_cb;
@@ -1303,6 +1359,7 @@
ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
+ slv->join_cb = slave_joined_cb;
return slv;
}
@@ -1328,9 +1385,8 @@
*
* @param slave Slave handle.
* @param method_name Which (PSYC) method should be invoked (on host).
- * @param env Environment containing transient variables for the message, or
- * NULL.
- * @param notify Function to call when we are allowed to transmit (to get
data).
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
* @param notify_cls Closure for @a notify.
* @param flags Flags for the message being transmitted.
* @return Transmission handle, NULL on error (i.e. more than one request
@@ -1339,12 +1395,14 @@
struct GNUNET_PSYC_SlaveTransmitHandle *
GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
const char *method_name,
- const struct GNUNET_ENV_Environment *env,
- GNUNET_PSYC_SlaveTransmitNotify notify,
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+ GNUNET_PSYC_TransmitNotifyData notify_data,
void *notify_cls,
enum GNUNET_PSYC_SlaveTransmitFlags flags)
{
- return NULL;
+ return (struct GNUNET_PSYC_SlaveTransmitHandle *)
+ channel_transmit (&slave->ch, method_name,
+ notify_mod, notify_data, notify_cls, flags);
}
@@ -1356,7 +1414,7 @@
void
GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
{
-
+ channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
}
@@ -1368,7 +1426,7 @@
void
GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
{
-
+ channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
}
@@ -1382,7 +1440,7 @@
struct GNUNET_PSYC_Channel *
GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
{
- return (struct GNUNET_PSYC_Channel *) master;
+ return &master->ch;
}
@@ -1395,7 +1453,7 @@
struct GNUNET_PSYC_Channel *
GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
{
- return (struct GNUNET_PSYC_Channel *) slave;
+ return &slave->ch;
}
Added: gnunet/src/psyc/psyc_common.c
===================================================================
--- gnunet/src/psyc/psyc_common.c (rev 0)
+++ gnunet/src/psyc/psyc_common.c 2014-03-06 23:46:45 UTC (rev 32575)
@@ -0,0 +1,100 @@
+/*
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * @file psyc/psyc_common.c
+ * @brief Common functions for PSYC
+ * @author Gabor X Toth
+ */
+
+#include <inttypes.h>
+#include "psyc.h"
+
+/**
+ * Check if @a data contains a series of valid message parts.
+ *
+ * @param data_size Size of @a data.
+ * @param data Data.
+ *
+ * @return GNUNET_YES or GNUNET_NO
+ */
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data)
+{
+ const struct GNUNET_MessageHeader *pmsg;
+ uint16_t psize = 0;
+ uint16_t pos = 0;
+
+ for (pos = 0; data_size + pos < data_size; pos += psize)
+ {
+ pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
+ psize = ntohs (pmsg->size);
+ if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Invalid message part of type %u and size %u.",
+ ntohs (pmsg->type), psize);
+ return GNUNET_NO;
+ }
+ }
+ return GNUNET_YES;
+}
+
+
+void
+GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
+ const struct GNUNET_MessageHeader *msg)
+{
+ uint16_t size = ntohs (msg->size);
+ uint16_t type = ntohs (msg->type);
+ GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
+ {
+ struct GNUNET_PSYC_MessageHeader *pmsg
+ = (struct GNUNET_PSYC_MessageHeader *) msg;
+ GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %" PRIu32 "\n",
+ GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ {
+ struct GNUNET_PSYC_MessageMethod *meth
+ = (struct GNUNET_PSYC_MessageMethod *) msg;
+ GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ struct GNUNET_PSYC_MessageModifier *mod
+ = (struct GNUNET_PSYC_MessageModifier *) msg;
+ uint16_t name_size = ntohs (mod->name_size);
+ char oper = ' ' < mod->oper ? mod->oper : ' ';
+ GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
+ ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1);
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
+ break;
+ }
+}
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/test_psyc.c 2014-03-06 23:46:45 UTC (rev 32575)
@@ -37,7 +37,7 @@
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
-#define DEBUG_SERVICE 0
+#define DEBUG_SERVICE 1
/**
@@ -66,7 +66,8 @@
struct TransmitClosure
{
- struct GNUNET_PSYC_MasterTransmitHandle *handle;
+ struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
+ struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
struct GNUNET_ENV_Environment *env;
char *data[16];
const char *mod_value;
@@ -78,12 +79,30 @@
struct TransmitClosure *tmit;
+
+enum
+{
+ TEST_NONE,
+ TEST_SLAVE_TRANSMIT,
+ TEST_MASTER_TRANSMIT,
+} test;
+
+
+static void
+master_transmit ();
+
+
/**
* Clean up all resources used.
*/
static void
cleanup ()
{
+ if (NULL != slv)
+ {
+ GNUNET_PSYC_slave_part (slv);
+ slv = NULL;
+ }
if (NULL != mst)
{
GNUNET_PSYC_master_stop (mst);
@@ -133,6 +152,8 @@
static void
end ()
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending tests.\n");
+
if (end_badly_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (end_badly_task);
@@ -144,8 +165,8 @@
static void
-message (void *cls, uint64_t message_id, uint32_t flags,
- const struct GNUNET_MessageHeader *msg)
+master_message (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
{
if (NULL == msg)
{
@@ -158,16 +179,68 @@
uint16_t size = ntohs (msg->size);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got message part of type %u and size %u "
+ "Master got message part of type %u and size %u "
"belonging to message ID %llu with flags %u\n",
type, size, message_id, flags);
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
- end ();
+ switch (test)
+ {
+ case TEST_SLAVE_TRANSMIT:
+ if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Unexpected request flags: %lu\n", flags);
+ GNUNET_assert (0);
+ return;
+ }
+ // FIXME: check rest of message
+
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+ master_transmit ();
+ break;
+
+ case TEST_MASTER_TRANSMIT:
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
}
static void
+slave_message (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
+{
+ if (NULL == msg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error while receiving message %llu\n", message_id);
+ return;
+ }
+
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Slave got message part of type %u and size %u "
+ "belonging to message ID %llu with flags %u\n",
+ type, size, message_id, flags);
+
+ switch (test)
+ {
+ case TEST_MASTER_TRANSMIT:
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+ end ();
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+static void
join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
const char *method_name,
size_t variable_count, const struct GNUNET_ENV_Modifier
*variables,
@@ -175,7 +248,9 @@
struct GNUNET_PSYC_JoinHandle *jh)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got join request.");
+ "Got join request: %s (%zu vars)", method_name, variable_count);
+ GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, "_notice_join", NULL,
+ "you're in", 9);
}
@@ -185,7 +260,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
struct TransmitClosure *tmit = cls;
tmit->paused = GNUNET_NO;
- GNUNET_PSYC_master_transmit_resume (tmit->handle);
+ GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
}
@@ -204,7 +279,35 @@
uint16_t name_size = 0;
size_t value_size = 0;
- if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
+ if (NULL != oper)
+ { /* New modifier */
+ if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
+ (void *) &value,
&value_size))
+ { /* No more modifiers, continue with data */
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ *oper = op;
+ name_size = strlen (name);
+
+ if (name_size + 1 + value_size <= *data_size)
+ {
+ *data_size = name_size + 1 + value_size;
+ }
+ else
+ {
+ tmit->mod_value_size = value_size;
+ value_size = *data_size - name_size - 1;
+ tmit->mod_value_size -= value_size;
+ tmit->mod_value = value + value_size;
+ }
+
+ memcpy (data, name, name_size);
+ ((char *)data)[name_size] = '\0';
+ memcpy ((char *)data + name_size + 1, value, value_size);
+ }
+ else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
{ /* Modifier continuation */
value = tmit->mod_value;
if (tmit->mod_value_size <= *data_size)
@@ -231,35 +334,7 @@
*data_size = value_size;
memcpy (data, value, value_size);
}
- else if (NULL != oper)
- {
- if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
- (void *) &value,
&value_size))
- { /* No more modifiers, continue with data */
- *data_size = 0;
- return GNUNET_YES;
- }
- *oper = op;
- name_size = strlen (name);
-
- if (name_size + 1 + value_size <= *data_size)
- {
- *data_size = name_size + 1 + value_size;
- }
- else
- {
- tmit->mod_value_size = value_size;
- value_size = *data_size - name_size - 1;
- tmit->mod_value_size -= value_size;
- tmit->mod_value = value + value_size;
- }
-
- memcpy (data, name, name_size);
- ((char *)data)[name_size] = '\0';
- memcpy ((char *)data + name_size + 1, value, value_size);
- }
-
return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
}
@@ -268,6 +343,12 @@
tmit_notify_data (void *cls, uint16_t *data_size, void *data)
{
struct TransmitClosure *tmit = cls;
+ if (0 == tmit->data_count)
+ {
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
uint16_t size = strlen (tmit->data[tmit->n]);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmit notify data: %lu bytes available, "
@@ -300,32 +381,76 @@
static void
-master_started (void *cls, uint64_t max_message_id)
+slave_joined (void *cls, uint64_t max_message_id)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Master started: %" PRIu64 "\n", max_message_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n",
max_message_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
+ test = TEST_SLAVE_TRANSMIT;
+
tmit = GNUNET_new (struct TransmitClosure);
tmit->env = GNUNET_ENV_environment_create ();
GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_abc", "abc def", 7);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_abc_def", "abc def ghi", 11);
+ tmit->n = 0;
+ tmit->data[0] = "slave test";
+ tmit->data_count = 1;
+ tmit->slv_tmit
+ = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod,
+ tmit_notify_data, tmit,
+ GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
+}
+
+static void
+slave_join ()
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
+
+ struct GNUNET_PeerIdentity origin;
+ struct GNUNET_PeerIdentity relays[16];
+ struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
+ GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
"_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", "foo bar baz", 11);
+ slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
+ 16, relays, &slave_message, &join_request,
&slave_joined,
+ NULL, "_request_join", env, "some data", 9);
+ GNUNET_ENV_environment_destroy (env);
+}
+
+
+static void
+master_transmit ()
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
+ test = TEST_MASTER_TRANSMIT;
+
+ tmit = GNUNET_new (struct TransmitClosure);
+ tmit->env = GNUNET_ENV_environment_create ();
GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
"_foo_bar", "foo bar baz", 11);
tmit->data[0] = "foo";
tmit->data[1] = "foo bar";
tmit->data[2] = "foo bar baz";
tmit->data_count = 3;
- tmit->handle
- = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod,
+ tmit->mst_tmit
+ = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
tmit_notify_data, tmit,
GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
}
static void
-slave_joined (void *cls, uint64_t max_message_id)
+master_started (void *cls, uint64_t max_message_id)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n",
max_message_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Master started: %" PRIu64 "\n", max_message_id);
+ slave_join ();
}
@@ -355,21 +480,9 @@
GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
mst = GNUNET_PSYC_master_start (cfg, channel_key,
GNUNET_PSYC_CHANNEL_PRIVATE,
- &message, &join_request, &master_started,
NULL);
- return; /* FIXME: test slave */
-
- struct GNUNET_PeerIdentity origin;
- struct GNUNET_PeerIdentity relays[16];
- struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
- GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
- "_foo", "bar baz", 7);
- GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
- "_foo_bar", "foo bar baz", 11);
- slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
- 16, relays, &message, &join_request,
&slave_joined,
- NULL, "_request_join", env, "some data", 9);
- GNUNET_ENV_environment_destroy (env);
+ &master_message, &join_request,
&master_started, NULL);
}
Modified: gnunet/src/psycstore/plugin_psycstore_sqlite.c
===================================================================
--- gnunet/src/psycstore/plugin_psycstore_sqlite.c 2014-03-06 23:46:42 UTC
(rev 32574)
+++ gnunet/src/psycstore/plugin_psycstore_sqlite.c 2014-03-06 23:46:45 UTC
(rev 32575)
@@ -435,7 +435,7 @@
&plugin->select_membership);
sql_prepare (plugin->dbh,
- "INSERT INTO messages\n"
+ "INSERT OR IGNORE INTO messages\n"
" (channel_id, hop_counter, signature, purpose,\n"
" fragment_id, fragment_offset, message_id,\n"
" group_generation, multicast_flags, psycstore_flags, data)\n"
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r32575 - in gnunet/src: include multicast psyc psycstore,
gnunet <=