Rework the shortlived PostgreSQL specific connection channel

In to a generic thing more like (ice-9 futures). Including copying some bits
from the (ice-9 threads) module and adapting them to work with this fibers
approach, rather than futures. The advantage being that using fibers channels
doesn't block the threads being used by fibers, whereas futures would.
This commit is contained in:
Christopher Baines 2020-10-03 21:32:46 +01:00
parent 18b6dd9e6d
commit e2e55c69de
4 changed files with 112 additions and 106 deletions

View File

@ -7,7 +7,9 @@
(scheme-mode
(indent-tabs-mode)
(eval put 'with-time-logging 'scheme-indent-function 1)
(eval put 'make-parameter 'scheme-indent-function 1))
(eval put 'make-parameter 'scheme-indent-function 1)
(eval put 'letpar 'scheme-indent-function 1)
(eval put 'letpar& 'scheme-indent-function 1))
(texinfo-mode
(indent-tabs-mode)
(fill-column . 72)))

View File

@ -448,17 +448,16 @@ WHERE NOT EXISTS (
(lambda (count result)
(+ result count))
0
(par-map (lambda (derivation-id)
(with-postgresql-transaction/through-channel
conn-channel
(lambda (conn)
(exec-query
conn
"
(par-map& (lambda (derivation-id)
(with-thread-postgresql-connection
(lambda (conn)
(exec-query
conn
"
SET CONSTRAINTS derivations_by_output_details_set_derivation_id_fkey DEFERRED")
(maybe-delete-derivation conn derivation-id))))
derivations))))
(maybe-delete-derivation conn derivation-id))))
derivations))))
(simple-format (current-error-port)
"Deleted ~A derivations\n"
deleted-count)

View File

@ -20,9 +20,6 @@
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (squee)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
#:use-module (guix-data-service config)
#:export (with-postgresql-connection
@ -136,98 +133,6 @@
(f conn)))))
(define* (make-postgresql-connection-channel name
#:key
(statement-timeout #f)
(threads 2))
(parameterize (((@@ (fibers internal) current-fiber) #f))
(let ((channel (make-channel)))
(for-each
(lambda _
(call-with-new-thread
(lambda ()
(with-postgresql-connection
name
(lambda (conn)
(let loop ()
(match (get-message channel)
(((? channel? reply) f (? boolean? allways-rollback?))
(put-message
reply
(with-exception-handler
(lambda (exn)
(cons 'worker-thread-error exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"postgresql connection thread: exception: ~A\n"
exn)
(backtrace)
(raise-exception exn))
(lambda ()
(call-with-values
(lambda ()
(with-postgresql-transaction
conn
(lambda (conn)
(f conn))))
(lambda vals vals)))))
#:unwind? #t))
(loop))
(((? channel? reply) . (? list? args))
(put-message
reply
(with-exception-handler
(lambda (exn)
(cons 'worker-thread-error exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"postgresql connection thread: exception: ~A\n"
exn)
(backtrace)
(raise-exception exn))
(lambda ()
(call-with-values
(lambda ()
(apply exec-query
conn
args))
(lambda vals vals)))))
#:unwind? #t))
(loop))
(_ #f))))
#:statement-timeout statement-timeout))))
(iota threads))
channel)))
(define (close-postgresql-connection-channel channel)
(put-message channel #f))
(define (exec-query/through-channel channel . args)
(let ((reply (make-channel)))
(put-message channel (cons reply args))
(match (get-message reply)
(('worker-thread-error . exn)
(raise-exception exn))
(result
(apply values result)))))
(define* (with-postgresql-transaction/through-channel channel
f
#:key always-rollback?)
(let ((reply (make-channel)))
(put-message channel (list reply f always-rollback?))
(match (get-message reply)
(('worker-thread-error . exn)
(raise-exception exn))
(result
(apply values result)))))
(define* (with-postgresql-transaction conn f
#:key always-rollback?)
(exec-query conn "BEGIN;")

View File

@ -17,9 +17,18 @@
(define-module (guix-data-service utils)
#:use-module (srfi srfi-11)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
#:export (call-with-time-logging
with-time-logging
prevent-inlining-for-tests))
prevent-inlining-for-tests
parallel-via-thread-pool-channel
par-map&
letpar&))
(define (call-with-time-logging action thunk)
(simple-format #t "debug: Starting ~A\n" action)
@ -38,3 +47,94 @@
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
(define* (make-thread-pool-channel #:key (threads 8))
(parameterize (((@@ (fibers internal) current-fiber) #f))
(let ((channel (make-channel)))
(for-each
(lambda _
(call-with-new-thread
(lambda ()
(let loop ()
(match (get-message channel)
(((? channel? reply) . (? procedure? proc))
(put-message
reply
(with-exception-handler
(lambda (exn)
(cons 'worker-thread-error exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"worker thread: exception: ~A\n"
exn)
(backtrace)
(raise-exception exn))
(lambda ()
(call-with-values
proc
(lambda vals
vals)))))
#:unwind? #t))
(loop))
(_ #f))))))
(iota threads))
channel)))
(define %thread-pool-mutex (make-mutex))
(define %thread-pool-channel #f)
(define (make-thread-pool-channel!')
(with-mutex %thread-pool-mutex
(unless %thread-pool-channel
(set! %thread-pool-channel (make-thread-pool-channel))
(set! make-thread-pool-channel! (lambda () #t)))))
(define make-thread-pool-channel!
(lambda () (make-thread-pool-channel!')))
(define (defer-to-thread-pool-channel thunk)
(make-thread-pool-channel!)
(let ((reply (make-channel)))
(put-message %thread-pool-channel (cons reply thunk))
reply))
(define (fetch-result-of-defered-thunk reply-channel)
(match (get-message reply-channel)
(('worker-thread-error . exn)
(raise-exception exn))
(result
(apply values result))))
(define-syntax parallel-via-thread-pool-channel
(lambda (x)
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
#'(let ((tmp0 (defer-to-thread-pool-channel
(lambda ()
e0)))
...)
(values (fetch-result-of-defered-thunk tmp0) ...)))))))
(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
(call-with-values
(lambda () (parallel-via-thread-pool-channel e ...))
(lambda (v ...)
b0 b1 ...)))
(define (par-mapper' mapper cons)
(lambda (proc . lists)
(let loop ((lists lists))
(match lists
(((heads tails ...) ...)
(let ((tail (defer-to-thread-pool-channel (loop tails)))
(head (apply proc heads)))
(cons head (fetch-result-of-defered-thunk tail))))
(_
'())))))
(define par-map& (par-mapper' map cons))