gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37858 - in libmicrohttpd: . src/include src/microhttpd


From: gnunet
Subject: [GNUnet-SVN] r37858 - in libmicrohttpd: . src/include src/microhttpd
Date: Sat, 3 Sep 2016 11:56:30 +0200

Author: grothoff
Date: 2016-09-03 11:56:30 +0200 (Sat, 03 Sep 2016)
New Revision: 37858

Added:
   libmicrohttpd/src/microhttpd/mhd_sem.c
Modified:
   libmicrohttpd/ChangeLog
   libmicrohttpd/src/include/microhttpd.h
   libmicrohttpd/src/microhttpd/Makefile.am
   libmicrohttpd/src/microhttpd/connection.c
   libmicrohttpd/src/microhttpd/daemon.c
   libmicrohttpd/src/microhttpd/internal.h
   libmicrohttpd/src/microhttpd/mhd_locks.h
   libmicrohttpd/src/microhttpd/response.c
Log:
implementing 'Connection: upgrade' for thread-per-connection modes, but untested

Modified: libmicrohttpd/ChangeLog
===================================================================
--- libmicrohttpd/ChangeLog     2016-09-03 08:16:26 UTC (rev 37857)
+++ libmicrohttpd/ChangeLog     2016-09-03 09:56:30 UTC (rev 37858)
@@ -1,3 +1,7 @@
+Sat Sep  3 11:56:20 CEST 2016
+       Adding logic for handling HTTP "Upgrade" in thread-per-connection
+       mode. Also still untested. -CG
+
 Sat Aug 27 21:01:43 CEST 2016
        Adding a few extra safety checks around HTTP "Upgrade"
        (against wrong uses of API), and a testcase. -CG

Modified: libmicrohttpd/src/include/microhttpd.h
===================================================================
--- libmicrohttpd/src/include/microhttpd.h      2016-09-03 08:16:26 UTC (rev 37857)
+++ libmicrohttpd/src/include/microhttpd.h      2016-09-03 09:56:30 UTC (rev 37858)
@@ -1,6 +1,6 @@
 /*
      This file is part of libmicrohttpd
-     Copyright (C) 2006-2015 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2006-2016 Christian Grothoff (and other contributing authors)
 
      This library is free software; you can redistribute it and/or
      modify it under the terms of the GNU Lesser General Public
@@ -2261,8 +2261,20 @@
    * NOTE: it is unclear if we want to have this in the
    * "final" API, this is just an idea right now.
    */
-  MHD_UPGRADE_ACTION_CORK
+  MHD_UPGRADE_ACTION_CORK,
 
+  /**
+   * Try to "flush" our write buffer (to the network), returning
+   * #MHD_YES on success (buffer is empty) and #MHD_NO on failure
+   * (unsent bytes remain in buffers).  This option is useful if
+   * the application wants to make sure that all data has been sent,
+   * which may be a good idea before closing the socket.
+   *
+   * NOTE: it is unclear if we want to have this in the
+   * "final" API, this is just an idea right now.
+   */
+  MHD_UPGRADE_ACTION_FLUSH
+
 };
 
 

Modified: libmicrohttpd/src/microhttpd/Makefile.am
===================================================================
--- libmicrohttpd/src/microhttpd/Makefile.am    2016-09-03 08:16:26 UTC (rev 37857)
+++ libmicrohttpd/src/microhttpd/Makefile.am    2016-09-03 09:56:30 UTC (rev 37858)
@@ -63,7 +63,7 @@
   sysfdsetsize.c sysfdsetsize.h \
   mhd_str.c mhd_str.h \
   mhd_threads.c mhd_threads.h \
-  mhd_locks.h \
+  mhd_locks.h mhd_sem.c \
   mhd_sockets.c mhd_sockets.h \
   mhd_itc.c mhd_itc.h \
   mhd_compat.c mhd_compat.h \

Modified: libmicrohttpd/src/microhttpd/connection.c
===================================================================
--- libmicrohttpd/src/microhttpd/connection.c   2016-09-03 08:16:26 UTC (rev 37857)
+++ libmicrohttpd/src/microhttpd/connection.c   2016-09-03 09:56:30 UTC (rev 37858)
@@ -413,7 +413,8 @@
  */
 const char *
 MHD_lookup_connection_value (struct MHD_Connection *connection,
-                             enum MHD_ValueKind kind, const char *key)
+                             enum MHD_ValueKind kind,
+                             const char *key)
 {
   struct MHD_HTTP_Header *pos;
 
@@ -2772,7 +2773,6 @@
               /* Buffering for flushable socket was already enabled*/
               if (MHD_NO == socket_flush_possible (connection))
                 socket_start_no_buffering (connection);
-
               break;
             }
           /* not ready, no socket action */

Modified: libmicrohttpd/src/microhttpd/daemon.c
===================================================================
--- libmicrohttpd/src/microhttpd/daemon.c       2016-09-03 08:16:26 UTC (rev 37857)
+++ libmicrohttpd/src/microhttpd/daemon.c       2016-09-03 09:56:30 UTC (rev 37858)
@@ -636,6 +636,79 @@
 
 
 /**
+ * Obtain the select() file descriptor sets for the
+ * given @a urh.
+ *
+ * @param urh upgrade handle to wait for
+ * @param[out] rs read set to initialize
+ * @param[out] ws write set to initialize
+ * @param[out] max_fd maximum FD to update
+ * @param fd_setsize value of FD_SETSIZE
+ * @return #MHD_YES on success, #MHD_NO on error
+ */
+static int
+urh_to_fdset (struct MHD_UpgradeResponseHandle *urh,
+              fd_set *rs,
+              fd_set *ws,
+              MHD_socket *max_fd,
+              unsigned int fd_setsize)
+{
+  if ( (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
+       (! MHD_add_to_fd_set_ (urh->mhd.socket,
+                              rs,
+                              max_fd,
+                              fd_setsize)) )
+    return MHD_NO;
+  if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
+       (! MHD_add_to_fd_set_ (urh->mhd.socket,
+                              ws,
+                              max_fd,
+                              fd_setsize)) )
+    return MHD_NO;
+  if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
+       (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
+                              rs,
+                              max_fd,
+                              fd_setsize)) )
+    return MHD_NO;
+  if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
+       (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
+                              ws,
+                              max_fd,
+                              fd_setsize)) )
+    return MHD_NO;
+  return MHD_YES;
+}
+
+
+/**
+ * Update the @a urh based on the ready FDs in the @a rs and @a ws.
+ *
+ * @param urh upgrade handle to update
+ * @param rs read result from select()
+ * @param ws write result from select()
+ */
+static void
+urh_from_fdset (struct MHD_UpgradeResponseHandle *urh,
+                const fd_set *rs,
+                const fd_set *ws)
+{
+  if (FD_ISSET (urh->connection->socket_fd,
+                rs))
+    urh->app.celi |= MHD_EPOLL_STATE_READ_READY;
+  if (FD_ISSET (urh->connection->socket_fd,
+                ws))
+    urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY;
+  if (FD_ISSET (urh->mhd.socket,
+                rs))
+    urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY;
+  if (FD_ISSET (urh->mhd.socket,
+                ws))
+    urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY;
+}
+
+
+/**
  * Obtain the `select()` sets for this daemon.
  * Daemon's FDs will be added to fd_sets. To get only
  * daemon FDs in fd_sets, call FD_ZERO for each fd_set
@@ -733,30 +806,13 @@
     }
   for (urh = daemon->urh_head; NULL != urh; urh = urh->next)
     {
-      if ( (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
-           (! MHD_add_to_fd_set_ (urh->mhd.socket,
-                                  read_fd_set,
-                                  max_fd,
-                                  fd_setsize)) )
+      if (MHD_NO ==
+          urh_to_fdset (urh,
+                        read_fd_set,
+                        write_fd_set,
+                        max_fd,
+                        fd_setsize))
         result = MHD_NO;
-      if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
-           (! MHD_add_to_fd_set_ (urh->mhd.socket,
-                                  write_fd_set,
-                                  max_fd,
-                                  fd_setsize)) )
-        result = MHD_NO;
-      if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
-           (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
-                                  read_fd_set,
-                                  max_fd,
-                                  fd_setsize)) )
-        result = MHD_NO;
-      if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
-           (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
-                                  write_fd_set,
-                                  max_fd,
-                                  fd_setsize)) )
-        result = MHD_NO;
     }
 #if DEBUG_CONNECT
 #ifdef HAVE_MESSAGES
@@ -825,7 +881,252 @@
 }
 
 
+#if HTTPS_SUPPORT
 /**
+ * Performs bi-directional forwarding on upgraded HTTPS connections
+ * based on the readyness state stored in the @a urh handle.
+ *
+ * @param urh handle to process
+ */
+static void
+process_urh (struct MHD_UpgradeResponseHandle *urh)
+{
+  /* handle reading from TLS client and writing to application */
+  if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
+       (urh->in_buffer_off < urh->in_buffer_size) )
+    {
+      ssize_t res;
+
+      res = gnutls_record_recv (urh->connection->tls_session,
+                                &urh->in_buffer[urh->in_buffer_off],
+                                urh->in_buffer_size - urh->in_buffer_off);
+      if ( (GNUTLS_E_AGAIN == res) ||
+           (GNUTLS_E_INTERRUPTED == res) )
+        {
+          urh->app.celi &= ~MHD_EPOLL_STATE_READ_READY;
+        }
+      else if (res > 0)
+        {
+          urh->in_buffer_off += res;
+        }
+    }
+  if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
+       (urh->in_buffer_off > 0) )
+    {
+      size_t res;
+
+      res = write (urh->mhd.socket,
+                   urh->in_buffer,
+                   urh->in_buffer_off);
+      if (-1 == res)
+        {
+          /* FIXME: differenciate by errno? */
+          urh->mhd.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
+        }
+      else
+        {
+          if (urh->in_buffer_off != res)
+            {
+              memmove (urh->in_buffer,
+                       &urh->in_buffer[res],
+                       urh->in_buffer_off - res);
+              urh->in_buffer_off -= res;
+            }
+          else
+            {
+              urh->in_buffer_off = 0;
+            }
+        }
+    }
+
+  /* handle reading from application and writing to HTTPS client */
+  if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
+       (urh->out_buffer_off < urh->out_buffer_size) )
+    {
+      size_t res;
+
+      res = read (urh->mhd.socket,
+                  &urh->out_buffer[urh->out_buffer_off],
+                  urh->out_buffer_size - urh->out_buffer_off);
+      if (-1 == res)
+        {
+          /* FIXME: differenciate by errno? */
+          urh->mhd.celi &= ~MHD_EPOLL_STATE_READ_READY;
+        }
+      else
+        {
+          urh->out_buffer_off += res;
+        }
+    }
+  if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
+       (urh->out_buffer_off > 0) )
+    {
+      ssize_t res;
+
+      res = gnutls_record_send (urh->connection->tls_session,
+                                urh->out_buffer,
+                                urh->out_buffer_off);
+      if ( (GNUTLS_E_AGAIN == res) ||
+           (GNUTLS_E_INTERRUPTED == res) )
+        {
+          urh->app.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
+        }
+      else if (res > 0)
+        {
+          if (urh->out_buffer_off != res)
+            {
+              memmove (urh->out_buffer,
+                       &urh->out_buffer[res],
+                       urh->out_buffer_off - res);
+              urh->out_buffer_off -= res;
+            }
+          else
+            {
+              urh->out_buffer_off = 0;
+            }
+        }
+    }
+}
+#endif
+
+
+/**
+ * Main function of the thread that handles an individual connection
+ * after it was "upgraded" when #MHD_USE_THREAD_PER_CONNECTION is set.
+ *
+ * @param con the connection this thread will handle
+ */
+static void
+thread_main_connection_upgrade (struct MHD_Connection *con)
+{
+  struct MHD_Daemon *daemon = con->daemon;
+
+  if (0 == (daemon->options & MHD_USE_SSL))
+    {
+      /* Here, we need to block until the application
+         signals us that it is done with the socket */
+      MHD_semaphore_down (con->upgrade_sem);
+      MHD_semaphore_destroy (con->upgrade_sem);
+      con->upgrade_sem = NULL;
+      return;
+    }
+#if HTTPS_SUPPORT
+  {
+    struct MHD_UpgradeResponseHandle *urh = con->urh;
+
+    /* Here, we need to bi-directionally forward
+       until the application tells us that it is done
+       with the socket; */
+    if (0 == (daemon->options & MHD_USE_POLL))
+      {
+        while (MHD_CONNECTION_UPGRADE == con->state)
+          {
+            /* use select */
+            fd_set rs;
+            fd_set ws;
+            MHD_socket max_fd;
+            int num_ready;
+            int result;
+
+            FD_ZERO (&rs);
+            FD_ZERO (&ws);
+            max_fd = MHD_INVALID_SOCKET;
+            result = urh_to_fdset (urh,
+                                   &rs,
+                                   &ws,
+                                   &max_fd,
+                                   FD_SETSIZE);
+            if (MHD_NO == result)
+              {
+#ifdef HAVE_MESSAGES
+                MHD_DLOG (con->daemon,
+                          "Error preparing select\n");
+#endif
+                break;
+              }
+            num_ready = MHD_SYS_select_ (max_fd + 1,
+                                         &rs,
+                                         &ws,
+                                         NULL,
+                                         NULL);
+            if (num_ready < 0)
+              {
+                const int err = MHD_socket_get_error_();
+
+                if (MHD_SCKT_ERR_IS_EINTR_(err))
+                  continue;
+#ifdef HAVE_MESSAGES
+                MHD_DLOG (con->daemon,
+                          "Error during select (%d): `%s'\n",
+                          err,
+                          MHD_socket_strerr_ (err));
+#endif
+                break;
+              }
+            urh_from_fdset (urh,
+                            &rs,
+                            &ws);
+            process_urh (urh);
+          }
+      }
+#ifdef HAVE_POLL
+    else
+      {
+        /* use poll() */
+        struct pollfd p[2];
+        const unsigned int timeout = UINT_MAX;
+
+        p[0].fd = urh->connection->socket_fd;
+        p[1].fd = urh->mhd.socket;
+        while (MHD_CONNECTION_UPGRADE == con->state)
+          {
+            if (0 == (MHD_EPOLL_STATE_READ_READY & urh->app.celi))
+              p[0].events |= POLLIN;
+            if (0 == (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi))
+              p[0].events |= POLLOUT;
+            if (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi))
+              p[1].events |= POLLIN;
+            if (0 == (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi))
+              p[1].events |= POLLOUT;
+
+            if (MHD_sys_poll_ (p,
+                               2,
+                               timeout) < 0)
+              {
+                const int err = MHD_socket_get_error_ ();
+
+                if (MHD_SCKT_ERR_IS_EINTR_ (err))
+                  continue;
+#ifdef HAVE_MESSAGES
+                MHD_DLOG (con->daemon,
+                          "Error during poll: `%s'\n",
+                          MHD_socket_strerr_ (err));
+#endif
+                break;
+              }
+            if (0 != (p[0].revents & POLLIN))
+              urh->app.celi |= MHD_EPOLL_STATE_READ_READY;
+            if (0 != (p[0].revents & POLLOUT))
+              urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY;
+            if (0 != (p[1].revents & POLLIN))
+              urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY;
+            if (0 != (p[1].revents & POLLOUT))
+              urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY;
+            process_urh (urh);
+          }
+      }
+    /* end POLL */
+#endif
+    /* end HTTPS */
+#else
+    /* HTTPS option set, but compiled without HTTPS */
+  MHD_PANIC ("This should not be possible\n");
+#endif
+  }
+}
+
+
+/**
  * Main function of the thread that handles an individual
  * connection when #MHD_USE_THREAD_PER_CONNECTION is set.
  *
@@ -836,6 +1137,7 @@
 thread_main_handle_connection (void *data)
 {
   struct MHD_Connection *con = data;
+  struct MHD_Daemon *daemon = con->daemon;
   int num_ready;
   fd_set rs;
   fd_set ws;
@@ -844,7 +1146,7 @@
   struct timeval *tvp;
   time_t now;
 #if WINDOWS
-  MHD_pipe spipe = con->daemon->wpipe[0];
+  MHD_pipe spipe = daemon->wpipe[0];
 #ifdef HAVE_POLL
   int extra_slot;
 #endif /* HAVE_POLL */
@@ -855,11 +1157,13 @@
 #ifdef HAVE_POLL
   struct pollfd p[1 + EXTRA_SLOTS];
 #endif
+#undef EXTRA_SLOTS
 
-  while ( (MHD_YES != con->daemon->shutdown) &&
+  while ( (MHD_YES != daemon->shutdown) &&
          (MHD_CONNECTION_CLOSED != con->state) )
     {
-      unsigned const int timeout = con->daemon->connection_timeout;
+      const unsigned int timeout = daemon->connection_timeout;
+
       tvp = NULL;
 #if HTTPS_SUPPORT
       if (MHD_YES == con->tls_read_ready)
@@ -870,7 +1174,8 @@
          tvp = &tv;
        }
 #endif
-      if (NULL == tvp && timeout > 0)
+      if ( (NULL == tvp) &&
+           (timeout > 0) )
        {
          now = MHD_monotonic_sec_counter();
          if (now - con->last_activity > timeout)
@@ -884,16 +1189,17 @@
               if (seconds_left > TIMEVAL_TV_SEC_MAX)
                 tv.tv_sec = TIMEVAL_TV_SEC_MAX;
               else
-                tv.tv_sec = (_MHD_TIMEVAL_TV_SEC_TYPE)seconds_left;
+                tv.tv_sec = (_MHD_TIMEVAL_TV_SEC_TYPE) seconds_left;
 #endif /* _WIN32 */
             }
          tv.tv_usec = 0;
          tvp = &tv;
        }
-      if (0 == (con->daemon->options & MHD_USE_POLL))
+      if (0 == (daemon->options & MHD_USE_POLL))
        {
          /* use select */
          int err_state = 0;
+
          FD_ZERO (&rs);
          FD_ZERO (&ws);
          maxsock = MHD_INVALID_SOCKET;
@@ -900,19 +1206,31 @@
          switch (con->event_loop_info)
            {
            case MHD_EVENT_LOOP_INFO_READ:
-             if (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE))
+             if (! MHD_add_to_fd_set_ (con->socket_fd,
+                                        &rs,
+                                        &maxsock,
+                                        FD_SETSIZE))
                err_state = 1;
              break;
            case MHD_EVENT_LOOP_INFO_WRITE:
-             if (!MHD_add_to_fd_set_ (con->socket_fd, &ws, &maxsock, FD_SETSIZE))
+             if (! MHD_add_to_fd_set_ (con->socket_fd,
+                                        &ws,
+                                        &maxsock,
+                                        FD_SETSIZE))
                 err_state = 1;
              if ( (con->read_buffer_size > con->read_buffer_offset) &&
-                   (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE)) )
+                   (! MHD_add_to_fd_set_ (con->socket_fd,
+                                          &rs,
+                                          &maxsock,
+                                          FD_SETSIZE)) )
                err_state = 1;
              break;
            case MHD_EVENT_LOOP_INFO_BLOCK:
              if ( (con->read_buffer_size > con->read_buffer_offset) &&
-                   (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE)) )
+                   (! MHD_add_to_fd_set_ (con->socket_fd,
+                                          &rs,
+                                          &maxsock,
+                                          FD_SETSIZE)) )
                err_state = 1;
              tv.tv_sec = 0;
              tv.tv_usec = 0;
@@ -925,7 +1243,10 @@
 #if WINDOWS
           if (MHD_INVALID_PIPE_ != spipe)
             {
-              if (!MHD_add_to_fd_set_ (spipe, &rs, &maxsock, FD_SETSIZE))
+              if (! MHD_add_to_fd_set_ (spipe,
+                                        &rs,
+                                        &maxsock,
+                                        FD_SETSIZE))
                 err_state = 1;
             }
 #endif
@@ -938,10 +1259,15 @@
                 goto exit;
               }
 
-         num_ready = MHD_SYS_select_ (maxsock + 1, &rs, &ws, NULL, tvp);
+         num_ready = MHD_SYS_select_ (maxsock + 1,
+                                       &rs,
+                                       &ws,
+                                       NULL,
+                                       tvp);
          if (num_ready < 0)
            {
              const int err = MHD_socket_get_error_();
+
              if (MHD_SCKT_ERR_IS_EINTR_(err))
                continue;
 #ifdef HAVE_MESSAGES
@@ -960,8 +1286,10 @@
 #endif
           if (MHD_NO ==
               call_handlers (con,
-                             FD_ISSET (con->socket_fd, &rs),
-                             FD_ISSET (con->socket_fd, &ws),
+                             FD_ISSET (con->socket_fd,
+                                       &rs),
+                             FD_ISSET (con->socket_fd,
+                                       &ws),
                              MHD_NO))
             goto exit;
        }
@@ -969,7 +1297,9 @@
       else
        {
          /* use poll */
-         memset (&p, 0, sizeof (p));
+         memset (&p,
+                  0,
+                  sizeof (p));
          p[0].fd = con->socket_fd;
          switch (con->event_loop_info)
            {
@@ -1004,11 +1334,11 @@
 #endif
          if (MHD_sys_poll_ (p,
 #if WINDOWS
-                    1 + extra_slot,
+                             1 + extra_slot,
 #else
-                    1,
+                             1,
 #endif
-                   (NULL == tvp) ? -1 : tv.tv_sec * 1000) < 0)
+                             (NULL == tvp) ? -1 : tv.tv_sec * 1000) < 0)
            {
              if (MHD_SCKT_LAST_ERR_IS_(MHD_SCKT_EINTR_))
                continue;
@@ -1033,6 +1363,11 @@
             goto exit;
        }
 #endif
+      if (MHD_CONNECTION_UPGRADE == con->state)
+        {
+          thread_main_connection_upgrade (con);
+          break;
+        }
     }
   if (MHD_CONNECTION_IN_CLEANUP != con->state)
     {
@@ -1054,14 +1389,15 @@
       con->response = NULL;
     }
 
-  if (NULL != con->daemon->notify_connection)
-    con->daemon->notify_connection (con->daemon->notify_connection_cls,
+  if (NULL != daemon->notify_connection)
+    con->daemon->notify_connection (daemon->notify_connection_cls,
                                     con,
                                     &con->socket_context,
                                     MHD_CONNECTION_NOTIFY_CLOSED);
   if (MHD_INVALID_SOCKET != con->socket_fd)
     {
-      shutdown (con->socket_fd, SHUT_WR);
+      shutdown (con->socket_fd,
+                SHUT_WR);
       if (0 != MHD_socket_close_ (con->socket_fd))
         MHD_PANIC ("close failed\n");
       con->socket_fd = MHD_INVALID_SOCKET;
@@ -1433,7 +1769,7 @@
     {
       /* in turbo mode, we assume that non-blocking was already set
         by 'accept4' or whoever calls 'MHD_add_connection' */
-      if (!MHD_socket_nonblocking_ (connection->socket_fd))
+      if (! MHD_socket_nonblocking_ (connection->socket_fd))
         {
 #ifdef HAVE_MESSAGES
           MHD_DLOG (connection->daemon,
@@ -2168,116 +2504,7 @@
 }
 
 
-#if HTTPS_SUPPORT
 /**
- * Performs bi-directional forwarding on upgraded HTTPS connections
- * based on the readyness state stored in the @a urh handle.
- *
- * @param urh handle to process
- */
-static void
-process_urh (struct MHD_UpgradeResponseHandle *urh)
-{
-  /* handle reading from TLS client and writing to application */
-  if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
-       (urh->in_buffer_off < urh->in_buffer_size) )
-    {
-      ssize_t res;
-
-      res = gnutls_record_recv (urh->connection->tls_session,
-                                &urh->in_buffer[urh->in_buffer_off],
-                                urh->in_buffer_size - urh->in_buffer_off);
-      if ( (GNUTLS_E_AGAIN == res) ||
-           (GNUTLS_E_INTERRUPTED == res) )
-        {
-          urh->app.celi &= ~MHD_EPOLL_STATE_READ_READY;
-        }
-      else if (res > 0)
-        {
-          urh->in_buffer_off += res;
-        }
-    }
-  if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
-       (urh->in_buffer_off > 0) )
-    {
-      size_t res;
-
-      res = write (urh->mhd.socket,
-                   urh->in_buffer,
-                   urh->in_buffer_off);
-      if (-1 == res)
-        {
-          /* FIXME: differenciate by errno? */
-          urh->mhd.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
-        }
-      else
-        {
-          if (urh->in_buffer_off != res)
-            {
-              memmove (urh->in_buffer,
-                       &urh->in_buffer[res],
-                       urh->in_buffer_off - res);
-              urh->in_buffer_off -= res;
-            }
-          else
-            {
-              urh->in_buffer_off = 0;
-            }
-        }
-    }
-
-  /* handle reading from application and writing to HTTPS client */
-  if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
-       (urh->out_buffer_off < urh->out_buffer_size) )
-    {
-      size_t res;
-
-      res = read (urh->mhd.socket,
-                  &urh->out_buffer[urh->out_buffer_off],
-                  urh->out_buffer_size - urh->out_buffer_off);
-      if (-1 == res)
-        {
-          /* FIXME: differenciate by errno? */
-          urh->mhd.celi &= ~MHD_EPOLL_STATE_READ_READY;
-        }
-      else
-        {
-          urh->out_buffer_off += res;
-        }
-    }
-  if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
-       (urh->out_buffer_off > 0) )
-    {
-      ssize_t res;
-
-      res = gnutls_record_send (urh->connection->tls_session,
-                                urh->out_buffer,
-                                urh->out_buffer_off);
-      if ( (GNUTLS_E_AGAIN == res) ||
-           (GNUTLS_E_INTERRUPTED == res) )
-        {
-          urh->app.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
-        }
-      else if (res > 0)
-        {
-          if (urh->out_buffer_off != res)
-            {
-              memmove (urh->out_buffer,
-                       &urh->out_buffer[res],
-                       urh->out_buffer_off - res);
-              urh->out_buffer_off -= res;
-            }
-          else
-            {
-              urh->out_buffer_off = 0;
-            }
-        }
-    }
-}
-#endif
-
-
-/**
  * Run webserver operations. This method should be called by clients
  * in combination with #MHD_get_fdset if the client-controlled select
  * method is used.
@@ -2362,14 +2589,9 @@
   for (urh = daemon->urh_head; NULL != urh; urh = urh->next)
     {
       /* update urh state based on select() output */
-      if (FD_ISSET (urh->connection->socket_fd, read_fd_set))
-        urh->app.celi |= MHD_EPOLL_STATE_READ_READY;
-      if (FD_ISSET (urh->connection->socket_fd, write_fd_set))
-        urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY;
-      if (FD_ISSET (urh->mhd.socket, read_fd_set))
-        urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY;
-      if (FD_ISSET (urh->mhd.socket, write_fd_set))
-        urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY;
+      urh_from_fdset (urh,
+                      read_fd_set,
+                      write_fd_set);
       /* call generic forwarding function for passing data */
       process_urh (urh);
     }

Modified: libmicrohttpd/src/microhttpd/internal.h
===================================================================
--- libmicrohttpd/src/microhttpd/internal.h     2016-09-03 08:16:26 UTC (rev 37857)
+++ libmicrohttpd/src/microhttpd/internal.h     2016-09-03 09:56:30 UTC (rev 37858)
@@ -480,8 +480,15 @@
    * Connection was "upgraded" and socket is now under the
    * control of the application.
    */
-  MHD_CONNECTION_UPGRADE = MHD_TLS_CONNECTION_INIT + 1
+  MHD_CONNECTION_UPGRADE = MHD_TLS_CONNECTION_INIT + 1,
 
+  /**
+   * Connection was "upgraded" and subsequently closed
+   * by the application.  We now need to do our own
+   * internal cleanup.
+   */
+  MHD_CONNECTION_UPGRADE_CLOSED = MHD_TLS_CONNECTION_INIT + 1
+
 };
 
 /**
@@ -854,6 +861,23 @@
 
 #if HTTPS_SUPPORT
   /**
+   * If this connection was upgraded and if we are using
+   * #MHD_USE_THREAD_PER_CONNECTION, this points to the
+   * upgrade response details such that the
+   * #thread_main_connection_upgrade()-logic can perform
+   * the bi-directional forwarding.
+   */
+  struct MHD_UpgradeResponseHandle *urh;
+
+  /**
+   * If this connection was upgraded and if we are using
+   * #MHD_USE_THREAD_PER_CONNECTION without encryption,
+   * this points to the semaphore we use to signal termination
+   * to the thread handling the connection.
+   */
+  struct MHD_Semaphore *upgrade_sem;
+
+  /**
    * State required for HTTPS/SSL/TLS support.
    */
   gnutls_session_t tls_session;

Modified: libmicrohttpd/src/microhttpd/mhd_locks.h
===================================================================
--- libmicrohttpd/src/microhttpd/mhd_locks.h    2016-09-03 08:16:26 UTC (rev 37857)
+++ libmicrohttpd/src/microhttpd/mhd_locks.h    2016-09-03 09:56:30 UTC (rev 37858)
@@ -22,8 +22,9 @@
  * @file microhttpd/mhd_locks.h
  * @brief  Header for platform-independent locks abstraction
  * @author Karlson2k (Evgeny Grin)
+ * @author Christian Grothoff
  *
- * Provides basic abstraction for locks and mutex.
+ * Provides basic abstraction for locks/mutex and semaphores.
  * Any functions can be implemented as macro on some platforms
  * unless explicitly marked otherwise.
  * Any function argument can be skipped in macro, so avoid
@@ -147,4 +148,48 @@
 #define MHD_mutex_unlock_(pmutex) (LeaveCriticalSection((pmutex)), !0)
 #endif
 
+
+/**
+ * A semaphore.
+ */
+struct MHD_Semaphore;
+
+
+/**
+ * Create a semaphore with an initial counter of @a init
+ *
+ * @param init initial counter
+ * @return the semaphore, NULL on error
+ */
+struct MHD_Semaphore *
+MHD_semaphore_create (unsigned int init);
+
+
+/**
+ * Count down the semaphore, block if necessary.
+ *
+ * @param sem semaphore to count down.
+ */
+void
+MHD_semaphore_down (struct MHD_Semaphore *sem);
+
+
+/**
+ * Increment the semaphore.
+ *
+ * @param sem semaphore to increment.
+ */
+void
+MHD_semaphore_up (struct MHD_Semaphore *sem);
+
+
+/**
+ * Destroys the semaphore.
+ *
+ * @param sem semaphore to destroy.
+ */
+void
+MHD_semaphore_destroy (struct MHD_Semaphore *sem);
+
+
 #endif /* ! MHD_LOCKS_H */

Added: libmicrohttpd/src/microhttpd/mhd_sem.c
===================================================================
--- libmicrohttpd/src/microhttpd/mhd_sem.c                              (rev 0)
+++ libmicrohttpd/src/microhttpd/mhd_sem.c      2016-09-03 09:56:30 UTC (rev 37858)
@@ -0,0 +1,138 @@
+/*
+  This file is part of libmicrohttpd
+  Copyright (C) 2016 Christian Grothoff
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+
+*/
+
+/**
+ * @file microhttpd/mhd_sem.c
+ * @brief  implementation of semaphores
+ * @author Christian Grothoff
+ */
+#include "internal.h"
+#include "mhd_locks.h"
+
+/**
+ * A counting semaphore built from a mutex and a condition variable.
+ */
+struct MHD_Semaphore
+{
+  /**
+   * Mutex protecting @e counter in the up/down operations.
+   */
+  pthread_mutex_t mutex;
+
+  /**
+   * Condition variable signalled on increment; count-down waits on it.
+   */
+  pthread_cond_t cv;
+
+  /**
+   * Current value of the semaphore; count-down blocks while it is zero.
+   */
+  unsigned int counter;
+};
+
+
+/**
+ * Allocate and initialise a semaphore whose counter starts at @a init.
+ *
+ * @param init initial counter
+ * @return the semaphore, NULL if allocation or pthread setup failed
+ */
+struct MHD_Semaphore *
+MHD_semaphore_create (unsigned int init)
+{
+  struct MHD_Semaphore *result = malloc (sizeof (*result));
+
+  if (NULL == result)
+    return NULL;
+  if (0 != pthread_mutex_init (&result->mutex, NULL))
+    {
+      free (result);
+      return NULL;
+    }
+  if (0 != pthread_cond_init (&result->cv, NULL))
+    {
+      /* undo the mutex setup before giving up */
+      (void) pthread_mutex_destroy (&result->mutex);
+      free (result);
+      return NULL;
+    }
+
+  /* both primitives are ready; record the starting value */
+  result->counter = init;
+  return result;
+}
+
+
+/**
+ * Count down the semaphore; blocks while the counter is zero.
+ *
+ * @param sem semaphore to count down.
+ */
+void
+MHD_semaphore_down (struct MHD_Semaphore *sem)
+{
+  /* pthread calls return 0 on success, so only non-zero is an error */
+  if (0 != pthread_mutex_lock (&sem->mutex))
+    MHD_PANIC ("pthread_mutex_lock for semaphore failed\n");
+  while (0 == sem->counter)
+    {
+      if (0 != pthread_cond_wait (&sem->cv, &sem->mutex))
+        MHD_PANIC ("pthread_cond_wait failed\n");
+    }
+  sem->counter--;
+  if (0 != pthread_mutex_unlock (&sem->mutex))
+    MHD_PANIC ("pthread_mutex_unlock for semaphore failed\n");
+}
+
+
+/**
+ * Increment the semaphore and signal the condition variable.
+ * Calls MHD_PANIC only if (un)locking returns non-zero, i.e. fails.
+ * @param sem semaphore to increment.
+ */
+void
+MHD_semaphore_up (struct MHD_Semaphore *sem)
+{
+  if (0 != pthread_mutex_lock (&sem->mutex))
+    MHD_PANIC ("pthread_mutex_lock for semaphore failed\n");
+  sem->counter++;
+  (void) pthread_cond_signal (&sem->cv);
+  if (0 != pthread_mutex_unlock (&sem->mutex))
+    MHD_PANIC ("pthread_mutex_unlock for semaphore failed\n");
+}
+
+
+/**
+ * Release everything held by a semaphore, including its memory.
+ *
+ * @param sem semaphore to tear down; the pointer is invalid afterwards.
+ */
+void
+MHD_semaphore_destroy (struct MHD_Semaphore *sem)
+{
+  if (pthread_cond_destroy (&sem->cv) != 0)
+    MHD_PANIC ("pthread_cond_destroy failed\n");
+  if (pthread_mutex_destroy (&sem->mutex) != 0)
+    MHD_PANIC ("pthread_mutex_destroy failed\n");
+  free (sem);
+}
+
+
+/* end of mhd_sem.c */

Modified: libmicrohttpd/src/microhttpd/response.c
===================================================================
--- libmicrohttpd/src/microhttpd/response.c     2016-09-03 08:16:26 UTC (rev 
37857)
+++ libmicrohttpd/src/microhttpd/response.c     2016-09-03 09:56:30 UTC (rev 
37858)
@@ -605,7 +605,23 @@
   switch (action)
   {
   case MHD_UPGRADE_ACTION_CLOSE:
+    /* transition to special 'closed' state for start of cleanup */
+    connection->state = MHD_CONNECTION_UPGRADE_CLOSED;
     /* Application is done with this connection, tear it down! */
+    if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION) )
+      {
+        if (0 == (daemon->options & MHD_USE_SSL) )
+          {
+            /* just need to signal the thread that we are done */
+            MHD_semaphore_up (connection->upgrade_sem);
+          }
+        else
+          {
+            /* signal thread by shutdown() of 'app' socket */
+            shutdown (urh->app.socket, SHUT_RDWR);
+          }
+        return MHD_YES;
+      }
 #if HTTPS_SUPPORT
     if (0 != (daemon->options & MHD_USE_SSL) )
       {
@@ -658,6 +674,9 @@
   case MHD_UPGRADE_ACTION_CORK:
     /* FIXME: not implemented */
     return MHD_NO;
+  case MHD_UPGRADE_ACTION_FLUSH:
+    /* FIXME: not implemented */
+    return MHD_NO;
   default:
     /* we don't understand this one */
     return MHD_NO;
@@ -784,11 +803,6 @@
                                rbo,
                                urh->app.socket,
                                urh);
-    /* As far as MHD is concerned, this connection is
-       suspended; it will be resumed once we are done
-       in the #MHD_upgrade_action() function */
-    MHD_suspend_connection (connection);
-
     /* Launch IO processing by the event loop */
     if (0 != (daemon->options & MHD_USE_EPOLL))
       {
@@ -846,12 +860,25 @@
           return MHD_NO;
        }
       }
-
-    /* This takes care of most event loops: simply add to DLL */
-    DLL_insert (daemon->urh_head,
-                daemon->urh_tail,
-                urh);
-    /* FIXME: None of the above will not work (yet) for thread-per-connection processing */
+    if (0 == (daemon->options & MHD_USE_THREAD_PER_CONNECTION) )
+      {
+        /* As far as MHD's event loops are concerned, this connection
+           is suspended; it will be resumed once we are done in the
+           #MHD_upgrade_action() function */
+        MHD_suspend_connection (connection);
+        /* This takes care of further processing for most event loops:
+           simply add to DLL for bi-directional processing */
+        DLL_insert (daemon->urh_head,
+                    daemon->urh_tail,
+                    urh);
+      }
+    else
+      {
+        /* Our caller will set 'connection->state' to
+           MHD_CONNECTION_UPGRADE, thereby triggering the main method
+           of the thread to switch to bi-directional forwarding. */
+        connection->urh = urh;
+      }
     return MHD_YES;
   }
   urh->app.socket = MHD_INVALID_SOCKET;
@@ -864,10 +891,31 @@
                              rbo,
                              connection->socket_fd,
                              urh);
-  /* As far as MHD is concerned, this connection is
-     suspended; it will be resumed once we are done
-     in the #MHD_upgrade_action() function */
-  MHD_suspend_connection (connection);
+  if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION) )
+    {
+      /* Need to give the thread something to block on... */
+      connection->upgrade_sem = MHD_semaphore_create (0);
+      if (NULL == connection->upgrade_sem)
+        {
+#ifdef HAVE_MESSAGES
+          MHD_DLOG (daemon,
+                    "Failed to create semaphore for upgrade handling\n");
+#endif
+          MHD_connection_close_ (connection,
+                                 MHD_REQUEST_TERMINATED_WITH_ERROR);
+          return MHD_NO;
+        }
+      /* Our caller will set 'connection->state' to
+         MHD_CONNECTION_UPGRADE, thereby triggering the
+         main method of the thread to block on the semaphore. */
+    }
+  else
+    {
+      /* As far as MHD's event loops are concerned, this connection is
+         suspended; it will be resumed once we are done in the
+         #MHD_upgrade_action() function */
+      MHD_suspend_connection (connection);
+    }
   return MHD_YES;
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]