summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--guix/http-client.scm12
-rw-r--r--guix/progress.scm8
-rwxr-xr-xguix/scripts/substitute.scm103
-rw-r--r--nix/libstore/build.cc29
4 files changed, 117 insertions, 35 deletions
diff --git a/guix/http-client.scm b/guix/http-client.scm
index a767175d67..553640fe9e 100644
--- a/guix/http-client.scm
+++ b/guix/http-client.scm
@@ -1,5 +1,5 @@
 ;;; GNU Guix --- Functional package management for GNU
-;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018 Ludovic Courtès <ludo@gnu.org>
+;;; Copyright © 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2020 Ludovic Courtès <ludo@gnu.org>
 ;;; Copyright © 2015 Mark H Weaver <mhw@netris.org>
 ;;; Copyright © 2012, 2015 Free Software Foundation, Inc.
 ;;; Copyright © 2017 Tobias Geerinckx-Rice <me@tobias.gr>
@@ -70,6 +70,7 @@
 
 
 (define* (http-fetch uri #:key port (text? #f) (buffered? #t)
+                     (keep-alive? #f)
                      (verify-certificate? #t)
                      (headers '((user-agent . "GNU Guile")))
                      timeout)
@@ -79,6 +80,9 @@ textual.  Follow any HTTP redirection.  When BUFFERED? is #f, return an
 unbuffered port, suitable for use in `filtered-port'.  HEADERS is an alist of
 extra HTTP headers.
 
+When KEEP-ALIVE? is true, the connection is marked as 'keep-alive' and PORT is
+not closed upon completion.
+
 When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates.
 
 TIMEOUT specifies the timeout in seconds for connection establishment; when
@@ -104,11 +108,7 @@ Raise an '&http-get-error' condition if downloading fails."
         (setvbuf port 'none))
       (let*-values (((resp data)
                      (http-get uri #:streaming? #t #:port port
-                               ;; XXX: When #:keep-alive? is true, if DATA is
-                               ;; a chunked-encoding port, closing DATA won't
-                               ;; close PORT, leading to a file descriptor
-                               ;; leak.
-                               #:keep-alive? #f
+                               #:keep-alive? keep-alive?
                                #:headers headers))
                     ((code)
                      (response-code resp)))
diff --git a/guix/progress.scm b/guix/progress.scm
index fec65b424c..cd80ae620a 100644
--- a/guix/progress.scm
+++ b/guix/progress.scm
@@ -337,9 +337,10 @@ should be a <progress-reporter> object."
               (report total)
               (loop total (get-bytevector-n! in buffer 0 buffer-size))))))))
 
-(define (progress-report-port reporter port)
+(define* (progress-report-port reporter port #:key (close? #t))
   "Return a port that continuously reports the bytes read from PORT using
-REPORTER, which should be a <progress-reporter> object."
+REPORTER, which should be a <progress-reporter> object.  When CLOSE? is true,
+PORT is closed when the returned port is closed."
   (match reporter
     (($ <progress-reporter> start report stop)
      (let* ((total 0)
@@ -364,5 +365,6 @@ REPORTER, which should be a <progress-reporter> object."
                                         ;; trace.
                                         (unless (zero? total)
                                           (stop))
-                                        (close-port port)))))))
+                                        (when close?
+                                          (close-port port))))))))
 
diff --git a/guix/scripts/substitute.scm b/guix/scripts/substitute.scm
index 73abd3f029..25075eedff 100755
--- a/guix/scripts/substitute.scm
+++ b/guix/scripts/substitute.scm
@@ -188,9 +188,14 @@ again."
         (sigaction SIGALRM SIG_DFL)
         (apply values result)))))
 
-(define* (fetch uri #:key (buffered? #t) (timeout? #t))
+(define* (fetch uri #:key (buffered? #t) (timeout? #t)
+                (keep-alive? #f) (port #f))
   "Return a binary input port to URI and the number of bytes it's expected to
-provide."
+provide.
+
+When PORT is true, use it as the underlying I/O port for HTTP transfers; when
+PORT is false, open a new connection for URI.  When KEEP-ALIVE? is true, the
+connection (typically PORT) is kept open once data has been fetched from URI."
   (case (uri-scheme uri)
     ((file)
      (let ((port (open-file (uri-path uri)
@@ -206,7 +211,7 @@ provide."
        ;;   sudo tc qdisc add dev eth0 root netem delay 1500ms
        ;; and then cancel with:
        ;;   sudo tc qdisc del dev eth0 root
-       (let ((port #f))
+       (let ((port port))
          (with-timeout (if timeout?
                            %fetch-timeout
                            0)
@@ -217,10 +222,11 @@ provide."
            (begin
              (when (or (not port) (port-closed? port))
                (set! port (guix:open-connection-for-uri
-                           uri #:verify-certificate? #f))
-               (unless (or buffered? (not (file-port? port)))
-                 (setvbuf port 'none)))
+                           uri #:verify-certificate? #f)))
+             (unless (or buffered? (not (file-port? port)))
+               (setvbuf port 'none))
              (http-fetch uri #:text? #f #:port port
+                         #:keep-alive? keep-alive?
                          #:verify-certificate? #f))))))
     (else
      (leave (G_ "unsupported substitute URI scheme: ~a~%")
@@ -478,17 +484,17 @@ indicates that PATH is unavailable at CACHE-URL."
     (build-request (string->uri url) #:method 'GET #:headers headers)))
 
 (define (at-most max-length lst)
-  "If LST is shorter than MAX-LENGTH, return it; otherwise return its
-MAX-LENGTH first elements."
+  "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
+return its MAX-LENGTH first elements and its tail."
   (let loop ((len 0)
              (lst lst)
              (result '()))
     (match lst
       (()
-       (reverse result))
+       (values (reverse result) '()))
       ((head . tail)
        (if (>= len max-length)
-           (reverse result)
+           (values (reverse result) lst)
            (loop (+ 1 len) tail (cons head result)))))))
 
 (define* (http-multiple-get base-uri proc seed requests
@@ -962,6 +968,68 @@ the URI, its compression method (a string), and the compressed file size."
     (((uri compression file-size) _ ...)
      (values uri compression file-size))))
 
+(define %max-cached-connections
+  ;; Maximum number of connections kept in cache by
+  ;; 'open-connection-for-uri/cached'.
+  16)
+
+(define open-connection-for-uri/cached
+  (let ((cache '()))
+    (lambda* (uri #:key fresh?)
+      "Return a connection for URI, possibly reusing a cached connection.
+When FRESH? is true, delete any cached connections for URI and open a new
+one.  Return #f if URI's scheme is 'file' or #f."
+      (define host (uri-host uri))
+      (define scheme (uri-scheme uri))
+      (define key (list host scheme (uri-port uri)))
+
+      (and (not (memq scheme '(file #f)))
+           (match (assoc-ref cache key)
+             (#f
+              ;; Open a new connection to URI and evict old entries from
+              ;; CACHE, if any.
+              (let-values (((socket)
+                            (guix:open-connection-for-uri
+                             uri #:verify-certificate? #f))
+                           ((new-cache evicted)
+                            (at-most (- %max-cached-connections 1) cache)))
+                (for-each (match-lambda
+                            ((_ . port)
+                             (false-if-exception (close-port port))))
+                          evicted)
+                (set! cache (alist-cons key socket new-cache))
+                socket))
+             (socket
+              (if (or fresh? (port-closed? socket))
+                  (begin
+                    (false-if-exception (close-port socket))
+                    (set! cache (alist-delete key cache))
+                    (open-connection-for-uri/cached uri))
+                  (begin
+                    ;; Drain input left from the previous use.
+                    (drain-input socket)
+                    socket))))))))
+
+(define (call-with-cached-connection uri proc)
+  (let ((port (open-connection-for-uri/cached uri)))
+    (catch #t
+      (lambda ()
+        (proc port))
+      (lambda (key . args)
+        ;; If PORT was cached and the server closed the connection in the
+        ;; meantime, we get EPIPE.  In that case, open a fresh connection and
+        ;; retry.  We might also get 'bad-response or a similar exception from
+        ;; (web response) later on, once we've sent the request.
+        (if (or (and (eq? key 'system-error)
+                     (= EPIPE (system-error-errno `(,key ,@args))))
+                (memq key '(bad-response bad-header bad-header-component)))
+            (proc (open-connection-for-uri/cached uri #:fresh? #t))
+            (apply throw key args))))))
+
+(define-syntax-rule (with-cached-connection uri port exp ...)
+  "Bind PORT with EXP... to a socket connected to URI."
+  (call-with-cached-connection uri (lambda (port) exp ...)))
+
 (define* (process-substitution store-item destination
                                #:key cache-urls acl print-build-trace?)
   "Substitute STORE-ITEM (a store file name) from CACHE-URLS, and write it to
@@ -984,10 +1052,12 @@ DESTINATION as a nar file.  Verify the substitute against ACL."
               (G_ "Downloading ~a...~%") (uri->string uri)))
 
     (let*-values (((raw download-size)
-                   ;; Note that Hydra currently generates Nars on the fly
-                   ;; and doesn't specify a Content-Length, so
-                   ;; DOWNLOAD-SIZE is #f in practice.
-                   (fetch uri #:buffered? #f #:timeout? #f))
+                   ;; 'guix publish' without '--cache' doesn't specify a
+                   ;; Content-Length, so DOWNLOAD-SIZE is #f in this case.
+                   (with-cached-connection uri port
+                     (fetch uri #:buffered? #f #:timeout? #f
+                            #:port port
+                            #:keep-alive? #t)))
                   ((progress)
                    (let* ((dl-size  (or download-size
                                         (and (equal? compression "none")
@@ -1001,7 +1071,9 @@ DESTINATION as a nar file.  Verify the substitute against ACL."
                                          (uri->string uri) dl-size
                                          (current-error-port)
                                          #:abbreviation nar-uri-abbreviation))))
-                     (progress-report-port reporter raw)))
+                     ;; Keep RAW open upon completion so we can later reuse
+                     ;; the underlying connection.
+                     (progress-report-port reporter raw #:close? #f)))
                   ((input pids)
                    ;; NOTE: This 'progress' port of current process will be
                    ;; closed here, while the child process doing the
@@ -1218,6 +1290,7 @@ default value."
 
 ;;; Local Variables:
 ;;; eval: (put 'with-timeout 'scheme-indent-function 1)
+;;; eval: (put 'with-cached-connection 'scheme-indent-function 2)
 ;;; End:
 
 ;;; substitute.scm ends here
diff --git a/nix/libstore/build.cc b/nix/libstore/build.cc
index 50d300253d..6cfe7aba7e 100644
--- a/nix/libstore/build.cc
+++ b/nix/libstore/build.cc
@@ -3114,17 +3114,24 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data)
     }
 
     if (fd == substituter->fromAgent.readSide) {
-	/* Trim whitespace to the right.  */
-	size_t end = data.find_last_not_of(" \t\n");
-	string trimmed = (end != string::npos) ? data.substr(0, end + 1) : data;
-
-	if (expectedHashStr == "") {
-	    expectedHashStr = trimmed;
-	} else if (status == "") {
-	    status = trimmed;
-	    worker.wakeUp(shared_from_this());
-	} else {
-	    printMsg(lvlError, format("unexpected substituter message '%1%'") % data);
+	/* DATA may consist of several lines.  Process them one by one.  */
+	string input = data;
+	while (!input.empty()) {
+	    /* Process up to the first newline.  */
+	    size_t end = input.find_first_of("\n");
+	    string trimmed = (end != string::npos) ? input.substr(0, end) : input;
+
+	    /* Update the goal's state accordingly.  */
+	    if (expectedHashStr == "") {
+		expectedHashStr = trimmed;
+	    } else if (status == "") {
+		status = trimmed;
+		worker.wakeUp(shared_from_this());
+	    } else {
+		printMsg(lvlError, format("unexpected substituter message '%1%'") % input);
+	    }
+
+	    input = (end != string::npos) ? input.substr(end + 1) : "";
 	}
     }
 }