guile-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Guile-commits] GNU Guile branch, wip-ethreads, updated. v2.0.5-116-g2b6a851


From: Andy Wingo
Subject: [Guile-commits] GNU Guile branch, wip-ethreads, updated. v2.0.5-116-g2b6a851
Date: Fri, 30 Mar 2012 17:37:20 +0000

This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "GNU Guile".

http://git.savannah.gnu.org/cgit/guile.git/commit/?id=2b6a851718aca9fc4a1cd451491136768fbc6423

The branch, wip-ethreads has been updated
       via  2b6a851718aca9fc4a1cd451491136768fbc6423 (commit)
       via  49df47c949ffcee4927cb29d3dcb910e7d4e5205 (commit)
       via  74920887be0788ffc3d02c0fba5502c55fac206b (commit)
       via  b67ef803d7bcf04bc7a2d0493110a88312784a5d (commit)
      from  ae8cc531c72d831f00a7f387aacb50baad3ee7d8 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 2b6a851718aca9fc4a1cd451491136768fbc6423
Author: Andy Wingo <address@hidden>
Date:   Fri Mar 30 19:36:24 2012 +0200

    add examples/ethreads/memcached-{client,server}
    
    * examples/ethreads/memcached-client.scm:
    * examples/ethreads/memcached-server.scm: Two new ethreads examples.
      Not really optimized, no consideration for memory use, but perhaps
      instructive.

commit 49df47c949ffcee4927cb29d3dcb910e7d4e5205
Author: Andy Wingo <address@hidden>
Date:   Fri Mar 30 19:35:42 2012 +0200

    eports tweak
    
    * module/ice-9/eports.scm (flush-buffer, fill-input, ensure-writable):
      Instead of making flush-buffer reset cur and end to 0 when the buffer
      is empty, make that be the job of the less-frequently-called
      fill-input and ensure-writable.

commit 74920887be0788ffc3d02c0fba5502c55fac206b
Author: Andy Wingo <address@hidden>
Date:   Fri Mar 30 17:39:32 2012 +0200

    eports: nonblocking connect-eport
    
    * module/ice-9/eports.scm (connect-eport): Add implementation of
      nonblocking `connect' for eports.

commit b67ef803d7bcf04bc7a2d0493110a88312784a5d
Author: Andy Wingo <address@hidden>
Date:   Fri Mar 30 17:11:22 2012 +0200

    nio: add non-blocking connect
    
    * libguile/nio.c (scm_nio_connect):
    * module/ice-9/nio.scm (nio-connect): Add non-blocking connect
      primitive.

-----------------------------------------------------------------------

Summary of changes:
 examples/ethreads/memcached-client.scm |  156 ++++++++++++++++++++++++++++++++
 examples/ethreads/memcached-server.scm |  156 ++++++++++++++++++++++++++++++++
 libguile/nio.c                         |   32 +++++++
 module/ice-9/eports.scm                |   66 ++++++++------
 module/ice-9/nio.scm                   |   13 +++-
 5 files changed, 394 insertions(+), 29 deletions(-)
 create mode 100644 examples/ethreads/memcached-client.scm
 create mode 100644 examples/ethreads/memcached-server.scm

diff --git a/examples/ethreads/memcached-client.scm b/examples/ethreads/memcached-client.scm
new file mode 100644
index 0000000..bf9752b
--- /dev/null
+++ b/examples/ethreads/memcached-client.scm
@@ -0,0 +1,156 @@
+;;; Simple memcached client implementation
+
+;; Copyright (C)  2012 Free Software Foundation, Inc.
+
+;; This library is free software; you can redistribute it and/or
+;; modify it under the terms of the GNU Lesser General Public
+;; License as published by the Free Software Foundation; either
+;; version 3 of the License, or (at your option) any later version.
+;;
+;; This library is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public
+;; License along with this library; if not, write to the Free Software
+;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+;; 02110-1301 USA
+
+(use-modules (rnrs bytevectors)
+             (ice-9 eports)
+             (ice-9 ethreads)
+             (ice-9 match))
+
+(define (server-error eport msg . args)
+  (apply format (current-error-port) msg args)
+  (newline (current-error-port))
+  (close-eport eport)
+  (suspend))
+
+(define (read-line eport)
+  (define (end-of-line? c)
+    (or (eqv? c #\newline) (eqv? c #\return)))
+  (call-with-values
+      (lambda ()
+        ;; Restrict to 512 chars to avoid denial of service attacks.
+        (get-latin1-string-delimited eport end-of-line? #:max-chars 512))
+    (lambda (str delim)
+      (cond
+       ((not delim)
+        (server-error eport "Line too long: ~S" str))
+       ((eof-object? delim)
+        (server-error eport "EOF while reading line: ~S" str))
+       (else
+        (when (and (eqv? delim #\return)
+                   (eqv? (lookahead-latin1-char eport) #\newline))
+          (get-latin1-char eport))
+        str)))))
+
+(define (parse-int eport val)
+  (let ((num (string->number val)))
+    (unless (and num (integer? num) (exact? num) (>= num 0))
+      (server-error eport "Expected a non-negative integer: ~s" val))
+    num))
+
+(define (make-item flags bv)
+  (vector flags bv))
+(define (item-flags item)
+  (vector-ref item 0))
+(define (item-bv item)
+  (vector-ref item 1))
+
+(define (get eport . keys)
+  (put-utf8-string eport "get ")
+  (put-utf8-string eport (string-join keys " "))
+  (put-utf8-string eport "\r\n")
+  (drain-output eport)
+  (let lp ((vals '()))
+    (let ((line (read-line eport)))
+      (match (string-split line #\space)
+        (("VALUE" key flags length)
+         (let* ((flags (parse-int eport flags))
+                (length (parse-int eport length)))
+           (unless (member key keys)
+             (server-error eport "Unknown key: ~a" key))
+           (when (assoc key vals)
+             (server-error eport "Already have response for key: ~a" key))
+           (let ((bv (get-bytevector-n eport length)))
+             (unless (= (bytevector-length bv) length)
+               (server-error eport "Expected ~A bytes, got ~A" length bv))
+             (unless (eqv? (get-latin1-char eport) #\return)
+               (server-error eport "Expected \\r"))
+             (unless (eqv? (get-latin1-char eport) #\newline)
+               (server-error eport "Expected \\n"))
+             (lp (acons key (make-item flags bv) vals)))))
+        (("END")
+         (reverse vals))
+        (_
+         (server-error eport "Bad line: ~A" line))))))
+
+(define* (set eport key flags exptime bytes #:key noreply?)
+  (put-utf8-string eport "set ")
+  (put-utf8-string eport key)
+  (put-utf8-char eport #\space)
+  (put-utf8-string eport (number->string flags))
+  (put-utf8-char eport #\space)
+  (put-utf8-string eport (number->string exptime))
+  (put-utf8-char eport #\space)
+  (put-utf8-string eport (number->string (bytevector-length bytes)))
+  (when noreply?
+    (put-utf8-string eport " noreply"))
+  (put-utf8-string eport "\r\n")
+  (put-bytevector eport bytes)
+  (put-utf8-string eport "\r\n")
+  (drain-output eport)
+  (let ((line (read-line eport)))
+    (match line
+      ("STORED" #t)
+      ("NOT_STORED" #t)
+      (_
+       (server-error eport "Unexpected response from server: ~A" line)))))
+
+(define (connect addrinfo)
+  (let ((eport (file-port->eport (socket (addrinfo:fam addrinfo)
+                                         (addrinfo:socktype addrinfo)
+                                         (addrinfo:protocol addrinfo)))))
+    ;; Disable Nagle's algorithm.  We buffer ourselves.
+    (setsockopt (eport-fd eport) IPPROTO_TCP TCP_NODELAY 0)
+    (connect-eport eport (addrinfo:addr addrinfo))
+    eport))
+
+(define *active-clients* 0)
+
+(define (client-loop addrinfo n num-connections)
+  (set! *active-clients* (1+ *active-clients*))
+  (let ((eport (connect addrinfo))
+        (key (string-append "test-" (number->string n))))
+    (let lp ((m 0))
+      (when (< m num-connections)
+        (let ((v (string->utf8 (number->string m))))
+          (set eport key 0 0 v)
+          (let* ((response (get eport key))
+                 (item (assoc-ref response key)))
+            (unless item
+              (server-error eport "Not found: ~A" key))
+            (unless (equal? (item-bv item) v)
+              (server-error eport "Bad response: ~A (expected ~A)" (item-bv item) v))
+            (lp (1+ m))))))
+    (close-eport eport))
+  (set! *active-clients* (1- *active-clients*))
+  (when (zero? *active-clients*)
+    (exit 0)))
+
+(define (run-memcached-test num-clients num-connections)
+  ;; The getaddrinfo call blocks, unfortunately.  Call it once before
+  ;; spawning clients.
+  (let ((addrinfo (car (getaddrinfo "localhost" (number->string 11211)))))
+    (let lp ((n 0))
+      (when (< n num-clients)
+        (spawn
+         (lambda ()
+           (client-loop addrinfo n num-connections)))
+        (lp (1+ n)))))
+  (run))
+
+(apply run-memcached-test (map string->number (cdr (program-arguments))))
diff --git a/examples/ethreads/memcached-server.scm b/examples/ethreads/memcached-server.scm
new file mode 100644
index 0000000..a5e28a8
--- /dev/null
+++ b/examples/ethreads/memcached-server.scm
@@ -0,0 +1,156 @@
+;;; Simple memcached server implementation
+
+;; Copyright (C)  2012 Free Software Foundation, Inc.
+
+;; This library is free software; you can redistribute it and/or
+;; modify it under the terms of the GNU Lesser General Public
+;; License as published by the Free Software Foundation; either
+;; version 3 of the License, or (at your option) any later version.
+;;
+;; This library is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public
+;; License along with this library; if not, write to the Free Software
+;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+;; 02110-1301 USA
+
+(use-modules (rnrs bytevectors)
+             (ice-9 eports)
+             (ice-9 ethreads)
+             (ice-9 match))
+
+(define (make-default-socket family addr port)
+  (let ((sock (socket PF_INET SOCK_STREAM 0)))
+    (setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
+    (fcntl sock F_SETFD FD_CLOEXEC)
+    (bind sock family addr port)
+    sock))
+
+(define (client-error eport msg . args)
+  (put-utf8-string eport (apply format #f msg args))
+  (put-utf8-string eport "\r\n")
+  (drain-output eport)
+  (close-eport eport)
+  (suspend))
+
+(define (read-line eport)
+  (define (end-of-line? c)
+    (or (eqv? c #\newline) (eqv? c #\return)))
+  (call-with-values
+      (lambda ()
+        ;; Restrict to 512 chars to avoid denial of service attacks.
+        (get-latin1-string-delimited eport end-of-line? #:max-chars 512))
+    (lambda (str delim)
+      (cond
+       ((not delim)
+        (client-error eport "Line too long: ~S" str))
+       ((eof-object? delim)
+        (client-error eport "EOF while reading line: ~S" str))
+       (else
+        (when (and (eqv? delim #\return)
+                   (eqv? (lookahead-latin1-char eport) #\newline))
+          (get-latin1-char eport))
+        str)))))
+
+(define (parse-int eport val)
+  (let ((num (string->number val)))
+    (unless (and num (integer? num) (exact? num) (>= num 0))
+      (client-error eport "Expected a non-negative integer: ~s" val))
+    num))
+
+(define (make-item flags bv)
+  (vector flags bv))
+(define (item-flags item)
+  (vector-ref item 0))
+(define (item-bv item)
+  (vector-ref item 1))
+
+(define *commands* (make-hash-table))
+
+(define-syntax-rule (define-command (name eport store . pat)
+                      body body* ...)
+  (begin
+    (define (name eport store args)
+      (match args
+        (pat body body* ...)
+        (else
+         (client-error eport "Bad line: ~A ~S" 'name (string-join args " ")))))
+    (hashq-set! *commands* 'name name)))
+
+(define-command (get eport store key* ...)
+  (let lp ((key* key*))
+    (match key*
+      ((key key* ...)
+       (let ((item (hash-ref store key)))
+         (when item
+           (put-utf8-string eport "VALUE ")
+           (put-utf8-string eport key)
+           (put-utf8-char eport #\space)
+           (put-utf8-string eport (number->string (item-flags item)))
+           (put-utf8-char eport #\space)
+           (put-utf8-string eport (number->string
+                                   (bytevector-length (item-bv item))))
+           (put-utf8-char eport #\return)
+           (put-utf8-char eport #\newline)
+           (put-bytevector eport (item-bv item))
+           (put-utf8-string eport "\r\n"))
+         (lp key*)))
+      (()
+       (put-utf8-string eport "END\r\n")))))
+
+(define-command (set eport store key flags exptime bytes
+                     . (and noreply (or ("noreply") ())))
+  (let* ((flags (parse-int eport flags))
+         (exptime (parse-int eport exptime))
+         (bytes (parse-int eport bytes)))
+    (let ((bv (get-bytevector-n eport bytes)))
+      (unless (= (bytevector-length bv) bytes)
+        (client-error eport "Tried to read ~A bytes, only read ~A"
+                      bytes (bytevector-length bv)))
+      (hash-set! store key (make-item flags bv))
+      (when (eqv? (lookahead-latin1-char eport) #\return)
+        (get-latin1-char eport))
+      (when (eqv? (lookahead-latin1-char eport) #\newline)
+        (get-latin1-char eport)))
+    (put-utf8-string eport "STORED\r\n")))
+
+(define (client-loop eport store)
+  (let loop ()
+    (let* ((args (string-split (read-line eport) #\space))
+           (verb (string->symbol (car args)))
+           (proc (hashq-ref *commands* verb)))
+      (unless proc
+        (client-error eport "Bad command: ~a" verb))
+      (proc eport store (cdr args)))
+    (drain-output eport)
+    (if (eof-object? (lookahead-u8 eport))
+        (close-eport eport)
+        (loop))))
+
+(define (socket-loop esocket store)
+  (let loop ()
+    (let ((client (accept-eport esocket)))
+      ;; Disable Nagle's algorithm.  We buffer ourselves.
+      (setsockopt (eport-fd client) IPPROTO_TCP TCP_NODELAY 0)
+      (spawn (lambda () (client-loop client store)))
+      (loop))))
+
+(define* (run-memcached #:key
+                        (host #f)
+                        (family AF_INET)
+                        (addr (if host
+                                  (inet-pton family host)
+                                  INADDR_LOOPBACK))
+                        (port 11211)
+                        (socket (make-default-socket family addr port)))
+  (listen socket 128)
+  (sigaction SIGPIPE SIG_IGN)
+  (spawn
+   (lambda ()
+     (socket-loop (file-port->eport socket) (make-hash-table))))
+  (run))
+
+(run-memcached)
diff --git a/libguile/nio.c b/libguile/nio.c
index 8697b02..7fc978b 100644
--- a/libguile/nio.c
+++ b/libguile/nio.c
@@ -169,6 +169,37 @@ scm_nio_accept (SCM fd)
 }
 #undef FUNC_NAME
 
+/* Initiate a connection from a socket.
+
+   The second argument should be a socket address object as returned by
+   @code{make-socket-address}.
+
+   Returns @code{#t} if the connection succeeded immediately, and
+   @code{#f} otherwise.  */
+static SCM
+scm_nio_connect (SCM fd, SCM sockaddr)
+#define FUNC_NAME "nio-connect"
+{
+  int c_fd, rv, save_errno;
+  struct sockaddr *soka;
+  size_t size;
+
+  c_fd = scm_to_int (fd);
+  soka = scm_to_sockaddr (sockaddr, &size);
+  rv = connect (c_fd, soka, size);
+  save_errno = errno;
+
+  if (rv == -1 && save_errno != EINPROGRESS)
+    {
+      free (soka);
+      errno = save_errno;
+      SCM_SYSERROR;
+    }
+  free (soka);
+  return scm_from_bool (rv == 0);
+}
+#undef FUNC_NAME
+
 
 
 
@@ -179,6 +210,7 @@ scm_init_nio (void)
   scm_c_define_gsubr ("%nio-read", 4, 0, 0, scm_nio_read);
   scm_c_define_gsubr ("%nio-write", 4, 0, 0, scm_nio_write);
   scm_c_define_gsubr ("%nio-accept", 1, 0, 0, scm_nio_accept);
+  scm_c_define_gsubr ("%nio-connect", 2, 0, 0, scm_nio_connect);
 }
 
 void
diff --git a/module/ice-9/eports.scm b/module/ice-9/eports.scm
index 4fe9845..62de47c 100644
--- a/module/ice-9/eports.scm
+++ b/module/ice-9/eports.scm
@@ -34,6 +34,7 @@
             current-write-waiter
 
             accept-eport
+            connect-eport
 
             get-u8
             putback-u8
@@ -110,21 +111,13 @@
 (define (make-fresh-buf n)
   (make-buf (make-bytevector n 0) 0 0))
 
-;; Mark N bytes as having been read or written.  This advances CUR by N,
-;; except in the case that CUR would be equal to END, in which case both
-;; are reset to 0.
+;; Mark N bytes as having been read or written.  This advances CUR by N.
 ;;
 (define (flush-buffer buf n)
-  (let ((new-cur (+ (buf-cur buf) n))
-        (end (buf-end buf)))
-    (cond
-     ((< new-cur end)
-      (set-buf-cur! buf new-cur))
-     ((= new-cur end)
-      (set-buf-cur! buf 0)
-      (set-buf-end! buf 0))
-     (else
-      (error "flushing too many bytes" buf n)))))
+  (let ((cur (buf-cur buf)))
+    (unless (<= n (- (buf-end buf) cur))
+      (error "flushing too many bytes" buf n))
+    (set-buf-cur! buf (+ cur n))))
 
 ;; Create an NIO port that wraps FD.  The strange default sizes assume
 ;; that the memory is allocated inline to the bytevector, and thus has a
@@ -178,25 +171,36 @@
           (wait-for-readable eport)
           (accept-eport eport)))))
 
+;; Connect a socket eport to the remote host at SOCKADDR.  Returns no
+;; values.
+;;
+(define (connect-eport eport sockaddr)
+  (unless (nio-connect (eport-fd eport) sockaddr)
+    (wait-for-writable eport)
+    (let ((err (getsockopt (eport-fd eport) SOL_SOCKET SO_ERROR)))
+      (unless (zero? err)
+        (scm-error 'system-error "connect-eport" "~A"
+                   (list (strerror err)) #f)))))
+
 ;; Ensure that there are readable bytes in the buffer, or that the
 ;; buffer is at EOF.  Returns the actual number of available bytes.
 ;;
 (define (fill-input eport)
   (let* ((buf (eport-readbuf eport))
          (bv (buf-bv buf))
-         (cur (buf-cur buf))
-         (end (buf-end buf))
          (len (bytevector-length bv)))
-    (if (zero? (- len end))
-        (error "fill-input should only be called when the readbuf is empty"))
-    (let ((rv (nio-read (eport-fd eport) bv end (- len end))))
+    (unless (= (buf-cur buf) (buf-end buf))
+      (error "fill-input should only be called when the readbuf is empty"))
+    (set-buf-cur! buf 0)
+    (set-buf-end! buf 0) ; in case nio-read throws an error
+    (let ((rv (nio-read (eport-fd eport) bv 0 len)))
       (if (< rv 0)
           (begin
             (wait-for-readable eport)
             (fill-input eport))
-          (let ((new-end (+ end rv)))
-            (set-buf-end! buf new-end)
-            (- new-end cur))))))
+          (begin
+            (set-buf-end! buf rv)
+            rv)))))
 
 ;; Write all buffered output: those bytes between CUR and END.  Advances
 ;; CUR to be equal to END.
@@ -228,7 +232,12 @@
              (end (buf-end buf))
              (bv (buf-bv buf))
              (size (bytevector-length bv)))
-        (when (= end size)
+        (cond
+         ((zero? end))
+         ((= cur end)
+          (set-buf-cur! buf 0)
+          (set-buf-end! buf 0))
+         ((= end size)
           (if (> (* cur 2) size)
               ;; The buffer is less than half full; shuffle the data to
               ;; make space.
@@ -238,12 +247,13 @@
                 (set-buf-end! buf (- end cur)))
               ;; The buffer is more than half full; write some data and
               ;; try again.
-              (let ((written (nio-write (eport-fd eport)
-                                        bv cur (- end cur))))
-                (flush-buffer buf written)
-                (when (< written (- end cur))
-                  (wait-for-writable eport)
-                  (lp)))))))))
+              (begin
+                (let ((written (nio-write (eport-fd eport)
+                                         bv cur (- end cur))))
+                 (flush-buffer buf written)
+                 (when (< written (- end cur))
+                   (wait-for-writable eport)
+                   (lp)))))))))))
 
 ;; Peek at the next octet from EPORT, blocking if necessary.
 ;;
diff --git a/module/ice-9/nio.scm b/module/ice-9/nio.scm
index 1c64ee6..5b07d1c 100644
--- a/module/ice-9/nio.scm
+++ b/module/ice-9/nio.scm
@@ -20,7 +20,8 @@
 (define-module (ice-9 nio)
   #:export (nio-read
             nio-write
-            nio-accept))
+            nio-accept
+            nio-connect))
 
 (eval-when (eval load compile)
   (load-extension (string-append "libguile-" (effective-version))
@@ -44,3 +45,13 @@ read without blocking.  A return value of 0 indicates EOF."
 new client socket, unless the @code{accept} call would have blocked, in
 which case return @code{#f}."
   (%nio-accept fd))
+
+(define (nio-connect fd sockaddr)
+  "Initiate a connection from a socket whose file descriptor is
+@var{fd}.  The second argument should be a socket address object as
+returned by @code{make-socket-address}.  Returns @code{#t} if the
+connection succeeded immediately, and @code{#f} otherwise.
+
+Once this socket becomes writable, it is ready.  If there was a
+connection error, it can be retrieved with getsockopt of SO_ERROR."
+  (%nio-connect fd sockaddr))


hooks/post-receive
-- 
GNU Guile



reply via email to

[Prev in Thread] Current Thread [Next in Thread]