commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 07/17: blocks: added 'MTU' and 'tcp_no_dela


From: git
Subject: [Commit-gnuradio] [gnuradio] 07/17: blocks: added 'MTU' and 'tcp_no_delay' params for 'socket_pdu' (and GRC option), applied MTU (buffer size) to TCP/UDP send, separate TCP/UDP server endpoint resolvers for empty/0.0.0.0 Host param (listen on all interfaces) Whitespace clean-up.
Date: Mon, 31 Mar 2014 20:15:53 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 8b8d7ee57ce4029411cde48810e2ec6eac7ae5f2
Author: Balint Seeber <address@hidden>
Date:   Thu Mar 27 01:27:33 2014 -0700

    blocks: added 'MTU' and 'tcp_no_delay' params for 'socket_pdu' (and GRC 
option), applied MTU (buffer size) to TCP/UDP send, separate TCP/UDP server 
endpoint resolvers for empty/0.0.0.0 Host param (listen on all interfaces)
    Whitespace clean-up.
---
 gr-blocks/grc/blocks_socket_pdu.xml            |  28 ++++-
 gr-blocks/include/gnuradio/blocks/socket_pdu.h |   3 +-
 gr-blocks/lib/socket_pdu_impl.cc               | 136 +++++++++++++++----------
 gr-blocks/lib/socket_pdu_impl.h                |   5 +-
 4 files changed, 113 insertions(+), 59 deletions(-)

diff --git a/gr-blocks/grc/blocks_socket_pdu.xml 
b/gr-blocks/grc/blocks_socket_pdu.xml
index 1e897cf..72dc381 100644
--- a/gr-blocks/grc/blocks_socket_pdu.xml
+++ b/gr-blocks/grc/blocks_socket_pdu.xml
@@ -8,7 +8,7 @@
   <name>Socket PDU</name>
   <key>blocks_socket_pdu</key>
   <import>from gnuradio import blocks</import>
-  <make>blocks.socket_pdu($type, $host, $port, $mtu)</make>
+  <make>blocks.socket_pdu($type, $host, $port, $mtu, $tcp_no_delay)</make>
   <param>
     <name>Type</name>
     <key>type</key>
@@ -49,6 +49,31 @@
     <value>10000</value>
     <type>int</type>
   </param>
+  <param>
+    <name>TCP No Delay</name>
+    <key>tcp_no_delay</key>
+    <value>False</value>
+    <type>enum</type>
+    <hide>
+#if (($type() == '"TCP_CLIENT"') or ($type() == '"TCP_SERVER"'))
+#if (str($tcp_no_delay()) == 'False')
+part
+#else
+none
+#end if
+#else
+all
+#end if
+</hide>
+    <option>
+      <name>Enabled</name>
+      <key>True</key>
+    </option>
+    <option>
+      <name>Disabled</name>
+      <key>False</key>
+    </option>
+  </param>
   <sink>
     <name>pdus</name>
     <type>message</type>
@@ -59,4 +84,5 @@
     <type>message</type>
     <optional>1</optional>
   </source>
+  <doc>For server modes, leave Host blank to bind to all interfaces 
(equivalent to 0.0.0.0).</doc>
 </block>
diff --git a/gr-blocks/include/gnuradio/blocks/socket_pdu.h 
b/gr-blocks/include/gnuradio/blocks/socket_pdu.h
index 82a7632..31468a3 100644
--- a/gr-blocks/include/gnuradio/blocks/socket_pdu.h
+++ b/gr-blocks/include/gnuradio/blocks/socket_pdu.h
@@ -45,8 +45,9 @@ namespace gr {
        * \param addr network address to use
        * \param port network port to use
        * \param MTU maximum transmission unit
+       * \param tcp_no_delay TCP No Delay option (set to True to disable Nagle 
algorithm)
        */
-      static sptr make(std::string type, std::string addr, std::string port, 
int MTU=10000);
+      static sptr make(std::string type, std::string addr, std::string port, 
int MTU=10000, bool tcp_no_delay=false);
     };
 
   } /* namespace blocks */
diff --git a/gr-blocks/lib/socket_pdu_impl.cc b/gr-blocks/lib/socket_pdu_impl.cc
index 9daf8c3..3e483fb 100644
--- a/gr-blocks/lib/socket_pdu_impl.cc
+++ b/gr-blocks/lib/socket_pdu_impl.cc
@@ -33,41 +33,56 @@ namespace gr {
   namespace blocks {
 
     socket_pdu::sptr
-    socket_pdu::make(std::string type, std::string addr, std::string port, int 
MTU)
+    socket_pdu::make(std::string type, std::string addr, std::string port, int 
MTU/*= 10000*/, bool tcp_no_delay/*= false*/)
     {
-      return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, 
MTU));
+      return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, 
MTU, tcp_no_delay));
     }
 
-    socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, 
std::string port, int MTU)
-      :        block("socket_pdu",
-                io_signature::make (0, 0, 0),
-                io_signature::make (0, 0, 0))
+    socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, 
std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/)
+      : block("socket_pdu",
+          io_signature::make (0, 0, 0),
+          io_signature::make (0, 0, 0)),
+      d_tcp_no_delay(tcp_no_delay)
     {
+      d_rxbuf.resize(MTU);
+      
       message_port_register_in(PDU_PORT_ID);
       message_port_register_out(PDU_PORT_ID);
 
-      if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) {
+      if ((type == "TCP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { 
 // Bind on all interfaces
+        int port_num = atoi(port.c_str());
+        if (port_num == 0)
+          throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for 
TCP_SERVER");
+        d_tcp_endpoint = 
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num);
+      }
+      else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) {
         boost::asio::ip::tcp::resolver resolver(d_io_service);
         boost::asio::ip::tcp::resolver::query 
query(boost::asio::ip::tcp::v4(), addr, port);
-       d_tcp_endpoint = *resolver.resolve(query);
+        d_tcp_endpoint = *resolver.resolve(query);
       }
-
-      if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) {
-       boost::asio::ip::udp::resolver resolver(d_io_service);
+      else if ((type == "UDP_SERVER") && ((addr.empty()) || (addr == 
"0.0.0.0"))) {  // Bind on all interfaces
+        int port_num = atoi(port.c_str());
+        if (port_num == 0)
+          throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for 
UDP_SERVER");
+        d_udp_endpoint = 
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num);
+      }
+      else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) {
+        boost::asio::ip::udp::resolver resolver(d_io_service);
         boost::asio::ip::udp::resolver::query 
query(boost::asio::ip::udp::v4(), addr, port);
 
         if (type == "UDP_SERVER")
-         d_udp_endpoint = *resolver.resolve(query);
+          d_udp_endpoint = *resolver.resolve(query);
         else
-         d_udp_endpoint_other = *resolver.resolve(query);
+          d_udp_endpoint_other = *resolver.resolve(query);
       }
 
       if (type == "TCP_SERVER") {
         d_acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(d_io_service, 
d_tcp_endpoint));
         
d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+        
         start_tcp_accept();
+        
         set_msg_handler(PDU_PORT_ID, 
boost::bind(&socket_pdu_impl::tcp_server_send, this, _1));
-
       }
       else if (type =="TCP_CLIENT") {
         boost::system::error_code error = boost::asio::error::host_not_found;
@@ -75,34 +90,35 @@ namespace gr {
         d_tcp_socket->connect(d_tcp_endpoint, error);
         if (error)
             throw boost::system::system_error(error);
+        
d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay));
 
         set_msg_handler(PDU_PORT_ID, 
boost::bind(&socket_pdu_impl::tcp_client_send, this, _1));
 
-        d_tcp_socket->async_read_some(
-         boost::asio::buffer(d_rxbuf),
-         boost::bind(&socket_pdu_impl::handle_tcp_read, this, 
-                      boost::asio::placeholders::error, 
-                      boost::asio::placeholders::bytes_transferred)
-       );
+        d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
+          boost::bind(&socket_pdu_impl::handle_tcp_read, this, 
+            boost::asio::placeholders::error, 
+            boost::asio::placeholders::bytes_transferred));
       }
       else if (type =="UDP_SERVER") {
         d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, 
d_udp_endpoint));
         d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), 
d_udp_endpoint_other, 
-                                        
boost::bind(&socket_pdu_impl::handle_udp_read, this,
-                                                    
boost::asio::placeholders::error,
-                                                    
boost::asio::placeholders::bytes_transferred));
+          boost::bind(&socket_pdu_impl::handle_udp_read, this,
+            boost::asio::placeholders::error,
+            boost::asio::placeholders::bytes_transferred));
+        
         set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, 
this, _1));
       }
       else if (type =="UDP_CLIENT") {
         d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, 
d_udp_endpoint));
         d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), 
d_udp_endpoint_other, 
-                                        
boost::bind(&socket_pdu_impl::handle_udp_read, this,
-                                                    
boost::asio::placeholders::error,
-                                                    
boost::asio::placeholders::bytes_transferred)); 
+          boost::bind(&socket_pdu_impl::handle_udp_read, this,
+            boost::asio::placeholders::error,
+            boost::asio::placeholders::bytes_transferred));
+        
         set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, 
this, _1));
       }
       else
-       throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type");
+        throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type");
 
       d_thread = 
gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this));
       d_started = true;
@@ -112,14 +128,14 @@ namespace gr {
     socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error, 
size_t bytes_transferred)
     {
       if (!error) {
-       pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const 
uint8_t *)&d_rxbuf[0]);
-       pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
-       message_port_pub(PDU_PORT_ID, pdu);
-
-       d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
-                                     
boost::bind(&socket_pdu_impl::handle_tcp_read, this,
-                                                 
boost::asio::placeholders::error,
-                                                 
boost::asio::placeholders::bytes_transferred));
+        pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const 
uint8_t *)&d_rxbuf[0]);
+        pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
+        message_port_pub(PDU_PORT_ID, pdu);
+
+        d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
+          boost::bind(&socket_pdu_impl::handle_tcp_read, this,
+            boost::asio::placeholders::error,
+            boost::asio::placeholders::bytes_transferred));
       }
       else
         throw boost::system::system_error(error);
@@ -128,11 +144,11 @@ namespace gr {
     void
     socket_pdu_impl::start_tcp_accept()
     {
-      tcp_connection::sptr new_connection = 
tcp_connection::make(d_acceptor_tcp->get_io_service());
+      tcp_connection::sptr new_connection = 
tcp_connection::make(d_acceptor_tcp->get_io_service(), d_rxbuf.size(), 
d_tcp_no_delay);
 
       d_acceptor_tcp->async_accept(new_connection->socket(),
-                                  
boost::bind(&socket_pdu_impl::handle_tcp_accept, this, 
-                                              new_connection, 
boost::asio::placeholders::error));
+        boost::bind(&socket_pdu_impl::handle_tcp_accept, this, 
+          new_connection, boost::asio::placeholders::error));
     }
 
     void
@@ -147,12 +163,12 @@ namespace gr {
     socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection, 
const boost::system::error_code& error)
     {
       if (!error) {
-       new_connection->start(this);
-       d_tcp_connections.push_back(new_connection);
-       start_tcp_accept();
+        new_connection->start(this);
+        d_tcp_connections.push_back(new_connection);
+        start_tcp_accept();
       }
       else
-       std::cout << error << std::endl;
+        std::cout << error << std::endl;
     }
 
     void
@@ -160,22 +176,32 @@ namespace gr {
     {
       pmt::pmt_t vector = pmt::cdr(msg);
       size_t len = pmt::length(vector);
-      size_t offset(0);
-      boost::array<char, 10000> txbuf;
-      memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len);
-      d_tcp_socket->send(boost::asio::buffer(txbuf,len));
+      size_t offset = 0;
+      std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
+      while (offset < len) {
+        size_t send_len = std::min((len - offset), txbuf.size());
+        memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), 
send_len);
+        offset += send_len;
+        d_tcp_socket->send(boost::asio::buffer(txbuf, send_len));
+      }
     }
 
     void
     socket_pdu_impl::udp_send(pmt::pmt_t msg)
     {
+      if (d_udp_endpoint_other.address().to_string() == "0.0.0.0")
+        return;
+      
       pmt::pmt_t vector = pmt::cdr(msg);
       size_t len = pmt::length(vector);
-      size_t offset(0);
-      boost::array<char, 10000> txbuf;
-      memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len);
-      if (d_udp_endpoint_other.address().to_string() != "0.0.0.0")
-        d_udp_socket->send_to(boost::asio::buffer(txbuf,len), 
d_udp_endpoint_other);
+      size_t offset = 0;
+      std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
+      while (offset < len) {
+        size_t send_len = std::min((len - offset), txbuf.size());
+        memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), 
send_len);
+        offset += send_len;
+        d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len), 
d_udp_endpoint_other);
+      }
     }
 
     void
@@ -183,14 +209,14 @@ namespace gr {
     {
       if (!error) {
         pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const 
uint8_t*)&d_rxbuf[0]);
-        pmt::pmt_t pdu = pmt::cons( pmt::PMT_NIL, vector);
+        pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
         
         message_port_pub(PDU_PORT_ID, pdu);
     
         d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), 
d_udp_endpoint_other,
-                                        
boost::bind(&socket_pdu_impl::handle_udp_read, this,
-                                                    
boost::asio::placeholders::error,
-                                                    
boost::asio::placeholders::bytes_transferred));
+          boost::bind(&socket_pdu_impl::handle_udp_read, this,
+            boost::asio::placeholders::error,
+            boost::asio::placeholders::bytes_transferred));
       } 
     }
 
diff --git a/gr-blocks/lib/socket_pdu_impl.h b/gr-blocks/lib/socket_pdu_impl.h
index 3099d90..2d5bc33 100644
--- a/gr-blocks/lib/socket_pdu_impl.h
+++ b/gr-blocks/lib/socket_pdu_impl.h
@@ -34,13 +34,14 @@ namespace gr {
     {
     private:
       boost::asio::io_service d_io_service;
-      boost::array<char, 10000> d_rxbuf;
+      std::vector<char> d_rxbuf;
       void run_io_service() { d_io_service.run(); }
 
       // TCP specific
       boost::asio::ip::tcp::endpoint d_tcp_endpoint;
       std::vector<tcp_connection::sptr> d_tcp_connections;
       void handle_tcp_read(const boost::system::error_code& error, size_t 
bytes_transferred);
+      bool d_tcp_no_delay;
 
       // TCP server specific
       boost::shared_ptr<boost::asio::ip::tcp::acceptor> d_acceptor_tcp;
@@ -60,7 +61,7 @@ namespace gr {
       void udp_send(pmt::pmt_t msg);
     
     public:
-      socket_pdu_impl(std::string type, std::string addr, std::string port, 
int MTU);
+      socket_pdu_impl(std::string type, std::string addr, std::string port, 
int MTU = 10000, bool tcp_no_delay = false);
     };
 
   } /* namespace blocks */



reply via email to

[Prev in Thread] Current Thread [Next in Thread]