diff --git a/guix-data-service/jobs.scm b/guix-data-service/jobs.scm index a0f59dc..b151367 100644 --- a/guix-data-service/jobs.scm +++ b/guix-data-service/jobs.scm @@ -20,11 +20,93 @@ #:use-module (ice-9 format) #:use-module (ice-9 threads) #:use-module (ice-9 atomic) + #:use-module (ice-9 textual-ports) + #:use-module (squee) + #:use-module (guix-data-service utils) + #:use-module (guix-data-service database) #:use-module (guix-data-service jobs load-new-guix-revision) - #:export (process-jobs + #:export (log-for-job + count-log-parts + combine-log-parts! + + process-jobs default-max-processes)) +(define (log-part-sequence-name job-id) + (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id)) + +(define (start-thread-for-process-output job-id) + (define (insert conn job_id s) + (exec-query + conn + (string-append " +INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) +VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") + (list job_id s))) + + + (match (pipe) + ((port-to-read-from . port-to-write-to) + + (setvbuf port-to-read-from 'line) + (setvbuf port-to-write-to 'line) + + (let ((flags (fcntl port-to-read-from F_GETFL))) + (fcntl port-to-read-from F_SETFL (logior O_NONBLOCK flags))) + (let ((flags (fcntl port-to-write-to F_GETFL))) + (fcntl port-to-write-to F_SETFL (logior O_NONBLOCK flags))) + + (call-with-new-thread + (lambda () + (with-postgresql-connection + (simple-format #f "~A job logging" job-id) + (lambda (logging-conn) + (exec-query + logging-conn + (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A" + (log-part-sequence-name job-id))) + (exec-query + logging-conn + "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" + (list job-id)) + + (let loop ((line (get-line port-to-read-from))) + (let ((line-with-newline + (string-append line "\n"))) + (catch #t + (lambda () + (insert logging-conn job-id line-with-newline) + (display line-with-newline)) + (lambda (key . args) + (display + (simple-format + #f + " +error: ~A: ~A +error: could not insert log part: '~A'\n\n" + key args line)) + (catch #t + (lambda () + (insert + logging-conn + job-id + (simple-format + #f + " +guix-data-service: error: missing log line: ~A +\n" key))) + (lambda _ + #t))))) + (loop (get-line port-to-read-from))))))) + + port-to-write-to))) + +(define (cleanup-logging conn job-id) + (drop-log-parts-sequence conn job-id) + (with-time-logging "vacuuming log parts" + (vacuum-log-parts-table conn))) + (define* (process-jobs conn #:key max-processes latest-branch-revision-max-processes skip-system-tests?) @@ -32,29 +114,137 @@ (fetch-unlocked-jobs conn)) (define (process-job job-id) - (apply execlp - "guix-data-service-process-job" - "guix-data-service-process-job" - job-id - (if skip-system-tests? - '("--skip-system-tests") - '()))) + (let ((log-port (start-thread-for-process-output job-id))) + (spawn + "guix-data-service-process-job" + (cons* "guix-data-service-process-job" + job-id + (if skip-system-tests? + '("--skip-system-tests") + '())) + #:output log-port + #:error log-port))) + + (define (post-job job-id) + (when (> (count-log-parts conn job-id) + 0) + (combine-log-parts! conn job-id) + (cleanup-logging conn job-id))) (define (handle-job-failure job-id) (record-job-event conn job-id "failure") (display (simple-format #f "recording failure for job ~A\n" job-id) - (current-error-port)) - (when (> (count-log-parts conn job-id) - 0) - (combine-log-parts! conn job-id))) + (current-error-port))) (process-jobs-concurrently fetch-new-jobs process-job + post-job handle-job-failure #:max-processes max-processes #:priority-max-processes latest-branch-revision-max-processes)) + +(define* (log-for-job conn job-id + #:key + character-limit + start-character) + (define (sql-html-escape s) + (string-append + "replace(" + (string-append + "replace(" + (string-append + "replace(" + s + ",'&','&')") + ",'<','<')") + ",'>','>')")) + + (define (get-characters s) + (if start-character + (simple-format #f "substr(~A, ~A, ~A)" + s start-character + character-limit) + (simple-format #f "right(~A, ~A)" s character-limit))) + + (define log-query + (string-append + "SELECT " + (sql-html-escape (get-characters "contents")) + " FROM load_new_guix_revision_job_logs" + " WHERE job_id = $1 AND contents IS NOT NULL")) + + (define parts-query + (string-append + "SELECT " + (sql-html-escape + (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)")) + " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")) + + (match (exec-query conn log-query (list job-id)) + (((contents)) + contents) + (() + (match (exec-query conn parts-query (list job-id)) + (((contents)) + contents))))) + +(define (count-log-parts conn job-id) + (match (exec-query + conn + " +SELECT COUNT(*) +FROM load_new_guix_revision_job_log_parts +WHERE job_id = $1" + (list job-id)) + (((id)) + (string->number id)))) + +(define (combine-log-parts! conn job-id) + (with-postgresql-transaction + conn + (lambda (conn) + (exec-query + conn + (string-append + " +UPDATE load_new_guix_revision_job_logs SET contents = ( + SELECT STRING_AGG(contents, '' ORDER BY id ASC) + FROM load_new_guix_revision_job_log_parts + WHERE job_id = $1 + GROUP BY job_id +) +WHERE job_id = $1") + (list job-id)) + (exec-query + conn + "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" + (list job-id))))) + +(define (drop-log-parts-sequence conn job-id) + (with-postgresql-transaction + conn + (lambda (conn) + (exec-query conn + "SET LOCAL lock_timeout = '10s'") + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error when dropping sequence: ~A" + exn)) + (lambda () + (exec-query conn + (string-append + "DROP SEQUENCE IF EXISTS " + (log-part-sequence-name job-id)))) + #:unwind? #t)))) + +(define (vacuum-log-parts-table conn) + (exec-query + conn + "VACUUM load_new_guix_revision_job_log_parts")) + (define default-max-processes (max (round (/ (current-processor-count) 4)) @@ -67,6 +257,7 @@ (define* (process-jobs-concurrently fetch-new-jobs process-job + post-job handle-job-failure #:key (max-processes default-max-processes) @@ -108,9 +299,10 @@ ;; No process to wait for #f) ((pid . status) - (unless (eq? status 0) - (match (hash-ref processes pid) - ((_ (id)) + (match (hash-ref processes pid) + ((_ (id)) + (post-job id) + (unless (eq? status 0) (simple-format (current-error-port) "pid ~A (job: ~A) failed with status ~A\n" pid id status) @@ -161,20 +353,6 @@ (kill pid SIGTERM))) processes)) - (define (fork-and-process-job job-args) - (match (primitive-fork) - (0 - (dynamic-wind - (const #t) - (lambda () - (apply process-job job-args)) - (lambda () - (primitive-exit 127)))) - (pid - (hashv-set! processes pid - (list (current-time) job-args)) - #t))) - (define exit? (make-atomic-box #f)) @@ -206,6 +384,9 @@ (if priority? priority-max-processes max-processes)) - (fork-and-process-job (list job-id)))))) + (let ((pid (process-job job-id))) + (peek "PID" pid) + (hashv-set! processes pid + (list (current-time) (list job-id)))))))) jobs))) (sleep 15))) diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index 796bfc5..705ec41 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -19,13 +19,16 @@ #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) #:use-module (srfi srfi-43) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 threads) + #:use-module (ice-9 custom-ports) #:use-module (ice-9 textual-ports) #:use-module (ice-9 hash-table) #:use-module (rnrs exceptions) #:use-module (json) #:use-module (squee) + #:use-module (fibers) #:use-module (guix monads) #:use-module (guix store) #:use-module (guix channels) @@ -61,10 +64,7 @@ #:use-module (guix-data-service model package-metadata) #:use-module (guix-data-service model derivation) #:use-module (guix-data-service model system-test) - #:export (log-for-job - count-log-parts - combine-log-parts! - fetch-unlocked-jobs + #:export (fetch-unlocked-jobs process-load-new-guix-revision-job select-load-new-guix-revision-job-metrics select-job-for-commit @@ -77,259 +77,6 @@ enqueue-load-new-guix-revision-job most-recent-n-load-new-guix-revision-jobs)) -(define (log-part-sequence-name job-id) - (simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id)) - -(define* (log-port job-id conn - #:key - delete-existing-log-parts? - real-output-port) - (define output-port - (or real-output-port - (current-output-port))) - - (define buffer "") - - (define (insert job_id s) - (exec-query - conn - (string-append - " -INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) -VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") - (list job_id s))) - - (define (log-string s) - (if (string-contains s "\n") - (let ((output (string-append buffer s))) - (set! buffer "") ; clear the buffer - (catch #t - (lambda () - (insert job-id output) - (display output output-port)) - (lambda (key . args) - (display - (simple-format - #f - " -error: ~A: ~A -error: could not insert log part: '~A'\n\n" - key args output) - output-port) - (catch #t - (lambda () - (insert - job-id - (simple-format - #f - " -guix-data-service: error: missing log line: ~A -\n" key))) - (lambda () - #t))))) - (set! buffer (string-append buffer s)))) - - (exec-query - conn - (simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A" - (log-part-sequence-name job-id))) - (when delete-existing-log-parts? - ;; TODO, this is useful when re-running jobs, but I'm not sure that should - ;; be a thing, jobs should probably be only attempted once. - (exec-query - conn - "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" - (list job-id))) - - (let ((port - (make-soft-port - (vector (lambda (c) - (set! buffer (string-append buffer (string c)))) - log-string - (lambda () - (force-output output-port)) - #f ; fetch one character - (lambda () - ;; close port - #f) - #f) ; number of characters that can be read - "w"))) - (setvbuf port 'line) - port)) - -(define (setup-port-for-inferior-error-output job-id real-output-port) - (define (insert conn job_id s) - (exec-query - conn - (string-append " -INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) -VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)") - (list job_id s))) - - (match (pipe) - ((port-to-read-from . port-to-write-to) - - (setvbuf port-to-read-from 'line) - (setvbuf port-to-write-to 'line) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name "inferior logging")) - (const #t)) - - (with-postgresql-connection - (simple-format #f "~A inferior error logging" job-id) - (lambda (logging-conn) - (let loop ((line (get-line port-to-read-from))) - (let ((line-with-newline - (string-append line "\n"))) - (catch #t - (lambda () - (insert logging-conn job-id line-with-newline) - (display line-with-newline real-output-port)) - (lambda (key . args) - (display - (simple-format - #f - " -error: ~A: ~A -error: could not insert log part: '~A'\n\n" - key args line) - real-output-port) - (catch #t - (lambda () - (insert - logging-conn - job-id - (simple-format - #f - " -guix-data-service: error: missing log line: ~A -\n" key))) - (lambda () - #t))))) - (loop (get-line port-to-read-from))))))) - - port-to-write-to))) - -(define real-error-port - (make-parameter (current-error-port))) - -(define inferior-error-port - (make-parameter (current-error-port))) - -(define* (log-for-job conn job-id - #:key - character-limit - start-character) - (define (sql-html-escape s) - (string-append - "replace(" - (string-append - "replace(" - (string-append - "replace(" - s - ",'&','&')") - ",'<','<')") - ",'>','>')")) - - (define (get-characters s) - (if start-character - (simple-format #f "substr(~A, ~A, ~A)" - s start-character - character-limit) - (simple-format #f "right(~A, ~A)" s character-limit))) - - (define log-query - (string-append - "SELECT " - (sql-html-escape (get-characters "contents")) - " FROM load_new_guix_revision_job_logs" - " WHERE job_id = $1 AND contents IS NOT NULL")) - - (define parts-query - (string-append - "SELECT " - (sql-html-escape - (get-characters "STRING_AGG(contents, '' ORDER BY id ASC)")) - " FROM load_new_guix_revision_job_log_parts WHERE job_id = $1")) - - (match (exec-query conn log-query (list job-id)) - (((contents)) - contents) - (() - (match (exec-query conn parts-query (list job-id)) - (((contents)) - contents))))) - -(define (insert-empty-log-entry conn job-id) - (exec-query - conn - "DELETE FROM load_new_guix_revision_job_logs WHERE job_id = $1" - (list job-id)) - (exec-query - conn - "INSERT INTO load_new_guix_revision_job_logs (job_id, contents) VALUES -($1, NULL)" - (list job-id))) - -(define (count-log-parts conn job-id) - (match (exec-query - conn - " -SELECT COUNT(*) -FROM load_new_guix_revision_job_log_parts -WHERE job_id = $1" - (list job-id)) - (((id)) - (string->number id)))) - -(define (combine-log-parts! conn job-id) - (with-postgresql-transaction - conn - (lambda (conn) - (exec-query - conn - (string-append - " -UPDATE load_new_guix_revision_job_logs SET contents = ( - SELECT STRING_AGG(contents, '' ORDER BY id ASC) - FROM load_new_guix_revision_job_log_parts - WHERE job_id = $1 - GROUP BY job_id -) -WHERE job_id = $1") - (list job-id)) - (exec-query - conn - "DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1" - (list job-id))))) - -(define (drop-log-parts-sequence conn job-id) - (with-postgresql-transaction - conn - (lambda (conn) - (exec-query conn - "SET LOCAL lock_timeout = '10s'") - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "error when dropping sequence: ~A" - exn)) - (lambda () - (exec-query conn - (string-append - "DROP SEQUENCE IF EXISTS " - (log-part-sequence-name job-id)))) - #:unwind? #t)))) - -(define (vacuum-log-parts-table conn) - (exec-query - conn - "VACUUM load_new_guix_revision_job_log_parts")) - (define inferior-package-id (@@ (guix inferior) inferior-package-id)) @@ -845,9 +592,12 @@ WHERE job_id = $1") (a-name (inferior-package-name a)) (b-name (inferior-package-name b)) (a-version (inferior-package-version a)) - (b-version (inferior-package-version b))) + (b-version (inferior-package-version b)) + (a-replacement (inferior-package-replacement a)) + (b-replacement (inferior-package-replacement b))) (if (and (string=? a-name b-name) - (string=? a-version b-version)) + (string=? a-version b-version) + (eq? a-replacement b-replacement)) (begin (simple-format (current-error-port) "warning: ignoring duplicate package: ~A (~A)\n" @@ -926,7 +676,17 @@ WHERE job_id = $1") (list->vector deduplicated-packages))) -(define* (all-inferior-packages-data inf packages #:key (process-replacements? #t)) +(define* (all-inferior-packages-data inf packages) + (define inferior-package-id->packages-index-hash-table + (let ((hash-table (make-hash-table))) + (vector-for-each + (lambda (i pkg) + (hash-set! hash-table + (inferior-package-id pkg) + i)) + packages) + hash-table)) + (let* ((package-license-data (with-time-logging "fetching inferior package license metadata" (inferior-packages->license-data inf))) @@ -944,27 +704,21 @@ WHERE job_id = $1") (cdr translated-package-descriptions-and-synopsis)))) packages))) (package-replacement-data - (if process-replacements? - (vector-map - (lambda (_ package) - (let ((replacement (inferior-package-replacement package))) - (if replacement - ;; I'm not sure if replacements can themselves be - ;; replaced, but I do know for sure that there are - ;; infinite chains of replacements (python(2)-urllib3 - ;; in 7c4c781aa40c42d4cd10b8d9482199f3db345e1b for - ;; example). - ;; - ;; This code currently just capures the first level - ;; of replacements - (first - (all-inferior-packages-data - inf - (vector replacement) - #:process-replacements? #f)) - #f))) - packages) - #f))) + (vector-map + (lambda (_ pkg) + (let ((replacement (inferior-package-replacement pkg))) + (if replacement + ;; I'm not sure if replacements can themselves be + ;; replaced, but I do know for sure that there are + ;; infinite chains of replacements (python(2)-urllib3 + ;; in 7c4c781aa40c42d4cd10b8d9482199f3db345e1b for + ;; example). + ;; + ;; So this might be #f in these cases + (hash-ref inferior-package-id->packages-index-hash-table + (inferior-package-id pkg)) + #f))) + packages))) `((names . ,(vector-map (lambda (_ pkg) (inferior-package-name pkg)) packages)) @@ -972,53 +726,57 @@ WHERE job_id = $1") packages)) (license-data . ,package-license-data) (metadata . ,package-metadata) - (replacemnets . ,package-replacement-data)))) + (replacements . ,package-replacement-data)))) (define (insert-packages conn inferior-packages-data) - (let*-values - (((package-license-set-ids) - (inferior-packages->license-set-ids - conn - (inferior-packages->license-id-lists - conn - ;; TODO Don't needlessly convert - (vector->list - (assq-ref inferior-packages-data 'license-data))))) - ((all-package-metadata-ids new-package-metadata-ids) - (with-time-logging "inserting package metadata entries" - (inferior-packages->package-metadata-ids + (let* ((names (assq-ref inferior-packages-data 'names)) + (versions (assq-ref inferior-packages-data 'versions)) + (package-license-set-ids + (inferior-packages->license-set-ids conn - ;; TODO Don't needlessly convert - (vector->list - (assq-ref inferior-packages-data 'metadata)) - package-license-set-ids))) - ((replacement-ids) - (or (and=> (assq-ref inferior-packages-data 'replacements) - (lambda (all-replacement-data) - (with-time-logging "inserting package replacements" - (vector-map - (lambda (_ replacement-data) - (if replacement-data - (first - (insert-packages conn (list replacement-data))) - (cons "integer" NULL))) - all-replacement-data)))) - (make-vector (length package-license-set-ids) - (cons "integer" NULL))))) + (inferior-packages->license-id-lists + conn + ;; TODO Don't needlessly convert + (vector->list + (assq-ref inferior-packages-data 'license-data))))) + (all-package-metadata-ids + new-package-metadata-ids + (with-time-logging "inserting package metadata entries" + (inferior-packages->package-metadata-ids + conn + ;; TODO Don't needlessly convert + (vector->list + (assq-ref inferior-packages-data 'metadata)) + package-license-set-ids))) + (replacement-package-ids + (vector-map + (lambda (_ package-index-or-false) + (if package-index-or-false + (first + (inferior-packages->package-ids + conn + (list (list (vector-ref names package-index-or-false) + (vector-ref versions package-index-or-false) + (list-ref all-package-metadata-ids + package-index-or-false) + (cons "integer" NULL))))) + (cons "integer" NULL))) + (assq-ref inferior-packages-data 'replacements)))) (unless (null? new-package-metadata-ids) - (with-time-logging "fetching package metadata tsvector entries" + (with-time-logging "inserting package metadata tsvector entries" (insert-package-metadata-tsvector-entries conn new-package-metadata-ids))) - (with-time-logging "getting package-ids" - (inferior-packages->package-ids - conn - ;; TODO Do this more efficiently - (zip (vector->list (assq-ref inferior-packages-data 'names)) - (vector->list (assq-ref inferior-packages-data 'versions)) - all-package-metadata-ids - (vector->list replacement-ids)))))) + (with-time-logging "getting package-ids (without replacements)" + (list->vector + (inferior-packages->package-ids + conn + ;; TODO Do this more efficiently + (zip (vector->list names) + (vector->list versions) + all-package-metadata-ids + (vector->list replacement-package-ids))))))) (define (insert-lint-warnings conn package-ids @@ -1082,11 +840,11 @@ WHERE job_id = $1") conn derivations-vector))) - (insert-package-derivations conn - (car system-and-target) - (or (cdr system-and-target) "") - package-ids - derivation-ids))) + (insert-package-derivations conn + (car system-and-target) + (or (cdr system-and-target) "") + package-ids + derivation-ids))) inferior-packages-system-and-target-to-derivations-alist)) (define guix-store-path @@ -1213,8 +971,7 @@ WHERE job_id = $1") "SSL_CERT_DIR=" (nss-certs-store-path store)))) (begin (simple-format #t "debug: using open-inferior\n") - (open-inferior (guix-store-path store) - #:error-port (inferior-error-port)))))) + (open-inferior (guix-store-path store)))))) (define (start-inferior-and-return-derivation-file-names) ;; /etc is only missing if open-inferior/container has been used @@ -1344,8 +1101,7 @@ WHERE job_id = $1") '("/gnu/store")) (begin (simple-format #t "debug: using open-inferior\n") - (open-inferior store-path - #:error-port (inferior-error-port)))))) + (open-inferior store-path))))) (inferior-eval '(use-modules (srfi srfi-1) (srfi srfi-34) (guix grafts) @@ -1400,8 +1156,7 @@ WHERE job_id = $1") (begin (setenv "GUIX_LOCPATH" guix-locpath) (simple-format #t "debug: using open-inferior\n") - (open-inferior store-path - #:error-port (inferior-error-port))))))) + (open-inferior store-path)))))) (setenv "GUIX_LOCPATH" guix-locpath) ; restore GUIX_LOCPATH (when (eq? inf #f) @@ -2040,44 +1795,6 @@ SKIP LOCKED") (prevent-inlining-for-tests with-store-connection) -(define (setup-logging id thunk) - (let* ((previous-output-port (current-output-port)) - (previous-error-port (current-error-port)) - (result - (with-postgresql-connection - (simple-format #f "load-new-guix-revision ~A logging" id) - (lambda (logging-conn) - (insert-empty-log-entry logging-conn id) - (let ((logging-port - (log-port id logging-conn - #:delete-existing-log-parts? #t))) - (set-current-output-port logging-port) - (set-current-error-port logging-port) - (let ((result - (parameterize ((current-build-output-port logging-port) - (real-error-port previous-error-port) - (inferior-error-port - (setup-port-for-inferior-error-output - id previous-error-port))) - (thunk)))) - (set-current-output-port previous-output-port) - (set-current-error-port previous-error-port) - - ;; This can happen with GC, so do it explicitly - (close-port logging-port) - - (combine-log-parts! logging-conn id) - - result)))))) - result)) - -(define (cleanup-logging id conn) - (drop-log-parts-sequence conn id) - (with-time-logging "vacuuming log parts" - (vacuum-log-parts-table conn))) - -(prevent-inlining-for-tests setup-logging) - (define* (process-load-new-guix-revision-job id #:key skip-system-tests?) (with-postgresql-connection (simple-format #f "load-new-guix-revision ~A" id) @@ -2104,38 +1821,30 @@ SKIP LOCKED") (if (eq? (with-time-logging (string-append "processing revision " commit) - (setup-logging - id - (lambda () - (with-exception-handler - (const #f) - (lambda () - (with-throw-handler #t - (lambda () - (with-store-connection - (lambda (store) - (load-new-guix-revision conn - store - git-repository-id - commit - #:skip-system-tests? - skip-system-tests?)))) - (lambda (key . args) - (simple-format (current-error-port) - "error: load-new-guix-revision: ~A ~A\n" - key args) - (backtrace)))) - #:unwind? #t)))) + (with-exception-handler + (const #f) + (lambda () + (with-throw-handler #t + (lambda () + (with-store-connection + (lambda (store) + (load-new-guix-revision conn + store + git-repository-id + commit + #:skip-system-tests? #t)))) + (lambda (key . args) + (simple-format (current-error-port) + "error: load-new-guix-revision: ~A ~A\n" + key args) + (backtrace)))) + #:unwind? #t)) #t) (begin (record-job-succeeded conn id) (record-job-event conn id "success") (exec-query conn "COMMIT") - (with-time-logging - "cleanup logging" - (cleanup-logging id conn)) - (with-time-logging "vacuuming package derivations by guix revision range table" (vacuum-package-derivations-table conn)) @@ -2170,10 +1879,6 @@ SKIP LOCKED") (exec-query conn "ROLLBACK") (record-job-event conn id "failure") - (with-time-logging - "cleanup logging" - (cleanup-logging id conn)) - #f))) (() (exec-query conn "ROLLBACK") diff --git a/guix-data-service/model/package.scm b/guix-data-service/model/package.scm index 263f46c..7ec2b09 100644 --- a/guix-data-service/model/package.scm +++ b/guix-data-service/model/package.scm @@ -264,12 +264,11 @@ INSERT INTO packages (name, version, package_metadata_id) VALUES " RETURNING id")) (define (inferior-packages->package-ids conn package-entries) - (list->vector - (insert-missing-data-and-return-all-ids - conn - "packages" - '(name version package_metadata_id replacement_package_id) - package-entries))) + (insert-missing-data-and-return-all-ids + conn + "packages" + '(name version package_metadata_id replacement_package_id) + package-entries)) (define (select-package-versions-for-revision conn commit diff --git a/guix-data-service/web/jobs/controller.scm b/guix-data-service/web/jobs/controller.scm index b8b494d..7e5084f 100644 --- a/guix-data-service/web/jobs/controller.scm +++ b/guix-data-service/web/jobs/controller.scm @@ -23,6 +23,7 @@ #:use-module (guix-data-service web controller) #:use-module (guix-data-service web query-parameters) #:use-module (guix-data-service web util) + #:use-module (guix-data-service jobs) #:use-module (guix-data-service jobs load-new-guix-revision) #:use-module (guix-data-service web jobs html) #:export (jobs-controller)) diff --git a/scripts/guix-data-service-process-job.in b/scripts/guix-data-service-process-job.in index c6d06c6..89e31b9 100644 --- a/scripts/guix-data-service-process-job.in +++ b/scripts/guix-data-service-process-job.in @@ -25,6 +25,8 @@ (use-modules (srfi srfi-1) (srfi srfi-37) (ice-9 match) + (ice-9 suspendable-ports) + (fibers) (guix-data-service database) (guix-data-service data-deletion) (guix-data-service model package-derivation-by-guix-revision-range) @@ -38,12 +40,21 @@ ;; Make stack traces more useful (setenv "COLUMNS" "256") +(install-suspendable-ports!) + (define %options (list (option '("skip-system-tests") #f #f (lambda (opt name _ result) - (alist-cons 'skip-system-tests #t result))))) + (alist-cons 'skip-system-tests #t result))) + (option '("parallelism") #t #f + (lambda (opt name arg result) + (alist-cons 'parallelism + (string->number arg) + (alist-delete 'parallelism + result)))))) -(define %default-options '()) +(define %default-options + '((parallelism . 1))) (define (parse-options args) (args-fold @@ -62,6 +73,11 @@ (let ((opts (parse-options (cdr (program-arguments))))) (match (assq-ref opts 'arguments) ((job) - (process-load-new-guix-revision-job - job - #:skip-system-tests? (assq-ref opts 'skip-system-tests))))) + (run-fibers + (lambda () + (process-load-new-guix-revision-job + job + #:skip-system-tests? (assq-ref opts 'skip-system-tests))) + #:hz 0 + #:parallelism (assq-ref opts 'parallelism) + #:drain? #t)))) diff --git a/tests/jobs-load-new-guix-revision.scm b/tests/jobs-load-new-guix-revision.scm index 0eaad3f..8213afb 100644 --- a/tests/jobs-load-new-guix-revision.scm +++ b/tests/jobs-load-new-guix-revision.scm @@ -48,43 +48,37 @@ (mock ((guix-data-service jobs load-new-guix-revision) - setup-logging - (lambda (conn thunk) - (thunk))) + channel-derivations-by-system->guix-store-item + (lambda (store channel-derivations-by-system) + "/gnu/store/test")) (mock ((guix-data-service jobs load-new-guix-revision) - channel-derivations-by-system->guix-store-item - (lambda (store channel-derivations-by-system) - "/gnu/store/test")) + extract-information-from + (lambda* (conn store guix-revision-id commit + guix-source store-path + #:key skip-system-tests?) + #t)) (mock - ((guix-data-service jobs load-new-guix-revision) - extract-information-from - (lambda* (conn store guix-revision-id commit - guix-source store-path - #:key skip-system-tests?) + ((guix-data-service model channel-instance) + insert-channel-instances + (lambda (conn guix-revision-id derivations-by-system) #t)) (mock - ((guix-data-service model channel-instance) - insert-channel-instances - (lambda (conn guix-revision-id derivations-by-system) - #t)) + ((guix channels) + channel-news-for-commit + (lambda (channel commit) + '())) - (mock - ((guix channels) - channel-news-for-commit - (lambda (channel commit) - '())) - - (match (enqueue-load-new-guix-revision-job - conn - (git-repository-url->git-repository-id conn "test-url") - "test-commit" - "test-source") - ((id) - (process-load-new-guix-revision-job id))))))))))) + (match (enqueue-load-new-guix-revision-job + conn + (git-repository-url->git-repository-id conn "test-url") + "test-commit" + "test-source") + ((id) + (process-load-new-guix-revision-job id)))))))))) (exec-query conn "TRUNCATE guix_revisions CASCADE") (exec-query conn "TRUNCATE load_new_guix_revision_jobs CASCADE")