Allow reordering of the download queue

This commit is contained in:
Adam Voss 2016-11-14 21:54:21 -06:00
parent 05d9b4a56a
commit 27dbe80782
5 changed files with 126 additions and 75 deletions

View File

@ -371,7 +371,7 @@
<property name="headers_visible">False</property>
<property name="rules_hint">False</property>
<property name="rubber-banding">True</property>
<property name="reorderable">False</property>
<property name="reorderable">True</property>
<property name="enable_search">True</property>
<property name="fixed_height_mode">False</property>
<property name="hover_selection">False</property>

View File

@ -337,45 +337,48 @@ class DownloadURLOpener(urllib.FancyURLopener):
class DownloadQueueWorker(object):
def __init__(self, queue, exit_callback, continue_check_callback, minimum_tasks):
def __init__(self, queue, exit_callback, continue_check_callback):
self.queue = queue
self.exit_callback = exit_callback
self.continue_check_callback = continue_check_callback
# The minimum amount of tasks that should be downloaded by this worker
# before using the continue_check_callback to determine if it might
# continue accepting tasks. This can be used to forcefully start a
# download, even if a download limit is in effect.
self.minimum_tasks = minimum_tasks
def __repr__(self):
return threading.current_thread().getName()
def run(self):
logger.info('Starting new thread: %s', self)
while True:
# Check if this thread is allowed to continue accepting tasks
# (But only after reducing minimum_tasks to zero - see above)
if self.minimum_tasks > 0:
self.minimum_tasks -= 1
elif not self.continue_check_callback(self):
if not self.continue_check_callback(self):
return
try:
task = self.queue.pop()
task = self.queue.get_next()
logger.info('%s is processing: %s', self, task)
task.run()
task.recycle()
except IndexError, e:
except StopIteration, e:
logger.info('No more tasks for %s to carry out.', self)
break
self.exit_callback(self)
class ForceDownloadWorker(object):
def __init__(self, task):
self.task = task
def __repr__(self):
return threading.current_thread().getName()
def run(self):
logger.info('Starting new thread: %s', self)
logger.info('%s is processing: %s', self, self.task)
self.task.run()
class DownloadQueueManager(object):
def __init__(self, config):
def __init__(self, config, queue):
self._config = config
self.tasks = collections.deque()
self.tasks = queue
self.worker_threads_access = threading.RLock()
self.worker_threads = []
@ -393,61 +396,37 @@ class DownloadQueueManager(object):
else:
return True
def spawn_threads(self, force_start=False):
def __spawn_threads(self):
"""Spawn new worker threads if necessary
If force_start is True, forcefully spawn a thread and
let it process at least one episodes, even if a download
limit is in effect at the moment.
"""
with self.worker_threads_access:
if not len(self.tasks):
if not self.tasks.has_work():
return
if force_start or len(self.worker_threads) == 0 or \
if len(self.worker_threads) == 0 or \
len(self.worker_threads) < self._config.max_downloads or \
not self._config.max_downloads_enabled:
# We have to create a new thread here, there's work to do
logger.info('Starting new worker thread.')
# The new worker should process at least one task (the one
# that we want to forcefully start) if force_start is True.
if force_start:
minimum_tasks = 1
else:
minimum_tasks = 0
worker = DownloadQueueWorker(self.tasks, self.__exit_callback,
self.__continue_check_callback, minimum_tasks)
self.__continue_check_callback)
self.worker_threads.append(worker)
util.run_in_background(worker.run)
def are_queued_or_active_tasks(self):
with self.worker_threads_access:
return len(self.worker_threads) > 0
def update_max_downloads(self):
self.__spawn_threads()
def add_task(self, task, force_start=False):
"""Add a new task to the download queue
def force_start_task(self, task):
if self.tasks.set_downloading(task):
worker = ForceDownloadWorker(task)
util.run_in_background(worker.run)
If force_start is True, ignore the download limit
and forcefully start the download right away.
def queue_task(self, task):
"""Marks a task as queued
"""
if task.status != DownloadTask.INIT:
# Remove the task from its current position in the
# download queue (if any) to avoid race conditions
# where two worker threads download the same file
try:
self.tasks.remove(task)
except ValueError, e:
pass
task.status = DownloadTask.QUEUED
if force_start:
# Add the task to be taken on next pop
self.tasks.append(task)
else:
# Add the task to the end of the queue
self.tasks.appendleft(task)
self.spawn_threads(force_start)
self.__spawn_threads()
class DownloadTask(object):
@ -739,8 +718,8 @@ class DownloadTask(object):
self.speed = 0.0
return False
# We only start this download if its status is "queued"
if self.status != DownloadTask.QUEUED:
# We only start this download if its status is "downloading"
if self.status != DownloadTask.DOWNLOADING:
return False
# We are downloading this file right now

View File

@ -23,6 +23,8 @@
# Based on code from gpodder.services (thp, 2007-08-24)
#
from __future__ import with_statement
import gpodder
from gpodder import util
@ -32,6 +34,7 @@ from gi.repository import Gtk
import cgi
import collections
import threading
_ = gpodder.gettext
@ -45,6 +48,8 @@ class DownloadStatusModel(Gtk.ListStore):
def __init__(self):
Gtk.ListStore.__init__(self, object, str, str, int, str, str)
self.set_downloading_access = threading.RLock()
# Set up stock icon IDs for tasks
self._status_ids = collections.defaultdict(lambda: None)
self._status_ids[download.DownloadTask.DOWNLOADING] = 'go-down'
@ -138,6 +143,27 @@ class DownloadStatusModel(Gtk.ListStore):
return False
def has_work(self):
return any(task for task in
(row[DownloadStatusModel.C_TASK] for row in self)
if task.status == task.QUEUED)
def get_next(self):
with self.set_downloading_access:
result = next(task for task in
(row[DownloadStatusModel.C_TASK] for row in self)
if task.status == task.QUEUED)
self.set_downloading(result)
return result
def set_downloading(self, task):
with self.set_downloading_access:
if task.status is task.DOWNLOADING:
# Task was already set as DOWNLOADING by get_next
return False
task.status = task.DOWNLOADING
return True
class DownloadTaskMonitor(object):
"""A helper class that abstracts download events"""

View File

@ -158,7 +158,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.new_episodes_window = None
self.download_status_model = DownloadStatusModel()
self.download_queue_manager = download.DownloadQueueManager(self.config)
self.download_queue_manager = download.DownloadQueueManager(self.config, self.download_status_model)
self.config.connect_gtk_spinbutton('max_downloads', self.spinMaxDownloads)
self.config.connect_gtk_togglebutton('max_downloads_enabled', self.cbMaxDownloads)
@ -166,7 +166,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.config.connect_gtk_togglebutton('limit_rate', self.cbLimitDownloads)
# When the amount of maximum downloads changes, notify the queue manager
changed_cb = lambda spinbutton: self.download_queue_manager.spawn_threads()
changed_cb = lambda spinbutton: self.download_queue_manager.update_max_downloads()
self.spinMaxDownloads.connect('value-changed', changed_cb)
# Keep a reference to the last add podcast dialog instance
@ -1514,7 +1514,10 @@ class gPodder(BuilderWidget, dbus.service.Object):
if status == download.DownloadTask.QUEUED:
# Only queue task when its paused/failed/cancelled (or forced)
if task.status in (task.PAUSED, task.FAILED, task.CANCELLED) or force_start:
self.download_queue_manager.add_task(task, force_start)
if force_start:
self.download_queue_manager.force_start_task(task)
else:
self.download_queue_manager.queue_task(task)
self.enable_download_list_update()
elif status == download.DownloadTask.CANCELLED:
# Cancelling a download allowed when downloading/queued
@ -1565,25 +1568,59 @@ class gPodder(BuilderWidget, dbus.service.Object):
selected_tasks, can_queue, can_cancel, can_pause, can_remove, can_force = \
self.downloads_list_get_selection(model, paths)
def make_menu_item(label, icon_name, tasks, status, sensitive, force_start=False):
def make_menu_item(label, icon_name, tasks=None, status=None, sensitive=True, force_start=False, action=None):
# This creates a menu item for selection-wide actions
item = Gtk.ImageMenuItem.new_with_mnemonic(label)
if icon_name is not None:
item.set_image(Gtk.Image.new_from_icon_name(icon_name, Gtk.IconSize.MENU))
item.connect('activate', lambda item: self._for_each_task_set_status(tasks, status, force_start))
if action is not None:
item.connect('activate', action)
else:
item.connect('activate', lambda item: self._for_each_task_set_status(tasks, status, force_start))
item.set_sensitive(sensitive)
return item
def move_selected_items_up(menu_item):
selection = self.treeDownloads.get_selection()
model, selected_paths = selection.get_selected_rows()
for path in selected_paths:
index_above = path[0]-1
if index_above < 0:
return
task = model.get_value(
model.get_iter(path),
DownloadStatusModel.C_TASK)
model.move_before(
model.get_iter(path),
model.get_iter((index_above,)))
def move_selected_items_down(menu_item):
selection = self.treeDownloads.get_selection()
model, selected_paths = selection.get_selected_rows()
for path in reversed(selected_paths):
index_below = path[0]+1
if index_below >= len(model):
return
task = model.get_value(
model.get_iter(path),
DownloadStatusModel.C_TASK)
model.move_after(
model.get_iter(path),
model.get_iter((index_below,)))
menu = Gtk.Menu()
if can_force:
menu.append(make_menu_item(_('Start download now'), 'go-down', selected_tasks, download.DownloadTask.QUEUED, True, True))
menu.append(make_menu_item(_('Start download now'), 'document-save', selected_tasks, download.DownloadTask.QUEUED, force_start=True))
else:
menu.append(make_menu_item(_('Download'), 'go-down', selected_tasks, download.DownloadTask.QUEUED, can_queue, False))
menu.append(make_menu_item(_('Download'), 'document-save', selected_tasks, download.DownloadTask.QUEUED, can_queue))
menu.append(make_menu_item(_('Cancel'), 'media-playback-stop', selected_tasks, download.DownloadTask.CANCELLED, can_cancel))
menu.append(make_menu_item(_('Pause'), 'media-playback-pause', selected_tasks, download.DownloadTask.PAUSED, can_pause))
menu.append(Gtk.SeparatorMenuItem())
menu.append(make_menu_item(_('Remove from list'), 'list-remove', selected_tasks, None, can_remove))
menu.append(make_menu_item(_('Move up'), 'go-up', action=move_selected_items_up))
menu.append(make_menu_item(_('Move down'), 'go-down', action=move_selected_items_down))
menu.append(Gtk.SeparatorMenuItem())
menu.append(make_menu_item(_('Remove from list'), 'list-remove', selected_tasks, sensitive=can_remove))
menu.show_all()
@ -1791,7 +1828,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
if not can_cancel:
item = Gtk.ImageMenuItem(_('Download'))
item.set_image(Gtk.Image.new_from_icon_name('go-down', Gtk.IconSize.MENU))
item.set_image(Gtk.Image.new_from_icon_name('document-save', Gtk.IconSize.MENU))
item.set_action_name('win.download')
menu.append(item)
else:
@ -2833,7 +2870,10 @@ class gPodder(BuilderWidget, dbus.service.Object):
if episode.url == task.url:
task_exists = True
if task.status not in (task.DOWNLOADING, task.QUEUED):
self.download_queue_manager.add_task(task, force_start)
if force_start:
self.download_queue_manager.force_start_task(task)
else:
self.download_queue_manager.queue_task(task)
enable_update = True
continue
@ -2849,13 +2889,20 @@ class gPodder(BuilderWidget, dbus.service.Object):
logger.error('While downloading %s', episode.title, exc_info=True)
continue
if add_paused:
task.status = task.PAUSED
else:
self.mygpo_client.on_download([task.episode])
self.download_queue_manager.add_task(task, force_start)
# New Task, we must wait on the GTK Loop
self.download_status_model.register_task(task)
def queue_task(task):
if add_paused:
task.status = task.PAUSED
else:
self.mygpo_client.on_download([task.episode])
if force_start:
self.download_queue_manager.force_start_task(task)
else:
self.download_queue_manager.queue_task(task)
# Executes after task has been registered
util.idle_add(queue_task, task)
enable_update = True
if enable_update:
@ -3310,7 +3357,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
if task.status in (task.DOWNLOADING, task.QUEUED):
task.status = task.PAUSED
elif task.status in (task.CANCELLED, task.PAUSED, task.FAILED):
self.download_queue_manager.add_task(task)
self.download_queue_manager.queue_task(task)
self.enable_download_list_update()
elif task.status == task.DONE:
model.remove(model.get_iter(tree_row_reference.get_path()))
@ -3649,7 +3696,6 @@ class gPodderApplication(Gtk.Application):
def on_check_for_updates_activate(self, action, param):
self.window.check_for_updates(silent=False)
def main(options=None):
GObject.set_application_name('gPodder')

View File

@ -226,7 +226,7 @@ class Device(services.ObservableService):
sync_task.status=sync_task.QUEUED
sync_task.device=self
self.download_status_model.register_task(sync_task)
self.download_queue_manager.add_task(sync_task)
self.download_queue_manager.queue_task(sync_task)
else:
logger.warning("No episodes to sync")