Add more exception handling to make-resource-pool
As I'm not sure it's working reliably.
This commit is contained in:
parent
da2a405e8b
commit
b2bf948a00
|
@ -99,94 +99,104 @@
|
|||
(let ((channel (make-channel)))
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(let loop ((resources '())
|
||||
(available '())
|
||||
(waiters '()))
|
||||
(while #t
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"exception in the ~A pool fiber: ~A\n"
|
||||
name
|
||||
exn))
|
||||
(lambda ()
|
||||
(let loop ((resources '())
|
||||
(available '())
|
||||
(waiters '()))
|
||||
|
||||
(match (get-message channel)
|
||||
(('checkout reply)
|
||||
(if (null? available)
|
||||
(if (= (length resources) max-size)
|
||||
(loop resources
|
||||
available
|
||||
(cons reply waiters))
|
||||
(let ((new-resource (initializer/safe)))
|
||||
(if new-resource
|
||||
(let ((checkout-success?
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply new-resource)
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation 0.2)
|
||||
(const #f))))))
|
||||
(loop (cons new-resource resources)
|
||||
(if checkout-success?
|
||||
available
|
||||
(cons new-resource available))
|
||||
waiters))
|
||||
(match (get-message channel)
|
||||
(('checkout reply)
|
||||
(if (null? available)
|
||||
(if (= (length resources) max-size)
|
||||
(loop resources
|
||||
available
|
||||
(cons reply waiters)))))
|
||||
(let ((checkout-success?
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply (car available))
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation 0.2)
|
||||
(const #f))))))
|
||||
(if checkout-success?
|
||||
(loop resources
|
||||
(cdr available)
|
||||
waiters)
|
||||
(loop resources
|
||||
available
|
||||
waiters)))))
|
||||
(('return resource)
|
||||
;; When a resource is returned, prompt all the waiters to request
|
||||
;; again. This is to avoid the pool waiting on channels that may
|
||||
;; be dead.
|
||||
(for-each
|
||||
(lambda (waiter)
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(put-operation waiter 'resource-pool-retry-checkout)
|
||||
(sleep-operation 0.2))))))
|
||||
waiters)
|
||||
(cons reply waiters))
|
||||
(let ((new-resource (initializer/safe)))
|
||||
(if new-resource
|
||||
(let ((checkout-success?
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply new-resource)
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation 0.2)
|
||||
(const #f))))))
|
||||
(loop (cons new-resource resources)
|
||||
(if checkout-success?
|
||||
available
|
||||
(cons new-resource available))
|
||||
waiters))
|
||||
(loop resources
|
||||
available
|
||||
(cons reply waiters)))))
|
||||
(let ((checkout-success?
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply (car available))
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation 0.2)
|
||||
(const #f))))))
|
||||
(if checkout-success?
|
||||
(loop resources
|
||||
(cdr available)
|
||||
waiters)
|
||||
(loop resources
|
||||
available
|
||||
waiters)))))
|
||||
(('return resource)
|
||||
;; When a resource is returned, prompt all the waiters to request
|
||||
;; again. This is to avoid the pool waiting on channels that may
|
||||
;; be dead.
|
||||
(for-each
|
||||
(lambda (waiter)
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(put-operation waiter 'resource-pool-retry-checkout)
|
||||
(sleep-operation 0.2))))))
|
||||
waiters)
|
||||
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
;; clear waiters, as they've been notified
|
||||
'()))
|
||||
(('stats reply)
|
||||
(let ((stats
|
||||
`((resources . ,(length resources))
|
||||
(available . ,(length available))
|
||||
(waiters . ,(length waiters)))))
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
;; clear waiters, as they've been notified
|
||||
'()))
|
||||
(('stats reply)
|
||||
(let ((stats
|
||||
`((resources . ,(length resources))
|
||||
(available . ,(length available))
|
||||
(waiters . ,(length waiters)))))
|
||||
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply stats)
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation 0.2)
|
||||
(const #f)))))
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply stats)
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation 0.2)
|
||||
(const #f)))))
|
||||
|
||||
(loop resources
|
||||
available
|
||||
waiters))
|
||||
(unknown
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"unrecognised message to ~A resource pool channel: ~A\n"
|
||||
name
|
||||
unknown)
|
||||
(loop resources
|
||||
available
|
||||
waiters))))))
|
||||
(loop resources
|
||||
available
|
||||
waiters))
|
||||
(unknown
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"unrecognised message to ~A resource pool channel: ~A\n"
|
||||
name
|
||||
unknown)
|
||||
(loop resources
|
||||
available
|
||||
waiters)))))
|
||||
#:unwind? #t))))
|
||||
|
||||
channel))
|
||||
|
||||
|
|
Loading…
Reference in New Issue