Merge pull request #1149 from blushingpenguin/queue

#1117, #1137: fixes for pause/resume/cancelling download tasks
This commit is contained in:
auouymous 2021-09-08 16:23:18 -07:00 committed by GitHub
commit 3d966b95b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 232 additions and 158 deletions

View File

@ -460,6 +460,7 @@ class DownloadQueueManager(object):
def queue_task(self, task):
"""Marks a task as queued
"""
self.tasks.queue_task(task)
self.__spawn_threads()
@ -469,7 +470,7 @@ class DownloadTask(object):
You can create a new download task like this:
task = DownloadTask(episode, gpodder.config.Config(CONFIGFILE))
task.status = DownloadTask.DOWNLOADING
task.status = DownloadTask.QUEUED
task.run()
While the download is in progress, you can access its properties:
@ -486,7 +487,8 @@ class DownloadTask(object):
You can cancel a running download task by setting its status:
task.status = DownloadTask.CANCELLED
with task:
task.status = DownloadTask.CANCELLING
The task will then abort as soon as possible (due to the nature
of downloading data, this can take a while when the Internet is
@ -537,9 +539,9 @@ class DownloadTask(object):
The same thing works for failed downloads ("notify_as_failed()").
"""
# Possible states this download task can be in
STATUS_MESSAGE = (_('Queued'), _('Downloading'),
_('Finished'), _('Failed'), _('Cancelled'), _('Paused'))
(QUEUED, DOWNLOADING, DONE, FAILED, CANCELLED, PAUSED) = list(range(6))
STATUS_MESSAGE = (_('Queued'), _('Queued'), _('Downloading'),
_('Finished'), _('Failed'), _('Cancelling'), _('Cancelled'), _('Pausing'), _('Paused'))
(NEW, QUEUED, DOWNLOADING, DONE, FAILED, CANCELLING, CANCELLED, PAUSING, PAUSED) = list(range(9))
# Wheter this task represents a file download or a device sync operation
ACTIVITY_DOWNLOAD, ACTIVITY_SYNCHRONIZE = list(range(2))
@ -550,6 +552,12 @@ class DownloadTask(object):
def __str__(self):
return self.__episode.title
def __enter__(self):
return self.__lock.acquire()
def __exit__(self, type, value, traceback):
self.__lock.release()
def __get_status(self):
return self.__status
@ -602,8 +610,9 @@ class DownloadTask(object):
downloader = property(fget=__get_downloader, fset=__set_downloader)
def cancel(self):
if self.status in (self.DOWNLOADING, self.QUEUED):
self.status = self.CANCELLED
with self:
if self.status in (self.DOWNLOADING, self.QUEUED):
self.status = self.CANCELLING
def removed_from_list(self):
if self.status != self.DONE:
@ -611,7 +620,8 @@ class DownloadTask(object):
def __init__(self, episode, config, downloader=None):
assert episode.download_task is None
self.__status = DownloadTask.QUEUED
self.__lock = threading.RLock()
self.__status = DownloadTask.NEW
self.__activity = DownloadTask.ACTIVITY_DOWNLOAD
self.__status_changed = True
self.__episode = episode
@ -701,10 +711,10 @@ class DownloadTask(object):
self.calculate_speed(count, blockSize)
if self.status == DownloadTask.CANCELLED:
if self.status == DownloadTask.CANCELLING:
raise DownloadCancelledException()
if self.status == DownloadTask.PAUSED:
if self.status == DownloadTask.PAUSING:
raise DownloadCancelledException()
def calculate_speed(self, count, blockSize):
@ -755,29 +765,36 @@ class DownloadTask(object):
self.__start_time = 0
self.__start_blocks = 0
# If the download has already been cancelled, skip it
if self.status == DownloadTask.CANCELLED:
util.delete_file(self.tempname)
self.progress = 0.0
self.speed = 0.0
self.recycle()
return False
# If the download has already been cancelled/paused, skip it
with self:
if self.status == DownloadTask.CANCELLING:
self.status = DownloadTask.CANCELLED
util.delete_file(self.tempname)
self.progress = 0.0
self.speed = 0.0
self.recycle()
return False
# We only start this download if its status is "downloading"
if self.status != DownloadTask.DOWNLOADING:
return False
if self.status == DownloadTask.PAUSING:
self.status = DownloadTask.PAUSED
return False
# We are downloading this file right now
self.status = DownloadTask.DOWNLOADING
self._notification_shown = False
# We only start this download if its status is queued
if self.status != DownloadTask.QUEUED:
return False
# Restore a reference to this task in the episode
# when running a recycled task following a pause or failed
# see #649
if not self.episode.download_task:
self.episode.download_task = self
# We are downloading this file right now
self.status = DownloadTask.DOWNLOADING
self._notification_shown = False
# Restore a reference to this task in the episode
# when running a recycled task following a pause or failed
# see #649
if not self.episode.download_task:
self.episode.download_task = self
url = self.__episode.url
result = DownloadTask.DOWNLOADING
try:
if url == '':
raise DownloadNoURLException()
@ -854,19 +871,20 @@ class DownloadTask(object):
self.__episode.on_downloaded(self.filename)
except DownloadCancelledException:
logger.info('Download has been cancelled/paused: %s', self)
if self.status == DownloadTask.CANCELLED:
if self.status == DownloadTask.CANCELLING:
util.delete_file(self.tempname)
self.progress = 0.0
self.speed = 0.0
result = DownloadTask.CANCELLED
except DownloadNoURLException:
self.status = DownloadTask.FAILED
result = DownloadTask.FAILED
self.error_message = _('Episode has no URL to download')
except urllib.error.ContentTooShortError as ctse:
self.status = DownloadTask.FAILED
result = DownloadTask.FAILED
self.error_message = _('Missing content from server')
except ConnectionError as ce:
# special case request exception
self.status = DownloadTask.FAILED
result = DownloadTask.FAILED
logger.error('Download failed: %s', str(ce), exc_info=True)
d = {'host': ce.args[0].pool.host, 'port': ce.args[0].pool.port}
self.error_message = _("Couldn't connect to server %(host)s:%(port)s" % d)
@ -876,45 +894,54 @@ class DownloadTask(object):
re = re.args[0]
logger.error('%s while downloading "%s"', str(re),
self.__episode.title, exc_info=True)
self.status = DownloadTask.FAILED
result = DownloadTask.FAILED
d = {'error': str(re)}
self.error_message = _('Request Error: %(error)s') % d
except IOError as ioe:
logger.error('%s while downloading "%s": %s', ioe.strerror,
self.__episode.title, ioe.filename, exc_info=True)
self.status = DownloadTask.FAILED
result = DownloadTask.FAILED
d = {'error': ioe.strerror, 'filename': ioe.filename}
self.error_message = _('I/O Error: %(error)s: %(filename)s') % d
except gPodderDownloadHTTPError as gdhe:
logger.error('HTTP %s while downloading "%s": %s',
gdhe.error_code, self.__episode.title, gdhe.error_message,
exc_info=True)
self.status = DownloadTask.FAILED
result = DownloadTask.FAILED
d = {'code': gdhe.error_code, 'message': gdhe.error_message}
self.error_message = _('HTTP Error %(code)s: %(message)s') % d
except Exception as e:
self.status = DownloadTask.FAILED
result = DownloadTask.FAILED
logger.error('Download failed: %s', str(e), exc_info=True)
self.error_message = _('Error: %s') % (str(e),)
if self.status == DownloadTask.FAILED:
self.__episode._download_error = self.error_message
with self:
if result == DownloadTask.FAILED:
self.status = DownloadTask.FAILED
self.__episode._download_error = self.error_message
# Delete empty partial files, they prevent streaming after a download failure (live stream)
if util.calculate_size(self.filename) == 0:
util.delete_file(self.tempname)
# Delete empty partial files, they prevent streaming after a download failure (live stream)
if util.calculate_size(self.filename) == 0:
util.delete_file(self.tempname)
if self.status == DownloadTask.DOWNLOADING:
# Everything went well - we're done
self.status = DownloadTask.DONE
if self.total_size <= 0:
self.total_size = util.calculate_size(self.filename)
logger.info('Total size updated to %d', self.total_size)
self.progress = 1.0
gpodder.user_extensions.on_episode_downloaded(self.__episode)
return True
if result == DownloadTask.DOWNLOADING:
# Everything went well - we're done (even if the task was cancelled/paused,
# since it's finished we might as well mark it done)
self.status = DownloadTask.DONE
if self.total_size <= 0:
self.total_size = util.calculate_size(self.filename)
logger.info('Total size updated to %d', self.total_size)
self.progress = 1.0
gpodder.user_extensions.on_episode_downloaded(self.__episode)
return True
self.speed = 0.0
self.speed = 0.0
# cancelled -- update state to mark it as safe to manipulate this task again
if self.status == DownloadTask.PAUSING:
self.status = DownloadTask.PAUSED
elif self.status == DownloadTask.CANCELLING:
self.status = DownloadTask.CANCELLED
# We finished, but not successfully (at least not really)
return False

View File

@ -46,7 +46,8 @@ class TaskQueue:
def add_task(self, task):
with self.lock:
self.tasks.append(task)
if task not in self.tasks:
self.tasks.append(task)
def remove_task(self, task):
with self.lock:
@ -162,20 +163,27 @@ class DownloadStatusModel:
self.request_update(iter, task)
def register_task(self, task):
self.work_queue.add_task(task)
# self.work_queue.add_task(task)
util.idle_add(self.__add_new_task, task)
def queue_task(self, task):
with task:
if task.status in (task.NEW, task.FAILED, task.CANCELLED, task.PAUSED):
task.status = task.QUEUED
self.work_queue.add_task(task)
def tell_all_tasks_to_quit(self):
for row in self.list:
task = row[DownloadStatusModel.C_TASK]
if task is not None:
# Pause currently-running (and queued) downloads
if task.status in (task.QUEUED, task.DOWNLOADING):
task.status = task.PAUSED
with task:
# Pause currently-running (and queued) downloads
if task.status in (task.QUEUED, task.DOWNLOADING):
task.status = task.PAUSING
# Delete cancelled and failed downloads
if task.status in (task.CANCELLED, task.FAILED):
task.removed_from_list()
# Delete cancelled and failed downloads
if task.status in (task.CANCELLED, task.FAILED):
task.removed_from_list()
def are_downloads_in_progress(self):
"""
@ -210,17 +218,11 @@ class DownloadStatusModel:
return len(self.work_queue)
def get_next(self):
task = self.work_queue.pop()
if task:
task.status = task.DOWNLOADING
return task
return self.work_queue.pop()
def set_downloading(self, task):
if not self.work_queue.remove_task(task):
# Task was already dequeued get_next
return False
task.status = task.DOWNLOADING
return True
# return False if Task was already dequeued by get_next
return self.work_queue.remove_task(task)
class DownloadTaskMonitor(object):

View File

@ -1188,7 +1188,9 @@ class gPodder(BuilderWidget, dbus.service.Object):
download_tasks_seen.add(task)
if (status == download.DownloadTask.DOWNLOADING and
if (status in [download.DownloadTask.DOWNLOADING,
download.DownloadTask.CANCELLING,
download.DownloadTask.PAUSING] and
activity == download.DownloadTask.ACTIVITY_DOWNLOAD):
downloading += 1
total_speed += speed
@ -1559,46 +1561,58 @@ class gPodder(BuilderWidget, dbus.service.Object):
episode_urls = set()
model = self.treeDownloads.get_model()
for row_reference, task in tasks:
if status == download.DownloadTask.QUEUED:
# Only queue task when its paused/failed/cancelled (or forced)
if task.status in (task.PAUSED, task.FAILED, task.CANCELLED) or force_start:
if force_start:
self.download_queue_manager.force_start_task(task)
else:
self.download_queue_manager.queue_task(task)
self.set_download_list_state(gPodderSyncUI.DL_ADDED_ONE)
elif status == download.DownloadTask.CANCELLED:
# Cancelling a download allowed when downloading/queued
if task.status in (task.QUEUED, task.DOWNLOADING):
with task:
if status == download.DownloadTask.QUEUED:
# Only queue task when its paused/failed/cancelled (or forced)
if task.status in (download.DownloadTask.PAUSED,
download.DownloadTask.FAILED,
download.DownloadTask.CANCELLED) or force_start:
if force_start:
self.download_queue_manager.force_start_task(task)
else:
self.download_queue_manager.queue_task(task)
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
elif status == download.DownloadTask.CANCELLING:
logger.info(("cancelling task %s" % task.status))
# Cancelling a download allowed when downloading/queued
if task.status in (download.DownloadTask.QUEUED, download.DownloadTask.DOWNLOADING):
task.status = status
# Cancelling paused/failed downloads requires a call to .run()
elif task.status in (download.DownloadTask.PAUSED, download.DownloadTask.FAILED):
task.status = status
# Call run, so the partial file gets deleted
task.run()
task.recycle()
elif status == download.DownloadTask.PAUSING:
# Pausing a download only when queued/downloading
if task.status in (download.DownloadTask.QUEUED, download.DownloadTask.DOWNLOADING):
task.status = status
elif status is None:
# Remove the selected task - cancel downloading/queued tasks
if task.status in (download.DownloadTask.QUEUED, download.DownloadTask.DOWNLOADING):
task.status = download.DownloadTask.CANCELLED
path = row_reference.get_path()
# path isn't set if the item has already been removed from the list
# (to trigger this cancel one downloads in the active list, cancel all
# other downloads, quickly right click on the cancelled on one to get
# the context menu, wait until the active list is cleared, and then
# then choose remove from list)
if path:
model.remove(model.get_iter(path))
# Remember the URL, so we can tell the UI to update
try:
# We don't "see" this task anymore - remove it;
# this is needed, so update_episode_list_icons()
# below gets the correct list of "seen" tasks
self.download_tasks_seen.remove(task)
except KeyError as key_error:
pass
episode_urls.add(task.url)
# Tell the task that it has been removed (so it can clean up)
task.removed_from_list()
else:
# We can (hopefully) simply set the task status here
task.status = status
# Cancelling paused/failed downloads requires a call to .run()
elif task.status in (task.PAUSED, task.FAILED):
task.status = status
# Call run, so the partial file gets deleted
task.run()
elif status == download.DownloadTask.PAUSED:
# Pausing a download only when queued/downloading
if task.status in (task.DOWNLOADING, task.QUEUED):
task.status = status
elif status is None:
# Remove the selected task - cancel downloading/queued tasks
if task.status in (task.QUEUED, task.DOWNLOADING):
task.status = task.CANCELLED
model.remove(model.get_iter(row_reference.get_path()))
# Remember the URL, so we can tell the UI to update
try:
# We don't "see" this task anymore - remove it;
# this is needed, so update_episode_list_icons()
# below gets the correct list of "seen" tasks
self.download_tasks_seen.remove(task)
except KeyError as key_error:
pass
episode_urls.add(task.url)
# Tell the task that it has been removed (so it can clean up)
task.removed_from_list()
else:
# We can (hopefully) simply set the task status here
task.status = status
# Tell the podcasts tab to update icons for our removed podcasts
self.update_episode_list_icons(episode_urls)
# Update the tab title and downloads list
@ -1662,11 +1676,11 @@ class gPodder(BuilderWidget, dbus.service.Object):
menu.append(make_menu_item(_('Cancel'), 'media-playback-stop',
selected_tasks,
download.DownloadTask.CANCELLED,
download.DownloadTask.CANCELLING,
can_cancel))
menu.append(make_menu_item(_('Pause'), 'media-playback-pause',
selected_tasks,
download.DownloadTask.PAUSED, can_pause))
download.DownloadTask.PAUSING, can_pause))
menu.append(Gtk.SeparatorMenuItem())
menu.append(make_menu_item(_('Move up'), 'go-up',
action=move_selected_items_up))
@ -3063,14 +3077,15 @@ class gPodder(BuilderWidget, dbus.service.Object):
def download_episode_list(self, episodes, add_paused=False, force_start=False, downloader=None):
def queue_tasks(tasks, queued_existing_task):
for task in tasks:
if add_paused:
task.status = task.PAUSED
else:
self.mygpo_client.on_download([task.episode])
if force_start:
self.download_queue_manager.force_start_task(task)
with task:
if add_paused:
task.status = task.PAUSED
else:
self.download_queue_manager.queue_task(task)
self.mygpo_client.on_download([task.episode])
if force_start:
self.download_queue_manager.force_start_task(task)
else:
self.download_queue_manager.queue_task(task)
if tasks or queued_existing_task:
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
# Flush updated episode status
@ -3128,14 +3143,16 @@ class gPodder(BuilderWidget, dbus.service.Object):
return
for task in tasks:
if task.status in (task.QUEUED, task.DOWNLOADING):
task.status = task.CANCELLED
elif task.status == task.PAUSED:
task.status = task.CANCELLED
# Call run, so the partial file gets deleted
task.run()
elif force:
task.status = task.CANCELLED
with task:
if task.status in (task.NEW, task.QUEUED, task.DOWNLOADING):
task.status = task.CANCELLING
elif task.status == task.PAUSED:
task.status = task.CANCELLED
# Call run, so the partial file gets deleted
task.run()
task.recycle()
elif force:
task.status = task.CANCELLED
self.update_episode_list_icons([task.url for task in tasks])
self.play_or_download()
@ -3623,13 +3640,14 @@ class gPodder(BuilderWidget, dbus.service.Object):
selected_tasks = [(Gtk.TreeRowReference.new(model, path), model.get_value(model.get_iter(path), 0)) for path in paths]
for tree_row_reference, task in selected_tasks:
if task.status in (task.DOWNLOADING, task.QUEUED):
task.status = task.PAUSED
elif task.status in (task.CANCELLED, task.PAUSED, task.FAILED):
self.download_queue_manager.queue_task(task)
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
elif task.status == task.DONE:
model.remove(model.get_iter(tree_row_reference.get_path()))
with task:
if task.status in (task.DOWNLOADING, task.QUEUED):
task.status = task.PAUSED
elif task.status in (task.CANCELLED, task.PAUSED, task.FAILED):
self.download_queue_manager.queue_task(task)
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
elif task.status == task.DONE:
model.remove(model.get_iter(tree_row_reference.get_path()))
self.play_or_download()

View File

@ -450,7 +450,7 @@ class PodcastEpisode(PodcastModelObject):
if task is None:
return False
return task.status in (task.DOWNLOADING, task.QUEUED, task.PAUSED)
return task.status in (task.DOWNLOADING, task.QUEUED, task.PAUSING, task.PAUSED, task.CANCELLING)
def check_is_new(self):
return (self.state == gpodder.STATE_NORMAL and self.is_new and

View File

@ -27,7 +27,10 @@ import calendar
import glob
import logging
import os.path
import threading
import time
from enum import Enum
from re import S
from urllib.parse import urlparse
import gpodder
@ -227,7 +230,7 @@ class Device(services.ObservableService):
# XXX: need to check if track is added properly?
sync_task = SyncTask(track)
sync_task.status = sync_task.QUEUED
sync_task.status = sync_task.NEW
sync_task.device = self
# New Task, we must wait on the GTK Loop
self.download_status_model.register_task(sync_task)
@ -694,9 +697,9 @@ class SyncTask(download.DownloadTask):
# An object representing the synchronization task of an episode
# Possible states this sync task can be in
STATUS_MESSAGE = (_('Queued'), _('Synchronizing'),
_('Finished'), _('Failed'), _('Cancelled'), _('Paused'))
(QUEUED, DOWNLOADING, DONE, FAILED, CANCELLED, PAUSED) = list(range(6))
STATUS_MESSAGE = (_('Queued'), _('Queued'), _('Downloading'),
_('Finished'), _('Failed'), _('Cancelling'), _('Cancelled'), _('Pausing'), _('Paused'))
(NEW, QUEUED, DOWNLOADING, DONE, FAILED, CANCELLING, CANCELLED, PAUSING, PAUSED) = list(range(9))
def __str__(self):
return self.__episode.title
@ -748,15 +751,17 @@ class SyncTask(download.DownloadTask):
episode = property(fget=__get_episode)
def cancel(self):
if self.status in (self.DOWNLOADING, self.QUEUED):
self.status = self.CANCELLED
with self:
if self.status in (self.DOWNLOADING, self.QUEUED):
self.status = self.CANCELLED
def removed_from_list(self):
# XXX: Should we delete temporary/incomplete files here?
pass
def __init__(self, episode):
self.__status = SyncTask.QUEUED
self.__lock = threading.RLock()
self.__status = SyncTask.NEW
self.__activity = SyncTask.ACTIVITY_SYNCHRONIZE
self.__status_changed = True
self.__episode = episode
@ -782,6 +787,12 @@ class SyncTask(download.DownloadTask):
# Callbacks
self._progress_updated = lambda x: None
def __enter__(self):
return self.__lock.acquire()
def __exit__(self, type, value, traceback):
self.__lock.release()
def notify_as_finished(self):
if self.status == SyncTask.DONE:
if self._notification_shown:
@ -815,10 +826,10 @@ class SyncTask(download.DownloadTask):
self.progress = max(0.0, min(1.0, (count * blockSize) / self.total_size))
self._progress_updated(self.progress)
if self.status == SyncTask.CANCELLED:
if self.status == SyncTask.CANCELLING:
raise SyncCancelledException()
if self.status == SyncTask.PAUSED:
if self.status == SyncTask.PAUSING:
raise SyncCancelledException()
def recycle(self):
@ -829,38 +840,54 @@ class SyncTask(download.DownloadTask):
self.__start_time = 0
self.__start_blocks = 0
# If the download has already been cancelled, skip it
if self.status == SyncTask.CANCELLED:
# If the download has already been cancelled/paused, skip it
if self.status == SyncTask.CANCELLING:
util.delete_file(self.tempname)
self.progress = 0.0
self.speed = 0.0
self.status = SyncTask.CANCELLED
return False
# We only start this download if its status is "downloading"
if self.status != SyncTask.DOWNLOADING:
if self.status == SyncTask.PAUSING:
self.status = SyncTask.PAUSED
return False
# We are synching this file right now
self.status = SyncTask.DOWNLOADING
with self:
# We only start this download if its status is "queued"
if self.status != SyncTask.QUEUED:
return False
# We are synching this file right now
self.status = SyncTask.DOWNLOADING
self._notification_shown = False
sync_result = SyncTask.DONE
try:
logger.info('Starting SyncTask')
self.device.add_track(self.episode, reporthook=self.status_updated)
except SyncCancelledException as e:
sync_result = SyncTask.CANCELLED
except Exception as e:
self.status = SyncTask.FAILED
sync_result = SyncTask.FAILED
logger.error('Sync failed: %s', str(e), exc_info=True)
self.error_message = _('Error: %s') % (str(e),)
if self.status == SyncTask.DOWNLOADING:
# Everything went well - we're done
self.status = SyncTask.DONE
if self.total_size <= 0:
self.total_size = util.calculate_size(self.filename)
logger.info('Total size updated to %d', self.total_size)
self.progress = 1.0
gpodder.user_extensions.on_episode_synced(self.device, self.__episode)
return True
with self:
if sync_result == SyncTask.CANCELLED:
if self.status == SyncTask.CANCELLING:
self.status = SyncTask.CANCELLED
else:
self.status = SyncTask.PAUSED
elif sync_result == SyncTask.DONE:
# Everything went well - we're done
self.status = SyncTask.DONE
if self.total_size <= 0:
self.total_size = util.calculate_size(self.filename)
logger.info('Total size updated to %d', self.total_size)
self.progress = 1.0
gpodder.user_extensions.on_episode_synced(self.device, self.__episode)
return True
self.speed = 0.0