[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33842 - gnunet/src/transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33842 - gnunet/src/transport |
Date: |
Wed, 25 Jun 2014 16:14:17 +0200 |
Author: grothoff
Date: 2014-06-25 16:14:16 +0200 (Wed, 25 Jun 2014)
New Revision: 33842
Modified:
gnunet/src/transport/plugin_transport_http_client.c
gnunet/src/transport/plugin_transport_http_server.c
Log:
-add monitor support to http server, more code clean up, add http server MHD
suspend feature, support quota reset API from transport in HTTP server
Modified: gnunet/src/transport/plugin_transport_http_client.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_client.c 2014-06-25 08:24:22 UTC
(rev 33841)
+++ gnunet/src/transport/plugin_transport_http_client.c 2014-06-25 14:14:16 UTC
(rev 33842)
@@ -156,9 +156,10 @@
struct HTTP_Client_Plugin *plugin;
/**
- * Client send handle
+ * Curl client PUT handle.
+ * FIXME: delta to put.easyhandle?
*/
- void *client_put;
+ CURL *client_put;
/**
* Handle for the HTTP PUT request.
@@ -166,14 +167,15 @@
struct ConnectionHandle put;
/**
- * Handle for the HTTP GET request.
+ * Curl client GET handle
+ * FIXME: delta to get.easyhandle?
*/
- struct ConnectionHandle get;
+ CURL *client_get;
/**
- * Client receive handle
+ * Handle for the HTTP GET request.
*/
- void *client_get;
+ struct ConnectionHandle get;
/**
* next pointer for double linked list
@@ -415,6 +417,7 @@
struct HTTP_Client_Plugin *plugin = s->plugin;
struct HTTP_Message *pos;
struct HTTP_Message *next;
+ CURLMcode mret;
if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
{
@@ -427,11 +430,49 @@
GNUNET_SCHEDULER_cancel (s->put_disconnect_task);
s->put_disconnect_task = GNUNET_SCHEDULER_NO_TASK;
}
+ if (GNUNET_SCHEDULER_NO_TASK != s->recv_wakeup_task)
+ {
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ }
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
&s->target,
s));
+ if (NULL != s->client_put)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Session %p/connection %p: disconnecting PUT connection to peer
`%s'\n",
+ s,
+ s->client_put,
+ GNUNET_i2s (&s->target));
+ /* remove curl handle from multi handle */
+ mret = curl_multi_remove_handle (plugin->curl_multi_handle,
+ s->client_put);
+ GNUNET_break (CURLM_OK == mret);
+ curl_easy_cleanup (s->client_put);
+ s->client_put = NULL;
+ }
+ if (NULL != s->client_get)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Session %p/connection %p: disconnecting GET connection to peer
`%s'\n",
+ s, s->client_get,
+ GNUNET_i2s (&s->target));
+ /* remove curl handle from multi handle */
+ mret = curl_multi_remove_handle (plugin->curl_multi_handle,
+ s->client_get);
+ GNUNET_break (CURLM_OK == mret);
+ curl_easy_cleanup (s->client_get);
+ GNUNET_assert (plugin->cur_connections > 0);
+ plugin->cur_connections--;
+ s->client_get = NULL;
+ }
+ GNUNET_STATISTICS_set (plugin->env->stats,
+ HTTP_STAT_STR_CONNECTIONS,
+ plugin->cur_connections,
+ GNUNET_NO);
next = s->msg_head;
while (NULL != (pos = next))
{
@@ -757,86 +798,12 @@
struct Session *s)
{
struct HTTP_Client_Plugin *plugin = cls;
- struct HTTP_Message *msg;
- struct HTTP_Message *t;
- int res = GNUNET_OK;
- CURLMcode mret;
- if (NULL != s->client_put)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Session %p/connection %p: disconnecting PUT connection to peer
`%s'\n",
- s,
- s->client_put,
- GNUNET_i2s (&s->target));
-
- /* remove curl handle from multi handle */
- mret = curl_multi_remove_handle (plugin->curl_multi_handle,
- s->client_put);
- if (mret != CURLM_OK)
- {
- /* clean up easy handle, handle is now invalid and free'd */
- res = GNUNET_SYSERR;
- GNUNET_break (0);
- }
- curl_easy_cleanup (s->client_put);
- s->client_put = NULL;
- }
-
-
- if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
- s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
- }
-
- if (NULL != s->client_get)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Session %p/connection %p: disconnecting GET connection to peer
`%s'\n",
- s, s->client_get,
- GNUNET_i2s (&s->target));
- /* remove curl handle from multi handle */
- mret = curl_multi_remove_handle (plugin->curl_multi_handle, s->client_get);
- if (mret != CURLM_OK)
- {
- /* clean up easy handle, handle is now invalid and free'd */
- res = GNUNET_SYSERR;
- GNUNET_break (0);
- }
- curl_easy_cleanup (s->client_get);
- s->client_get = NULL;
- }
-
- msg = s->msg_head;
- while (NULL != msg)
- {
- t = msg->next;
- if (NULL != msg->transmit_cont)
- msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_SYSERR,
- msg->size, msg->pos + s->overhead);
- s->overhead = 0;
- GNUNET_CONTAINER_DLL_remove (s->msg_head,
- s->msg_tail,
- msg);
- GNUNET_assert (0 < s->msgs_in_queue);
- s->msgs_in_queue--;
- GNUNET_assert (msg->size <= s->bytes_in_queue);
- s->bytes_in_queue -= msg->size;
- GNUNET_free (msg);
- msg = t;
- }
-
- GNUNET_assert (plugin->cur_connections >= 2);
- plugin->cur_connections -= 2;
- GNUNET_STATISTICS_set (plugin->env->stats,
- HTTP_STAT_STR_CONNECTIONS,
- plugin->cur_connections,
- GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p: notifying transport about ending session\n",s);
-
- plugin->env->session_end (plugin->env->cls, s->address, s);
+ plugin->env->session_end (plugin->env->cls,
+ s->address,
+ s);
client_delete_session (s);
/* Re-schedule since handles have changed */
@@ -847,7 +814,7 @@
}
client_schedule (plugin, GNUNET_YES);
- return res;
+ return GNUNET_OK;
}
@@ -993,11 +960,13 @@
s->put_disconnect_task = GNUNET_SCHEDULER_NO_TASK;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p/connection %p: will be disconnected due to no activity\n",
- s, s->client_put);
+ s,
+ s->client_put);
s->put_paused = GNUNET_NO;
s->put_tmp_disconnecting = GNUNET_YES;
if (NULL != s->client_put)
- curl_easy_pause (s->client_put, CURLPAUSE_CONT);
+ curl_easy_pause (s->client_put,
+ CURLPAUSE_CONT);
client_schedule (s->plugin, GNUNET_YES);
}
@@ -1284,24 +1253,23 @@
int running;
long http_statuscode;
CURLMcode mret;
+ CURLMsg *msg;
+ int msgs_left;
plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
-
do
{
running = 0;
mret = curl_multi_perform (plugin->curl_multi_handle, &running);
- CURLMsg *msg;
- int msgs_left;
while ((msg = curl_multi_info_read (plugin->curl_multi_handle,
&msgs_left)))
{
CURL *easy_h = msg->easy_handle;
struct Session *s = NULL;
- char *d = (char *) s;
+ char *d = NULL; /* curl requires 'd' to be a 'char *' */
if (NULL == easy_h)
{
@@ -1316,80 +1284,88 @@
GNUNET_assert (CURLE_OK ==
curl_easy_getinfo (easy_h, CURLINFO_PRIVATE, &d));
s = (struct Session *) d;
- GNUNET_assert (s != NULL);
+ GNUNET_assert (NULL != s);
if (msg->msg == CURLMSG_DONE)
{
- GNUNET_break (CURLE_OK == curl_easy_getinfo (easy_h,
- CURLINFO_RESPONSE_CODE, &http_statuscode));
+ GNUNET_break (CURLE_OK ==
+curl_easy_getinfo (easy_h,
+ CURLINFO_RESPONSE_CODE,
+ &http_statuscode));
if (easy_h == s->client_put)
{
- if ((0 != msg->data.result) || (http_statuscode != 200))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Session %p/connection %p: PUT connection to `%s' ended
with status %i reason %i: `%s'\n",
- s, msg->easy_handle,
- GNUNET_i2s (&s->target),
- http_statuscode,
- msg->data.result,
- curl_easy_strerror (msg->data.result));
- }
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Session %p/connection %p: PUT connection to `%s' ended
normal\n",
- s, msg->easy_handle,
- GNUNET_i2s (&s->target));
- if (NULL == s->client_get)
- {
- /* Disconnect other transmission direction and tell transport */
- /* FIXME? */
- }
- curl_multi_remove_handle (plugin->curl_multi_handle, easy_h);
- curl_easy_cleanup (easy_h);
- s->put_tmp_disconnecting = GNUNET_NO;
- s->put_tmp_disconnected = GNUNET_YES;
- s->client_put = NULL;
- s->put.easyhandle = NULL;
- s->put.s = NULL;
+ if ((0 != msg->data.result) || (http_statuscode != 200))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Session %p/connection %p: PUT connection to `%s' ended with
status %i reason %i: `%s'\n",
+ s, msg->easy_handle,
+ GNUNET_i2s (&s->target),
+ http_statuscode,
+ msg->data.result,
+ curl_easy_strerror (msg->data.result));
+ }
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Session %p/connection %p: PUT connection to `%s' ended
normal\n",
+ s, msg->easy_handle,
+ GNUNET_i2s (&s->target));
+ if (NULL == s->client_get)
+ {
+ /* Disconnect other transmission direction and tell transport */
+ /* FIXME? */
+ }
+ curl_multi_remove_handle (plugin->curl_multi_handle,
+ easy_h);
+ curl_easy_cleanup (easy_h);
+ GNUNET_assert (plugin->cur_connections > 0);
+ plugin->cur_connections--;
+ s->put_tmp_disconnecting = GNUNET_NO;
+ s->put_tmp_disconnected = GNUNET_YES;
+ s->client_put = NULL;
+ s->put.easyhandle = NULL;
+ s->put.s = NULL;
- /*
- * Handling a rare case:
- * plugin_send was called during temporary put disconnect,
- * reconnect required after connection was disconnected
- */
- if (GNUNET_YES == s->put_reconnect_required)
+ /*
+ * Handling a rare case:
+ * plugin_send was called during temporary put disconnect,
+ * reconnect required after connection was disconnected
+ */
+ if (GNUNET_YES == s->put_reconnect_required)
+ {
+ s->put_reconnect_required = GNUNET_NO;
+ if (GNUNET_SYSERR == client_connect_put (s))
{
- s->put_reconnect_required = GNUNET_NO;
- if (GNUNET_SYSERR == client_connect_put (s))
- {
- GNUNET_break (s->client_put == NULL);
- GNUNET_break (s->put_tmp_disconnected == GNUNET_NO);
- }
+ GNUNET_break (s->client_put == NULL);
+ GNUNET_break (s->put_tmp_disconnected == GNUNET_NO);
}
+ }
}
if (easy_h == s->client_get)
{
- if ((0 != msg->data.result) || (http_statuscode != 200))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Session %p/connection %p: GET connection to `%s' ended
with status %i reason %i: `%s'\n",
- s,
- msg->easy_handle,
- GNUNET_i2s (&s->target),
- http_statuscode,
- msg->data.result,
- curl_easy_strerror (msg->data.result));
+ if ((0 != msg->data.result) || (http_statuscode != 200))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Session %p/connection %p: GET connection to `%s' ended with
status %i reason %i: `%s'\n",
+ s,
+ msg->easy_handle,
+ GNUNET_i2s (&s->target),
+ http_statuscode,
+ msg->data.result,
+ curl_easy_strerror (msg->data.result));
- }
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Session %p/connection %p: GET connection to `%s' ended
normal\n",
- s,
- msg->easy_handle,
- GNUNET_i2s (&s->target));
- /* Disconnect other transmission direction and tell transport */
- s->get.easyhandle = NULL;
- s->get.s = NULL;
- http_client_plugin_session_disconnect (plugin, s);
+ }
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Session %p/connection %p: GET connection to `%s' ended
normal\n",
+ s,
+ msg->easy_handle,
+ GNUNET_i2s (&s->target));
+ /* Disconnect other transmission direction and tell transport */
+ s->get.easyhandle = NULL;
+ s->get.s = NULL;
+ /* FIXME: who calls curl_multi_remove on 'easy_h' now!? */
+ GNUNET_assert (plugin->cur_connections > 0);
+ plugin->cur_connections--;
+ http_client_plugin_session_disconnect (plugin, s);
}
}
}
@@ -1423,6 +1399,7 @@
curl_easy_setopt (s->client_get, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1);
{
struct HttpAddress *ha;
+
ha = (struct HttpAddress *) s->address->address;
if (HTTP_OPTIONS_VERIFY_CERTIFICATE ==
@@ -1444,7 +1421,7 @@
curl_easy_setopt (s->client_get, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTP);
#endif
- if (s->plugin->proxy_hostname != NULL)
+ if (NULL != s->plugin->proxy_hostname)
{
curl_easy_setopt (s->client_get, CURLOPT_PROXY, s->plugin->proxy_hostname);
curl_easy_setopt (s->client_get, CURLOPT_PROXYTYPE, s->plugin->proxytype);
@@ -1478,8 +1455,9 @@
#endif
curl_easy_setopt (s->client_get, CURLOPT_FOLLOWLOCATION, 0);
- mret = curl_multi_add_handle (s->plugin->curl_multi_handle, s->client_get);
- if (mret != CURLM_OK)
+ mret = curl_multi_add_handle (s->plugin->curl_multi_handle,
+ s->client_get);
+ if (CURLM_OK != mret)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"Session %p : Failed to add GET handle to multihandle: `%s'\n",
@@ -1492,7 +1470,7 @@
GNUNET_break (0);
return GNUNET_SYSERR;
}
-
+ s->plugin->cur_connections++;
return GNUNET_OK;
}
@@ -1576,8 +1554,9 @@
#if CURL_TCP_NODELAY
curl_easy_setopt (s->client_put, CURLOPT_TCP_NODELAY, 1);
#endif
- mret = curl_multi_add_handle (s->plugin->curl_multi_handle, s->client_put);
- if (mret != CURLM_OK)
+ mret = curl_multi_add_handle (s->plugin->curl_multi_handle,
+ s->client_put);
+ if (CURLM_OK != mret)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"Session %p : Failed to add PUT handle to multihandle: `%s'\n",
@@ -1591,6 +1570,8 @@
return GNUNET_SYSERR;
}
s->put_tmp_disconnected = GNUNET_NO;
+ s->plugin->cur_connections++;
+
return GNUNET_OK;
}
@@ -1632,11 +1613,7 @@
if ((GNUNET_SYSERR == client_connect_get (s)) ||
(GNUNET_SYSERR == client_connect_put (s)))
- {
- plugin->env->session_end (plugin->env->cls, s->address, s);
- client_delete_session (s);
return GNUNET_SYSERR;
- }
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p: connected with connections GET %p and PUT %p\n",
@@ -1644,7 +1621,6 @@
s->client_get,
s->client_put);
/* Perform connect */
- plugin->cur_connections += 2;
GNUNET_STATISTICS_set (plugin->env->stats,
HTTP_STAT_STR_CONNECTIONS,
plugin->cur_connections,
Modified: gnunet/src/transport/plugin_transport_http_server.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_server.c 2014-06-25 08:24:22 UTC
(rev 33841)
+++ gnunet/src/transport/plugin_transport_http_server.c 2014-06-25 14:14:16 UTC
(rev 33842)
@@ -23,6 +23,7 @@
* @brief HTTP/S server transport plugin
* @author Matthias Wachs
* @author David Barksdale
+ * @author Christian Grothoff
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -223,11 +224,31 @@
struct GNUNET_TIME_Absolute next_receive;
/**
+ * Absolute time when this connection will time out.
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
* Session timeout task
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
/**
+ * Task to resume MHD handling when receiving is allowed again
+ */
+ GNUNET_SCHEDULER_TaskIdentifier recv_wakeup_task;
+
+ /**
+ * Number of bytes waiting for transmission to this peer.
+ */
+ unsigned long long bytes_in_queue;
+
+ /**
+ * Number of messages waiting for transmission to this peer.
+ */
+ unsigned int msgs_in_queue;
+
+ /**
* Unique HTTP/S connection tag for this connection
*/
uint32_t tag;
@@ -295,6 +316,7 @@
* NAT handle & address management
*/
struct GNUNET_NAT_Handle *nat;
+
/**
* IPv4 addresses DLL head
*/
@@ -453,10 +475,10 @@
memset (&info, 0, sizeof (info));
info.state = state;
info.is_inbound = GNUNET_YES;
- // info.num_msg_pending = session->msgs_in_queue; // FIXME
- // info.num_bytes_pending = session->bytes_in_queue; // FIXME
- // info.receive_delay = session->next_receive; // FIXME
- // info.session_timeout = session->timeout; // FIXME
+ info.num_msg_pending = session->msgs_in_queue;
+ info.num_bytes_pending = session->bytes_in_queue;
+ info.receive_delay = session->next_receive;
+ info.session_timeout = session->timeout;
info.address = session->address;
plugin->sic (plugin->sic_cls,
session,
@@ -465,6 +487,28 @@
/**
+ * Wake up an MHD connection which was suspended
+ *
+ * @param cls the session
+ * @param tc task context
+ */
+static void
+server_wake_up (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Session *s = cls;
+
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Session %p: Waking up PUT handle\n",
+ s);
+ MHD_resume_connection (s->server_recv->mhd_conn);
+}
+
+
+/**
* Reschedule the execution of both IPv4 and IPv6 server.
*
* @param plugin the plugin
@@ -488,7 +532,6 @@
{
struct HTTP_Server_Plugin *plugin = s->plugin;
struct HTTP_Message *msg;
- struct HTTP_Message *tmp;
struct ServerConnection *send;
struct ServerConnection *recv;
@@ -496,15 +539,21 @@
{
GNUNET_SCHEDULER_cancel (s->timeout_task);
s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ s->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
}
+ if (GNUNET_SCHEDULER_NO_TASK != s->recv_wakeup_task)
+ {
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+ if (NULL != s->server_recv)
+ MHD_resume_connection (s->server_recv->mhd_conn);
+ }
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
&s->target,
s));
- msg = s->msg_head;
- while (NULL != msg)
+ while (NULL != (msg = s->msg_head))
{
- tmp = msg->next;
GNUNET_CONTAINER_DLL_remove (s->msg_head,
s->msg_tail,
msg);
@@ -514,9 +563,14 @@
GNUNET_SYSERR,
msg->size,
msg->pos + msg->overhead);
+ GNUNET_assert (s->msgs_in_queue > 0);
+ s->msgs_in_queue--;
+ GNUNET_assert (s->bytes_in_queue >= msg->size);
+ s->bytes_in_queue -= msg->size;
GNUNET_free (msg);
- msg = tmp;
}
+ GNUNET_assert (0 == s->msgs_in_queue);
+ GNUNET_assert (0 == s->bytes_in_queue);
send = s->server_send;
if (NULL != send)
{
@@ -546,6 +600,9 @@
recv->mhd_daemon,
GNUNET_YES);
}
+ notify_session_monitor (plugin,
+ s,
+ GNUNET_TRANSPORT_SS_DOWN);
if (GNUNET_YES == s->known_to_service)
plugin->env->session_end (plugin->env->cls,
s->address,
@@ -591,8 +648,22 @@
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Session *s = cls;
+ struct GNUNET_TIME_Relative left;
s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ left = GNUNET_TIME_absolute_get_remaining (s->timeout);
+ if (0 != left.rel_value_us)
+ {
+ /* not actually our turn yet, but let's at least update
+ the monitor, it may think we're about to die ... */
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
+ &server_session_timeout,
+ s);
+ return;
+ }
GNUNET_log (TIMEOUT_LOG,
"Session %p was idle for %s, disconnecting\n",
s,
@@ -611,15 +682,7 @@
server_reschedule_session_timeout (struct Session *s)
{
GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
- GNUNET_SCHEDULER_cancel (s->timeout_task);
- s->timeout_task = GNUNET_SCHEDULER_add_delayed (HTTP_SERVER_SESSION_TIMEOUT,
- &server_session_timeout,
- s);
- GNUNET_log (TIMEOUT_LOG,
- "Timeout rescheduled for session %p set to %s\n",
- s,
- GNUNET_STRINGS_relative_time_to_string
(HTTP_SERVER_SESSION_TIMEOUT,
- GNUNET_YES));
+ s->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
}
@@ -681,10 +744,17 @@
msg->buf = (char *) &msg[1];
msg->transmit_cont = cont;
msg->transmit_cont_cls = cont_cls;
- memcpy (msg->buf, msgbuf, msgbuf_size);
+ memcpy (msg->buf,
+ msgbuf,
+ msgbuf_size);
GNUNET_CONTAINER_DLL_insert_tail (session->msg_head,
session->msg_tail,
msg);
+ session->msgs_in_queue++;
+ session->bytes_in_queue += msg->size;
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
GNUNET_asprintf (&stat_txt,
"# bytes currently in %s_server buffers",
plugin->protocol);
@@ -1018,6 +1088,15 @@
}
+/**
+ * Function that will be called whenever the transport service wants to
+ * notify the plugin that a session is still active and in use and
+ * therefore the session timeout for this session has to be updated
+ *
+ * @param cls closure
+ * @param peer which peer was the session for
+ * @param session which session is being updated
+ */
static void
http_server_plugin_update_session_timeout (void *cls,
const struct GNUNET_PeerIdentity
*peer,
@@ -1094,8 +1173,8 @@
if (NULL == url)
{
- GNUNET_break (0);
- return GNUNET_SYSERR;
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
}
if (regexec(&plugin->url_regex, url, 4, matches, 0))
@@ -1162,9 +1241,12 @@
GNUNET_i2s_full (target));
/* convert options */
- if (-1 == matches[3].rm_so) {
- *options = 0;
- } else {
+ if (-1 == matches[3].rm_so)
+ {
+ *options = 0;
+ }
+ else
+ {
rc = strtoul (&url[matches[3].rm_so + 1], &options_end, 10);
if (&url[matches[3].rm_eo] != options_end)
{
@@ -1184,9 +1266,10 @@
"URL options > UINT32_MAX\n");
return GNUNET_SYSERR;
}
- (*options) = (uint32_t)rc;
+ (*options) = (uint32_t) rc;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Found options `%u' in url\n", *options);
+ "Found options `%u' in url\n",
+ *options);
}
return GNUNET_OK;
}
@@ -1344,6 +1427,7 @@
s->ats_address_network_type = ats.value;
s->next_receive = GNUNET_TIME_UNIT_ZERO_ABS;
s->tag = stc.tag;
+ s->timeout = GNUNET_TIME_relative_to_absolute
(HTTP_SERVER_SESSION_TIMEOUT);
s->timeout_task = GNUNET_SCHEDULER_add_delayed
(HTTP_SERVER_SESSION_TIMEOUT,
&server_session_timeout,
s);
@@ -1351,7 +1435,9 @@
&s->target,
s,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
+ notify_session_monitor (plugin,
+ s,
+ GNUNET_TRANSPORT_SS_HANDSHAKE);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Creating new session %p for peer `%s' connecting from `%s'\n",
s, GNUNET_i2s (&target),
@@ -1361,7 +1447,8 @@
GNUNET_free_non_null (addr);
}
- if ( (_RECEIVE == direction) && (NULL != s->server_recv) )
+ if ( (_RECEIVE == direction) &&
+ (NULL != s->server_recv) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Duplicate PUT connection from `%s' tag %u, dismissing new
connection\n",
@@ -1392,13 +1479,15 @@
if (direction == _RECEIVE)
s->server_recv = sc;
- if ((NULL != s->server_send) && (NULL != s->server_recv))
+ if ( (NULL != s->server_send) &&
+ (NULL != s->server_recv) )
{
s->known_to_service = GNUNET_YES;
plugin->env->session_start (NULL, s->address ,s, NULL, 0);
}
- if ((NULL == s->server_recv) || (NULL == s->server_send))
+ if ( (NULL == s->server_recv) ||
+ (NULL == s->server_send) )
{
to = (HTTP_SERVER_NOT_VALIDATED_TIMEOUT.rel_value_us / 1000LL / 1000LL);
MHD_set_connection_option (mhd_connection,
@@ -1425,8 +1514,8 @@
* @param cls current session
* @param pos position in buffer
* @param buf the buffer to write data to
- * @param max max number of bytes available in buffer
- * @return bytes written to buffer
+ * @param max max number of bytes available in @a buf
+ * @return bytes written to @a buf
*/
static ssize_t
server_send_callback (void *cls,
@@ -1461,7 +1550,14 @@
if (NULL != msg->transmit_cont)
msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK,
msg->size, msg->size + msg->overhead);
+ GNUNET_assert (s->msgs_in_queue > 0);
+ s->msgs_in_queue--;
+ GNUNET_assert (s->bytes_in_queue >= msg->size);
+ s->bytes_in_queue -= msg->size;
GNUNET_free (msg);
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
}
}
if (0 < bytes_read)
@@ -1469,13 +1565,19 @@
sc->connected = GNUNET_YES;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sent %u bytes to peer `%s' with session %p \n",
- bytes_read, GNUNET_i2s (&s->target), s);
- GNUNET_asprintf (&stat_txt, "# bytes currently in %s_server buffers",
+ bytes_read,
+ GNUNET_i2s (&s->target),
+ s);
+ GNUNET_asprintf (&stat_txt,
+ "# bytes currently in %s_server buffers",
s->plugin->protocol);
GNUNET_STATISTICS_update (s->plugin->env->stats,
- stat_txt, -bytes_read, GNUNET_NO);
+ stat_txt,
+ - bytes_read,
+ GNUNET_NO);
GNUNET_free (stat_txt);
- GNUNET_asprintf (&stat_txt, "# bytes transmitted via %s_server",
+ GNUNET_asprintf (&stat_txt,
+ "# bytes transmitted via %s_server",
s->plugin->protocol);
GNUNET_STATISTICS_update (s->plugin->env->stats,
stat_txt, bytes_read, GNUNET_NO);
@@ -1484,8 +1586,9 @@
else if ((sc->options & OPTION_LONG_POLL) && sc->connected)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Completing GET response to peer `%s' with session %p \n",
- GNUNET_i2s (&s->target), s);
+ "Completing GET response to peer `%s' with session %p\n",
+ GNUNET_i2s (&s->target),
+ s);
return MHD_CONTENT_READER_END_OF_STREAM;
}
return bytes_read;
@@ -1519,7 +1622,14 @@
if (GNUNET_NO == s->known_to_service)
{
s->known_to_service = GNUNET_YES;
- plugin->env->session_start (NULL, s->address, s, NULL, 0);
+ plugin->env->session_start (NULL,
+ s->address,
+ s,
+ NULL,
+ 0);
+ notify_session_monitor (plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
}
delay = plugin->env->receive (plugin->env->cls,
s->address,
@@ -1554,6 +1664,8 @@
/**
* Add headers to a request indicating that we allow Cross-Origin Resource
* Sharing.
+ *
+ * @param response response object to modify
*/
static void
add_cors_headers(struct MHD_Response *response)
@@ -1579,7 +1691,7 @@
* @param method GET or PUT
* @param version HTTP version
* @param upload_data upload data
- * @param upload_data_size sizeof upload data
+ * @param upload_data_size size of @a upload_data
* @param httpSessionCache the session cache to remember the connection
* @return MHD_YES if connection is accepted, MHD_NO on reject
*/
@@ -1705,6 +1817,8 @@
}
else if ((*upload_data_size > 0) && (sc->connected == GNUNET_YES))
{
+ struct GNUNET_TIME_Relative delay;
+
/* (*upload_data_size > 0) for every segment received */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p / Connection %p: Peer `%s' PUT on address `%s' received
%u bytes\n",
@@ -1714,9 +1828,8 @@
s->address->address,
s->address->address_length),
*upload_data_size);
- struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
-
- if ((s->next_receive.abs_value_us <= now.abs_value_us))
+ delay = GNUNET_TIME_absolute_get_remaining (s->next_receive);
+ if (0 == delay.rel_value_us)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"PUT with %u bytes forwarded to MST\n",
@@ -1725,19 +1838,26 @@
{
s->msg_tk = GNUNET_SERVER_mst_create (&server_receive_mst_cb, s);
}
- GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data,
- *upload_data_size, GNUNET_NO,
GNUNET_NO);
+ GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data,
+ *upload_data_size, GNUNET_NO, GNUNET_NO);
server_mhd_connection_timeout (plugin, s,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value_us / 1000LL / 1000LL);
(*upload_data_size) = 0;
}
else
{
+ /* delay processing */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Session %p / Connection %p: no inbound bandwidth
available! Next read was delayed by %s\n",
s, sc,
- GNUNET_STRINGS_relative_time_to_string
(GNUNET_TIME_absolute_get_duration (s->next_receive),
+ GNUNET_STRINGS_relative_time_to_string (delay,
GNUNET_YES));
+ GNUNET_assert (s->server_recv->mhd_conn == mhd_connection);
+ MHD_suspend_connection (s->server_recv->mhd_conn);
+ if (GNUNET_SCHEDULER_NO_TASK == s->recv_wakeup_task)
+ s->recv_wakeup_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &server_wake_up,
+ s);
}
return MHD_YES;
}
@@ -1832,7 +1952,7 @@
*
* @param cls plugin as closure
* @param addr address of incoming connection
- * @param addr_len address length of incoming connection
+ * @param addr_len number of bytes in @a addr
* @return MHD_YES if connection is accepted, MHD_NO if connection is rejected
*/
static int
@@ -1860,6 +1980,14 @@
}
+/**
+ * Log function called by MHD.
+ *
+ * @param arg NULL
+ * @param fmt format string
+ * @param ap arguments for the format string (va_start() and va_end()
+ * will be called by MHD)
+ */
static void
server_log (void *arg,
const char *fmt,
@@ -1867,8 +1995,10 @@
{
char text[1024];
- vsnprintf (text, sizeof (text), fmt, ap);
- va_end (ap);
+ vsnprintf (text,
+ sizeof (text),
+ fmt,
+ ap);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Server: %s\n",
text);
@@ -1896,7 +2026,7 @@
gn_file =
GNUNET_DISK_file_open (file, GNUNET_DISK_OPEN_READ,
GNUNET_DISK_PERM_USER_READ);
- if (gn_file == NULL)
+ if (NULL == gn_file)
{
GNUNET_free (text);
return NULL;
@@ -2114,7 +2244,9 @@
plugin->name, plugin->port);
}
else
- server_reschedule (plugin, plugin->server_v4, GNUNET_NO);
+ server_reschedule (plugin,
+ plugin->server_v4,
+ GNUNET_NO);
}
@@ -2186,44 +2318,6 @@
}
-static void
-server_stop (struct HTTP_Server_Plugin *plugin)
-{
- if (plugin->server_v4 != NULL)
- {
- MHD_stop_daemon (plugin->server_v4);
- plugin->server_v4 = NULL;
- }
- if ( plugin->server_v6 != NULL)
- {
- MHD_stop_daemon (plugin->server_v6);
- plugin->server_v6 = NULL;
- }
-
-
- if (plugin->server_v4_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (plugin->server_v4_task);
- plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK;
- }
-
- if (plugin->server_v6_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (plugin->server_v6_task);
- plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK;
- }
-#if BUILD_HTTPS
- GNUNET_free_non_null (plugin->crypto_init);
- GNUNET_free_non_null (plugin->cert);
- GNUNET_free_non_null (plugin->key);
-#endif
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s server component stopped\n",
- plugin->name);
-}
-
-
/**
* Add an address to the server's set of addresses and notify transport
*
@@ -2456,8 +2550,7 @@
if (port > 65535)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _
- ("Require valid port number for service in
configuration!\n"));
+ _("Require valid port number for service in
configuration!\n"));
return GNUNET_SYSERR;
}
}
@@ -2469,7 +2562,8 @@
}
- if (GNUNET_CONFIGURATION_have_value (cfg, service_name, "BINDTO"))
+ if (GNUNET_CONFIGURATION_have_value (cfg, service_name,
+ "BINDTO"))
{
GNUNET_break (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_string (cfg, service_name,
@@ -2478,7 +2572,7 @@
else
hostname = NULL;
- if (hostname != NULL)
+ if (NULL != hostname)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Resolving `%s' since that is where `%s' will bind to.\n",
@@ -2487,10 +2581,12 @@
if (disablev6)
hints.ai_family = AF_INET;
if ((0 != (ret = getaddrinfo (hostname, NULL, &hints, &res))) ||
- (res == NULL))
+ (NULL == res))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to resolve `%s': %s\n"),
- hostname, gai_strerror (ret));
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to resolve `%s': %s\n"),
+ hostname,
+ gai_strerror (ret));
GNUNET_free (hostname);
return GNUNET_SYSERR;
}
@@ -2522,9 +2618,9 @@
next = pos->ai_next;
if ((disablev6) && (pos->ai_family == AF_INET6))
continue;
- if ((pos->ai_protocol != IPPROTO_TCP) && (pos->ai_protocol != 0))
+ if ((pos->ai_protocol != IPPROTO_TCP) && (0 != pos->ai_protocol))
continue; /* not TCP */
- if ((pos->ai_socktype != SOCK_STREAM) && (pos->ai_socktype != 0))
+ if ((pos->ai_socktype != SOCK_STREAM) && (0 != pos->ai_socktype))
continue; /* huh? */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Service will bind to `%s'\n",
@@ -2638,7 +2734,7 @@
while (res > 0)
{
res--;
- GNUNET_assert (addrs[res] != NULL);
+ GNUNET_assert (NULL != addrs[res]);
GNUNET_free (addrs[res]);
}
GNUNET_free_non_null (addrs);
@@ -2733,7 +2829,10 @@
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- GNUNET_asprintf(&url, "%s://%s", plugin->protocol,
plugin->external_hostname);
+ GNUNET_asprintf(&url,
+ "%s://%s",
+ plugin->protocol,
+ plugin->external_hostname);
urlen = strlen (url) + 1;
ext_addr = GNUNET_malloc (sizeof (struct HttpAddress) + urlen);
@@ -3048,7 +3147,32 @@
/* Stop to report addresses to transport service */
server_stop_report_addresses (plugin);
- server_stop (plugin);
+ if (NULL != plugin->server_v4)
+ {
+ MHD_stop_daemon (plugin->server_v4);
+ plugin->server_v4 = NULL;
+ }
+ if (NULL != plugin->server_v6)
+ {
+ MHD_stop_daemon (plugin->server_v6);
+ plugin->server_v6 = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != plugin->server_v4_task)
+ {
+ GNUNET_SCHEDULER_cancel (plugin->server_v4_task);
+ plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ if (GNUNET_SCHEDULER_NO_TASK != plugin->server_v6_task)
+ {
+ GNUNET_SCHEDULER_cancel (plugin->server_v6_task);
+ plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+#if BUILD_HTTPS
+ GNUNET_free_non_null (plugin->crypto_init);
+ GNUNET_free_non_null (plugin->cert);
+ GNUNET_free_non_null (plugin->key);
+#endif
GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
&destroy_session_cb,
plugin);
@@ -3090,7 +3214,9 @@
const void *addr,
size_t addrlen)
{
- return http_common_plugin_address_to_string (PLUGIN_NAME, addr, addrlen);
+ return http_common_plugin_address_to_string (PLUGIN_NAME,
+ addr,
+ addrlen);
}
@@ -3099,7 +3225,7 @@
*
* @param cls closure ('struct HTTP_Server_Plugin*')
* @param session the session
- * @return the network type in HBO or GNUNET_SYSERR
+ * @return the network type in HBO or #GNUNET_SYSERR
*/
static enum GNUNET_ATS_Network_Type
http_server_get_network (void *cls,
@@ -3110,6 +3236,38 @@
/**
+ * Function that will be called whenever the transport service wants to
+ * notify the plugin that the inbound quota changed and that the plugin
+ * should update it's delay for the next receive value
+ *
+ * @param cls closure
+ * @param peer which peer was the session for
+ * @param session which session is being updated
+ * @param delay new delay to use for receiving
+ */
+static void
+http_server_plugin_update_inbound_delay (void *cls,
+ const struct GNUNET_PeerIdentity
*peer,
+ struct Session *s,
+ struct GNUNET_TIME_Relative delay)
+{
+ s->next_receive = GNUNET_TIME_relative_to_absolute (delay);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "New inbound delay %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_NO));
+ if (GNUNET_SCHEDULER_NO_TASK != s->recv_wakeup_task)
+ {
+ GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+ s->recv_wakeup_task
+ = GNUNET_SCHEDULER_add_delayed (delay,
+ &server_wake_up,
+ s);
+ }
+}
+
+
+/**
* Return information about the given session to the
* monitor callback.
*
@@ -3207,7 +3365,7 @@
api->address_pretty_printer = &http_common_plugin_address_pretty_printer;
api->get_network = &http_server_get_network;
api->update_session_timeout = &http_server_plugin_update_session_timeout;
- // api->update_inbound_delay = &http_server_plugin_update_inbound_delay; //
FIXME: implement/support!
+ api->update_inbound_delay = &http_server_plugin_update_inbound_delay;
api->setup_monitor = &http_server_plugin_setup_monitor;
#if BUILD_HTTPS
plugin->name = "transport-https_server";
@@ -3218,7 +3376,9 @@
#endif
/* Compile URL regex */
- if (regcomp(&plugin->url_regex, URL_REGEX, REG_EXTENDED))
+ if (regcomp(&plugin->url_regex,
+ URL_REGEX,
+ REG_EXTENDED))
{
LOG (GNUNET_ERROR_TYPE_ERROR,
_("Unable to compile URL regex\n"));
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33842 - gnunet/src/transport,
gnunet <=