2015-07-12 20:36:46 +02:00
|
|
|
import time
|
|
|
|
import logging
|
|
|
|
import random
|
2015-10-22 11:42:55 +02:00
|
|
|
import collections
|
2015-03-06 02:31:51 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
import gevent
|
2015-03-06 02:31:51 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
from Worker import Worker
|
2016-08-15 15:59:49 +02:00
|
|
|
from Config import config
|
2015-10-22 11:42:55 +02:00
|
|
|
from util import helper
|
2016-10-02 14:18:35 +02:00
|
|
|
from Plugin import PluginManager
|
2015-10-22 11:42:55 +02:00
|
|
|
import util
|
2015-03-06 02:31:51 +01:00
|
|
|
|
2016-11-07 23:27:42 +01:00
|
|
|
|
2016-10-02 14:18:35 +02:00
|
|
|
@PluginManager.acceptPlugins
|
|
|
|
class WorkerManager(object):
|
2015-01-12 02:03:45 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
    def __init__(self, site):
        """Create a worker manager for `site` and start the background task checker."""
        self.site = site
        self.workers = {}  # Key: ip:port, Value: Worker.Worker
        self.tasks = []  # Queue of task dicts, shaped like:
        # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
        # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids}
        self.started_task_num = 0  # Last added task num
        self.asked_peers = []  # Peer keys already queried for optional file hash ids
        self.running = True  # checkTasks loop keeps running while True
        self.time_task_added = 0  # Timestamp of the most recent addTask call
        self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
        # Background greenlet: periodically times out / retries stuck tasks
        self.process_taskchecker = gevent.spawn(self.checkTasks)
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
return "WorkerManager %s" % self.site.address_short
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return "<%s>" % self.__str__()
|
|
|
|
|
|
|
|
# Check expired tasks
|
|
|
|
    def checkTasks(self):
        """Background loop (spawned from __init__): every 15s stop finished
        workers, time out stuck tasks and recruit more workers for slow ones.
        Runs until self.running is set to False."""
        while self.running:
            tasks = task = worker = workers = None  # Cleanup local variables
            announced = False  # announce() at most once per check round
            time.sleep(15)  # Check every 15 sec

            # Clean up workers
            for worker in self.workers.values():
                if worker.task and worker.task["done"]:
                    worker.skip()  # Stop workers with task done

            if not self.tasks:
                continue

            tasks = self.tasks[:]  # Copy it so removing elements wont cause any problem
            for task in tasks:
                if task["time_started"] and time.time() >= task["time_started"] + 60:
                    self.log.debug("Timeout, Skipping: %s" % task)  # Task taking too long time, skip it
                    # Skip to next file workers
                    workers = self.findWorkers(task)
                    if workers:
                        for worker in workers:
                            worker.skip()
                    else:
                        # Nobody is working on it anymore: give up on the task
                        self.failTask(task)
                elif time.time() >= task["time_added"] + 60 and not self.workers:  # No workers left
                    self.log.debug("Timeout, Cleanup task: %s" % task)
                    # Remove task
                    self.failTask(task)

                elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
                    # Find more workers: Task started more than 15 sec ago or no workers
                    workers = self.findWorkers(task)
                    self.log.debug(
                        "Slow task: %s, (workers: %s, optional_hash_id: %s, peers: %s, failed: %s, asked: %s)" %
                        (
                            task["inner_path"], len(workers), task["optional_hash_id"],
                            len(task["peers"] or []), len(task["failed"]), len(self.asked_peers)
                        )
                    )
                    if not announced:
                        task["site"].announce(mode="more")  # Find more peers
                        announced = True
                    if task["optional_hash_id"]:
                        if self.workers:
                            # Allow more hashfield queries the longer the task has run
                            if not task["time_started"]:
                                ask_limit = 20
                            else:
                                ask_limit = max(10, time.time() - task["time_started"])
                            if len(self.asked_peers) < ask_limit and len(task["peers"] or []) <= len(task["failed"]) * 2:
                                # Re-search for high priority
                                self.startFindOptional(find_more=True)
                        if task["peers"]:
                            # Retry known peers that have not failed and are not already assigned
                            peers_try = [peer for peer in task["peers"] if peer not in task["failed"] and peer not in workers]
                            if peers_try:
                                self.startWorkers(peers_try, force_num=5)
                            else:
                                self.startFindOptional(find_more=True)
                        else:
                            self.startFindOptional(find_more=True)
                    else:
                        if task["peers"]:  # Release the peer lock
                            self.log.debug("Task peer lock release: %s" % task["inner_path"])
                            task["peers"] = []
                        self.startWorkers()

            # Queue much longer than the worker pool: recruit more workers
            if len(self.tasks) > len(self.workers) * 2 and len(self.workers) < self.getMaxWorkers():
                self.startWorkers()

        self.log.debug("checkTasks stopped running")
|
|
|
|
|
|
|
|
# Returns the next free or less worked task
|
|
|
|
def getTask(self, peer):
|
2016-08-15 15:59:49 +02:00
|
|
|
# Sort tasks by priority and worker numbers
|
2017-10-03 15:58:01 +02:00
|
|
|
self.tasks.sort(key=lambda task: task["priority"] - task["workers_num"] * 10, reverse=True)
|
2016-08-15 15:59:49 +02:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
for task in self.tasks: # Find a task
|
|
|
|
if task["peers"] and peer not in task["peers"]:
|
|
|
|
continue # This peer not allowed to pick this task
|
|
|
|
if peer in task["failed"]:
|
|
|
|
continue # Peer already tried to solve this, but failed
|
2015-10-22 11:42:55 +02:00
|
|
|
if task["optional_hash_id"] and task["peers"] is None:
|
|
|
|
continue # No peers found yet for the optional task
|
2015-07-12 20:36:46 +02:00
|
|
|
return task
|
|
|
|
|
Rev571, Optional file sizes to sidebar, Download all optional files option in sidebar, Optional file number in peer stats, Delete removed or changed optional files, Auto download optional files if autodownloadoptional checked, SiteReload command, Peer use global file server if no site defined, Allow browser cache video files, Allow more keepalive connections, Gevent 1.1 ranged request bugfix, Dont sent optional files details on websocket, Remove files from workermanager tasks if no longer in bad_files, Notify local client about changes on external siteSign
2015-11-09 00:44:03 +01:00
|
|
|
def removeGoodFileTasks(self):
|
|
|
|
for task in self.tasks[:]:
|
|
|
|
if task["inner_path"] not in self.site.bad_files:
|
|
|
|
self.log.debug("No longer in bad_files, marking as good: %s" % task["inner_path"])
|
|
|
|
task["done"] = True
|
|
|
|
task["evt"].set(True)
|
|
|
|
self.tasks.remove(task)
|
|
|
|
if not self.tasks:
|
|
|
|
self.started_task_num = 0
|
|
|
|
self.site.updateWebsocket()
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
# New peers added to site
|
|
|
|
    # New peers added to site
    def onPeers(self):
        # Event hook: the site discovered new peers, try to put them to work
        self.startWorkers()
|
|
|
|
|
2016-03-19 18:07:14 +01:00
|
|
|
def getMaxWorkers(self):
|
2017-04-13 16:27:05 +02:00
|
|
|
if len(self.tasks) > 50:
|
2017-02-27 00:02:56 +01:00
|
|
|
return config.workers * 3
|
2016-03-19 18:07:14 +01:00
|
|
|
else:
|
2017-02-27 00:02:56 +01:00
|
|
|
return config.workers
|
2016-03-19 18:07:14 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
# Add new worker
|
2017-10-04 17:26:43 +02:00
|
|
|
def addWorker(self, peer, multiplexing=False, force=False):
|
2015-07-12 20:36:46 +02:00
|
|
|
key = peer.key
|
2017-10-04 17:26:43 +02:00
|
|
|
if len(self.workers) > self.getMaxWorkers() and not force:
|
2017-10-03 15:55:07 +02:00
|
|
|
return False
|
|
|
|
if multiplexing: # Add even if we already have worker for this peer
|
|
|
|
key = "%s/%s" % (key, len(self.workers))
|
|
|
|
if key not in self.workers:
|
2015-07-12 20:36:46 +02:00
|
|
|
# We dont have worker for that peer and workers num less than max
|
|
|
|
worker = Worker(self, peer)
|
|
|
|
self.workers[key] = worker
|
|
|
|
worker.key = key
|
|
|
|
worker.start()
|
|
|
|
return worker
|
|
|
|
else: # We have woker for this peer or its over the limit
|
|
|
|
return False
|
|
|
|
|
2017-10-03 15:55:42 +02:00
|
|
|
def taskAddPeer(self, task, peer):
|
|
|
|
if task["peers"] is None:
|
|
|
|
task["peers"] = []
|
|
|
|
if peer in task["failed"]:
|
|
|
|
return False
|
|
|
|
|
|
|
|
if peer not in task["peers"]:
|
|
|
|
task["peers"].append(peer)
|
|
|
|
return True
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
# Start workers to process tasks
|
2017-10-04 17:26:43 +02:00
|
|
|
def startWorkers(self, peers=None, force_num=0):
|
2015-07-12 20:36:46 +02:00
|
|
|
if not self.tasks:
|
|
|
|
return False # No task for workers
|
2016-03-19 18:07:14 +01:00
|
|
|
if len(self.workers) >= self.getMaxWorkers() and not peers:
|
2016-03-21 09:43:53 +01:00
|
|
|
return False # Workers number already maxed and no starting peers defined
|
2017-03-06 15:30:42 +01:00
|
|
|
self.log.debug("Starting workers, tasks: %s, peers: %s, workers: %s" % (len(self.tasks), len(peers or []), len(self.workers)))
|
2015-07-12 20:36:46 +02:00
|
|
|
if not peers:
|
2016-04-25 02:25:28 +02:00
|
|
|
peers = self.site.getConnectedPeers()
|
|
|
|
if len(peers) < self.getMaxWorkers():
|
2017-05-30 01:04:45 +02:00
|
|
|
peers += self.site.getRecentPeers(self.getMaxWorkers())
|
2015-11-08 12:33:13 +01:00
|
|
|
if type(peers) is set:
|
|
|
|
peers = list(peers)
|
|
|
|
|
2017-03-02 23:39:52 +01:00
|
|
|
# Sort by ping
|
2018-02-08 17:49:40 +01:00
|
|
|
peers.sort(key = lambda peer: peer.connection.last_ping_delay if peer.connection and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999)
|
2017-03-02 23:39:52 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
for peer in peers: # One worker for every peer
|
|
|
|
if peers and peer not in peers:
|
2016-03-21 09:43:53 +01:00
|
|
|
continue # If peers defined and peer not valid
|
2017-10-04 17:26:43 +02:00
|
|
|
|
|
|
|
if force_num:
|
|
|
|
worker = self.addWorker(peer, force=True)
|
|
|
|
force_num -= 1
|
|
|
|
else:
|
|
|
|
worker = self.addWorker(peer)
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
if worker:
|
2016-03-19 18:07:14 +01:00
|
|
|
self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers()))
|
2015-07-12 20:36:46 +02:00
|
|
|
|
2015-10-22 11:42:55 +02:00
|
|
|
# Find peers for optional hash in local hash tables and add to task peers
|
2016-03-23 13:05:48 +01:00
|
|
|
def findOptionalTasks(self, optional_tasks, reset_task=False):
|
2015-10-22 11:42:55 +02:00
|
|
|
found = collections.defaultdict(list) # { found_hash: [peer1, peer2...], ...}
|
|
|
|
|
|
|
|
for peer in self.site.peers.values():
|
2016-11-07 23:25:25 +01:00
|
|
|
if not peer.has_hashfield:
|
2015-10-22 11:42:55 +02:00
|
|
|
continue
|
|
|
|
|
2016-11-07 23:25:25 +01:00
|
|
|
hashfield_set = set(peer.hashfield) # Finding in set is much faster
|
2015-10-22 11:42:55 +02:00
|
|
|
for task in optional_tasks:
|
|
|
|
optional_hash_id = task["optional_hash_id"]
|
2016-11-07 23:25:25 +01:00
|
|
|
if optional_hash_id in hashfield_set:
|
2016-11-16 11:22:38 +01:00
|
|
|
if reset_task and len(task["failed"]) > 0:
|
|
|
|
task["failed"] = []
|
|
|
|
if peer in task["failed"]:
|
|
|
|
continue
|
2017-10-03 15:55:42 +02:00
|
|
|
if self.taskAddPeer(task, peer):
|
|
|
|
found[optional_hash_id].append(peer)
|
2015-10-22 11:42:55 +02:00
|
|
|
|
|
|
|
return found
|
|
|
|
|
|
|
|
# Find peers for optional hash ids in local hash tables
|
2016-11-07 23:24:50 +01:00
|
|
|
def findOptionalHashIds(self, optional_hash_ids, limit=0):
|
2015-10-22 11:42:55 +02:00
|
|
|
found = collections.defaultdict(list) # { found_hash_id: [peer1, peer2...], ...}
|
|
|
|
|
|
|
|
for peer in self.site.peers.values():
|
2016-11-07 23:24:50 +01:00
|
|
|
if not peer.has_hashfield:
|
2015-10-22 11:42:55 +02:00
|
|
|
continue
|
2016-11-07 23:24:50 +01:00
|
|
|
|
|
|
|
hashfield_set = set(peer.hashfield) # Finding in set is much faster
|
2015-10-22 11:42:55 +02:00
|
|
|
for optional_hash_id in optional_hash_ids:
|
2016-11-07 23:24:50 +01:00
|
|
|
if optional_hash_id in hashfield_set:
|
2015-10-22 11:42:55 +02:00
|
|
|
found[optional_hash_id].append(peer)
|
2016-11-07 23:24:50 +01:00
|
|
|
if limit and len(found[optional_hash_id]) >= limit:
|
|
|
|
optional_hash_ids.remove(optional_hash_id)
|
2015-10-22 11:42:55 +02:00
|
|
|
|
|
|
|
return found
|
|
|
|
|
|
|
|
# Add peers to tasks from found result
|
|
|
|
def addOptionalPeers(self, found_ips):
|
|
|
|
found = collections.defaultdict(list)
|
|
|
|
for hash_id, peer_ips in found_ips.iteritems():
|
|
|
|
task = [task for task in self.tasks if task["optional_hash_id"] == hash_id]
|
|
|
|
if task: # Found task, lets take the first
|
|
|
|
task = task[0]
|
2015-10-28 01:28:29 +01:00
|
|
|
else:
|
|
|
|
continue
|
2015-10-22 11:42:55 +02:00
|
|
|
for peer_ip in peer_ips:
|
2018-02-08 17:57:26 +01:00
|
|
|
peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True, source="optional")
|
2015-10-22 11:42:55 +02:00
|
|
|
if not peer:
|
|
|
|
continue
|
2017-10-03 15:55:42 +02:00
|
|
|
if self.taskAddPeer(task, peer):
|
2016-11-16 11:22:38 +01:00
|
|
|
found[hash_id].append(peer)
|
2015-10-28 01:28:29 +01:00
|
|
|
if peer.hashfield.appendHashId(hash_id): # Peer has this file
|
|
|
|
peer.time_hashfield = None # Peer hashfield probably outdated
|
2015-10-22 11:42:55 +02:00
|
|
|
|
|
|
|
return found
|
|
|
|
|
|
|
|
# Start find peers for optional files
|
2016-11-07 23:26:07 +01:00
|
|
|
    @util.Noparallel(blocking=False, ignore_args=True)
    def startFindOptional(self, reset_task=False, find_more=False, high_priority=False):
        """Locate peers for the queued optional-file tasks.

        Escalating search: locally known hashfields, then hashfields refreshed
        from connected peers, then findHashIds on connected peers, finally on
        random connectable peers. Re-spawns itself if new tasks arrived
        while searching.
        reset_task: clear tasks' failed-peer lists when a local match is found
        find_more: keep querying even when every hash id already has peers
        high_priority: shorter initial wait; re-search while peers are few
        """
        # Wait for more file requests
        if len(self.tasks) < 20 or high_priority:
            time.sleep(0.01)
        elif len(self.tasks) > 90:
            time.sleep(5)
        else:
            time.sleep(0.5)

        optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
        if not optional_tasks:
            return False
        optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
        time_tasks = self.time_task_added  # Snapshot to detect tasks added while searching

        self.log.debug(
            "Finding peers for optional files: %s (reset_task: %s, find_more: %s)" %
            (optional_hash_ids, reset_task, find_more)
        )
        # Step 1: match against hashfields we already know locally
        found = self.findOptionalTasks(optional_tasks, reset_task=reset_task)

        if found:
            found_peers = set([peer for peers in found.values() for peer in peers])
            self.startWorkers(found_peers, force_num=3)

        if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.itervalues())):
            self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))

            # Step 2: query hashfield from connected peers
            threads = []
            peers = self.site.getConnectedPeers()
            if not peers:
                peers = self.site.getConnectablePeers()
            for peer in peers:
                threads.append(gevent.spawn(peer.updateHashfield, force=find_more))
            gevent.joinall(threads, timeout=5)

            if time_tasks != self.time_task_added:  # New task added since start
                optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
                optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

            found = self.findOptionalTasks(optional_tasks)
            self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (
                len(found), len(optional_hash_ids)
            ))

            if found:
                found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
                self.startWorkers(found_peers, force_num=3)

            if len(found) < len(optional_hash_ids) or find_more:
                self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found)))
                if not self.tasks:
                    self.log.debug("No tasks, stopping finding optional peers")
                    return

                # Step 3: ask not-yet-asked connected peers to find the hash ids
                threads = []
                peers = [peer for peer in self.site.getConnectedPeers() if peer.key not in self.asked_peers]
                if not peers:
                    peers = self.site.getConnectablePeers(ignore=self.asked_peers)

                for peer in peers:
                    threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
                    self.asked_peers.append(peer.key)

                # Poll the greenlets for up to 5s, starting workers as results arrive
                for i in range(5):
                    time.sleep(1)

                    thread_values = [thread.value for thread in threads if thread.value]
                    if not thread_values:
                        continue

                    found_ips = helper.mergeDicts(thread_values)
                    found = self.addOptionalPeers(found_ips)
                    self.log.debug("Found optional files after findhash connected peers: %s/%s (asked: %s)" % (
                        len(found), len(optional_hash_ids), len(threads)
                    ))

                    if found:
                        found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
                        self.startWorkers(found_peers, force_num=3)

                    if len(thread_values) == len(threads):
                        # Got result from all started thread
                        break

                if len(found) < len(optional_hash_ids):
                    self.log.debug("No findHash result, try random peers: %s" % (optional_hash_ids - set(found)))
                    # Step 4: try to query random connectable peers

                    if time_tasks != self.time_task_added:  # New task added since start
                        optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
                        optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

                    threads = []
                    peers = self.site.getConnectablePeers(ignore=self.asked_peers)

                    for peer in peers:
                        threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
                        self.asked_peers.append(peer.key)

                    gevent.joinall(threads, timeout=15)

                    found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value])
                    found = self.addOptionalPeers(found_ips)
                    self.log.debug("Found optional files after findhash random peers: %s/%s" % (len(found), len(optional_hash_ids)))

                    if found:
                        found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
                        self.startWorkers(found_peers, force_num=3)

                    if len(found) < len(optional_hash_ids):
                        self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))

        if time_tasks != self.time_task_added:  # New task added since start
            self.log.debug("New task since start, restarting...")
            gevent.spawn_later(0.1, self.startFindOptional)
        else:
            self.log.debug("startFindOptional ended")
|
2017-10-03 15:52:04 +02:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
# Stop all worker
|
|
|
|
def stopWorkers(self):
|
|
|
|
for worker in self.workers.values():
|
|
|
|
worker.stop()
|
|
|
|
tasks = self.tasks[:] # Copy
|
|
|
|
for task in tasks: # Mark all current task as failed
|
|
|
|
self.failTask(task)
|
|
|
|
|
|
|
|
# Find workers by task
|
|
|
|
def findWorkers(self, task):
|
|
|
|
workers = []
|
|
|
|
for worker in self.workers.values():
|
|
|
|
if worker.task == task:
|
|
|
|
workers.append(worker)
|
|
|
|
return workers
|
|
|
|
|
|
|
|
# Ends and remove a worker
|
|
|
|
def removeWorker(self, worker):
|
|
|
|
worker.running = False
|
|
|
|
if worker.key in self.workers:
|
|
|
|
del(self.workers[worker.key])
|
2016-03-19 18:07:14 +01:00
|
|
|
self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers()))
|
2016-11-07 23:27:42 +01:00
|
|
|
if len(self.workers) <= self.getMaxWorkers() / 3 and len(self.asked_peers) < 10:
|
2017-10-03 16:01:02 +02:00
|
|
|
optional_task = next((task for task in self.tasks if task["optional_hash_id"]), None)
|
|
|
|
if optional_task:
|
|
|
|
if len(self.workers) == 0:
|
|
|
|
self.startFindOptional(find_more=True)
|
|
|
|
else:
|
|
|
|
self.startFindOptional()
|
|
|
|
elif self.tasks and not self.workers and worker.task:
|
|
|
|
self.log.debug("Starting new workers... (tasks: %s)" % len(self.tasks))
|
|
|
|
self.startWorkers()
|
2016-11-07 23:27:42 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
|
2016-04-06 13:47:00 +02:00
|
|
|
# Tasks sorted by this
|
|
|
|
def getPriorityBoost(self, inner_path):
|
|
|
|
if inner_path == "content.json":
|
|
|
|
return 9999 # Content.json always priority
|
|
|
|
if inner_path == "index.html":
|
|
|
|
return 9998 # index.html also important
|
|
|
|
if "-default" in inner_path:
|
|
|
|
return -4 # Default files are cloning not important
|
2017-05-07 21:34:44 +02:00
|
|
|
elif inner_path.endswith("all.css"):
|
2017-10-04 17:27:01 +02:00
|
|
|
return 14 # boost css files priority
|
2017-05-07 21:34:44 +02:00
|
|
|
elif inner_path.endswith("all.js"):
|
2017-10-04 17:27:01 +02:00
|
|
|
return 13 # boost js files priority
|
2016-04-06 13:47:00 +02:00
|
|
|
elif inner_path.endswith("dbschema.json"):
|
2017-10-04 17:27:01 +02:00
|
|
|
return 12 # boost database specification
|
2016-04-06 13:47:00 +02:00
|
|
|
elif inner_path.endswith("content.json"):
|
|
|
|
return 1 # boost included content.json files priority a bit
|
|
|
|
elif inner_path.endswith(".json"):
|
2017-05-07 21:21:26 +02:00
|
|
|
if len(inner_path) < 50: # Boost non-user json files
|
2017-10-04 17:27:01 +02:00
|
|
|
return 11
|
2017-03-12 17:55:47 +01:00
|
|
|
else:
|
|
|
|
return 2
|
2016-04-06 13:47:00 +02:00
|
|
|
return 0
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
# Create new task and return asyncresult
|
2017-10-03 15:49:08 +02:00
|
|
|
    # Create new task and return asyncresult
    def addTask(self, inner_path, peer=None, priority=0, file_info=None):
        """Queue a download task for inner_path and return the task dict.

        peer: restrict downloading to this peer (peer lock)
        priority: starting priority, raised by getPriorityBoost()
        file_info: cached content-manager entry for the file; looked up if None
        If a task for the file already exists it is updated and returned instead.
        The task's "evt" AsyncResult resolves True/False on done/fail.
        """
        self.site.onFileStart(inner_path)  # First task, trigger site download started
        task = self.findTask(inner_path)
        if task:  # Already has task for that file
            task["priority"] = max(priority, task["priority"])
            if peer and task["peers"]:  # This peer also has new version, add it to task possible peers
                task["peers"].append(peer)
                self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
                self.startWorkers([peer])
            elif peer and peer in task["failed"]:
                task["failed"].remove(peer)  # New update arrived, remove the peer from failed peers
                self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
                self.startWorkers([peer])
            return task
        else:  # No task for that file yet
            evt = gevent.event.AsyncResult()
            if peer:
                peers = [peer]  # Only download from this peer
            else:
                peers = None
            if not file_info:
                file_info = self.site.content_manager.getFileInfo(inner_path)
            if file_info and file_info["optional"]:
                optional_hash_id = helper.toHashId(file_info["sha512"])
            else:
                optional_hash_id = None
            if file_info:
                size = file_info.get("size", 0)
            else:
                size = 0
            priority += self.getPriorityBoost(inner_path)

            if self.started_task_num == 0:  # Boost priority for first requested file
                priority += 1

            task = {
                "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
                "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None,
                "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
            }

            self.tasks.append(task)

            self.started_task_num += 1
            self.log.debug(
                "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
                (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
            )
            self.time_task_added = time.time()

            if optional_hash_id:
                # Optional file: kick off peer discovery for the hash id
                if self.asked_peers:
                    del self.asked_peers[:]  # Reset asked peers
                self.startFindOptional(high_priority=priority > 0)

                if peers:
                    self.startWorkers(peers)

            else:
                self.startWorkers(peers)
            return task
|
2015-07-12 20:36:46 +02:00
|
|
|
|
|
|
|
# Find a task using inner_path
|
|
|
|
def findTask(self, inner_path):
|
|
|
|
for task in self.tasks:
|
|
|
|
if task["inner_path"] == inner_path:
|
|
|
|
return task
|
|
|
|
return None # Not found
|
|
|
|
|
2016-03-19 18:09:20 +01:00
|
|
|
# Wait for other tasks
|
|
|
|
def checkComplete(self):
|
|
|
|
time.sleep(0.1)
|
|
|
|
if not self.tasks:
|
|
|
|
self.log.debug("Check compelte: No tasks")
|
2016-11-07 23:47:53 +01:00
|
|
|
self.onComplete()
|
|
|
|
|
|
|
|
def onComplete(self):
|
|
|
|
self.started_task_num = 0
|
|
|
|
del self.asked_peers[:]
|
|
|
|
self.site.onComplete() # No more task trigger site complete
|
2016-03-19 18:09:20 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
# Mark a task done
|
|
|
|
def doneTask(self, task):
|
|
|
|
task["done"] = True
|
|
|
|
self.tasks.remove(task) # Remove from queue
|
2016-10-02 14:18:35 +02:00
|
|
|
if task["optional_hash_id"]:
|
2017-10-03 15:48:36 +02:00
|
|
|
self.log.debug("Downloaded optional file in %.3fs, adding to hashfield: %s" % (time.time() - task["time_started"], task["inner_path"]))
|
2016-11-07 23:48:11 +01:00
|
|
|
self.site.content_manager.optionalDownloaded(task["inner_path"], task["optional_hash_id"], task["size"])
|
2015-07-12 20:36:46 +02:00
|
|
|
self.site.onFileDone(task["inner_path"])
|
|
|
|
task["evt"].set(True)
|
|
|
|
if not self.tasks:
|
2016-03-19 18:09:20 +01:00
|
|
|
gevent.spawn(self.checkComplete)
|
2016-09-04 18:03:27 +02:00
|
|
|
|
|
|
|
# Mark a task failed
|
|
|
|
def failTask(self, task):
|
|
|
|
if task in self.tasks:
|
|
|
|
task["done"] = True
|
|
|
|
self.tasks.remove(task) # Remove from queue
|
|
|
|
self.site.onFileFail(task["inner_path"])
|
|
|
|
task["evt"].set(False)
|
|
|
|
if not self.tasks:
|
|
|
|
self.started_task_num = 0
|