gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: unix communicator now builds


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: unix communicator now builds
Date: Thu, 08 Nov 2018 14:20:34 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 304bfc5d1 unix communicator now builds
304bfc5d1 is described below

commit 304bfc5d18d5613a38b5d927925dbfa00adfc82a
Author: Christian Grothoff <address@hidden>
AuthorDate: Thu Nov 8 14:20:33 2018 +0100

    unix communicator now builds
---
 .../gnunet_transport_communication_service.h       |   2 +-
 src/transport/Makefile.am                          |   9 +
 src/transport/gnunet-communicator-unix.c           | 769 ++++++++++-----------
 3 files changed, 363 insertions(+), 417 deletions(-)

diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h
index b1a248e51..ab5d3742a 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -70,7 +70,7 @@ extern "C"
 typedef int
 (*GNUNET_TRANSPORT_CommunicatorMqInit) (void *cls,
                                         const struct GNUNET_PeerIdentity *peer,
-                                        const void *address);
+                                        const char *address);
 
 
 /**
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index c6c02c6ed..92b53137f 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -140,6 +140,7 @@ endif
 
 noinst_PROGRAMS = \
  gnunet-transport-profiler \
+ gnunet-communicator-unix \
  $(WLAN_BIN_SENDER) \
  $(WLAN_BIN_RECEIVER)
 
@@ -219,6 +220,14 @@ gnunet_transport_certificate_creation_SOURCES = \
 gnunet_transport_certificate_creation_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la
 
+gnunet_communicator_unix_SOURCES = \
+ gnunet-communicator-unix.c
+gnunet_communicator_unix_LDADD = \
+  libgnunettransportcommunicator.la \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la \
+  $(top_builddir)/src/util/libgnunetutil.la
+
+
 gnunet_helper_transport_wlan_SOURCES = \
  gnunet-helper-transport-wlan.c
 
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
index f07975186..2879b1738 100644
--- a/src/transport/gnunet-communicator-unix.c
+++ b/src/transport/gnunet-communicator-unix.c
@@ -154,6 +154,11 @@ static unsigned long long delivering_messages;
 static unsigned long long max_queue_length;
 
 /**
+ * For logging statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
+/**
  * Our environment.
  */
 static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
@@ -194,12 +199,11 @@ static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
 static void
 queue_destroy (struct Queue *queue)
 {
-  struct Plugin *plugin = cls;
   struct GNUNET_MQ_Handle *mq;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Disconnecting queue for peer `%s'\n",       
-       GNUNET_i2s (&queue->target));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Disconnecting queue for peer `%s'\n",       
+             GNUNET_i2s (&queue->target));
   if (0 != queue->bytes_in_queue)
   {
     GNUNET_CONTAINER_DLL_remove (queue_head,
@@ -253,11 +257,11 @@ queue_timeout (void *cls)
                                      queue);
     return;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Queue %p was idle for %s, disconnecting\n",
-       queue,
-       GNUNET_STRINGS_relative_time_to_string 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                              GNUNET_YES));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Queue %p was idle for %s, disconnecting\n",
+             queue,
+             GNUNET_STRINGS_relative_time_to_string 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                     GNUNET_YES));
   queue_destroy (queue);
 }
 
@@ -288,8 +292,7 @@ reschedule_queue_timeout (struct Queue *queue)
  */
 static struct sockaddr_un *
 unix_address_to_sockaddr (const char *unixpath,
-                          socklen_t *sock_len,
-                         int is_abstract)
+                          socklen_t *sock_len)
 {
   struct sockaddr_un *un;
   size_t slen;
@@ -309,7 +312,7 @@ unix_address_to_sockaddr (const char *unixpath,
   un->sun_len = (u_char) slen;
 #endif
   (*sock_len) = slen;
-  if (GNUNET_YES == is_abstract)
+  if ('@' == un->sun_path[0])
     un->sun_path[0] = '\0';
   return un;
 }
@@ -328,7 +331,7 @@ struct LookupCtx
   /**
    * Address we are looking for.
    */
-  const sockaddr_un *un;
+  const struct sockaddr_un *un;
 
   /**
    * Number of bytes in @a un
@@ -347,7 +350,7 @@ struct LookupCtx
  */
 static int
 lookup_queue_it (void *cls,
-                const struct GNUNET_PeerIdentity * key,
+                const struct GNUNET_PeerIdentity *key,
                 void *value)
 {
   struct LookupCtx *lctx = cls;
@@ -374,14 +377,14 @@ lookup_queue_it (void *cls,
  */
 static struct Queue *
 lookup_queue (const struct GNUNET_PeerIdentity *peer,
-             const sockaddr_un *un,
+             const struct sockaddr_un *un,
              socklen_t un_len)
 {
   struct LookupCtx lctx;
 
   lctx.un = un;
   lctx.un_len = un_len;
-  GNUNET_CONTAINER_multipeermap_get_multiple (plugin->queue_map,
+  GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
                                              peer,
                                              &lookup_queue_it,
                                              &lctx);
@@ -390,295 +393,6 @@ lookup_queue (const struct GNUNET_PeerIdentity *peer,
 
 
 /**
- * Creates a new outbound queue the transport service will use to send
- * data to another peer.
- *
- * @param peer the target peer
- * @param un the address
- * @param un_len number of bytes in @a un
- * @return the queue or NULL of max connections exceeded
- */
-static struct Queue *
-unix_plugin_get_queue (const struct GNUNET_PeerIdentity *target,
-                      const struct sockaddr_un *un,
-                      socklen_t un_len)
-{
-  struct Plugin *plugin = cls;
-  struct Queue *queue;
-  struct UnixAddress *ua;
-  char * addrstr;
-  uint32_t addr_str_len;
-  uint32_t addr_option;
-  char *foreign_addr;
-  int is_abstract;
-
-  if (is_abstract = ('\0' == un.sun_path[0]))
-    un.sun_path[0] = '/';  
-  GNUNET_asprintf (&foreign_addr,
-                  "%s-%s#%d",
-                  COMMUNICATOR_NAME,
-                  un.sun_path,
-                  is_abstract);
-
-
-  addrstr = (char *) &ua[1];
-  addr_str_len = ntohl (ua->addrlen);
-  addr_option = ntohl (ua->options);
-
-  /* create a new queue */
-  queue = GNUNET_new (struct Queue);
-  queue->target = address->peer;
-  queue->address = GNUNET_HELLO_address_copy (address);
-  queue->plugin = plugin;
-  queue->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  queue->timeout_task = GNUNET_SCHEDULER_add_delayed 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                                        &queue_timeout,
-                                                        queue);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Creating a new queue %p for address `%s'\n",
-       queue,
-       unix_plugin_address_to_string (NULL,
-                                      address->address,
-                                      address->address_length));
-  (void) GNUNET_CONTAINER_multipeermap_put (plugin->queue_map,
-                                           &address->peer, queue,
-                                           
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_STATISTICS_set (plugin->env->stats,
-                        "# UNIX queues active",
-                        GNUNET_CONTAINER_multipeermap_size (queue_map),
-                        GNUNET_NO);
-  return queue;
-}
-
-
-/**
- * Function that will be called whenever the transport service wants
- * to notify the plugin that a queue is still active and in use and
- * therefore the queue timeout for this queue has to be updated
- *
- * @param cls closure with the `struct Plugin *`
- * @param peer which peer was the queue for
- * @param queue which queue is being updated
- */
-static void
-unix_plugin_update_queue_timeout (void *cls,
-                                 const struct GNUNET_PeerIdentity *peer,
-                                 struct Queue *queue)
-{
-  struct Plugin *plugin = cls;
-
-  if (GNUNET_OK !=
-      GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map,
-                                                    &queue->target,
-                                                    queue))
-  {
-    GNUNET_break (0);
-    return;
-  }
-  reschedule_queue_timeout (queue);
-}
-
-
-/**
- * Demultiplexer for UNIX messages
- *
- * @param plugin the main plugin for this transport
- * @param sender from which peer the message was received
- * @param currhdr pointer to the header of the message
- * @param ua address to look for
- * @param ua_len length of the address @a ua
- */
-static void
-unix_demultiplexer (struct Plugin *plugin,
-                    struct GNUNET_PeerIdentity *sender,
-                    const struct GNUNET_MessageHeader *currhdr,
-                    const struct UnixAddress *ua,
-                    size_t ua_len)
-{
-  struct Queue *queue;
-  struct GNUNET_HELLO_Address *address;
-
-  GNUNET_assert (ua_len >= sizeof (struct UnixAddress));
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received message from %s\n",
-       unix_plugin_address_to_string (NULL, ua, ua_len));
-  GNUNET_STATISTICS_update (plugin->env->stats,
-                           "# bytes received via UNIX",
-                           ntohs (currhdr->size),
-                           GNUNET_NO);
-
-  /* Look for existing queue */
-  address = GNUNET_HELLO_address_allocate (sender,
-                                           PLUGIN_NAME,
-                                           ua, ua_len,
-                                           GNUNET_HELLO_ADDRESS_INFO_NONE); /* 
UNIX does not have "inbound" queues */
-  queue = lookup_queue (plugin, address);
-  if (NULL == queue)
-  {
-    queue = unix_plugin_get_queue (plugin, address);
-    /* Notify transport and ATS about new inbound queue */
-    plugin->env->queue_start (NULL,
-                                queue->address,
-                                queue,
-                                GNUNET_ATS_NET_LOOPBACK);
-  }
-  else
-  {
-    reschedule_queue_timeout (queue);
-  }
-  GNUNET_HELLO_address_free (address);
-  plugin->env->receive (plugin->env->cls,
-                        queue->address,
-                        queue,
-                        currhdr);
-}
-
-
-/**
- * We have been notified that our socket has something to read. Do the
- * read and reschedule this function to be called again once more is
- * available.
- *
- * @param cls NULL
- */
-static void
-select_read_cb (void *cls);
-
-
-/**
- * Function called when message was successfully passed to
- * transport service.  Continue read activity.
- *
- * @param cls NULL
- */
-static void
-receive_complete_cb (void *cls)
-{
-  delivering_messages--;
-  if ( (NULL == read_task) &&
-       (delivering_messages < max_queue_length) )
-    read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
-                                              unix_sock,
-                                              &select_read_cb,
-                                              NULL);
-}
-
-
-/**
- * We have been notified that our socket has something to read. Do the
- * read and reschedule this function to be called again once more is
- * available.
- *
- * @param cls NULL
- */
-static void
-select_read_cb (void *cls)
-{
-  char buf[65536] GNUNET_ALIGN;
-  struct Queue *queue;
-  const struct UNIXMessage *msg;
-  struct sockaddr_un un;
-  socklen_t addrlen;
-  ssize_t ret;
-  uint16_t msize;
-
-  read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
-                                            unix_sock,
-                                            &select_read_cb,
-                                            NULL);
-  addrlen = sizeof (un);
-  memset (&un,
-         0,
-         sizeof (un));
-  ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
-                                        buf,
-                                       sizeof (buf),
-                                        (struct sockaddr *) &un,
-                                        &addrlen);
-  if ( (-1 == ret) &&
-       ( (EAGAIN == errno) ||
-        (ENOBUFS == errno) ) )
-    return;
-  if (-1 == ret)
-  {
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
-                         "recvfrom");
-    return;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Read %d bytes from socket %s\n",
-       (int) ret,
-       un.sun_path);
-  GNUNET_assert (AF_UNIX == (un.sun_family));
-  msg = (struct UNIXMessage *) buf;
-  msize = ntohs (msg->header.size);
-  if ( (msize < sizeof (struct UNIXMessage)) ||
-       (msize > ret) )
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-  queue = lookup_queue (&msg->sender,
-                       un,
-                       addrlen);
-  if (NULL == queue)
-    queue = setup_queue (&msg->sender,
-                        un,
-                        addrlen);
-  if (NULL == queue)
-  {
-    GNUENT_log (GNUNET_ERROR_TYPE_ERROR,
-               _("Maximum number of UNIX connections exceeded, dropping 
incoming message\n"));
-    return;
-  }
-  
-
-  {
-    uint16_t offset = 0;
-    uint16_t tsize = msize - sizeof (struct UNIXMessage);
-    const char *msgbuf = (const char *) &msg[1];
-    
-    while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
-    {
-      const struct GNUNET_MessageHeader *currhdr;
-      struct GNUNET_MessageHeader al_hdr;
-      uint16_t csize;
-
-      currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
-      /* ensure aligned access */
-      memcpy (&al_hdr,
-             currhdr,
-             sizeof (al_hdr));
-      csize = ntohs (al_hdr.size);
-      if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
-          (csize > tsize - offset))
-      {
-       GNUNET_break_op (0);
-       break;
-      }
-      ret = GNUNET_TRANSPORT_communicator_receive (ch,
-                                                  &msg->sender,
-                                                  currhdr,
-                                                  &receive_complete_cb,
-                                                  NULL);
-      if (GNUNET_SYSERR == ret)
-       return; /* transport not up */
-      if (GNUNET_NO == ret)
-       break;
-      delivering_messages++;
-      offset += csize;
-    }
-  }
-  if (delivering_messages >= max_queue_length)
-  {
-    /* we should try to apply 'back pressure' */
-    GNUNET_SCHEDULER_cancel (read_task);
-    read_task = NULL;
-  }
-}
-
-
-/**
  * We have been notified that our socket is ready to write.
  * Then reschedule this function to be called again once more is available.
  *
@@ -712,16 +426,27 @@ select_write_cb (void *cls)
   sent = GNUNET_NETWORK_socket_sendto (unix_sock,
                                        queue->msg,
                                        msg_size,
-                                       (const struct sockaddr *) mq->address,
-                                       mq->address_len);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "UNIX transmitted message to %s (%d/%u: %s)\n",
-       GNUNET_i2s (&queue->target),      
-       (int) sent,
-       (unsigned int) msg_size,
-       (sent < 0) ? STRERROR (errno) : "ok");
+                                       (const struct sockaddr *) 
queue->address,
+                                       queue->address_len);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "UNIX transmitted message to %s (%d/%u: %s)\n",
+             GNUNET_i2s (&queue->target),      
+             (int) sent,
+             (unsigned int) msg_size,
+             (sent < 0) ? STRERROR (errno) : "ok");
   if (-1 != sent)
+  {
+    GNUNET_STATISTICS_update (stats,
+                             "# bytes sent",
+                             (long long) sent,
+                             GNUNET_NO);
+    reschedule_queue_timeout (queue);
     return; /* all good */
+  }
+  GNUNET_STATISTICS_update (stats,
+                           "# network transmission failures",
+                           1,
+                           GNUNET_NO);
   switch (errno)
   {
   case EAGAIN:
@@ -747,11 +472,11 @@ select_write_cb (void *cls)
         GNUNET_break (0);
         return;
       }
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-          "Trying to increase socket buffer size from %u to %u for message 
size %u\n",
-          (unsigned int) size,
-          (unsigned int) m((msg_size / 1000) + 2) * 1000,
-          (unsigned int) msg_size);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Trying to increase socket buffer size from %u to %u for 
message size %u\n",
+                 (unsigned int) size,
+                 (unsigned int) ((msg_size / 1000) + 2) * 1000,
+                 (unsigned int) msg_size);
       size = ((msg_size / 1000) + 2) * 1000;
       if (GNUNET_OK ==
          GNUNET_NETWORK_socket_setsockopt (unix_sock,
@@ -846,7 +571,17 @@ mq_cancel (struct GNUNET_MQ_Handle *mq,
 {
   struct Queue *queue = impl_state;
 
-  // FIXME: TBD!
+  GNUNET_assert (NULL != queue->msg);
+  queue->msg = NULL;
+  GNUNET_CONTAINER_DLL_remove (queue_head,
+                              queue_tail,
+                              queue);
+  GNUNET_assert (NULL != write_task);
+  if (NULL == queue_head)
+  {
+    GNUNET_SCHEDULER_cancel (write_task);
+    write_task = NULL;
+  }
 }
 
 
@@ -865,7 +600,230 @@ mq_error (void *cls,
 {
   struct Queue *queue = cls;
 
-  // FIXME: TBD!
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+             "UNIX MQ error in queue to %s: %d\n",
+             GNUNET_i2s (&queue->target),
+             (int) error);
+  queue_destroy (queue);
+}
+
+
+/**
+ * Creates a new outbound queue the transport service will use to send
+ * data to another peer.
+ *
+ * @param peer the target peer
+ * @param un the address
+ * @param un_len number of bytes in @a un
+ * @return the queue or NULL of max connections exceeded
+ */
+static struct Queue *
+setup_queue (const struct GNUNET_PeerIdentity *target,
+            const struct sockaddr_un *un,
+            socklen_t un_len)
+{
+  struct Queue *queue;
+
+  queue = GNUNET_new (struct Queue);
+  queue->target = *target;
+  queue->address = GNUNET_memdup (un,
+                                 un_len);
+  queue->address_len = un_len;
+  (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
+                                           &queue->target,
+                                           queue,
+                                           
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_STATISTICS_set (stats,
+                        "# queues active",
+                        GNUNET_CONTAINER_multipeermap_size (queue_map),
+                        GNUNET_NO);
+  queue->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  queue->timeout_task
+    = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                   &queue_timeout,
+                                   queue);
+  queue->mq
+    = GNUNET_MQ_queue_for_callbacks (&mq_send,
+                                    &mq_destroy,
+                                    &mq_cancel,
+                                    queue,
+                                    NULL,
+                                    &mq_error,
+                                    queue);
+  {
+    char *foreign_addr;
+    
+    if ('\0' == un->sun_path[0])
+      GNUNET_asprintf (&foreign_addr,
+                      "%s-@%s",
+                      COMMUNICATOR_NAME,
+                      &un->sun_path[1]);
+    else
+      GNUNET_asprintf (&foreign_addr,
+                      "%s-%s",
+                      COMMUNICATOR_NAME,
+                      un->sun_path);
+    queue->qh
+      = GNUNET_TRANSPORT_communicator_mq_add (ch,
+                                             &queue->target,
+                                             foreign_addr,
+                                             GNUNET_ATS_NET_LOOPBACK,
+                                             queue->mq);
+    GNUNET_free (foreign_addr);
+  }
+  return queue;
+}
+
+
+/**
+ * We have been notified that our socket has something to read. Do the
+ * read and reschedule this function to be called again once more is
+ * available.
+ *
+ * @param cls NULL
+ */
+static void
+select_read_cb (void *cls);
+
+
+/**
+ * Function called when message was successfully passed to
+ * transport service.  Continue read activity.
+ *
+ * @param cls NULL
+ * @param success #GNUNET_OK on success
+ */
+static void
+receive_complete_cb (void *cls,
+                    int success)
+{
+  delivering_messages--;
+  if (GNUNET_OK != success)
+    GNUNET_STATISTICS_update (stats,
+                             "# transport transmission failures",
+                             1,
+                             GNUNET_NO);
+  if ( (NULL == read_task) &&
+       (delivering_messages < max_queue_length) )
+    read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                              unix_sock,
+                                              &select_read_cb,
+                                              NULL);
+}
+
+
+/**
+ * We have been notified that our socket has something to read. Do the
+ * read and reschedule this function to be called again once more is
+ * available.
+ *
+ * @param cls NULL
+ */
+static void
+select_read_cb (void *cls)
+{
+  char buf[65536] GNUNET_ALIGN;
+  struct Queue *queue;
+  const struct UNIXMessage *msg;
+  struct sockaddr_un un;
+  socklen_t addrlen;
+  ssize_t ret;
+  uint16_t msize;
+
+  read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                            unix_sock,
+                                            &select_read_cb,
+                                            NULL);
+  addrlen = sizeof (un);
+  memset (&un,
+         0,
+         sizeof (un));
+  ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
+                                        buf,
+                                       sizeof (buf),
+                                        (struct sockaddr *) &un,
+                                        &addrlen);
+  if ( (-1 == ret) &&
+       ( (EAGAIN == errno) ||
+        (ENOBUFS == errno) ) )
+    return;
+  if (-1 == ret)
+  {
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                         "recvfrom");
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Read %d bytes from socket %s\n",
+             (int) ret,
+             un.sun_path);
+  GNUNET_assert (AF_UNIX == (un.sun_family));
+  msg = (struct UNIXMessage *) buf;
+  msize = ntohs (msg->header.size);
+  if ( (msize < sizeof (struct UNIXMessage)) ||
+       (msize > ret) )
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  queue = lookup_queue (&msg->sender,
+                       &un,
+                       addrlen);
+  if (NULL == queue)
+    queue = setup_queue (&msg->sender,
+                        &un,
+                        addrlen);
+  else
+    reschedule_queue_timeout (queue);
+  if (NULL == queue)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               _("Maximum number of UNIX connections exceeded, dropping 
incoming message\n"));
+    return;
+  }
+ 
+  {
+    uint16_t offset = 0;
+    uint16_t tsize = msize - sizeof (struct UNIXMessage);
+    const char *msgbuf = (const char *) &msg[1];
+    
+    while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
+    {
+      const struct GNUNET_MessageHeader *currhdr;
+      struct GNUNET_MessageHeader al_hdr;
+      uint16_t csize;
+
+      currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
+      /* ensure aligned access */
+      memcpy (&al_hdr,
+             currhdr,
+             sizeof (al_hdr));
+      csize = ntohs (al_hdr.size);
+      if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
+          (csize > tsize - offset))
+      {
+       GNUNET_break_op (0);
+       break;
+      }
+      ret = GNUNET_TRANSPORT_communicator_receive (ch,
+                                                  &msg->sender,
+                                                  currhdr,
+                                                  &receive_complete_cb,
+                                                  NULL);
+      if (GNUNET_SYSERR == ret)
+       return; /* transport not up */
+      if (GNUNET_NO == ret)
+       break;
+      delivering_messages++;
+      offset += csize;
+    }
+  }
+  if (delivering_messages >= max_queue_length)
+  {
+    /* we should try to apply 'back pressure' */
+    GNUNET_SCHEDULER_cancel (read_task);
+    read_task = NULL;
+  }
 }
 
 
@@ -889,76 +847,69 @@ mq_error (void *cls,
 static int
 mq_init (void *cls,
         const struct GNUNET_PeerIdentity *peer,
-        const void *address)
+        const char *address)
 {
   struct Queue *queue;
-  char *a;
-  char *e;
-  int is_abs;
-  sockaddr_un *un;
+  const char *path;
+  struct sockaddr_un *un;
   socklen_t un_len;
   
-  if (NULL == strncmp (address,
-                      COMMUNICATOR_NAME "-",
-                      strlen (COMMUNICATOR_NAME "-")))
+  if (0 != strncmp (address,
+                   COMMUNICATOR_NAME "-",
+                   strlen (COMMUNICATOR_NAME "-")))
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-  a = GNUNET_strdup (&address[strlen (COMMUNICATOR_NAME "-")]);
-  e = strchr (a,
-             (unsigned char) '#');
-  if (NULL == e)
-  {
-    GNUNET_free (a);
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;    
-  }
-  is_abs = ('1' == e[1]);
-  *e = '\0';
-  un = unix_address_to_sockaddr (a,
-                                &un_len,
-                                is_abs);
+  path = &address[strlen (COMMUNICATOR_NAME "-")];
+  un = unix_address_to_sockaddr (path,
+                                &un_len);
   queue = lookup_queue (peer,
                        un,
                        un_len);
   if (NULL != queue)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-               "Address `%s' ignored, queue exists\n",
-               address);
+               "Address `%s' for %s ignored, queue exists\n",
+               path,
+               GNUNET_i2s (peer));
     GNUNET_free (un);
     return GNUNET_OK;
   }
-  queue = GNUNET_new (struct Queue);
-  queue->target = *peer;
-  queue->address = un;
-  queue->address_len = un_len;
-  (void) GNUNET_CONTAINER_multihashmap_put (queue_map,
-                                           &queue->target,
-                                           queue,
-                                           
GNUET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_STATISTICS_set (stats,
-                        "# UNIX queues active",
-                        GNUNET_CONTAINER_multipeermap_size (plugin->queue_map),
-                        GNUNET_NO);
-  queue->timeout = GNUNET_SCHEDULER_add_delayed 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                                &queue_timeout,
-                                                queue);
-  queue->mq
-    = GNUNET_MQ_queue_for_callbacks (&mq_send,
-                                    &mq_destroy,
-                                    &mq_cancel,
-                                    queue,
-                                    NULL,
-                                    &mq_error,
-                                    queue);
-  queue->qh
-    = GNUNET_TRANSPORT_communicator_mq_add (ch,
-                                           &queue->target,
-                                           address,
-                                           ATS,
-                                           queue->mq);
+  queue = setup_queue (peer,
+                      un,
+                      un_len);
+  GNUNET_free (un);
+  if (NULL == queue)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Failed to setup queue to %s at `%s'\n",
+               GNUNET_i2s (peer),              
+               path);
+    return GNUNET_NO;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Iterator over all message queues to clean up.
+ *
+ * @param cls NULL
+ * @param target unused
+ * @param value the queue to destroy
+ * @return #GNUNET_OK to continue to iterate
+ */
+static int
+get_queue_delete_it (void *cls,
+                    const struct GNUNET_PeerIdentity *target,
+                    void *value)
+{
+  struct Queue *queue = value;
+
+  (void) cls;
+  (void) target;
+  queue_destroy (queue);
   return GNUNET_OK;
 }
 
@@ -971,22 +922,6 @@ mq_init (void *cls,
 static void
 do_shutdown (void *cls)
 {
-  struct UNIXMessageWrapper *msgw;
-
-  while (NULL != (msgw = msg_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (msg_head,
-                                 msg_tail,
-                                 msgw);
-    queue = msgw->queue;
-    queue->msgs_in_queue--;
-    GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
-    queue->bytes_in_queue -= msgw->msgsize;
-    GNUNET_assert (bytes_in_queue >= msgw->msgsize);
-    bytes_in_queue -= msgw->msgsize;
-    GNUNET_free (msgw->msg);
-    GNUNET_free (msgw);
-  }
   if (NULL != read_task)
   {
     GNUNET_SCHEDULER_cancel (read_task);
@@ -1017,6 +952,12 @@ do_shutdown (void *cls)
     GNUNET_TRANSPORT_communicator_disconnect (ch);
     ch = NULL;
   }
+  if (NULL != stats)
+  {
+    GNUNET_STATISTICS_destroy (stats,
+                              GNUNET_NO);
+    stats = NULL;
+  }
 }
 
 
@@ -1035,7 +976,6 @@ run (void *cls,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   char *unix_socket_path;
-  int is_abstract;
   struct sockaddr_un *un;
   socklen_t un_len;
   char *my_addr;
@@ -1059,18 +999,16 @@ run (void *cls,
                                             &max_queue_length))
     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
     
-  
-  /* Initialize my flags */
-  is_abstract = 0;
-#ifdef LINUX
-  is_abstract
-    = GNUNET_CONFIGURATION_get_value_yesno (cfg,
-                                           "testing",
-                                           "USE_ABSTRACT_SOCKETS");
-#endif
   un = unix_address_to_sockaddr (unix_socket_path,
-                                 &un_len,
-                                is_abstract);
+                                 &un_len);
+  if (NULL == un)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               "Failed to setup UNIX domain socket address with path `%s'\n",
+               unix_socket_path);
+    GNUNET_free (unix_socket_path);
+    return;
+  }
   unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
                                            SOCK_DGRAM,
                                            0);
@@ -1086,9 +1024,9 @@ run (void *cls,
        (GNUNET_OK !=
        GNUNET_DISK_directory_create_for_file (un->sun_path)) )
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-        _("Cannot create path to `%s'\n"),
-        un->sun_path);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               _("Cannot create path to `%s'\n"),
+               un->sun_path);
     GNUNET_NETWORK_socket_close (unix_sock);
     unix_sock = NULL;
     GNUNET_free (un);
@@ -1100,11 +1038,9 @@ run (void *cls,
                                   (const struct sockaddr *) un,
                                  un_len))
   {
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
-                        "bind");
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-        _("Cannot bind to `%s'\n"),
-        un->sun_path);
+    GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
+                             "bind",
+                             un->sun_path);
     GNUNET_NETWORK_socket_close (unix_sock);
     unix_sock = NULL;
     GNUNET_free (un);
@@ -1112,14 +1048,16 @@ run (void *cls,
     return;
   }
   GNUNET_free (un);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Bound to `%s'\n",
-       unix_socket_path);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Bound to `%s'\n",
+             unix_socket_path);
+  stats = GNUNET_STATISTICS_create ("C-UNIX",
+                                   cfg);
   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
                                 NULL);
   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
                                             unix_sock,
-                                            &unix_plugin_select_read,
+                                            &select_read_cb,
                                             NULL);
   queue_map = GNUNET_CONTAINER_multipeermap_create (10,
                                                      GNUNET_NO);
@@ -1136,10 +1074,9 @@ run (void *cls,
     return;
   }
   GNUNET_asprintf (&my_addr,
-                  "%s-%s#%d",
+                  "%s-%s",
                   COMMUNICATOR_NAME,
-                  unix_socket_path,
-                  is_abstract);
+                  unix_socket_path);
   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
                                                  my_addr,
                                                  GNUNET_ATS_NET_LOOPBACK,

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]