[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated: tng: more UDP communicator backchannels
From: gnunet
Subject: [gnunet] branch master updated: tng: more UDP communicator backchannels
Date: Mon, 01 Jun 2020 16:46:00 +0200
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new 198c09654 tng: more UDP communicator backchannels
198c09654 is described below
commit 198c09654354d09a9b33f27cf095e0295f70826c
Author: Martin Schanzenbach <mschanzenbach@posteo.de>
AuthorDate: Mon Jun 1 16:39:35 2020 +0200
tng: more UDP communicator backchannels
Added a new message for queue updates to indicate queue length.
Queues now may also have a priority parameter.
---
src/include/gnunet_protocols.h | 4 +
.../gnunet_transport_communication_service.h | 24 ++
src/transport/gnunet-communicator-tcp.c | 2 +
src/transport/gnunet-communicator-udp.c | 384 +++++++++++++--------
src/transport/gnunet-communicator-unix.c | 2 +
src/transport/test_communicator_basic.c | 45 ++-
src/transport/transport-testing2.c | 126 +++++--
src/transport/transport-testing2.h | 7 +-
src/transport/transport.h | 60 ++++
src/transport/transport_api2_communication.c | 77 +++++
10 files changed, 543 insertions(+), 188 deletions(-)
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 282bb53d1..a9cd7466a 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3161,6 +3161,10 @@ extern "C" {
*/
#define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221
+/**
+ * @brief inform transport that a queue was updated
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE 1222
/**
* Message sent to indicate to the transport that a monitor
diff --git a/src/include/gnunet_transport_communication_service.h
b/src/include/gnunet_transport_communication_service.h
index 3ead03536..ea6b95e2d 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -50,6 +50,10 @@ extern "C" {
*/
#define GNUNET_TRANSPORT_COMMUNICATION_VERSION 0x00000000
+/**
+ * Queue length value indicating that the number of messages a queue
+ * can carry is unlimited.
+ */
+#define GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED UINT64_MAX
/**
* Function called by the transport service to initialize a
@@ -252,6 +256,9 @@ enum GNUNET_TRANSPORT_ConnectionStatus
* @param address address in human-readable format, 0-terminated, UTF-8
* @param mtu maximum message size supported by queue, 0 if
* sending is not supported, SIZE_MAX for no MTU
+ * @param q_len number of messages that can be sent through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ * used
* @param nt which network type does the @a address belong to?
* @param cs what is the connection status of the queue?
* @param mq message queue of the @a peer
@@ -263,10 +270,27 @@ GNUNET_TRANSPORT_communicator_mq_add (
const struct GNUNET_PeerIdentity *peer,
const char *address,
uint32_t mtu,
+ uint64_t q_len,
+ uint32_t priority,
enum GNUNET_NetworkType nt,
enum GNUNET_TRANSPORT_ConnectionStatus cs,
struct GNUNET_MQ_Handle *mq);
+/**
+ * Notify transport service that an MQ was updated
+ *
+ * @param ch connection to transport service
+ * @param u_qh the queue to update
+ * @param q_len number of messages that can be sent through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ * used
+ */
+void
+GNUNET_TRANSPORT_communicator_mq_update (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
+ uint64_t q_len,
+ uint32_t priority);
/**
* Notify transport service that an MQ became unavailable due to a
diff --git a/src/transport/gnunet-communicator-tcp.c
b/src/transport/gnunet-communicator-tcp.c
index bbfacbffd..7f70c55df 100644
--- a/src/transport/gnunet-communicator-tcp.c
+++ b/src/transport/gnunet-communicator-tcp.c
@@ -1547,6 +1547,8 @@ boot_queue (struct Queue *queue, enum
GNUNET_TRANSPORT_ConnectionStatus cs)
&queue->target,
foreign_addr,
0 /* no MTU */,
+
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+ 0, /* Priority */
queue->nt,
cs,
queue->mq);
diff --git a/src/transport/gnunet-communicator-udp.c
b/src/transport/gnunet-communicator-udp.c
index 344ba5180..46d9766d0 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -549,14 +549,24 @@ struct ReceiverAddress
struct GNUNET_CONTAINER_HeapNode *hn;
/**
- * Message queue we are providing for the #ch.
+ * KX message queue we are providing for the #ch.
*/
- struct GNUNET_MQ_Handle *mq;
+ struct GNUNET_MQ_Handle *kx_mq;
+
+ /**
+ * Default message queue we are providing for the #ch.
+ */
+ struct GNUNET_MQ_Handle *d_mq;
+
+ /**
+ * handle for KX queue with the #ch.
+ */
+ struct GNUNET_TRANSPORT_QueueHandle *kx_qh;
/**
- * handle for this queue with the #ch.
+ * handle for default queue with the #ch.
*/
- struct GNUNET_TRANSPORT_QueueHandle *qh;
+ struct GNUNET_TRANSPORT_QueueHandle *d_qh;
/**
* Timeout for this receiver address.
@@ -564,9 +574,14 @@ struct ReceiverAddress
struct GNUNET_TIME_Absolute timeout;
/**
- * MTU we allowed transport for this receiver right now.
+ * MTU we allowed transport for this receiver's KX queue.
*/
- size_t mtu;
+ size_t kx_mtu;
+
+ /**
+ * MTU we allowed transport for this receiver's default queue.
+ */
+ size_t d_mtu;
/**
* Length of the DLL at @a ss_head.
@@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Disconnecting receiver for peer `%s'\n",
GNUNET_i2s (&receiver->target));
- if (NULL != (mq = receiver->mq))
+ if (NULL != (mq = receiver->kx_mq))
{
- receiver->mq = NULL;
+ receiver->kx_mq = NULL;
GNUNET_MQ_destroy (mq);
}
- if (NULL != receiver->qh)
+ if (NULL != receiver->kx_qh)
{
- GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
- receiver->qh = NULL;
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+ receiver->kx_qh = NULL;
+ }
+ if (NULL != (mq = receiver->d_mq))
+ {
+ receiver->d_mq = NULL;
+ GNUNET_MQ_destroy (mq);
+ }
+ if (NULL != receiver->d_qh)
+ {
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+ receiver->d_qh = NULL;
}
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (receivers,
@@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity
*pid, void *value)
(void) pid;
for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Checking shared secrets\n");
if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode)))
{
uint32_t allowed;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found matching mac\n");
+ "Found matching mac\n");
allowed = ntohl (ack->sequence_max);
if (allowed > ss->sequence_allowed)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%u > %u (%u)\n", allowed, ss->sequence_allowed,
- receiver->acks_available);
+ "%u > %u (%u)\n", allowed, ss->sequence_allowed,
+ receiver->acks_available);
receiver->acks_available += (allowed - ss->sequence_allowed);
- if ((allowed - ss->sequence_allowed) == receiver->acks_available)
- {
- /* we just incremented from zero => MTU change! */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "we just incremented from zero => MTU change!\n");
- //TODO setup_receiver_mq (receiver);
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Tell transport we have more acks!\n");
+ GNUNET_TRANSPORT_communicator_mq_update (ch,
+ receiver->d_qh,
+ (allowed -
ss->sequence_allowed),
+ 1);
ss->sequence_allowed = allowed;
/* move ss to head to avoid discarding it anytime soon! */
GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss);
@@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram,
size_t pad_size)
* @param impl_state our `struct ReceiverAddress`
*/
static void
-mq_send (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
+mq_send_kx (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
{
struct ReceiverAddress *receiver = impl_state;
uint16_t msize = ntohs (msg->size);
+ struct UdpHandshakeSignature uhs;
+ struct UDPConfirmation uc;
+ struct InitialKX kx;
+ struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
+ char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)];
+ size_t dpos;
+ gcry_cipher_hd_t out_cipher;
+ struct SharedSecret *ss;
+
- GNUNET_assert (mq == receiver->mq);
- if (msize > receiver->mtu)
+ GNUNET_assert (mq == receiver->kx_mq);
+ if (msize > receiver->kx_mtu)
{
GNUNET_break (0);
receiver_destroy (receiver);
@@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq,
}
reschedule_receiver_timeout (receiver);
- if (0 == receiver->acks_available)
+ /* setup key material */
+ GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+
+ ss = setup_shared_secret_enc (&epriv, receiver);
+ setup_cipher (&ss->master, 0, &out_cipher);
+ /* compute 'uc' */
+ uc.sender = my_identity;
+ uc.monotonic_time =
+ GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+ uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
+ uhs.purpose.size = htonl (sizeof(uhs));
+ uhs.sender = my_identity;
+ uhs.receiver = receiver->target;
+ GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
+ uhs.monotonic_time = uc.monotonic_time;
+ GNUNET_CRYPTO_eddsa_sign (my_private_key,
+ &uhs,
+ &uc.sender_sig);
+ /* Leave space for kx */
+ dpos = sizeof(kx);
+ /* Append encrypted uc to dgram */
+ GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
+ &dgram[dpos],
+ sizeof(uc),
+ &uc,
+ sizeof(uc)));
+ dpos += sizeof(uc);
+ /* Append encrypted payload to dgram */
+ GNUNET_assert (
+ 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
+ dpos += msize;
+ do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
+ /* Datagram starts with kx */
+ kx.ephemeral = uhs.ephemeral;
+ GNUNET_assert (
+ 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+ gcry_cipher_close (out_cipher);
+ memcpy (dgram, &kx, sizeof(kx));
+ if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
+ dgram,
+ sizeof(dgram),
+ receiver->address,
+ receiver->address_len))
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending KX to %s\n", GNUNET_a2s (receiver->address,
+ receiver->address_len));
+ GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Signature of functions implementing the sending functionality of a
+ * message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_send_d (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
+{
+ struct ReceiverAddress *receiver = impl_state;
+ uint16_t msize = ntohs (msg->size);
+
+ GNUNET_assert (mq == receiver->d_mq);
+ if ((msize > receiver->d_mtu) ||
+ (0 == receiver->acks_available))
{
- /* use KX encryption method */
- struct UdpHandshakeSignature uhs;
- struct UDPConfirmation uc;
- struct InitialKX kx;
- struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
- char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)];
- size_t dpos;
- gcry_cipher_hd_t out_cipher;
- struct SharedSecret *ss;
+ GNUNET_break (0);
+ receiver_destroy (receiver);
+ return;
+ }
+ reschedule_receiver_timeout (receiver);
- /* setup key material */
- GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+ /* begin "BOX" encryption method, scan for ACKs from tail! */
+ for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
+ {
+ if (ss->sequence_used >= ss->sequence_allowed)
+ {
+ continue;
+ }
+ char dgram[sizeof(struct UDPBox) + receiver->d_mtu];
+ struct UDPBox *box;
+ gcry_cipher_hd_t out_cipher;
+ size_t dpos;
- ss = setup_shared_secret_enc (&epriv, receiver);
- setup_cipher (&ss->master, 0, &out_cipher);
- /* compute 'uc' */
- uc.sender = my_identity;
- uc.monotonic_time =
- GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
- uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
- uhs.purpose.size = htonl (sizeof(uhs));
- uhs.sender = my_identity;
- uhs.receiver = receiver->target;
- GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
- uhs.monotonic_time = uc.monotonic_time;
- GNUNET_CRYPTO_eddsa_sign (my_private_key,
- &uhs,
- &uc.sender_sig);
- /* Leave space for kx */
- dpos = sizeof(kx);
- /* Append encrypted uc to dgram */
- GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
- &dgram[dpos],
- sizeof(uc),
- &uc,
- sizeof(uc)));
- dpos += sizeof(uc);
+ box = (struct UDPBox *) dgram;
+ ss->sequence_used++;
+ get_kid (&ss->master, ss->sequence_used, &box->kid);
+ setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
/* Append encrypted payload to dgram */
+ dpos = sizeof(struct UDPBox);
GNUNET_assert (
0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
dpos += msize;
do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
- /* Datagram starts with kx */
- kx.ephemeral = uhs.ephemeral;
- GNUNET_assert (
- 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+ GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
+ box->gcm_tag,
+ sizeof(box->gcm_tag)));
gcry_cipher_close (out_cipher);
- memcpy (dgram, &kx, sizeof(kx));
if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
dgram,
sizeof(dgram),
receiver->address,
receiver->address_len))
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending KX to %s\n", GNUNET_a2s (receiver->address,
- receiver->address_len));
GNUNET_MQ_impl_send_continue (mq);
- return;
- } /* End of KX encryption method */
-
- /* begin "BOX" encryption method, scan for ACKs from tail! */
- for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "In non-kx mode...\n");
- if (ss->sequence_used < ss->sequence_allowed)
+ receiver->acks_available--;
+ if (0 == receiver->acks_available)
{
- char dgram[sizeof(struct UDPBox) + receiver->mtu];
- struct UDPBox *box;
- gcry_cipher_hd_t out_cipher;
- size_t dpos;
-
- box = (struct UDPBox *) dgram;
- ss->sequence_used++;
- get_kid (&ss->master, ss->sequence_used, &box->kid);
- setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
- /* Append encrypted payload to dgram */
- dpos = sizeof(struct UDPBox);
- GNUNET_assert (
- 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg,
msize));
- dpos += msize;
- do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
- GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
- box->gcm_tag,
- sizeof(box->gcm_tag)));
- gcry_cipher_close (out_cipher);
- if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
- dgram,
- sizeof(dgram),
- receiver->address,
- receiver->address_len))
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+ /* We have no more ACKs */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending data\n");
-
- GNUNET_MQ_impl_send_continue (mq);
- receiver->acks_available--;
- if (0 == receiver->acks_available)
- {
- /* We have no more ACKs => MTU change! */
- setup_receiver_mq (receiver);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No more acks, MTU changed\n");
- }
- return;
+ "No more acks\n");
}
}
- GNUNET_assert (0);
}
@@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq,
* @param impl_state our `struct ReceiverAddress`
*/
static void
-mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
+mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
struct ReceiverAddress *receiver = impl_state;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "MQ destroyed\n");
- if (mq == receiver->mq)
+ "Default MQ destroyed\n");
+ if (mq == receiver->d_mq)
{
- receiver->mq = NULL;
- //receiver_destroy (receiver);
+ receiver->d_mq = NULL;
+ receiver_destroy (receiver);
+ }
+}
+
+
+/**
+ * Signature of functions implementing the destruction of a message
+ * queue. Implementations must not free @a mq, but should take care
+ * of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct ReceiverAddress *receiver = impl_state;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "KX MQ destroyed\n");
+ if (mq == receiver->kx_mq)
+ {
+ receiver->kx_mq = NULL;
+ receiver_destroy (receiver);
}
}
@@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
{
size_t base_mtu;
- if (NULL != receiver->qh)
+ /*if (NULL != receiver->kx_qh)
{
- GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
- receiver->qh = NULL;
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+ receiver->kx_qh = NULL;
}
- //GNUNET_assert (NULL == receiver->mq);
+ if (NULL != receiver->d_qh)
+ {
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+ receiver->d_qh = NULL;
+ }*/
+ // GNUNET_assert (NULL == receiver->mq);
switch (receiver->address->sa_family)
{
case AF_INET:
@@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
GNUNET_assert (0);
break;
}
- if (0 == receiver->acks_available)
- {
- /* MTU based on full KX messages */
- receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
- - sizeof(struct UDPConfirmation); /* 104 */
- }
- else
- {
- /* MTU based on BOXed messages */
- receiver->mtu = base_mtu - sizeof(struct UDPBox);
- }
+ /* MTU based on full KX messages */
+ receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
+ - sizeof(struct UDPConfirmation); /* 104 */
+ /* MTU based on BOXed messages */
+ receiver->d_mtu = base_mtu - sizeof(struct UDPBox);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Setting up MQs and QHs\n");
/* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
1404 (IPv4 + Box) bytes, depending on circumstances... */
- if (NULL == receiver->mq)
- receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
- &mq_destroy,
- &mq_cancel,
- receiver,
- NULL,
- &mq_error,
- receiver);
- receiver->qh =
+ if (NULL == receiver->kx_mq)
+ receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx,
+ &mq_destroy_kx,
+ &mq_cancel,
+ receiver,
+ NULL,
+ &mq_error,
+ receiver);
+ if (NULL == receiver->d_mq)
+ receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
+ &mq_destroy_d,
+ &mq_cancel,
+ receiver,
+ NULL,
+ &mq_error,
+ receiver);
+
+ receiver->kx_qh =
GNUNET_TRANSPORT_communicator_mq_add (ch,
&receiver->target,
receiver->foreign_addr,
- receiver->mtu,
+ receiver->kx_mtu,
+
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+ 0, /* Priority */
receiver->nt,
GNUNET_TRANSPORT_CS_OUTBOUND,
- receiver->mq);
+ receiver->kx_mq);
+ receiver->d_qh =
+ GNUNET_TRANSPORT_communicator_mq_add (ch,
+ &receiver->target,
+ receiver->foreign_addr,
+ receiver->d_mtu,
+ 0, /* Initialize with 0 acks */
+ 1, /* Priority */
+ receiver->nt,
+ GNUNET_TRANSPORT_CS_OUTBOUND,
+ receiver->d_mq);
+
}
diff --git a/src/transport/gnunet-communicator-unix.c
b/src/transport/gnunet-communicator-unix.c
index 31d2e4ed3..27dda7281 100644
--- a/src/transport/gnunet-communicator-unix.c
+++ b/src/transport/gnunet-communicator-unix.c
@@ -670,6 +670,8 @@ setup_queue (const struct GNUNET_PeerIdentity *target,
&queue->target,
foreign_addr,
UNIX_MTU,
+
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+ 0,
GNUNET_NT_LOOPBACK,
cs,
queue->mq);
diff --git a/src/transport/test_communicator_basic.c
b/src/transport/test_communicator_basic.c
index 1dfcf2371..1ea79fa19 100644
--- a/src/transport/test_communicator_basic.c
+++ b/src/transport/test_communicator_basic.c
@@ -58,19 +58,21 @@ static char *cfg_peers_name[NUM_PEERS];
static int ret;
+static size_t long_message_size;
+
static struct GNUNET_TIME_Absolute start_short;
static struct GNUNET_TIME_Absolute start_long;
static struct GNUNET_TIME_Absolute timeout;
-static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc;
+static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc;
#define SHORT_MESSAGE_SIZE 128
-#define LONG_MESSAGE_SIZE 32000
+#define LONG_MESSAGE_SIZE 32000 /* FIXME */
-#define BURST_PACKETS 50
+#define BURST_PACKETS 500
#define TOTAL_ITERATIONS 1
@@ -88,6 +90,7 @@ static unsigned int iterations_left = TOTAL_ITERATIONS;
enum TestPhase
{
+ TP_INIT,
TP_BURST_SHORT,
TP_BURST_LONG,
TP_SIZE_CHECK
@@ -230,15 +233,18 @@ static void
size_test (void *cls)
{
char *payload;
+ size_t max_size = 64000;
GNUNET_assert (TP_SIZE_CHECK == phase);
- if (ack >= 64000)
+ if (LONG_MESSAGE_SIZE != long_message_size)
+ max_size = long_message_size;
+ if (ack >= max_size)
return; /* Leave some room for our protocol, so not 2^16 exactly */
payload = make_payload (ack);
ack += 5;
num_sent++;
GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
- (ack < 64000)
+ (ack < max_size)
? &size_test
: NULL,
NULL,
@@ -254,7 +260,7 @@ long_test (void *cls)
{
char *payload;
- payload = make_payload (LONG_MESSAGE_SIZE);
+ payload = make_payload (long_message_size);
num_sent++;
GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
(BURST_PACKETS ==
@@ -263,7 +269,7 @@ long_test (void *cls)
: &long_test,
NULL,
payload,
- LONG_MESSAGE_SIZE);
+ long_message_size);
GNUNET_free (payload);
timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
}
@@ -288,6 +294,7 @@ short_test (void *cls)
timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
}
+
static int test_prepared = GNUNET_NO;
/**
@@ -316,7 +323,6 @@ prepare_test (void *cls)
}
-
/**
* @brief Handle opening of queue
*
@@ -332,18 +338,25 @@ static void
add_queue_cb (void *cls,
struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
*tc_h,
struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *
- tc_queue)
+ tc_queue,
+ size_t mtu)
{
+ if (TP_INIT != phase)
+ return;
if (0 != strcmp ((char*) cls, cfg_peers_name[0]))
return; // TODO?
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queue established, starting test...\n");
start_short = GNUNET_TIME_absolute_get ();
- my_tc = tc_queue;
+ my_tc = tc_h;
+ if (0 != mtu)
+ long_message_size = mtu;
+ else
+ long_message_size = LONG_MESSAGE_SIZE;
phase = TP_BURST_SHORT;
- timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
+ timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES);
GNUNET_assert (NULL == to_task);
- to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
&latency_timeout,
NULL);
prepare_test (NULL);
@@ -395,6 +408,9 @@ incoming_message_cb (void *cls,
timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
switch (phase)
{
+ case TP_INIT:
+ GNUNET_break (0);
+ break;
case TP_BURST_SHORT:
{
GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len);
@@ -428,7 +444,7 @@ incoming_message_cb (void *cls,
}
case TP_BURST_LONG:
{
- if (LONG_MESSAGE_SIZE != payload_len)
+ if (long_message_size != payload_len)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Ignoring packet with wrong length\n");
@@ -441,7 +457,7 @@ incoming_message_cb (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
"Long size packet test done.\n");
- char *goodput = GNUNET_STRINGS_byte_size_fancy ((LONG_MESSAGE_SIZE
+ char *goodput = GNUNET_STRINGS_byte_size_fancy ((long_message_size
* num_received * 1000
* 1000)
/
duration.rel_value_us);
@@ -553,6 +569,7 @@ main (int argc,
char *test_name;
char *cfg_peer;
+ phase = TP_INIT;
ret = 1;
test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
communicator_name = strchr (test_name, '-');
diff --git a/src/transport/transport-testing2.c
b/src/transport/transport-testing2.c
index fc6d13590..8250027f7 100644
--- a/src/transport/transport-testing2.c
+++ b/src/transport/transport-testing2.c
@@ -33,7 +33,7 @@
#include "gnunet_hello_lib.h"
#include "gnunet_signatures.h"
#include "transport.h"
-
+#include <inttypes.h>
#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2",
__VA_ARGS__)
@@ -227,10 +227,20 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
uint32_t nt;
/**
- * Maximum transmission unit, in NBO. UINT32_MAX for unlimited.
+ * Maximum transmission unit. UINT32_MAX for unlimited.
*/
uint32_t mtu;
+ /**
+ * Queue length. UINT64_MAX for unlimited.
+ */
+ uint64_t q_len;
+
+ /**
+ * Queue prio
+ */
+ uint32_t priority;
+
/**
* An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
*/
@@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls,
struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
struct GNUNET_MQ_Envelope *env;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received backchannel message\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received backchannel message\n");
if (tc_h->bc_enabled != GNUNET_YES)
{
GNUNET_SERVICE_client_continue (client->client);
@@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls,
}
/* Find client providing this communicator */
/* Finally, deliver backchannel message to communicator */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Delivering backchannel message of type %u to %s\n",
- ntohs (msg->type),
- target_communicator);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Delivering backchannel message of type %u to %s\n",
+ ntohs (msg->type),
+ target_communicator);
other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
GNUNET_PeerIdentity*) &bc_msg->pid);
env = GNUNET_MQ_msg_extra (
@@ -496,9 +506,6 @@ handle_incoming_msg (void *cls,
msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
size_t payload_len = ntohs (msg->size) - sizeof (struct
GNUNET_MessageHeader);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Incoming message from communicator!\n");
-
if (NULL != tc_h->incoming_msg_cb)
{
tc_h->incoming_msg_cb (tc_h->cb_cls,
@@ -608,15 +615,14 @@ handle_add_queue_message (void *cls,
client->tc;
struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
- tc_queue = tc_h->queue_head;
- if (NULL != tc_queue)
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got queue with ID %u\n", msg->qid);
+ for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue =
tc_queue->next)
{
- while (tc_queue->qid != msg->qid)
- {
- tc_queue = tc_queue->next;
- }
+ if (tc_queue->qid == msg->qid)
+ break;
}
- else
+ if (NULL == tc_queue)
{
tc_queue =
GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
@@ -628,16 +634,58 @@ handle_add_queue_message (void *cls,
GNUNET_assert (tc_queue->qid == msg->qid);
GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
tc_queue->nt = msg->nt;
- tc_queue->mtu = msg->mtu;
+ tc_queue->mtu = ntohl (msg->mtu);
tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ tc_queue->q_len = GNUNET_ntohll (msg->q_len);
if (NULL != tc_h->add_queue_cb)
{
- tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue);
+ tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
}
GNUNET_SERVICE_client_continue (client->client);
}
+/**
+ * @brief Handle queue update
+ *
+ * Refresh the stored parameters of an already known queue and add the
+ * announced q_len to its remaining queue length.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message struct
+ */
+static void
+handle_update_queue_message (void *cls,
+ const struct
+ GNUNET_TRANSPORT_UpdateQueueMessage *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received queue update message for %u with q_len %"PRIu64"\n",
+ msg->qid, GNUNET_ntohll(msg->q_len));
+ tc_queue = tc_h->queue_head;
+ if (NULL != tc_queue)
+ {
+ while (tc_queue->qid != msg->qid)
+ {
+ tc_queue = tc_queue->next;
+ }
+ }
+ GNUNET_assert (tc_queue->qid == msg->qid);
+ GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
+ tc_queue->nt = msg->nt;
+ tc_queue->mtu = ntohl (msg->mtu);
+ tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ tc_queue->q_len += GNUNET_ntohll (msg->q_len);
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
/**
* @brief Shut down the service
*
@@ -789,6 +837,10 @@ transport_communicator_start (
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
struct GNUNET_TRANSPORT_AddQueueMessage,
tc_h),
+ GNUNET_MQ_hd_fixed_size (update_queue_message,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
+ struct GNUNET_TRANSPORT_UpdateQueueMessage,
+ tc_h),
// GNUNET_MQ_hd_fixed_size (del_queue_message,
// GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
// struct GNUNET_TRANSPORT_DelQueueMessage,
@@ -1063,7 +1115,7 @@
GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
*/
void
GNUNET_TRANSPORT_TESTING_transport_communicator_send
- (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue,
+ (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls,
const void *payload,
@@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
struct GNUNET_TRANSPORT_SendMessageTo *msg;
struct GNUNET_MQ_Envelope *env;
size_t inbox_size;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
+ tc_queue = NULL;
+ for (tc_queue_tmp = tc_h->queue_head;
+ NULL != tc_queue_tmp;
+ tc_queue_tmp = tc_queue_tmp->next)
+ {
+ if (tc_queue_tmp->q_len <= 0)
+ continue;
+ if (NULL == tc_queue)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ continue;
+ }
+ if (tc_queue->priority < tc_queue_tmp->priority)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ }
+ }
+ GNUNET_assert (NULL != tc_queue);
+ if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
+ tc_queue->q_len--;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending message\n");
inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
diff --git a/src/transport/transport-testing2.h
b/src/transport/transport-testing2.h
index 7a449f081..b77125e82 100644
--- a/src/transport/transport-testing2.h
+++ b/src/transport/transport-testing2.h
@@ -132,7 +132,8 @@ typedef void
*tc_h,
struct
GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
- *tc_queue);
+ *tc_queue,
+ size_t mtu);
/**
@@ -215,8 +216,8 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue
(struct
*/
void
GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct
-
GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
- *tc_queue,
+
GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
+ *tc_h,
GNUNET_SCHEDULER_TaskCallback
cont,
void *cont_cls,
diff --git a/src/transport/transport.h b/src/transport/transport.h
index 36182d8d7..a64ffd5c6 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -835,6 +835,17 @@ struct GNUNET_TRANSPORT_AddQueueMessage
*/
uint32_t mtu;
+ /**
+ * Queue length, in NBO. Defines how many messages may be
+ * sent through this queue. UINT64_MAX for unlimited.
+ */
+ uint64_t q_len;
+
+ /**
+ * Priority of the queue in relation to other queues.
+ */
+ uint32_t priority;
+
/**
* An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
*/
@@ -844,6 +855,55 @@ struct GNUNET_TRANSPORT_AddQueueMessage
};
+/**
+ * Update queue
+ */
+struct GNUNET_TRANSPORT_UpdateQueueMessage
+{
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Queue identifier (used to identify the queue).
+ */
+ uint32_t qid GNUNET_PACKED;
+
+ /**
+ * Receiver that can be addressed via the queue.
+ */
+ struct GNUNET_PeerIdentity receiver;
+
+ /**
+ * An `enum GNUNET_NetworkType` in NBO.
+ */
+ uint32_t nt;
+
+ /**
+ * Maximum transmission unit, in NBO. UINT32_MAX for unlimited.
+ */
+ uint32_t mtu;
+
+ /**
+ * Queue length, in NBO. Defines how many messages may be
+ * sent through this queue. UINT64_MAX for unlimited.
+ */
+ uint64_t q_len;
+
+ /**
+ * Priority of the queue in relation to other queues.
+ */
+ uint32_t priority;
+
+ /**
+ * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
+ */
+ uint32_t cs;
+};
+
+
+
/**
* Remove queue, it is no longer available.
*/
diff --git a/src/transport/transport_api2_communication.c
b/src/transport/transport_api2_communication.c
index e80cd5c03..cfa144415 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -280,6 +280,15 @@ struct GNUNET_TRANSPORT_QueueHandle
* Maximum transmission unit for the queue.
*/
uint32_t mtu;
+
+ /**
+ * Queue length.
+ */
+ uint64_t q_len;
+ /**
+ * Queue priority.
+ */
+ uint32_t priority;
};
@@ -395,6 +404,8 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
if (NULL == qh->ch->mq)
return;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
env = GNUNET_MQ_msg_extra (aqm,
strlen (qh->address) + 1,
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
@@ -402,11 +413,39 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
aqm->receiver = qh->peer;
aqm->nt = htonl ((uint32_t) qh->nt);
aqm->mtu = htonl (qh->mtu);
+ aqm->q_len = GNUNET_htonll (qh->q_len);
+ aqm->priority = htonl (qh->priority);
aqm->cs = htonl ((uint32_t) qh->cs);
memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
GNUNET_MQ_send (qh->ch->mq, env);
}
+/**
+ * Send message to the transport service that queue @a qh
+ * was updated.
+ *
+ * @param qh queue that was updated
+ */
+static void
+send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
+
+ if (NULL == qh->ch->mq)
+ return;
+ env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
+ uqm->qid = htonl (qh->queue_id);
+ uqm->receiver = qh->peer;
+ uqm->nt = htonl ((uint32_t) qh->nt);
+ uqm->mtu = htonl (qh->mtu);
+ uqm->q_len = GNUNET_htonll (qh->q_len);
+ uqm->priority = htonl (qh->priority);
+ uqm->cs = htonl ((uint32_t) qh->cs);
+ GNUNET_MQ_send (qh->ch->mq, env);
+}
+
+
/**
* Send message to the transport service about queue @a qh
@@ -924,6 +963,9 @@ GNUNET_TRANSPORT_communicator_receive (
* @param address address in human-readable format, 0-terminated, UTF-8
* @param mtu maximum message size supported by queue, 0 if
* sending is not supported, SIZE_MAX for no MTU
+ * @param q_len number of messages that can be sent through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ * used
* @param nt which network type does the @a address belong to?
* @param cc what characteristics does the communicator have?
* @param cs what is the connection status of the queue?
@@ -936,6 +978,8 @@ GNUNET_TRANSPORT_communicator_mq_add (
const struct GNUNET_PeerIdentity *peer,
const char *address,
uint32_t mtu,
+ uint64_t q_len,
+ uint32_t priority,
enum GNUNET_NetworkType nt,
enum GNUNET_TRANSPORT_ConnectionStatus cs,
struct GNUNET_MQ_Handle *mq)
@@ -948,6 +992,8 @@ GNUNET_TRANSPORT_communicator_mq_add (
qh->address = GNUNET_strdup (address);
qh->nt = nt;
qh->mtu = mtu;
+ qh->q_len = q_len;
+ qh->priority = priority;
qh->cs = cs;
qh->mq = mq;
qh->queue_id = ch->queue_gen++;
@@ -957,6 +1003,37 @@ GNUNET_TRANSPORT_communicator_mq_add (
}
+/**
+ * Notify transport service that an MQ was updated
+ *
+ * @param ch connection to transport service
+ * @param u_qh the queue to update
+ * @param q_len number of messages that can be sent through this queue
+ * @param priority queue priority. Queues with highest priority should be
+ * used
+ */
+void
+GNUNET_TRANSPORT_communicator_mq_update (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
+ uint64_t q_len,
+ uint32_t priority)
+{
+ struct GNUNET_TRANSPORT_QueueHandle *qh;
+
+ for (qh = ch->queue_head; NULL != qh; qh = qh->next)
+ {
+ if (u_qh == qh)
+ break;
+ }
+ GNUNET_assert (NULL != qh);
+ qh->q_len = q_len;
+ qh->priority = priority;
+ send_update_queue (qh);
+}
+
+
+
/**
* Notify transport service that an MQ became unavailable due to a
* disconnect or timeout.
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet] branch master updated: tng: more UDP communicator backchannels,
gnunet <=