Rev536, Fix stats page, Support ranged HTTP requests for better video browser compatibility, setHashfield command, Send hashfield to connected peers one by one if changed, Keep track of hashfield change time, PeerHashfield optimizations, Wait for peers on checkModifications, Give more time to query trackers, Do not count UDP trackers as error if UDP disabled, Test hashfield push

HelloZeroNet 2015-10-30 02:08:02 +01:00
parent 9c5fda6ed2
commit 8e710beab1
9 changed files with 171 additions and 40 deletions

View File

@ -135,15 +135,15 @@ class UiRequestPlugin(object):
])
yield "<tr><td id='peers_%s' style='display: none; white-space: pre'>" % site.address
for key, peer in site.peers.items():
- if peer.last_found:
- last_found = int(time.time()-peer.last_found)/60
+ if peer.time_found:
+ time_found = int(time.time()-peer.time_found)/60
else:
- last_found = "--"
+ time_found = "--"
if peer.connection:
connection_id = peer.connection.id
else:
connection_id = None
yield "(#%s, err: %s, found: %s min ago) %22s -<br>" % (connection_id, peer.connection_error, last_found, key)
yield "(#%s, err: %s, found: %s min ago) %22s -<br>" % (connection_id, peer.connection_error, time_found, key)
yield "<br></td></tr>"
yield "</table>"

View File

@ -8,7 +8,7 @@ class Config(object):
def __init__(self, argv):
self.version = "0.3.2"
- self.rev = 505
+ self.rev = 536
self.argv = argv
self.action = None
self.createParser()

View File

@ -1,5 +1,6 @@
# Included modules
import os
import time
from cStringIO import StringIO
# Third party modules
@ -69,6 +70,8 @@ class FileRequest(object):
self.actionGetHashfield(params)
elif cmd == "findHashIds":
self.actionFindHashIds(params)
elif cmd == "setHashfield":
self.actionSetHashfield(params)
elif cmd == "ping":
self.actionPing()
else:
@ -268,9 +271,11 @@ class FileRequest(object):
return False
# Add peer to site if not added before
- connected_peer = site.addPeer(self.connection.ip, self.connection.port)
- if connected_peer: # Just added
- connected_peer.connect(self.connection) # Assign current connection to peer
+ peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
+ if not peer.connection: # Just added
+ peer.connect(self.connection) # Assign current connection to peer
+ peer.time_my_hashfield_sent = time.time() # Don't send again if not changed
self.response({"hashfield_raw": site.content_manager.hashfield.tostring()})
@ -297,6 +302,18 @@ class FileRequest(object):
)
self.response({"peers": back})
def actionSetHashfield(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) # Add or get peer
if not peer.connection:
peer.connect(self.connection)
peer.hashfield.replaceFromString(params["hashfield_raw"])
self.response({"ok": "Updated"})
# Send a simple Pong! answer
def actionPing(self):
self.response("Pong!")

View File

@ -172,7 +172,7 @@ class FileServer(ConnectionServer):
import gc
first_announce = True # First start
while 1:
- # Sites healthcare
+ # Sites healthcare every 20 min
if config.trackers_file:
config.loadTrackersFile()
for address, site in self.sites.items():
@ -196,6 +196,9 @@ class FileServer(ConnectionServer):
if self.port_opened is False:
site.needConnections()
if first_announce: # Send my optional files to peers
site.sendMyHashfield()
time.sleep(2) # Prevent too quick request
site = None
@ -208,6 +211,7 @@ class FileServer(ConnectionServer):
config.loadTrackersFile()
for address, site in self.sites.items():
site.announce(num=1, pex=False)
site.sendMyHashfield(num_send=1)
time.sleep(2)
first_announce = False

View File

@ -17,7 +17,7 @@ if config.use_tempfiles:
class Peer(object):
__slots__ = (
"ip", "port", "site", "key", "connection", "time_found", "time_response", "time_hashfield", "time_added",
"last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
"time_my_hashfield_sent", "last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
)
def __init__(self, ip, port, site=None):
@ -28,7 +28,8 @@ class Peer(object):
self.connection = None
self.hashfield = PeerHashfield() # Got optional files hash_id
- self.time_hashfield = None # Last time hashfiled downloaded
+ self.time_hashfield = None # Last time peer's hashfiled downloaded
+ self.time_my_hashfield_sent = None # Last time my hashfield sent to peer
self.time_found = time.time() # Time of last found in the torrent tracker
self.time_response = None # Time of last successful response from peer
self.time_added = time.time()
@ -87,7 +88,7 @@ class Peer(object):
def found(self):
self.time_found = time.time()
- # Send a command to peer
+ # Send a command to peer and return response value
def request(self, cmd, params={}, stream_to=None):
if not self.connection or self.connection.closed:
self.connect()
@ -239,6 +240,7 @@ class Peer(object):
return self.hashfield
# Find peers for hashids
# Return: {hash1: ["ip:port", "ip:port",...],...}
def findHashIds(self, hash_ids):
res = self.request("findHashIds", {"site": self.site.address, "hash_ids": hash_ids})
@ -246,6 +248,21 @@ class Peer(object):
return False
return {key: map(helper.unpackAddress, val) for key, val in res["peers"].iteritems()}
# Send my hashfield to peer
# Return: True if sent
def sendMyHashfield(self):
if self.connection and self.connection.handshake.get("rev", 0) < 510:
return False # Not supported
if self.time_my_hashfield_sent and self.site.content_manager.hashfield.time_changed <= self.time_my_hashfield_sent:
return False # Peer already has the latest hashfield
res = self.request("setHashfield", {"site": self.site.address, "hashfield_raw": self.site.content_manager.hashfield.tostring()})
if not res or "error" in res:
return False
else:
self.time_my_hashfield_sent = time.time()
return True
# Stop and remove from site
def remove(self):
self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))

View File

@ -1,9 +1,11 @@
import array
import time
class PeerHashfield():
def __init__(self):
self.storage = self.createStoreage()
self.time_changed = time.time()
def createStoreage(self):
storage = array.array("H")
@ -17,23 +19,26 @@ class PeerHashfield():
def appendHash(self, hash):
hash_id = int(hash[0:4], 16)
- if hash_id not in self:
- self.append(hash_id)
+ if hash_id not in self.storage:
+ self.storage.append(hash_id)
+ self.time_changed = time.time()
return True
else:
return False
def appendHashId(self, hash_id):
- if hash_id not in self:
- self.append(hash_id)
+ if hash_id not in self.storage:
+ self.storage.append(hash_id)
+ self.time_changed = time.time()
return True
else:
return False
def removeHash(self, hash):
hash_id = int(hash[0:4], 16)
- if hash_id in self:
- self.remove(hash_id)
+ if hash_id in self.storage:
+ self.storage.remove(hash_id)
+ self.time_changed = time.time()
return True
else:
return False
@ -42,8 +47,20 @@ class PeerHashfield():
return int(hash[0:4], 16)
def hasHash(self, hash):
- return int(hash[0:4], 16) in self
+ return int(hash[0:4], 16) in self.storage
def replaceFromString(self, hashfield_raw):
self.storage = self.createStoreage()
- self.fromstring(hashfield_raw)
+ self.storage.fromstring(hashfield_raw)
+ self.time_changed = time.time()
if __name__ == "__main__":
field = PeerHashfield()
s = time.time()
for i in range(10000):
field.appendHashId(i)
print time.time()-s
s = time.time()
for i in range(10000):
field.hasHash("AABB")
print time.time()-s

View File

@ -210,6 +210,13 @@ class Site:
peers_try = [] # Try these peers
queried = [] # Successfully queried from these peers
# Wait for peers
if not self.peers:
for wait in range(10):
time.sleep(5+wait)
self.log.debug("Waiting for peers...")
if self.peers: break
peers = self.peers.values()
random.shuffle(peers)
for peer in peers: # Try to find connected good peers, but we must have at least 5 peers
@ -218,7 +225,7 @@ class Site:
elif len(peers_try) < 5: # Backup peers, add to end of the try list
peers_try.append(peer)
- if since is None: # No since definied, download from last modification time-1day
+ if since is None: # No since defined, download from last modification time-1day
since = self.settings.get("modified", 60 * 60 * 24) - 60 * 60 * 24
self.log.debug("Try to get listModifications from peers: %s since: %s" % (peers_try, since))
@ -548,8 +555,8 @@ class Site:
try:
url = "http://" + address + "?" + urllib.urlencode(params)
# Load url
- with gevent.Timeout(10, False): # Make sure of timeout
- req = urllib2.urlopen(url, timeout=8)
+ with gevent.Timeout(30, False): # Make sure of timeout
+ req = urllib2.urlopen(url, timeout=25)
response = req.read()
req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions
req.close()
@ -593,7 +600,10 @@ class Site:
return # No reannouncing within 30 secs
self.time_announce = time.time()
- trackers = config.trackers
+ if config.disable_udp:
+ trackers = [tracker for tracker in config.trackers if not tracker.startswith("udp://")]
+ else:
+ trackers = config.trackers
if num == 1: # Only announce on one tracker, increment the queried tracker id
self.last_tracker_id += 1
self.last_tracker_id = self.last_tracker_id % len(trackers)
@ -622,7 +632,7 @@ class Site:
if len(threads) > num: # Announce limit
break
- gevent.joinall(threads) # Wait for announce finish
+ gevent.joinall(threads, timeout=10) # Wait for announce finish
for thread in threads:
if thread.value:
@ -630,7 +640,10 @@ class Site:
slow.append("%.2fs %s://%s" % (thread.value, thread.protocol, thread.address))
announced += 1
else:
- errors.append("%s://%s" % (thread.protocol, thread.address))
+ if thread.ready():
+ errors.append("%s://%s" % (thread.protocol, thread.address))
+ else: # Still running
+ slow.append("10s+ %s://%s" % (thread.protocol, thread.address))
# Save peers num
self.settings["peers"] = len(self.peers)
@ -721,6 +734,19 @@ class Site:
if removed:
self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))
# Send hashfield to peers
def sendMyHashfield(self, num_send=3):
if not self.content_manager.hashfield: # No optional files
return False
num_sent = 0
connected_peers = self.getConnectedPeers()
for peer in connected_peers:
if peer.sendMyHashfield():
num_sent += 1
if num_sent >= num_send:
return True
return False
# - Events -
# Add event listeners

View File

@ -1,9 +1,12 @@
+ import time
+ from cStringIO import StringIO
import pytest
from File import FileServer
+ from File import FileRequest
from Crypt import CryptHash
- from cStringIO import StringIO
+ import Spy
@pytest.mark.usefixtures("resetSettings")
@ -77,27 +80,54 @@ class TestPeer:
assert site.content_manager.hashfield.getHashId(new_hash) not in site.content_manager.hashfield
def testHashfieldExchange(self, file_server, site, site_temp):
- file_server.ip_incoming = {} # Reset flood protection
- file_server.sites[site.address] = site
- client = FileServer("127.0.0.1", 1545)
- client.sites[site_temp.address] = site_temp
- site_temp.connection_server = client
- connection = client.getConnection("127.0.0.1", 1544)
+ server1 = file_server
+ server1.ip_incoming = {} # Reset flood protection
+ server1.sites[site.address] = site
+ server2 = FileServer("127.0.0.1", 1545)
+ server2.sites[site_temp.address] = site_temp
+ site_temp.connection_server = server2
site.storage.verifyFiles(quick_check=True) # Find what optional files we have
# Add file_server as peer to client
- peer_file_server = site_temp.addPeer("127.0.0.1", 1544)
+ server2_peer1 = site_temp.addPeer("127.0.0.1", 1544)
# Check if hashfield has any files
assert len(site.content_manager.hashfield) > 0
# Testing hashfield sync
- assert len(peer_file_server.hashfield) == 0
- assert peer_file_server.updateHashfield()
- assert len(peer_file_server.hashfield) > 0
+ assert len(server2_peer1.hashfield) == 0
+ assert server2_peer1.updateHashfield() # Query hashfield from peer
+ assert len(server2_peer1.hashfield) > 0
- connection.close()
- client.stop()
# Test force push new hashfield
site_temp.content_manager.hashfield.appendHash("AABB")
server1_peer2 = site.addPeer("127.0.0.1", 1545, return_peer=True)
with Spy.Spy(FileRequest, "route") as requests:
assert len(server1_peer2.hashfield) == 0
server2_peer1.sendMyHashfield()
assert len(server1_peer2.hashfield) == 1
server2_peer1.sendMyHashfield() # Hashfield not changed, should be ignored
assert len(requests) == 1
time.sleep(0.01) # To make hashfield change date different
site_temp.content_manager.hashfield.appendHash("AACC")
server2_peer1.sendMyHashfield() # Push hashfield
assert len(server1_peer2.hashfield) == 2
assert len(requests) == 2
site_temp.content_manager.hashfield.appendHash("AADD")
assert server1_peer2.updateHashfield(force=True) # Request hashfield
assert len(server1_peer2.hashfield) == 3
assert len(requests) == 3
assert not server2_peer1.sendMyHashfield() # Not changed, should be ignored
assert len(requests) == 3
server2.stop()
def testFindHash(self, file_server, site, site_temp):
file_server.ip_incoming = {} # Reset flood protection

View File

@ -14,6 +14,7 @@ from Crypt import CryptHash
status_texts = {
200: "200 OK",
206: "206 Partial Content",
400: "400 Bad Request",
403: "403 Forbidden",
404: "404 Not Found",
@ -367,10 +368,29 @@ class UiRequest(object):
# TODO: Dont allow external access: extra_headers=
# [("Content-Security-Policy", "default-src 'unsafe-inline' data: http://localhost:43110 ws://localhost:43110")]
+ range = self.env.get("HTTP_RANGE")
+ range_start = None
if send_header:
- self.sendHeader(content_type=content_type)
extra_headers = {}
file_size = os.path.getsize(file_path)
extra_headers["Accept-Ranges"] = "bytes"
if range:
range_start = int(re.match(".*?([0-9]+)", range).group(1))
if re.match(".*?-([0-9]+)", range):
range_end = int(re.match(".*?-([0-9]+)", range).group(1))+1
else:
range_end = file_size
extra_headers["Content-Length"] = range_end - range_start
extra_headers["Content-Range"] = "bytes %s-%s/%s" % (range_start, range_end-1, file_size)
if range:
status = 206
else:
status = 200
self.sendHeader(status, content_type=content_type, extra_headers=extra_headers.items())
if self.env["REQUEST_METHOD"] != "OPTIONS":
file = open(file_path, "rb")
if range_start:
file.seek(range_start)
while 1:
try:
block = file.read(block_size)
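
Note (not part of the commit): from the client side the new range support can be exercised with a plain HTTP Range header; the localhost:43110 address below is the UI address referenced earlier in this file, while the site address and file name are hypothetical. A minimal Python 2 sketch:

    # Sketch only: request bytes 1024-2047 of a file served by UiRequest
    import urllib2

    req = urllib2.Request("http://localhost:43110/1SiteAddressExample/video.mp4")  # hypothetical path
    req.add_header("Range", "bytes=1024-2047")
    res = urllib2.urlopen(req)
    print res.getcode()                    # 206 when a Range header was sent, 200 otherwise
    print res.info().get("Content-Range")  # e.g. "bytes 1024-2047/<file size>"
    body = res.read()                      # response body starts at byte offset 1024 of the file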