data-service/guix-data-service/utils.scm

314 lines
10 KiB
Scheme

;;; Guix Data Service -- Information about Guix over time
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (guix-data-service utils)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (fibers timers)
#:use-module (fibers conditions)
#:use-module (prometheus)
#:export (call-with-time-logging
with-time-logging
prevent-inlining-for-tests
thread-pool-channel
thread-pool-request-timeout
make-thread-pool-channel
parallel-via-thread-pool-channel
par-map&
letpar&
chunk
chunk!
chunk-for-each!
delete-duplicates/sort!
get-gc-metrics-updater))
(define (call-with-time-logging action thunk)
(simple-format #t "debug: Starting ~A\n" action)
(let ((start-time (current-time)))
(let-values
((result (thunk)))
(let ((time-taken (- (current-time) start-time)))
(simple-format #t "debug: Finished ~A, took ~A seconds\n"
action time-taken))
(apply values result))))
(define-syntax-rule (with-time-logging action exp ...)
"Log under NAME the time taken to evaluate EXP."
(call-with-time-logging action (lambda () exp ...)))
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
(define* (make-thread-pool-channel threads
#:key
idle-thunk
idle-seconds)
(define (delay-logger seconds-delayed)
(when (> seconds-delayed 1)
(format
(current-error-port)
"warning: thread pool delayed by ~1,2f seconds~%"
seconds-delayed)))
(let ((channel (make-channel)))
(for-each
(lambda _
(call-with-new-thread
(lambda ()
(let loop ()
(match (if idle-seconds
(perform-operation
(choice-operation
(get-operation channel)
(wrap-operation (sleep-operation idle-seconds)
(const 'timeout))))
(get-message channel))
('timeout
(when idle-thunk
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
"worker thread idle thunk exception: ~A\n"
exn))
idle-thunk
#:unwind? #t))
(loop))
(((? channel? reply) sent-time (? procedure? proc))
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second))
(put-message
reply
(with-exception-handler
(lambda (exn)
(cons 'worker-thread-error exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"worker thread: exception: ~A\n"
exn)
(backtrace)
(raise-exception exn))
(lambda ()
(call-with-values
proc
(lambda vals
vals)))))
#:unwind? #t)))
(loop))
(_ #f))))))
(iota threads))
channel))
(define &thread-pool-request-timeout
(make-exception-type '&thread-pool-request-timeout
&error
'()))
(define make-thread-pool-request-timeout-error
(record-constructor &thread-pool-request-timeout))
(define thread-pool-request-timeout-error?
(record-predicate &thread-pool-request-timeout))
(define thread-pool-channel
(make-parameter #f))
(define thread-pool-request-timeout
(make-parameter #f))
(define (defer-to-thread-pool-channel thunk)
(let ((reply (make-channel)))
(spawn-fiber
(lambda ()
(let ((val
(perform-operation
(let ((put
(wrap-operation
(put-operation (thread-pool-channel)
(list reply
(get-internal-real-time)
thunk))
(const 'success))))
(or
(and=> (thread-pool-request-timeout)
(lambda (timeout)
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const 'request-timeout)))))
put)))))
(when (eq? val 'request-timeout)
(put-message reply val)))))
reply))
(define (fetch-result-of-defered-thunks . reply-channels)
(let ((responses (map get-message
reply-channels)))
(map
(match-lambda
('request-timeout
(raise-exception
(make-thread-pool-request-timeout-error)))
(('worker-thread-error . exn)
(raise-exception exn))
(result
(apply values result)))
responses)))
(define-syntax parallel-via-thread-pool-channel
(lambda (x)
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
#'(let ((tmp0 (defer-to-thread-pool-channel
(lambda ()
e0)))
...)
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
(call-with-values
(lambda () (parallel-via-thread-pool-channel e ...))
(lambda (v ...)
b0 b1 ...)))
(define (par-mapper' mapper cons)
(lambda (proc . lists)
(let loop ((lists lists))
(match lists
(((heads tails ...) ...)
(let ((tail (loop tails))
(head (defer-to-thread-pool-channel
(lambda ()
(apply proc heads)))))
(cons (fetch-result-of-defered-thunks head) tail)))
(_
'())))))
(define par-map& (par-mapper' map cons))
(define (chunk lst max-length)
(if (> (length lst)
max-length)
(call-with-values (lambda ()
(split-at lst max-length))
(lambda (first-lst rest)
(cons first-lst
(chunk rest max-length))))
(list lst)))
(define (chunk! lst max-length)
(if (> (length lst)
max-length)
(call-with-values (lambda ()
(split-at! lst max-length))
(lambda (first-lst rest)
(cons first-lst
(chunk! rest max-length))))
(list lst)))
(define* (chunk-for-each! proc chunk-size #:rest lsts)
(define (do-one-iteration lsts)
(if (> (length (car lsts))
chunk-size)
(let ((chunks-and-rest
(map (lambda (lst)
(call-with-values (lambda ()
(split-at! lst chunk-size))
(lambda (first-lst rest)
(cons first-lst
rest))))
lsts)))
(apply proc
(map car chunks-and-rest))
(do-one-iteration
(map cdr chunks-and-rest)))
(apply proc lsts)))
(let ((list-lengths (map length lsts)))
(unless (eq? 1 (length (delete-duplicates list-lengths)))
(error "lists not equal length"))
(unless (eq? 0 (first list-lengths))
(do-one-iteration lsts)))
#t)
(define (delete-duplicates/sort! unsorted-lst less)
(if (null? unsorted-lst)
unsorted-lst
(let ((sorted-lst (sort! unsorted-lst less)))
(let loop ((lst (cdr sorted-lst))
(last-element (car sorted-lst))
(result (list (car sorted-lst))))
(if (null? lst)
result
(let ((current-element (car lst)))
(if (eq? current-element last-element)
(loop (cdr lst)
last-element
result)
(loop (cdr lst)
current-element
(cons current-element
result)))))))))
(define (get-gc-metrics-updater registry)
(define metrics
`((gc-time-taken
. ,(make-gauge-metric registry "guile_gc_time_taken"))
(heap-size
. ,(make-gauge-metric registry "guile_heap_size"))
(heap-free-size
. ,(make-gauge-metric registry "guile_heap_free_size"))
(heap-total-allocated
. ,(make-gauge-metric registry "guile_heap_total_allocated"))
(heap-allocated-since-gc
. ,(make-gauge-metric registry "guile_allocated_since_gc"))
(protected-objects
. ,(make-gauge-metric registry "guile_gc_protected_objects"))
(gc-times
. ,(make-gauge-metric registry "guile_gc_times"))))
(lambda ()
(let ((stats (gc-stats)))
(for-each
(match-lambda
((name . metric)
(let ((value (assq-ref stats name)))
(metric-set metric value))))
metrics))))