# -*- coding: utf-8 -*-
#
# gPodder - A media aggregator and podcast client
# Copyright (c) 2005-2018 The gPodder Team
#
# gPodder is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# gPodder is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#
# my.py -- mygpo Client Abstraction for gPodder
# Thomas Perl; 2010-01-19
#

import atexit
import calendar
import datetime
import logging
import os
import sys
import time

import mygpoclient
from mygpoclient import api, public
from mygpoclient import util as mygpoutil

import gpodder
from gpodder import minidb, util

_ = gpodder.gettext

logger = logging.getLogger(__name__)

# Append gPodder's user agent to mygpoclient's user agent
mygpoclient.user_agent += ' ' + gpodder.user_agent

# 2013-02-08: We should update this to 1.7 once we use the new features
MYGPOCLIENT_REQUIRED = '1.4'

if (not hasattr(mygpoclient, 'require_version')
        or not mygpoclient.require_version(MYGPOCLIENT_REQUIRED)):
    print("""
    Please upgrade your mygpoclient library.
    See http://thp.io/2010/mygpoclient/

    Required version:  %s
    Installed version: %s
    """ % (MYGPOCLIENT_REQUIRED, mygpoclient.__version__), file=sys.stderr)
    sys.exit(1)

try:
    from mygpoclient.simple import MissingCredentials
except ImportError:
    # MissingCredentials does not exist in the installed version of
    # mygpoclient. The name is used inside "except (...)" tuples below, and
    # Python 3 raises TypeError when an except clause is matched against
    # something that is not a BaseException subclass. Use a private
    # placeholder exception type instead; nothing ever raises it, so it can
    # never be caught.
    class MissingCredentials(Exception):
        """Placeholder for mygpoclient.simple.MissingCredentials."""


# Number of episode actions sent to the server per upload request
EPISODE_ACTIONS_BATCH_SIZE = 100


# Database model classes
class SinceValue(object):
    """Stored "since" marker for one host, device and category.

    The value is passed to the server on the next poll and replaced by the
    "since" value the server returns.
    """

    # Field name -> field type (presumably read by minidb as the schema;
    # confirm against gpodder.minidb)
    __slots__ = {'host': str, 'device_id': str, 'category': int, 'since': int}

    # Possible values for the "category" field
    PODCASTS, EPISODES = list(range(2))

    def __init__(self, host, device_id, category, since=0):
        self.host = host
        self.device_id = device_id
        self.category = category
        self.since = since


class SubscribeAction(object):
    """Queued subscription change (add or remove) for a podcast URL."""

    __slots__ = {'action_type': int, 'url': str}

    # Possible values for the "action_type" field
    ADD, REMOVE = list(range(2))

    def __init__(self, action_type, url):
        self.action_type = action_type
        self.url = url

    @property
    def is_add(self):
        """True if this action adds a subscription."""
        return self.action_type == self.ADD

    @property
    def is_remove(self):
        """True if this action removes a subscription."""
        return self.action_type == self.REMOVE

    @classmethod
    def add(cls, url):
        """Create an "add" action for the given URL."""
        return cls(cls.ADD, url)

    @classmethod
    def remove(cls, url):
        """Create a "remove" action for the given URL."""
        return cls(cls.REMOVE, url)

    @classmethod
    def undo(cls, action):
        """Create the action that reverses the given action.

        Raises ValueError if the action is neither an add nor a remove.
        """
        if action.is_add:
            return cls(cls.REMOVE, action.url)
        elif action.is_remove:
            return cls(cls.ADD, action.url)

        raise ValueError('Cannot undo action: %r' % action)


# New entity name for "received" actions
class ReceivedSubscribeAction(SubscribeAction):
    """Subscription change received from the server."""


class UpdateDeviceAction(object):
    """Queued upload of the device settings (caption and type)."""

    __slots__ = {'device_id': str, 'caption': str, 'device_type': str}

    def __init__(self, device_id, caption, device_type):
        self.device_id = device_id
        self.caption = caption
        self.device_type = device_type


class EpisodeAction(object):
    """Queued episode event (e.g. 'play', 'download', 'delete')."""

    __slots__ = {'podcast_url': str, 'episode_url': str, 'device_id': str,
                 'action': str, 'timestamp': int,
                 'started': int, 'position': int, 'total': int}

    def __init__(self, podcast_url, episode_url, device_id,
                 action, timestamp, started, position, total):
        self.podcast_url = podcast_url
        self.episode_url = episode_url
        self.device_id = device_id
        self.action = action
        self.timestamp = timestamp
        self.started = started
        self.position = position
        self.total = total


# New entity name for "received" actions
class ReceivedEpisodeAction(EpisodeAction):
    """Episode event received from the server."""


class RewrittenUrl(object):
    """Podcast URL rewrite (old URL -> new URL) reported by the server."""

    __slots__ = {'old_url': str, 'new_url': str}

    def __init__(self, old_url, new_url):
        self.old_url = old_url
        self.new_url = new_url
# End Database model classes


# Helper class for displaying changes in the UI
class Change(object):
    """Pairs a subscribe action with the podcast object it refers to."""

    def __init__(self, action, podcast=None):
        self.action = action
        self.podcast = podcast

    @property
    def description(self):
        """Translated, human-readable description of the change."""
        if self.action.is_add:
            return _('Add %s') % self.action.url

        if self.podcast is None:
            # "podcast" is optional in the constructor; fall back to the
            # URL instead of failing on None.title
            return _('Remove %s') % self.action.url

        return _('Remove %s') % self.podcast.title


class MygPoClient(object):
    """Client that queues local changes and syncs them with the web service.

    Pending and received actions are kept in a minidb store so they
    survive restarts; uploads happen in a background worker (see flush).
    """

    STORE_FILE = 'gpodder.net'
    # Seconds the worker waits before processing the queue
    FLUSH_TIMEOUT = 60
    # Maximum number of passes over the queue per worker run
    FLUSH_RETRIES = 3

    def __init__(self, config):
        self._store = minidb.Store(os.path.join(gpodder.home, self.STORE_FILE))

        self._config = config
        self._client = None

        # Initialize the _client attribute and register with config
        self.on_config_changed()
        assert self._client is not None

        self._config.add_observer(self.on_config_changed)

        self._worker_thread = None
        atexit.register(self._at_exit)

    def create_device(self):
        """Uploads the device changes to the server

        This should be called when device settings change
        or when the mygpo client functionality is enabled.
        """
        # Remove all previous device update actions
        self._store.remove(self._store.load(UpdateDeviceAction))

        # Insert our new update action
        action = UpdateDeviceAction(self.device_id,
                                    self._config.mygpo.device.caption,
                                    self._config.mygpo.device.type)
        self._store.save(action)

    def get_rewritten_urls(self):
        """Returns a list of rewritten URLs for uploads

        This should be called regularly. Every object returned
        should be merged into the database, and the old_url
        should be updated to new_url in every podcast.
        """
        rewritten_urls = self._store.load(RewrittenUrl)
        self._store.remove(rewritten_urls)
        return rewritten_urls

    def process_episode_actions(self, find_episode, on_updated=None):
        """Process received episode actions

        The parameter "find_episode" should be a function accepting
        two parameters (podcast_url and episode_url). It will be used
        to get an episode object that needs to be updated. It should
        return None if the requested episode does not exist.

        The optional callback "on_updated" should accept a single
        parameter (the episode object) and will be called whenever
        the episode data is changed in some way.
        """
        logger.debug('Processing received episode actions...')
        for action in self._store.load(ReceivedEpisodeAction):
            if action.action not in ('play', 'delete'):
                # Ignore all other action types for now
                continue

            episode = find_episode(action.podcast_url, action.episode_url)

            if episode is None:
                # The episode does not exist on this client
                continue

            if action.action == 'play':
                logger.debug('Play action for %s', episode.url)
                episode.mark(is_played=True)

                # Only take over the position if the received action is
                # newer than what is stored locally
                if (action.timestamp > episode.current_position_updated
                        and action.position is not None):
                    logger.debug('Updating position for %s', episode.url)
                    episode.current_position = action.position
                    episode.current_position_updated = action.timestamp

                if action.total:
                    logger.debug('Updating total time for %s', episode.url)
                    episode.total_time = action.total

                episode.save()
                if on_updated is not None:
                    on_updated(episode)
            elif action.action == 'delete':
                if not episode.was_downloaded(and_exists=True):
                    # Set the episode to a "deleted" state
                    logger.debug('Marking as deleted: %s', episode.url)
                    episode.delete_from_disk()
                    episode.save()
                    if on_updated is not None:
                        on_updated(episode)

        # Remove all received episode actions
        self._store.delete(ReceivedEpisodeAction)
        self._store.commit()
        logger.debug('Received episode actions processed.')

    def get_received_actions(self):
        """Returns a list of ReceivedSubscribeAction objects

        The list might be empty. All these actions have to
        be processed. The user should confirm which of these
        actions should be taken, the rest should be rejected.

        Use confirm_received_actions and reject_received_actions
        to return and finalize the actions received by this
        method in order to not receive duplicate actions.
        """
        return self._store.load(ReceivedSubscribeAction)

    def confirm_received_actions(self, actions):
        """Confirm that a list of actions has been processed

        The UI should call this with a list of actions that
        have been accepted by the user and processed by the
        podcast backend.
        """
        # Simply remove the received actions from the queue
        self._store.remove(actions)

    def reject_received_actions(self, actions):
        """Reject (undo) a list of ReceivedSubscribeAction objects

        The UI should call this with a list of actions that
        have been rejected by the user. A reversed set of
        actions will be uploaded to the server so that the
        state on the server matches the state on the client.
        """
        # Create "undo" actions for received subscriptions
        self._store.save(SubscribeAction.undo(a) for a in actions)
        self.flush()

        # After we've handled the reverse-actions, clean up
        self._store.remove(actions)

    @property
    def host(self):
        """Server name from the configuration (mygpo.server)."""
        return self._config.mygpo.server

    @property
    def device_id(self):
        """Device UID from the configuration (mygpo.device.uid)."""
        return self._config.mygpo.device.uid

    def can_access_webservice(self):
        """Return a truthy value if sync is enabled and fully configured.

        Requires mygpo.enabled, a username and a device UID.
        """
        return self._config.mygpo.enabled and \
            self._config.mygpo.username and \
            self._config.mygpo.device.uid

    def set_subscriptions(self, urls):
        """Upload "urls" as this device's subscription list (overwriting).

        Raises Exception if webservice access is not enabled.
        """
        if self.can_access_webservice():
            logger.debug('Uploading (overwriting) subscriptions...')
            self._client.put_subscriptions(self.device_id, urls)
            logger.debug('Subscription upload done.')
        else:
            raise Exception('Webservice access not enabled')

    def _convert_played_episode(self, episode, start, end, total):
        """Build a 'play' EpisodeAction with position data, stamped now."""
        return EpisodeAction(episode.channel.url,
                             episode.url, self.device_id, 'play',
                             int(time.time()), start, end, total)

    def _convert_episode(self, episode, action):
        """Build an EpisodeAction of type "action" without position data."""
        return EpisodeAction(episode.channel.url,
                             episode.url, self.device_id, action,
                             int(time.time()), None, None, None)

    def on_delete(self, episodes):
        """Queue a 'delete' action for each of the given episodes."""
        logger.debug('Storing %d episode delete actions', len(episodes))
        self._store.save(self._convert_episode(e, 'delete') for e in episodes)

    def on_download(self, episodes):
        """Queue a 'download' action for each of the given episodes."""
        logger.debug('Storing %d episode download actions', len(episodes))
        self._store.save(self._convert_episode(e, 'download') for e in episodes)

    def on_playback_full(self, episode, start, end, total):
        """Queue a 'play' action including start, end and total values."""
        logger.debug('Storing full episode playback action')
        self._store.save(self._convert_played_episode(episode, start, end, total))

    def on_playback(self, episodes):
        """Queue a 'play' action for each of the given episodes."""
        logger.debug('Storing %d episode playback actions', len(episodes))
        self._store.save(self._convert_episode(e, 'play') for e in episodes)

    def on_subscribe(self, urls):
        """Queue "add" actions for the given URLs and request a flush."""
        # Cancel previously-inserted "remove" actions
        self._store.remove(SubscribeAction.remove(url) for url in urls)

        # Insert new "add" actions
        self._store.save(SubscribeAction.add(url) for url in urls)

        self.flush()

    def on_unsubscribe(self, urls):
        """Queue "remove" actions for the given URLs and request a flush."""
        # Cancel previously-inserted "add" actions
        self._store.remove(SubscribeAction.add(url) for url in urls)

        # Insert new "remove" actions
        self._store.save(SubscribeAction.remove(url) for url in urls)

        self.flush()

    def _at_exit(self):
        """Run one forced worker pass, then commit and close the store."""
        self._worker_proc(forced=True)
        self._store.commit()
        self._store.close()

    def _worker_proc(self, forced=False):
        """Upload all queued actions to the server.

        Unless "forced" is set, the queue is committed and the worker
        sleeps for FLUSH_TIMEOUT seconds first. The queue is then
        processed up to FLUSH_RETRIES times, stopping early once a pass
        succeeds completely or webservice access is no longer possible.
        """
        if not forced:
            # Store the current contents of the queue database
            self._store.commit()

            logger.debug('Worker thread waiting for timeout')
            time.sleep(self.FLUSH_TIMEOUT)

        # Only work when enabled, UID set and allowed to work
        if self.can_access_webservice() and \
                (self._worker_thread is not None or forced):
            self._worker_thread = None

            logger.debug('Worker thread starting to work...')
            for retry in range(self.FLUSH_RETRIES):
                must_retry = False

                if retry:
                    logger.debug('Retrying flush queue...')

                # Update the device first, so it can be created if new
                for action in self._store.load(UpdateDeviceAction):
                    if self.update_device(action):
                        self._store.remove(action)
                    else:
                        must_retry = True

                # Upload podcast subscription actions
                actions = self._store.load(SubscribeAction)
                if self.synchronize_subscriptions(actions):
                    self._store.remove(actions)
                else:
                    must_retry = True

                # Upload episode actions
                actions = self._store.load(EpisodeAction)
                if self.synchronize_episodes(actions):
                    self._store.remove(actions)
                else:
                    must_retry = True

                if not must_retry or not self.can_access_webservice():
                    # No more pending actions, or no longer enabled.
                    # Ready to quit.
                    break

            logger.debug('Worker thread finished.')
        else:
            logger.info('Worker thread may not execute (disabled).')

        # Store the current contents of the queue database
        self._store.commit()

    def flush(self, now=False):
        """Request an upload of the queued actions in the background.

        Does nothing (besides logging) if webservice access is not
        possible, or if a worker is already waiting and "now" is false.
        With "now" set, the worker is started in forced mode, which skips
        its initial wait.
        """
        if not self.can_access_webservice():
            logger.warning('Flush requested, but sync disabled.')
            return

        if self._worker_thread is None or now:
            if now:
                logger.debug('Flushing NOW.')
            else:
                logger.debug('Flush requested.')
            self._worker_thread = util.run_in_background(
                lambda: self._worker_proc(now), True)
        else:
            logger.debug('Flush requested, already waiting.')

    def on_config_changed(self, name=None, old_value=None, new_value=None):
        """Configuration observer: react to changed mygpo settings.

        Recreates the API client when credentials or server change (or
        when no client exists yet), and queues a device update when a
        "mygpo.device." setting changes.
        """
        if name in ('mygpo.username', 'mygpo.password', 'mygpo.server') \
                or self._client is None:
            self._client = api.MygPodderClient(self._config.mygpo.username,
                                               self._config.mygpo.password,
                                               self._config.mygpo.server)
            logger.info('Reloading settings.')
        elif name is not None and name.startswith('mygpo.device.'):
            # "name" defaults to None, so guard before calling startswith
            # Update or create the device
            self.create_device()

    def synchronize_episodes(self, actions):
        """Download new episode actions and upload the given ones.

        Returns True on success and False on failure. Invalid credentials
        additionally disable the mygpo.enabled setting. A failure while
        downloading (other than invalid credentials) is only logged and
        does not prevent the upload.
        """
        logger.debug('Starting episode status sync.')

        def convert_to_api(action):
            dt = datetime.datetime.utcfromtimestamp(action.timestamp)
            action_ts = mygpoutil.datetime_to_iso8601(dt)
            return api.EpisodeAction(action.podcast_url,
                                     action.episode_url, action.action,
                                     action.device_id, action_ts,
                                     action.started, action.position,
                                     action.total)

        def convert_from_api(action):
            dt = mygpoutil.iso8601_to_datetime(action.timestamp)
            action_ts = calendar.timegm(dt.timetuple())
            return ReceivedEpisodeAction(action.podcast,
                                         action.episode, action.device,
                                         action.action, action_ts,
                                         action.started, action.position,
                                         action.total)

        try:
            # Load the "since" value from the database
            since_o = self._store.get(SinceValue,
                                      host=self.host,
                                      device_id=self.device_id,
                                      category=SinceValue.EPISODES)

            # Use a default since object for the first-time case
            if since_o is None:
                since_o = SinceValue(self.host, self.device_id,
                                     SinceValue.EPISODES)

            # Step 1: Download Episode actions
            try:
                changes = self._client.download_episode_actions(since_o.since)

                received_actions = [convert_from_api(a) for a in changes.actions]
                logger.debug('Received %d episode actions', len(received_actions))
                self._store.save(received_actions)

                # Save the "since" value for later use
                self._store.update(since_o, since=changes.since)
            except (MissingCredentials, mygpoclient.http.Unauthorized):
                # handle outside
                raise
            except Exception:
                logger.warning('Exception while polling for episodes.', exc_info=True)

            # Step 2: Upload Episode actions

            # Uploads are done in batches; uploading can resume if only parts
            # be uploaded; avoids empty uploads as well
            for lower in range(0, len(actions), EPISODE_ACTIONS_BATCH_SIZE):
                batch = actions[lower:(lower + EPISODE_ACTIONS_BATCH_SIZE)]

                # Convert actions to the mygpoclient format for uploading
                episode_actions = [convert_to_api(a) for a in batch]

                # Upload the episode actions
                self._client.upload_episode_actions(episode_actions)

                # Actions have been uploaded to the server - remove them
                self._store.remove(batch)

            logger.debug('Episode actions have been uploaded to the server.')
            return True

        except (MissingCredentials, mygpoclient.http.Unauthorized):
            logger.warning('Invalid credentials. Disabling gpodder.net.')
            self._config.mygpo.enabled = False
            return False

        except Exception as e:
            logger.error('Cannot upload episode actions: %s', str(e), exc_info=True)
            return False

    def synchronize_subscriptions(self, actions):
        """Pull subscription changes from the server and push local ones.

        Returns True on success and False on failure. Invalid credentials
        additionally disable the mygpo.enabled setting. Note that the
        actions to upload are re-read from the store after the pull; the
        "actions" argument is not used for the upload itself.
        """
        logger.debug('Starting subscription sync.')
        try:
            # Load the "since" value from the database
            since_o = self._store.get(SinceValue,
                                      host=self.host,
                                      device_id=self.device_id,
                                      category=SinceValue.PODCASTS)

            # Use a default since object for the first-time case
            if since_o is None:
                since_o = SinceValue(self.host, self.device_id,
                                     SinceValue.PODCASTS)

            # Step 1: Pull updates from the server and notify the frontend
            result = self._client.pull_subscriptions(self.device_id, since_o.since)

            # Update the "since" value in the database
            self._store.update(since_o, since=result.since)

            # Store received actions for later retrieval (and in case we
            # have outdated actions in the database, simply remove them)
            for url in result.add:
                logger.debug('Received add action: %s', url)
                self._store.remove(ReceivedSubscribeAction.remove(url))
                self._store.remove(ReceivedSubscribeAction.add(url))
                self._store.save(ReceivedSubscribeAction.add(url))
            for url in result.remove:
                logger.debug('Received remove action: %s', url)
                self._store.remove(ReceivedSubscribeAction.add(url))
                self._store.remove(ReceivedSubscribeAction.remove(url))
                self._store.save(ReceivedSubscribeAction.remove(url))

            # Step 2: Push updates to the server and rewrite URLs (if any)
            actions = self._store.load(SubscribeAction)

            add = [a.url for a in actions if a.is_add]
            remove = [a.url for a in actions if a.is_remove]

            if add or remove:
                logger.debug('Uploading: +%d / -%d', len(add), len(remove))
                # Only do a push request if something has changed
                result = self._client.update_subscriptions(self.device_id, add, remove)

                # Update the "since" value in the database
                self._store.update(since_o, since=result.since)

                # Store URL rewrites for later retrieval by GUI
                for old_url, new_url in result.update_urls:
                    if new_url:
                        logger.debug('Rewritten URL: %s', new_url)
                        self._store.save(RewrittenUrl(old_url, new_url))

            # Actions have been uploaded to the server - remove them
            self._store.remove(actions)
            logger.debug('All actions have been uploaded to the server.')
            return True

        except (MissingCredentials, mygpoclient.http.Unauthorized):
            logger.warning('Invalid credentials. Disabling gpodder.net.')
            self._config.mygpo.enabled = False
            return False

        except Exception as e:
            logger.error('Cannot upload subscriptions: %s', str(e), exc_info=True)
            return False

    def update_device(self, action):
        """Upload the device settings described by an UpdateDeviceAction.

        Returns True on success and False on failure. Invalid credentials
        additionally disable the mygpo.enabled setting.
        """
        try:
            logger.debug('Uploading device settings...')
            self._client.update_device_settings(action.device_id,
                                                action.caption,
                                                action.device_type)
            logger.debug('Device settings uploaded.')
            return True

        except (MissingCredentials, mygpoclient.http.Unauthorized):
            logger.warning('Invalid credentials. Disabling gpodder.net.')
            self._config.mygpo.enabled = False
            return False

        except Exception as e:
            logger.error('Cannot update device %s: %s', self.device_id,
                         str(e), exc_info=True)
            return False

    def get_devices(self):
        """Return a list of (device_id, caption, type) tuples.

        On invalid credentials the mygpo.enabled setting is disabled and
        the exception is re-raised; other errors propagate unchanged.
        """
        try:
            devices = self._client.get_devices()
        except (MissingCredentials, mygpoclient.http.Unauthorized):
            logger.warning('Invalid credentials. Disabling gpodder.net.')
            self._config.mygpo.enabled = False
            raise

        return [(d.device_id, d.caption, d.type) for d in devices]

    def open_website(self):
        """Open the configured server's website via util.open_website."""
        util.open_website('http://' + self._config.mygpo.server)

    def get_download_user_subscriptions_url(self):
        """Return the subscriptions URL with the credentials embedded."""
        OPML_URL = self._client.locator.subscriptions_uri()
        url = util.url_add_authentication(OPML_URL,
                                          self._config.mygpo.username,
                                          self._config.mygpo.password)
        return url


class Directory(object):
    """Access to the public podcast directory (toplist and search)."""

    def __init__(self):
        self.client = public.PublicClient()

    def toplist(self):
        """Return (title, url) pairs for the toplist.

        Entries without a URL are skipped; the URL is used as the title
        when a podcast has no title.
        """
        return [(p.title or p.url, p.url)
                for p in self.client.get_toplist()
                if p.url]

    def search(self, query):
        """Return (title, url) pairs for podcasts matching "query".

        Entries without a URL are skipped; the URL is used as the title
        when a podcast has no title.
        """
        return [(p.title or p.url, p.url)
                for p in self.client.search_podcasts(query)
                if p.url]