2
0
Fork 0
mirror of git://git.savannah.gnu.org/guix/guix-cuirass.git synced 2023-12-14 06:03:04 +01:00

remote: Remove address argument.

This commit is contained in:
Mathieu Othacehe 2021-02-12 14:39:12 +01:00
parent b191a2a5f6
commit 83f33cdbb4
No known key found for this signature in database
GPG key ID: 8354763531769CA6
3 changed files with 61 additions and 42 deletions

View file

@ -179,8 +179,8 @@ Start a remote build server.\n"))
((build) build)
(() #f))))))
(define* (read-worker-exp exp #:key reply-worker)
"Read the given EXP sent by a worker. REPLY-WORKER is a procedure that can
(define* (read-worker-exp msg #:key reply-worker)
"Read the given MSG sent by a worker. REPLY-WORKER is a procedure that can
be used to reply to the worker."
(define (update-worker! base-worker)
(let* ((worker* (worker
@ -188,12 +188,13 @@ be used to reply to the worker."
(last-seen (current-time)))))
(db-add-or-update-worker worker*)))
(match (zmq-read-message exp)
(match (zmq-read-message
(zmq-message-string msg))
(('worker-ready worker)
(update-worker! worker))
(('worker-request-info)
(reply-worker
(zmq-server-info (%log-port) (%publish-port))))
(zmq-server-info (zmq-remote-address msg) (%log-port) (%publish-port))))
(('worker-request-work name)
(let ((build (pop-build name)))
(if build
@ -357,18 +358,19 @@ frontend to the workers connected through the TCP backend."
(let loop ()
(let ((items (zmq-poll* poll-items 1000)))
(when (zmq-socket-ready? items build-socket)
(match (zmq-get-msg-parts-bytevector build-socket)
(match (zmq-message-receive build-socket)
((worker empty rest)
(let ((reply-worker
(lambda (message)
(zmq-send-msg-parts-bytevector
build-socket
(list worker
(list (zmq-message-content worker)
(zmq-empty-delimiter)
(string->bv message))))))
(if (need-fetching? (bv->string rest))
(zmq-send-bytevector fetch-socket rest)
(read-worker-exp (bv->string rest)
(string->bv message)))))
(rest-bv (zmq-message-content rest)))
(if (need-fetching? (bv->string rest-bv))
(zmq-send-bytevector fetch-socket rest-bv)
(read-worker-exp rest
#:reply-worker reply-worker))))))
(db-remove-unresponsive-workers (%worker-timeout))
(loop)))))

View file

@ -87,9 +87,6 @@ Start a remote build worker.\n"))
(option '(#\V "version") #f #f
(lambda _
(show-version-and-exit "guix publish")))
(option '(#\a "address") #t #f
(lambda (opt name arg result)
(alist-cons 'address arg result)))
(option '(#\w "workers") #t #f
(lambda (opt name arg result)
(alist-cons 'workers (string->number* arg) result)))
@ -230,7 +227,7 @@ command. REPLY is a procedure that can be used to reply to this server."
(sleep 60)
(loop))))))
(define (start-worker worker serv)
(define (start-worker wrk serv)
"Start a worker thread named NAME, reading commands from the DEALER socket
and executing them. The worker can reply on the same socket."
(define (reply socket)
@ -239,14 +236,14 @@ and executing them. The worker can reply on the same socket."
socket
(list (zmq-empty-delimiter) (string->bv message)))))
(define (ready socket)
(define (ready socket worker)
(zmq-send-msg-parts-bytevector
socket
(list (make-bytevector 0)
(string->bv
(zmq-worker-ready-message (worker->sexp worker))))))
(define (request-work socket)
(define (request-work socket worker)
(let ((name (worker-name worker)))
(zmq-send-msg-parts-bytevector
socket
@ -259,37 +256,54 @@ and executing them. The worker can reply on the same socket."
(list (make-bytevector 0)
(string->bv (zmq-worker-request-info-message)))))
(define (read-server-info socket serv)
(define (read-server-info socket)
(request-info socket)
(match (zmq-get-msg-parts-bytevector socket '())
((empty info)
(match (zmq-read-message (bv->string info))
(('server-info
('worker-address worker-address)
('log-port log-port)
('publish-port publish-port))
(let ((url (publish-url (server-address serv)
publish-port)))
(server
(inherit serv)
(log-port log-port)
(publish-url url))))))))
(list worker-address log-port publish-port))))))
(define (server-info->server info serv)
(match info
((_ log-port publish-port)
(let ((url (publish-url (server-address serv)
publish-port)))
(server
(inherit serv)
(log-port log-port)
(publish-url url))))))
(define (server-info->worker info w)
(match info
((worker-address _ _)
(let ((url (local-publish-url worker-address)))
(worker
(inherit w)
(address worker-address)
(publish-url url))))))
(match (primitive-fork)
(0
(set-thread-name (worker-name worker))
(set-thread-name (worker-name wrk))
(let* ((socket (zmq-dealer-socket))
(address (server-address serv))
(port (server-port serv))
(endpoint (zmq-backend-endpoint address port)))
(zmq-connect socket endpoint)
(ready socket)
(worker-ping worker serv)
(let ((server* (read-server-info socket serv)))
(let* ((info (read-server-info socket))
(server (server-info->server info serv))
(worker (server-info->worker info wrk)))
(ready socket worker)
(worker-ping worker server)
(let loop ()
(request-work socket)
(request-work socket worker)
(match (zmq-get-msg-parts-bytevector socket '())
((empty command)
(run-command (bv->string command) server*
(run-command (bv->string command) server
#:reply (reply socket)
#:worker worker)))
(sleep 10)
@ -343,7 +357,6 @@ exiting."
(lambda (arg result)
(leave (G_ "~A: extraneous argument~%") arg))
%default-options))
(address (assoc-ref opts 'address))
(workers (assoc-ref opts 'workers))
(publish-port (assoc-ref opts 'publish-port))
(server-address (assoc-ref opts 'server))
@ -363,18 +376,12 @@ exiting."
#:public-key public-key
#:private-key private-key))
(when (and server-address (not address))
(leave (G_ "Address must be set when server is provided.~%")))
(if server-address
(for-each
(lambda (n)
(let* ((publish-url (local-publish-url address))
(worker (worker
(let* ((worker (worker
(name (generate-worker-name))
(address address)
(machine (gethostname))
(publish-url publish-url)
(systems systems)))
(addr (string-split server-address #\:))
(server (match addr
@ -391,8 +398,7 @@ exiting."
((new-service)
(for-each
(lambda (n)
(let* ((address (or address
(avahi-service-local-address service)))
(let* ((address (avahi-service-local-address service))
(publish-url (local-publish-url address)))
(add-to-worker-pids!
(start-worker (worker

View file

@ -82,6 +82,8 @@
zmq-worker-request-work-message
zmq-worker-request-info-message
zmq-server-info
zmq-remote-address
zmq-message-string
zmq-read-message
remote-server-service-type))
@ -95,7 +97,8 @@
worker make-worker
worker?
(name worker-name)
(address worker-address)
(address worker-address
(default #f))
(machine worker-machine)
(publish-url worker-publish-url
(default #f))
@ -400,6 +403,13 @@ retries a call to PROC."
(eq? (poll-item-socket item) socket))
items))
(define (zmq-remote-address message)
(zmq-message-gets message "Peer-Address"))
(define (zmq-message-string message)
(bv->string
(zmq-message-content message)))
(define (zmq-read-message msg)
(call-with-input-string msg read))
@ -455,9 +465,10 @@ retries a call to PROC."
"Return a message requesting server information."
(format #f "~s" '(worker-request-info)))
(define (zmq-server-info log-port publish-port)
(define (zmq-server-info worker-address log-port publish-port)
"Return a message containing server information."
(format #f "~s" `(server-info (log-port ,log-port)
(format #f "~s" `(server-info (worker-address ,worker-address)
(log-port ,log-port)
(publish-port ,publish-port))))
(define remote-server-service-type