
144 lines
4.7 KiB

import os, logging, gevent, time, msgpack
import as zmq
from cStringIO import StringIO
from Config import config
from Debug import Debug
context = zmq.Context()
# Communicate remote peers
class Peer:
def __init__(self, ip, port, site):
self.ip = ip
self.port = port = site
self.key = "%s:%s" % (ip, port)
self.log = None
self.socket = None
self.last_found = None # Time of last found in the torrent tracker
self.last_response = None # Time of last successfull response from peer
self.last_ping = None # Last response time for ping
self.added = time.time()
self.connection_error = 0 # Series of connection error
self.hash_failed = 0 # Number of bad files from peer
self.download_bytes = 0 # Bytes downloaded
self.download_time = 0 # Time spent to download
# Connect to host
def connect(self):
if not self.log: self.log = logging.getLogger("Peer:%s:%s" % (self.ip, self.port))
if self.socket: self.socket.close()
self.socket = context.socket(zmq.REQ)
self.socket.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive
self.socket.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send
self.socket.setsockopt(zmq.LINGER, 500) # Wait for socket close
# self.socket.setsockopt(zmq.TCP_KEEPALIVE, 1) # Enable keepalive
# self.socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 4*60) # Send after 4 minute idle
# self.socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 15) # Wait 15 sec to response
# self.socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, 4) # 4 Probes
self.socket.connect('tcp://%s:%s' % (self.ip, self.port))
# Found a peer on tracker
def found(self):
self.last_found = time.time()
# Send a command to peer
def sendCmd(self, cmd, params = {}):
if not self.socket: self.connect()
if cmd != "ping" and self.last_response and time.time() - self.last_response > 20*60: # If last response if older than 20 minute, ping first to see if still alive
if not return None
for retry in range(1,3): # Retry 3 times
if config.debug_socket: self.log.debug("sendCmd: %s %s" % (cmd, params.get("inner_path")))
self.socket.send(msgpack.packb({"cmd": cmd, "params": params}, use_bin_type=True))
if config.debug_socket: self.log.debug("Sent command: %s" % cmd)
response = msgpack.unpackb(self.socket.recv())
if config.debug_socket: self.log.debug("Got response to: %s" % cmd)
if "error" in response:
self.log.debug("%s error: %s" % (cmd, response["error"]))
else: # Successful request, reset connection error num
self.connection_error = 0
self.last_response = time.time()
return response
except Exception, err:
self.log.debug("%s (connection_error: %s, hash_failed: %s, retry: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed, retry))
if type(err).__name__ == "Notify" and err.message == "Worker stopped": # Greenlet kill by worker
self.log.debug("Peer worker got killed, aborting cmd: %s" % cmd)
return None # Failed after 4 retry
# Get a file content from peer
def getFile(self, site, inner_path):
location = 0
buff = StringIO()
s = time.time()
while 1: # Read in 512k parts
back = self.sendCmd("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location
if not back or "body" not in back: # Error
return False
if back["location"] == back["size"]: # End of file
location = back["location"]
self.download_bytes += back["location"]
self.download_time += (time.time() - s)
return buff
# Send a ping request
def ping(self):
response_time = None
for retry in range(1,3): # Retry 3 times
s = time.time()
with gevent.Timeout(10.0, False): # 10 sec timeout, dont raise exception
response = self.sendCmd("ping")
if response and "body" in response and response["body"] == "Pong!":
response_time = time.time()-s
break # All fine, exit from for loop
# Timeout reached or bad response
if response_time:
self.log.debug("Ping: %.3f" % response_time)
self.log.debug("Ping failed")
self.last_ping = response_time
return response_time
# Stop and remove from site
def remove(self):
self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
if self.key in del([self.key])
# - EVENTS -
# On connection error
def onConnectionError(self):
self.connection_error += 1
if self.connection_error >= 5: # Dead peer
# Done working with peer
def onWorkerDone(self):