From b35d21d6436a189915906ab6bcfa2c55b5bf9d76 Mon Sep 17 00:00:00 2001 From: HelloZeroNet Date: Fri, 6 Mar 2015 02:31:51 +0100 Subject: [PATCH] more detailed stats, memory optimalizations, connection pinging and timeout, request timeout, validate content after signing, only recompile changed coffeescripts, remove unnecessary js logs --- src/Connection/Connection.py | 53 ++++++++++++++++++++---- src/Connection/ConnectionServer.py | 43 ++++++++++++++++++-- src/Content/ContentManager.py | 6 ++- src/Debug/DebugHook.py | 2 +- src/Debug/DebugMedia.py | 65 ++++++++++++++++++++---------- src/Peer/Peer.py | 7 ++-- src/Ui/UiRequest.py | 65 ++++++++++++++++++++++++++---- src/Ui/UiWebsocket.py | 2 + src/Ui/media/Loading.coffee | 2 - src/Ui/media/Wrapper.coffee | 3 -- src/Ui/media/all.js | 7 +--- src/Worker/Worker.py | 11 ++++- src/Worker/WorkerManager.py | 15 ++++++- 13 files changed, 222 insertions(+), 59 deletions(-) diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 8944e1d7..2d692020 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -17,12 +17,14 @@ class Connection: self.id = server.last_connection_id server.last_connection_id += 1 self.protocol = "?" + self.type = "?" self.server = server self.log = logging.getLogger(str(self)) self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here self.req_id = 0 # Last request id - self.handshake = None # Handshake info got from peer + self.handshake = {} # Handshake info got from peer + self.connected = False self.event_connected = gevent.event.AsyncResult() # Solves on handshake received self.closed = False @@ -42,6 +44,7 @@ class Connection: self.bytes_sent = 0 self.last_ping_delay = None self.last_req_time = 0 + self.last_cmd = None self.waiting_requests = {} # Waiting sent requests @@ -56,6 +59,7 @@ class Connection: # Open connection to peer and wait for handshake def connect(self): self.log.debug("Connecting...") + self.type = "out" self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.ip, self.port)) # Detect protocol @@ -67,12 +71,14 @@ class Connection: # Handle incoming connection def handleIncomingConnection(self, sock): + self.type = "in" firstchar = sock.recv(1) # Find out if pure socket or zeromq if firstchar == "\xff": # Backward compatiblity: forward data to zmq if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ") self.protocol = "zeromq" self.log.name = str(self) + self.connected = True self.event_connected.set(self.protocol) if self.server.zmq_running: @@ -100,10 +106,12 @@ class Connection: return False if firstchar == "\xff": # Backward compatibility to zmq self.sock.close() # Close normal socket + del firstchar if zmq: if config.debug_socket: self.log.debug("Connecting as ZeroMQ") self.protocol = "zeromq" self.log.name = str(self) + self.connected = True self.event_connected.set(self.protocol) # Mark handshake as done try: @@ -116,12 +124,13 @@ class Connection: zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port)) self.zmq_sock = zmq_sock except Exception, err: - self.log.debug("Socket error: %s" % Debug.formatException(err)) + if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err)) else: return False # No zeromq connection supported else: # Normal socket self.protocol = "v2" self.log.name = str(self) + self.connected = True self.event_connected.set(self.protocol) # Mark handshake as done unpacker = self.unpacker @@ -137,8 +146,10 @@ class Connection: for message in unpacker: self.incomplete_buff_recv = 0 self.handleMessage(message) + message = None + buf = None except Exception, err: - self.log.debug("Socket error: %s" % Debug.formatException(err)) + if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err)) self.close() # MessageLoop ended, close connection @@ -188,7 +199,7 @@ class Connection: # Send data to connection def send(self, message): - if config.debug_socket: self.log.debug("Send: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id"))) + if config.debug_socket: self.log.debug("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) self.last_send_time = time.time() if self.protocol == "zeromq": if self.zmq_sock: # Outgoing connection @@ -210,27 +221,50 @@ class Connection: self.bytes_sent += len(data) self.sock.sendall(data) self.last_sent_time = time.time() - if config.debug_socket: self.log.debug("Sent: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id"))) + if config.debug_socket: self.log.debug("Sent: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) + return True # Create and send a request to peer def request(self, cmd, params={}): + if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10: # Last command sent more than 10 sec ago, timeout + self.log.debug("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time)) + self.close() + return False + self.last_req_time = time.time() + self.last_cmd = cmd self.req_id += 1 data = {"cmd": cmd, "req_id": self.req_id, "params": params} event = gevent.event.AsyncResult() # Create new event for response self.waiting_requests[self.req_id] = event self.send(data) # Send request res = event.get() # Wait until event solves - return res + + + def ping(self): + s = time.time() + response = None + with gevent.Timeout(10.0, False): + try: + response = self.request("ping") + except Exception, err: + self.log.debug("Ping error: %s" % Debug.formatException(err)) + if response and "body" in response and response["body"] == "Pong!": + self.last_ping_delay = time.time()-s + return True + else: + return False + + # Close connection def close(self): if self.closed: return False # Already closed self.closed = True - if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s..." % len(self.waiting_requests)) + if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv)) for request in self.waiting_requests.values(): # Mark pending requests failed request.set(False) self.waiting_requests = {} @@ -245,3 +279,8 @@ class Connection: self.sock.close() except Exception, err: if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err)) + + # Little cleanup + del self.log + del self.unpacker + del self.sock diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 345ad927..c3b8f7cd 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -1,6 +1,6 @@ from gevent.server import StreamServer from gevent.pool import Pool -import socket, os, logging, random, string +import socket, os, logging, random, string, time import gevent, msgpack import cStringIO as StringIO from Debug import Debug @@ -20,6 +20,8 @@ class ConnectionServer: self.peer_ids = {} # Connections by peer_ids self.running = True + self.thread_checker = gevent.spawn(self.checkConnections) + self.zmq_running = False self.zmq_last_connection = None # Last incoming message client @@ -60,14 +62,20 @@ class ConnectionServer: def getConnection(self, ip=None, port=None, peer_id=None): if peer_id and peer_id in self.peer_ids: # Find connection by peer id connection = self.peer_ids.get(peer_id) - connection.event_connected.get() # Wait for connection + if not connection.connected: connection.event_connected.get() # Wait for connection return connection if ip in self.ips: # Find connection by ip connection = self.ips[ip] - connection.event_connected.get() # Wait for connection + if not connection.connected: connection.event_connected.get() # Wait for connection return connection - # No connection found yet + # Recover from connection pool + for connection in self.connections: + if connection.ip == ip: + if not connection.connected: connection.event_connected.get() # Wait for connection + return connection + + # No connection found try: connection = Connection(self, ip, port) self.ips[ip] = connection @@ -90,6 +98,33 @@ class ConnectionServer: self.connections.remove(connection) + + def checkConnections(self): + while self.running: + time.sleep(60) # Sleep 1 min + for connection in self.connections[:]: # Make a copy + if connection.protocol == "zeromq": continue # No stat on ZeroMQ sockets + idle = time.time() - max(connection.last_recv_time, connection.start_time) + + if idle > 60*60: # Wake up after 1h + connection.close() + + elif idle > 20*60 and connection.last_send_time < time.time()-10: # Idle more than 20 min and we not send request in last 10 sec + if connection.protocol == "?": connection.close() # Got no handshake response, close it + else: + if not connection.ping(): # send ping request + connection.close() + + elif idle > 10 and connection.incomplete_buff_recv > 0: # Incompelte data with more than 10 sec idle + connection.log.debug("[Cleanup] Connection buff stalled, content: %s" % connection.u.read_bytes(1024)) + connection.close() + + elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10: # Sent command and no response in 10 sec + connection.log.debug("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time)) + connection.close() + + + def zmqServer(self): self.log.debug("Starting ZeroMQ on: tcp://127.0.0.1:%s..." % self.zmq_port) try: diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index 7286eacb..b4332a34 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -199,6 +199,10 @@ class ContentManager: oldsign_content = json.dumps(new_content, sort_keys=True) new_content["sign"] = CryptBitcoin.signOld(oldsign_content, privatekey) + if not self.validContent(inner_path, new_content): + self.log.error("Sign failed: Invalid content") + return False + if filewrite: self.log.info("Saving to %s..." % inner_path) json.dump(new_content, open(self.site.getPath(inner_path), "w"), indent=2, sort_keys=True) @@ -260,7 +264,7 @@ class ContentManager: # Check include size limit if include_info.get("max_size"): # Include size limit if content_size > include_info["max_size"]: - self.log.error("%s: Include too large %s > %s" % (inner_path, total_size, include_info["max_size"])) + self.log.error("%s: Include too large %s > %s" % (inner_path, content_size, include_info["max_size"])) return False # Check if content includes allowed diff --git a/src/Debug/DebugHook.py b/src/Debug/DebugHook.py index 3fdfa918..9c6d3b64 100644 --- a/src/Debug/DebugHook.py +++ b/src/Debug/DebugHook.py @@ -24,7 +24,7 @@ def handleErrorNotify(*args): OriginalGreenlet = gevent.Greenlet class ErrorhookedGreenlet(OriginalGreenlet): def _report_error(self, exc_info): - handleError(exc_info[0], exc_info[1], exc_info[2]) + sys.excepthook(exc_info[0], exc_info[1], exc_info[2]) if config.debug: sys.excepthook = handleError diff --git a/src/Debug/DebugMedia.py b/src/Debug/DebugMedia.py index 69043bc9..d02891e8 100644 --- a/src/Debug/DebugMedia.py +++ b/src/Debug/DebugMedia.py @@ -7,7 +7,7 @@ def findfiles(path, find_ext): for file in sorted(files): file_path = root+"/"+file file_ext = file.split(".")[-1] - if file_ext in find_ext and not file.startswith("all."): yield file_path + if file_ext in find_ext and not file.startswith("all."): yield file_path.replace("\\", "/") # Generates: all.js: merge *.js, compile coffeescript, all.css: merge *.css, vendor prefix features @@ -23,31 +23,45 @@ def merge(merged_path): # If exits check the other files modification date if os.path.isfile(merged_path): merged_mtime = os.path.getmtime(merged_path) - changed = False - for file_path in findfiles(merge_dir, find_ext): - if os.path.getmtime(file_path) > merged_mtime: - changed = True - break - if not changed: return # Assets not changed, nothing to do + else: + merged_mtime = 0 + + + changed = {} + for file_path in findfiles(merge_dir, find_ext): + if os.path.getmtime(file_path) > merged_mtime: + changed[file_path] = True + if not changed: return # Assets not changed, nothing to do + + if os.path.isfile(merged_path): # Find old parts to avoid unncessary recompile + merged_old = open(merged_path, "rb").read() + old_parts = {} + for match in re.findall("(/\* ---- (.*?) ---- \*/(.*?)(?=/\* ----|$))", merged_old, re.DOTALL): + old_parts[match[1]] = match[2].strip("\n\r") # Merge files parts = [] + s_total = time.time() for file_path in findfiles(merge_dir, find_ext): - parts.append("\n\n/* ---- %s ---- */\n\n" % file_path.replace("\\", "/")) + parts.append("\n\n/* ---- %s ---- */\n\n" % file_path) if file_path.endswith(".coffee"): # Compile coffee script - if not config.coffeescript_compiler: - logging.error("No coffeescript compiler definied, skipping compiling %s" % merged_path) - return False # No coffeescript compiler, skip this file - command = config.coffeescript_compiler % file_path.replace("/", "\\") - s = time.time() - compiler = subprocess.Popen(command, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) - logging.debug("Running: %s (Done in %.2fs)" % (command, time.time()-s)) - out = compiler.stdout.read() - if out and out.startswith("("): - parts.append(out) - else: - error = out - parts.append("alert('%s compile error: %s');" % (file_path, re.escape(error).replace("\n", "\\n").replace(r"\\n", r"\n") ) ) + if file_path in changed or file_path not in old_parts: # Only recompile if changed or its not compiled before + if not config.coffeescript_compiler: + logging.error("No coffeescript compiler definied, skipping compiling %s" % merged_path) + return False # No coffeescript compiler, skip this file + command = config.coffeescript_compiler % os.path.join(*file_path.split("/")) # Fix os path separator + s = time.time() + compiler = subprocess.Popen(command, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) + out = compiler.stdout.read() + logging.debug("Running: %s (Done in %.2fs)" % (command, time.time()-s)) + if out and out.startswith("("): + parts.append(out) + else: + error = out + logging.error("%s Compile error %s:" % (file_path, error)) + parts.append("alert('%s compile error: %s');" % (file_path, re.escape(error).replace("\n", "\\n").replace(r"\\n", r"\n") ) ) + else: # Not changed use the old_part + parts.append(old_parts[file_path]) else: # Add to parts parts.append(open(file_path).read()) @@ -57,4 +71,11 @@ def merge(merged_path): merged = cssvendor.prefix(merged) merged = merged.replace("\r", "") open(merged_path, "wb").write(merged) - logging.debug("Merged %s (%.2fs)" % (merged_path, time.time()-s)) + logging.debug("Merged %s (%.2fs)" % (merged_path, time.time()-s_total)) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.DEBUG) + os.chdir("..") + config.coffeescript_compiler = r'type "%s" | tools\coffee-node\bin\node.exe tools\coffee-node\bin\coffee --no-header -s -p' + merge("data/1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ/js/all.js") \ No newline at end of file diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index deed0dc1..bf10152b 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -34,6 +34,7 @@ class Peer: self.connection.close() else: self.log.debug("Getting connection...") + self.connection = None try: @@ -59,8 +60,8 @@ class Peer: self.connect() if not self.connection: return None # Connection failed - if cmd != "ping" and self.last_response and time.time() - self.last_response > 20*60: # If last response if older than 20 minute, ping first to see if still alive - if not self.ping(): return None + #if cmd != "ping" and self.last_response and time.time() - self.last_response > 20*60: # If last response if older than 20 minute, ping first to see if still alive + # if not self.ping(): return None for retry in range(1,3): # Retry 3 times #if config.debug_socket: self.log.debug("sendCmd: %s %s" % (cmd, params.get("inner_path"))) @@ -145,7 +146,7 @@ class Peer: # On connection error def onConnectionError(self): self.connection_error += 1 - if self.connection_error >= 5: # Dead peer + if self.connection_error >= 3: # Dead peer self.remove() diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index 70631c17..97ab7fd8 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -293,9 +293,8 @@ class UiRequest: def actionStats(self): import gc, sys - from greenlet import greenlet - greenlets = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] self.sendHeader() + s = time.time() main = sys.modules["src.main"] yield """ """ + # Memory + try: + import psutil + process = psutil.Process(os.getpid()) + mem = process.get_memory_info()[0] / float(2 ** 20) + yield "Memory usage: %.2fMB | " % mem + yield "Threads: %s | " % len(process.threads()) + yield "CPU: usr %.2fs sys %.2fs | " % process.cpu_times() + yield "Open files: %s | " % len(process.open_files()) + yield "Sockets: %s" % len(process.connections()) + yield "
" + except Exception, err: + pass + yield "Connections (%s):
" % len(main.file_server.connections) - yield "" + yield "
id protocol ip zmqs ping buff idle delay sent received
" + yield "" for connection in main.file_server.connections: yield self.formatTableRow([ ("%3d", connection.id), ("%s", connection.protocol), + ("%s", connection.type), ("%s", connection.ip), - ("%s", bool(connection.zmq_sock)), ("%6.3f", connection.last_ping_delay), ("%s", connection.incomplete_buff_recv), ("since", max(connection.last_send_time, connection.last_recv_time)), + ("since", connection.start_time), ("%.3f", connection.last_sent_time-connection.last_send_time), ("%.0fkB", connection.bytes_sent/1024), - ("%.0fkB", connection.bytes_recv/1024) + ("%.0fkB", connection.bytes_recv/1024), + ("%s", connection.last_cmd), + ("%s", connection.waiting_requests.keys()), + ("%s", connection.handshake.get("version")), + ("%s", connection.handshake.get("peer_id")), ]) yield "
id protocol type ip ping buffidle open delay sent received last sent waiting version peerid
" - yield "Greenlets (%s):
" % len(greenlets) - for thread in greenlets: - yield " - %s
" % cgi.escape(repr(thread)) + from greenlet import greenlet + objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] + yield "
Greenlets (%s):
" % len(objs) + for obj in objs: + yield " - %sbyte: %s
" % (sys.getsizeof(obj), cgi.escape(repr(obj))) + + + from Worker import Worker + objs = [obj for obj in gc.get_objects() if isinstance(obj, Worker)] + yield "
Workers (%s):
" % len(objs) + for obj in objs: + yield " - %sbyte: %s
" % (sys.getsizeof(obj), cgi.escape(repr(obj))) + + + from Connection import Connection + objs = [obj for obj in gc.get_objects() if isinstance(obj, Connection)] + yield "
Connections (%s):
" % len(objs) + for obj in objs: + yield " - %sbyte: %s
" % (sys.getsizeof(obj), cgi.escape(repr(obj))) + + + objs = [obj for obj in gc.get_objects() if isinstance(obj, self.server.log.__class__)] + yield "
Loggers (%s):
" % len(objs) + for obj in objs: + yield " - %sbyte: %s
" % (sys.getsizeof(obj), cgi.escape(repr(obj.name))) + + + objs = [obj for obj in gc.get_objects() if isinstance(obj, UiRequest)] + yield "
UiRequest (%s):
" % len(objs) + for obj in objs: + yield " - %sbyte: %s
" % (sys.getsizeof(obj), cgi.escape(repr(obj))) + + yield "Done in %.3f" % (time.time()-s) # - Tests - diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index 10234770..97d206cc 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -282,8 +282,10 @@ class UiWebsocket: # Find data in json files def actionFileQuery(self, to, dir_inner_path, query): + # s = time.time() dir_path = self.site.getPath(dir_inner_path) rows = list(QueryJson.query(dir_path, query)) + # self.log.debug("FileQuery %s %s done in %s" % (dir_inner_path, query, time.time()-s)) return self.response(to, rows) diff --git a/src/Ui/media/Loading.coffee b/src/Ui/media/Loading.coffee index 8a0bf7cd..86fbda3d 100644 --- a/src/Ui/media/Loading.coffee +++ b/src/Ui/media/Loading.coffee @@ -4,12 +4,10 @@ class Loading setProgress: (percent) -> - console.log "Progress:", percent $(".progressbar").css("width", percent*100+"%").css("opacity", "1").css("display", "block") hideProgress: -> $(".progressbar").css("width", "100%").css("opacity", "0").cssLater("display", "none", 1000) - console.log "Hideprogress" showScreen: -> diff --git a/src/Ui/media/Wrapper.coffee b/src/Ui/media/Wrapper.coffee index f825c49d..04571af5 100644 --- a/src/Ui/media/Wrapper.coffee +++ b/src/Ui/media/Wrapper.coffee @@ -59,7 +59,6 @@ class Wrapper cmd = message.cmd if cmd == "innerReady" @inner_ready = true - @log "innerReady", @ws.ws.readyState, @wrapperWsInited if @ws.ws.readyState == 1 and not @wrapperWsInited # If ws already opened @sendInner {"cmd": "wrapperOpenedWebsocket"} @wrapperWsInited = true @@ -148,7 +147,6 @@ class Wrapper onOpenWebsocket: (e) => @ws.cmd "channelJoin", {"channel": "siteChanged"} # Get info on modifications - @log "onOpenWebsocket", @inner_ready, @wrapperWsInited if not @wrapperWsInited and @inner_ready @sendInner {"cmd": "wrapperOpenedWebsocket"} # Send to inner frame @wrapperWsInited = true @@ -178,7 +176,6 @@ class Wrapper # Iframe loaded onLoad: (e) => - @log "onLoad" @inner_loaded = true if not @inner_ready then @sendInner {"cmd": "wrapperReady"} # Inner frame loaded before wrapper #if not @site_error then @loading.hideScreen() # Hide loading screen diff --git a/src/Ui/media/all.js b/src/Ui/media/all.js index 63637942..0ff4f45d 100644 --- a/src/Ui/media/all.js +++ b/src/Ui/media/all.js @@ -472,13 +472,11 @@ jQuery.extend( jQuery.easing, } Loading.prototype.setProgress = function(percent) { - console.log("Progress:", percent); return $(".progressbar").css("width", percent * 100 + "%").css("opacity", "1").css("display", "block"); }; Loading.prototype.hideProgress = function() { - $(".progressbar").css("width", "100%").css("opacity", "0").cssLater("display", "none", 1000); - return console.log("Hideprogress"); + return $(".progressbar").css("width", "100%").css("opacity", "0").cssLater("display", "none", 1000); }; Loading.prototype.showScreen = function() { @@ -807,7 +805,6 @@ jQuery.extend( jQuery.easing, cmd = message.cmd; if (cmd === "innerReady") { this.inner_ready = true; - this.log("innerReady", this.ws.ws.readyState, this.wrapperWsInited); if (this.ws.ws.readyState === 1 && !this.wrapperWsInited) { this.sendInner({ "cmd": "wrapperOpenedWebsocket" @@ -933,7 +930,6 @@ jQuery.extend( jQuery.easing, this.ws.cmd("channelJoin", { "channel": "siteChanged" }); - this.log("onOpenWebsocket", this.inner_ready, this.wrapperWsInited); if (!this.wrapperWsInited && this.inner_ready) { this.sendInner({ "cmd": "wrapperOpenedWebsocket" @@ -974,7 +970,6 @@ jQuery.extend( jQuery.easing, Wrapper.prototype.onLoad = function(e) { var _ref; - this.log("onLoad"); this.inner_loaded = true; if (!this.inner_ready) { this.sendInner({ diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py index 325ebb48..da4274aa 100644 --- a/src/Worker/Worker.py +++ b/src/Worker/Worker.py @@ -12,6 +12,14 @@ class Worker: self.thread = None + def __str__(self): + return "Worker %s %s" % (self.manager.site.address_short, self.key) + + + def __repr__(self): + return "<%s>" % self.__str__() + + # Downloader thread def downloader(self): self.peer.hash_failed = 0 # Reset hash error counter @@ -34,7 +42,7 @@ class Worker: buff = self.peer.getFile(task["site"].address, task["inner_path"]) if self.running == False: # Worker no longer needed or got killed self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"])) - return None + break if buff: # Download ok correct = task["site"].content_manager.verifyFile(task["inner_path"], buff) else: # Download error @@ -78,4 +86,5 @@ class Worker: self.running = False if self.thread: self.thread.kill(exception=Debug.Notify("Worker stopped")) + del self.thread self.manager.removeWorker(self) diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 12543d19..6f991ed4 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -8,16 +8,26 @@ class WorkerManager: def __init__(self, site): self.site = site self.workers = {} # Key: ip:port, Value: Worker.Worker - self.tasks = [] # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0} + self.tasks = [] # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids} self.started_task_num = 0 # Last added task num self.running = True self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short) self.process_taskchecker = gevent.spawn(self.checkTasks) + def __str__(self): + return "WorkerManager %s" % self.site.address_short + + + def __repr__(self): + return "<%s>" % self.__str__() + + + # Check expired tasks def checkTasks(self): while self.running: + tasks = task = worker = workers = None # Cleanup local variables time.sleep(15) # Check every 15 sec # Clean up workers @@ -25,6 +35,7 @@ class WorkerManager: if worker.task and worker.task["done"]: worker.stop() # Stop workers with task done if not self.tasks: continue + tasks = self.tasks[:] # Copy it so removing elements wont cause any problem for task in tasks: if (task["time_started"] and time.time() >= task["time_started"]+60) or (time.time() >= task["time_added"]+60 and not self.workers): # Task taking too long time, or no peer after 60sec kill it @@ -44,6 +55,8 @@ class WorkerManager: task["peers"] = [] self.startWorkers() break # One reannounce per loop + + self.log.debug("checkTasks stopped running")