guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

15/33: Add more fibers utilities


From: Christopher Baines
Subject: 15/33: Add more fibers utilities
Date: Wed, 14 Aug 2024 05:01:28 -0400 (EDT)

cbaines pushed a commit to branch master
in repository data-service.

commit 5439159a169661ee4507fa2f565c38e2b14398d8
Author: Christopher Baines <mail@cbaines.net>
AuthorDate: Fri Jul 19 17:07:10 2024 +0100

    Add more fibers utilities
---
 guix-data-service/utils.scm | 88 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 88 insertions(+)

diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index a9e8f39..736e24d 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -17,7 +17,9 @@
 
 (define-module (guix-data-service utils)
   #:use-module (srfi srfi-1)
+  #:use-module (srfi srfi-9)
   #:use-module (srfi srfi-11)
+  #:use-module (srfi srfi-71)
   #:use-module (ice-9 ftw)
   #:use-module (ice-9 match)
   #:use-module (ice-9 atomic)
@@ -46,6 +48,12 @@
             with-resource-from-pool
             resource-pool-stats
 
+            fibers-delay
+            fibers-force
+
+            fibers-batch-for-each
+            fibers-for-each
+
             parallel-via-fibers
             par-map&
             letpar&
@@ -456,6 +464,86 @@ available.  Return the resource once PROC has returned."
           (raise-exception
            (make-resource-pool-timeout-error))))))
 
+(define-record-type <fibers-promise>
+  (make-fibers-promise thunk values-box evaluated-condition)
+  fibers-promise?
+  (thunk                fibers-promise-thunk)
+  (values-box           fibers-promise-values-box)
+  (evaluated-condition  fibers-promise-evaluated-condition))
+
+(define (fibers-delay thunk)
+  (make-fibers-promise
+   thunk
+   (make-atomic-box #f)
+   (make-condition)))
+
+(define (fibers-force fp)
+  (let ((res (atomic-box-compare-and-swap!
+              (fibers-promise-values-box fp)
+              #f
+              'started)))
+    (if (eq? #f res)
+        (call-with-values
+            (lambda ()
+              (with-exception-handler
+                  (lambda (exn)
+                    (atomic-box-set! (fibers-promise-values-box fp)
+                                     exn)
+                    (signal-condition!
+                     (fibers-promise-evaluated-condition fp))
+                    (raise-exception exn))
+                (fibers-promise-thunk fp)
+                #:unwind? #t))
+          (lambda vals
+            (atomic-box-set! (fibers-promise-values-box fp)
+                             vals)
+            (signal-condition!
+             (fibers-promise-evaluated-condition fp))
+            (apply values vals)))
+        (if (eq? res 'started)
+            (begin
+              (wait (fibers-promise-evaluated-condition fp))
+              (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
+                (if (exception? result)
+                    (raise-exception result)
+                    (apply values result))))
+            (if (exception? res)
+                (raise-exception res)
+                (apply values res))))))
+
+(define (fibers-batch-for-each proc batch-size . lists)
+  ;; Like split-at, but don't care about the order of the resulting lists, and
+  ;; don't error if the list is shorter than i elements
+  (define (split-at* lst i)
+    (let lp ((l lst) (n i) (acc '()))
+      (if (or (<= n 0) (null? l))
+          (values (reverse! acc) l)
+          (lp (cdr l) (- n 1) (cons (car l) acc)))))
+
+  ;; As this can be called with lists with tens of thousands of items in them,
+  ;; batch the
+  (define (get-batch lists)
+    (let ((split-lists
+           (map (lambda (lst)
+                  (let ((batch rest (split-at* lst batch-size)))
+                    (cons batch rest)))
+                lists)))
+      (values (map car split-lists)
+              (map cdr split-lists))))
+
+  (let loop ((lists lists))
+    (call-with-values
+        (lambda ()
+          (get-batch lists))
+      (lambda (batch rest)
+        (apply par-map& proc batch)
+        (unless (null? (car rest))
+          (loop  rest)))))
+  *unspecified*)
+
+(define (fibers-for-each proc . lists)
+  (apply fibers-batch-for-each proc 20 lists))
+
 (define (defer-to-parallel-fiber thunk)
   (let ((reply (make-channel)))
     (spawn-fiber



reply via email to

[Prev in Thread] Current Thread [Next in Thread]