From 44d5ac784dbfe51f62253dc14c1f9344452c1beb Mon Sep 17 00:00:00 2001 From: HelloZeroNet Date: Mon, 13 Apr 2015 23:08:57 +0200 Subject: [PATCH] Trackerless peer exchange between peers, fix event once bug --- plugins/Stats/StatsPlugin.py | 5 ++--- src/File/FileRequest.py | 33 +++++++++++++++++++++++++++++++-- src/Peer/Peer.py | 30 +++++++++++++++++++++++++++++- src/Site/Site.py | 30 +++++++++++++++++++++++++++++- src/util/Event.py | 2 +- 5 files changed, 92 insertions(+), 8 deletions(-) diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index 9aee507d..11f2758e 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -97,13 +97,12 @@ class UiRequestPlugin(object): # Sites yield "

Sites:" yield "" - yield "" + yield "" for site in self.server.sites.values(): yield self.formatTableRow([ ("%s", site.address), - ("%s", len(site.peers)), - ("%s/%s", ( len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected]), len(site.peers) ) ), ("%s", [peer.connection.id for peer in site.peers.values() if peer.connection and peer.connection.connected]), + ("%s/%s", ( len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected]), len(site.peers) ) ), ("%s", len(site.content_manager.contents)), ]) yield "
address peers connected content.json
address connected peers content.json
" diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 8d4fdacf..a6224df7 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -1,4 +1,4 @@ -import os, msgpack, shutil, gevent +import os, msgpack, shutil, gevent, socket, struct, random from cStringIO import StringIO from Debug import Debug from Config import config @@ -16,6 +16,10 @@ class FileRequest: self.log = server.log + def unpackAddress(self, packed): + return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]) + + def send(self, msg): self.connection.send(msg) @@ -35,6 +39,8 @@ class FileRequest: self.actionGetFile(params) elif cmd == "update": self.actionUpdate(params) + elif cmd == "pex": + self.actionPex(params) elif cmd == "ping": self.actionPing() else: @@ -104,13 +110,36 @@ class FileRequest: if config.debug_socket: self.log.debug("File %s sent" % file_path) # Add peer to site if not added before - # site.addPeer(self.connection.ip, self.connection.port) + site.addPeer(self.connection.ip, self.connection.port) except Exception, err: self.log.debug("GetFile read error: %s" % Debug.formatException(err)) self.response({"error": "File read error: %s" % Debug.formatException(err)}) return False + # Peer exchange request + def actionPex(self, params): + site = self.sites.get(params["site"]) + if not site or not site.settings["serving"]: # Site unknown or not serving + self.response({"error": "Unknown site"}) + return False + + got_peer_keys = [] + added = 0 + site.addPeer(self.connection.ip, self.connection.port) # Add requester peer to site + for peer in params["peers"]: # Add sent peers to site + address = self.unpackAddress(peer) + got_peer_keys.append("%s:%s" % address) + if (site.addPeer(*address)): added += 1 + # Send back peers that is not in the sent list + peers = site.peers.values() + random.shuffle(peers) + packed_peers = [peer.packAddress() for peer in peers if peer.key not in got_peer_keys][0:params["need"]] + if added: + self.log.debug("Added %s peers to %s using PEX, sending back %s" % (added, site, len(packed_peers))) + self.response({"peers": packed_peers}) + + # Send a simple Pong! answer def actionPing(self): self.response("Pong!") diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index eb7ec3a5..755eddbc 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -1,4 +1,4 @@ -import os, logging, gevent, time, msgpack, sys +import os, logging, gevent, time, msgpack, sys, random, socket, struct from cStringIO import StringIO from Config import config from Debug import Debug @@ -49,6 +49,16 @@ class Peer: def __repr__(self): return "<%s>" % self.__str__() + + # Peer ip:port to packed 6byte format + def packAddress(self): + return socket.inet_aton(self.ip)+struct.pack("H", self.port) + + + def unpackAddress(self, packed): + return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]) + + # Found a peer on tracker def found(self): self.last_found = time.time() @@ -135,6 +145,24 @@ class Peer: return response_time + # Request peer exchange from peer + def pex(self, site=None, need_num=5): + if not site: site = self.site # If no site definied request peers for this site + peers = self.site.peers.values() + random.shuffle(peers) + packed_peers = [peer.packAddress() for peer in peers][0:need_num] + response = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num}) + if not response or "error" in response: + return False + added = 0 + for peer in response.get("peers", []): + address = self.unpackAddress(peer) + if (site.addPeer(*address)): added += 1 + if added: + self.log.debug("Added peers using PEX: %s" % added) + return added + + # Stop and remove from site def remove(self): self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) diff --git a/src/Site/Site.py b/src/Site/Site.py index 62cf7391..567d2528 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -272,7 +272,7 @@ class Site: if not ip: return False key = "%s:%s" % (ip, port) if key in self.peers: # Already has this ip - self.peers[key].found() + #self.peers[key].found() if return_peer: # Always return peer return self.peers[key] else: @@ -283,6 +283,28 @@ class Site: return peer + # Gather peer from connected peers + @util.Noparallel(blocking=False) + def announcePex(self, query_num=3, need_num=5): + peers = [peer for peer in self.peers.values() if peer.connection and peer.connection.connected] # Connected peers + if len(peers) == 0: # Small number of connected peers for this site, connect to any + peers = self.peers.values() + need_num = 10 + + random.shuffle(peers) + done = 0 + added = 0 + for peer in peers: + res = peer.pex(need_num=need_num) + if res != False: + done += 1 + added += res + if added: + self.worker_manager.onPeers() + self.updateWebsocket(peers_added=added) + if done == query_num: break + + # Add myself and get other peers from tracker def announce(self, force=False): if time.time() < self.last_announce+60 and not force: return # No reannouncing within 60 secs @@ -364,6 +386,12 @@ class Site: else: self.log.error("Announced to %s trackers in %.3fs, failed" % (announced, time.time()-s)) + if not [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]: # If no connected peer yet then wait for connections + gevent.spawn_later(3, self.announcePex, need_num=10) # Spawn 3 secs later + # self.onFileDone.once(lambda inner_path: self.announcePex(need_num=10), "announcePex_%s" % self.address) # After first file downloaded try to find more peers using pex + else: # Else announce immediately + self.announcePex() + # Need open connections def needConnections(self): diff --git a/src/util/Event.py b/src/util/Event.py index ae7d0a53..850e920d 100644 --- a/src/util/Event.py +++ b/src/util/Event.py @@ -16,7 +16,7 @@ class Event(list): func.once = True func.name = None if name: # Dont function with same name twice - names = [f.name for f in self] + names = [f.name for f in self if "once" in dir(f)] if name not in names: func.name = name self.append(func)