[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
15/33: Add more fibers utilities
From: |
Christopher Baines |
Subject: |
15/33: Add more fibers utilities |
Date: |
Wed, 14 Aug 2024 05:01:28 -0400 (EDT) |
cbaines pushed a commit to branch master
in repository data-service.
commit 5439159a169661ee4507fa2f565c38e2b14398d8
Author: Christopher Baines <mail@cbaines.net>
AuthorDate: Fri Jul 19 17:07:10 2024 +0100
Add more fibers utilities
---
guix-data-service/utils.scm | 88 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 88 insertions(+)
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index a9e8f39..736e24d 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -17,7 +17,9 @@
(define-module (guix-data-service utils)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
@@ -46,6 +48,12 @@
with-resource-from-pool
resource-pool-stats
+ fibers-delay
+ fibers-force
+
+ fibers-batch-for-each
+ fibers-for-each
+
parallel-via-fibers
par-map&
letpar&
@@ -456,6 +464,86 @@ available. Return the resource once PROC has returned."
(raise-exception
(make-resource-pool-timeout-error))))))
+(define-record-type <fibers-promise>
+ (make-fibers-promise thunk values-box evaluated-condition)
+ fibers-promise?
+ (thunk fibers-promise-thunk)
+ (values-box fibers-promise-values-box)
+ (evaluated-condition fibers-promise-evaluated-condition))
+
+(define (fibers-delay thunk)
+ (make-fibers-promise
+ thunk
+ (make-atomic-box #f)
+ (make-condition)))
+
+(define (fibers-force fp)
+ (let ((res (atomic-box-compare-and-swap!
+ (fibers-promise-values-box fp)
+ #f
+ 'started)))
+ (if (eq? #f res)
+ (call-with-values
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (atomic-box-set! (fibers-promise-values-box fp)
+ exn)
+ (signal-condition!
+ (fibers-promise-evaluated-condition fp))
+ (raise-exception exn))
+ (fibers-promise-thunk fp)
+ #:unwind? #t))
+ (lambda vals
+ (atomic-box-set! (fibers-promise-values-box fp)
+ vals)
+ (signal-condition!
+ (fibers-promise-evaluated-condition fp))
+ (apply values vals)))
+ (if (eq? res 'started)
+ (begin
+ (wait (fibers-promise-evaluated-condition fp))
+ (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
+ (if (exception? result)
+ (raise-exception result)
+ (apply values result))))
+ (if (exception? res)
+ (raise-exception res)
+ (apply values res))))))
+
+(define (fibers-batch-for-each proc batch-size . lists)
+ ;; Like split-at, but don't care about the order of the resulting lists, and
+ ;; don't error if the list is shorter than i elements
+ (define (split-at* lst i)
+ (let lp ((l lst) (n i) (acc '()))
+ (if (or (<= n 0) (null? l))
+ (values (reverse! acc) l)
+ (lp (cdr l) (- n 1) (cons (car l) acc)))))
+
+ ;; As this can be called with lists with tens of thousands of items in them,
+ ;; batch the
+ (define (get-batch lists)
+ (let ((split-lists
+ (map (lambda (lst)
+ (let ((batch rest (split-at* lst batch-size)))
+ (cons batch rest)))
+ lists)))
+ (values (map car split-lists)
+ (map cdr split-lists))))
+
+ (let loop ((lists lists))
+ (call-with-values
+ (lambda ()
+ (get-batch lists))
+ (lambda (batch rest)
+ (apply par-map& proc batch)
+ (unless (null? (car rest))
+ (loop rest)))))
+ *unspecified*)
+
+(define (fibers-for-each proc . lists)
+ (apply fibers-batch-for-each proc 20 lists))
+
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
(spawn-fiber
- 08/33: Use a bigger buffer for requests/responses, (continued)
- 08/33: Use a bigger buffer for requests/responses, Christopher Baines, 2024/08/14
- 10/33: Refactor opening store connections when processing jobs, Christopher Baines, 2024/08/14
- 04/33: Stop inserting missing source file nars, Christopher Baines, 2024/08/14
- 13/33: Return two values from channel->source-and-derivations-by-system, Christopher Baines, 2024/08/14
- 16/33: Rewrite the key parts of loading data to be even more parallel, Christopher Baines, 2024/08/14
- 18/33: Compute package derivations in chunks, Christopher Baines, 2024/08/14
- 01/33: Remove the statistics page, Christopher Baines, 2024/08/14
- 03/33: Try and speed up large package derivation comparisions, Christopher Baines, 2024/08/14
- 09/33: Re-work the fibers scheduling, Christopher Baines, 2024/08/14
- 12/33: Avoid long running store connections, Christopher Baines, 2024/08/14
- 15/33: Add more fibers utilities,
Christopher Baines <=
- 07/33: Stream the package derivation page since it can be large, Christopher Baines, 2024/08/14
- 11/33: Catch and ignore the new cross build derivation errors, Christopher Baines, 2024/08/14
- 19/33: Add worker thread utils, Christopher Baines, 2024/08/14
- 14/33: Insert package derivations earlier, Christopher Baines, 2024/08/14
- 26/33: Move inserting derivations in to the load-new-guix-revision module, Christopher Baines, 2024/08/14
- 21/33: Update tests, Christopher Baines, 2024/08/14
- 05/33: Log more timing infromation about inserting derivations, Christopher Baines, 2024/08/14
- 06/33: Add more time logging in to insert-missing-derivations, Christopher Baines, 2024/08/14
- 27/33: Get the test suite working again, Christopher Baines, 2024/08/14
- 20/33: Try using 2 threads for the server, Christopher Baines, 2024/08/14