From efb1dc32105e3cd75eb112788d0def0303cac5b2 Mon Sep 17 00:00:00 2001 From: HelloZeroNet Date: Wed, 14 Jan 2015 02:41:13 +0100 Subject: [PATCH] file download queue priority by browser request, newer content json log, peer remove key error fix, peer request error also a connection error, new sites created with own flag --- src/Peer/Peer.py | 3 +++ src/Site/Site.py | 7 ++++--- src/Ui/UiRequest.py | 2 +- src/Ui/UiWebsocket.py | 14 +++++++++----- src/Worker/WorkerManager.py | 31 +++++++++++++++++++------------ src/main.py | 2 ++ 6 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index af530f88..1c308b60 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -11,6 +11,8 @@ class Peer: self.ip = ip self.port = port self.site = site + self.key = "%s:%s" % (ip, port) + self.socket = None self.last_found = None self.added = time.time() @@ -43,6 +45,7 @@ class Peer: response = msgpack.unpackb(self.socket.recv()) if "error" in response: self.log.debug("%s %s error: %s" % (cmd, params, response["error"])) + self.onConnectionError() else: # Successful request, reset connection error num self.connection_error = 0 return response diff --git a/src/Site/Site.py b/src/Site/Site.py index 73c6423b..44c73df3 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -179,7 +179,7 @@ class Site: # Check and download if file not exits - def needFile(self, inner_path, update=False, blocking=True, peer=None): + def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0): if os.path.isfile(self.getPath(inner_path)) and not update: # File exits, no need to do anything return True elif self.settings["serving"] == False: # Site not serving @@ -189,12 +189,12 @@ class Site: self.log.debug("Need content.json first") self.announce() if inner_path != "content.json": # Prevent double download - task = self.worker_manager.addTask("content.json", peer) + task = self.worker_manager.addTask("content.json", peer, priority=99999) task.get() self.loadContent() if not self.content: return False - task = self.worker_manager.addTask(inner_path, peer) + task = self.worker_manager.addTask(inner_path, peer, priority=priority) if blocking: return task.get() else: @@ -330,6 +330,7 @@ class Site: if self.content["modified"] == content["modified"]: # Ignore, have the same content.json return None elif self.content["modified"] > content["modified"]: # We have newer + self.log.debug("We have newer content.json (Our: %s, Sent: %s)" % (self.content["modified"], content["modified"])) return False if content["modified"] > time.time()+60*60*24: # Content modified in the far future (allow 1 day window) self.log.error("Content.json modify is in the future!") diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index 84767abf..c870e157 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -158,7 +158,7 @@ class UiRequest: else: # File not exits, try to download site = SiteManager.need(match.group("site"), all_file=False) self.sendHeader(content_type=self.getContentType(file_path)) # ?? Get Exception without this - result = site.needFile(match.group("inner_path")) # Wait until file downloads + result = site.needFile(match.group("inner_path"), priority=1) # Wait until file downloads return self.actionFile(file_path) else: # Bad url diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index 8056fc9f..6b7b2520 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -6,6 +6,7 @@ class UiWebsocket: def __init__(self, ws, site, server): self.ws = ws self.site = site + self.log = site.log self.server = server self.next_message_id = 1 self.waiting_cb = {} # Waiting for callback. Key: message_id, Value: function pointer @@ -35,7 +36,7 @@ class UiWebsocket: if config.debug: # Allow websocket errors to appear on /Debug import sys sys.modules["src.main"].DebugHook.handleError() - self.site.log.error("WebSocket error: %s" % err) + self.log.error("WebSocket error: %s" % err) return "Bye." @@ -64,9 +65,12 @@ class UiWebsocket: def send(self, message, cb = None): message["id"] = self.next_message_id # Add message id to allow response self.next_message_id += 1 - self.ws.send(json.dumps(message)) - if cb: # Callback after client responsed - self.waiting_cb[message["id"]] = cb + try: + self.ws.send(json.dumps(message)) + if cb: # Callback after client responsed + self.waiting_cb[message["id"]] = cb + except Exception, err: + self.log.debug("Websocket send error: %s" % err) # Handle incoming messages @@ -107,7 +111,7 @@ class UiWebsocket: if req["to"] in self.waiting_cb: self.waiting_cb(req["result"]) # Call callback function else: - self.site.log.error("Websocket callback not found: %s" % req) + self.log.error("Websocket callback not found: %s" % req) # Send a simple pong answer diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index f54bc448..d5e63fe8 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -8,7 +8,7 @@ class WorkerManager: def __init__(self, site): self.site = site self.workers = {} # Key: ip:port, Value: Worker.Worker - self.tasks = [] # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers} + self.tasks = [] # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers, "priority": 0} self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short) self.process_taskchecker = gevent.spawn(self.checkTasks) @@ -40,15 +40,19 @@ class WorkerManager: continue # One reannounce per loop + # Tasks sorted by this + def taskSorter(self, task): + if task["inner_path"] == "content.json": return 9999 # Content.json always prority + if task["inner_path"] == "index.html": return 9998 # index.html also important + return task["priority"]-task["workers_num"] # Prefer more priority and less workers + + # Returns the next free or less worked task - def getTask(self, peer, only_free=False): - best_task = None - for task in self.tasks: # Find out the task with lowest worker number + def getTask(self, peer): + self.tasks.sort(key=self.taskSorter, reverse=True) # Sort tasks by priority and worker numbers + for task in self.tasks: # Find a task if task["peers"] and peer not in task["peers"]: continue # This peer not allowed to pick this task - if task["inner_path"] == "content.json": return task # Content.json always prority - if not best_task or task["workers_num"] < best_task["workers_num"]: # If task has lower worker number then its better - best_task = task - return best_task + return task # New peers added to site @@ -76,21 +80,24 @@ class WorkerManager: if worker.task == task: workers.append(worker) return workers + # Ends and remove a worker def removeWorker(self, worker): worker.running = False - del(self.workers[worker.key]) + if worker.key in self.workers: del(self.workers[worker.key]) self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), MAX_WORKERS)) # Create new task and return asyncresult - def addTask(self, inner_path, peer=None): + def addTask(self, inner_path, peer=None, priority = 0): self.site.onFileStart(inner_path) # First task, trigger site download started task = self.findTask(inner_path) if task: # Already has task for that file - if peer and task["peers"]: # This peer has new version too + if peer and task["peers"]: # This peer also has new version, add it to task possible peers task["peers"].append(peer) self.startWorkers() + if priority: + task["priority"] += priority # Boost on priority return task["evt"] else: # No task for that file yet evt = gevent.event.AsyncResult() @@ -98,7 +105,7 @@ class WorkerManager: peers = [peer] # Only download from this peer else: peers = None - task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers} + task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers, "priority": priority} self.tasks.append(task) self.log.debug("New task: %s" % task) self.startWorkers() diff --git a/src/main.py b/src/main.py index 4b3dab92..f031a743 100644 --- a/src/main.py +++ b/src/main.py @@ -81,6 +81,8 @@ def siteCreate(): logging.info("Creating content.json...") site = Site(address) site.signContent(privatekey) + site.settings["own"] = True + site.saveSettings() logging.info("Site created!")