Remove workers hash table.

* src/cuirass/database.scm (db-add-worker): Rename ...
(db-add-or-update-worker): ... into this new procedure.
(db-get-worker, db-remove-unresponsive-workers): New procedures.
* src/cuirass/remote-server.scm (%workers): Remove it.
(pop-build): Adapt it.
(remove-unresponsive-workers!): Remove it.
(read-worker-exp): Adapt it.
(zmq-start-proxy): Ditto.
* tests/database.scm ("db-add-worker"): Rename ...
("db-add-or-update-worker"): ... into this new test.
("db-get-worker", "db-remove-unresponsive-workers"): New tests.
This commit is contained in:
Mathieu Othacehe 2021-01-31 10:31:01 +01:00
parent b993f3d433
commit e9e0943945
No known key found for this signature in database
GPG Key ID: 8354763531769CA6
3 changed files with 48 additions and 38 deletions

View File

@ -88,8 +88,10 @@
db-get-builds-max
db-get-evaluation-specification
db-get-build-product-path
db-add-worker
db-add-or-update-worker
db-get-worker
db-get-workers
db-remove-unresponsive-workers
db-clear-workers
db-clear-build-queue
;; Parameters.
@ -1370,7 +1372,7 @@ WHERE id = " id))
((path) path)
(else #f))))
(define (db-add-worker worker)
(define (db-add-or-update-worker worker)
"Insert WORKER into Worker table."
(with-db-worker-thread db
(exec-query/bind db "\
@ -1380,7 +1382,24 @@ VALUES ("
(worker-address worker) ", "
(worker-machine worker) ", "
(string-join (worker-systems worker) ",") ", "
(worker-last-seen worker) ");")))
(worker-last-seen worker) ")
ON CONFLICT(name) DO UPDATE
SET last_seen = " (worker-last-seen worker) ";")))
(define (db-get-worker name)
"Return the worker with the given NAME."
(with-db-worker-thread db
(match (expect-one-row
(exec-query/bind db "
SELECT name, address, machine, systems, last_seen from Workers
WHERE name = " name ";"))
((name address machine systems last-seen)
(worker
(name name)
(address address)
(machine machine)
(systems (string-split systems #\,))
(last-seen last-seen))))))
(define (db-get-workers)
"Return the workers in Workers table."
@ -1401,6 +1420,11 @@ SELECT name, address, machine, systems, last_seen from Workers"))
(last-seen last-seen))
workers)))))))
(define (db-remove-unresponsive-workers timeout)
(with-db-worker-thread db
(exec-query/bind db "DELETE FROM Workers WHERE
(extract(epoch from now())::int - last_seen) > " timeout ";")))
(define (db-clear-workers)
"Remove all workers from Workers table."
(with-db-worker-thread db

View File

@ -158,15 +158,11 @@ Start a remote build server.\n"))
;;; Build workers.
;;;
(define %workers
;; Set of connected workers.
(make-hash-table))
(define (pop-build name)
(define (random-system systems)
(list-ref systems (random (length systems))))
(let ((worker (hash-ref %workers name)))
(let ((worker (db-get-worker name)))
(and worker
(let ((system (random-system
(worker-systems worker))))
@ -177,36 +173,18 @@ Start a remote build server.\n"))
((build) build)
(() #f))))))
(define (remove-unresponsive-workers!)
(let ((unresponsive
(hash-fold (lambda (key value old)
(let* ((last-seen (worker-last-seen value))
(diff (- (current-time) last-seen)))
(if (> diff (%worker-timeout))
(cons key old)
old)))
'()
%workers)))
(for-each (lambda (worker)
(hash-remove! %workers worker))
unresponsive)))
(define* (read-worker-exp exp #:key reply-worker)
"Read the given EXP sent by a worker. REPLY-WORKER is a procedure that can
be used to reply to the worker."
(define (update-workers! base-worker proc)
(define (update-worker! base-worker)
(let* ((worker* (worker
(inherit (sexp->worker base-worker))
(last-seen (current-time))))
(name (worker-name worker*)))
(proc name)
(hash-set! %workers name worker*)))
(last-seen (current-time)))))
(db-add-or-update-worker worker*)))
(match (zmq-read-message exp)
(('worker-ready worker)
(update-workers! worker
(lambda (name)
(log-message (G_ "Worker `~a' is ready.") name))))
(update-worker! worker))
(('worker-request-work name)
(let ((build (pop-build name)))
(if build
@ -224,11 +202,7 @@ be used to reply to the worker."
(reply-worker
(zmq-no-build-message)))))
(('worker-ping worker)
(update-workers! worker (const #t))
(db-clear-workers)
(hash-for-each (lambda (key value)
(db-add-worker value))
%workers))
(update-worker! worker))
(('build-started ('drv drv) ('worker worker))
(let ((log-file (log-path (%cache-directory) drv)))
(log-message "build started: '~a' on ~a." drv worker)
@ -387,7 +361,7 @@ frontend to the workers connected through the TCP backend."
(zmq-send-bytevector fetch-socket rest)
(read-worker-exp (bv->string rest)
#:reply-worker reply-worker))))))
(remove-unresponsive-workers!)
(db-remove-unresponsive-workers (%worker-timeout))
(loop)))))

View File

@ -327,14 +327,26 @@ timestamp, checkouttime, evaltime) VALUES ('guix', 0, 0, 0, 0);")
"path"
(db-get-build-product-path 1))
(test-equal "db-add-worker"
(test-equal "db-add-or-update-worker"
1
(db-add-worker %dummy-worker))
(begin
(db-add-or-update-worker %dummy-worker)
(db-add-or-update-worker %dummy-worker)))
(test-equal "db-get-worker"
%dummy-worker
(db-get-worker "worker"))
(test-equal "db-get-workers"
(list %dummy-worker)
(db-get-workers))
(test-equal "db-remove-unresponsive-workers"
'()
(begin
(db-remove-unresponsive-workers 50)
(db-get-workers)))
(test-equal "db-clear-workers"
'()
(begin