data-service/guix-data-service/jobs.scm

393 lines
12 KiB
Scheme

;;; Guix Data Service -- Information about Guix over time
;;; Copyright © 2019 Christopher Baines <mail@cbaines.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (guix-data-service jobs)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
#:use-module (ice-9 atomic)
#:use-module (ice-9 textual-ports)
#:use-module (squee)
#:use-module (guix-data-service utils)
#:use-module (guix-data-service database)
#:use-module (guix-data-service jobs load-new-guix-revision)
#:export (log-for-job
count-log-parts
combine-log-parts!
process-jobs
default-max-processes))
;; Return the name of the PostgreSQL sequence used to number the log parts
;; of the job identified by JOB-ID.  JOB-ID is rendered with ~A and appended
;; to a fixed prefix.
(define (log-part-sequence-name job-id)
  (let ((template "load_new_guix_revision_job_log_parts_id_seq_~A"))
    (simple-format #f template job-id)))
;; Create a pipe for the output of the job identified by JOB-ID and start a
;; thread which reads that output line by line.  Each line (with a newline
;; appended) is inserted as a row in load_new_guix_revision_job_log_parts and
;; echoed to the current output port.
;;
;; Before reading, the thread creates the per-job sequence if it does not
;; exist and deletes any existing log parts for JOB-ID.
;;
;; The thread stops, and closes the read end of the pipe, once end-of-file
;; is reached on it.
;;
;; Returns the write end of the pipe.
(define (start-thread-for-process-output job-id)
  ;; Insert S as the next log part for JOB_ID, taking the id from the
  ;; per-job sequence.
  (define (insert conn job_id s)
    (exec-query
     conn
     (string-append "
INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
     (list job_id s)))

  (match (pipe)
    ((port-to-read-from . port-to-write-to)
     ;; Line buffer both ends, and make both non-blocking.
     (setvbuf port-to-read-from 'line)
     (setvbuf port-to-write-to 'line)
     (let ((flags (fcntl port-to-read-from F_GETFL)))
       (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags)))
     (let ((flags (fcntl port-to-write-to F_GETFL)))
       (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags)))

     (call-with-new-thread
      (lambda ()
        (with-postgresql-connection
         (simple-format #f "~A job logging" job-id)
         (lambda (logging-conn)
           (exec-query
            logging-conn
            (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
                           (log-part-sequence-name job-id)))
           (exec-query
            logging-conn
            "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
            (list job-id))

           (let loop ((line (get-line port-to-read-from)))
             (if (eof-object? line)
                 ;; get-line returns the end-of-file object once the pipe
                 ;; has no more data and no writers; passing that to
                 ;; string-append would raise an error, so stop here and
                 ;; release the read end of the pipe.
                 (close-port port-to-read-from)
                 (begin
                   (let ((line-with-newline
                          (string-append line "\n")))
                     (catch #t
                       (lambda ()
                         (insert logging-conn job-id line-with-newline)
                         (display line-with-newline))
                       (lambda (key . args)
                         ;; Report the failure, then try to record in the
                         ;; log that a line is missing.  A failure of that
                         ;; second insert is deliberately ignored.
                         (display
                          (simple-format
                           #f
                           "
error: ~A: ~A
error: could not insert log part: '~A'\n\n"
                           key args line))
                         (catch #t
                           (lambda ()
                             (insert
                              logging-conn
                              job-id
                              (simple-format
                               #f
                               "
guix-data-service: error: missing log line: ~A
\n" key)))
                           (lambda _
                             #t)))))
                   (loop (get-line port-to-read-from)))))))))

     port-to-write-to)))
;; Tidy up after the log parts for JOB-ID are no longer needed: drop the
;; per-job log part sequence, then vacuum the log parts table (the vacuum is
;; wrapped in with-time-logging).
(define (cleanup-logging conn job-id)
  (drop-log-parts-sequence conn job-id)
  (with-time-logging "vacuuming log parts"
    (exec-query
     conn
     "VACUUM load_new_guix_revision_job_log_parts")))
;; Fetch and process jobs using CONN, by running process-jobs-concurrently
;; with procedures that:
;;
;;  - fetch jobs with fetch-unlocked-jobs,
;;  - start a "guix-data-service-process-job" process per job, with its
;;    standard output and error sent to the port returned by
;;    start-thread-for-process-output,
;;  - after a job, combine its log parts and clean up, when there are any,
;;  - record a "failure" event for a failed job.
;;
;; MAX-PROCESSES defaults to default-max-processes, and
;; LATEST-BRANCH-REVISION-MAX-PROCESSES (passed on as
;; #:priority-max-processes) defaults to twice MAX-PROCESSES.  When
;; SKIP-SYSTEM-TESTS? is true, "--skip-system-tests" is passed to the job
;; process.
(define* (process-jobs conn #:key max-processes
                       latest-branch-revision-max-processes
                       skip-system-tests?)
  ;; Passing #f on to process-jobs-concurrently would override its keyword
  ;; defaults and later fail when compared against the process count, so
  ;; resolve the defaults here.
  (define effective-max-processes
    (or max-processes default-max-processes))

  (define effective-priority-max-processes
    (or latest-branch-revision-max-processes
        (* 2 effective-max-processes)))

  (define (fetch-new-jobs)
    (fetch-unlocked-jobs conn))

  ;; Returns the value of spawn, which process-jobs-concurrently uses as
  ;; the pid of the job process.
  ;; NOTE(review): log-port is not closed in this process after spawn;
  ;; confirm whether that is intended.
  (define (process-job job-id)
    (let ((log-port (start-thread-for-process-output job-id)))
      (spawn
       "guix-data-service-process-job"
       (cons* "guix-data-service-process-job"
              job-id
              (if skip-system-tests?
                  '("--skip-system-tests")
                  '()))
       #:output log-port
       #:error log-port)))

  (define (post-job job-id)
    (when (> (count-log-parts conn job-id)
             0)
      (combine-log-parts! conn job-id)
      (cleanup-logging conn job-id)))

  (define (handle-job-failure job-id)
    (record-job-event conn job-id "failure")
    (display (simple-format #f "recording failure for job ~A\n" job-id)
             (current-error-port)))

  (process-jobs-concurrently fetch-new-jobs
                             process-job
                             post-job
                             handle-job-failure
                             #:max-processes effective-max-processes
                             #:priority-max-processes
                             effective-priority-max-processes))
;; Return the HTML escaped log contents for the job identified by JOB-ID,
;; as produced by the database.
;;
;; The combined log in load_new_guix_revision_job_logs is used when it has
;; non-NULL contents for the job, otherwise the contents are aggregated from
;; load_new_guix_revision_job_log_parts, ordered by id.
;;
;; CHARACTER-LIMIT and START-CHARACTER are interpolated in to the SQL:
;;  - both given: CHARACTER-LIMIT characters starting at START-CHARACTER,
;;  - only START-CHARACTER: everything from START-CHARACTER onwards,
;;  - only CHARACTER-LIMIT: the last CHARACTER-LIMIT characters,
;;  - neither: the whole log.
;;
;; NOTE(review): the two values are not passed as query parameters, so
;; callers are presumably expected to supply numbers only -- confirm.
(define* (log-for-job conn job-id
                      #:key
                      character-limit
                      start-character)
  ;; SQL expression replacing FROM with TO within the SQL expression EXPR.
  (define (sql-replace expr from to)
    (string-append "replace(" expr ",'" from "','" to "')"))

  ;; & must be escaped first, so that the entities introduced for < and >
  ;; are not themselves escaped.
  (define (sql-html-escape s)
    (sql-replace
     (sql-replace
      (sql-replace s "&" "&amp;")
      "<" "&lt;")
     ">" "&gt;"))

  ;; SQL expression selecting the requested portion of the SQL expression
  ;; S.  An omitted CHARACTER-LIMIT must not be formatted in to the query,
  ;; as it would appear as #f.
  (define (get-characters s)
    (cond
     ((and start-character character-limit)
      (simple-format #f "substr(~A, ~A, ~A)"
                     s start-character
                     character-limit))
     (start-character
      (simple-format #f "substr(~A, ~A)" s start-character))
     (character-limit
      (simple-format #f "right(~A, ~A)" s character-limit))
     (else s)))

  (define log-query
    (string-append
     "SELECT "
     (sql-html-escape (get-characters "contents"))
     " FROM load_new_guix_revision_job_logs"
     " WHERE job_id = $1 AND contents IS NOT NULL"))

  (define parts-query
    (string-append
     "SELECT "
     (sql-html-escape
      (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)"))
     " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"))

  (match (exec-query conn log-query (list job-id))
    (((contents))
     contents)
    (()
     ;; No combined log, so fall back to the individual parts.
     (match (exec-query conn parts-query (list job-id))
       (((contents))
        contents)))))
;; Return, as a number, how many rows there are in
;; load_new_guix_revision_job_log_parts for the job identified by JOB-ID.
(define (count-log-parts conn job-id)
  (let* ((query "
SELECT COUNT(*)
FROM load_new_guix_revision_job_log_parts
WHERE job_id = $1")
         (rows (exec-query conn query (list job-id))))
    ;; A single row with a single column is expected.
    (match rows
      (((count))
       (string->number count)))))
;; Within a single transaction, set the contents of the
;; load_new_guix_revision_job_logs row for JOB-ID to the concatenation of
;; the job's log parts (ordered by id), then delete those log parts.
(define (combine-log-parts! conn job-id)
  (let ((combine-statement
         "
UPDATE load_new_guix_revision_job_logs SET contents = (
SELECT STRING_AGG(contents, '' ORDER BY id ASC)
FROM load_new_guix_revision_job_log_parts
WHERE job_id = $1
GROUP BY job_id
)
WHERE job_id = $1")
        (delete-statement
         "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")
        (parameters (list job-id)))
    (with-postgresql-transaction
     conn
     (lambda (transaction-conn)
       (exec-query transaction-conn combine-statement parameters)
       (exec-query transaction-conn delete-statement parameters)))))
;; Drop the log part sequence for JOB-ID, if it exists.  This is done in a
;; transaction with lock_timeout set to 10 seconds; an exception raised by
;; the DROP is written to the current error port rather than propagated.
(define (drop-log-parts-sequence conn job-id)
  (define (report-failure exn)
    (simple-format (current-error-port)
                   "error when dropping sequence: ~A"
                   exn))

  (with-postgresql-transaction
   conn
   (lambda (transaction-conn)
     (define (drop-sequence)
       (let ((statement (string-append
                         "DROP SEQUENCE IF EXISTS "
                         (log-part-sequence-name job-id))))
         (exec-query transaction-conn statement)))

     (exec-query transaction-conn
                 "SET LOCAL lock_timeout = '10s'")
     (with-exception-handler report-failure
       drop-sequence
       #:unwind? #t))))
;; Run VACUUM on the load_new_guix_revision_job_log_parts table, using
;; CONN.
(define (vacuum-log-parts-table conn)
  (let ((statement "VACUUM load_new_guix_revision_job_log_parts"))
    (exec-query conn statement)))
;; A quarter of the processor count as reported by current-processor-count,
;; rounded, and never less than 1.
(define default-max-processes
  (let ((quarter-of-processors (/ (current-processor-count) 4)))
    (max 1 (round quarter-of-processors))))
;; 72 hours, expressed in seconds.  Used as the default #:timeout of
;; process-jobs-concurrently.
(define default-timeout
  (let ((seconds-per-hour (* 60 60))
        (hours 72))
    (* seconds-per-hour hours)))
;; Loop forever, starting and supervising job processes.
;;
;; FETCH-NEW-JOBS is called with no arguments, and returns a list of
;; (job-id priority?) lists.  PROCESS-JOB is called with a job id, and its
;; return value is used as the pid of the process for that job.  POST-JOB
;; is called with the job id once the process has been waited for, and
;; HANDLE-JOB-FAILURE is called with the job id when the process finishes
;; with a non-zero status, or has been running for longer than TIMEOUT
;; seconds.
;;
;; A job is only started while the number of running processes is below
;; MAX-PROCESSES, or below PRIORITY-MAX-PROCESSES for jobs where priority?
;; is true.
;;
;; Each iteration of the loop is followed by a 15 second sleep.  On
;; SIGTERM, SIGTERM is sent to the running processes and this process exits
;; with status 0 at the next iteration.
(define* (process-jobs-concurrently
          fetch-new-jobs
          process-job
          post-job
          handle-job-failure
          #:key
          (max-processes default-max-processes)
          (priority-max-processes (* 2 max-processes))
          (timeout default-timeout))
  ;; Maps a pid to (start-time (job-id)).  The keys are integers, and the
  ;; hashv family of procedures is used for every lookup and update of this
  ;; table.
  (define processes
    (make-hash-table))

  ;; Print the number of running jobs, followed by a line for each.
  (define (display-status)
    (display
     (string-append
      "\n\n"
      (let ((running-jobs (hash-count (const #t) processes)))
        (cond
         ((= running-jobs 0)
          "status: 0 running jobs")
         ((= running-jobs 1)
          "status: 1 running job")
         (else
          (simple-format #f "status: ~A running jobs"
                         running-jobs))))
      "\n"
      (string-concatenate
       (hash-map->list
        (match-lambda*
          ((pid (start-time job-args))
           (format #f " pid: ~5d job args: ~a\n"
                   pid job-args)))
        processes))
      "\n")))

  ;; Reap every process that has finished, without blocking.
  (define (wait-on-processes)
    (catch
      #t
      (lambda ()
        (match (waitpid WAIT_ANY WNOHANG)
          ((0 . status)
           ;; No process to wait for
           #f)
          ((pid . status)
           (match (hashv-ref processes pid)
             ((_ (id))
              (post-job id)
              (unless (zero? status)
                (simple-format (current-error-port)
                               "pid ~A (job: ~A) failed with status ~A\n"
                               pid id status)
                (handle-job-failure id))))
           (hashv-remove! processes pid)
           ;; Recurse, to check for other finished processes.
           (wait-on-processes))))
      (lambda (key . args)
        ;; waitpid failing because there are no child processes is
        ;; expected, so only report other errors.
        (unless (and (eq? key 'system-error)
                     (match args
                       (("waitpid" "~A" ("No child processes") (10))
                        #t)
                       (_ #f)))
          (simple-format #t "key ~A args ~A\n"
                         key args)))))

  ;; Send SIGTERM to, and record a failure for, each process that has been
  ;; running for longer than TIMEOUT seconds.
  ;; NOTE(review): a process stays in the table until it has been waited
  ;; for, so this can signal it and record a failure for it again on later
  ;; iterations -- confirm whether that is intended.
  (define (kill-long-running-processes)
    (hash-for-each
     (match-lambda*
       ((pid (start-time job-args))
        (let ((running-for
               (- (current-time) start-time)))
          (when (> running-for timeout)
            (display
             (simple-format
              #f "sending SIGTERM to pid ~A started at ~A, now running for ~A\n"
              pid start-time running-for)
             (current-error-port))
            (kill pid SIGTERM)

            (match job-args
              ((id)
               (handle-job-failure id)))))))
     processes))

  ;; Send SIGTERM to every process in the table.
  (define (stop-running-processes)
    (hash-for-each
     (match-lambda*
       ((pid (start-time job-args))
        (display
         (simple-format
          #f "sending SIGTERM to pid ~A\n"
          pid)
         (current-error-port))
        (kill pid SIGTERM)))
     processes))

  ;; Set to #t by the SIGTERM handler, and checked by the loop below.
  (define exit?
    (make-atomic-box #f))

  (sigaction SIGTERM
    (lambda args
      (simple-format (current-error-port) "exiting due to SIGTERM\n")
      (atomic-box-set! exit? #t)))

  (while #t
    (kill-long-running-processes)
    (wait-on-processes)
    (display-status)

    (when (atomic-box-ref exit?)
      (stop-running-processes)
      (exit 0))

    (match (fetch-new-jobs)
      (()
       ;; Nothing to do
       #f)
      ((jobs ...)
       (for-each
        (match-lambda
          ((job-id priority?)
           ;; Count the processes for each job, as starting a job changes
           ;; the number running.
           (let ((current-processes
                  (hash-count (const #t) processes)))
             (when (< current-processes
                      (if priority?
                          priority-max-processes
                          max-processes))
               (let ((pid (process-job job-id)))
                 (peek "PID" pid)
                 (hashv-set! processes pid
                             (list (current-time) (list job-id))))))))
        jobs)))
    (sleep 15)))