guix-patches
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[bug#45018] [PATCH 5/6] substitute: Cache and reuse connections while substituting.


From: Ludovic Courtès
Subject: [bug#45018] [PATCH 5/6] substitute: Cache and reuse connections while substituting.
Date: Thu, 3 Dec 2020 11:19:29 +0100

That way, when fetching a series of substitutes from the same server(s),
the connection is reused instead of being closed/opened for each
substitute, which saves on network round trips and TLS handshakes.

* guix/http-client.scm (http-fetch): Add #:keep-alive? and honor it.
* guix/progress.scm (progress-report-port): Add #:close? parameter and
honor it.
* guix/scripts/substitute.scm (fetch): Add #:port and #:keep-alive? and
honor them.
(open-connection-for-uri/cached, call-with-cached-connection): New
procedures.
(with-cached-connection): New macro.
(process-substitution): Wrap 'fetch' call in 'with-cached-connection'.
Pass #:close? to 'progress-report-port'.
---
 guix/http-client.scm        | 12 +++---
 guix/progress.scm           |  8 ++--
 guix/scripts/substitute.scm | 75 +++++++++++++++++++++++++++++++------
 3 files changed, 75 insertions(+), 20 deletions(-)

diff --git a/guix/http-client.scm b/guix/http-client.scm
index a767175d67..553640fe9e 100644
--- a/guix/http-client.scm
+++ b/guix/http-client.scm
@@ -1,5 +1,5 @@
 ;;; GNU Guix --- Functional package management for GNU
-;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018 Ludovic Courtès <ludo@gnu.org>
+;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2020 Ludovic Courtès <ludo@gnu.org>
 ;;; Copyright © 2015 Mark H Weaver <mhw@netris.org>
 ;;; Copyright © 2012, 2015 Free Software Foundation, Inc.
 ;;; Copyright © 2017 Tobias Geerinckx-Rice <me@tobias.gr>
@@ -70,6 +70,7 @@
 
 
 (define* (http-fetch uri #:key port (text? #f) (buffered? #t)
+                     (keep-alive? #f)
                      (verify-certificate? #t)
                      (headers '((user-agent . "GNU Guile")))
                      timeout)
@@ -79,6 +80,9 @@ textual.  Follow any HTTP redirection.  When BUFFERED? is #f, return an
 unbuffered port, suitable for use in `filtered-port'.  HEADERS is an alist of
 extra HTTP headers.
 
+When KEEP-ALIVE? is true, the connection is marked as 'keep-alive' and PORT is
+not closed upon completion.
+
 When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates.
 
 TIMEOUT specifies the timeout in seconds for connection establishment; when
@@ -104,11 +108,7 @@ Raise an '&http-get-error' condition if downloading fails."
         (setvbuf port 'none))
       (let*-values (((resp data)
                      (http-get uri #:streaming? #t #:port port
-                               ;; XXX: When #:keep-alive? is true, if DATA is
-                               ;; a chunked-encoding port, closing DATA won't
-                               ;; close PORT, leading to a file descriptor
-                               ;; leak.
-                               #:keep-alive? #f
+                               #:keep-alive? keep-alive?
                                #:headers headers))
                     ((code)
                      (response-code resp)))
diff --git a/guix/progress.scm b/guix/progress.scm
index fec65b424c..cd80ae620a 100644
--- a/guix/progress.scm
+++ b/guix/progress.scm
@@ -337,9 +337,10 @@ should be a <progress-reporter> object."
               (report total)
               (loop total (get-bytevector-n! in buffer 0 buffer-size))))))))
 
-(define (progress-report-port reporter port)
+(define* (progress-report-port reporter port #:key (close? #t))
   "Return a port that continuously reports the bytes read from PORT using
-REPORTER, which should be a <progress-reporter> object."
+REPORTER, which should be a <progress-reporter> object.  When CLOSE? is true,
+PORT is closed when the returned port is closed."
   (match reporter
     (($ <progress-reporter> start report stop)
      (let* ((total 0)
@@ -364,5 +365,6 @@ REPORTER, which should be a <progress-reporter> object."
                                         ;; trace.
                                         (unless (zero? total)
                                           (stop))
-                                        (close-port port)))))))
+                                        (when close?
+                                          (close-port port))))))))
 
diff --git a/guix/scripts/substitute.scm b/guix/scripts/substitute.scm
index 334d3c97f8..d6b2a5884f 100755
--- a/guix/scripts/substitute.scm
+++ b/guix/scripts/substitute.scm
@@ -188,9 +188,14 @@ again."
         (sigaction SIGALRM SIG_DFL)
         (apply values result)))))
 
-(define* (fetch uri #:key (buffered? #t) (timeout? #t))
+(define* (fetch uri #:key (buffered? #t) (timeout? #t)
+                (keep-alive? #f) (port #f))
   "Return a binary input port to URI and the number of bytes it's expected to
-provide."
+provide.
+
+When PORT is true, use it as the underlying I/O port for HTTP transfers; when
+PORT is false, open a new connection for URI.  When KEEP-ALIVE? is true, the
+connection (typically PORT) is kept open once data has been fetched from URI."
   (case (uri-scheme uri)
     ((file)
      (let ((port (open-file (uri-path uri)
@@ -206,7 +211,7 @@ provide."
        ;;   sudo tc qdisc add dev eth0 root netem delay 1500ms
        ;; and then cancel with:
        ;;   sudo tc qdisc del dev eth0 root
-       (let ((port #f))
+       (let ((port port))
          (with-timeout (if timeout?
                            %fetch-timeout
                            0)
@@ -217,10 +222,11 @@ provide."
            (begin
              (when (or (not port) (port-closed? port))
                (set! port (guix:open-connection-for-uri
-                           uri #:verify-certificate? #f))
-               (unless (or buffered? (not (file-port? port)))
-                 (setvbuf port 'none)))
+                           uri #:verify-certificate? #f)))
+             (unless (or buffered? (not (file-port? port)))
+               (setvbuf port 'none))
              (http-fetch uri #:text? #f #:port port
+                         #:keep-alive? keep-alive?
                          #:verify-certificate? #f))))))
     (else
      (leave (G_ "unsupported substitute URI scheme: ~a~%")
@@ -962,6 +968,48 @@ the URI, its compression method (a string), and the compressed file size."
     (((uri compression file-size) _ ...)
      (values uri compression file-size))))
 
+(define open-connection-for-uri/cached
+  (let ((cache (make-hash-table)))
+    (lambda* (uri #:key fresh?)
+      "Return a connection for URI, reusing the one cached for the same host,
+scheme, and port, if any.  When FRESH? is true, drop that cached connection
+and open a new one.  Return #f if URI's scheme is 'file' or #f."
+      (define scheme (uri-scheme uri))
+      ;; The port is part of the key: servers listening on different ports of
+      ;; the same host must not be handed each other's socket.
+      (define key (list (uri-host uri) scheme (uri-port uri)))
+      (and (not (memq scheme '(file #f)))
+           (match (hash-ref cache key)
+             (#f
+              (let ((socket (guix:open-connection-for-uri
+                             uri #:verify-certificate? #f)))
+                (hash-set! cache key socket)
+                socket))
+             (socket
+              (if (or fresh? (port-closed? socket))
+                  (begin
+                    (false-if-exception (close-port socket))
+                    (hash-remove! cache key)
+                    (open-connection-for-uri/cached uri))
+                  socket)))))))
+
+(define (call-with-cached-connection uri proc)
+  "Call PROC with a cached connection to URI; on EPIPE, retry on a fresh one."
+  (define (retry-or-rethrow . args)
+    ;; EPIPE means the server closed the cached connection in the meantime.
+    (if (= EPIPE (system-error-errno args))
+        (proc (open-connection-for-uri/cached uri #:fresh? #t))
+        (apply throw args)))
+
+  (let ((port (open-connection-for-uri/cached uri)))
+    (catch 'system-error
+      (lambda () (proc port))
+      retry-or-rethrow)))
+
+(define-syntax-rule (with-cached-connection uri port exp ...)
+  "Evaluate EXP... with PORT bound to a possibly cached connection to URI."
+  (call-with-cached-connection uri (lambda (port) exp ...)))
+
 (define* (process-substitution store-item destination
                                #:key cache-urls acl print-build-trace?)
   "Substitute STORE-ITEM (a store file name) from CACHE-URLS, and write it to
@@ -984,10 +1032,12 @@ DESTINATION as a nar file.  Verify the substitute against ACL."
               (G_ "Downloading ~a...~%") (uri->string uri)))
 
     (let*-values (((raw download-size)
-                   ;; Note that Hydra currently generates Nars on the fly
-                   ;; and doesn't specify a Content-Length, so
-                   ;; DOWNLOAD-SIZE is #f in practice.
-                   (fetch uri #:buffered? #f #:timeout? #f))
+                   ;; 'guix publish' without '--cache' doesn't specify a
+                   ;; Content-Length, so DOWNLOAD-SIZE is #f in this case.
+                   (with-cached-connection uri port
+                     (fetch uri #:buffered? #f #:timeout? #f
+                            #:port port
+                            #:keep-alive? #t)))
                   ((progress)
                    (let* ((dl-size  (or download-size
                                         (and (equal? compression "none")
@@ -1001,7 +1051,9 @@ DESTINATION as a nar file.  Verify the substitute against ACL."
                                          (uri->string uri) dl-size
                                          (current-error-port)
                                          #:abbreviation nar-uri-abbreviation))))
-                     (progress-report-port reporter raw)))
+                     ;; Keep RAW open upon completion so we can later reuse
+                     ;; the underlying connection.
+                     (progress-report-port reporter raw #:close? #f)))
                   ((input pids)
                    ;; NOTE: This 'progress' port of current process will be
                    ;; closed here, while the child process doing the
@@ -1216,6 +1268,7 @@ default value."
 
 ;;; Local Variables:
 ;;; eval: (put 'with-timeout 'scheme-indent-function 1)
+;;; eval: (put 'with-cached-connection 'scheme-indent-function 2)
 ;;; End:
 
 ;;; substitute.scm ends here
-- 
2.29.2






reply via email to

[Prev in Thread] Current Thread [Next in Thread]