fix race where download ui updates could be turned off for subsequent tasks if after adding a task it completed very quickly on another thread; fix use of Gtk.ListView as a multi-threaded container (sometimes returns None for an item and causes an exception)

This commit is contained in:
blushingpenguin 2021-06-05 22:24:04 +01:00
parent 729dbee534
commit b40f283499
4 changed files with 127 additions and 61 deletions

View File

@ -383,14 +383,14 @@ class DownloadQueueWorker(object):
if not self.continue_check_callback(self):
return
try:
task = self.queue.get_next()
logger.info('%s is processing: %s', self, task)
task.run()
task.recycle()
except StopIteration as e:
task = self.queue.get_next()
if not task:
logger.info('No more tasks for %s to carry out.', self)
break
logger.info('%s is processing: %s', self, task)
task.run()
task.recycle()
self.exit_callback(self)
@ -461,7 +461,6 @@ class DownloadQueueManager(object):
def queue_task(self, task):
"""Marks a task as queued
"""
task.status = DownloadTask.QUEUED
self.__spawn_threads()

View File

@ -34,17 +34,65 @@ from gpodder import download, util
_ = gpodder.gettext
class TaskQueue:
def __init__(self):
self.lock = threading.Lock()
self.tasks = list()
class DownloadStatusModel(Gtk.ListStore):
def __len__(self):
with self.lock:
return len(self.tasks)
def is_empty(self):
return len(self) > 0
def add_task(self, task):
with self.lock:
self.tasks.append(task)
def remove_task(self, task):
with self.lock:
try:
self.tasks.remove(task)
return True
except ValueError:
# already dequeued
return False
def pop(self):
with self.lock:
if len(self.tasks) == 0:
return None
task = self.tasks.pop(0)
return task
def move_after(self, task, after):
with self.lock:
try:
index = self.tasks.index(after)
self.tasks.remove(task)
self.tasks.insert(index + 1, task)
except ValueError:
pass
def move_before(self, task, before):
with self.lock:
try:
index = self.tasks.index(before)
self.tasks.remove(task)
self.tasks.insert(index, task)
except ValueError:
pass
class DownloadStatusModel:
# Symbolic names for our columns, so we know what we're up to
C_TASK, C_NAME, C_URL, C_PROGRESS, C_PROGRESS_TEXT, C_ICON_NAME = list(range(6))
SEARCH_COLUMNS = (C_NAME, C_URL)
def __init__(self):
Gtk.ListStore.__init__(self, object, str, str, int, str, str)
self.set_downloading_access = threading.RLock()
self.list = Gtk.ListStore(object, str, str, int, str, str)
self.work_queue = TaskQueue()
# Set up stock icon IDs for tasks
self._status_ids = collections.defaultdict(lambda: None)
@ -54,6 +102,9 @@ class DownloadStatusModel(Gtk.ListStore):
self._status_ids[download.DownloadTask.CANCELLED] = 'media-playback-stop'
self._status_ids[download.DownloadTask.PAUSED] = 'media-playback-pause'
def get_model(self):
return self.list
def _format_message(self, episode, message, podcast):
episode = html.escape(episode)
podcast = html.escape(podcast)
@ -63,10 +114,10 @@ class DownloadStatusModel(Gtk.ListStore):
def request_update(self, iter, task=None):
if task is None:
# Ongoing update request from UI - get task from model
task = self.get_value(iter, self.C_TASK)
task = self.list.get_value(iter, self.C_TASK)
else:
# Initial update request - update non-changing fields
self.set(iter,
self.list.set(iter,
self.C_TASK, task,
self.C_URL, task.url)
@ -100,7 +151,7 @@ class DownloadStatusModel(Gtk.ListStore):
else:
progress_message = ('unknown size')
self.set(iter,
self.list.set(iter,
self.C_NAME, self._format_message(task.episode.title,
status_message, task.episode.channel.title),
self.C_PROGRESS, 100. * task.progress,
@ -108,14 +159,15 @@ class DownloadStatusModel(Gtk.ListStore):
self.C_ICON_NAME, self._status_ids[task.status])
def __add_new_task(self, task):
iter = self.append()
iter = self.list.append()
self.request_update(iter, task)
def register_task(self, task):
self.work_queue.add_task(task)
util.idle_add(self.__add_new_task, task)
def tell_all_tasks_to_quit(self):
for row in self:
for row in self.list:
task = row[DownloadStatusModel.C_TASK]
if task is not None:
# Pause currently-running (and queued) downloads
@ -131,7 +183,7 @@ class DownloadStatusModel(Gtk.ListStore):
Returns True if there are any downloads in the
QUEUED or DOWNLOADING status, False otherwise.
"""
for row in self:
for row in self.list:
task = row[DownloadStatusModel.C_TASK]
if task is not None and \
task.status in (task.DOWNLOADING,
@ -140,31 +192,36 @@ class DownloadStatusModel(Gtk.ListStore):
return False
def move_after(self, iter, position):
self.list.move_after(iter, position)
iter_task = self.list.get_value(iter, DownloadStatusModel.C_TASK)
pos_task = self.list.get_value(position, DownloadStatusModel.C_TASK)
self.work_queue.move_after(iter_task, pos_task)
def move_before(self, iter, position):
self.list.move_before(iter, position)
iter_task = self.list.get_value(iter, DownloadStatusModel.C_TASK)
pos_task = self.list.get_value(position, DownloadStatusModel.C_TASK)
self.work_queue.move_before(iter_task, pos_task)
def has_work(self):
return any(self._work_gen())
return len(self.work_queue) > 0
def available_work_count(self):
return len(list(self._work_gen()))
return len(self.work_queue)
def get_next(self):
with self.set_downloading_access:
result = next(self._work_gen())
self.set_downloading(result)
return result
def _work_gen(self):
return (task for task in
(row[DownloadStatusModel.C_TASK] for row in self)
if task.status == task.QUEUED)
task = self.work_queue.pop()
if task:
task.status = task.DOWNLOADING
return task
def set_downloading(self, task):
with self.set_downloading_access:
if task.status is task.DOWNLOADING:
# Task was already set as DOWNLOADING by get_next
return False
task.status = task.DOWNLOADING
return True
if not self.work_queue.remove_task(task):
# Task was already dequeued get_next
return False
task.status = task.DOWNLOADING
return True
class DownloadTaskMonitor(object):
"""A helper class that abstracts download events"""

View File

@ -196,6 +196,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.download_tasks_seen = set()
self.download_list_update_enabled = False
self.things_adding_tasks = 0
self.download_task_monitors = set()
# Set up the first instance of MygPoClient
@ -1042,7 +1043,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
column.set_property('min-width', 150)
column.set_property('max-width', 150)
self.treeDownloads.set_model(self.download_status_model)
self.treeDownloads.set_model(self.download_status_model.get_model())
TreeViewHelper.set(self.treeDownloads, TreeViewHelper.ROLE_DOWNLOADS)
self.treeDownloads.connect('popup-menu', self.treeview_downloads_show_context_menu)
@ -1080,14 +1081,18 @@ class gPodder(BuilderWidget, dbus.service.Object):
draw_text_box_centered(ctx, treeview, width, height, text, None, None)
return True
def enable_download_list_update(self):
def set_download_list_state(self, state):
if state == gPodderSyncUI.DL_ADDING_TASKS:
self.things_adding_tasks += 1
elif state == gPodderSyncUI.DL_ADDED_TASKS:
self.things_adding_tasks -= 1
if not self.download_list_update_enabled:
self.update_downloads_list()
GObject.timeout_add(1500, self.update_downloads_list)
self.download_list_update_enabled = True
def cleanup_downloads(self):
model = self.download_status_model
model = self.download_status_model.get_model()
all_tasks = [(Gtk.TreeRowReference.new(model, row.path), row[0]) for row in model]
changed_episode_urls = set()
@ -1122,7 +1127,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
model = self.download_status_model
if model is None:
model = ()
for row in model:
for row in model.get_model():
task = row[self.download_status_model.C_TASK]
monitor.task_updated(task)
@ -1134,7 +1139,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
def update_downloads_list(self, can_call_cleanup=True):
try:
model = self.download_status_model
model = self.download_status_model.get_model()
downloading, synchronizing, failed, finished, queued, paused, others = 0, 0, 0, 0, 0, 0, 0
total_speed, total_size, done_size = 0, 0, 0
@ -1151,6 +1156,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
task = row[self.download_status_model.C_TASK]
speed, size, status, progress, activity = task.speed, task.total_size, task.status, task.progress, task.activity
logger.info("%s: %f", task.episode.title, progress)
# Let the download task monitors know of changes
for monitor in self.download_task_monitors:
@ -1224,7 +1230,8 @@ class gPodder(BuilderWidget, dbus.service.Object):
title.append(N_('%(queued)d task queued',
'%(queued)d tasks queued',
queued) % {'queued': queued})
if (downloading + synchronizing + queued) == 0:
if ((downloading + synchronizing + queued) == 0 and
self.things_adding_tasks == 0):
self.set_download_progress(1.)
self.downloads_finished(self.download_tasks_seen)
gpodder.user_extensions.on_all_episodes_downloaded()
@ -1539,7 +1546,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.download_queue_manager.force_start_task(task)
else:
self.download_queue_manager.queue_task(task)
self.enable_download_list_update()
self.set_download_list_state(gPodderSyncUI.DL_ADDED_ONE)
elif status == download.DownloadTask.CANCELLED:
# Cancelling a download allowed when downloading/queued
if task.status in (task.QUEUED, task.DOWNLOADING):
@ -1605,10 +1612,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
index_above = path[0] - 1
if index_above < 0:
return
task = model.get_value(
model.get_iter(path),
DownloadStatusModel.C_TASK)
model.move_before(
self.download_status_model.move_before(
model.get_iter(path),
model.get_iter((index_above,)))
@ -1619,10 +1623,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
index_below = path[0] + 1
if index_below >= len(model):
return
task = model.get_value(
model.get_iter(path),
DownloadStatusModel.C_TASK)
model.move_after(
self.download_status_model.move_after(
model.get_iter(path),
model.get_iter((index_below,)))
@ -3052,7 +3053,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
else:
self.download_queue_manager.queue_task(task)
if tasks or queued_existing_task:
self.enable_download_list_update()
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
# Flush updated episode status
if self.mygpo_client.can_access_webservice():
self.mygpo_client.flush()
@ -3583,7 +3584,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
task.status = task.PAUSED
elif task.status in (task.CANCELLED, task.PAUSED, task.FAILED):
self.download_queue_manager.queue_task(task)
self.enable_download_list_update()
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
elif task.status == task.DONE:
model.remove(model.get_iter(tree_row_reference.get_path()))
@ -3723,14 +3724,14 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.channels,
self.download_status_model,
self.download_queue_manager,
self.enable_download_list_update,
self.set_download_list_state,
self.commit_changes_to_database,
self.delete_episode_list,
gPodderEpisodeSelector,
self.mount_volume_for_file)
self.sync_ui.on_synchronize_episodes(self.channels, episodes, force_played,
self.enable_download_list_update)
self.sync_ui.on_synchronize_episodes(self.channels, episodes, force_played)
# self.set_download_list_state)
def on_extension_enabled(self, extension):
if getattr(extension, 'on_ui_object_available', None) is not None:

View File

@ -35,13 +35,16 @@ logger = logging.getLogger(__name__)
class gPodderSyncUI(object):
# download list states
(DL_ONEOFF, DL_ADDING_TASKS, DL_ADDED_TASKS) = list(range(3))
def __init__(self, config, notification, parent_window,
show_confirmation,
show_preferences,
channels,
download_status_model,
download_queue_manager,
enable_download_list_update,
set_download_list_state,
commit_changes_to_database,
delete_episode_list,
select_episodes_to_delete,
@ -57,7 +60,7 @@ class gPodderSyncUI(object):
self.channels = channels
self.download_status_model = download_status_model
self.download_queue_manager = download_queue_manager
self.enable_download_list_update = enable_download_list_update
self.set_download_list_state = set_download_list_state
self.commit_changes_to_database = commit_changes_to_database
self.delete_episode_list = delete_episode_list
self.select_episodes_to_delete = select_episodes_to_delete
@ -146,7 +149,7 @@ class gPodderSyncUI(object):
return
# enable updating of UI
self.enable_download_list_update()
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
"""Update device playlists
General approach is as follows:
@ -196,20 +199,26 @@ class gPodderSyncUI(object):
episodes_for_playlist = [ep for ep in episodes_for_playlist if ep.is_new]
playlist.write_m3u(episodes_for_playlist)
# enable updating of UI
self.enable_download_list_update()
# enable updating of UI, but mark it as tasks being added so that a
# adding a single task that completes immediately doesn't turn off the
# ui updates again
self.set_download_list_state(gPodderSyncUI.DL_ADDING_TASKS)
if (self._config.device_sync.device_type == 'filesystem' and self._config.device_sync.playlists.create):
title = _('Update successful')
message = _('The playlist on your MP3 player has been updated.')
self.notification(message, title)
# called from the main thread to complete adding tasks_
def add_downloads_complete():
self.set_download_list_state(gPodderSyncUI.DL_ADDED_TASKS)
# Finally start the synchronization process
@util.run_in_background
def sync_thread_func():
device.add_sync_tasks(episodes, force_played=force_played,
done_callback=done_callback)
util.idle_add(add_downloads_complete)
return
if self._config.device_sync.playlists.create: