guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

02/05: substitute-binary: Pipeline HTTP requests instead of using threads.


From: Ludovic Courtès
Subject: 02/05: substitute-binary: Pipeline HTTP requests instead of using threads.
Date: Mon, 23 Mar 2015 21:28:58 +0000

civodul pushed a commit to branch master
in repository guix.

commit d3a652037ef879f9279bc056c43d15ba7afcbb25
Author: Ludovic Courtès <address@hidden>
Date:   Mon Mar 23 22:25:04 2015 +0100

    substitute-binary: Pipeline HTTP requests instead of using threads.
    
    * guix/scripts/substitute-binary.scm (fetch-narinfo, %lookup-threads,
      n-par-map*): Remove.
      (narinfo-cache-file, cached-narinfo, cache-narinfo!, narinfo-request,
      http-multiple-get, read-to-eof, fetch-narinfos, lookup-narinfos,
      narinfo-from-file): New procedures.
      (lookup-narinfo): Rewrite in terms of 'lookup-narinfos'.
      (guix-substitute-binary): Use 'lookup-narinfos' instead of
      'lookup-narinfo'.
---
 guix/scripts/substitute-binary.scm |  270 +++++++++++++++++++++++++-----------
 1 files changed, 192 insertions(+), 78 deletions(-)

diff --git a/guix/scripts/substitute-binary.scm 
b/guix/scripts/substitute-binary.scm
index 85c2c74..c21c50f 100755
--- a/guix/scripts/substitute-binary.scm
+++ b/guix/scripts/substitute-binary.scm
@@ -28,7 +28,7 @@
   #:use-module (guix base64)
   #:use-module (guix pk-crypto)
   #:use-module (guix pki)
-  #:use-module ((guix build utils) #:select (mkdir-p))
+  #:use-module ((guix build utils) #:select (mkdir-p dump-port))
   #:use-module ((guix build download)
                 #:select (progress-proc uri-abbreviation))
   #:use-module (ice-9 rdelim)
@@ -48,6 +48,8 @@
   #:use-module (srfi srfi-34)
   #:use-module (srfi srfi-35)
   #:use-module (web uri)
+  #:use-module (web request)
+  #:use-module (web response)
   #:use-module (guix http-client)
   #:export (narinfo-signature->canonical-sexp
             read-narinfo
@@ -218,7 +220,7 @@ failure."
 gonna have to wait."
   (delay (begin
            (format (current-error-port)
-                   (_ "updating list of substitutes from '~a'...~%")
+                   (_ "updating list of substitutes from '~a'...\r")
                    url)
            (open-cache url))))
 
@@ -380,40 +382,56 @@ or is signed by an unauthorized key."
 the cache STR originates form."
   (call-with-input-string str (cut read-narinfo <> cache-uri)))
 
-(define (fetch-narinfo cache path)
-  "Return the <narinfo> record for PATH, or #f if CACHE does not hold PATH."
-  (define (download url)
-    ;; Download the .narinfo from URL, and return its contents as a list of
-    ;; key/value pairs.  Don't emit an error message upon 404.
-    (false-if-exception (fetch (string->uri url)
-                               #:quiet-404? #t)))
-
-  (and (string=? (cache-store-directory cache) (%store-prefix))
-       (and=> (download (string-append (cache-url cache) "/"
-                                       (store-path-hash-part path)
-                                       ".narinfo"))
-              (cute read-narinfo <> (cache-url cache)))))
-
 (define (obsolete? date now ttl)
   "Return #t if DATE is obsolete compared to NOW + TTL seconds."
   (time>? (subtract-duration now (make-time time-duration 0 ttl))
           (make-time time-monotonic 0 date)))
 
-(define %lookup-threads
-  ;; Number of threads spawned to perform lookup operations.  This means we
-  ;; can have this many simultaneous HTTP GET requests to the server, which
-  ;; limits the impact of connection latency.
-  20)
 
-(define (lookup-narinfo cache path)
-  "Check locally if we have valid info about PATH, otherwise go to CACHE and
-check what it has."
+(define (narinfo-cache-file path)
+  "Return the name of the local cache file holding the entry for PATH."
+  (let ((hash-part (store-path-hash-part path)))
+    (string-append %narinfo-cache-directory "/" hash-part)))
+
+(define (cached-narinfo path)
+  "Check locally if we have valid info about PATH.  Return two values: a Boolean
+telling whether valid cached info exists, and that info: #f when PATH is known
+to be unavailable, the narinfo otherwise.  Both are #f when nothing is usable."
   (define now
     (current-time time-monotonic))
 
   (define cache-file
-    (string-append %narinfo-cache-directory "/"
-                   (store-path-hash-part path)))
+    (narinfo-cache-file path))
+
+  (catch 'system-error
+    (lambda ()
+      (call-with-input-file cache-file
+        (lambda (p)
+          (match (read p)
+            (('narinfo ('version 1)
+                       ('cache-uri cache-uri)
+                       ('date date) ('value #f))
+             ;; A cached negative lookup.
+             (if (obsolete? date now %narinfo-negative-ttl)
+                 (values #f #f)                    ;expired: ignore the entry
+                 (values #t #f)))
+            (('narinfo ('version 1)
+                       ('cache-uri cache-uri)
+                       ('date date) ('value value))
+             ;; A cached positive lookup.
+             (if (obsolete? date now %narinfo-ttl)
+                 (values #f #f)                    ;expired: ignore the entry
+                 (values #t (string->narinfo value cache-uri))))
+            (('narinfo ('version v) _ ...)         ;any other entry layout
+             (values #f #f))))))
+    (lambda _                       ;system error, e.g. CACHE-FILE can't be opened
+      (values #f #f))))
+
+(define (cache-narinfo! cache path narinfo)
+  "Cache locally NARINFO for PATH, which originates from CACHE, and return it.
+NARINFO may be #f, which indicates that PATH is unavailable at CACHE."
+  (define now
+    (current-time time-monotonic))
 
   (define (cache-entry cache-uri narinfo)
     `(narinfo (version 1)
@@ -421,43 +439,153 @@ check what it has."
               (date ,(time-second now))
               (value ,(and=> narinfo narinfo->string))))
 
-  (let*-values (((valid? cached)
-                 (catch 'system-error
-                   (lambda ()
-                     (call-with-input-file cache-file
-                       (lambda (p)
-                         (match (read p)
-                           (('narinfo ('version 1)
-                                      ('cache-uri cache-uri)
-                                      ('date date) ('value #f))
-                            ;; A cached negative lookup.
-                            (if (obsolete? date now %narinfo-negative-ttl)
-                                (values #f #f)
-                                (values #t #f)))
-                           (('narinfo ('version 1)
-                                      ('cache-uri cache-uri)
-                                      ('date date) ('value value))
-                            ;; A cached positive lookup
-                            (if (obsolete? date now %narinfo-ttl)
-                                (values #f #f)
-                                (values #t (string->narinfo value
-                                                            cache-uri))))
-                           (('narinfo ('version v) _ ...)
-                            (values #f #f))))))
-                   (lambda _
-                     (values #f #f)))))
-    (if valid?
-        cached                                    ; including negative caches
+  (with-atomic-file-output (narinfo-cache-file path)
+    (lambda (out)
+      (write (cache-entry (cache-url cache) narinfo) out)))
+  narinfo)
+
+(define (narinfo-request cache-url path)
+  "Build and return the HTTP GET request for the narinfo of PATH at CACHE-URL."
+  (let* ((hash-part (store-path-hash-part path))
+         (location  (string-append cache-url "/" hash-part ".narinfo")))
+    (build-request (string->uri location) #:method 'GET)))
+
+(define (http-multiple-get base-url requests proc)
+  "Send all of REQUESTS to the server at BASE-URL.  Call PROC for each
+response, passing it the request object, the response, and a port from which
+to read the response body.  Return the list of results, in request order."
+  (let connect ((requests requests)
+                (result   '()))
+    (if (null? requests)
+        (reverse result)                          ;nothing (left) to send
+        (let ((p (open-socket-for-uri base-url)))
+          ;; Send all of REQUESTS in a row.
+          (setvbuf p _IOFBF (expt 2 16))
+          (for-each (cut write-request <> p) requests)
+          (force-output p)
+          ;; Now start processing responses.
+          (let loop ((requests requests)
+                     (result   result))
+            (match requests
+              (()
+               (close-port p)                     ;don't leak the socket
+               (reverse result))
+              ((head tail ...)
+               (let* ((resp   (read-response p))
+                      (body   (response-body-port resp))
+                      (result (cons (proc head resp body) result)))
+                 ;; RESP is valid even with "Connection: close"; resend only TAIL.
+                 (match (assq 'connection (response-headers resp))
+                   (('connection 'close)
+                    (close-port p)
+                    (connect tail result))        ;reconnect for the rest
+                   (_
+                    (loop tail result)))))))))))
+
+(define (read-to-eof port)
+  "Consume and discard whatever remains to be read from PORT, up to EOF."
+  (let ((sink (%make-void-port "w"))) (dump-port port sink)))
+
+(define (narinfo-from-file file url)
+  "Read and return the narinfo stored in FILE, with URL as its cache URL.
+Return #f when FILE does not exist."
+  (catch 'system-error
+    (lambda ()
+      (call-with-input-file file
+        (lambda (port) (read-narinfo port url))))
+    (lambda args
+      ;; Only a missing file yields #f; any other error is re-thrown as is.
+      (and (not (= ENOENT (system-error-errno args)))
+           (apply throw args)))))
+
+(define (fetch-narinfos cache paths)
+  "Retrieve all the narinfos for PATHS from CACHE and return them as a list.
+Return #f when the store directory of CACHE differs from (%store-prefix)."
+  (define url (cache-url cache))
+
+  (define update-progress!
+    (let ((done  0)
+          (total (length paths)))                  ; computed only once
+      (lambda ()
+        (display #\cr (current-error-port))
+        (force-output (current-error-port))
+        (format (current-error-port)
+                (_ "updating list of substitutes from '~a'... ~5,1f%")
+                url (* 100. (/ done total)))
+        (set! done (+ 1 done)))))
+
+  (define (discard-body port len)
+    ;; Read no more than LEN bytes: what follows may be the next response.
+    (if len (get-bytevector-n port len) (read-to-eof port)))
+
+  (define (handle-narinfo-response request response port)
+    (let ((len (response-content-length response)))
+      (case (response-code response)
+        ((200)                                     ; hit
+         (let ((narinfo (read-narinfo port url #:size len)))
+           (cache-narinfo! cache (narinfo-path narinfo) narinfo)
+           (update-progress!)
+           narinfo))
+        ((404)                                     ; failure
+         ;; Use only the last path component so that URL may contain a path.
+         (let* ((path      (uri-path (request-uri request)))
+                (hash-part (string-append "/" (basename path ".narinfo"))))
+           (discard-body port len)
+           (cache-narinfo! cache
+                           (find (cut string-contains <> hash-part) paths)
+                           #f)
+           (update-progress!))
+         #f)
+        (else                                      ; transient failure
+         (discard-body port len)                   ; nothing gets cached
+         #f))))
+
+  (and (string=? (cache-store-directory cache) (%store-prefix))
+       (let ((uri (string->uri url)))
+         (case (and=> uri uri-scheme)
+           ((http)
+            (let ((requests (map (cut narinfo-request url <>) paths)))
+              (update-progress!)
+              (let ((result (http-multiple-get url requests
+                                               handle-narinfo-response)))
+                (newline (current-error-port))
+                result)))
+           ((file #f)               ; NOTE(review): URI is #f in the #f case -- confirm
+            (let* ((base  (string-append (uri-path uri) "/"))
+                   (files (map (compose (cut string-append base <> ".narinfo")
+                                        store-path-hash-part)
+                               paths)))
+              (filter-map (cut narinfo-from-file <> url) files)))
+           (else
+            (leave (_ "~s: unsupported server URI scheme~%")
+                   (if uri (uri-scheme uri) url)))))))
+
+(define (lookup-narinfos cache paths)
+  "Return the narinfos for PATHS, invoking the server at CACHE when no
+information is available locally.  CACHE is a promise; the result may hold #f
+entries for negative lookups and does not follow the order of PATHS."
+  (let-values (((cached missing)
+                (fold2 (lambda (path cached missing)
+                         (let-values (((valid? value)
+                                       (cached-narinfo path)))
+                           (if valid?
+                               (values (cons value cached) missing)
+                               (values cached (cons path missing)))))
+                       '()
+                       '()
+                       paths)))
+    (if (null? missing)
+        cached
         (let* ((cache   (force cache))
-               (narinfo (and cache (fetch-narinfo cache path))))
-          ;; Cache NARINFO only when CACHE was actually accessible.  This
-          ;; avoids caching negative hits when in fact we just lacked network
-          ;; access.
-          (when cache
-            (with-atomic-file-output cache-file
-              (lambda (out)
-                (write (cache-entry (cache-url cache) narinfo) out))))
-          narinfo))))
+               ;; Both a #f CACHE and a #f result from 'fetch-narinfos' mean
+               ;; "nothing fetched"; never hand #f over to 'append'.
+               (missing (or (and cache (fetch-narinfos cache missing)) '())))
+          (append cached missing)))))
+
+(define (lookup-narinfo cache path)
+  "Return the narinfo for PATH in CACHE, or #f when none could be obtained."
+  (match (lookup-narinfos cache (list path))
+    ((answer) answer)
+    (_        #f)))          ;no answer at all, e.g. CACHE could not be reached
 
 (define (remove-expired-cached-narinfos)
   "Remove expired narinfo entries from the cache.  The sole purpose of this
@@ -580,16 +708,6 @@ Internal tool to substitute a pre-built binary to a local 
build.\n"))
 ;;; Entry point.
 ;;;
 
-(define n-par-map*
-  ;; We want the ability to run many threads in parallel, regardless of the
-  ;; number of cores.  However, Guile 2.0.5 has a bug whereby 'n-par-map' ends
-  ;; up consuming a lot of memory, possibly leading to death.  Thus, resort to
-  ;; 'par-map' on 2.0.5.
-  (if (guile-version>? "2.0.5")
-      n-par-map
-      (lambda (n proc lst)
-        (par-map proc lst))))
-
 (define (check-acl-initialized)
   "Warn if the ACL is uninitialized."
   (define (singleton? acl)
@@ -698,9 +816,7 @@ substituter disabled~%")
                      ;; Return the subset of PATHS available in CACHE.
                      (let ((substitutable
                             (if cache
-                                (n-par-map* %lookup-threads
-                                            (cut lookup-narinfo cache <>)
-                                            paths)
+                                (lookup-narinfos cache paths)
                                 '())))
                        (for-each (lambda (narinfo)
                                    (format #t "~a~%" (narinfo-path narinfo)))
@@ -710,9 +826,7 @@ substituter disabled~%")
                      ;; Reply info about PATHS if it's in CACHE.
                      (let ((substitutable
                             (if cache
-                                (n-par-map* %lookup-threads
-                                            (cut lookup-narinfo cache <>)
-                                            paths)
+                                (lookup-narinfos cache paths)
                                 '())))
                        (for-each (lambda (narinfo)
                                    (format #t "~a\n~a\n~a\n"



reply via email to

[Prev in Thread] Current Thread [Next in Thread]