ZeroNet/src/Worker/WorkerManager.py

601 lines
26 KiB
Python
Raw Normal View History

import time
import logging
import collections
import gevent
2019-03-15 21:06:59 +01:00
from .Worker import Worker
2019-11-25 14:43:28 +01:00
from .WorkerTaskManager import WorkerTaskManager
2016-08-15 15:59:49 +02:00
from Config import config
from util import helper
from Plugin import PluginManager
from Debug.DebugLock import DebugLock
import util
@PluginManager.acceptPlugins
class WorkerManager(object):
    """Schedules download tasks of a site and the workers that solve them."""

    def __init__(self, site):
        self.site = site
        self.workers = {}  # Key: ip:port, Value: Worker.Worker
        self.tasks = WorkerTaskManager()
        self.next_task_id = 1
        self.lock_add_task = DebugLock(name="Lock AddTask:%s" % self.site.address_short)
        # Task dict fields:
        # {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
        #  "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids, "lock": None or gevent.lock.RLock}
        self.started_task_num = 0  # Last added task num
        self.asked_peers = []  # Peer keys already queried with findHashIds
        self.running = True  # checkTasks loop keeps running while True
        self.time_task_added = 0  # Timestamp of the most recently added task
        self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
        # Background watchdog for stuck / expired tasks
        self.site.greenlet_manager.spawn(self.checkTasks)
def __str__(self):
    # Short, log-friendly identity
    return "WorkerManager %s" % self.site.address_short

def __repr__(self):
    return "<%s>" % self.__str__()
# Watchdog loop: every 15s drop finished workers, time out stuck tasks
# and recruit more workers for slow ones.
def checkTasks(self):
    while self.running:
        tasks = task = worker = workers = None  # Cleanup local variables between rounds
        announced = False
        time.sleep(15)  # Check every 15 sec

        # Clean up workers whose task has already finished
        for worker in list(self.workers.values()):
            if worker.task and worker.task["done"]:
                worker.skip(reason="Task done")  # Stop workers with task done

        if not self.tasks:
            continue

        tasks = self.tasks[:]  # Copy it so removing elements wont cause any problem
        num_tasks_started = len([task for task in tasks if task["time_started"]])

        self.log.debug(
            "Tasks: %s, started: %s, bad files: %s, total started: %s" %
            (len(tasks), num_tasks_started, len(self.site.bad_files), self.started_task_num)
        )

        for task in tasks:
            if task["time_started"] and time.time() >= task["time_started"] + 60:
                # Task has been running over a minute: make its workers move on
                self.log.debug("Timeout, Skipping: %s" % task)
                workers = self.findWorkers(task)
                if workers:
                    for worker in workers:
                        worker.skip(reason="Task timeout")
                else:
                    self.failTask(task, reason="No workers")
            elif time.time() >= task["time_added"] + 60 and not self.workers:
                # Queued over a minute with nobody left to serve it
                self.failTask(task, reason="Timeout")
            elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
                # Slow task (started 15+ sec ago) or no workers at all: find more sources
                workers = self.findWorkers(task)
                self.log.debug(
                    "Slow task: %s, (workers: %s, optional_hash_id: %s, peers: %s, failed: %s, asked: %s)" %
                    (
                        task["inner_path"], len(workers), task["optional_hash_id"],
                        len(task["peers"] or []), len(task["failed"]), len(self.asked_peers)
                    )
                )
                if not announced and task["site"].isAddedRecently():
                    task["site"].announce(mode="more")  # Find more peers
                    announced = True
                if task["optional_hash_id"]:
                    if self.workers:
                        if not task["time_started"]:
                            ask_limit = 20
                        else:
                            # The longer it runs, the more peers we may query
                            ask_limit = max(10, time.time() - task["time_started"])
                        if len(self.asked_peers) < ask_limit and len(task["peers"] or []) <= len(task["failed"]) * 2:
                            self.startFindOptional(find_more=True)  # Re-search for high priority
                    if task["peers"]:
                        # NOTE(review): `workers` holds Worker objects while `peer` is a Peer,
                        # so `peer not in workers` looks always-true — confirm equality semantics
                        peers_try = [peer for peer in task["peers"] if peer not in task["failed"] and peer not in workers]
                        if peers_try:
                            self.startWorkers(peers_try, force_num=5, reason="Task checker (optional, has peers)")
                        else:
                            self.startFindOptional(find_more=True)
                    else:
                        self.startFindOptional(find_more=True)
                else:
                    if task["peers"]:  # Release the peer lock
                        self.log.debug("Task peer lock release: %s" % task["inner_path"])
                        task["peers"] = []
                    self.startWorkers(reason="Task checker")

        if len(self.tasks) > len(self.workers) * 2 and len(self.workers) < self.getMaxWorkers():
            self.startWorkers(reason="Task checker (need more workers)")

    self.log.debug("checkTasks stopped running")
# Returns the next task this peer is allowed to work on, or None
def getTask(self, peer):
    """Find the first queued task that the given peer may solve."""
    for task in self.tasks:
        if task["peers"] and peer not in task["peers"]:
            continue  # Task is restricted to other peers
        if peer in task["failed"]:
            continue  # This peer already tried and failed
        if task["optional_hash_id"] and task["peers"] is None:
            continue  # Optional file with no known source peers yet
        if task["done"]:
            continue
        return task
# Drop queued tasks whose file is no longer listed in site.bad_files
def removeSolvedFileTasks(self, mark_as_good=True):
    for task in self.tasks[:]:  # Iterate a copy: we remove items while looping
        if task["inner_path"] not in self.site.bad_files:
            self.log.debug("No longer in bad_files, marking as %s: %s" % (mark_as_good, task["inner_path"]))
            task["done"] = True
            task["evt"].set(mark_as_good)  # Wake waiters with the verdict
            self.tasks.remove(task)
    if not self.tasks:
        self.started_task_num = 0
    self.site.updateWebsocket()
# New peers added to the site: try to put them to work
def onPeers(self):
    self.startWorkers(reason="More peers found")
# Upper bound of parallel workers; scales up when the task queue is long
def getMaxWorkers(self):
    if len(self.tasks) > 50:
        return config.workers * 3  # Big backlog: allow triple capacity
    else:
        return config.workers
# Create and start a worker for the given peer if limits and tasks allow.
# Returns the Worker on success, False otherwise.
def addWorker(self, peer, multiplexing=False, force=False):
    key = peer.key
    if len(self.workers) > self.getMaxWorkers() and not force:
        return False  # Over the limit and not forced
    if multiplexing:  # Add even if we already have worker for this peer
        key = "%s/%s" % (key, len(self.workers))
    if key in self.workers:
        return False  # We already have a worker under this key
    task = self.getTask(peer)
    if not task:
        return False  # Nothing this peer could work on
    worker = Worker(self, peer)
    self.workers[key] = worker
    worker.key = key
    worker.start()
    return worker
def taskAddPeer(self, task, peer):
    """Register *peer* as a possible source for *task*.

    Returns False when the peer already failed this task, True otherwise.
    """
    peers = task["peers"]
    if peers is None:
        peers = task["peers"] = []
    if peer in task["failed"]:
        return False  # Known-bad source for this task
    if peer not in peers:
        peers.append(peer)
    return True
# Start workers to process queued tasks, optionally restricted to given peers.
# Returns False when there is nothing to do or capacity is exhausted.
def startWorkers(self, peers=None, force_num=0, reason="Unknown"):
    if not self.tasks:
        return False  # No task for workers
    max_workers = min(self.getMaxWorkers(), len(self.site.peers))
    if len(self.workers) >= max_workers and not peers:
        return False  # Workers number already maxed and no starting peers defined
    self.log.debug(
        "Starting workers (%s), tasks: %s, peers: %s, workers: %s" %
        (reason, len(self.tasks), len(peers or []), len(self.workers))
    )
    if not peers:
        peers = self.site.getConnectedPeers()
        if len(peers) < max_workers:
            peers += self.site.getRecentPeers(max_workers * 2)
    if type(peers) is set:
        peers = list(peers)  # sort() needs a list

    # Best (lowest) ping first; busy or disconnected peers go to the back
    peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999)

    for peer in peers:  # One worker for every peer
        # NOTE(review): this guard is always false (peer comes from peers) — looks vestigial
        if peers and peer not in peers:
            continue  # If peers defined and peer not valid
        if force_num:
            worker = self.addWorker(peer, force=True)  # Ignore the worker limit
            force_num -= 1
        else:
            worker = self.addWorker(peer)
        if worker:
            self.log.debug("Added worker: %s (rep: %s), workers: %s/%s" % (peer.key, peer.reputation, len(self.workers), max_workers))
# Match queued optional tasks against the hashfields of known peers
# and add the matching peers as task sources.
def findOptionalTasks(self, optional_tasks, reset_task=False):
    """Return {optional_hash_id: [peers that advertise that hash]}."""
    found = collections.defaultdict(list)
    for peer in list(self.site.peers.values()):
        if not peer.has_hashfield:
            continue
        hashfield_set = set(peer.hashfield)  # Finding in set is much faster
        for task in optional_tasks:
            optional_hash_id = task["optional_hash_id"]
            if optional_hash_id not in hashfield_set:
                continue
            if reset_task and len(task["failed"]) > 0:
                task["failed"] = []  # Give previously failed peers another chance
            if peer in task["failed"]:
                continue
            if self.taskAddPeer(task, peer):
                found[optional_hash_id].append(peer)
    return found
# Find peers for optional hash ids in local hash tables
def findOptionalHashIds(self, optional_hash_ids, limit=0):
    """Return {optional_hash_id: [peers]} found in local peer hashfields.

    When *limit* is given, a hash id is removed (in place, as before) from
    *optional_hash_ids* once enough peers have been found for it.
    """
    found = collections.defaultdict(list)
    for peer in list(self.site.peers.values()):
        if not peer.has_hashfield:
            continue
        hashfield_set = set(peer.hashfield)  # Finding in set is much faster
        # BUG FIX: iterate a snapshot — removing from optional_hash_ids while
        # looping over it directly raises RuntimeError for a set argument and
        # silently skips entries for a list argument.
        for optional_hash_id in list(optional_hash_ids):
            if optional_hash_id in hashfield_set:
                found[optional_hash_id].append(peer)
                if limit and len(found[optional_hash_id]) >= limit:
                    optional_hash_ids.remove(optional_hash_id)
    return found
# Register findHashIds results: add the returned peers to matching tasks
def addOptionalPeers(self, found_ips):
    """found_ips: {hash_id: [(ip, port), ...]} -> {hash_id: [Peer, ...]}"""
    found = collections.defaultdict(list)
    for hash_id, peer_ips in found_ips.items():
        matching = [task for task in self.tasks if task["optional_hash_id"] == hash_id]
        if not matching:
            continue  # No queued task wants this hash anymore
        task = matching[0]  # Found task, lets take the first
        for peer_ip in peer_ips:
            peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True, source="optional")
            if not peer:
                continue
            if self.taskAddPeer(task, peer):
                found[hash_id].append(peer)
            if peer.hashfield.appendHashId(hash_id):  # Peer has this file
                peer.time_hashfield = None  # Peer hashfield probably outdated
    return found
# Locate source peers for the optional files of the queued tasks:
# first locally known hashfields, then connected peers, then random peers.
@util.Noparallel(blocking=False, ignore_args=True)
def startFindOptional(self, reset_task=False, find_more=False, high_priority=False):
    # Short wait so more file requests can batch together
    if len(self.tasks) < 20 or high_priority:
        time.sleep(0.01)
    elif len(self.tasks) > 90:
        time.sleep(5)
    else:
        time.sleep(0.5)

    optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
    if not optional_tasks:
        return False
    optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
    time_tasks = self.time_task_added  # To detect tasks added while we search

    self.log.debug(
        "Finding peers for optional files: %s (reset_task: %s, find_more: %s)" %
        (optional_hash_ids, reset_task, find_more)
    )

    # Step 1: hashfields we already hold locally
    found = self.findOptionalTasks(optional_tasks, reset_task=reset_task)
    if found:
        found_peers = set([peer for peers in list(found.values()) for peer in peers])
        self.startWorkers(found_peers, force_num=3, reason="Optional found in local peers")

    if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.values())):
        self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))

        # Step 2: refresh hashfields of connected peers
        threads = []
        peers = self.site.getConnectedPeers()
        if not peers:
            peers = self.site.getConnectablePeers()
        for peer in peers:
            threads.append(self.site.greenlet_manager.spawn(peer.updateHashfield, force=find_more))
        gevent.joinall(threads, timeout=5)

        if time_tasks != self.time_task_added:  # New task added since start
            optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
            optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

        found = self.findOptionalTasks(optional_tasks)
        self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (
            len(found), len(optional_hash_ids)
        ))

        if found:
            found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
            self.startWorkers(found_peers, force_num=3, reason="Optional found in connected peers")

    if len(found) < len(optional_hash_ids) or find_more:
        self.log.debug(
            "No connected hashtable result for optional files: %s (asked: %s)" %
            (optional_hash_ids - set(found), len(self.asked_peers))
        )
        if not self.tasks:
            self.log.debug("No tasks, stopping finding optional peers")
            return

        # Step 3: active findHashIds queries against connected peers
        threads = []
        peers = [peer for peer in self.site.getConnectedPeers() if peer.key not in self.asked_peers][0:10]
        if not peers:
            peers = self.site.getConnectablePeers(ignore=self.asked_peers)

        for peer in peers:
            threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids)))
            self.asked_peers.append(peer.key)

        # Poll results up to 5 seconds, acting on partial answers as they arrive
        for _ in range(5):
            time.sleep(1)
            thread_values = [thread.value for thread in threads if thread.value]
            if not thread_values:
                continue

            found_ips = helper.mergeDicts(thread_values)
            found = self.addOptionalPeers(found_ips)
            self.log.debug("Found optional files after findhash connected peers: %s/%s (asked: %s)" % (
                len(found), len(optional_hash_ids), len(threads)
            ))

            if found:
                found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
                self.startWorkers(found_peers, force_num=3, reason="Optional found by findhash connected peers")

            if len(thread_values) == len(threads):
                break  # Got result from all started thread

    if len(found) < len(optional_hash_ids):
        self.log.debug(
            "No findHash result, try random peers: %s (asked: %s)" %
            (optional_hash_ids - set(found), len(self.asked_peers))
        )
        # Step 4: last resort — findHashIds against not-yet-asked (random) peers
        if time_tasks != self.time_task_added:  # New task added since start
            optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
            optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

        threads = []
        peers = self.site.getConnectablePeers(ignore=self.asked_peers)

        for peer in peers:
            threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids)))
            self.asked_peers.append(peer.key)

        gevent.joinall(threads, timeout=15)

        found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value])
        found = self.addOptionalPeers(found_ips)
        self.log.debug("Found optional files after findhash random peers: %s/%s" % (len(found), len(optional_hash_ids)))

        if found:
            found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
            self.startWorkers(found_peers, force_num=3, reason="Option found using findhash random peers")

    if len(found) < len(optional_hash_ids):
        self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))

    if time_tasks != self.time_task_added:  # New task added since start
        self.log.debug("New task since start, restarting...")
        self.site.greenlet_manager.spawnLater(0.1, self.startFindOptional)
    else:
        self.log.debug("startFindOptional ended")
# Stop every worker and fail all queued tasks; returns number of workers stopped
def stopWorkers(self):
    num = 0
    for worker in list(self.workers.values()):
        worker.stop(reason="Stopping all workers")
        num += 1
    for task in self.tasks[:]:  # Copy: failTask removes tasks from the queue
        self.failTask(task, reason="Stopping all workers")
    return num
# Collect the workers currently assigned to the given task
def findWorkers(self, task):
    return [worker for worker in list(self.workers.values()) if worker.task == task]
# Deregister a worker; may trigger optional-peer search or new workers
def removeWorker(self, worker):
    worker.running = False
    if worker.key in self.workers:
        del(self.workers[worker.key])
        self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers()))
    # Running low on workers: look for more sources
    if len(self.workers) <= self.getMaxWorkers() / 3 and len(self.asked_peers) < 10:
        optional_task = next((task for task in self.tasks if task["optional_hash_id"]), None)
        if optional_task:
            if len(self.workers) == 0:
                self.startFindOptional(find_more=True)  # Nobody left: search harder
            else:
                self.startFindOptional()
        elif self.tasks and not self.workers and worker.task and len(worker.task["failed"]) < 20:
            self.log.debug("Starting new workers... (tasks: %s)" % len(self.tasks))
            self.startWorkers(reason="Removed worker")
# Priority bonus by file type; higher boost means earlier download
def getPriorityBoost(self, inner_path):
    if inner_path == "content.json":
        return 9999  # Content.json always priority
    if inner_path == "index.html":
        return 9998  # index.html also important
    if "-default" in inner_path:
        return -4  # Default files are cloning not important
    if inner_path.endswith("all.css"):
        return 14  # boost css files priority
    if inner_path.endswith("all.js"):
        return 13  # boost js files priority
    if inner_path.endswith("dbschema.json"):
        return 12  # boost database specification
    if inner_path.endswith("content.json"):
        return 1  # boost included content.json files priority a bit
    if inner_path.endswith(".json"):
        # Short paths are site-level json, long ones are per-user data
        return 11 if len(inner_path) < 50 else 2
    return 0
# A task for this file already exists: raise its priority and/or re-add the peer
def addTaskUpdate(self, task, peer, priority=0):
    if priority > task["priority"]:
        self.tasks.updateItem(task, "priority", priority)  # Keep queue ordering in sync
    if peer and task["peers"]:  # This peer also has new version, add it to task possible peers
        task["peers"].append(peer)
        self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
        self.startWorkers([peer], reason="Added new task (update received by peer)")
    elif peer and peer in task["failed"]:
        task["failed"].remove(peer)  # New update arrived, remove the peer from failed peers
        self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
        self.startWorkers([peer], reason="Added new task (peer failed before)")
# Create a brand-new download task for inner_path and start workers for it
def addTaskCreate(self, inner_path, peer, priority=0, file_info=None):
    evt = gevent.event.AsyncResult()
    if peer:
        peers = [peer]  # Only download from this peer
    else:
        peers = None
    if not file_info:
        file_info = self.site.content_manager.getFileInfo(inner_path)
    if file_info and file_info["optional"]:
        optional_hash_id = helper.toHashId(file_info["sha512"])
    else:
        optional_hash_id = None
    if file_info:
        size = file_info.get("size", 0)
    else:
        size = 0

    self.lock_add_task.acquire()
    # BUG FIX: wrap in try/finally — the lock was never released on the
    # early-return (duplicate task) path, deadlocking every later call.
    try:
        # Check again if we have task for this file
        task = self.tasks.findTask(inner_path)
        if task:
            self.addTaskUpdate(task, peer, priority)
            return task

        priority += self.getPriorityBoost(inner_path)

        if self.started_task_num == 0:  # Boost priority for first requested file
            priority += 1

        task = {
            "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
            "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None,
            "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
        }

        self.tasks.append(task)
    finally:
        self.lock_add_task.release()

    self.next_task_id += 1
    self.started_task_num += 1
    if config.verbose:
        self.log.debug(
            "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
            (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
        )

    self.time_task_added = time.time()

    if optional_hash_id:
        if self.asked_peers:
            del self.asked_peers[:]  # Reset asked peers
        self.startFindOptional(high_priority=priority > 0)

    if peers:
        self.startWorkers(peers, reason="Added new optional task")
    else:
        self.startWorkers(peers, reason="Added new task")
    return task
# Public entry point: get-or-create a download task for inner_path
def addTask(self, inner_path, peer=None, priority=0, file_info=None):
    self.site.onFileStart(inner_path)  # First task, trigger site download started
    task = self.tasks.findTask(inner_path)
    if task:  # Already has task for that file
        self.addTaskUpdate(task, peer, priority)
    else:  # No task for that file yet
        task = self.addTaskCreate(inner_path, peer, priority, file_info)
    return task
# Account a new worker on the task (queue position may change with it)
def addTaskWorker(self, task, worker):
    try:
        self.tasks.updateItem(task, "workers_num", task["workers_num"] + 1)
    except ValueError:  # Task no longer in the queue: update the dict directly
        task["workers_num"] += 1
# Remove a worker from the task; fail the task once every worker failed it
def removeTaskWorker(self, task, worker):
    try:
        self.tasks.updateItem(task, "workers_num", task["workers_num"] - 1)
    except ValueError:  # Task no longer in the queue: update the dict directly
        task["workers_num"] -= 1
    if len(task["failed"]) >= len(self.workers):
        fail_reason = "Too many fails: %s (workers: %s)" % (len(task["failed"]), len(self.workers))
        self.failTask(task, reason=fail_reason)
# Give other tasks a moment to queue up, then fire completion if idle
def checkComplete(self):
    time.sleep(0.1)
    if not self.tasks:
        self.log.debug("Check complete: No tasks")
        self.onComplete()
# All downloads finished: reset counters and notify the site
def onComplete(self):
    self.started_task_num = 0
    del self.asked_peers[:]  # Clear in place so other references see it too
    self.site.onComplete()  # No more task trigger site complete
# Mark a task successfully finished and wake up its waiters
def doneTask(self, task):
    task["done"] = True
    self.tasks.remove(task)  # Remove from queue
    if task["optional_hash_id"]:
        self.log.debug(
            "Downloaded optional file in %.3fs, adding to hashfield: %s" %
            (time.time() - task["time_started"], task["inner_path"])
        )
        self.site.content_manager.optionalDownloaded(task["inner_path"], task["optional_hash_id"], task["size"])
    self.site.onFileDone(task["inner_path"])
    task["evt"].set(True)  # Success verdict for anyone waiting on the task
    if not self.tasks:
        self.site.greenlet_manager.spawn(self.checkComplete)
# Mark a task as failed and wake up its waiters with a False verdict
def failTask(self, task, reason="Unknown"):
    try:
        self.tasks.remove(task)  # Remove from queue
    except ValueError:
        return False  # Already removed: someone else handled it
    self.log.debug("Task %s failed (Reason: %s)" % (task["inner_path"], reason))
    task["done"] = True
    self.site.onFileFail(task["inner_path"])
    task["evt"].set(False)
    if not self.tasks:
        self.site.greenlet_manager.spawn(self.checkComplete)