From 3f974e0bc7b16ba595b20cfa86f1c30c0824c19c Mon Sep 17 00:00:00 2001 From: HelloZeroNet Date: Tue, 13 Jan 2015 21:19:40 +0100 Subject: [PATCH] count peer connection error, remove offline peers, content sign modification time fix, tracker order fix, reset peer hash_failed on download start, avoid util package name conflict --- src/Peer/Peer.py | 37 +++++++++++++++++++++++++++++-------- src/Site/Site.py | 5 ++--- src/Site/SiteManager.py | 4 ++-- src/Worker/Worker.py | 3 ++- src/main.py | 2 +- 5 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index e14aceaf..af530f88 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -7,13 +7,15 @@ context = zmq.Context() # Communicate remote peers class Peer: - def __init__(self, ip, port): + def __init__(self, ip, port, site): self.ip = ip self.port = port + self.site = site self.socket = None self.last_found = None self.added = time.time() + self.connection_error = 0 self.hash_failed = 0 self.download_bytes = 0 self.download_time = 0 @@ -28,11 +30,6 @@ class Peer: self.socket.connect('tcp://%s:%s' % (self.ip, self.port)) - # Done working with peer - def disconnect(self): - pass - - # Found a peer on tracker def found(self): self.last_found = time.time() @@ -45,9 +42,12 @@ class Peer: self.socket.send(msgpack.packb({"cmd": cmd, "params": params}, use_bin_type=True)) response = msgpack.unpackb(self.socket.recv()) if "error" in response: - self.log.error("%s %s error: %s" % (cmd, params, response["error"])) + self.log.debug("%s %s error: %s" % (cmd, params, response["error"])) + else: # Successful request, reset connection error num + self.connection_error = 0 return response except Exception, err: + self.onConnectionError() self.log.error("%s" % err) if config.debug: import traceback @@ -65,7 +65,7 @@ class Peer: s = time.time() while 1: # Read in 512k parts back = self.sendCmd("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location - if "body" not in back: # Error + if not back or "body" not in back: # Error return False buff.write(back["body"]) @@ -82,3 +82,24 @@ class Peer: # Send a ping request def ping(self): return self.sendCmd("ping") + + + # Stop and remove from site + def remove(self): + self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) + del(self.site.peers[self.key]) + self.socket.close() + + + # - EVENTS - + + # On connection error + def onConnectionError(self): + self.connection_error += 1 + if self.connection_error > 5: # Dead peer + self.remove() + + + # Done working with peer + def onWorkerDone(self): + pass diff --git a/src/Site/Site.py b/src/Site/Site.py index b98129e7..73c6423b 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -211,7 +211,7 @@ class Site: else: return False else: # New peer - peer = Peer(ip, port) + peer = Peer(ip, port, self) self.peers[key] = peer return peer @@ -403,12 +403,11 @@ class Site: # Generate new content.json self.log.info("Adding timestamp and sha1sums to new content.json...") - import datetime, time content = self.content.copy() # Create a copy of current content.json content["address"] = self.address # Add files sha1 hash content["files"] = hashed_files # Add files sha1 hash - content["modified"] = time.mktime(datetime.datetime.utcnow().utctimetuple()) # Add timestamp + content["modified"] = time.time() # Add timestamp del(content["sign"]) # Delete old site # Signing content diff --git a/src/Site/SiteManager.py b/src/Site/SiteManager.py index 66fae64a..daf5479d 100644 --- a/src/Site/SiteManager.py +++ b/src/Site/SiteManager.py @@ -2,9 +2,9 @@ import json, logging, time, re, os import gevent TRACKERS = [ + ("udp", "sugoi.pomf.se", 2710), # Retry 3 times + ("udp", "sugoi.pomf.se", 2710), ("udp", "sugoi.pomf.se", 2710), - ("udp", "open.demonii.com", 1337), # Retry 3 times - ("udp", "open.demonii.com", 1337), ("udp", "open.demonii.com", 1337), ("udp", "bigfoot1942.sektori.org", 6969), ("udp", "tracker.coppersurfer.tk", 80), diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py index a1cfe737..018f98c7 100644 --- a/src/Worker/Worker.py +++ b/src/Worker/Worker.py @@ -13,6 +13,7 @@ class Worker: # Downloader thread def downloader(self): + self.peer.hash_failed = 0 # Reset hash error counter while self.running: # Try to pickup free file download task task = self.manager.getTask(self.peer) @@ -53,8 +54,8 @@ class Worker: task["workers_num"] -= 1 self.manager.log.error("%s: Hash failed: %s" % (self.key, task["inner_path"])) time.sleep(1) + self.peer.onWorkerDone() self.running = False - self.peer.disconnect() self.manager.removeWorker(self) diff --git a/src/main.py b/src/main.py index 2cd22b60..4b3dab92 100644 --- a/src/main.py +++ b/src/main.py @@ -1,5 +1,5 @@ import os, sys -sys.path.append(os.path.dirname(__file__)) # Imports relative to main.py +sys.path.insert(0, os.path.dirname(__file__)) # Imports relative to main.py # Create necessary files and dirs if not os.path.isdir("log"): os.mkdir("log")