guix-cuirass/src/cuirass/remote-server.scm

498 lines
17 KiB
Scheme
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

;;; remote-server.scm -- Remote build server.
;;; Copyright © 2020 Mathieu Othacehe <othacehe@gnu.org>
;;;
;;; This file is part of Cuirass.
;;;
;;; GNU Guix is free software; you can redistribute it and/or modify it
;;; under the terms of the GNU General Public License as published by
;;; the Free Software Foundation; either version 3 of the License, or (at
;;; your option) any later version.
;;;
;;; GNU Guix is distributed in the hope that it will be useful, but
;;; WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;; GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with GNU Guix. If not, see <http://www.gnu.org/licenses/>.
(define-module (cuirass remote-server)
#:use-module (cuirass base)
#:use-module (cuirass config)
#:use-module (cuirass database)
#:use-module (cuirass logging)
#:use-module (cuirass remote)
#:use-module (cuirass utils)
#:use-module (gcrypt pk-crypto)
#:use-module (guix avahi)
#:use-module (guix base32)
#:use-module (guix base64)
#:use-module (guix config)
#:use-module (guix derivations)
#:use-module (guix records)
#:use-module (guix packages)
#:use-module (guix pki)
#:use-module (guix scripts)
#:use-module ((guix store)
#:select (current-build-output-port
ensure-path
store-protocol-error?
with-store))
#:use-module (guix ui)
#:use-module (guix utils)
#:use-module (guix workers)
#:use-module (guix build download)
#:use-module (guix build syscalls)
#:use-module ((guix build utils) #:select (mkdir-p))
#:use-module (gcrypt hash)
#:use-module (gcrypt pk-crypto)
#:use-module (simple-zmq)
#:use-module (rnrs bytevectors)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-34)
#:use-module (srfi srfi-37)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (ice-9 q)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 regex)
#:use-module (ice-9 threads)
#:export (remote-server))
;; Atomic flag, initially #f.  The SIGINT handler sets it to #t and the Avahi
;; publish thread polls it to know when to leave its loop.
(define %stop-process? (make-atomic-box #f))

;; Settings filled in by 'remote-server' from the command line.  Each one is a
;; parameter whose default value is #f.
(define %cache-directory (make-parameter #f))
(define %trigger-substitute-url (make-parameter #f))
(define %private-key (make-parameter #f))
(define %public-key (make-parameter #f))

;; Service name handed to the Avahi publish thread.
(define service-name "Cuirass remote server")
;; Print the command-line usage of the remote server to the current output
;; port: a usage line, one entry per option declared in %options, and finally
;; the bug-report information.  Every user-visible string is wrapped in G_
;; (presumably the gettext shorthand provided by (guix ui) -- confirm), so the
;; literals below must not be reworded lightly: they are translation keys.
(define (show-help)
(format #t (G_ "Usage: remote-server [OPTION]...
Start a remote build server.\n"))
(display (G_ "
-b, --backend-port=PORT listen worker connections on PORT"))
(display (G_ "
-l, --log-port=PORT listen build logs on PORT"))
(display (G_ "
-p, --publish-port=PORT publish substitutes on PORT"))
(display (G_ "
-D, --database=DB Use DB to read and store build results"))
(display (G_ "
-c, --cache=DIRECTORY cache built items to DIRECTORY"))
(display (G_ "
-t, --trigger-substitute-url=URL
trigger substitute baking at URL"))
(display (G_ "
-u, --user=USER change privileges to USER as soon as possible"))
(display (G_ "
--public-key=FILE use FILE as the public key for signatures"))
(display (G_ "
--private-key=FILE use FILE as the private key for signatures"))
;; Blank line separating the server options from the generic ones.
(newline)
(display (G_ "
-h, --help display this help and exit"))
(display (G_ "
-V, --version display version information and exit"))
(newline)
;; Helper defined outside this file; called last so its output ends the help.
(show-bug-report-information))
(define %options
  ;; Specifications of the command-line options, in the SRFI-37 'option'
  ;; format expected by 'args-fold*'.  Apart from --help and --version, which
  ;; exit immediately, each processor conses a (KEY . VALUE) pair in front of
  ;; RESULT, the alist of options seen so far.  Port numbers are converted
  ;; with 'string->number*'; every other value is kept as a string.
  (list (option '(#\h "help") #f #f
                (lambda _
                  (show-help)
                  (exit 0)))
        (option '(#\V "version") #f #f
                (lambda _
                  ;; Use the program name printed by 'show-help' rather than
                  ;; "guix publish", which was a copy-paste leftover.
                  (show-version-and-exit "remote-server")))
        (option '(#\b "backend-port") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'backend-port (string->number* arg) result)))
        (option '(#\l "log-port") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'log-port (string->number* arg) result)))
        (option '(#\p "publish-port") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'publish-port (string->number* arg) result)))
        (option '(#\D "database") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'database arg result)))
        (option '(#\c "cache") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'cache arg result)))
        (option '(#\t "trigger-substitute-url") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'trigger-substitute-url arg result)))
        (option '(#\u "user") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'user arg result)))
        (option '("public-key") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'public-key-file arg result)))
        (option '("private-key") #t #f
                (lambda (opt name arg result)
                  (alist-cons 'private-key-file arg result)))))
(define %default-options
  ;; Alist used as the initial result when parsing the command line: the
  ;; three default ports plus the key files named by %public-key-file and
  ;; %private-key-file.
  (list (cons 'backend-port 5555)
        (cons 'log-port 5556)
        (cons 'publish-port 5557)
        (cons 'public-key-file %public-key-file)
        (cons 'private-key-file %private-key-file)))
;;;
;;; Build workers.
;;;
;; Table of the connected workers.  It maps a worker name to the matching
;; worker record and is updated whenever a worker reports in.
(define %workers (make-hash-table))
(define (pop-build name)
  "Return a scheduled build for the worker registered as NAME, or #f.  One of
the systems advertised by the worker is picked at random and the database is
asked for a single scheduled build for that system, ordered by priority and
timestamp.  The result is #f when NAME is not a known worker, when the worker
advertises no system at all, or when no build is scheduled for the system that
was picked."
  (define (random-system systems)
    ;; SYSTEMS must be non-empty: (random 0) raises an error.
    (list-ref systems (random (length systems))))

  (let* ((worker (hash-ref %workers name))
         (systems (if worker
                      (worker-systems worker)
                      '())))
    ;; Guard against a worker that announced an empty list of systems, which
    ;; would otherwise raise from 'random-system' in the middle of the proxy
    ;; loop.
    (and (pair? systems)
         (match (db-get-builds `((status . scheduled)
                                 (system . ,(random-system systems))
                                 (order . priority+timestamp)
                                 (nr . 1)))
           ((build) build)
           (() #f)))))
(define (remove-unresponsive-workers!)
  "Remove from %WORKERS every worker whose last-seen time is older than the
value of the %WORKER-TIMEOUT parameter."
  (define (stale? worker)
    (> (- (current-time) (worker-last-seen worker))
       (%worker-timeout)))

  ;; Collect the names first: entries are not removed while the table is
  ;; being traversed.
  (define stale-names
    (hash-fold (lambda (name worker names)
                 (if (stale? worker)
                     (cons name names)
                     names))
               '()
               %workers))

  (for-each (lambda (name)
              (hash-remove! %workers name))
            stale-names))
(define* (read-worker-exp exp #:key reply-worker)
  "Read EXP, a message sent by a worker, and act on it.  REPLY-WORKER is a
one-argument procedure used to send a message back to that worker; it is only
called when the worker requests work.  A message that matches none of the
known forms is logged and otherwise ignored."
  (define (update-workers! base-worker proc)
    ;; Register, or refresh, the worker described by the BASE-WORKER sexp,
    ;; stamping it with the current time.  PROC is called with the worker
    ;; name before the table is updated.
    (let* ((worker* (worker
                     (inherit (sexp->worker base-worker))
                     (last-seen (current-time))))
           (name (worker-name worker*)))
      (proc name)
      (hash-set! %workers name worker*)))

  (match (zmq-read-message exp)
    (('worker-ready worker)
     (update-workers! worker
                      (lambda (name)
                        (log-message (G_ "Worker `~a' is ready.") name))))
    (('worker-request-work name)
     (let ((build (pop-build name)))
       (if build
           (let ((derivation (assq-ref build #:derivation))
                 (priority (assq-ref build #:priority))
                 (timeout (assq-ref build #:timeout))
                 (max-silent (assq-ref build #:max-silent)))
             ;; Record the assignment before telling the worker about it.
             (db-update-build-worker! derivation name)
             (db-update-build-status! derivation (build-status submitted))
             (reply-worker
              (zmq-build-request-message derivation
                                         #:priority priority
                                         #:timeout timeout
                                         #:max-silent max-silent)))
           (reply-worker
            (zmq-no-build-message)))))
    (('worker-ping worker)
     (update-workers! worker (const #t))
     ;; Replace the workers stored in the database with the in-memory table.
     (db-clear-workers)
     (hash-for-each (lambda (key value)
                      (db-add-worker value))
                    %workers))
    (('build-started ('drv drv) ('worker worker))
     (let ((log-file (log-path (%cache-directory) drv)))
       (log-message "build started: '~a' on ~a." drv worker)
       (db-update-build-worker! drv worker)
       (db-update-build-status! drv (build-status started)
                                #:log-file log-file)))
    (message
     ;; Messages come from remote peers: an unexpected or malformed one must
     ;; not abort the caller with a match error.
     (log-message "ignoring unknown worker message: ~s" message))))
;;;
;;; Fetch workers.
;;;
(define (zmq-fetch-workers-endpoint)
  "Return the endpoint string shared by the proxy, which binds it, and the
fetch workers, which connect to it."
  "inproc://fetch-workers")
(define (zmq-fetch-worker-socket)
  "Return a new ZMQ_PULL socket connected to the fetch workers endpoint.  A
fetch worker reads the messages it has to process from that socket."
  (let ((puller (zmq-create-socket %zmq-context ZMQ_PULL)))
    (zmq-connect puller (zmq-fetch-workers-endpoint))
    puller))
(define (url-fetch* url file)
  "Call 'url-fetch' on URL and FILE with the current output and error ports
redirected to void ports, so that nothing it prints is shown.  Return the
value of 'url-fetch'."
  (define (void-port)
    (%make-void-port "w"))

  (parameterize ((current-output-port (void-port))
                 (current-error-port (void-port)))
    (url-fetch url file)))
(define (publish-narinfo-url publish-url store-hash)
  "Return the URL of the narinfo file on PUBLISH-URL for STORE-HASH.
STORE-HASH is a store item name without the store prefix, such as
\"HASH-NAME\"; only the part before the first dash is used.  When STORE-HASH
contains no dash, it is used as is."
  (let* ((dash (string-index store-hash #\-))
         ;; Without this fallback a dash-less input would produce the bogus
         ;; URL ".../#f.narinfo".
         (hash (if dash
                   (string-take store-hash dash)
                   store-hash)))
    (format #f "~a/~a.narinfo" publish-url hash)))
(define (ensure-path* store output)
  "Like 'ensure-path' for OUTPUT on STORE, except that a store protocol error
is logged and turned into a #f return value instead of being propagated."
  (guard (err ((store-protocol-error? err)
               (log-message "Failed to add ~a to store." output)
               #f))
    (ensure-path store output)))
(define (add-to-store outputs url)
  "Ensure that the path of each derivation output in OUTPUTS is present in
the store, using a store connection configured for URL by
'set-build-options*'.  Build output is sent to a void port."
  (parameterize ((current-build-output-port (%make-void-port "w")))
    (with-store store
      (set-build-options* store url)
      (let ((paths (map derivation-output-path outputs)))
        (for-each (cut ensure-path* store <>) paths)))))
(define (trigger-substitutes-baking outputs url)
  "For each derivation output in OUTPUTS, request its narinfo from the publish
server at URL.  The response is written to a temporary file and is not read
back: only the request matters here."
  (define (request-narinfo output)
    (let* ((item (strip-store-prefix (derivation-output-path output)))
           (narinfo (publish-narinfo-url url item)))
      (call-with-temporary-output-file
       (lambda (tmp-file port)
         (url-fetch* narinfo tmp-file)))))

  (for-each request-narinfo outputs))
(define (need-fetching? message)
  "Return #t when MESSAGE, once read, is a 'build-succeeded' or 'build-failed'
notification, which are the messages handled by the fetch workers.  Return #f
for anything else."
  (match (zmq-read-message message)
    (((or 'build-succeeded 'build-failed) _ ...)
     #t)
    (_
     #f)))
(define* (run-fetch message)
  "Read MESSAGE, a build result notification, and act on it.  For a successful
build, add the outputs of the derivation to the store from the URL found in
the message, request the matching narinfos when %TRIGGER-SUBSTITUTE-URL is
set, then mark the build as successful.  For a failed build, record the
failed status in the database."
  (define (build-outputs drv)
    ;; Return the outputs of the derivation stored in file DRV, or the empty
    ;; list when reading it raises a 'system-error'.
    (catch 'system-error
      (lambda ()
        (map (match-lambda
               ((_ . output) output))
             (derivation-outputs (read-derivation-from-file drv))))
      (const '())))

  (match (zmq-read-message message)
    (('build-succeeded ('drv drv) ('url url) _ ...)
     (let ((outputs (build-outputs drv)))
       (add-to-store outputs url)
       (and=> (%trigger-substitute-url)
              (cut trigger-substitutes-baking outputs <>))
       (log-message "build succeeded: '~a'" drv)
       (set-build-successful! drv)))
    (('build-failed ('drv drv) ('url url) _ ...)
     (log-message "build failed: '~a'" drv)
     (db-update-build-status! drv (build-status failed)))))
(define (start-fetch-worker name)
  "Spawn a thread called NAME that reads messages from a fetch worker socket
and passes each of them to 'run-fetch', forever.  Return the new thread."
  (define (serve)
    (set-thread-name name)
    (let ((socket (zmq-fetch-worker-socket)))
      (let next ()
        ;; Only single-part messages are expected on this socket.
        (match (zmq-get-msg-parts-bytevector socket)
          ((message)
           (run-fetch (bv->string message))))
        (next))))

  (call-with-new-thread serve))
;;;
;;; ZMQ connection.
;;;
;; ZMQ context from which every socket of this module is created.
(define %zmq-context (zmq-create-context))
(define (zmq-backend-endpoint backend-port)
  "Return the ZMQ endpoint string for TCP connections on BACKEND-PORT, a
number, using the \"*\" wildcard as the interface."
  (let ((port (number->string backend-port)))
    (string-append "tcp://*:" port)))
(define (zmq-start-proxy backend-port)
  "Bind a ZMQ_ROUTER socket on BACKEND-PORT for the workers and a ZMQ_PUSH
socket on the fetch workers endpoint, then dispatch the worker messages
forever.  Build result notifications are pushed to the fetch workers; every
other message is handled by 'read-worker-exp'.  Unresponsive workers are
removed on each iteration.  This procedure loops until an error is raised."
  (let* ((build-socket
          (zmq-create-socket %zmq-context ZMQ_ROUTER))
         (fetch-socket
          (zmq-create-socket %zmq-context ZMQ_PUSH))
         (poll-items (list
                      (poll-item build-socket ZMQ_POLLIN))))
    (zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
    (zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))

    (let loop ()
      ;; Poll with a timeout of 1000 (presumably milliseconds -- confirm) so
      ;; that the cleanup below also runs when no worker is talking.
      (let ((items (zmq-poll* poll-items 1000)))
        (when (zmq-socket-ready? items build-socket)
          (match (zmq-get-msg-parts-bytevector build-socket)
            ((worker empty rest)
             ;; The envelope is handled by hand: WORKER is kept so that the
             ;; reply can be addressed to the peer that sent the request.
             (let ((reply-worker
                    (lambda (message)
                      (zmq-send-msg-parts-bytevector
                       build-socket
                       (list worker
                             (zmq-empty-delimiter)
                             (string->bv message))))))
               (if (need-fetching? (bv->string rest))
                   (zmq-send-bytevector fetch-socket rest)
                   (read-worker-exp (bv->string rest)
                                    #:reply-worker reply-worker))))
            (_
             ;; A message that is not made of exactly three parts cannot be
             ;; answered; drop it instead of aborting the server with a match
             ;; error.
             (log-message "ignoring malformed message from a worker."))))
        (remove-unresponsive-workers!)
        (loop)))))
;;;
;;; Entry point.
;;;
;; Atomic box holding the PID of the publish process, or #f until
;; 'remote-server' has started it.
(define %publish-pid (make-atomic-box #f))

;; Atomic box holding the thread that runs the Avahi publish service, or #f
;; until 'remote-server' has started it.
(define %avahi-thread (make-atomic-box #f))
(define (signal-handler)
  "Install a SIGINT handler that raises the stop flag, sends SIGHUP to the
publish process and waits for it, joins the Avahi thread, and finally exits
with status 1.  The process and the thread are only handled when they have
been recorded in %PUBLISH-PID and %AVAHI-THREAD."
  (define (on-interrupt signum)
    (let ((pid (atomic-box-ref %publish-pid))
          (thread (atomic-box-ref %avahi-thread)))
      ;; Set the flag first: the Avahi loop polls it, so the thread joined
      ;; below can terminate.
      (atomic-box-set! %stop-process? #t)
      (when pid
        (kill pid SIGHUP)
        (waitpid pid))
      (when thread
        (join-thread thread))
      (exit 1)))

  (sigaction SIGINT on-interrupt))
(define (gather-user-privileges user)
  "Switch to the identity of USER, a user name: clear the supplementary
groups, then set the group ID and the user ID found in the password entry of
USER.  When a 'misc-error' is raised along the way, report it with 'leave' as
a user lookup failure."
  (define (switch-identity)
    (let ((entry (getpw user)))
      (setgroups #())
      ;; The group is changed before the user ID.
      (setgid (passwd:gid entry))
      (setuid (passwd:uid entry))))

  (define (report-failure key proc message args . rest)
    (leave (G_ "user '~a' not found: ~a~%")
           user (apply format #f message args)))

  (catch 'misc-error switch-identity report-failure))
(define (remote-server args)
  "Entry point of the remote server.  ARGS is the list of command-line
arguments.  Install the SIGINT handler, parse ARGS, read the signing keys,
optionally switch to another user, start the publish server and the Avahi
publish thread, start receiving build logs, and finally, with the database
open, start four fetch workers and run the ZMQ proxy."
  (signal-handler)

  (with-error-handling
    (let* ((opts (args-fold* args %options
                             (lambda (opt name arg result)
                               (leave (G_ "~A: unrecognized option~%") name))
                             (lambda (arg result)
                               (leave (G_ "~A: extraneous argument~%") arg))
                             %default-options))
           (backend-port (assoc-ref opts 'backend-port))
           (log-port (assoc-ref opts 'log-port))
           (publish-port (assoc-ref opts 'publish-port))
           (cache (assoc-ref opts 'cache))
           (database (assoc-ref opts 'database))
           (trigger-substitute-url (assoc-ref opts 'trigger-substitute-url))
           (user (assoc-ref opts 'user))
           ;; The key files are read before privileges are dropped below.
           (public-key
            (read-file-sexp
             (assoc-ref opts 'public-key-file)))
           (private-key
            (read-file-sexp
             (assoc-ref opts 'private-key-file))))

      (parameterize ((%cache-directory cache)
                     (%trigger-substitute-url trigger-substitute-url)
                     ;; Keep the current database when --database is not
                     ;; given, instead of overriding it with #f.
                     (%package-database (or database (%package-database)))
                     (%public-key public-key)
                     (%private-key private-key))

        (when user
          (gather-user-privileges user))

        ;; Remember the publish process and the Avahi thread so that the
        ;; SIGINT handler can stop them.
        (atomic-box-set!
         %publish-pid
         (publish-server publish-port
                         #:public-key public-key
                         #:private-key private-key))

        (atomic-box-set!
         %avahi-thread
         (avahi-publish-service-thread
          service-name
          #:type remote-server-service-type
          #:port backend-port
          #:stop-loop? (lambda ()
                         (atomic-box-ref %stop-process?))
          ;; Advertise the two other ports in the service TXT record.
          #:txt (list (string-append "log-port="
                                     (number->string log-port))
                      (string-append "publish-port="
                                     (number->string publish-port)))))

        (receive-logs log-port (%cache-directory))

        (with-database
          (for-each (lambda (number)
                      (start-fetch-worker
                       (string-append "fetch-worker-"
                                      (number->string number))))
                    (iota 4))

          (zmq-start-proxy backend-port))))))