import random import time import hashlib import urllib.request import struct import socket import re import collections import bencode from lib.subtl.subtl import UdpTrackerClient import socks import sockshandler import gevent from Plugin import PluginManager from Config import config from Debug import Debug from util import helper import util class AnnounceError(Exception): pass global_stats = collections.defaultdict(lambda: collections.defaultdict(int)) @PluginManager.acceptPlugins class SiteAnnouncer(object): def __init__(self, site): self.site = site self.stats = {} self.fileserver_port = config.fileserver_port self.peer_id = self.site.connection_server.peer_id self.last_tracker_id = random.randint(0, 10) self.time_last_announce = 0 def getTrackers(self): return config.trackers def getSupportedTrackers(self): trackers = self.getTrackers() if config.disable_udp or config.trackers_proxy != "disable": trackers = [tracker for tracker in trackers if not tracker.startswith("udp://")] if not self.site.connection_server.tor_manager.enabled: trackers = [tracker for tracker in trackers if ".onion" not in tracker] if "ipv6" not in self.site.connection_server.supported_ip_types: trackers = [tracker for tracker in trackers if helper.getIpType(self.getAddressParts(tracker)["ip"]) != "ipv6"] return trackers def getAnnouncingTrackers(self, mode): trackers = self.getSupportedTrackers() if trackers and (mode == "update" or mode == "more"): # Only announce on one tracker, increment the queried tracker id self.last_tracker_id += 1 self.last_tracker_id = self.last_tracker_id % len(trackers) trackers_announcing = [trackers[self.last_tracker_id]] # We only going to use this one else: trackers_announcing = trackers return trackers_announcing def getOpenedServiceTypes(self): back = [] # Type of addresses they can reach me if config.trackers_proxy == "disable": for ip_type, opened in list(self.site.connection_server.port_opened.items()): if opened: back.append(ip_type) if self.site.connection_server.tor_manager.start_onions: back.append("onion") return back @util.Noparallel(blocking=False) def announce(self, force=False, mode="start", pex=True): if time.time() - self.time_last_announce < 30 and not force: return # No reannouncing within 30 secs if force: self.site.log.debug("Force reannounce in mode %s" % mode) self.fileserver_port = config.fileserver_port self.time_last_announce = time.time() trackers = self.getAnnouncingTrackers(mode) if config.verbose: self.site.log.debug("Tracker announcing, trackers: %s" % trackers) errors = [] slow = [] s = time.time() threads = [] num_announced = 0 for tracker in trackers: # Start announce threads tracker_stats = global_stats[tracker] # Reduce the announce time for trackers that looks unreliable if tracker_stats["num_error"] > 5 and tracker_stats["time_request"] > time.time() - 60 * min(30, tracker_stats["num_error"]): if config.verbose: self.site.log.debug("Tracker %s looks unreliable, announce skipped (error: %s)" % (tracker, tracker_stats["num_error"])) continue thread = gevent.spawn(self.announceTracker, tracker, mode=mode) threads.append(thread) thread.tracker = tracker time.sleep(0.01) self.updateWebsocket(trackers="announcing") gevent.joinall(threads, timeout=20) # Wait for announce finish for thread in threads: if thread.value is None: continue if thread.value is not False: if thread.value > 1.0: # Takes more than 1 second to announce slow.append("%.2fs %s" % (thread.value, thread.tracker)) num_announced += 1 else: if thread.ready(): errors.append(thread.tracker) else: # Still running slow.append("30s+ %s" % thread.tracker) # Save peers num self.site.settings["peers"] = len(self.site.peers) if len(errors) < len(threads): # At least one tracker finished if len(trackers) == 1: announced_to = trackers[0] else: announced_to = "%s/%s trackers" % (num_announced, len(threads)) if mode != "update" or config.verbose: self.site.log.debug( "Announced in mode %s to %s in %.3fs, errors: %s, slow: %s" % (mode, announced_to, time.time() - s, errors, slow) ) else: if len(threads) > 1: self.site.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s)) if len(threads) == 1 and mode != "start": # Move to next tracker self.site.log.debug("Tracker failed, skipping to next one...") gevent.spawn_later(1.0, self.announce, force=force, mode=mode, pex=pex) self.updateWebsocket(trackers="announced") if pex: self.updateWebsocket(pex="announcing") if mode == "more": # Need more peers self.announcePex(need_num=10) else: self.announcePex() self.updateWebsocket(pex="announced") def getTrackerHandler(self, protocol): if protocol == "udp": handler = self.announceTrackerUdp elif protocol == "http": handler = self.announceTrackerHttp elif protocol == "https": handler = self.announceTrackerHttps else: handler = None return handler def getAddressParts(self, tracker): if "://" not in tracker or not re.match("^[A-Za-z0-9:/\\.#-]+$", tracker): return None protocol, address = tracker.split("://", 1) if ":" in address: ip, port = address.rsplit(":", 1) else: ip = address if protocol.startswith("https"): port = 443 else: port = 80 back = {} back["protocol"] = protocol back["address"] = address back["ip"] = ip back["port"] = port return back def announceTracker(self, tracker, mode="start", num_want=10): s = time.time() address_parts = self.getAddressParts(tracker) if not address_parts: self.site.log.warning("Tracker %s error: Invalid address" % tracker) return False if tracker not in self.stats: self.stats[tracker] = {"status": "", "num_request": 0, "num_success": 0, "num_error": 0, "time_request": 0, "time_last_error": 0} last_status = self.stats[tracker]["status"] self.stats[tracker]["status"] = "announcing" self.stats[tracker]["time_request"] = time.time() global_stats[tracker]["time_request"] = time.time() if config.verbose: self.site.log.debug("Tracker announcing to %s (mode: %s)" % (tracker, mode)) if mode == "update": num_want = 10 else: num_want = 30 handler = self.getTrackerHandler(address_parts["protocol"]) error = None try: if handler: peers = handler(address_parts["address"], mode=mode, num_want=num_want) else: raise AnnounceError("Unknown protocol: %s" % address_parts["protocol"]) except Exception as err: self.site.log.warning("Tracker %s announce failed: %s in mode %s" % (tracker, Debug.formatException(err), mode)) error = err if error: self.stats[tracker]["status"] = "error" self.stats[tracker]["time_status"] = time.time() self.stats[tracker]["last_error"] = str(error) self.stats[tracker]["time_last_error"] = time.time() self.stats[tracker]["num_error"] += 1 self.stats[tracker]["num_request"] += 1 global_stats[tracker]["num_request"] += 1 global_stats[tracker]["num_error"] += 1 self.updateWebsocket(tracker="error") return False if peers is None: # Announce skipped self.stats[tracker]["time_status"] = time.time() self.stats[tracker]["status"] = last_status return None self.stats[tracker]["status"] = "announced" self.stats[tracker]["time_status"] = time.time() self.stats[tracker]["num_success"] += 1 self.stats[tracker]["num_request"] += 1 global_stats[tracker]["num_request"] += 1 global_stats[tracker]["num_error"] = 0 if peers is True: # Announce success, but no peers returned return time.time() - s # Adding peers added = 0 for peer in peers: if peer["port"] == 1: # Some trackers does not accept port 0, so we send port 1 as not-connectable peer["port"] = 0 if not peer["port"]: continue # Dont add peers with port 0 if self.site.addPeer(peer["addr"], peer["port"], source="tracker"): added += 1 if added: self.site.worker_manager.onPeers() self.site.updateWebsocket(peers_added=added) if config.verbose: self.site.log.debug( "Tracker result: %s://%s (found %s peers, new: %s, total: %s)" % (address_parts["protocol"], address_parts["address"], len(peers), added, len(self.site.peers)) ) return time.time() - s def announceTrackerUdp(self, tracker_address, mode="start", num_want=10): s = time.time() if config.disable_udp: raise AnnounceError("Udp disabled by config") if config.trackers_proxy != "disable": raise AnnounceError("Udp trackers not available with proxies") ip, port = tracker_address.split("/")[0].split(":") tracker = UdpTrackerClient(ip, int(port)) if helper.getIpType(ip) in self.getOpenedServiceTypes(): tracker.peer_port = self.fileserver_port else: tracker.peer_port = 0 tracker.connect() if not tracker.poll_once(): raise AnnounceError("Could not connect") tracker.announce(info_hash=self.site.address_sha1, num_want=num_want, left=431102370) back = tracker.poll_once() if not back: raise AnnounceError("No response after %.0fs" % (time.time() - s)) elif type(back) is dict and "response" in back: peers = back["response"]["peers"] else: raise AnnounceError("Invalid response: %r" % back) return peers def httpRequest(self, url): headers = { 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3', 'Accept-Encoding': 'none', 'Accept-Language': 'en-US,en;q=0.8', 'Connection': 'keep-alive' } req = urllib.request.Request(url, headers=headers) if config.trackers_proxy == "tor": tor_manager = self.site.connection_server.tor_manager handler = sockshandler.SocksiPyHandler(socks.SOCKS5, tor_manager.proxy_ip, tor_manager.proxy_port) opener = urllib.request.build_opener(handler) return opener.open(req, timeout=50) elif config.trackers_proxy == "disable": return urllib.request.urlopen(req, timeout=25) else: proxy_ip, proxy_port = config.trackers_proxy.split(":") handler = sockshandler.SocksiPyHandler(socks.SOCKS5, proxy_ip, int(proxy_port)) opener = urllib.request.build_opener(handler) return opener.open(req, timeout=50) def announceTrackerHttps(self, *args, **kwargs): kwargs["protocol"] = "https" return self.announceTrackerHttp(*args, **kwargs) def announceTrackerHttp(self, tracker_address, mode="start", num_want=10, protocol="http"): tracker_ip, tracker_port = tracker_address.rsplit(":", 1) if helper.getIpType(tracker_ip) in self.getOpenedServiceTypes(): port = self.fileserver_port else: port = 1 params = { 'info_hash': self.site.address_sha1, 'peer_id': self.peer_id, 'port': port, 'uploaded': 0, 'downloaded': 0, 'left': 431102370, 'compact': 1, 'numwant': num_want, 'event': 'started' } url = protocol + "://" + tracker_address + "?" + urllib.parse.urlencode(params) s = time.time() response = None # Load url if config.tor == "always" or config.trackers_proxy != "disable": timeout = 60 else: timeout = 30 with gevent.Timeout(timeout, False): # Make sure of timeout req = self.httpRequest(url) response = req.read() req.close() req = None if not response: raise AnnounceError("No response after %.0fs" % (time.time() - s)) # Decode peers try: peer_data = bencode.decode(response)["peers"] if type(peer_data) is not bytes: peer_data = peer_data.encode() response = None peer_count = int(len(peer_data) / 6) peers = [] for peer_offset in range(peer_count): off = 6 * peer_offset peer = peer_data[off:off + 6] addr, port = struct.unpack('!LH', peer) peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port}) except Exception as err: raise AnnounceError("Invalid response: %r (%s)" % (response, Debug.formatException(err))) return peers @util.Noparallel(blocking=False) def announcePex(self, query_num=2, need_num=5): peers = self.site.getConnectedPeers() if len(peers) == 0: # Wait 3s for connections time.sleep(3) peers = self.site.getConnectedPeers() if len(peers) == 0: # Small number of connected peers for this site, connect to any peers = list(self.site.peers.values()) need_num = 10 random.shuffle(peers) done = 0 total_added = 0 for peer in peers: num_added = peer.pex(need_num=need_num) if num_added is not False: done += 1 total_added += num_added if num_added: self.site.worker_manager.onPeers() self.site.updateWebsocket(peers_added=num_added) if done == query_num: break self.site.log.debug("Pex result: from %s peers got %s new peers." % (done, total_added)) def updateWebsocket(self, **kwargs): if kwargs: param = {"event": list(kwargs.items())[0]} else: param = None for ws in self.site.websockets: ws.event("announcerChanged", self.site, param)