Threading: Use util.run_in_background to spawn threads

This makes it easier to check where threading is used
and will allow us to better handle creation of threads.
This commit is contained in:
Thomas Perl 2012-07-10 13:52:34 +02:00
parent f2f187549d
commit dc06483bbd
12 changed files with 42 additions and 54 deletions

View File

@ -49,7 +49,7 @@ if __name__ != '__main__':
logger.debug('Ubuntu progress update failed.', exc_info=True)
else:
from gi.repository import Unity, GObject
import threading
from gpodder import util
import sys
class InputReader:
@ -88,7 +88,7 @@ else:
GObject.threads_init()
loop = GObject.MainLoop()
threading.Thread(target=loop.run).start()
util.run_in_background(loop.run)
launcher_entry = LauncherEntry()
reader = InputReader(sys.stdin, launcher_entry)

View File

@ -32,7 +32,6 @@ import atexit
import os
import shutil
import time
import threading
import logging
_ = gpodder.gettext
@ -298,9 +297,7 @@ class Config(object):
def schedule_save(self):
if self.__save_thread is None:
self.__save_thread = threading.Thread(target=self.save_thread_proc)
self.__save_thread.setDaemon(True)
self.__save_thread.start()
self.__save_thread = util.run_in_background(self.save_thread_proc, True)
def save_thread_proc(self):
time.sleep(self.WRITE_TO_DISK_TIMEOUT)

View File

@ -342,9 +342,8 @@ class DownloadURLOpener(urllib.FancyURLopener):
return (None, None)
class DownloadQueueWorker(threading.Thread):
class DownloadQueueWorker(object):
def __init__(self, queue, exit_callback, continue_check_callback, minimum_tasks):
threading.Thread.__init__(self)
self.queue = queue
self.exit_callback = exit_callback
self.continue_check_callback = continue_check_callback
@ -355,8 +354,11 @@ class DownloadQueueWorker(threading.Thread):
# download, even if a download limit is in effect.
self.minimum_tasks = minimum_tasks
def __repr__(self):
return threading.current_thread().getName()
def run(self):
logger.info('Starting new thread: %s', self.getName())
logger.info('Starting new thread: %s', self)
while True:
# Check if this thread is allowed to continue accepting tasks
# (But only after reducing minimum_tasks to zero - see above)
@ -367,11 +369,11 @@ class DownloadQueueWorker(threading.Thread):
try:
task = self.queue.pop()
logger.info('%s is processing: %s', self.getName(), task)
logger.info('%s is processing: %s', self, task)
task.run()
task.recycle()
except IndexError, e:
logger.info('No more tasks for %s to carry out.', self.getName())
logger.info('No more tasks for %s to carry out.', self)
break
self.exit_callback(self)
@ -421,10 +423,10 @@ class DownloadQueueManager(object):
else:
minimum_tasks = 0
worker = DownloadQueueWorker(self.tasks, self.__exit_callback, \
worker = DownloadQueueWorker(self.tasks, self.__exit_callback,
self.__continue_check_callback, minimum_tasks)
self.worker_threads.append(worker)
worker.start()
util.run_in_background(worker.run)
def are_queued_or_active_tasks(self):
with self.worker_threads_access:

View File

@ -19,7 +19,6 @@
import gtk
import pango
import threading
import urllib
import os.path
@ -141,8 +140,8 @@ class gPodderPodcastDirectory(BuilderWidget):
self.entryURL.set_sensitive(False)
self.btnOK.set_sensitive(False)
self.treeviewChannelChooser.set_sensitive(False)
threading.Thread(target=self.thread_func).start()
threading.Thread(target=lambda: self.thread_func(1)).start()
util.run_in_background(self.thread_func)
util.run_in_background(lambda: self.thread_func(1))
def select_all( self, value ):
enabled = False
@ -164,7 +163,7 @@ class gPodderPodcastDirectory(BuilderWidget):
self.entryYoutubeSearch.set_sensitive(False)
self.treeviewYouTubeChooser.set_sensitive(False)
self.btnSearchYouTube.set_sensitive(False)
threading.Thread(target = lambda: self.thread_func(2)).start()
util.run_in_background(lambda: self.thread_func(2))
def on_btnSelectAll_clicked(self, widget, *args):
self.select_all(True)

View File

@ -19,7 +19,6 @@
import gtk
import pango
import threading
import cgi
import gpodder
@ -358,11 +357,11 @@ class gPodderPreferences(BuilderWidget):
title = _('Replace subscription list on server')
message = _('Remote podcasts that have not been added locally will be removed on the server. Continue?')
if self.show_confirmation(message, title):
@util.run_in_background
def thread_proc():
self._config.mygpo.enabled = True
self.on_send_full_subscriptions()
self._config.mygpo.enabled = False
threading.Thread(target=thread_proc).start()
def on_combobox_on_sync_changed(self, widget):
index = self.combobox_on_sync.get_active()

View File

@ -22,7 +22,6 @@
# Ported to gPodder 3 by Joseph Wickremasinghe in June 2012
import gtk
import threading
import gpodder
_ = gpodder.gettext
@ -143,12 +142,11 @@ class gPodderSyncUI(object):
return
# Finally start the synchronization process
@util.run_in_background
def sync_thread_func():
self.enable_download_list_update()
device.add_sync_tasks(episodes, force_played=force_played)
threading.Thread(target=sync_thread_func).start()
# This function is used to remove files from the device
def cleanup_episodes():
# 'skip_played_episodes' must be used or else all the
@ -175,5 +173,5 @@ class gPodderSyncUI(object):
# 1. Remove old episodes (in worker thread)
# 2. Check for free space (in UI thread)
# 3. Sync the device (in UI thread)
threading.Thread(target=cleanup_episodes).start()
util.run_in_background(cleanup_episodes)

View File

@ -31,7 +31,6 @@ import glob
import time
import tempfile
import collections
import threading
import urllib
import cgi
@ -214,7 +213,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
# load list of user applications for audio playback
self.user_apps_reader = UserAppsReader(['audio', 'video'])
threading.Thread(target=self.user_apps_reader.read).start()
util.run_in_background(self.user_apps_reader.read)
# Set up the first instance of MygPoClient
self.mygpo_client = my.MygPoClient(self.config)
@ -293,7 +292,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
util.idle_add(self.wNotebook.set_current_page, 0)
else:
util.idle_add(self.clean_up_downloads, True)
threading.Thread(target=find_partial_downloads).start()
util.run_in_background(find_partial_downloads)
# Start the auto-update procedure
self._auto_update_timer_source_id = None
@ -1675,7 +1674,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
util.delete_file(destfile)
threading.Thread(target=convert_and_send_thread, args=[episodes_to_copy]).start()
util.run_in_background(lambda: convert_and_send_thread(episodes_to_copy))
def treeview_available_show_context_menu(self, treeview, event=None):
model, paths = self.treeview_handle_context_menu_click(treeview, event)
@ -2271,6 +2270,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
selected=[e.check_is_new() for e in episodes])
@util.run_in_background
def thread_proc():
# After the initial sorting and splitting, try all queued podcasts
length = len(queued)
@ -2319,7 +2319,6 @@ class gPodder(BuilderWidget, dbus.service.Object):
worked.append(channel.url)
util.idle_add(on_after_update)
threading.Thread(target=thread_proc).start()
def find_episode(self, podcast_url, episode_url):
"""Find an episode given its podcast and episode URL
@ -2399,6 +2398,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.pbFeedUpdate.set_text(text)
self.pbFeedUpdate.set_fraction(0)
@util.run_in_background
def update_feed_cache_proc():
updated_channels = []
for updated, channel in enumerate(channels):
@ -2487,8 +2487,6 @@ class gPodder(BuilderWidget, dbus.service.Object):
util.idle_add(update_feed_cache_finish_callback)
threading.Thread(target=update_feed_cache_proc).start()
def on_gPodder_delete_event(self, widget, *args):
"""Called when the GUI wants to close the window
Displays a confirmation dialog (and closes/hides gPodder)
@ -2610,6 +2608,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.update_podcast_list_model(channel_urls)
self.play_or_download()
@util.run_in_background
def thread_proc():
episode_urls = set()
channel_urls = set()
@ -2636,8 +2635,6 @@ class gPodder(BuilderWidget, dbus.service.Object):
util.idle_add(finish_deletion, episode_urls, channel_urls)
threading.Thread(target=thread_proc).start()
return True
def on_itemRemoveOldEpisodes_activate(self, widget):
@ -3016,6 +3013,7 @@ class gPodder(BuilderWidget, dbus.service.Object):
self.update_podcast_list_model(select_url=select_url)
progress.on_finished()
@util.run_in_background
def thread_proc():
select_url = None
@ -3057,8 +3055,6 @@ class gPodder(BuilderWidget, dbus.service.Object):
# The remaining stuff is to be done in the GTK main thread
util.idle_add(finish_deletion, select_url)
threading.Thread(target=thread_proc).start()
def on_itemRemoveChannel_activate(self, widget, *args):
if self.active_channel is None:
title = _('No podcast selected')

View File

@ -35,7 +35,6 @@ from gpodder import util
from gpodder import coverart
import gtk
import threading
class CoverDownloader(ObservableService):
@ -72,8 +71,8 @@ class CoverDownloader(ObservableService):
when we have no cover on the local disk.
"""
logger.debug('cover download request for %s', channel.url)
args = [channel, custom_url, True, avoid_downloading]
threading.Thread(target=self.__get_cover, args=args).start()
util.run_in_background(lambda: self.__get_cover(channel,
custom_url, True, avoid_downloading))
def get_cover(self, channel, custom_url=None, avoid_downloading=False):
"""

View File

@ -32,7 +32,6 @@ import datetime
import calendar
import os
import sys
import threading
import time
import logging
@ -437,9 +436,7 @@ class MygPoClient(object):
logger.debug('Flushing NOW.')
else:
logger.debug('Flush requested.')
self._worker_thread = threading.Thread(target=self._worker_proc, args=[now])
self._worker_thread.setDaemon(True)
self._worker_thread.start()
self._worker_thread = util.run_in_background(lambda: self._worker_proc(now), True)
else:
logger.debug('Flush requested, already waiting.')

View File

@ -27,7 +27,6 @@ from PySide.QtCore import QAbstractListModel, QModelIndex
from PySide.QtDeclarative import QDeclarativeView
import os
import threading
import signal
import functools
import subprocess
@ -262,8 +261,7 @@ class Controller(QObject):
finally:
self.root.end_progress()
t = threading.Thread(target=upload_proc, args=[self])
t.start()
util.run_in_background(lambda: upload_proc(self))
@Slot()
def saveMyGpoSettings(self):
@ -374,8 +372,7 @@ class Controller(QObject):
finally:
self.root.end_progress()
t = threading.Thread(target=merge_proc, args=[self])
t.start()
util.run_in_background(lambda: merge_proc(self))
for podcast in self.root.podcast_model.get_objects():
podcast.qupdate(finished_callback=self.update_subset_stats)
@ -576,8 +573,7 @@ class Controller(QObject):
finally:
self.root.end_progress()
t = threading.Thread(target=subscribe_proc, args=[self, urls])
t.start()
util.run_in_background(lambda: subscribe_proc(self, urls))
@Slot()
def currentEpisodeChanging(self):

View File

@ -35,7 +35,6 @@ from gpodder import query
from gpodder import model
from gpodder import coverart
import threading
import os
convert = util.convert_bytes
@ -177,9 +176,7 @@ class QEpisode(QObject):
self._wrapper_manager.remove_active_episode(self)
thread = threading.Thread(target=t, args=[self])
thread.setDaemon(True)
thread.start()
util.run_in_background(lambda: t(self), True)
def _description(self):
return convert(self._episode.description_html)
@ -267,7 +264,7 @@ class QPodcast(QObject):
if finished_callback is not None:
finished_callback()
threading.Thread(target=t, args=[self]).start()
util.run_in_background(lambda: t(self))
changed = Signal()

View File

@ -1249,7 +1249,7 @@ def open_website(url):
'open_new_window', \
(url,))
else:
threading.Thread(target=webbrowser.open, args=(url,)).start()
run_in_background(lambda: webbrowser.open(url))
def convert_bytes(d):
"""
@ -1620,3 +1620,11 @@ def get_update_info(url='http://gpodder.org/downloads'):
return up_to_date, latest_version, release_date, days_since_release
def run_in_background(function, daemon=False):
logger.debug('run_in_background: %s (%s)', function, str(daemon))
thread = threading.Thread(target=function)
thread.setDaemon(daemon)
thread.start()
return thread