* #1152: fix re-ordering downloads list * fix downloading a task that's already been removed from the download list * fix download/delete/download of the same episode * fix pausing/cancelling sync tasks; clean up partially synchronised files * fix cancelling a failed download * make failed/paused/cancelled mutually exclusive so the code has less paths (and hopefully is easier to follow)
This commit is contained in:
parent
032161c002
commit
ed5d18e1b0
2
bin/gpo
2
bin/gpo
|
@ -623,6 +623,7 @@ class gPodderCli(object):
|
|||
task.add_progress_callback(self._update_action)
|
||||
task.status = download.DownloadTask.DOWNLOADING
|
||||
task.run()
|
||||
task.recycle()
|
||||
|
||||
def _download_episodes(self, episodes):
|
||||
if self._config.downloads.chronological_order:
|
||||
|
@ -954,6 +955,7 @@ class gPodderCli(object):
|
|||
task.status = sync.SyncTask.DOWNLOADING
|
||||
task.add_progress_callback(progress_updated)
|
||||
task.run()
|
||||
task.recycle()
|
||||
|
||||
done_lock = threading.Lock()
|
||||
self.mygpo_client = my.MygPoClient(self._config)
|
||||
|
|
|
@ -453,9 +453,11 @@ class DownloadQueueManager(object):
|
|||
self.__spawn_threads()
|
||||
|
||||
def force_start_task(self, task):
|
||||
if self.tasks.set_downloading(task):
|
||||
worker = ForceDownloadWorker(task)
|
||||
util.run_in_background(worker.run)
|
||||
with task:
|
||||
if task.status in (task.QUEUED, task.PAUSED, task.CANCELLED, task.FAILED):
|
||||
task.status = task.DOWNLOADING
|
||||
worker = ForceDownloadWorker(task)
|
||||
util.run_in_background(worker.run)
|
||||
|
||||
def queue_task(self, task):
|
||||
"""Marks a task as queued
|
||||
|
@ -609,9 +611,25 @@ class DownloadTask(object):
|
|||
|
||||
downloader = property(fget=__get_downloader, fset=__set_downloader)
|
||||
|
||||
def pause(self):
|
||||
with self:
|
||||
# Pause a queued download
|
||||
if self.status == self.QUEUED:
|
||||
self.status = self.PAUSED
|
||||
# Request pause of a running download
|
||||
elif self.status == self.DOWNLOADING:
|
||||
self.status = self.PAUSING
|
||||
|
||||
def cancel(self):
|
||||
with self:
|
||||
if self.status in (self.DOWNLOADING, self.QUEUED):
|
||||
# Cancelling directly is allowed if the task isn't currently downloading
|
||||
if self.status in (self.QUEUED, self.PAUSED, self.FAILED):
|
||||
self.status = self.CANCELLED
|
||||
# Call run, so the partial file gets deleted
|
||||
self.run()
|
||||
self.recycle()
|
||||
# Otherwise request cancellation
|
||||
elif self.status == self.DOWNLOADING:
|
||||
self.status = self.CANCELLING
|
||||
|
||||
def removed_from_list(self):
|
||||
|
@ -760,6 +778,10 @@ class DownloadTask(object):
|
|||
def recycle(self):
|
||||
self.episode.download_task = None
|
||||
|
||||
def set_episode_download_task(self):
|
||||
if not self.episode.download_task:
|
||||
self.episode.download_task = self
|
||||
|
||||
def run(self):
|
||||
# Speed calculation (re-)starts here
|
||||
self.__start_time = 0
|
||||
|
@ -779,19 +801,17 @@ class DownloadTask(object):
|
|||
self.status = DownloadTask.PAUSED
|
||||
return False
|
||||
|
||||
# We only start this download if its status is queued
|
||||
if self.status != DownloadTask.QUEUED:
|
||||
# We only start this download if its status is downloading
|
||||
if self.status != DownloadTask.DOWNLOADING:
|
||||
return False
|
||||
|
||||
# We are downloading this file right now
|
||||
self.status = DownloadTask.DOWNLOADING
|
||||
self._notification_shown = False
|
||||
|
||||
# Restore a reference to this task in the episode
|
||||
# when running a recycled task following a pause or failed
|
||||
# see #649
|
||||
if not self.episode.download_task:
|
||||
self.episode.download_task = self
|
||||
self.set_episode_download_task()
|
||||
|
||||
url = self.__episode.url
|
||||
result = DownloadTask.DOWNLOADING
|
||||
|
@ -916,14 +936,6 @@ class DownloadTask(object):
|
|||
self.error_message = _('Error: %s') % (str(e),)
|
||||
|
||||
with self:
|
||||
if result == DownloadTask.FAILED:
|
||||
self.status = DownloadTask.FAILED
|
||||
self.__episode._download_error = self.error_message
|
||||
|
||||
# Delete empty partial files, they prevent streaming after a download failure (live stream)
|
||||
if util.calculate_size(self.filename) == 0:
|
||||
util.delete_file(self.tempname)
|
||||
|
||||
if result == DownloadTask.DOWNLOADING:
|
||||
# Everything went well - we're done (even if the task was cancelled/paused,
|
||||
# since it's finished we might as well mark it done)
|
||||
|
@ -937,8 +949,16 @@ class DownloadTask(object):
|
|||
|
||||
self.speed = 0.0
|
||||
|
||||
# cancelled -- update state to mark it as safe to manipulate this task again
|
||||
if self.status == DownloadTask.PAUSING:
|
||||
if result == DownloadTask.FAILED:
|
||||
self.status = DownloadTask.FAILED
|
||||
self.__episode._download_error = self.error_message
|
||||
|
||||
# Delete empty partial files, they prevent streaming after a download failure (live stream)
|
||||
if util.calculate_size(self.filename) == 0:
|
||||
util.delete_file(self.tempname)
|
||||
|
||||
# cancelled/paused -- update state to mark it as safe to manipulate this task again
|
||||
elif self.status == DownloadTask.PAUSING:
|
||||
self.status = DownloadTask.PAUSED
|
||||
elif self.status == DownloadTask.CANCELLING:
|
||||
self.status = DownloadTask.CANCELLED
|
||||
|
|
|
@ -35,64 +35,32 @@ from gpodder import download, util
|
|||
_ = gpodder.gettext
|
||||
|
||||
|
||||
class TaskQueue:
|
||||
class DequeueRequest:
|
||||
def __init__(self):
|
||||
self.lock = threading.Lock()
|
||||
self.tasks = list()
|
||||
self.cv = threading.Condition()
|
||||
self.value = None
|
||||
self.resolved = False
|
||||
|
||||
def __len__(self):
|
||||
with self.lock:
|
||||
return len(self.tasks)
|
||||
def dequeue(self):
|
||||
with self.cv:
|
||||
self.cv.wait_for(lambda: self.resolved)
|
||||
return self.value
|
||||
|
||||
def add_task(self, task):
|
||||
with self.lock:
|
||||
if task not in self.tasks:
|
||||
self.tasks.append(task)
|
||||
|
||||
def remove_task(self, task):
|
||||
with self.lock:
|
||||
try:
|
||||
self.tasks.remove(task)
|
||||
return True
|
||||
except ValueError:
|
||||
# already dequeued
|
||||
return False
|
||||
|
||||
def pop(self):
|
||||
with self.lock:
|
||||
if len(self.tasks) == 0:
|
||||
return None
|
||||
task = self.tasks.pop(0)
|
||||
return task
|
||||
|
||||
def move_after(self, task, after):
|
||||
with self.lock:
|
||||
try:
|
||||
index = self.tasks.index(after)
|
||||
self.tasks.remove(task)
|
||||
self.tasks.insert(index + 1, task)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def move_before(self, task, before):
|
||||
with self.lock:
|
||||
try:
|
||||
index = self.tasks.index(before)
|
||||
self.tasks.remove(task)
|
||||
self.tasks.insert(index, task)
|
||||
except ValueError:
|
||||
pass
|
||||
def resolve(self, value):
|
||||
self.value = value
|
||||
self.resolved = True
|
||||
with self.cv:
|
||||
self.cv.notify()
|
||||
|
||||
|
||||
class DownloadStatusModel:
|
||||
class DownloadStatusModel(Gtk.ListStore):
|
||||
# Symbolic names for our columns, so we know what we're up to
|
||||
C_TASK, C_NAME, C_URL, C_PROGRESS, C_PROGRESS_TEXT, C_ICON_NAME = list(range(6))
|
||||
|
||||
SEARCH_COLUMNS = (C_NAME, C_URL)
|
||||
|
||||
def __init__(self):
|
||||
self.list = Gtk.ListStore(object, str, str, int, str, str)
|
||||
self.work_queue = TaskQueue()
|
||||
Gtk.ListStore.__init__(self, object, str, str, int, str, str)
|
||||
|
||||
# Set up stock icon IDs for tasks
|
||||
self._status_ids = collections.defaultdict(lambda: None)
|
||||
|
@ -102,9 +70,6 @@ class DownloadStatusModel:
|
|||
self._status_ids[download.DownloadTask.CANCELLED] = 'media-playback-stop'
|
||||
self._status_ids[download.DownloadTask.PAUSED] = 'media-playback-pause'
|
||||
|
||||
def get_model(self):
|
||||
return self.list
|
||||
|
||||
def _format_message(self, episode, message, podcast):
|
||||
episode = html.escape(episode)
|
||||
podcast = html.escape(podcast)
|
||||
|
@ -114,10 +79,10 @@ class DownloadStatusModel:
|
|||
def request_update(self, iter, task=None):
|
||||
if task is None:
|
||||
# Ongoing update request from UI - get task from model
|
||||
task = self.list.get_value(iter, self.C_TASK)
|
||||
task = self.get_value(iter, self.C_TASK)
|
||||
else:
|
||||
# Initial update request - update non-changing fields
|
||||
self.list.set(iter,
|
||||
self.set(iter,
|
||||
self.C_TASK, task,
|
||||
self.C_URL, task.url)
|
||||
|
||||
|
@ -151,7 +116,7 @@ class DownloadStatusModel:
|
|||
else:
|
||||
progress_message = ('unknown size')
|
||||
|
||||
self.list.set(iter,
|
||||
self.set(iter,
|
||||
self.C_NAME, self._format_message(task.episode.title,
|
||||
status_message, task.episode.channel.title),
|
||||
self.C_PROGRESS, 100. * task.progress,
|
||||
|
@ -159,30 +124,36 @@ class DownloadStatusModel:
|
|||
self.C_ICON_NAME, self._status_ids[task.status])
|
||||
|
||||
def __add_new_task(self, task):
|
||||
iter = self.list.append()
|
||||
iter = self.append()
|
||||
self.request_update(iter, task)
|
||||
|
||||
def register_task(self, task):
|
||||
# self.work_queue.add_task(task)
|
||||
util.idle_add(self.__add_new_task, task)
|
||||
def register_task(self, task, background=True):
|
||||
if background:
|
||||
util.idle_add(self.__add_new_task, task)
|
||||
else:
|
||||
self.__add_new_task(task)
|
||||
|
||||
def queue_task(self, task):
|
||||
with task:
|
||||
if task.status in (task.NEW, task.FAILED, task.CANCELLED, task.PAUSED):
|
||||
task.status = task.QUEUED
|
||||
self.work_queue.add_task(task)
|
||||
task.set_episode_download_task()
|
||||
|
||||
def tell_all_tasks_to_quit(self):
|
||||
for row in self.list:
|
||||
for row in self:
|
||||
task = row[DownloadStatusModel.C_TASK]
|
||||
if task is not None:
|
||||
with task:
|
||||
# Pause currently-running (and queued) downloads
|
||||
if task.status in (task.QUEUED, task.DOWNLOADING):
|
||||
# Pause currently queued downloads
|
||||
if task.status == task.QUEUED:
|
||||
task.status = task.PAUSED
|
||||
|
||||
# Request pause of currently running downloads
|
||||
elif task.status == task.DOWNLOADING:
|
||||
task.status = task.PAUSING
|
||||
|
||||
# Delete cancelled and failed downloads
|
||||
if task.status in (task.CANCELLED, task.FAILED):
|
||||
elif task.status in (task.CANCELLED, task.FAILED):
|
||||
task.removed_from_list()
|
||||
|
||||
def are_downloads_in_progress(self):
|
||||
|
@ -190,7 +161,7 @@ class DownloadStatusModel:
|
|||
Returns True if there are any downloads in the
|
||||
QUEUED or DOWNLOADING status, False otherwise.
|
||||
"""
|
||||
for row in self.list:
|
||||
for row in self:
|
||||
task = row[DownloadStatusModel.C_TASK]
|
||||
if task is not None and \
|
||||
task.status in (task.DOWNLOADING,
|
||||
|
@ -199,30 +170,34 @@ class DownloadStatusModel:
|
|||
|
||||
return False
|
||||
|
||||
def move_after(self, iter, position):
|
||||
self.list.move_after(iter, position)
|
||||
iter_task = self.list.get_value(iter, DownloadStatusModel.C_TASK)
|
||||
pos_task = self.list.get_value(position, DownloadStatusModel.C_TASK)
|
||||
self.work_queue.move_after(iter_task, pos_task)
|
||||
|
||||
def move_before(self, iter, position):
|
||||
self.list.move_before(iter, position)
|
||||
iter_task = self.list.get_value(iter, DownloadStatusModel.C_TASK)
|
||||
pos_task = self.list.get_value(position, DownloadStatusModel.C_TASK)
|
||||
self.work_queue.move_before(iter_task, pos_task)
|
||||
|
||||
def has_work(self):
|
||||
return len(self.work_queue) > 0
|
||||
return any(self._work_gen())
|
||||
|
||||
def available_work_count(self):
|
||||
return len(self.work_queue)
|
||||
return len(list(self._work_gen()))
|
||||
|
||||
def __get_next(self, dqr):
|
||||
try:
|
||||
task = next(self._work_gen())
|
||||
# this is the only thread accessing the list store, so it's safe
|
||||
# to assume a) the task is still queued and b) we can transition to downloading
|
||||
task.status = task.DOWNLOADING
|
||||
except StopIteration as e:
|
||||
task = None
|
||||
# hand the task off to the worker thread
|
||||
dqr.resolve(task)
|
||||
|
||||
# get the next task to download. this proxies the request to the main thread,
|
||||
# as only the main thread is allowed to manipulate the list store.
|
||||
def get_next(self):
|
||||
return self.work_queue.pop()
|
||||
dqr = DequeueRequest()
|
||||
util.idle_add(self.__get_next, dqr)
|
||||
return dqr.dequeue()
|
||||
|
||||
def set_downloading(self, task):
|
||||
# return False if Task was already dequeued by get_next
|
||||
return self.work_queue.remove_task(task)
|
||||
def _work_gen(self):
|
||||
return (task for task in
|
||||
(row[DownloadStatusModel.C_TASK] for row in self)
|
||||
if task.status == task.QUEUED)
|
||||
|
||||
|
||||
class DownloadTaskMonitor(object):
|
||||
|
|
|
@ -1064,7 +1064,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
column.set_property('min-width', 150)
|
||||
column.set_property('max-width', 150)
|
||||
|
||||
self.treeDownloads.set_model(self.download_status_model.get_model())
|
||||
self.treeDownloads.set_model(self.download_status_model)
|
||||
TreeViewHelper.set(self.treeDownloads, TreeViewHelper.ROLE_DOWNLOADS)
|
||||
|
||||
self.treeDownloads.connect('popup-menu', self.treeview_downloads_show_context_menu)
|
||||
|
@ -1113,7 +1113,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
self.download_list_update_enabled = True
|
||||
|
||||
def cleanup_downloads(self):
|
||||
model = self.download_status_model.get_model()
|
||||
model = self.download_status_model
|
||||
|
||||
all_tasks = [(Gtk.TreeRowReference.new(model, row.path), row[0]) for row in model]
|
||||
changed_episode_urls = set()
|
||||
|
@ -1160,7 +1160,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
|
||||
def update_downloads_list(self, can_call_cleanup=True):
|
||||
try:
|
||||
model = self.download_status_model.get_model()
|
||||
model = self.download_status_model
|
||||
|
||||
downloading, synchronizing, failed, finished, queued, paused, others = 0, 0, 0, 0, 0, 0, 0
|
||||
total_speed, total_size, done_size = 0, 0, 0
|
||||
|
@ -1177,7 +1177,6 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
|
||||
task = row[self.download_status_model.C_TASK]
|
||||
speed, size, status, progress, activity = task.speed, task.total_size, task.status, task.progress, task.activity
|
||||
logger.info("%s: %f", task.episode.title, progress)
|
||||
|
||||
# Let the download task monitors know of changes
|
||||
for monitor in self.download_task_monitors:
|
||||
|
@ -1557,36 +1556,39 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
|
||||
return (''.join(result)).strip()
|
||||
|
||||
def queue_task(self, task, force_start):
|
||||
if force_start:
|
||||
self.download_queue_manager.force_start_task(task)
|
||||
else:
|
||||
self.download_queue_manager.queue_task(task)
|
||||
|
||||
def _for_each_task_set_status(self, tasks, status, force_start=False):
|
||||
episode_urls = set()
|
||||
model = self.treeDownloads.get_model()
|
||||
for row_reference, task in tasks:
|
||||
with task:
|
||||
if status == download.DownloadTask.QUEUED:
|
||||
# Only queue task when its paused/failed/cancelled (or forced)
|
||||
# Only queue task when it's paused/failed/cancelled (or forced)
|
||||
if task.status in (download.DownloadTask.PAUSED,
|
||||
download.DownloadTask.FAILED,
|
||||
download.DownloadTask.CANCELLED) or force_start:
|
||||
if force_start:
|
||||
self.download_queue_manager.force_start_task(task)
|
||||
else:
|
||||
self.download_queue_manager.queue_task(task)
|
||||
|
||||
# add the task back in if it was already cleaned up
|
||||
# (to trigger this cancel one downloads in the active list, cancel all
|
||||
# other downloads, quickly right click on the cancelled on one to get
|
||||
# the context menu, wait until the active list is cleared, and then
|
||||
# then choose download)
|
||||
if task not in self.download_tasks_seen:
|
||||
self.download_status_model.register_task(task, False)
|
||||
self.download_tasks_seen.add(task)
|
||||
|
||||
self.queue_task(task, force_start)
|
||||
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
|
||||
elif status == download.DownloadTask.CANCELLING:
|
||||
logger.info(("cancelling task %s" % task.status))
|
||||
# Cancelling a download allowed when downloading/queued
|
||||
if task.status in (download.DownloadTask.QUEUED, download.DownloadTask.DOWNLOADING):
|
||||
task.status = status
|
||||
# Cancelling paused/failed downloads requires a call to .run()
|
||||
elif task.status in (download.DownloadTask.PAUSED, download.DownloadTask.FAILED):
|
||||
task.status = status
|
||||
# Call run, so the partial file gets deleted
|
||||
task.run()
|
||||
task.recycle()
|
||||
task.cancel()
|
||||
elif status == download.DownloadTask.PAUSING:
|
||||
# Pausing a download only when queued/downloading
|
||||
if task.status in (download.DownloadTask.QUEUED, download.DownloadTask.DOWNLOADING):
|
||||
task.status = status
|
||||
task.pause()
|
||||
elif status is None:
|
||||
# Remove the selected task - cancel downloading/queued tasks
|
||||
if task.status in (download.DownloadTask.QUEUED, download.DownloadTask.DOWNLOADING):
|
||||
|
@ -1646,7 +1648,10 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
index_above = path[0] - 1
|
||||
if index_above < 0:
|
||||
return
|
||||
self.download_status_model.move_before(
|
||||
task = model.get_value(
|
||||
model.get_iter(path),
|
||||
DownloadStatusModel.C_TASK)
|
||||
model.move_before(
|
||||
model.get_iter(path),
|
||||
model.get_iter((index_above,)))
|
||||
|
||||
|
@ -1657,7 +1662,10 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
index_below = path[0] + 1
|
||||
if index_below >= len(model):
|
||||
return
|
||||
self.download_status_model.move_after(
|
||||
task = model.get_value(
|
||||
model.get_iter(path),
|
||||
DownloadStatusModel.C_TASK)
|
||||
model.move_after(
|
||||
model.get_iter(path),
|
||||
model.get_iter((index_below,)))
|
||||
|
||||
|
@ -1955,12 +1963,12 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
item.connect('activate', self.on_playback_selected_episodes)
|
||||
menu.append(item)
|
||||
|
||||
if not can_cancel:
|
||||
if can_download:
|
||||
item = Gtk.ImageMenuItem(_('Download'))
|
||||
item.set_image(Gtk.Image.new_from_icon_name('document-save', Gtk.IconSize.MENU))
|
||||
item.set_action_name('win.download')
|
||||
menu.append(item)
|
||||
else:
|
||||
elif can_cancel:
|
||||
item = Gtk.ImageMenuItem.new_with_mnemonic(_('_Cancel'))
|
||||
item.set_action_name('win.cancel')
|
||||
menu.append(item)
|
||||
|
@ -3079,10 +3087,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
task.status = task.PAUSED
|
||||
else:
|
||||
self.mygpo_client.on_download([task.episode])
|
||||
if force_start:
|
||||
self.download_queue_manager.force_start_task(task)
|
||||
else:
|
||||
self.download_queue_manager.queue_task(task)
|
||||
self.queue_task(task, force_start)
|
||||
if tasks or queued_existing_task:
|
||||
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
|
||||
# Flush updated episode status
|
||||
|
@ -3108,10 +3113,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
if downloader:
|
||||
# replace existing task's download with forced one
|
||||
task.downloader = downloader
|
||||
if force_start:
|
||||
self.download_queue_manager.force_start_task(task)
|
||||
else:
|
||||
self.download_queue_manager.queue_task(task)
|
||||
self.queue_task(task, force_start)
|
||||
queued_existing_task = True
|
||||
continue
|
||||
|
||||
|
@ -3140,16 +3142,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
return
|
||||
|
||||
for task in tasks:
|
||||
with task:
|
||||
if task.status in (task.NEW, task.QUEUED, task.DOWNLOADING):
|
||||
task.status = task.CANCELLING
|
||||
elif task.status == task.PAUSED:
|
||||
task.status = task.CANCELLED
|
||||
# Call run, so the partial file gets deleted
|
||||
task.run()
|
||||
task.recycle()
|
||||
elif force:
|
||||
task.status = task.CANCELLED
|
||||
task.cancel()
|
||||
|
||||
self.update_episode_list_icons([task.url for task in tasks])
|
||||
self.play_or_download()
|
||||
|
@ -3639,7 +3632,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
for tree_row_reference, task in selected_tasks:
|
||||
with task:
|
||||
if task.status in (task.DOWNLOADING, task.QUEUED):
|
||||
task.status = task.PAUSED
|
||||
task.pause()
|
||||
elif task.status in (task.CANCELLED, task.PAUSED, task.FAILED):
|
||||
self.download_queue_manager.queue_task(task)
|
||||
self.set_download_list_state(gPodderSyncUI.DL_ONEOFF)
|
||||
|
|
|
@ -30,6 +30,7 @@ import os.path
|
|||
import threading
|
||||
import time
|
||||
from enum import Enum
|
||||
from os import sync
|
||||
from re import S
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
@ -210,6 +211,15 @@ class Device(services.ObservableService):
|
|||
logger.warning('Not syncing disks. Unmount your device before unplugging.')
|
||||
return True
|
||||
|
||||
def create_task(self, track):
|
||||
return SyncTask(track)
|
||||
|
||||
def cancel_task(self, task):
|
||||
pass
|
||||
|
||||
def cleanup_task(self, task):
|
||||
pass
|
||||
|
||||
def add_sync_tasks(self, tracklist, force_played=False, done_callback=None):
|
||||
for track in list(tracklist):
|
||||
# Filter tracks that are not meant to be synchronized
|
||||
|
@ -228,7 +238,7 @@ class Device(services.ObservableService):
|
|||
break
|
||||
|
||||
# XXX: need to check if track is added properly?
|
||||
sync_task = SyncTask(track)
|
||||
sync_task = self.create_task(track)
|
||||
|
||||
sync_task.status = sync_task.NEW
|
||||
sync_task.device = self
|
||||
|
@ -567,7 +577,22 @@ class MP3PlayerDevice(Device):
|
|||
def get_episode_file_on_device(self, episode):
|
||||
return episode_filename_on_device(self._config, episode)
|
||||
|
||||
def add_track(self, episode, reporthook=None):
|
||||
def create_task(self, track):
|
||||
return GioSyncTask(track)
|
||||
|
||||
def cancel_task(self, task):
|
||||
task.cancellable.cancel()
|
||||
|
||||
# called by the sync task when it is removed and needs partial files cleaning up
|
||||
def cleanup_task(self, task):
|
||||
episode = task.episode
|
||||
folder = self.get_episode_folder_on_device(episode)
|
||||
file = self.get_episode_file_on_device(episode)
|
||||
file = folder.get_child(file)
|
||||
self.remove_track_file(file)
|
||||
|
||||
def add_track(self, task, reporthook=None):
|
||||
episode = task.episode
|
||||
self.notify('status', _('Adding %s') % episode.title)
|
||||
|
||||
# get the folder on the device
|
||||
|
@ -604,8 +629,10 @@ class MP3PlayerDevice(Device):
|
|||
try:
|
||||
def hookconvert(current_bytes, total_bytes, user_data):
|
||||
return reporthook(current_bytes, 1, total_bytes)
|
||||
from_file.copy(to_file, Gio.FileCopyFlags.OVERWRITE, None, hookconvert, None)
|
||||
from_file.copy(to_file, Gio.FileCopyFlags.OVERWRITE, task.cancellable, hookconvert, None)
|
||||
except GLib.Error as err:
|
||||
if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
|
||||
raise SyncCancelledException()
|
||||
logger.error('Error copying %s to %s: %s', from_file.get_uri(), to_file.get_uri(), err.message)
|
||||
d = {'from_file': from_file.get_uri(), 'to_file': to_file.get_uri(), 'message': err.message}
|
||||
self.errors.append(_('Error copying %(from_file)s to %(to_file)s: %(message)s') % d)
|
||||
|
@ -656,11 +683,7 @@ class MP3PlayerDevice(Device):
|
|||
self._config.device_sync.max_filename_length)
|
||||
return self._track_on_device(e)
|
||||
|
||||
def remove_track(self, track):
|
||||
self.notify('status', _('Removing %s') % track.title)
|
||||
|
||||
# get the folder on the device
|
||||
file = Gio.File.new_for_uri(track.filename)
|
||||
def remove_track_file(self, file):
|
||||
folder = file.get_parent()
|
||||
if file.query_exists():
|
||||
try:
|
||||
|
@ -681,6 +704,13 @@ class MP3PlayerDevice(Device):
|
|||
if not err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.NOT_FOUND):
|
||||
logger.error('deleting folder %s failed: %s', folder.get_uri(), err.message)
|
||||
|
||||
def remove_track(self, track):
|
||||
self.notify('status', _('Removing %s') % track.title)
|
||||
|
||||
# get the folder on the device
|
||||
file = Gio.File.new_for_uri(track.filename)
|
||||
self.remove_track_file(file)
|
||||
|
||||
def directory_is_empty(self, directory):
|
||||
for child in directory.enumerate_children(Gio.FILE_ATTRIBUTE_STANDARD_NAME, Gio.FileQueryInfoFlags.NONE, None):
|
||||
return False
|
||||
|
@ -750,14 +780,31 @@ class SyncTask(download.DownloadTask):
|
|||
|
||||
episode = property(fget=__get_episode)
|
||||
|
||||
def pause(self):
|
||||
with self:
|
||||
# Pause a queued download
|
||||
if self.status == self.QUEUED:
|
||||
self.status = self.PAUSED
|
||||
# Request pause of a running download
|
||||
elif self.status == self.DOWNLOADING:
|
||||
self.status = self.PAUSING
|
||||
|
||||
def cancel(self):
|
||||
with self:
|
||||
if self.status in (self.DOWNLOADING, self.QUEUED):
|
||||
# Cancelling directly is allowed if the task isn't currently downloading
|
||||
if self.status in (self.QUEUED, self.PAUSED, self.FAILED):
|
||||
self.status = self.CANCELLED
|
||||
# Call run, so the partial file gets deleted
|
||||
self.run()
|
||||
self.recycle()
|
||||
# Otherwise request cancellation
|
||||
elif self.status == self.DOWNLOADING:
|
||||
self.status = self.CANCELLING
|
||||
self.device.cancel(self)
|
||||
|
||||
def removed_from_list(self):
|
||||
# XXX: Should we delete temporary/incomplete files here?
|
||||
pass
|
||||
if self.status != self.DONE:
|
||||
self.device.cleanup_task(self)
|
||||
|
||||
def __init__(self, episode):
|
||||
self.__lock = threading.RLock()
|
||||
|
@ -768,7 +815,6 @@ class SyncTask(download.DownloadTask):
|
|||
|
||||
# Create the target filename and save it in the database
|
||||
self.filename = self.__episode.local_filename(create=False)
|
||||
self.tempname = self.filename + '.partial'
|
||||
|
||||
self.total_size = self.__episode.file_size
|
||||
self.speed = 0.0
|
||||
|
@ -826,11 +872,12 @@ class SyncTask(download.DownloadTask):
|
|||
self.progress = max(0.0, min(1.0, (count * blockSize) / self.total_size))
|
||||
self._progress_updated(self.progress)
|
||||
|
||||
if self.status == SyncTask.CANCELLING:
|
||||
raise SyncCancelledException()
|
||||
if self.status in (SyncTask.CANCELLING, SyncTask.PAUSING):
|
||||
self._signal_cancel_from_status()
|
||||
|
||||
if self.status == SyncTask.PAUSING:
|
||||
raise SyncCancelledException()
|
||||
# default implementation
|
||||
def _signal_cancel_from_status(self):
|
||||
raise SyncCancelledException()
|
||||
|
||||
def recycle(self):
|
||||
self.episode.download_task = None
|
||||
|
@ -841,31 +888,28 @@ class SyncTask(download.DownloadTask):
|
|||
self.__start_blocks = 0
|
||||
|
||||
# If the download has already been cancelled/paused, skip it
|
||||
if self.status == SyncTask.CANCELLING:
|
||||
util.delete_file(self.tempname)
|
||||
self.progress = 0.0
|
||||
self.speed = 0.0
|
||||
self.status = SyncTask.CANCELLED
|
||||
return False
|
||||
|
||||
if self.status == SyncTask.PAUSING:
|
||||
self.status = SyncTask.PAUSED
|
||||
return False
|
||||
|
||||
with self:
|
||||
# We only start this download if its status is "queued"
|
||||
if self.status != SyncTask.QUEUED:
|
||||
if self.status in (SyncTask.CANCELLING, SyncTask.CANCELLED):
|
||||
self.progress = 0.0
|
||||
self.speed = 0.0
|
||||
self.status = SyncTask.CANCELLED
|
||||
return False
|
||||
|
||||
if self.status == SyncTask.PAUSING:
|
||||
self.status = SyncTask.PAUSED
|
||||
return False
|
||||
|
||||
# We only start this download if its status is downloading
|
||||
if self.status != SyncTask.DOWNLOADING:
|
||||
return False
|
||||
|
||||
# We are synching this file right now
|
||||
self.status = SyncTask.DOWNLOADING
|
||||
self._notification_shown = False
|
||||
|
||||
self._notification_shown = False
|
||||
|
||||
sync_result = SyncTask.DONE
|
||||
sync_result = SyncTask.DOWNLOADING
|
||||
try:
|
||||
logger.info('Starting SyncTask')
|
||||
self.device.add_track(self.episode, reporthook=self.status_updated)
|
||||
self.device.add_track(self, reporthook=self.status_updated)
|
||||
except SyncCancelledException as e:
|
||||
sync_result = SyncTask.CANCELLED
|
||||
except Exception as e:
|
||||
|
@ -874,12 +918,7 @@ class SyncTask(download.DownloadTask):
|
|||
self.error_message = _('Error: %s') % (str(e),)
|
||||
|
||||
with self:
|
||||
if sync_result == SyncTask.CANCELLED:
|
||||
if self.status == SyncTask.CANCELLING:
|
||||
self.status = SyncTask.CANCELLED
|
||||
else:
|
||||
self.status = SyncTask.PAUSED
|
||||
elif sync_result == SyncTask.DONE:
|
||||
if sync_result == SyncTask.DOWNLOADING:
|
||||
# Everything went well - we're done
|
||||
self.status = SyncTask.DONE
|
||||
if self.total_size <= 0:
|
||||
|
@ -889,7 +928,26 @@ class SyncTask(download.DownloadTask):
|
|||
gpodder.user_extensions.on_episode_synced(self.device, self.__episode)
|
||||
return True
|
||||
|
||||
self.speed = 0.0
|
||||
self.speed = 0.0
|
||||
|
||||
if sync_result == SyncTask.FAILED:
|
||||
self.status = SyncTask.FAILED
|
||||
|
||||
# cancelled/paused -- update state to mark it as safe to manipulate this task again
|
||||
elif self.status == SyncTask.PAUSING:
|
||||
self.status = SyncTask.PAUSED
|
||||
elif self.status == SyncTask.CANCELLING:
|
||||
self.status = SyncTask.CANCELLED
|
||||
|
||||
# We finished, but not successfully (at least not really)
|
||||
return False
|
||||
|
||||
|
||||
class GioSyncTask(SyncTask):
|
||||
def __init__(self, episode):
|
||||
super().__init__(episode)
|
||||
# For cancelling the copy
|
||||
self.cancellable = Gio.Cancellable()
|
||||
|
||||
def _signal_cancel_from_status(self):
|
||||
self.cancellable.cancel()
|
||||
|
|
Loading…
Reference in New Issue