gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37943 - gnunet/src/util


From: gnunet
Subject: [GNUnet-SVN] r37943 - gnunet/src/util
Date: Sun, 18 Sep 2016 22:43:47 +0200

Author: grothoff
Date: 2016-09-18 22:43:47 +0200 (Sun, 18 Sep 2016)
New Revision: 37943

Modified:
   gnunet/src/util/mst.c
   gnunet/src/util/service_new.c
Log:
more work on new MST and service logic

Modified: gnunet/src/util/mst.c
===================================================================
--- gnunet/src/util/mst.c       2016-09-18 09:16:05 UTC (rev 37942)
+++ gnunet/src/util/mst.c       2016-09-18 20:43:47 UTC (rev 37943)
@@ -144,15 +144,20 @@
     {
       /* need to align or need more space */
       mst->pos -= mst->off;
-      memmove (ibuf, &ibuf[mst->off], mst->pos);
+      memmove (ibuf,
+              &ibuf[mst->off],
+              mst->pos);
       mst->off = 0;
     }
     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
     {
-      delta =
-          GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
-                      (mst->pos - mst->off), size);
-      GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
+      delta 
+       = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
+                     - (mst->pos - mst->off),
+                     size);
+      GNUNET_memcpy (&ibuf[mst->pos],
+                    buf,
+                    delta);
       mst->pos += delta;
       buf += delta;
       size -= delta;
@@ -178,7 +183,9 @@
     {
       /* can get more space by moving */
       mst->pos -= mst->off;
-      memmove (ibuf, &ibuf[mst->off], mst->pos);
+      memmove (ibuf,
+              &ibuf[mst->off],
+              mst->pos);
       mst->off = 0;
     }
     if (mst->curr_buf < want)
@@ -185,7 +192,8 @@
     {
       /* need to get more space by growing buffer */
       GNUNET_assert (0 == mst->off);
-      mst->hdr = GNUNET_realloc (mst->hdr, want);
+      mst->hdr = GNUNET_realloc (mst->hdr,
+                                want);
       ibuf = (char *) mst->hdr;
       mst->curr_buf = want;
     }
@@ -192,9 +200,12 @@
     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
     if (mst->pos - mst->off < want)
     {
-      delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
+      delta = GNUNET_MIN (want - (mst->pos - mst->off),
+                         size);
       GNUNET_assert (mst->pos + delta <= mst->curr_buf);
-      GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
+      GNUNET_memcpy (&ibuf[mst->pos],
+                    buf,
+                    delta);
       mst->pos += delta;
       buf += delta;
       size -= delta;
@@ -278,12 +289,15 @@
   {
     if (size + mst->pos > mst->curr_buf)
     {
-      mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
+      mst->hdr = GNUNET_realloc (mst->hdr,
+                                size + mst->pos);
       ibuf = (char *) mst->hdr;
       mst->curr_buf = size + mst->pos;
     }
     GNUNET_assert (size + mst->pos <= mst->curr_buf);
-    GNUNET_memcpy (&ibuf[mst->pos], buf, size);
+    GNUNET_memcpy (&ibuf[mst->pos],
+                  buf,
+                  size);
     mst->pos += size;
   }
   if (purge)
@@ -318,8 +332,35 @@
                  int purge,
                  int one_shot)
 {
-  GNUNET_assert (0); // not implemented
-  return GNUNET_SYSERR;
+  ssize_t ret;
+  size_t left;
+  char *buf;
+
+  left = mst->curr_buf - mst->pos;
+  buf = (char *) mst->hdr;
+  ret = GNUNET_NETWORK_socket_recv (sock,
+                                   &buf[mst->pos],
+                                   left);
+  if (-1 == ret)
+  {
+    if ( (EAGAIN == errno) ||
+        (EINTR == errno) )
+      return GNUNET_OK;
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
+                        "recv");
+    return GNUNET_SYSERR;
+  }
+  if (0 == ret)
+  {
+    /* other side closed connection, treat as error */
+    return GNUNET_SYSERR;
+  }
+  mst->pos += ret;
+  return GNUNET_MST_from_buffer (mst,
+                                NULL,
+                                0,
+                                purge,
+                                one_shot);
 }
 
 

Modified: gnunet/src/util/service_new.c
===================================================================
--- gnunet/src/util/service_new.c       2016-09-18 09:16:05 UTC (rev 37942)
+++ gnunet/src/util/service_new.c       2016-09-18 20:43:47 UTC (rev 37943)
@@ -205,9 +205,8 @@
   int ret;
 
   /**
-   * If GNUNET_YES, consider unknown message types an error where the
+   * If #GNUNET_YES, consider unknown message types an error where the
    * client is disconnected.
-   * FIXME: remove?
    */
   int require_found;
 };
@@ -247,7 +246,7 @@
   /**
    * Tokenizer we use for processing incoming data.
    */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+  struct GNUNET_MessageStreamTokenizer *mst;
 
   /**
    * Task that warns about missing calls to
@@ -273,6 +272,12 @@
   void *user_context;
 
   /**
+   * Time when we last gave a message from this client
+   * to the application.
+   */
+  struct GNUNET_TIME_Absolute warn_start;
+  
+  /**
    * Persist the file handle for this client no matter what happens,
    * force the OS to close once the process actually dies.  Should only
    * be used in special cases!
@@ -287,6 +292,11 @@
   int is_monitor;
 
   /**
+   * Are we waiting for the application to call #GNUNET_SERVICE_client_continue()?
+   */
+  int needs_continue;
+
+  /**
    * Type of last message processed (for warn_no_receive_done).
    */
   uint16_t warn_type;
@@ -386,7 +396,9 @@
 {
   char *opt;
 
-  if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option))
+  if (! GNUNET_CONFIGURATION_have_value (sh->cfg,
+                                        sh->service_name,
+                                        option))
   {
     *ret = NULL;
     return GNUNET_OK;
@@ -394,7 +406,8 @@
   GNUNET_break (GNUNET_OK ==
                 GNUNET_CONFIGURATION_get_value_string (sh->cfg,
                                                        sh->service_name,
-                                                       option, &opt));
+                                                       option,
+                                                      &opt));
   if (NULL == (*ret = GNUNET_STRINGS_parse_ipv4_policy (opt)))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -426,7 +439,9 @@
 {
   char *opt;
 
-  if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option))
+  if (! GNUNET_CONFIGURATION_have_value (sh->cfg,
+                                        sh->service_name,
+                                        option))
   {
     *ret = NULL;
     return GNUNET_OK;
@@ -434,12 +449,15 @@
   GNUNET_break (GNUNET_OK ==
                 GNUNET_CONFIGURATION_get_value_string (sh->cfg,
                                                        sh->service_name,
-                                                       option, &opt));
+                                                       option,
+                                                      &opt));
   if (NULL == (*ret = GNUNET_STRINGS_parse_ipv6_policy (opt)))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          _("Could not parse IPv6 network specification `%s' for `%s:%s'\n"),
-         opt, sh->service_name, option);
+         opt,
+        sh->service_name,
+        option);
     GNUNET_free (opt);
     return GNUNET_SYSERR;
   }
@@ -469,7 +487,9 @@
 
   un = GNUNET_new (struct sockaddr_un);
   un->sun_family = AF_UNIX;
-  strncpy (un->sun_path, unixpath, sizeof (un->sun_path) - 1);
+  strncpy (un->sun_path,
+          unixpath,
+          sizeof (un->sun_path) - 1);
 #ifdef LINUX
   if (GNUNET_YES == abstract)
     un->sun_path[0] = '\0';
@@ -554,10 +574,10 @@
                                         0);
     if (NULL == desc)
     {
-      if ((ENOBUFS == errno) ||
-         (ENOMEM == errno) ||
-         (ENFILE == errno) ||
-          (EACCES == errno))
+      if ( (ENOBUFS == errno) ||
+          (ENOMEM == errno) ||
+          (ENFILE == errno) ||
+          (EACCES == errno) )
       {
         LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR,
                      "socket");
@@ -571,7 +591,8 @@
     }
     else
     {
-      GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (desc));
       desc = NULL;
     }
   }
@@ -648,9 +669,9 @@
     if (GNUNET_SYSERR == abstract)
       abstract = GNUNET_NO;
 #endif
-    if ((GNUNET_YES != abstract)
-        && (GNUNET_OK !=
-            GNUNET_DISK_directory_create_for_file (unixpath)))
+    if ( (GNUNET_YES != abstract) &&
+        (GNUNET_OK !=
+         GNUNET_DISK_directory_create_for_file (unixpath)) )
       GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
                                "mkdir",
                                unixpath);
@@ -682,7 +703,8 @@
     }
     else
     {
-      GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (desc));
       desc = NULL;
     }
   }
@@ -994,7 +1016,8 @@
     LOG (GNUNET_ERROR_TYPE_ERROR,
          _("Could not access a pre-bound socket, will try to bind myself\n"));
     for (i = 0; (i < count) && (NULL != lsocks[i]); i++)
-      GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[i]));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (lsocks[i]));
     GNUNET_free (lsocks);
     return NULL;
   }
@@ -1081,7 +1104,8 @@
              GNUNET_a2s (server_addr, socklen));
       }
     }
-    GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (sock));
     errno = eno;
     return NULL;
   }
@@ -1090,7 +1114,8 @@
   {
     LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR,
                   "listen");
-    GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (sock));
     errno = 0;
     return NULL;
   }
@@ -1177,7 +1202,8 @@
              (unsigned int) 3 + cnt);
         cnt++;
         while (NULL != lsocks[cnt])
-          GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[cnt++]));
+          GNUNET_break (GNUNET_OK ==
+                       GNUNET_NETWORK_socket_close (lsocks[cnt++]));
         GNUNET_free (lsocks);
         lsocks = NULL;
         break;
@@ -1694,7 +1720,10 @@
     struct ServiceListenContext *slc = sh.slc_head;
 
     sh.slc_head = slc->next;
-    // FIXME: destroy slc
+    if (NULL != slc->listen_task)
+      GNUNET_SCHEDULER_cancel (slc->listen_task);
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (slc->listen_socket));
     GNUNET_free (slc);
   }
 
@@ -1778,7 +1807,7 @@
  * the message queue.
  * Not every message queue implementation supports an error handler.
  *
- * @param cls closure
+ * @param cls closure with our `struct GNUNET_SERVICE_Client`
  * @param error error code
  */
 static void
@@ -1786,30 +1815,63 @@
                           enum GNUNET_MQ_Error error)
 {
   struct GNUNET_SERVICE_Client *client = cls;
+  struct GNUNET_SERVICE_Handle *sh = client->sh;
 
-  // FIXME!
+  if ( (GNUNET_MQ_ERROR_NO_MATCH == error) &&
+       (GNUNET_NO == sh->require_found) )
+    return; /* ignore error */
+  GNUNET_SERVICE_client_drop (client);
 }
 
 
 /**
+ * Task run to warn about missing calls to #GNUNET_SERVICE_client_continue().
+ *
+ * @param cls our `struct GNUNET_SERVICE_Client *` to process more requests from
+ */
+static void
+warn_no_client_continue (void *cls)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  GNUNET_break (0 != client->warn_type); /* type should never be 0 here, as we don't use 0 */
+  client->warn_task 
+    = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+                                    &warn_no_client_continue,
+                                   client);
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       _("Processing code for message of type %u did not call `GNUNET_SERVICE_client_continue' after %s\n"),
+       (unsigned int) client->warn_type,
+       GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (client->warn_start),
+                                              GNUNET_YES));
+}
+
+
+/**
  * Functions with this signature are called whenever a
  * complete message is received by the tokenizer for a client.
  *
- * Do not call #GNUNET_SERVER_mst_destroy() from within
+ * Do not call #GNUNET_MST_destroy() from within
  * the scope of this callback.
  *
  * @param cls closure with the `struct GNUNET_SERVICE_Client *`
- * @param client_cls closure with the `struct GNUNET_SERVICE_Client *`
  * @param message the actual message
  * @return #GNUNET_OK on success (always)
  */
 static int
 service_client_mst_cb (void *cls,
-                       void *client_cls,
                        const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_SERVICE_Client *client = cls;
 
+  GNUNET_assert (GNUNET_NO == client->needs_continue);
+  client->needs_continue = GNUNET_YES;
+  client->warn_type = ntohs (message->type);
+  client->warn_start = GNUNET_TIME_absolute_get ();
+  client->warn_task
+    = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+                                   &warn_no_client_continue,
+                                   client);
   GNUNET_MQ_inject_message (client->mq,
                             message);
   return GNUNET_OK;
@@ -1826,10 +1888,31 @@
 service_client_recv (void *cls)
 {
   struct GNUNET_SERVICE_Client *client = cls;
+  int ret;
 
-  // FIXME: read into buffer, pass to MST, then client->mq inject!
-  // FIXME: revise MST API to avoid the memcpy!
-  // i.e.: GNUNET_MST_read (client->sock);
+  client->recv_task = NULL;
+  ret = GNUNET_MST_read (client->mst,
+                        client->sock,
+                        GNUNET_NO,
+                        GNUNET_YES);
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (client);
+    return;
+  }
+  if (GNUNET_NO == ret)
+    return; /* more messages in buffer, wait for application
+              to be done processing */
+  GNUNET_assert (GNUNET_OK == ret);
+  if (GNUNET_YES == client->needs_continue)
+    return;
+  /* MST needs more data, re-schedule read job */
+  client->recv_task
+    = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    client->sock,
+                                    &service_client_recv,
+                                    client);
 }
 
 
@@ -1859,8 +1942,8 @@
                                               sh->handlers,
                                               &service_mq_error_handler,
                                               client);
-  client->mst = GNUNET_SERVER_mst_create (&service_client_mst_cb,
-                                          client);
+  client->mst = GNUNET_MST_create (&service_client_mst_cb,
+                                  client);
   client->user_context = sh->connect_cb (sh->cb_cls,
                                          client,
                                          client->mq);
@@ -1955,69 +2038,70 @@
 
   slc->listen_task = NULL;
   while (1)
+  {
+    struct GNUNET_NETWORK_Handle *sock;
+    const struct sockaddr_in *v4;
+    const struct sockaddr_in6 *v6;
+    struct sockaddr_storage sa;
+    socklen_t addrlen;
+    int ok;
+    
+    addrlen = sizeof (sa);
+    sock = GNUNET_NETWORK_socket_accept (slc->listen_socket,
+                                        (struct sockaddr *) &sa,
+                                        &addrlen);
+    if (NULL == sock)
+      break;
+    switch (sa.ss_family)
     {
-      struct GNUNET_NETWORK_Handle *sock;
-      const struct sockaddr_in *v4;
-      const struct sockaddr_in6 *v6;
-      struct sockaddr_storage sa;
-      socklen_t addrlen;
-      int ok;
-
-      addrlen = sizeof (sa);
-      sock = GNUNET_NETWORK_socket_accept (slc->listen_socket,
-                                           (struct sockaddr *) &sa,
-                                           &addrlen);
-      if (NULL == sock)
-        break;
-      switch (sa.ss_family)
-      {
-      case AF_INET:
-        GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
-        v4 = (const struct sockaddr_in *) &sa;
-        ok = ( ( (NULL == sh->v4_allowed) ||
-                 (check_ipv4_listed (sh->v4_allowed,
-                                     &v4->sin_addr))) &&
-               ( (NULL == sh->v4_denied) ||
-                 (! check_ipv4_listed (sh->v4_denied,
-                                       &v4->sin_addr)) ) );
-        break;
-      case AF_INET6:
-        GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
-        v6 = (const struct sockaddr_in6 *) &sa;
-        ok = ( ( (NULL == sh->v6_allowed) ||
-                 (check_ipv6_listed (sh->v6_allowed,
-                                     &v6->sin6_addr))) &&
-               ( (NULL == sh->v6_denied) ||
-                 (! check_ipv6_listed (sh->v6_denied,
-                                       &v6->sin6_addr)) ) );
-        break;
+    case AF_INET:
+      GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
+      v4 = (const struct sockaddr_in *) &sa;
+      ok = ( ( (NULL == sh->v4_allowed) ||
+              (check_ipv4_listed (sh->v4_allowed,
+                                  &v4->sin_addr))) &&
+            ( (NULL == sh->v4_denied) ||
+              (! check_ipv4_listed (sh->v4_denied,
+                                    &v4->sin_addr)) ) );
+      break;
+    case AF_INET6:
+      GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
+      v6 = (const struct sockaddr_in6 *) &sa;
+      ok = ( ( (NULL == sh->v6_allowed) ||
+              (check_ipv6_listed (sh->v6_allowed,
+                                  &v6->sin6_addr))) &&
+            ( (NULL == sh->v6_denied) ||
+              (! check_ipv6_listed (sh->v6_denied,
+                                    &v6->sin6_addr)) ) );
+      break;
 #ifndef WINDOWS
-      case AF_UNIX:
-        ok = GNUNET_OK;            /* controlled using file-system ACL now */
-        break;
+    case AF_UNIX:
+      ok = GNUNET_OK;            /* controlled using file-system ACL now */
+      break;
 #endif
-      default:
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             _("Unknown address family %d\n"),
-             sa.ss_family);
-        return;
-      }
-      if (! ok)
-        {
-          LOG (GNUNET_ERROR_TYPE_DEBUG,
-               "Service rejected incoming connection from %s due to policy.\n",
-               GNUNET_a2s ((const struct sockaddr *) &sa,
-                           addrlen));
-          GNUNET_NETWORK_socket_close (sock);
-          continue;
-        }
+    default:
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+          _("Unknown address family %d\n"),
+          sa.ss_family);
+      return;
+    }
+    if (! ok)
+    {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Service accepted incoming connection from %s.\n",
-           GNUNET_a2s ((const struct sockaddr *) &sa,
-                       addrlen));
-      start_client (slc->sh,
-                    sock);
+          "Service rejected incoming connection from %s due to policy.\n",
+          GNUNET_a2s ((const struct sockaddr *) &sa,
+                      addrlen));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (sock));
+      continue;
     }
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Service accepted incoming connection from %s.\n",
+        GNUNET_a2s ((const struct sockaddr *) &sa,
+                    addrlen));
+    start_client (slc->sh,
+                 sock);
+  }
   slc->listen_task
     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
                                     slc->listen_socket,
@@ -2049,6 +2133,42 @@
 
 
 /**
+ * Task run to resume receiving data from the client after
+ * the client called #GNUNET_SERVICE_client_continue().
+ *
+ * @param cls our `struct GNUNET_SERVICE_Client`
+ */
+static void
+resume_client_receive (void *cls)
+{
+  struct GNUNET_SERVICE_Client *c = cls;
+  int ret;
+
+  c->recv_task = NULL;
+  /* first, check if there is still something in the buffer */
+  ret = GNUNET_MST_next (c->mst,
+                        GNUNET_YES);
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (c);
+    return;
+  }
+  if (GNUNET_NO == ret)
+    return; /* done processing, wait for more later */
+  GNUNET_assert (GNUNET_OK == ret);
+  if (GNUNET_YES == c->needs_continue)
+    return; /* #GNUNET_MST_next() did give a message to the client */
+  /* need to receive more data from the network first */
+  c->recv_task
+    = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    c->sock,
+                                    &service_client_recv,
+                                    c);  
+}
+
+
+/**
  * Continue receiving further messages from the given client.
  * Must be called after each message received.
  *
@@ -2057,7 +2177,16 @@
 void
 GNUNET_SERVICE_client_continue (struct GNUNET_SERVICE_Client *c)
 {
-  GNUNET_break (0); // not implemented
+  GNUNET_assert (GNUNET_YES == c->needs_continue);
+  GNUNET_assert (NULL == c->recv_task);
+  c->needs_continue = GNUNET_NO;
+  if (NULL != c->warn_task)
+  {
+    GNUNET_SCHEDULER_cancel (c->warn_task);
+    c->warn_task = NULL;
+  }
+  c->recv_task = GNUNET_SCHEDULER_add_now (&resume_client_receive,
+                                          c);
 }
 
 
@@ -2117,11 +2246,12 @@
     GNUNET_SCHEDULER_cancel (c->send_task);
     c->send_task = NULL;
   }
-  GNUNET_SERVER_mst_destroy (c->mst);
+  GNUNET_MST_destroy (c->mst);
   GNUNET_MQ_destroy (c->mq);
   if (GNUNET_NO == c->persist)
   {
-    GNUNET_NETWORK_socket_close (c->sock);
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (c->sock));
   }
   else
   {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]