[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: Mathieu Othacehe
Date: Sun, 13 Dec 2020 07:35:06 -0500 (EST)
branch: wip-offload
commit c386f6943117da1ea8cb9fab0d7c4bdf435ccac3
Author: Mathieu Othacehe <othacehe@gnu.org>
AuthorDate: Wed Dec 9 17:52:27 2020 +0100
tmp
---
bin/cuirass.in | 9 +++++++
src/cuirass/base.scm | 12 +++++++++-
src/cuirass/database.scm | 42 ++++++++++++++++++++++++++++----
src/cuirass/http.scm | 16 +++++++++++++
src/cuirass/remote-server.scm | 36 +++++++++++++++++++++-------
src/cuirass/remote-worker.scm | 56 +++++++++++++++++++++++++++++++++----------
src/cuirass/remote.scm | 54 ++++++++++++++++++++++++++++++++++-------
src/cuirass/templates.scm | 42 +++++++++++++++++++++++++++++++-
src/schema.sql | 9 ++++++-
9 files changed, 241 insertions(+), 35 deletions(-)
diff --git a/bin/cuirass.in b/bin/cuirass.in
index 98bc061..7ba89db 100644
--- a/bin/cuirass.in
+++ b/bin/cuirass.in
@@ -212,6 +212,15 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
"next evaluation in ~a seconds" interval)
(sleep interval)))))
+ (when (%build-remote?)
+ (spawn-fiber
+ (essential-task
+ 'request-workers exit-channel
+ (lambda ()
+ (while #t
+ (request-workers)
+ (sleep 60))))))
+
(spawn-fiber
(essential-task
'metrics exit-channel
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 1a865e5..64bf524 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -72,6 +72,7 @@
process-specs
evaluation-log-file
with-build-offload-thread
+ request-workers
;; Parameters.
%package-cachedir
@@ -473,6 +474,8 @@ in the database."
#:retry? #f)
((drvs . systems)
(remote-build socket drvs systems))
+ ('workers
+ (remote-send-workers socket))
('timeout #f))
(loop))))))
channel))
@@ -485,6 +488,9 @@ in the database."
(define (build-derivations/offload drvs systems)
(put-message (%build-offload-channel) (cons drvs systems)))
+(define (request-workers)
+ (put-message (%build-offload-channel) 'workers))
+
;;;
;;; Building packages.
@@ -633,7 +639,7 @@ updating the database accordingly."
(log-message "bogus build-started event for '~a'" drv)))
(('build-remote drv host _ ...)
(log-message "'~a' offloaded to '~a'" drv host)
- (db-update-build-machine! drv host))
+ (db-update-build-worker! drv host))
(('build-succeeded drv _ ...)
(if (valid? drv)
(begin
@@ -655,6 +661,10 @@ updating the database accordingly."
(log-message "build failed: '~a'" drv)
(db-update-build-status! drv (build-status failed)))
(log-message "bogus build-failed event for '~a'" drv)))
+ (('workers workers)
+ (for-each (lambda (worker)
+ (db-add-worker (sexp->worker worker)))
+ workers))
(('substituter-started item _ ...)
(log-message "substituter started: '~a'" item))
(('substituter-succeeded item _ ...)
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index 4ef5229..3baaf61 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -24,6 +24,7 @@
(define-module (cuirass database)
#:use-module (cuirass logging)
#:use-module (cuirass config)
+ #:use-module (cuirass remote)
#:use-module (cuirass utils)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
@@ -60,7 +61,7 @@
db-add-build-product
db-register-builds
db-update-build-status!
- db-update-build-machine!
+ db-update-build-worker!
db-get-output
db-get-inputs
db-get-build
@@ -82,6 +83,8 @@
db-get-evaluation-specification
db-get-build-product-path
db-get-build-products
+ db-add-worker
+ db-get-workers
db-get-evaluation-summary
db-get-checkouts
read-sql-file
@@ -803,10 +806,10 @@ log file for DRV."
(#:event . ,(assq-ref status-names
status)))))))))
-(define* (db-update-build-machine! drv machine)
- "Update the database so that DRV's machine is MACHINE."
+(define* (db-update-build-worker! drv worker)
+ "Update the database so that DRV's worker is WORKER."
(with-db-writer-worker-thread db
- (sqlite-exec db "UPDATE Builds SET machine=" machine
+ (sqlite-exec db "UPDATE Builds SET worker=" worker
"WHERE derivation=" drv ";")))
(define (db-get-output path)
@@ -965,6 +968,7 @@ CASE WHEN :borderlowid IS NULL THEN
(derivation . "Builds.derivation = :derivation")
(job . "Builds.job_name = :job")
(system . "Builds.system = :system")
+ (worker . "Builds.worker = :worker")
(evaluation . "Builds.evaluation = :evaluation")
(status . ,(match (assq-ref filters 'status)
(#f #f)
@@ -1413,3 +1417,33 @@ WHERE build = " build-id))
(#:checksum . ,checksum)
(#:path . ,path))
products)))))))
+
+(define (db-add-worker worker)
+ "Insert WORKER into Worker table."
+ (with-db-writer-worker-thread db
+ (sqlite-exec db "\
+INSERT OR REPLACE INTO Workers (name, address, systems, last_seen)
+VALUES ("
+ (worker-name worker) ", "
+ (worker-address worker) ", "
+ (worker-systems worker) ", "
+ (worker-last-seen worker) ");")
+ (last-insert-rowid db)))
+
+(define (db-get-workers)
+ "Return the workers in Workers table."
+ (with-db-worker-thread db
+ (let loop ((rows (sqlite-exec db "
+SELECT name, address, systems, last_seen from Workers"))
+ (workers '()))
+ (match rows
+ (() (reverse workers))
+ ((#(name address systems last-seen)
+ . rest)
+ (loop rest
+ (cons (worker
+ (name name)
+ (address address)
+ (systems (with-input-from-string systems read))
+ (last-seen last-seen))
+ workers)))))))
diff --git a/src/cuirass/http.scm b/src/cuirass/http.scm
index 99dc2ce..60f1a75 100644
--- a/src/cuirass/http.scm
+++ b/src/cuirass/http.scm
@@ -28,6 +28,7 @@
#:use-module (cuirass metrics)
#:use-module (cuirass utils)
#:use-module (cuirass logging)
+ #:use-module (cuirass remote)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-26)
@@ -661,6 +662,21 @@ Hydra format."
(respond-json-with-error 500 "No build found.")))
(respond-json-with-error 500 "Query parameter not provided."))))
+ (('GET "workers")
+ (respond-html
+ (html-page
+ "Workers status"
+ (let ((workers (db-get-workers)))
+ (workers-status
+ workers
+ (map (lambda (worker)
+ (let ((name (worker-name worker)))
+ (db-get-builds `((worker . ,name)
+ (status . started)
+ (order . status+submission-time)))))
+ workers)))
+ '())))
+
(('GET "metrics")
(respond-html
(metrics-page)))
diff --git a/src/cuirass/remote-server.scm b/src/cuirass/remote-server.scm
index 393ad44..f9d3be1 100644
--- a/src/cuirass/remote-server.scm
+++ b/src/cuirass/remote-server.scm
@@ -169,7 +169,7 @@ work for the worker with the given NAME."
(queues (find-system-queues systems)))
(q-pop! (random-queue queues))))
-(define* (read-client-exp client exp)
+(define* (read-client-exp client-socket client exp)
"Read the given EXP sent by CLIENT."
(catch 'system-error
(lambda ()
@@ -182,18 +182,34 @@ work for the worker with the given NAME."
;; targeted system. Also save the client ID in the queue to be able
;; to send it build events later on.
(q-push! (assoc-ref %build-queues system)
- (list client drv))))))
+ (list client drv))))
+ (('request-workers)
+ (zmq-send-msg-parts-bytevector
+ client-socket
+ (list client
+ (make-bytevector 0)
+ (string->bv
+ (zmq-workers
+ (hash-fold (lambda (key value old)
+ (cons (worker->sexp value) old))
+ '()
+ %workers))))))))
(const #f)))
(define* (read-worker-exp exp #:key reply-worker)
"Read the given EXP sent by a worker. REPLY-WORKER is a procedure that can
be used to reply to the worker."
+ (define (update-workers! worker proc)
+ (let* ((worker* (sexp->worker worker))
+ (name (worker-name worker*)))
+ (proc name)
+ (hash-set! %workers name worker*)))
+
(match (zmq-read-message exp)
(('worker-ready worker)
- (let* ((worker* (sexp->worker worker))
- (name (worker-name worker*)))
- (info (G_ "Worker `~a' is ready.~%") name)
- (hash-set! %workers name worker*)))
+ (update-workers! worker
+ (lambda (name)
+ (info (G_ "Worker `~a' is ready.~%") name))))
(('worker-request-work name)
(if (build-available? name)
(match (pop-random-build name)
@@ -201,7 +217,9 @@ be used to reply to the worker."
(reply-worker client (zmq-build-request-message drv))))
(reply-worker
(zmq-empty-delimiter)
- (zmq-no-build-message))))))
+ (zmq-no-build-message))))
+ (('worker-ping worker)
+ (update-workers! worker (const #t)))))
;;;
@@ -408,7 +426,9 @@ frontend to the workers connected through the TCP backend."
(when (zmq-socket-ready? items client-socket)
(match (zmq-get-msg-parts-bytevector client-socket)
((client empty rest)
- (read-client-exp client (bv->string rest)))))
+ (read-client-exp client-socket
+ client
+ (bv->string rest)))))
;; build-worker -> remote-server.
(when (zmq-socket-ready? items build-socket)
(match (zmq-get-msg-parts-bytevector build-socket)
diff --git a/src/cuirass/remote-worker.scm b/src/cuirass/remote-worker.scm
index fef7994..bc68fbe 100644
--- a/src/cuirass/remote-worker.scm
+++ b/src/cuirass/remote-worker.scm
@@ -156,7 +156,9 @@ given NAME."
(when (file-exists? cache)
(delete-file-recursively cache))))
-(define* (run-build drv service #:key reply)
+(define* (run-build drv service
+ #:key
+ reply worker)
"Build DRV and send messages upon build start, failure or completion to the
build server identified by SERVICE-NAME using the REPLY procedure.
@@ -166,10 +168,10 @@ still be substituted."
(with-store store
(let ((publish-url (service->publish-url service))
(local-publish-url (service->local-publish-url service))
- (local-address (avahi-service-local-address service)))
+ (name (worker-name worker)))
(add-substitute-url store publish-url)
(empty-cache!)
- (reply (zmq-build-started-message drv local-address))
+ (reply (zmq-build-started-message drv name))
(guard (c ((store-protocol-error? c)
(info (G_ "Derivation `~a' build failed: ~a~%")
drv (store-protocol-error-message c))
@@ -182,16 +184,41 @@ still be substituted."
(info (G_ "Derivation ~a build failed.~%") drv)
(reply (zmq-build-failed-message drv))))))))
-(define* (run-command command service #:key reply)
+(define* (run-command command service
+ #:key
+ reply worker)
"Run COMMAND. SERVICE-NAME is the name of the build server that sent the
command. REPLY is a procedure that can be used to reply to this server."
(match (zmq-read-message command)
(('build ('drv drv) ('system system))
(info (G_ "Building `~a' derivation.~%") drv)
- (run-build drv service #:reply reply))
+ (run-build drv service #:reply reply #:worker worker))
(('no-build)
#t)))
+(define (worker-ping base-worker service)
+ (define (ping socket)
+ (let ((worker (worker
+ (inherit base-worker)
+ (last-seen (current-time)))))
+ (zmq-send-msg-parts-bytevector
+ socket
+ (list (make-bytevector 0)
+ (string->bv
+ (zmq-worker-ping (worker->sexp worker)))))))
+
+ (call-with-new-thread
+ (lambda ()
+ (let* ((socket (zmq-dealer-socket))
+ (address (avahi-service-address service))
+ (port (avahi-service-port service))
+ (endpoint (zmq-backend-endpoint address port)))
+ (zmq-connect socket endpoint)
+ (let loop ()
+ (ping socket)
+ (sleep 10)
+ (loop))))))
+
(define (start-worker worker service)
"Start a worker thread named NAME, reading commands from the DEALER socket
and executing them. The worker can reply on the same socket."
@@ -225,12 +252,14 @@ and executing them. The worker can reply on the same socket."
(endpoint (zmq-backend-endpoint address port)))
(zmq-connect socket endpoint)
(ready socket)
+ (worker-ping worker service)
(let loop ()
(request-work socket)
(match (zmq-get-msg-parts-bytevector socket '())
((empty client empty command)
(run-command (bv->string command) service
- #:reply (reply socket client))))
+ #:reply (reply socket client)
+ #:worker worker)))
(sleep 1)
(loop))))))
@@ -287,12 +316,15 @@ exiting."
(lambda (action service)
(case action
((new-service)
- (for-each (lambda (n)
- (start-worker (worker
- (name (generate-worker-name))
- (systems '("x86_64-linux")))
- service))
- (iota workers)))))
+ (for-each
+ (lambda (n)
+ (let ((address (avahi-service-local-address service)))
+ (start-worker (worker
+ (address address)
+ (name (generate-worker-name))
+ (systems '("x86_64-linux")))
+ service)))
+ (iota workers)))))
#:types (list remote-server-service-type)
#:stop-loop? (lambda ()
(atomic-box-ref %stop-process?))))))
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index c181813..dbf3cd5 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -33,8 +33,10 @@
#:use-module (ice-9 rdelim)
#:export (worker
worker?
+ worker-address
worker-name
worker-systems
+ worker-last-seen
worker->sexp
sexp->worker
generate-worker-name
@@ -53,12 +55,16 @@
zmq-build-started-message
zmq-build-failed-message
zmq-build-succeeded-message
+ zmq-worker-ping
zmq-worker-ready-message
zmq-worker-request-work-message
+ zmq-request-workers
+ zmq-workers
zmq-read-message
remote-server-service-type
remote-build-socket
+ remote-send-workers
remote-build
remote-build-poll))
@@ -70,25 +76,36 @@
(define-record-type* <worker>
worker make-worker
worker?
+ (address worker-address)
(name worker-name)
- (systems worker-systems))
+ (systems worker-systems)
+ (last-seen worker-last-seen
+ (default 0)))
(define (worker->sexp worker)
"Return an sexp describing WORKER."
- (let ((name (worker-name worker))
- (systems (worker-systems worker)))
+ (let ((address (worker-address worker))
+ (name (worker-name worker))
+ (systems (worker-systems worker))
+ (last-seen (worker-last-seen worker)))
`(worker
+ (address ,address)
(name ,name)
- (systems ,systems))))
+ (systems ,systems)
+ (last-seen ,last-seen))))
(define (sexp->worker sexp)
"Turn SEXP, an sexp as returned by 'worker->sexp', into a <worker> record."
(match sexp
- (('worker ('name name) ('systems systems))
+ (('worker ('address address)
+ ('name name)
+ ('systems systems)
+ ('last-seen last-seen))
(worker
+ (address address)
(name name)
- (systems systems)))))
-
+ (systems systems)
+ (last-seen last-seen)))))
(define %seed
(seed->random-state
@@ -227,6 +244,10 @@ retries a call to PROC."
"Return a message that indicates that the build of DRV is done."
(format #f "~s" `(build-succeeded (drv ,drv) (url ,url))))
+(define (zmq-worker-ping worker)
+ "Return a message that indicates that WORKER is alive."
+ (format #f "~s" `(worker-ping ,worker)))
+
(define (zmq-worker-ready-message worker)
"Return a message that indicates that WORKER is ready."
(format #f "~s" `(worker-ready ,worker)))
@@ -235,6 +256,14 @@ retries a call to PROC."
"Return a message that indicates that WORKER is requesting work."
(format #f "~s" `(worker-request-work ,name)))
+(define (zmq-request-workers)
+ "Return a message requesting the WORKERS list."
+ (format #f "~s" `(request-workers)))
+
+(define (zmq-workers workers)
+ "Return a message containing the WORKERS list."
+ (format #f "~s" `(workers ,workers)))
+
;;;
;;; Remote builds.
@@ -250,6 +279,13 @@ retries a call to PROC."
(zmq-connect socket endpoint)
socket))
+(define (remote-send-workers socket)
+ "Request the workers list on SOCKET."
+ (zmq-send-msg-parts-bytevector
+ socket
+ (list (make-bytevector 0)
+ (string->bv (zmq-request-workers)))))
+
(define* (remote-build socket drvs systems)
"Builds DRVS using the remote build mechanism. A build command is sent on
SOCKET to the build server for each derivation.
@@ -282,7 +318,9 @@ received, return if no event occured for TIMEOUT milliseconds."
(('build-succeeded ('drv drv) ('url url))
(event-proc (list 'build-succeeded drv)))
(('build-failed ('drv drv))
- (event-proc (list 'build-failed drv)))))
+ (event-proc (list 'build-failed drv)))
+ (('workers workers)
+ (event-proc (list 'workers workers)))))
(let* ((poll-items (list
(poll-item socket ZMQ_POLLIN)))
diff --git a/src/cuirass/templates.scm b/src/cuirass/templates.scm
index 70737fc..5e1965a 100644
--- a/src/cuirass/templates.scm
+++ b/src/cuirass/templates.scm
@@ -34,6 +34,7 @@
#:use-module ((guix utils) #:select (string-replace-substring))
#:use-module ((cuirass database) #:select (build-status
evaluation-status))
+ #:use-module (cuirass remote)
#:export (html-page
specifications-table
evaluation-info-table
@@ -42,7 +43,8 @@
build-details
evaluation-build-table
running-builds-table
- global-metrics-content))
+ global-metrics-content
+ workers-status))
(define (navigation-items navigation)
(match navigation
@@ -137,6 +139,9 @@ system whose names start with " (code "guile-") ":" (br)
(href "/metrics"))
"Global metrics")
(a (@ (class "dropdown-item")
+ (href "/workers"))
+ "Workers status")
+ (a (@ (class "dropdown-item")
(href "/status"))
"Running builds")))
(li (@ (class "nav-item"))
@@ -1013,3 +1018,38 @@ completed builds divided by the time required to build them.")
#:title "Pending builds"
#:labels '("Pending builds")
#:colors (list "#3e95cd")))))
+
+(define (workers-status workers builds)
+ (define (build-row build)
+ `(tr
+ (th (@ (scope "row"))
+ (a (@ (href "/build/" ,(assq-ref build #:id) "/details"))
+ ,(assq-ref build #:id)))
+ (td ,(assq-ref build #:job-name))
+ (td ,(time->string
+ (assq-ref build #:starttime)))
+ (td ,(assq-ref build #:system))))
+
+ (define (worker-header worker)
+ `((p ,(integer->char 128994)
+ " "
+ (b ,(worker-name worker))
+ ,(format #f " (~a, ~{~a ~})"
+ (worker-address worker)
+ (worker-systems worker)))))
+
+ (define (worker-table worker builds)
+ `(,@(worker-header worker)
+ (table
+ (@ (class "table table-sm table-hover table-striped"))
+ ,@(if (null? builds)
+ `((th (@ (scope "col")) "No elements here."))
+ `((thead (tr (th (@ (scope "col")) "ID")
+ (th (@ (scope "col")) "Job")
+ (th (@ (scope "col")) "Queued at")
+ (th (@ (scope "col")) "System")))
+ (tbody
+ ,(map build-row builds)))))))
+
+ `((p (@ (class "lead")) "Workers status")
+ ,@(map worker-table workers builds)))
diff --git a/src/schema.sql b/src/schema.sql
index 51d0c80..8558f54 100644
--- a/src/schema.sql
+++ b/src/schema.sql
@@ -61,7 +61,7 @@ CREATE TABLE Builds (
evaluation INTEGER NOT NULL,
job_name TEXT NOT NULL,
system TEXT NOT NULL,
- machine TEXT, --optional, machine performing the build.
+ worker TEXT, --optional, worker performing the build.
nix_name TEXT NOT NULL,
log TEXT NOT NULL,
status INTEGER NOT NULL,
@@ -96,6 +96,13 @@ CREATE TABLE Events (
event_json TEXT NOT NULL
);
+CREATE TABLE Workers (
+ name TEXT NOT NULL PRIMARY KEY,
+ address TEXT NOT NULL,
+ systems TEXT NOT NULL,
+ last_seen INTEGER NOT NULL
+);
+
-- XXX: All queries targeting Builds and Outputs tables *must* be covered by
-- an index. It is also preferable for the other tables.
CREATE INDEX Builds_status_index ON Builds (status);
- branch wip-offload created (now 3930230), Mathieu Othacehe, 2020/12/13
- [no subject], Mathieu Othacehe, 2020/12/13
- [no subject], Mathieu Othacehe, 2020/12/13
- [no subject], Mathieu Othacehe, 2020/12/13
- [no subject], Mathieu Othacehe <=
- [no subject], Mathieu Othacehe, 2020/12/13
- [no subject], Mathieu Othacehe, 2020/12/13