diff --git a/.dir-locals.el b/.dir-locals.el index 5d052d8..f7cbfb5 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -7,7 +7,9 @@ (scheme-mode (indent-tabs-mode) (eval put 'with-time-logging 'scheme-indent-function 1) - (eval put 'make-parameter 'scheme-indent-function 1)) + (eval put 'make-parameter 'scheme-indent-function 1) + (eval put 'letpar 'scheme-indent-function 1) + (eval put 'letpar& 'scheme-indent-function 1)) (texinfo-mode (indent-tabs-mode) (fill-column . 72))) diff --git a/guix-data-service/data-deletion.scm b/guix-data-service/data-deletion.scm index 6c4e0b9..197cef1 100644 --- a/guix-data-service/data-deletion.scm +++ b/guix-data-service/data-deletion.scm @@ -448,17 +448,16 @@ WHERE NOT EXISTS ( (lambda (count result) (+ result count)) 0 - (par-map (lambda (derivation-id) - (with-postgresql-transaction/through-channel - conn-channel - (lambda (conn) - (exec-query - conn - " + (par-map& (lambda (derivation-id) + (with-thread-postgresql-connection + (lambda (conn) + (exec-query + conn + " SET CONSTRAINTS derivations_by_output_details_set_derivation_id_fkey DEFERRED") - (maybe-delete-derivation conn derivation-id)))) - derivations)))) + (maybe-delete-derivation conn derivation-id)))) + derivations)))) (simple-format (current-error-port) "Deleted ~A derivations\n" deleted-count) diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm index 89b1a09..4d1001b 100644 --- a/guix-data-service/database.scm +++ b/guix-data-service/database.scm @@ -20,9 +20,6 @@ #:use-module (ice-9 match) #:use-module (ice-9 threads) #:use-module (squee) - #:use-module (fibers) - #:use-module (fibers channels) - #:use-module (fibers conditions) #:use-module (guix-data-service config) #:export (with-postgresql-connection @@ -136,98 +133,6 @@ (f conn))))) -(define* (make-postgresql-connection-channel name - #:key - (statement-timeout #f) - (threads 2)) - (parameterize (((@@ (fibers internal) current-fiber) #f)) - (let ((channel (make-channel))) - (for-each - (lambda _ - (call-with-new-thread - (lambda () - (with-postgresql-connection - name - (lambda (conn) - (let loop () - (match (get-message channel) - (((? channel? reply) f (? boolean? allways-rollback?)) - (put-message - reply - (with-exception-handler - (lambda (exn) - (cons 'worker-thread-error exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "postgresql connection thread: exception: ~A\n" - exn) - (backtrace) - (raise-exception exn)) - (lambda () - (call-with-values - (lambda () - (with-postgresql-transaction - conn - (lambda (conn) - (f conn)))) - (lambda vals vals))))) - #:unwind? #t)) - (loop)) - (((? channel? reply) . (? list? args)) - (put-message - reply - (with-exception-handler - (lambda (exn) - (cons 'worker-thread-error exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "postgresql connection thread: exception: ~A\n" - exn) - (backtrace) - (raise-exception exn)) - (lambda () - (call-with-values - (lambda () - (apply exec-query - conn - args)) - (lambda vals vals))))) - #:unwind? #t)) - (loop)) - (_ #f)))) - #:statement-timeout statement-timeout)))) - (iota threads)) - channel))) - -(define (close-postgresql-connection-channel channel) - (put-message channel #f)) - -(define (exec-query/through-channel channel . args) - (let ((reply (make-channel))) - (put-message channel (cons reply args)) - (match (get-message reply) - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result))))) - -(define* (with-postgresql-transaction/through-channel channel - f - #:key always-rollback?) - (let ((reply (make-channel))) - (put-message channel (list reply f always-rollback?)) - (match (get-message reply) - (('worker-thread-error . exn) - (raise-exception exn)) - (result - (apply values result))))) - (define* (with-postgresql-transaction conn f #:key always-rollback?) (exec-query conn "BEGIN;") diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index 738f839..855c819 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -17,9 +17,18 @@ (define-module (guix-data-service utils) #:use-module (srfi srfi-11) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) #:export (call-with-time-logging with-time-logging - prevent-inlining-for-tests)) + prevent-inlining-for-tests + + parallel-via-thread-pool-channel + par-map& + letpar&)) (define (call-with-time-logging action thunk) (simple-format #t "debug: Starting ~A\n" action) @@ -38,3 +47,94 @@ (define-syntax-rule (prevent-inlining-for-tests var) (set! var var)) + + +(define* (make-thread-pool-channel #:key (threads 8)) + (parameterize (((@@ (fibers internal) current-fiber) #f)) + (let ((channel (make-channel))) + (for-each + (lambda _ + (call-with-new-thread + (lambda () + (let loop () + (match (get-message channel) + (((? channel? reply) . (? procedure? proc)) + (put-message + reply + (with-exception-handler + (lambda (exn) + (cons 'worker-thread-error exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "worker thread: exception: ~A\n" + exn) + (backtrace) + (raise-exception exn)) + (lambda () + (call-with-values + proc + (lambda vals + vals))))) + #:unwind? #t)) + (loop)) + (_ #f)))))) + (iota threads)) + channel))) + +(define %thread-pool-mutex (make-mutex)) +(define %thread-pool-channel #f) + +(define (make-thread-pool-channel!') + (with-mutex %thread-pool-mutex + (unless %thread-pool-channel + (set! %thread-pool-channel (make-thread-pool-channel)) + (set! make-thread-pool-channel! (lambda () #t))))) + +(define make-thread-pool-channel! + (lambda () (make-thread-pool-channel!'))) + +(define (defer-to-thread-pool-channel thunk) + (make-thread-pool-channel!) + (let ((reply (make-channel))) + (put-message %thread-pool-channel (cons reply thunk)) + reply)) + +(define (fetch-result-of-defered-thunk reply-channel) + (match (get-message reply-channel) + (('worker-thread-error . exn) + (raise-exception exn)) + (result + (apply values result)))) + +(define-syntax parallel-via-thread-pool-channel + (lambda (x) + (syntax-case x () + ((_ e0 ...) + (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) + #'(let ((tmp0 (defer-to-thread-pool-channel + (lambda () + e0))) + ...) + (values (fetch-result-of-defered-thunk tmp0) ...))))))) + +(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...) + (call-with-values + (lambda () (parallel-via-thread-pool-channel e ...)) + (lambda (v ...) + b0 b1 ...))) + +(define (par-mapper' mapper cons) + (lambda (proc . lists) + (let loop ((lists lists)) + (match lists + (((heads tails ...) ...) + (let ((tail (defer-to-thread-pool-channel (loop tails))) + (head (apply proc heads))) + (cons head (fetch-result-of-defered-thunk tail)))) + (_ + '()))))) + +(define par-map& (par-mapper' map cons))