From 153b49c952eb0238329355590fc7d965ceb504e8 Mon Sep 17 00:00:00 2001 From: Mathieu Othacehe Date: Sat, 1 Aug 2020 11:56:43 +0200 Subject: [PATCH] database: Warn if the worker is busy for more than 5 seconds. * src/cuirass/utils.scm (with-operation, get-message-with-timeout): New procedures, (call-with-worker-thread): add timeout and timeout-proc arguments. * src/cuirass/database.scm (with-db-worker-thread): Pass a 5 seconds timeout to call-with-worker-thread, and print a debug message on timeout expiration. --- src/cuirass/database.scm | 13 ++++++++-- src/cuirass/utils.scm | 51 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm index de6b245..7bb5bd2 100644 --- a/src/cuirass/database.scm +++ b/src/cuirass/database.scm @@ -22,6 +22,7 @@ ;;; along with Cuirass. If not, see . (define-module (cuirass database) + #:use-module (cuirass logging) #:use-module (cuirass config) #:use-module (cuirass utils) #:use-module (ice-9 match) @@ -184,8 +185,16 @@ specified." "Evaluate EXP... in the critical section corresponding to %DB-CHANNEL. DB is bound to the argument of that critical section: the database connection." - (call-with-worker-thread (%db-channel) - (lambda (db) exp ...))) + (let ((timeout 5)) + (call-with-worker-thread + (%db-channel) + (lambda (db) exp ...) + #:timeout timeout + #:timeout-proc + (lambda () + (log-message + (format #f "Database worker unresponsive for ~a seconds." + (number->string timeout))))))) (define (read-sql-file file-name) "Return a list of string containing SQL instructions from FILE-NAME." diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm index e2a6fa3..00cfef6 100644 --- a/src/cuirass/utils.scm +++ b/src/cuirass/utils.scm @@ -29,6 +29,8 @@ #:use-module (json) #:use-module (fibers) #:use-module (fibers channels) + #:use-module (fibers operations) + #:use-module (fibers timers) #:export (alist? object->json-scm object->json-string @@ -124,15 +126,58 @@ arguments of the worker thread procedure." (iota parallelism)) channel))) -(define (call-with-worker-thread channel proc) +(define* (with-timeout op #:key (seconds 0.05) (wrap values)) + "Return an operation that succeeds if the given OP succeeds or if SECONDS +have elapsed. In the first case, the result of OP is returned and in the +second case, the wrapping procedure WRAP is called and its result returned." + (choice-operation op + (wrap-operation (sleep-operation seconds) wrap))) + +(define* (get-message-with-timeout channel + #:key + seconds + (retry? #t) + timeout-proc) + "Perform a get-operation on CHANNEL with a timeout set to SECONDS. If the +timout expires and RETRY? is set to false, return 'timeout. If RETRY is true, +call the TIMEOUT-PROC procedure on timeout and retry the get-operation until +it succeeds." + (define (get-message*) + (perform-operation + (with-timeout + (get-operation channel) + #:seconds seconds + #:wrap (const 'timeout)))) + + (let ((res (get-message*))) + (if retry? + (begin + (let loop ((res res)) + (if (eq? res 'timeout) + (begin + (and timeout-proc (timeout-proc)) + (loop (get-message*))) + res))) + res))) + +(define* (call-with-worker-thread channel proc + #:key + timeout + timeout-proc) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." +If already in the worker thread, call PROC immediately. If TIMEOUT is set to +a duration in seconds, TIMEOUT-PROC is called every time a delay of TIMEOUT +seconds expires, without a response from the worker thread." (let ((args (%worker-thread-args))) (if args (apply proc args) (let ((reply (make-channel))) (put-message channel (cons reply proc)) - (match (get-message reply) + (match (if (and timeout (current-fiber)) + (get-message-with-timeout reply + #:seconds timeout + #:timeout-proc timeout-proc) + (get-message reply)) (('worker-thread-error key args ...) (apply throw key args)) (result result))))))