[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Guile-commits] 23/23: virtualize read/write/close operations in <eport>
From: |
Andy Wingo |
Subject: |
[Guile-commits] 23/23: virtualize read/write/close operations in <eport> |
Date: |
Thu, 24 Mar 2016 14:26:04 +0000 |
wingo pushed a commit to branch wip-ethreads
in repository guile.
commit 4dc952f186e4c0603e182e51a86423a3a7f7d8e3
Author: Andy Wingo <address@hidden>
Date: Fri Apr 6 19:44:00 2012 -0700
virtualize read/write/close operations in <eport>
* module/ice-9/eports.scm (<eport>): Rework to virtualize the operations
that actually read and write data, and those that close the underlying
resource.
(<file-data>, <file-eport>): The nio implementation.
(make-eport): New export.
(file-eport-fd, file-eport-port): Renamed to add the "file-" prefix.
* examples/ethreads/memcached-client.scm:
* examples/ethreads/memcached-server.scm:
* module/ice-9/ethreads.scm:
* module/web/server/ethreads.scm: Adapt callers (mostly for eport-fd).
---
examples/ethreads/memcached-client.scm | 4 +-
examples/ethreads/memcached-server.scm | 2 +-
module/ice-9/eports.scm | 207 ++++++++++++++++++++------------
module/ice-9/ethreads.scm | 2 +-
module/web/server/ethreads.scm | 4 +-
5 files changed, 134 insertions(+), 85 deletions(-)
diff --git a/examples/ethreads/memcached-client.scm
b/examples/ethreads/memcached-client.scm
index bf9752b..7302bc2 100644
--- a/examples/ethreads/memcached-client.scm
+++ b/examples/ethreads/memcached-client.scm
@@ -115,7 +115,7 @@
(addrinfo:socktype addrinfo)
(addrinfo:protocol addrinfo)))))
;; Disable Nagle's algorithm. We buffer ourselves.
- (setsockopt (eport-fd eport) IPPROTO_TCP TCP_NODELAY 0)
+ (setsockopt (file-eport-fd eport) IPPROTO_TCP TCP_NODELAY 0)
(connect-eport eport (addrinfo:addr addrinfo))
eport))
@@ -141,7 +141,7 @@
(when (zero? *active-clients*)
(exit 0)))
-(define (run-memcached-test num-clients num-connections)
+(define* (run-memcached-test #:optional (num-clients 100) (num-connections
1000))
;; The getaddrinfo call blocks, unfortunately. Call it once before
;; spawning clients.
(let ((addrinfo (car (getaddrinfo "localhost" (number->string 11211)))))
diff --git a/examples/ethreads/memcached-server.scm
b/examples/ethreads/memcached-server.scm
index a5e28a8..164939b 100644
--- a/examples/ethreads/memcached-server.scm
+++ b/examples/ethreads/memcached-server.scm
@@ -134,7 +134,7 @@
(let loop ()
(let ((client (accept-eport esocket)))
;; Disable Nagle's algorithm. We buffer ourselves.
- (setsockopt (eport-fd client) IPPROTO_TCP TCP_NODELAY 0)
+ (setsockopt (file-eport-fd client) IPPROTO_TCP TCP_NODELAY 0)
(spawn (lambda () (client-loop client store)))
(loop))))
diff --git a/module/ice-9/eports.scm b/module/ice-9/eports.scm
index 5b8231d..8c7b60e 100644
--- a/module/ice-9/eports.scm
+++ b/module/ice-9/eports.scm
@@ -23,19 +23,14 @@
#:use-module (ice-9 nio)
#:export (;; EPorts: ports that suspend when they would block.
eport?
- eport-fd
- eport-file-port
- fdes->eport
- file-port->eport
- drain-output
+ make-eport
close-eport
+ drain-output
+
current-read-waiter
current-write-waiter
- accept-eport
- connect-eport
-
get-u8
putback-u8
lookahead-u8
@@ -59,15 +54,39 @@
put-latin1-string
put-utf8-char
- put-utf8-string))
+ put-utf8-string
+
+ ;; The main implementation of eports uses nonblocking file
+ ;; descriptors.
+ <file-eport>
+ file-eport-fd
+ file-eport-port
+ fdes->eport
+ file-port->eport
+ accept-eport
+ connect-eport))
+
+(define-record-type <io-vtable>
+ (make-io-vtable read write close)
+ io-vtable?
+ (read io-vtable-read)
+ (write io-vtable-write)
+ (close io-vtable-close))
(define-record-type <eport>
- (make-eport fd readbuf writebuf file-port)
+ (%make-eport vtable data readbuf writebuf)
eport?
- (fd eport-fd set-eport-fd!)
+ (vtable eport-vtable)
+ (data eport-data set-eport-data!)
(readbuf eport-readbuf set-eport-readbuf!)
- (writebuf eport-writebuf set-eport-writebuf!)
- (file-port eport-file-port))
+ (writebuf eport-writebuf set-eport-writebuf!))
+
+(define (eport:read eport bv start end)
+ ((io-vtable-read (eport-vtable eport)) (eport-data eport) bv start end))
+(define (eport:write eport bv start end)
+ ((io-vtable-write (eport-vtable eport)) (eport-data eport) bv start end))
+(define (eport:close eport)
+ ((io-vtable-close (eport-vtable eport)) (eport-data eport)))
(define (default-read-waiter eport)
(error "read would block" eport))
@@ -119,68 +138,27 @@
(error "flushing too many bytes" buf n))
(set-buf-cur! buf (+ cur n))))
-;; Create an NIO port that wraps FD. The strange default sizes assume
-;; that the memory is allocated inline to the bytevector, and thus has a
-;; 12- or 24-byte header, and so they will have a total size of 500 and
-;; 1012 or 512 and 1024, respectively. The collector might do better
-;; with sizes like these.
+;; Create a new eport. The strange default sizes assume that the memory
+;; is allocated inline to the bytevector, and thus has a 12- or 24-byte
+;; header, and so they will have a total size of 500 and 1012 or 512 and
+;; 1024, respectively. The collector might do better with sizes like
+;; these.
;;
-(define* (fdes->eport fd #:key readable? writable?
- (read-buffer-size 488)
- (write-buffer-size 1000)
- file-port)
- (let ((eport
- (make-eport
- fd
- (and readable? (make-fresh-buf read-buffer-size))
- (and writable? (make-fresh-buf write-buffer-size))
- file-port)))
- (when file-port
- (setvbuf file-port _IONBF))
- (fcntl fd F_SETFL O_NONBLOCK)
- eport))
-
-(define* (file-port->eport file-port)
- (fdes->eport (fileno file-port)
- #:readable? (input-port? file-port)
- #:writable? (output-port? file-port)
- #:file-port file-port))
+(define* (make-eport vtable data #:key
+ (read-buffer-size 488)
+ (write-buffer-size 1000))
+ (%make-eport vtable data
+ (and read-buffer-size (make-fresh-buf read-buffer-size))
+ (and write-buffer-size (make-fresh-buf write-buffer-size))))
(define* (close-eport eport #:key (drain-output? #t))
- (let ((fd (eport-fd eport)))
- (when fd
- (when drain-output?
- (drain-output eport))
- (set-eport-fd! eport #f)
- (set-eport-readbuf! eport #f)
- (set-eport-writebuf! eport #f)
- (cond
- ((eport-file-port eport) => close-port)
- (else (close-fdes fd))))))
-
-;; Accept a new connection on EPORT, an eport that wraps a
-;; listening socket. Returns two values: an eport for the new
-;; connection, and the sockaddr.
-;;
-(define (accept-eport eport)
- (let ((pair (nio-accept (eport-fd eport))))
- (if pair
- (values (fdes->eport (car pair) #:readable? #t #:writable? #t)
- (cdr pair))
- (begin
- (wait-for-readable eport)
- (accept-eport eport)))))
-
-;; Connect a socket eport to the remote host at SOCKADDR. Returns no
-;; values.
-;;
-(define (connect-eport eport sockaddr)
- (unless (nio-connect (eport-fd eport) sockaddr)
- (wait-for-writable eport)
- (let ((err (getsockopt (eport-fd eport) SOL_SOCKET SO_ERROR)))
- (unless (zero? err)
- (scm-error 'system-error "connect-eport" "~A"
- (list (strerror err)) #f)))))
+ (when (eport-data eport)
+ (when drain-output?
+ (drain-output eport))
+ (eport:close eport)
+ (set-eport-data! eport #f)
+ (set-eport-readbuf! eport #f)
+ (set-eport-writebuf! eport #f)))
;; Ensure that there are readable bytes in the buffer, or that the
;; buffer is at EOF. Returns the actual number of available bytes.
@@ -193,7 +171,7 @@
(error "fill-input should only be called when the readbuf is empty"))
(set-buf-cur! buf 0)
(set-buf-end! buf 0) ; in case nio-read throws an error
- (let ((rv (nio-read (eport-fd eport) bv 0 len)))
+ (let ((rv (eport:read eport bv 0 len)))
(if (< rv 0)
(begin
(wait-for-readable eport)
@@ -212,8 +190,7 @@
(let ((cur (buf-cur buf))
(end (buf-end buf)))
(when (< cur end)
- (let ((written (nio-write (eport-fd eport)
- bv cur (- end cur))))
+ (let ((written (eport:write eport bv cur (- end cur))))
(flush-buffer buf written)
(when (< written (- end cur))
(wait-for-writable eport)
@@ -248,8 +225,7 @@
;; The buffer is more than half full; write some data and
;; try again.
(begin
- (let ((written (nio-write (eport-fd eport)
- bv cur (- end cur))))
+ (let ((written (eport:write eport bv cur (- end cur))))
(flush-buffer buf written)
(when (< written (- end cur))
(wait-for-writable eport)
@@ -466,8 +442,7 @@
;; and write from BV directly, without copying.
(else
(drain-output eport)
- (let ((written (nio-write (eport-fd eport)
- bv start count)))
+ (let ((written (eport:write eport bv start count)))
(when (< written count)
(wait-for-writable eport)
(lp (+ start written) (- count written))))))))))
@@ -574,3 +549,77 @@
(define (put-utf8-string eport str)
(put-bytevector eport (string->utf8 str)))
+
+
+
+
+;;;
+;;; File ports
+;;;
+
+(define-record-type <file-data>
+ (make-file-data fd port)
+ file-data?
+ (fd file-data-fd)
+ (port file-data-port))
+
+(define <file-eport>
+ (let ()
+ (define (fd-read data bv start count)
+ (nio-read (file-data-fd data) bv start count))
+ (define (fd-write data bv start count)
+ (nio-write (file-data-fd data) bv start count))
+ (define (fd-close data)
+ (cond
+ ((file-data-port data) => close-port)
+ (else (close-fdes (file-data-fd data)))))
+ (make-io-vtable fd-read fd-write fd-close)))
+
+(define* (fdes->eport fd #:key readable? writable?
+ (read-buffer-size 488)
+ (write-buffer-size 1000)
+ file-port)
+ (let ((eport
+ (make-eport <file-eport> (make-file-data fd file-port)
+ #:read-buffer-size (and readable? read-buffer-size)
+ #:write-buffer-size (and writable? write-buffer-size))))
+ (when file-port
+ (setvbuf file-port _IONBF))
+ (fcntl fd F_SETFL O_NONBLOCK)
+ eport))
+
+(define* (file-port->eport file-port)
+ (fdes->eport (fileno file-port)
+ #:readable? (input-port? file-port)
+ #:writable? (output-port? file-port)
+ #:file-port file-port))
+
+(define (file-eport-fd eport)
+ (file-data-fd (eport-data eport)))
+
+(define (file-eport-port eport)
+ (file-data-port (eport-data eport)))
+
+;; Accept a new connection on EPORT, an eport that wraps a
+;; listening socket. Returns two values: an eport for the new
+;; connection, and the sockaddr.
+;;
+(define (accept-eport eport)
+ (let ((pair (nio-accept (file-eport-fd eport))))
+ (if pair
+ (values (fdes->eport (car pair) #:readable? #t #:writable? #t)
+ (cdr pair))
+ (begin
+ (wait-for-readable eport)
+ (accept-eport eport)))))
+
+;; Connect a socket eport to the remote host at SOCKADDR. Returns no
+;; values.
+;;
+(define (connect-eport eport sockaddr)
+ (unless (nio-connect (file-eport-fd eport) sockaddr)
+ (wait-for-writable eport)
+ (let ((err (getsockopt (file-eport-fd eport) SOL_SOCKET SO_ERROR)))
+ (unless (zero? err)
+ (scm-error 'system-error "connect-eport" "~A"
+ (list (strerror err)) #f)))))
diff --git a/module/ice-9/ethreads.scm b/module/ice-9/ethreads.scm
index 5016540..2c2c974 100644
--- a/module/ice-9/ethreads.scm
+++ b/module/ice-9/ethreads.scm
@@ -229,7 +229,7 @@
events
(suspend
(lambda (ctx thread)
- (let* ((fd (eport-fd eport))
+ (let* ((fd (file-eport-fd eport))
(sources (hashv-ref (econtext-sources ctx) fd)))
(cond
(sources
diff --git a/module/web/server/ethreads.scm b/module/web/server/ethreads.scm
index b803c01..bdcd26b 100644
--- a/module/web/server/ethreads.scm
+++ b/module/web/server/ethreads.scm
@@ -180,12 +180,12 @@
client-thread request body))
(let loop ()
(let ((client (accept-eport esocket)))
- (setsockopt (eport-fd client) SOL_SOCKET SO_SNDBUF (* 12 1024))
+ (setsockopt (file-eport-fd client) SOL_SOCKET SO_SNDBUF (* 12 1024))
;; Always disable Nagle's algorithm, as we handle buffering
;; ourselves. Ignore exceptions if it's not a TCP port, or
;; TCP_NODELAY is not defined on this platform.
(false-if-exception
- (setsockopt (eport-fd client) IPPROTO_TCP TCP_NODELAY 0))
+ (setsockopt (file-eport-fd client) IPPROTO_TCP TCP_NODELAY 0))
(spawn (lambda () (client-loop client have-request)))
(loop))))
- [Guile-commits] 09/23: eports: some more exports, (continued)
- [Guile-commits] 09/23: eports: some more exports, Andy Wingo, 2016/03/24
- [Guile-commits] 12/23: (web server ethreads): Use a large backlog., Andy Wingo, 2016/03/24
- [Guile-commits] 13/23: add latin1 chars and strings to eports, Andy Wingo, 2016/03/24
- [Guile-commits] 18/23: (web server ethreads) TCP_NODELAY tweak, Andy Wingo, 2016/03/24
- [Guile-commits] 05/23: http: allow custom read-line / continuation-line? functions, Andy Wingo, 2016/03/24
- [Guile-commits] 06/23: setsockopt can take an fd, Andy Wingo, 2016/03/24
- [Guile-commits] 10/23: EOF fix for continuation-line?, Andy Wingo, 2016/03/24
- [Guile-commits] 19/23: nio: add non-blocking connect, Andy Wingo, 2016/03/24
- [Guile-commits] 08/23: add #:limit to get-bytevector-delimited, Andy Wingo, 2016/03/24
- [Guile-commits] 11/23: socket: TCP_CORK, TCP_NODELAY, Andy Wingo, 2016/03/24
- [Guile-commits] 23/23: virtualize read/write/close operations in <eport>,
Andy Wingo <=
- [Guile-commits] 15/23: (web server ethreads): more use of latin1 accessors, Andy Wingo, 2016/03/24
- [Guile-commits] 01/23: add (ice-9 nio), Andy Wingo, 2016/03/24
- [Guile-commits] 20/23: eports: nonblocking connect-eport, Andy Wingo, 2016/03/24
- [Guile-commits] 14/23: refactoring to (web server ethreads) read-http-line, Andy Wingo, 2016/03/24
- [Guile-commits] 02/23: add (ice-9 eports), Andy Wingo, 2016/03/24
- [Guile-commits] 17/23: getsockopt: allow raw file descriptors, Andy Wingo, 2016/03/24
- [Guile-commits] 16/23: eports: add put-utf8-char, put-utf8-string, Andy Wingo, 2016/03/24
- [Guile-commits] 03/23: add (ice-9 epoll), Andy Wingo, 2016/03/24
- [Guile-commits] 21/23: eports tweak, Andy Wingo, 2016/03/24
- [Guile-commits] 07/23: add (web server ethreads), Andy Wingo, 2016/03/24