From 66a1c4d2425ad7a4016f7eab85639d714fb81172 Mon Sep 17 00:00:00 2001 From: shortcutme Date: Sat, 30 Nov 2019 02:07:30 +0100 Subject: [PATCH] Multi-process and gevent loop friendly lock --- src/Test/TestThreadPool.py | 46 ++++++++++++++++++++++++++++++++++++++ src/util/ThreadPool.py | 22 +++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/src/Test/TestThreadPool.py b/src/Test/TestThreadPool.py index 6e081206..caf4863a 100644 --- a/src/Test/TestThreadPool.py +++ b/src/Test/TestThreadPool.py @@ -1,3 +1,5 @@ +import time + import gevent from util import ThreadPool @@ -27,3 +29,47 @@ class TestThreadPool: res = blocker() assert res == 10000000 + + def testLockBlockingSameThread(self): + from gevent.lock import Semaphore + + lock = Semaphore() + + s = time.time() + + def unlocker(): + time.sleep(1) + lock.release() + + gevent.spawn(unlocker) + lock.acquire(True) + lock.acquire(True, timeout=2) + + unlock_taken = time.time() - s + + assert 1.0 < unlock_taken < 1.5 + + def testLockBlockingDifferentThread(self): + lock = ThreadPool.Lock() + + s = time.time() + + def locker(): + lock.acquire(True) + time.sleep(1) + lock.release() + + pool = gevent.threadpool.ThreadPool(10) + pool.spawn(locker) + threads = [ + pool.spawn(locker), + ] + time.sleep(0.1) + + lock.acquire(True, 5.0) + + unlock_taken = time.time() - s + + assert 2.0 < unlock_taken < 2.5 + + gevent.joinall(threads) diff --git a/src/util/ThreadPool.py b/src/util/ThreadPool.py index 8fbb12fd..54f6e699 100644 --- a/src/util/ThreadPool.py +++ b/src/util/ThreadPool.py @@ -1,4 +1,6 @@ import gevent.threadpool +import gevent._threading +import threading class ThreadPool: @@ -17,6 +19,24 @@ class ThreadPool: return func def wrapper(*args, **kwargs): - return self.pool.apply(func, args, kwargs) + res = self.pool.apply(func, args, kwargs) + return res return wrapper + + +main_thread_id = threading.current_thread().ident +lock_pool = gevent.threadpool.ThreadPool(10) + + +class Lock: + def __init__(self): + self.lock = gevent._threading.Lock() + self.locked = self.lock.locked + self.release = self.lock.release + + def acquire(self, *args, **kwargs): + if self.locked() and threading.current_thread().ident == main_thread_id: + return lock_pool.apply(self.lock.acquire, args, kwargs) + else: + return self.lock.acquire(*args, **kwargs)