guile-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Guile-commits] 23/23: virtualize read/write/close operations in <eport>


From: Andy Wingo
Subject: [Guile-commits] 23/23: virtualize read/write/close operations in <eport>
Date: Thu, 24 Mar 2016 14:26:04 +0000

wingo pushed a commit to branch wip-ethreads
in repository guile.

commit 4dc952f186e4c0603e182e51a86423a3a7f7d8e3
Author: Andy Wingo <address@hidden>
Date:   Fri Apr 6 19:44:00 2012 -0700

    virtualize read/write/close operations in <eport>
    
    * module/ice-9/eports.scm (<eport>): Rework to virtualize the operations
      that actually read and write data, and those that close the underlying
      resource.
      (<file-data>, <file-eport>): The nio implementation.
      (make-eport): New export.
      (file-eport-fd, file-eport-port): Renamed to add the "file-" prefix.
    
    * examples/ethreads/memcached-client.scm:
    * examples/ethreads/memcached-server.scm:
    * module/ice-9/ethreads.scm:
    * module/web/server/ethreads.scm: Adapt callers (mostly for eport-fd).
---
 examples/ethreads/memcached-client.scm |    4 +-
 examples/ethreads/memcached-server.scm |    2 +-
 module/ice-9/eports.scm                |  207 ++++++++++++++++++++------------
 module/ice-9/ethreads.scm              |    2 +-
 module/web/server/ethreads.scm         |    4 +-
 5 files changed, 134 insertions(+), 85 deletions(-)

diff --git a/examples/ethreads/memcached-client.scm b/examples/ethreads/memcached-client.scm
index bf9752b..7302bc2 100644
--- a/examples/ethreads/memcached-client.scm
+++ b/examples/ethreads/memcached-client.scm
@@ -115,7 +115,7 @@
                                          (addrinfo:socktype addrinfo)
                                          (addrinfo:protocol addrinfo)))))
     ;; Disable Nagle's algorithm.  We buffer ourselves.
-    (setsockopt (eport-fd eport) IPPROTO_TCP TCP_NODELAY 0)
+    (setsockopt (file-eport-fd eport) IPPROTO_TCP TCP_NODELAY 0)
     (connect-eport eport (addrinfo:addr addrinfo))
     eport))
 
@@ -141,7 +141,7 @@
   (when (zero? *active-clients*)
     (exit 0)))
 
-(define (run-memcached-test num-clients num-connections)
+(define* (run-memcached-test #:optional (num-clients 100) (num-connections 1000))
   ;; The getaddrinfo call blocks, unfortunately.  Call it once before
   ;; spawning clients.
   (let ((addrinfo (car (getaddrinfo "localhost" (number->string 11211)))))
diff --git a/examples/ethreads/memcached-server.scm b/examples/ethreads/memcached-server.scm
index a5e28a8..164939b 100644
--- a/examples/ethreads/memcached-server.scm
+++ b/examples/ethreads/memcached-server.scm
@@ -134,7 +134,7 @@
   (let loop ()
     (let ((client (accept-eport esocket)))
       ;; Disable Nagle's algorithm.  We buffer ourselves.
-      (setsockopt (eport-fd client) IPPROTO_TCP TCP_NODELAY 0)
+      (setsockopt (file-eport-fd client) IPPROTO_TCP TCP_NODELAY 0)
       (spawn (lambda () (client-loop client store)))
       (loop))))
 
diff --git a/module/ice-9/eports.scm b/module/ice-9/eports.scm
index 5b8231d..8c7b60e 100644
--- a/module/ice-9/eports.scm
+++ b/module/ice-9/eports.scm
@@ -23,19 +23,14 @@
   #:use-module (ice-9 nio)
   #:export (;; EPorts: ports that suspend when they would block.
             eport?
-            eport-fd
-            eport-file-port
-            fdes->eport
-            file-port->eport
-            drain-output
+            make-eport
             close-eport
 
+            drain-output
+
             current-read-waiter
             current-write-waiter
 
-            accept-eport
-            connect-eport
-
             get-u8
             putback-u8
             lookahead-u8
@@ -59,15 +54,39 @@
             put-latin1-string
 
             put-utf8-char
-            put-utf8-string))
+            put-utf8-string
+
+            ;; The main implementation of eports uses nonblocking file
+            ;; descriptors.
+            <file-eport>
+            file-eport-fd
+            file-eport-port
+            fdes->eport
+            file-port->eport
+            accept-eport
+            connect-eport))
+
+(define-record-type <io-vtable>
+  (make-io-vtable read write close)
+  io-vtable?
+  (read io-vtable-read)
+  (write io-vtable-write)
+  (close io-vtable-close))
 
 (define-record-type <eport>
-  (make-eport fd readbuf writebuf file-port)
+  (%make-eport vtable data readbuf writebuf)
   eport?
-  (fd eport-fd set-eport-fd!)
+  (vtable eport-vtable)
+  (data eport-data set-eport-data!)
   (readbuf eport-readbuf set-eport-readbuf!)
-  (writebuf eport-writebuf set-eport-writebuf!)
-  (file-port eport-file-port))
+  (writebuf eport-writebuf set-eport-writebuf!))
+
+(define (eport:read eport bv start end)
+  ((io-vtable-read (eport-vtable eport)) (eport-data eport) bv start end))
+(define (eport:write eport bv start end)
+  ((io-vtable-write (eport-vtable eport)) (eport-data eport) bv start end))
+(define (eport:close eport)
+  ((io-vtable-close (eport-vtable eport)) (eport-data eport)))
 
 (define (default-read-waiter eport)
   (error "read would block" eport))
@@ -119,68 +138,27 @@
       (error "flushing too many bytes" buf n))
     (set-buf-cur! buf (+ cur n))))
 
-;; Create an NIO port that wraps FD.  The strange default sizes assume
-;; that the memory is allocated inline to the bytevector, and thus has a
-;; 12- or 24-byte header, and so they will have a total size of 500 and
-;; 1012 or 512 and 1024, respectively.  The collector might do better
-;; with sizes like these.
+;; Create a new eport.  The strange default sizes assume that the memory
+;; is allocated inline to the bytevector, and thus has a 12- or 24-byte
+;; header, and so they will have a total size of 500 and 1012 or 512 and
+;; 1024, respectively.  The collector might do better with sizes like
+;; these.
 ;;
-(define* (fdes->eport fd #:key readable? writable?
-                      (read-buffer-size 488)
-                      (write-buffer-size 1000)
-                      file-port)
-  (let ((eport
-         (make-eport
-          fd
-          (and readable? (make-fresh-buf read-buffer-size))
-          (and writable? (make-fresh-buf write-buffer-size))
-          file-port)))
-    (when file-port
-      (setvbuf file-port _IONBF))
-    (fcntl fd F_SETFL O_NONBLOCK)
-    eport))
-
-(define* (file-port->eport file-port)
-  (fdes->eport (fileno file-port)
-               #:readable? (input-port? file-port)
-               #:writable? (output-port? file-port)
-               #:file-port file-port))
+(define* (make-eport vtable data #:key
+                     (read-buffer-size 488)
+                     (write-buffer-size 1000))
+  (%make-eport vtable data
+               (and read-buffer-size (make-fresh-buf read-buffer-size))
+               (and write-buffer-size (make-fresh-buf write-buffer-size))))
 
 (define* (close-eport eport #:key (drain-output? #t))
-  (let ((fd (eport-fd eport)))
-    (when fd
-      (when drain-output?
-        (drain-output eport))
-      (set-eport-fd! eport #f)
-      (set-eport-readbuf! eport #f)
-      (set-eport-writebuf! eport #f)
-      (cond
-       ((eport-file-port eport) => close-port)
-       (else (close-fdes fd))))))
-
-;; Accept a new connection on EPORT, an eport that wraps a
-;; listening socket.  Returns two values: an eport for the new
-;; connection, and the sockaddr.
-;;
-(define (accept-eport eport)
-  (let ((pair (nio-accept (eport-fd eport))))
-    (if pair
-        (values (fdes->eport (car pair) #:readable? #t #:writable? #t)
-                (cdr pair))
-        (begin
-          (wait-for-readable eport)
-          (accept-eport eport)))))
-
-;; Connect a socket eport to the remote host at SOCKADDR.  Returns no
-;; values.
-;;
-(define (connect-eport eport sockaddr)
-  (unless (nio-connect (eport-fd eport) sockaddr)
-    (wait-for-writable eport)
-    (let ((err (getsockopt (eport-fd eport) SOL_SOCKET SO_ERROR)))
-      (unless (zero? err)
-        (scm-error 'system-error "connect-eport" "~A"
-                   (list (strerror err)) #f)))))
+  (when (eport-data eport)
+    (when drain-output?
+      (drain-output eport))
+    (eport:close eport)
+    (set-eport-data! eport #f)
+    (set-eport-readbuf! eport #f)
+    (set-eport-writebuf! eport #f)))
 
 ;; Ensure that there are readable bytes in the buffer, or that the
 ;; buffer is at EOF.  Returns the actual number of available bytes.
@@ -193,7 +171,7 @@
       (error "fill-input should only be called when the readbuf is empty"))
     (set-buf-cur! buf 0)
     (set-buf-end! buf 0) ; in case nio-read throws an error
-    (let ((rv (nio-read (eport-fd eport) bv 0 len)))
+    (let ((rv (eport:read eport bv 0 len)))
       (if (< rv 0)
           (begin
             (wait-for-readable eport)
@@ -212,8 +190,7 @@
       (let ((cur (buf-cur buf))
             (end (buf-end buf)))
         (when (< cur end)
-          (let ((written (nio-write (eport-fd eport)
-                                    bv cur (- end cur))))
+          (let ((written (eport:write eport bv cur (- end cur))))
             (flush-buffer buf written)
             (when (< written (- end cur))
               (wait-for-writable eport)
@@ -248,8 +225,7 @@
               ;; The buffer is more than half full; write some data and
               ;; try again.
               (begin
-                (let ((written (nio-write (eport-fd eport)
-                                         bv cur (- end cur))))
+                (let ((written (eport:write eport bv cur (- end cur))))
                  (flush-buffer buf written)
                  (when (< written (- end cur))
                    (wait-for-writable eport)
@@ -466,8 +442,7 @@
          ;; and write from BV directly, without copying.
          (else
           (drain-output eport)
-          (let ((written (nio-write (eport-fd eport)
-                                    bv start count)))
+          (let ((written (eport:write eport bv start count)))
             (when (< written count)
               (wait-for-writable eport)
               (lp (+ start written) (- count written))))))))))
@@ -574,3 +549,77 @@
 
 (define (put-utf8-string eport str)
   (put-bytevector eport (string->utf8 str)))
+
+
+
+
+;;;
+;;; File ports
+;;;
+
+(define-record-type <file-data>
+  (make-file-data fd port)
+  file-data?
+  (fd file-data-fd)
+  (port file-data-port))
+
+(define <file-eport>
+  (let ()
+    (define (fd-read data bv start count)
+      (nio-read (file-data-fd data) bv start count))
+    (define (fd-write data bv start count)
+      (nio-write (file-data-fd data) bv start count))
+    (define (fd-close data)
+      (cond
+       ((file-data-port data) => close-port)
+       (else (close-fdes (file-data-fd data)))))
+    (make-io-vtable fd-read fd-write fd-close)))
+
+(define* (fdes->eport fd #:key readable? writable?
+                      (read-buffer-size 488)
+                      (write-buffer-size 1000)
+                      file-port)
+  (let ((eport
+         (make-eport <file-eport> (make-file-data fd file-port)
+                     #:read-buffer-size (and readable? read-buffer-size)
+                     #:write-buffer-size (and writable? write-buffer-size))))
+    (when file-port
+      (setvbuf file-port _IONBF))
+    (fcntl fd F_SETFL O_NONBLOCK)
+    eport))
+
+(define* (file-port->eport file-port)
+  (fdes->eport (fileno file-port)
+               #:readable? (input-port? file-port)
+               #:writable? (output-port? file-port)
+               #:file-port file-port))
+
+(define (file-eport-fd eport)
+  (file-data-fd (eport-data eport)))
+
+(define (file-eport-port eport)
+  (file-data-port (eport-data eport)))
+
+;; Accept a new connection on EPORT, an eport that wraps a
+;; listening socket.  Returns two values: an eport for the new
+;; connection, and the sockaddr.
+;;
+(define (accept-eport eport)
+  (let ((pair (nio-accept (file-eport-fd eport))))
+    (if pair
+        (values (fdes->eport (car pair) #:readable? #t #:writable? #t)
+                (cdr pair))
+        (begin
+          (wait-for-readable eport)
+          (accept-eport eport)))))
+
+;; Connect a socket eport to the remote host at SOCKADDR.  Returns no
+;; values.
+;;
+(define (connect-eport eport sockaddr)
+  (unless (nio-connect (file-eport-fd eport) sockaddr)
+    (wait-for-writable eport)
+    (let ((err (getsockopt (file-eport-fd eport) SOL_SOCKET SO_ERROR)))
+      (unless (zero? err)
+        (scm-error 'system-error "connect-eport" "~A"
+                   (list (strerror err)) #f)))))
diff --git a/module/ice-9/ethreads.scm b/module/ice-9/ethreads.scm
index 5016540..2c2c974 100644
--- a/module/ice-9/ethreads.scm
+++ b/module/ice-9/ethreads.scm
@@ -229,7 +229,7 @@
    events
    (suspend
     (lambda (ctx thread)
-      (let* ((fd (eport-fd eport))
+      (let* ((fd (file-eport-fd eport))
              (sources (hashv-ref (econtext-sources ctx) fd)))
         (cond
          (sources
diff --git a/module/web/server/ethreads.scm b/module/web/server/ethreads.scm
index b803c01..bdcd26b 100644
--- a/module/web/server/ethreads.scm
+++ b/module/web/server/ethreads.scm
@@ -180,12 +180,12 @@
                      client-thread request body))
   (let loop ()
     (let ((client (accept-eport esocket)))
-      (setsockopt (eport-fd client) SOL_SOCKET SO_SNDBUF (* 12 1024))
+      (setsockopt (file-eport-fd client) SOL_SOCKET SO_SNDBUF (* 12 1024))
       ;; Always disable Nagle's algorithm, as we handle buffering
       ;; ourselves.  Ignore exceptions if it's not a TCP port, or
       ;; TCP_NODELAY is not defined on this platform.
       (false-if-exception
-       (setsockopt (eport-fd client) IPPROTO_TCP TCP_NODELAY 0))
+       (setsockopt (file-eport-fd client) IPPROTO_TCP TCP_NODELAY 0))
       (spawn (lambda () (client-loop client have-request)))
       (loop))))
 



reply via email to

[Prev in Thread] Current Thread [Next in Thread]