2015-01-12 02:03:45 +01:00
|
|
|
from Worker import Worker
|
2015-02-20 01:37:12 +01:00
|
|
|
import gevent, time, logging, random
|
2015-01-12 02:03:45 +01:00
|
|
|
|
|
|
|
MAX_WORKERS = 10  # Upper limit on concurrent workers per WorkerManager (enforced in addWorker and startWorkers)
|
|
|
|
|
|
|
|
# Worker manager for site
|
|
|
|
class WorkerManager:
    """Schedule a site's file-download tasks across a pool of Worker objects.

    Tasks are plain dicts kept in ``self.tasks``; workers live in
    ``self.workers`` keyed by ``peer.key``.  A greenlet spawned in
    ``__init__`` runs ``checkTasks`` every 15 seconds to skip/fail stuck
    tasks and to announce for more peers.
    """

    def __init__(self, site):
        self.site = site
        self.workers = {}  # Key: ip:port, Value: Worker.Worker
        # Task dict layout: {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path,
        #   "done": False, "time_started": None, "time_added": time.time(), "peers": peers,
        #   "priority": 0, "failed": peer_ids}
        self.tasks = []
        self.started_task_num = 0  # Tasks added since the queue was last empty
        self.running = True  # checkTasks keeps looping while this is True
        self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
        self.process_taskchecker = gevent.spawn(self.checkTasks)

    def __str__(self):
        return "WorkerManager %s" % self.site.address_short

    def __repr__(self):
        return "<%s>" % self.__str__()

    # Check expired tasks
    def checkTasks(self):
        """Every 15 seconds, handle stuck tasks until ``self.running`` is False.

        Per pass:
          * workers whose current task is already done are skipped;
          * a task started >= 60 s ago has its workers skipped, or is failed
            if no worker is on it;
          * a task added >= 60 s ago while no workers exist is failed;
          * a task started >= 15 s ago (or any task while no workers exist)
            triggers a site announce and releases its peer lock; only one
            such re-announce happens per pass.
        """
        while self.running:
            tasks = task = worker = workers = None  # Drop references held from the previous pass
            time.sleep(15)  # Check every 15 sec

            # Clean up workers; iterate a snapshot because skipping a worker
            # may yield and let other code add/remove entries in self.workers.
            for worker in list(self.workers.values()):
                if worker.task and worker.task["done"]:
                    worker.skip()  # Stop workers with task done

            if not self.tasks:
                continue

            tasks = self.tasks[:]  # Copy: failTask() removes from self.tasks while we iterate
            for task in tasks:
                # NOTE(review): time_started is presumably set by the Worker when it picks the task
                if task["time_started"] and time.time() >= task["time_started"] + 60:  # Task taking too long, skip it
                    self.log.debug("Timeout, Skipping: %s" % task)
                    # Skip to next file workers
                    workers = self.findWorkers(task)
                    if workers:
                        for worker in workers:
                            worker.skip()
                    else:
                        self.failTask(task)
                elif time.time() >= task["time_added"] + 60 and not self.workers:  # No workers left
                    self.log.debug("Timeout, Cleanup task: %s" % task)
                    # Remove task
                    self.failTask(task)
                elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
                    # Task started more than 15 sec ago or no workers
                    self.log.debug("Task taking more than 15 secs, find more peers: %s" % task["inner_path"])
                    task["site"].announce()  # Find more peers
                    if task["peers"]:  # Release the peer lock
                        self.log.debug("Task peer lock release: %s" % task["inner_path"])
                        task["peers"] = []
                        self.startWorkers()
                    break  # One reannounce per loop

        self.log.debug("checkTasks stopped running")

    # Tasks sorted by this
    def taskSorter(self, task):
        """Return the scheduling key for ``task``; higher values are picked first.

        content.json and index.html always win; otherwise the key is the task
        priority (+1 for .js/.css files) minus the number of workers on it.
        """
        if task["inner_path"] == "content.json":
            return 9999  # content.json always has priority
        if task["inner_path"] == "index.html":
            return 9998  # index.html is also important
        priority = task["priority"]
        if task["inner_path"].endswith((".js", ".css")):
            priority += 1  # Download js and css files first
        return priority - task["workers_num"]  # Prefer more priority and fewer workers

    # Returns the next free or less worked task
    def getTask(self, peer):
        """Return the best task ``peer`` may work on, or None if there is none.

        Sorts ``self.tasks`` in place by ``taskSorter`` (descending), then
        skips tasks locked to other peers and tasks this peer already failed.
        """
        self.tasks.sort(key=self.taskSorter, reverse=True)  # Sort tasks by priority and worker numbers
        for task in self.tasks:
            if task["peers"] and peer not in task["peers"]:
                continue  # This peer is not allowed to pick this task
            if peer in task["failed"]:
                continue  # Peer already tried to solve this, but failed
            return task
        return None

    # New peers added to site
    def onPeers(self):
        """Try to start workers for newly added site peers."""
        self.startWorkers()

    # Add new worker
    def addWorker(self, peer):
        """Create and start a Worker for ``peer``.

        Returns the new worker, or False if a worker for this peer already
        exists or MAX_WORKERS workers are already running.
        """
        key = peer.key
        if key not in self.workers and len(self.workers) < MAX_WORKERS:
            worker = Worker(self, peer)
            self.workers[key] = worker
            worker.key = key
            worker.start()
            return worker
        else:  # We have a worker for this peer or we are over the limit
            return False

    # Start workers to process tasks
    def startWorkers(self, peers=None):
        """Start workers for ``peers`` (default: every peer of the site), in random order.

        Returns False without doing anything when there are no tasks, or when
        the worker pool is full and no explicit peers were given.  The
        caller's ``peers`` list is never reordered.
        """
        if not self.tasks:
            return False  # No task for workers
        if len(self.workers) >= MAX_WORKERS and not peers:
            return False  # Workers number already maxed and no starting peers defined
        if peers:
            # Copy before shuffling: the caller may pass a list that is also
            # stored as a task's peer lock (see addTask).
            peers = list(peers)
        else:
            peers = list(self.site.peers.values())  # No peers defined, use any from site
        random.shuffle(peers)
        for peer in peers:  # One worker for every peer
            worker = self.addWorker(peer)
            if worker:
                self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), MAX_WORKERS))

    # Stop all worker
    def stopWorkers(self):
        """Stop every worker and mark every pending task as failed."""
        # Snapshot: stopping a worker may remove it from self.workers.
        for worker in list(self.workers.values()):
            worker.stop()
        for task in self.tasks[:]:  # Copy: failTask() removes from self.tasks
            self.failTask(task)

    # Find workers by task
    def findWorkers(self, task):
        """Return the list of workers currently assigned to ``task``."""
        return [worker for worker in self.workers.values() if worker.task == task]

    # Ends and remove a worker
    def removeWorker(self, worker):
        """Flag ``worker`` as not running and drop it from the pool if present."""
        worker.running = False
        if worker.key in self.workers:
            del self.workers[worker.key]
            self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), MAX_WORKERS))

    # Create new task and return asyncresult
    def addTask(self, inner_path, peer=None, priority = 0):
        """Queue a download of ``inner_path`` (or update an existing task).

        If ``peer`` is given, a new task is locked to that peer; for an
        existing locked task the peer is added to the allowed list, and a
        peer previously recorded as failed is given another chance.
        ``priority`` is added to the task's priority.

        Returns the task's event; it is set to True by doneTask and to False
        by failTask.
        """
        self.site.onFileStart(inner_path)  # First task, trigger site download started
        task = self.findTask(inner_path)
        if task:  # Already has task for that file
            if peer and task["peers"]:  # This peer also has new version, add it to task possible peers
                if peer not in task["peers"]:  # Avoid growing the lock list with duplicates
                    task["peers"].append(peer)
                self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
                self.startWorkers([peer])
            elif peer and peer in task["failed"]:
                task["failed"].remove(peer)  # New update arrived, remove the peer from failed peers
                self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
                self.startWorkers([peer])

            if priority:
                task["priority"] += priority  # Boost on priority
            return task["evt"]
        else:  # No task for that file yet
            evt = gevent.event.AsyncResult()
            if peer:
                peers = [peer]  # Only download from this peer
            else:
                peers = None
            task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority, "failed": []}
            self.tasks.append(task)
            self.started_task_num += 1
            self.log.debug("New task: %s, peer lock: %s, priority: %s, tasks: %s" % (task["inner_path"], peers, priority, self.started_task_num))
            self.startWorkers(peers)
            return evt

    # Find a task using inner_path
    def findTask(self, inner_path):
        """Return the queued task for ``inner_path``, or None if there is none."""
        for task in self.tasks:
            if task["inner_path"] == inner_path:
                return task
        return None  # Not found

    # Mark a task failed
    def failTask(self, task):
        """Remove ``task`` from the queue, notify the site and resolve its event with False.

        Does nothing if the task is no longer queued.
        """
        if task in self.tasks:
            task["done"] = True
            self.tasks.remove(task)  # Remove from queue
            self.site.onFileFail(task["inner_path"])
            task["evt"].set(False)
            if not self.tasks:
                self.started_task_num = 0

    # Mark a task done
    def doneTask(self, task):
        """Mark ``task`` done, notify the site and resolve its event with True.

        When the queue becomes empty, the task counter is reset and the site
        is told the download is complete.  Unlike the old behavior, a task
        that was already removed (e.g. by failTask) no longer makes this
        raise ValueError.
        """
        task["done"] = True
        if task in self.tasks:  # Consistent with failTask: tolerate an already-removed task
            self.tasks.remove(task)  # Remove from queue
        self.site.onFileDone(task["inner_path"])
        task["evt"].set(True)
        if not self.tasks:
            self.started_task_num = 0
            self.site.onComplete()  # No more task trigger site complete
|
2015-01-12 02:03:45 +01:00
|
|
|
|