# ZeroNet/src/Peer/Peer.py (Python, 249 lines, 9.0 KiB)

# Standard library
import logging
import time
import sys
import socket
import struct
from cStringIO import StringIO

# Third-party
import gevent

# Project
from Debug import Debug
from Config import config

# tempfile is only needed when downloads are buffered in temporary files
if config.use_tempfiles:
    import tempfile
# Communicate remote peers
class Peer(object):
    """A remote peer of a site, identified by its ip and port.

    Holds the connection used to talk to the peer and per-peer bookkeeping:
    last tracker sighting, last response / ping time, consecutive connection
    errors, failed hash count and download counters.
    """

    __slots__ = (
        "ip", "port", "site", "key", "connection", "last_found", "last_response",
        "last_ping", "added", "connection_error", "hash_failed", "download_bytes", "download_time"
    )

    def __init__(self, ip, port, site=None):
        """Create a peer for `ip`:`port`, optionally bound to `site`."""
        self.ip = ip
        self.port = port
        self.site = site
        self.key = "%s:%s" % (ip, port)

        self.connection = None
        self.last_found = time.time()  # Time of last found in the torrent tracker
        self.last_response = None  # Time of last successful response from peer
        self.last_ping = None  # Last response time for ping
        self.added = time.time()

        self.connection_error = 0  # Series of connection error
        self.hash_failed = 0  # Number of bad files from peer
        self.download_bytes = 0  # Bytes downloaded
        self.download_time = 0  # Time spent to download

    def log(self, text):
        """Write a debug message prefixed with ip:port to the site log, or to
        the root logger when the peer has no site."""
        if self.site:
            self.site.log.debug("%s:%s %s" % (self.ip, self.port, text))
        else:
            logging.debug("%s:%s %s" % (self.ip, self.port, text))

    # Connect to host
    def connect(self, connection=None):
        """Attach a connection to this peer, closing the current one first.

        If `connection` is given it is used as-is; otherwise one is requested
        from the site's connection server. On failure the error is counted
        with onConnectionError() and self.connection is left as None.
        """
        if self.connection:
            self.log("Getting connection (Closing %s)..." % self.connection)
            self.connection.close()
        else:
            self.log("Getting connection...")

        if connection:  # Connection specified
            self.connection = connection
        else:  # Try to find from connection pool or create new connection
            self.connection = None

            try:
                self.connection = self.site.connection_server.getConnection(self.ip, self.port)
            except Exception as err:
                self.onConnectionError()
                self.log("Getting connection error: %s (connection_error: %s, hash_failed: %s)" %
                         (Debug.formatException(err), self.connection_error, self.hash_failed))
                self.connection = None

    # Check if we have connection to peer
    def findConnection(self):
        """Return the current connection if it is connected, otherwise ask the
        connection server for an existing one (create=False) and return that."""
        if self.connection and self.connection.connected:  # We have connection to peer
            return self.connection
        else:  # Try to find from other sites connections
            self.connection = self.site.connection_server.getConnection(self.ip, self.port, create=False)
        return self.connection

    def __str__(self):
        return "Peer:%-12s" % self.ip

    def __repr__(self):
        return "<%s>" % self.__str__()

    # Peer ip:port to packed 6byte format
    def packAddress(self):
        """Return the peer address as 4 bytes of ip followed by 2 bytes of port."""
        # NOTE(review): "H" packs the port in the machine's native byte order;
        # confirm against the wire format before running on big-endian hosts.
        return socket.inet_aton(self.ip) + struct.pack("H", self.port)

    def unpackAddress(self, packed):
        """Inverse of packAddress(): return an (ip, port) tuple from 6 packed bytes."""
        return socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]

    # Found a peer on tracker
    def found(self):
        """Record that the peer was just seen."""
        self.last_found = time.time()

    # Send a command to peer
    def request(self, cmd, params=None, stream_to=None):
        """Send `cmd` with `params` to the peer and return its response.

        Connects first if there is no open connection. Returns the response
        (also when it contains an "error" key, after counting the error), or
        None if the connection could not be established, the worker was
        killed, or every attempt failed.
        """
        if params is None:  # Fresh dict per call, never a shared mutable default
            params = {}

        if not self.connection or self.connection.closed:
            self.connect()
            if not self.connection:
                self.onConnectionError()
                return None  # Connection failed

        for retry in range(1, 3):  # Up to 2 attempts
            try:
                if not self.connection:  # A previous reconnect attempt failed
                    raise Exception("No connection found")
                response = self.connection.request(cmd, params, stream_to)
                if not response:
                    raise Exception("Send error")
                if "error" in response:
                    self.log("%s error: %s" % (cmd, response["error"]))
                    self.onConnectionError()
                else:  # Successful request, reset connection error num
                    self.connection_error = 0
                self.last_response = time.time()
                return response
            except Exception as err:
                if type(err).__name__ == "Notify":  # Greenlet killed by worker
                    self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd))
                    break
                else:
                    self.onConnectionError()
                    self.log(
                        "%s (connection_error: %s, hash_failed: %s, retry: %s)" %
                        (Debug.formatException(err), self.connection_error, self.hash_failed, retry)
                    )
                    time.sleep(1 * retry)  # Back off a little longer after each failure
                    self.connect()
        return None  # Every attempt failed or the worker was killed

    # Create the buffer downloaded file content is collected in
    def _createBuffer(self):
        if config.use_tempfiles:
            return tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        return StringIO()

    # Update download counters of the peer and its site after a finished download
    def _addDownloadStats(self, received, started):
        self.download_bytes += received
        self.download_time += (time.time() - started)
        self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + received

    # Get a file content from peer
    def getFile(self, site, inner_path):
        """Download `inner_path` of `site` from the peer.

        Returns a file-like buffer positioned at its start, or False when a
        request fails or the peer sends an unusable response.
        """
        # Use streamFile if client supports it
        if config.stream_downloads and self.connection and self.connection.handshake and self.connection.handshake["rev"] > 310:
            return self.streamFile(site, inner_path)

        location = 0
        buff = self._createBuffer()

        s = time.time()
        while True:  # Read in 512k parts
            back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})

            if not back or "body" not in back:  # Error
                return False

            buff.write(back["body"])
            back["body"] = None  # Save memory
            if back["location"] == back["size"]:  # End of file
                break
            if back["location"] <= location:
                # The response comes from an untrusted peer: without progress
                # this loop would never end, so give up instead
                self.log("Invalid location: %s" % back["location"])
                return False
            location = back["location"]

        self._addDownloadStats(back["location"], s)
        buff.seek(0)
        return buff

    # Download file out of msgpack context to save memory and cpu
    def streamFile(self, site, inner_path):
        """Download `inner_path` of `site` with the "streamFile" command, passing
        the buffer to request() as `stream_to`.

        Returns the buffer positioned at its start, or False on error.
        """
        location = 0
        buff = self._createBuffer()

        s = time.time()
        while True:  # Read in 512k parts
            back = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)

            if not back:  # Error
                self.log("Invalid response: %s" % back)
                return False

            if back["location"] == back["size"]:  # End of file
                break
            if back["location"] <= location:
                # Same guard as in getFile: never loop on a peer that does not advance
                self.log("Invalid location: %s" % back["location"])
                return False
            location = back["location"]

        self._addDownloadStats(back["location"], s)
        buff.seek(0)
        return buff

    # Send a ping request
    def ping(self):
        """Ping the peer and return the response time in seconds, or None if
        no valid answer arrived. The result is also stored in self.last_ping."""
        response_time = None
        for retry in range(1, 3):  # Up to 2 attempts
            s = time.time()
            with gevent.Timeout(10.0, False):  # 10 sec timeout, don't raise exception
                response = self.request("ping")

                if response and "body" in response and response["body"] == "Pong!":
                    response_time = time.time() - s
                    break  # All fine, exit from for loop
            # Timeout reached or bad response
            self.onConnectionError()
            self.connect()
            time.sleep(1)

        if response_time is not None:  # A 0.0 second answer is still a success
            self.log("Ping: %.3f" % response_time)
        else:
            self.log("Ping failed")
        self.last_ping = response_time
        return response_time

    # Request peer exchange from peer
    def pex(self, site=None, need_num=5):
        """Exchange peers of `site` (default: this peer's site) with the peer.

        Sends up to 5 of our connectable peers of that site, asks for
        `need_num` peers back and adds the returned ones to the site.
        Returns the number of peers added, or False if the request failed.
        """
        if not site:
            site = self.site  # If no site defined request peers for this site
        # Give the peer 5 connectable peers of the requested site
        packed_peers = [peer.packAddress() for peer in site.getConnectablePeers(5)]
        response = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num})
        if not response or "error" in response:
            return False
        added = 0
        for peer in response.get("peers", []):
            address = self.unpackAddress(peer)
            if site.addPeer(*address):
                added += 1
        if added:
            self.log("Added peers using pex: %s" % added)
        return added

    # List modified files since the date
    # Return: {inner_path: modification date,...}
    def listModified(self, since):
        return self.request("listModified", {"since": since, "site": self.site.address})

    # Stop and remove from site
    def remove(self):
        """Delete the peer from its site's peer list and close its connection."""
        self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
        if self.site and self.key in self.site.peers:
            del(self.site.peers[self.key])
        if self.connection:
            self.connection.close()

    # - EVENTS -

    # On connection error
    def onConnectionError(self):
        """Count a connection error; the third one in a row removes the peer."""
        self.connection_error += 1
        if self.connection_error >= 3:  # Dead peer
            self.remove()

    # Done working with peer
    def onWorkerDone(self):
        pass