Spawn new threads for download limit (bug 693)
When the "download limit" value is changed, automatically start new threads if it makes sense (when there are still tasks waiting to be added and the amount of current threads is less than the download limit). Also let each thread check if it may continue to accept tasks instead of using "cancelling". This way, the number of threads more often is in line with the download limit setting. Thanks to "ad" for reporting this issue.
This commit is contained in:
parent
5ded66d403
commit
cd6f130ee1
|
@ -339,24 +339,20 @@ class DownloadURLOpener(urllib.FancyURLopener):
|
|||
|
||||
|
||||
class DownloadQueueWorker(threading.Thread):
|
||||
def __init__(self, queue, exit_callback):
|
||||
def __init__(self, queue, exit_callback, continue_check_callback):
|
||||
threading.Thread.__init__(self)
|
||||
self.queue = queue
|
||||
self.exit_callback = exit_callback
|
||||
self.cancelled = False
|
||||
|
||||
def stop_accepting_tasks(self):
|
||||
"""
|
||||
When this is called, the worker will not accept new tasks,
|
||||
but quit when the current task has been finished.
|
||||
"""
|
||||
if not self.cancelled:
|
||||
self.cancelled = True
|
||||
log('%s stopped accepting tasks.', self.getName(), sender=self)
|
||||
self.continue_check_callback = continue_check_callback
|
||||
|
||||
def run(self):
|
||||
log('Running new thread: %s', self.getName(), sender=self)
|
||||
while not self.cancelled:
|
||||
while True:
|
||||
# Check if this thread is allowed to continue accepting tasks
|
||||
if not self.continue_check_callback(self):
|
||||
log('%s must not accept new tasks.', self.getName(), sender=self)
|
||||
return
|
||||
|
||||
try:
|
||||
task = self.queue.pop()
|
||||
log('%s is processing: %s', self.getName(), task, sender=self)
|
||||
|
@ -379,21 +375,27 @@ class DownloadQueueManager(object):
|
|||
with self.worker_threads_access:
|
||||
self.worker_threads.remove(worker_thread)
|
||||
|
||||
def spawn_and_retire_threads(self, request_new_thread=False):
|
||||
def __continue_check_callback(self, worker_thread):
|
||||
with self.worker_threads_access:
|
||||
if len(self.worker_threads) > self._config.max_downloads and \
|
||||
self._config.max_downloads_enabled:
|
||||
# Tell the excessive amount of oldest worker threads to quit, but keep at least one
|
||||
count = min(len(self.worker_threads)-1, len(self.worker_threads)-self._config.max_downloads)
|
||||
for worker in self.worker_threads[:count]:
|
||||
worker.stop_accepting_tasks()
|
||||
self.worker_threads.remove(worker_thread)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
if request_new_thread and (len(self.worker_threads) == 0 or \
|
||||
def spawn_threads(self):
|
||||
with self.worker_threads_access:
|
||||
if not len(self.tasks):
|
||||
return
|
||||
|
||||
if len(self.worker_threads) == 0 or \
|
||||
len(self.worker_threads) < self._config.max_downloads or \
|
||||
not self._config.max_downloads_enabled):
|
||||
not self._config.max_downloads_enabled:
|
||||
# We have to create a new thread here, there's work to do
|
||||
log('I am going to spawn a new worker thread.', sender=self)
|
||||
worker = DownloadQueueWorker(self.tasks, self.__exit_callback)
|
||||
worker = DownloadQueueWorker(self.tasks, self.__exit_callback, \
|
||||
self.__continue_check_callback)
|
||||
self.worker_threads.append(worker)
|
||||
worker.start()
|
||||
|
||||
|
@ -407,7 +409,7 @@ class DownloadQueueManager(object):
|
|||
task.episode.reload_from_db()
|
||||
task.status = DownloadTask.QUEUED
|
||||
self.tasks.appendleft(task)
|
||||
self.spawn_and_retire_threads(request_new_thread=True)
|
||||
self.spawn_threads()
|
||||
|
||||
|
||||
class DownloadTask(object):
|
||||
|
|
|
@ -222,7 +222,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
self.config.connect_gtk_togglebutton('limit_rate', self.cbLimitDownloads)
|
||||
|
||||
# When the amount of maximum downloads changes, notify the queue manager
|
||||
changed_cb = lambda spinbutton: self.download_queue_manager.spawn_and_retire_threads()
|
||||
changed_cb = lambda spinbutton: self.download_queue_manager.spawn_threads()
|
||||
self.spinMaxDownloads.connect('value-changed', changed_cb)
|
||||
|
||||
self.default_title = 'gPodder'
|
||||
|
|
Loading…
Reference in New Issue