Allow pass calls to the main loop
This commit is contained in:
parent
c1df78b97f
commit
8a5a75e68f
|
@ -1,6 +1,10 @@
|
|||
import threading
|
||||
import time
|
||||
|
||||
import gevent
|
||||
import gevent.monkey
|
||||
import gevent.threadpool
|
||||
import gevent._threading
|
||||
import threading
|
||||
|
||||
|
||||
class ThreadPool:
|
||||
|
@ -29,8 +33,12 @@ class ThreadPool:
|
|||
return wrapper
|
||||
|
||||
|
||||
lock_pool = gevent.threadpool.ThreadPool(50)
|
||||
main_thread_id = threading.current_thread().ident
|
||||
lock_pool = gevent.threadpool.ThreadPool(10)
|
||||
|
||||
|
||||
def isMainThread():
|
||||
return threading.current_thread().ident == main_thread_id
|
||||
|
||||
|
||||
class Lock:
|
||||
|
@ -40,7 +48,86 @@ class Lock:
|
|||
self.release = self.lock.release
|
||||
|
||||
def acquire(self, *args, **kwargs):
|
||||
if self.locked() and threading.current_thread().ident == main_thread_id:
|
||||
if self.locked() and isMainThread():
|
||||
# Start in new thread to avoid blocking gevent loop
|
||||
return lock_pool.apply(self.lock.acquire, args, kwargs)
|
||||
else:
|
||||
return self.lock.acquire(*args, **kwargs)
|
||||
|
||||
def __del__(self):
|
||||
while self.locked():
|
||||
self.release()
|
||||
|
||||
|
||||
class Event:
|
||||
def __init__(self):
|
||||
self.get_lock = Lock()
|
||||
self.res = None
|
||||
self.get_lock.acquire(False)
|
||||
self.done = False
|
||||
|
||||
def set(self, res):
|
||||
if self.done:
|
||||
raise Exception("Event already has value")
|
||||
self.res = res
|
||||
self.get_lock.release()
|
||||
self.done = True
|
||||
|
||||
def get(self):
|
||||
if not self.done:
|
||||
self.get_lock.acquire(True)
|
||||
if self.get_lock.locked():
|
||||
self.get_lock.release()
|
||||
back = self.res
|
||||
return back
|
||||
|
||||
def __del__(self):
|
||||
self.res = None
|
||||
while self.get_lock.locked():
|
||||
self.get_lock.release()
|
||||
|
||||
|
||||
# Execute function calls in main loop from other threads
|
||||
class MainLoopCaller():
|
||||
def __init__(self):
|
||||
self.queue_call = gevent._threading.Queue()
|
||||
|
||||
self.pool = gevent.threadpool.ThreadPool(1)
|
||||
self.num_direct = 0
|
||||
|
||||
def caller(self, func, args, kwargs, event_done):
|
||||
try:
|
||||
res = func(*args, **kwargs)
|
||||
event_done.set((True, res))
|
||||
except Exception as err:
|
||||
event_done.set((False, err))
|
||||
|
||||
def start(self):
|
||||
gevent.spawn(self.run)
|
||||
time.sleep(0.001)
|
||||
|
||||
def run(self):
|
||||
while 1:
|
||||
if self.queue_call.qsize() == 0: # Get queue in new thread to avoid gevent blocking
|
||||
func, args, kwargs, event_done = self.pool.apply(self.queue_call.get)
|
||||
else:
|
||||
func, args, kwargs, event_done = self.queue_call.get()
|
||||
gevent.spawn(self.caller, func, args, kwargs, event_done)
|
||||
del func, args, kwargs, event_done
|
||||
|
||||
def call(self, func, *args, **kwargs):
|
||||
if threading.current_thread().ident == main_thread_id:
|
||||
return func(*args, **kwargs)
|
||||
else:
|
||||
event_done = Event()
|
||||
self.queue_call.put((func, args, kwargs, event_done))
|
||||
success, res = event_done.get()
|
||||
del event_done
|
||||
self.queue_call.task_done()
|
||||
if success:
|
||||
return res
|
||||
else:
|
||||
raise res
|
||||
main_loop = MainLoopCaller()
|
||||
main_loop.start()
|
||||
patchSleep()
|
||||
|
|
Loading…
Reference in New Issue