ZeroNet/src/File/FileRequest.py

109 lines
3.9 KiB
Python
Raw Normal View History

import os, msgpack, shutil, gevent
from Site import SiteManager
from cStringIO import StringIO
from Debug import Debug
from Config import config
FILE_BUFF = 1024*512
# Request from me
class FileRequest:
def __init__(self, server = None):
if server:
self.server = server
self.log = server.log
self.sites = SiteManager.list()
def send(self, msg):
if not isinstance(msg, dict): # If msg not a dict create a {"body": msg}
msg = {"body": msg}
self.server.socket.send(msgpack.packb(msg, use_bin_type=True))
# Route file requests
def route(self, cmd, params):
if cmd == "getFile":
self.actionGetFile(params)
elif cmd == "update":
self.actionUpdate(params)
elif cmd == "ping":
self.actionPing()
else:
self.actionUnknown(cmd, params)
# Update a site file request
def actionUpdate(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.send({"error": "Unknown site"})
return False
if site.settings["own"] and params["inner_path"].endswith("content.json"):
self.log.debug("Someone trying to push a file to own site %s, reload local %s first" % (site.address, params["inner_path"]))
site.content_manager.loadContent(params["inner_path"])
buff = StringIO(params["body"])
valid = site.content_manager.verifyFile(params["inner_path"], buff)
if valid == True: # Valid and changed
self.log.debug("Update for %s looks valid, saving..." % params["inner_path"])
buff.seek(0)
file = open(site.getPath(params["inner_path"]), "wb")
shutil.copyfileobj(buff, file) # Write buff to disk
file.close()
site.onFileDone(params["inner_path"]) # Trigger filedone
if params["inner_path"].endswith("content.json"): # Download every changed file from peer
peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer
site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"])) # On complete publish to other peers
gevent.spawn(
lambda: site.downloadContent(params["inner_path"], peer=peer)
) # Load new content file and download changed files in new thread
self.send({"ok": "Thanks, file %s updated!" % params["inner_path"]})
elif valid == None: # Not changed
peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer
if peer:
self.log.debug("Same version, adding new peer for locked files: %s, tasks: %s" % (peer.key, len(site.worker_manager.tasks)) )
for task in site.worker_manager.tasks: # New peer add to every ongoing task
if task["peers"]: site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) # Download file from this peer too if its peer locked
self.send({"ok": "File not changed"})
else: # Invalid sign or sha1 hash
self.log.debug("Update for %s is invalid" % params["inner_path"])
self.send({"error": "File invalid"})
# Send file content request
def actionGetFile(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.send({"error": "Unknown site"})
return False
try:
file_path = site.getPath(params["inner_path"])
if config.debug_socket: self.log.debug("Opening file: %s" % file_path)
file = open(file_path, "rb")
file.seek(params["location"])
back = {}
back["body"] = file.read(FILE_BUFF)
back["location"] = file.tell()
back["size"] = os.fstat(file.fileno()).st_size
if config.debug_socket: self.log.debug("Sending file %s from position %s to %s" % (file_path, params["location"], back["location"]))
self.send(back)
if config.debug_socket: self.log.debug("File %s sent" % file_path)
except Exception, err:
self.send({"error": "File read error: %s" % Debug.formatException(err)})
return False
# Send a simple Pong! answer
def actionPing(self):
self.send("Pong!")
# Unknown command
def actionUnknown(self, cmd, params):
self.send({"error": "Unknown command: %s" % cmd})