;;; Mirror of git://git.savannah.gnu.org/guix/data-service.git
;;; (synced 2023-12-14 03:23:03 +01:00); 211 lines, 6.2 KiB, Scheme.
;;; Guix Data Service -- Information about Guix over time
|
|
;;; Copyright © 2019 Christopher Baines <mail@cbaines.net>
|
|
;;;
|
|
;;; This program is free software: you can redistribute it and/or
|
|
;;; modify it under the terms of the GNU Affero General Public License
|
|
;;; as published by the Free Software Foundation, either version 3 of
|
|
;;; the License, or (at your option) any later version.
|
|
;;;
|
|
;;; This program is distributed in the hope that it will be useful,
|
|
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
;;; Affero General Public License for more details.
|
|
;;;
|
|
;;; You should have received a copy of the GNU Affero General Public
|
|
;;; License along with this program. If not, see
|
|
;;; <http://www.gnu.org/licenses/>.
|
|
|
|
(define-module (guix-data-service jobs)
|
|
#:use-module (ice-9 match)
|
|
#:use-module (ice-9 format)
|
|
#:use-module (ice-9 threads)
|
|
#:use-module (ice-9 atomic)
|
|
#:use-module (guix-data-service jobs load-new-guix-revision)
|
|
#:export (process-jobs
|
|
|
|
default-max-processes))
|
|
|
|
(define* (process-jobs conn #:key max-processes
                       latest-branch-revision-max-processes
                       skip-system-tests?)
  "Process queued jobs (as returned by fetch-unlocked-jobs for CONN), running
each one in a child process that executes the guix-data-service-process-job
command with the job id as its argument.

MAX-PROCESSES limits how many child processes may be running when an ordinary
job is started, and LATEST-BRANCH-REVISION-MAX-PROCESSES is the limit used
when a priority job is started.  When either is omitted or #f, the same
defaults as process-jobs-concurrently are used: default-max-processes, and
twice the ordinary limit respectively.  (Previously an omitted value was
forwarded as #f, overriding those defaults and crashing the scheduler on the
first job.)

When SKIP-SYSTEM-TESTS? is true, --skip-system-tests is passed to each job
process.  When a job fails, a \"failure\" job event is recorded through CONN
and any log parts stored for the job are combined."
  (define effective-max-processes
    (or max-processes default-max-processes))

  (define effective-priority-max-processes
    ;; Mirrors the default of process-jobs-concurrently's
    ;; #:priority-max-processes argument.
    (or latest-branch-revision-max-processes
        (* 2 effective-max-processes)))

  (define (fetch-new-jobs)
    (fetch-unlocked-jobs conn))

  (define (process-job job-id)
    ;; Replace the current (child) process with the job command.  The
    ;; program name appears twice because execlp takes the file to run
    ;; separately from argv[0].
    (apply execlp
           "guix-data-service-process-job"
           "guix-data-service-process-job"
           job-id
           (if skip-system-tests?
               '("--skip-system-tests")
               '())))

  (define (handle-job-failure job-id)
    ;; Record the failure event, then, if the job has stored log parts,
    ;; combine them.
    (record-job-event conn job-id "failure")
    (display (simple-format #f "recording failure for job ~A\n" job-id)
             (current-error-port))
    (when (> (count-log-parts conn job-id)
             0)
      (combine-log-parts! conn job-id)))

  (process-jobs-concurrently fetch-new-jobs
                             process-job
                             handle-job-failure
                             #:max-processes effective-max-processes
                             #:priority-max-processes
                             effective-priority-max-processes))
|
|
|
|
;; Default limit on concurrently running job processes: a quarter of the
;; processors reported by current-processor-count, rounded to the nearest
;; integer (exact arithmetic, so ties round to even), and never less than 1.
(define default-max-processes
  (let ((quarter-of-processors (round (/ (current-processor-count) 4))))
    (max 1 quarter-of-processors)))
|
|
|
|
;; Default time limit for a single job process, in seconds: 72 hours.
(define default-timeout
  (let ((seconds-per-hour (* 60 60)))
    (* 72 seconds-per-hour)))
|
|
|
|
(define* (process-jobs-concurrently
          fetch-new-jobs
          process-job
          handle-job-failure
          #:key
          (max-processes default-max-processes)
          (priority-max-processes (* 2 max-processes))
          (timeout default-timeout))
  "Poll for jobs forever, running each one in a forked child process.

FETCH-NEW-JOBS is a thunk returning a list of (JOB-ID PRIORITY?) lists.
PROCESS-JOB is applied to (JOB-ID) in the child; if it returns or throws,
the child exits with status 127.  HANDLE-JOB-FAILURE is called in the parent
with the job id, exactly once per failed child: either when the child exits
with a non-zero status, or when it has been running for more than TIMEOUT
seconds (it is then sent SIGTERM once).

An ordinary job is started only while fewer than MAX-PROCESSES children are
running; a priority job while fewer than PRIORITY-MAX-PROCESSES are.  The loop
sleeps 15 seconds between polls.  On SIGTERM it sends SIGTERM to every running
child and exits with status 0; it never returns normally."

  ;; pid -> (START-TIME JOB-ARGS) for every child that has not been reaped.
  ;; Always accessed with the hashv-* procedures.
  (define processes
    (make-hash-table))

  ;; pid -> #t for children that were sent SIGTERM for exceeding TIMEOUT and
  ;; whose failure has therefore already been reported.
  (define timed-out
    (make-hash-table))

  (define (report-failure job-id)
    ;; An error while recording a failure (e.g. a database error) is logged
    ;; instead of being allowed to stop the scheduling loop.
    (catch #t
      (lambda ()
        (handle-job-failure job-id))
      (lambda (key . args)
        (simple-format (current-error-port)
                       "error handling failure of job ~A: ~A ~A\n"
                       job-id key args))))

  (define (display-status)
    (let ((running-jobs (hash-count (const #t) processes)))
      (display
       (string-append
        "\n\n"
        (case running-jobs
          ((0) "status: 0 running jobs")
          ((1) "status: 1 running job")
          (else (simple-format #f "status: ~A running jobs"
                               running-jobs)))
        "\n"
        (string-concatenate
         (hash-map->list
          (match-lambda*
            ((pid (start-time job-args))
             (format #f " pid: ~5d job args: ~a\n"
                     pid job-args)))
          processes))
        "\n"))))

  (define (reap-child)
    ;; Return (PID . STATUS) for one child that has exited, or #f when no
    ;; child has exited yet or there are no children at all.
    (catch 'system-error
      (lambda ()
        (match (waitpid WAIT_ANY WNOHANG)
          ((0 . _)
           ;; Children exist, but none has exited yet.
           #f)
          (result result)))
      (lambda args
        ;; ECHILD just means there are no children.  Compare the errno, not
        ;; the strerror text, which depends on the locale.
        (unless (eqv? (system-error-errno args) ECHILD)
          (simple-format (current-error-port) "key ~A args ~A\n"
                         (car args) (cdr args)))
        #f)))

  (define (wait-on-processes)
    ;; Reap every child that has exited, without blocking.
    (let loop ()
      (match (reap-child)
        (#f #f)
        ((pid . status)
         (let ((entry (hashv-ref processes pid))
               (already-reported? (hashv-ref timed-out pid #f)))
           ;; Forget the pid before running any handler: once reaped, it may
           ;; be reused by an unrelated process, so it must never be counted
           ;; or signalled again, even if the failure handling below throws.
           (hashv-remove! processes pid)
           (hashv-remove! timed-out pid)
           (unless (or (zero? status) already-reported?)
             (match entry
               ((_ (id))
                (simple-format (current-error-port)
                               "pid ~A (job: ~A) failed with status ~A\n"
                               pid id status)
                (report-failure id))
               (_
                (simple-format (current-error-port)
                               "pid ~A (unknown job) failed with status ~A\n"
                               pid status)))))
         ;; Check for other finished processes.
         (loop)))))

  (define (kill-long-running-processes)
    (let ((now (current-time)))
      (hash-for-each
       (match-lambda*
         ((pid (start-time job-args))
          (let ((running-for (- now start-time)))
            (when (and (> running-for timeout)
                       (not (hashv-ref timed-out pid #f)))
              (display
               (simple-format
                #f "sending SIGTERM to pid ~A started at ~A, now running for ~A\n"
                pid start-time running-for)
               (current-error-port))
              (kill pid SIGTERM)
              ;; Signal and report each timed-out child only once; the
              ;; non-zero exit status seen when it is reaped is then not
              ;; reported a second time.
              (hashv-set! timed-out pid #t)
              (match job-args
                ((id)
                 (report-failure id)))))))
       processes)))

  (define (stop-running-processes)
    (hash-for-each
     (lambda (pid _)
       (display
        (simple-format
         #f "sending SIGTERM to pid ~A\n"
         pid)
        (current-error-port))
       (kill pid SIGTERM))
     processes))

  (define (fork-and-process-job job-args)
    (match (primitive-fork)
      (0
       ;; Child: PROCESS-JOB normally replaces the process image; if it
       ;; returns or throws, exit without running the parent's loop.
       (dynamic-wind
         (const #t)
         (lambda ()
           (apply process-job job-args))
         (lambda ()
           (primitive-exit 127))))
      (pid
       (hashv-set! processes pid
                   (list (current-time) job-args))
       #t)))

  ;; Set from the SIGTERM handler and checked once per loop iteration.
  (define exit?
    (make-atomic-box #f))

  (sigaction SIGTERM
    (lambda args
      (simple-format (current-error-port) "exiting due to SIGTERM\n")
      (atomic-box-set! exit? #t)))

  (while #t
    (kill-long-running-processes)
    (wait-on-processes)
    (display-status)

    (when (atomic-box-ref exit?)
      (stop-running-processes)
      (exit 0))

    (match (fetch-new-jobs)
      (()
       ;; Nothing to do
       #f)
      ((jobs ...)
       (for-each
        (match-lambda
          ((job-id priority?)
           (let ((current-processes
                  (hash-count (const #t) processes)))
             (when (< current-processes
                      (if priority?
                          priority-max-processes
                          max-processes))
               (fork-and-process-job (list job-id))))))
        jobs)))

    (sleep 15)))
|