mirror of
git://git.savannah.gnu.org/guix/data-service.git
synced 2023-12-14 03:23:03 +01:00
d06230fcf4
I think the idle connections associated with idle threads are still taking up memory, so especially now that you can configure an arbitrary number of threads (and thus connections), I think it's good to close them regularly.
309 lines
9.9 KiB
Scheme
309 lines
9.9 KiB
Scheme
;;; Guix Data Service -- Information about Guix over time
|
|
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
|
|
;;;
|
|
;;; This program is free software: you can redistribute it and/or
|
|
;;; modify it under the terms of the GNU Affero General Public License
|
|
;;; as published by the Free Software Foundation, either version 3 of
|
|
;;; the License, or (at your option) any later version.
|
|
;;;
|
|
;;; This program is distributed in the hope that it will be useful,
|
|
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
;;; Affero General Public License for more details.
|
|
;;;
|
|
;;; You should have received a copy of the GNU Affero General Public
|
|
;;; License along with this program. If not, see
|
|
;;; <http://www.gnu.org/licenses/>.
|
|
|
|
(define-module (guix-data-service utils)
|
|
#:use-module (srfi srfi-1)
|
|
#:use-module (srfi srfi-11)
|
|
#:use-module (ice-9 match)
|
|
#:use-module (ice-9 format)
|
|
#:use-module (ice-9 threads)
|
|
#:use-module (fibers)
|
|
#:use-module (fibers channels)
|
|
#:use-module (fibers operations)
|
|
#:use-module (fibers timers)
|
|
#:use-module (fibers conditions)
|
|
#:use-module (prometheus)
|
|
#:export (call-with-time-logging
|
|
with-time-logging
|
|
prevent-inlining-for-tests
|
|
|
|
%thread-pool-threads
|
|
%thread-pool-idle-seconds
|
|
%thread-pool-idle-thunk
|
|
parallel-via-thread-pool-channel
|
|
par-map&
|
|
letpar&
|
|
|
|
chunk
|
|
chunk!
|
|
chunk-for-each!
|
|
|
|
delete-duplicates/sort!
|
|
|
|
get-gc-metrics-updater))
|
|
|
|
(define (call-with-time-logging action thunk)
  "Call THUNK with no arguments, printing a debug line that names ACTION
before the call and a second line, with the elapsed time as measured by
`current-time', once THUNK has returned.  All the values returned by
THUNK are returned unchanged."
  (simple-format #t "debug: Starting ~A\n" action)
  (let ((started-at (current-time)))
    (call-with-values thunk
      ;; Collect every value THUNK produced so they can be passed on
      ;; after the "Finished" line has been written.
      (lambda results
        (simple-format #t "debug: Finished ~A, took ~A seconds\n"
                       action
                       (- (current-time) started-at))
        (apply values results)))))
|
|
|
|
(define-syntax-rule (with-time-logging action exp ...)
  "Evaluate EXP ... in order inside `call-with-time-logging', logging
under ACTION when evaluation starts and how long it took.  The values of
the last EXP are returned."
  ;; The expressions are wrapped in a thunk so that they are only
  ;; evaluated once the "Starting" line has been printed.
  (call-with-time-logging action (lambda () exp ...)))
|
|
|
|
;; Expands to an assignment of VAR to itself.  The value of VAR is left
;; unchanged; the expansion exists only for the `set!' on the binding.
;; NOTE(review): presumably the `set!' stops the compiler from inlining
;; VAR so that tests can rebind it -- confirm against the callers.
(define-syntax prevent-inlining-for-tests
  (syntax-rules ()
    ((_ var)
     (set! var var))))
|
|
|
|
;; Number of worker threads started when the shared thread pool channel
;; is first created.
(define %thread-pool-threads (make-parameter 8))

;; When not #f, the number of seconds a worker thread waits for a request
;; before treating the wait as a timeout.
(define %thread-pool-idle-seconds (make-parameter #f))

;; When not #f, a thunk that a worker thread calls each time its wait for
;; a request times out.
(define %thread-pool-idle-thunk (make-parameter #f))
|
|
|
|
(define* (make-thread-pool-channel threads)
  "Start THREADS worker threads and return the channel they take requests
from.  A request is a list (REPLY-CHANNEL SENT-TIME PROC): the worker
calls PROC with no arguments and puts on REPLY-CHANNEL either the list of
values PROC returned, or the pair (worker-thread-error . EXN) if PROC
raised EXN.  A worker stops looping if it receives any other message.

The values of `%thread-pool-idle-seconds' and `%thread-pool-idle-thunk'
are read once, when this procedure is called."
  (define idle-seconds
    (%thread-pool-idle-seconds))

  (define idle-thunk
    (%thread-pool-idle-thunk))

  (define (delay-logger seconds-delayed)
    ;; Only report requests that waited for more than a second.
    (when (> seconds-delayed 1)
      (format (current-error-port)
              "warning: thread pool delayed by ~1,2f seconds~%"
              seconds-delayed)))

  (define (next-message channel)
    ;; Wait for a request; when an idle period is configured, give up
    ;; after that many seconds and return the symbol `timeout' instead.
    (if idle-seconds
        (perform-operation
         (choice-operation
          (get-operation channel)
          (wrap-operation (sleep-operation idle-seconds)
                          (const 'timeout))))
        (get-message channel)))

  (define (run-idle-thunk)
    ;; An exception from the idle thunk is reported and otherwise ignored,
    ;; so the worker keeps running.
    (when idle-thunk
      (with-exception-handler
          (lambda (exn)
            (simple-format (current-error-port)
                           "worker thread idle thunk exception: ~A\n"
                           exn))
        idle-thunk
        #:unwind? #t)))

  (define (run-request proc)
    ;; Return the list of values produced by PROC.  If PROC raises, the
    ;; inner handler logs the exception and a backtrace before re-raising,
    ;; and the outer, unwinding handler turns it into an error pair.
    (with-exception-handler
        (lambda (exn)
          (cons 'worker-thread-error exn))
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              (simple-format (current-error-port)
                             "worker thread: exception: ~A\n"
                             exn)
              (backtrace)
              (raise-exception exn))
          (lambda ()
            (call-with-values proc list))))
      #:unwind? #t))

  (define (worker-loop channel)
    (let loop ()
      (match (next-message channel)
        ('timeout
         (run-idle-thunk)
         (loop))
        (((? channel? reply) sent-time (? procedure? proc))
         ;; Report how long passed between SENT-TIME and now.
         (delay-logger (/ (- (get-internal-real-time) sent-time)
                          internal-time-units-per-second))
         (put-message reply (run-request proc))
         (loop))
        ;; Anything else ends the loop.
        (_ #f))))

  (let ((channel (make-channel)))
    (for-each (lambda _
                (call-with-new-thread
                 (lambda ()
                   (worker-loop channel))))
              (iota threads))
    channel))
|
|
|
|
;; Held while the shared thread pool channel is being created.
(define %thread-pool-mutex (make-mutex))

;; The shared thread pool channel, or #f while it has not been created.
(define %thread-pool-channel #f)

;; Create the shared thread pool channel if it does not exist yet, using
;; the current value of `%thread-pool-threads'.  Once the channel exists,
;; `make-thread-pool-channel!' is replaced with a procedure that only
;; returns #t, so later calls no longer take the mutex.
(define make-thread-pool-channel!'
  (lambda ()
    (with-mutex %thread-pool-mutex
      (when (not %thread-pool-channel)
        (set! %thread-pool-channel
              (make-thread-pool-channel (%thread-pool-threads)))
        (set! make-thread-pool-channel!
              (lambda () #t))))))

;; Entry point used by callers; it is reassigned by
;; `make-thread-pool-channel!'' after the first successful setup.
(define (make-thread-pool-channel!)
  (make-thread-pool-channel!'))
|
|
|
|
(define (defer-to-thread-pool-channel thunk)
  "Make sure the shared thread pool channel exists, then spawn a fiber
that sends THUNK to it.  Return immediately with a fresh channel on which
the worker's reply will be put."
  (make-thread-pool-channel!)
  (let ((reply-channel (make-channel)))
    (spawn-fiber
     (lambda ()
       ;; The timestamp is taken inside the fiber, immediately before the
       ;; request is offered to the pool.
       (let ((request (list reply-channel
                            (get-internal-real-time)
                            thunk)))
         (put-message %thread-pool-channel request))))
    reply-channel))
|
|
|
|
(define (fetch-result-of-defered-thunk reply-channel)
  "Wait for a message on REPLY-CHANNEL.  If it is a pair whose car is the
symbol `worker-thread-error', raise its cdr as an exception; otherwise
treat the message as a list and return its elements as multiple values."
  (let ((response (get-message reply-channel)))
    (if (and (pair? response)
             (eq? (car response) 'worker-thread-error))
        (raise-exception (cdr response))
        (apply values response))))
|
|
|
|
(define (fetch-result-of-defered-thunks . reply-channels)
  "Wait for one message on each of REPLY-CHANNELS, then return a list with
one entry per channel.  All messages are received before any of them is
inspected; a message that is a pair whose car is `worker-thread-error'
has its cdr raised as an exception."
  (define (unpack response)
    (match response
      (('worker-thread-error . exn)
       (raise-exception exn))
      (vals
       (apply values vals))))

  (let ((responses (map get-message reply-channels)))
    (map unpack responses)))
|
|
|
|
;; (parallel-via-thread-pool-channel EXPR ...)
;;
;; Wrap each EXPR in a thunk and hand it to
;; `defer-to-thread-pool-channel', binding the returned reply channels to
;; generated temporaries.  Once every EXPR has been deferred, the results
;; are fetched with `fetch-result-of-defered-thunks' and returned as
;; multiple values, one per EXPR.
(define-syntax parallel-via-thread-pool-channel
  (lambda (stx)
    (syntax-case stx ()
      ((_ expr ...)
       (with-syntax (((reply ...) (generate-temporaries #'(expr ...))))
         #'(let ((reply (defer-to-thread-pool-channel
                          (lambda ()
                            expr)))
                 ...)
             (apply values
                    (fetch-result-of-defered-thunks reply ...))))))))
|
|
|
|
;; (letpar& ((VAR EXPR) ...) BODY ...)
;;
;; Evaluate every EXPR through `parallel-via-thread-pool-channel', bind
;; the resulting values to the corresponding VARs, then evaluate BODY ...
;; with those bindings in scope.
(define-syntax letpar&
  (syntax-rules ()
    ((_ ((v e) ...) b0 b1 ...)
     (call-with-values
         (lambda ()
           (parallel-via-thread-pool-channel e ...))
       (lambda (v ...)
         b0 b1 ...)))))
|
|
|
|
(define (par-mapper' mapper cons)
  "Return a procedure (PROC . LISTS) that applies PROC to the elements of
LISTS position by position, deferring each application with
`defer-to-thread-pool-channel', and combines the fetched results with
CONS.  Processing stops as soon as any of LISTS has no elements left.
MAPPER is accepted but not used by this implementation."
  (lambda (proc . lists)
    (let loop ((remaining lists))
      (match remaining
        (((heads tails ...) ...)
         ;; Both bindings are established before the body runs, so the
         ;; rest of the lists has been deferred by the time this result
         ;; is fetched.
         (let ((rest-results (loop tails))
               (reply (defer-to-thread-pool-channel
                        (lambda ()
                          (apply proc heads)))))
           (cons (fetch-result-of-defered-thunk reply) rest-results)))
        (_
         '())))))

;; Like `map', but each application of the procedure is run through the
;; thread pool channel.
(define par-map& (par-mapper' map cons))
|
|
|
|
(define (chunk lst max-length)
  "Split LST into a list of lists that each hold at most MAX-LENGTH
elements, keeping the original order.  LST itself is not modified.  A
list that already fits, including the empty list, is returned as the
single element of the result."
  (if (<= (length lst) max-length)
      (list lst)
      (let-values (((head-chunk remainder) (split-at lst max-length)))
        (cons head-chunk
              (chunk remainder max-length)))))
|
|
|
|
(define (chunk! lst max-length)
  "Like `chunk', but the pieces are cut with `split-at!', so LST may be
modified.  Return a list of lists that each hold at most MAX-LENGTH
elements; a list that already fits, including the empty list, is
returned as the single element of the result."
  (if (<= (length lst) max-length)
      (list lst)
      (let-values (((head-chunk remainder) (split-at! lst max-length)))
        (cons head-chunk
              (chunk! remainder max-length)))))
|
|
|
|
(define* (chunk-for-each! proc chunk-size #:rest lsts)
  "Call PROC repeatedly with corresponding pieces of LSTS, each piece
holding at most CHUNK-SIZE elements.  PROC receives one argument per
list.  The pieces are cut with `split-at!', so LSTS may be modified.

An error is raised unless all of LSTS have the same length.  When the
lists are empty PROC is not called.  Always returns #t."
  (define (split-off-chunk! lst)
    ;; Return (FIRST-CHUNK . REST) for LST.
    (let-values (((first-chunk rest) (split-at! lst chunk-size)))
      (cons first-chunk rest)))

  (define (do-one-iteration lsts remaining)
    ;; REMAINING is the number of elements left in each of LSTS.  It is
    ;; tracked here so the lists are not measured again on every pass.
    (if (> remaining chunk-size)
        (let ((chunks-and-rest (map split-off-chunk! lsts)))
          (apply proc (map car chunks-and-rest))
          (do-one-iteration (map cdr chunks-and-rest)
                            (- remaining chunk-size)))
        (apply proc lsts)))

  (let ((list-lengths (map length lsts)))
    ;; Lengths are numbers, so compare them with `=' rather than `eq?'.
    (unless (= 1 (length (delete-duplicates list-lengths)))
      (error "lists not equal length"))

    (unless (zero? (first list-lengths))
      (do-one-iteration lsts (first list-lengths))))

  #t)
|
|
|
|
(define* (delete-duplicates/sort! unsorted-lst less
                                  #:optional (same? equal?))
  "Sort UNSORTED-LST with `sort!' using LESS, then drop every element
that is SAME? as the element before it in the sorted order.
UNSORTED-LST may be modified by the sort.

The result is accumulated by consing, so it is in the reverse of the
sorted order.  An empty UNSORTED-LST is returned as is.

SAME? defaults to `equal?' so that equal but distinct objects, such as
strings, count as duplicates; pass `eq?' to compare by identity only."
  (if (null? unsorted-lst)
      unsorted-lst
      (let ((sorted-lst (sort! unsorted-lst less)))
        (let loop ((lst (cdr sorted-lst))
                   (last-element (car sorted-lst))
                   (result (list (car sorted-lst))))
          (cond
           ((null? lst)
            result)
           ;; After sorting, duplicates sit next to each other, so only
           ;; the previously kept element needs to be checked.
           ((same? (car lst) last-element)
            (loop (cdr lst) last-element result))
           (else
            (loop (cdr lst)
                  (car lst)
                  (cons (car lst) result))))))))
|
|
|
|
(define (get-gc-metrics-updater registry)
  "Create one gauge metric in REGISTRY for each of the `gc-stats' entries
listed below, and return a thunk.  Each call to the thunk reads
`gc-stats' and sets every gauge to the value of its entry."
  ;; Association list of (GC-STATS-KEY . GAUGE).
  (define metrics
    (list
     (cons 'gc-time-taken
           (make-gauge-metric registry "guile_gc_time_taken"))
     (cons 'heap-size
           (make-gauge-metric registry "guile_heap_size"))
     (cons 'heap-free-size
           (make-gauge-metric registry "guile_heap_free_size"))
     (cons 'heap-total-allocated
           (make-gauge-metric registry "guile_heap_total_allocated"))
     (cons 'heap-allocated-since-gc
           (make-gauge-metric registry "guile_allocated_since_gc"))
     (cons 'protected-objects
           (make-gauge-metric registry "guile_gc_protected_objects"))
     (cons 'gc-times
           (make-gauge-metric registry "guile_gc_times"))))

  (lambda ()
    (let ((stats (gc-stats)))
      (for-each
       (lambda (entry)
         (let ((name (car entry))
               (metric (cdr entry)))
           (metric-set metric (assq-ref stats name))))
       metrics))))
|
|
|