partial cleanup of FileRequest.py

This commit is contained in:
Matthew Bell 2015-06-17 22:44:20 +01:00
parent 39acf04b4b
commit 31b6dc4516
1 changed files with 158 additions and 155 deletions

View File

@ -1,5 +1,12 @@
import os, msgpack, shutil, gevent, socket, struct, random # Included modules
import socket
import struct
import os
from cStringIO import StringIO from cStringIO import StringIO
# Third party modules
import gevent
from Debug import Debug from Debug import Debug
from Config import config from Config import config
from util import RateLimit, StreamingMsgpack from util import RateLimit, StreamingMsgpack
@ -8,183 +15,179 @@ FILE_BUFF = 1024*512
# Request from me # Request from me
class FileRequest(object): class FileRequest(object):
__slots__ = ("server", "connection", "req_id", "sites", "log", "responded") __slots__ = ("server", "connection", "req_id", "sites", "log", "responded")
def __init__(self, server, connection): def __init__(self, server, connection):
self.server = server self.server = server
self.connection = connection self.connection = connection
self.req_id = None self.req_id = None
self.sites = self.server.sites self.sites = self.server.sites
self.log = server.log self.log = server.log
self.responded = False # Responded to the request self.responded = False # Responded to the request
def unpackAddress(self, packed):
return socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]
def unpackAddress(self, packed): def send(self, msg, streaming=False):
return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]) if not self.connection.closed:
self.connection.send(msg, streaming)
def response(self, msg, streaming=False):
if self.responded:
self.log.debug("Req id %s already responded" % self.req_id)
return
if not isinstance(msg, dict): # If msg not a dict create a {"body": msg}
msg = {"body": msg}
msg["cmd"] = "response"
msg["to"] = self.req_id
self.responded = True
self.send(msg, streaming=streaming)
def send(self, msg, streaming=False): # Route file requests
if not self.connection.closed: def route(self, cmd, req_id, params):
self.connection.send(msg, streaming) self.req_id = req_id
if cmd == "getFile":
self.actionGetFile(params)
elif cmd == "update":
event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"])
if not RateLimit.isAllowed(event): # There was already an update for this file in the last 10 second
self.response({"ok": "File update queued"})
# If called more than once within 10 sec only keep the last update
RateLimit.callAsync(event, 10, self.actionUpdate, params)
def response(self, msg, streaming=False): elif cmd == "pex":
if self.responded: self.actionPex(params)
self.log.debug("Req id %s already responded" % self.req_id) elif cmd == "listModified":
return self.actionListModified(params)
if not isinstance(msg, dict): # If msg not a dict create a {"body": msg} elif cmd == "ping":
msg = {"body": msg} self.actionPing()
msg["cmd"] = "response" else:
msg["to"] = self.req_id self.actionUnknown(cmd, params)
self.responded = True
self.send(msg, streaming=streaming)
# Update a site file request
def actionUpdate(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
if site.settings["own"] and params["inner_path"].endswith("content.json"):
self.log.debug("Someone trying to push a file to own site %s, reload local %s first" % (site.address, params["inner_path"]))
changed = site.content_manager.loadContent(params["inner_path"], add_bad_files=False)
if changed: # Content.json changed locally
site.settings["size"] = site.content_manager.getTotalSize() # Update site size
buff = StringIO(params["body"])
valid = site.content_manager.verifyFile(params["inner_path"], buff)
if valid == True: # Valid and changed
self.log.info("Update for %s looks valid, saving..." % params["inner_path"])
buff.seek(0)
site.storage.write(params["inner_path"], buff)
# Route file requests site.onFileDone(params["inner_path"]) # Trigger filedone
def route(self, cmd, req_id, params):
self.req_id = req_id
if cmd == "getFile": if params["inner_path"].endswith("content.json"): # Download every changed file from peer
self.actionGetFile(params) peer = site.addPeer(self.connection.ip, self.connection.port, return_peer = True) # Add or get peer
elif cmd == "update": site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"]), "publish_%s" % params["inner_path"]) # On complete publish to other peers
event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"]) gevent.spawn(
if not RateLimit.isAllowed(event): # There was already an updat for this file in the last 10 second lambda: site.downloadContent(params["inner_path"], peer=peer)
self.response({"ok": "File update queued"}) ) # Load new content file and download changed files in new thread
RateLimit.callAsync(event, 10, self.actionUpdate, params) # If called more than once within 10 sec only keep the last update
elif cmd == "pex": self.response({"ok": "Thanks, file %s updated!" % params["inner_path"]})
self.actionPex(params)
elif cmd == "listModified":
self.actionListModified(params)
elif cmd == "ping":
self.actionPing()
else:
self.actionUnknown(cmd, params)
elif valid == None: # Not changed
peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer
if peer:
self.log.debug("Same version, adding new peer for locked files: %s, tasks: %s" % (peer.key, len(site.worker_manager.tasks)) )
for task in site.worker_manager.tasks: # New peer add to every ongoing task
if task["peers"]: site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) # Download file from this peer too if its peer locked
# Update a site file request self.response({"ok": "File not changed"})
def actionUpdate(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
if site.settings["own"] and params["inner_path"].endswith("content.json"):
self.log.debug("Someone trying to push a file to own site %s, reload local %s first" % (site.address, params["inner_path"]))
changed = site.content_manager.loadContent(params["inner_path"], add_bad_files=False)
if changed: # Content.json changed locally
site.settings["size"] = site.content_manager.getTotalSize() # Update site size
buff = StringIO(params["body"])
valid = site.content_manager.verifyFile(params["inner_path"], buff)
if valid == True: # Valid and changed
self.log.info("Update for %s looks valid, saving..." % params["inner_path"])
buff.seek(0)
site.storage.write(params["inner_path"], buff)
site.onFileDone(params["inner_path"]) # Trigger filedone else: # Invalid sign or sha1 hash
self.log.debug("Update for %s is invalid" % params["inner_path"])
self.response({"error": "File invalid"})
if params["inner_path"].endswith("content.json"): # Download every changed file from peer # Send file content request
peer = site.addPeer(self.connection.ip, self.connection.port, return_peer = True) # Add or get peer def actionGetFile(self, params):
site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"]), "publish_%s" % params["inner_path"]) # On complete publish to other peers site = self.sites.get(params["site"])
gevent.spawn( if not site or not site.settings["serving"]: # Site unknown or not serving
lambda: site.downloadContent(params["inner_path"], peer=peer) self.response({"error": "Unknown site"})
) # Load new content file and download changed files in new thread return False
try:
file_path = site.storage.getPath(params["inner_path"])
if config.debug_socket: self.log.debug("Opening file: %s" % file_path)
with StreamingMsgpack.FilePart(file_path, "rb") as file:
file.seek(params["location"])
file.read_bytes = FILE_BUFF
back = {"body": file,
"size": os.fstat(file.fileno()).st_size,
"location": min(file.tell()+FILE_BUFF, os.fstat(file.fileno()).st_size)
}
if config.debug_socket:
self.log.debug("Sending file %s from position %s to %s" % (file_path,
params["location"],
back["location"]))
self.response(back, streaming=True)
if config.debug_socket:
self.log.debug("File %s sent" % file_path)
self.response({"ok": "Thanks, file %s updated!" % params["inner_path"]}) # Add peer to site if not added before
connected_peer = site.addPeer(self.connection.ip, self.connection.port)
if connected_peer: # Just added
connected_peer.connect(self.connection) # Assign current connection to peer
elif valid == None: # Not changed except Exception, err:
peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer self.log.debug("GetFile read error: %s" % Debug.formatException(err))
if peer: self.response({"error": "File read error: %s" % Debug.formatException(err)})
self.log.debug("Same version, adding new peer for locked files: %s, tasks: %s" % (peer.key, len(site.worker_manager.tasks)) ) return False
for task in site.worker_manager.tasks: # New peer add to every ongoing task
if task["peers"]: site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) # Download file from this peer too if its peer locked
self.response({"ok": "File not changed"}) # Peer exchange request
def actionPex(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
else: # Invalid sign or sha1 hash got_peer_keys = []
self.log.debug("Update for %s is invalid" % params["inner_path"]) added = 0
self.response({"error": "File invalid"}) connected_peer = site.addPeer(self.connection.ip, self.connection.port) # Add requester peer to site
if connected_peer: # Just added
added += 1
connected_peer.connect(self.connection) # Assign current connection to peer
for peer in params["peers"]: # Add sent peers to site
address = self.unpackAddress(peer)
got_peer_keys.append("%s:%s" % address)
if site.addPeer(*address): added += 1
# Send back peers that is not in the sent list and connectable (not port 0)
packed_peers = [peer.packAddress() for peer in site.getConnectablePeers(params["need"], got_peer_keys)]
if added:
site.worker_manager.onPeers()
self.log.debug("Added %s peers to %s using pex, sending back %s" % (added, site, len(packed_peers)))
self.response({"peers": packed_peers})
# Send file content request # Get modified content.json files since
def actionGetFile(self, params): def actionListModified(self, params):
site = self.sites.get(params["site"]) site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"}) self.response({"error": "Unknown site"})
return False return False
try: modified_files = {inner_path: content["modified"]
file_path = site.storage.getPath(params["inner_path"]) for inner_path, content in site.content_manager.contents.iteritems()
if config.debug_socket: self.log.debug("Opening file: %s" % file_path) if content["modified"] > params["since"]}
with StreamingMsgpack.FilePart(file_path, "rb") as file:
file.seek(params["location"])
file.read_bytes = FILE_BUFF
back = {}
back["body"] = file
back["size"] = os.fstat(file.fileno()).st_size
back["location"] = min(file.tell()+FILE_BUFF, back["size"])
if config.debug_socket: self.log.debug("Sending file %s from position %s to %s" % (file_path, params["location"], back["location"]))
self.response(back, streaming=True)
if config.debug_socket: self.log.debug("File %s sent" % file_path)
# Add peer to site if not added before # Add peer to site if not added before
connected_peer = site.addPeer(self.connection.ip, self.connection.port) connected_peer = site.addPeer(self.connection.ip, self.connection.port)
if connected_peer: # Just added if connected_peer: # Just added
connected_peer.connect(self.connection) # Assign current connection to peer connected_peer.connect(self.connection) # Assign current connection to peer
except Exception, err: self.response({"modified_files": modified_files})
self.log.debug("GetFile read error: %s" % Debug.formatException(err))
self.response({"error": "File read error: %s" % Debug.formatException(err)})
return False
# Send a simple Pong! answer
def actionPing(self):
self.response("Pong!")
# Peer exchange request # Unknown command
def actionPex(self, params): def actionUnknown(self, cmd, params):
site = self.sites.get(params["site"]) self.response({"error": "Unknown command: %s" % cmd})
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
got_peer_keys = []
added = 0
connected_peer = site.addPeer(self.connection.ip, self.connection.port) # Add requester peer to site
if connected_peer: # Just added
added +=1
connected_peer.connect(self.connection) # Assign current connection to peer
for peer in params["peers"]: # Add sent peers to site
address = self.unpackAddress(peer)
got_peer_keys.append("%s:%s" % address)
if site.addPeer(*address): added += 1
# Send back peers that is not in the sent list and connectable (not port 0)
packed_peers = [peer.packAddress() for peer in site.getConnectablePeers(params["need"], got_peer_keys)]
if added:
site.worker_manager.onPeers()
self.log.debug("Added %s peers to %s using pex, sending back %s" % (added, site, len(packed_peers)))
self.response({"peers": packed_peers})
# Get modified content.json files since
def actionListModified(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
modified_files = {inner_path: content["modified"] for inner_path, content in site.content_manager.contents.iteritems() if content["modified"] > params["since"]}
# Add peer to site if not added before
connected_peer = site.addPeer(self.connection.ip, self.connection.port)
if connected_peer: # Just added
connected_peer.connect(self.connection) # Assign current connection to peer
self.response({"modified_files": modified_files})
# Send a simple Pong! answer
def actionPing(self):
self.response("Pong!")
# Unknown command
def actionUnknown(self, cmd, params):
self.response({"error": "Unknown command: %s" % cmd})