2015-06-17 23:44:20 +02:00
|
|
|
# Included modules
|
|
|
|
import socket
|
|
|
|
import struct
|
|
|
|
import os
|
2015-01-12 02:03:45 +01:00
|
|
|
from cStringIO import StringIO
|
2015-06-17 23:44:20 +02:00
|
|
|
|
|
|
|
# Third party modules
|
|
|
|
import gevent
|
|
|
|
|
2015-01-17 18:50:56 +01:00
|
|
|
from Debug import Debug
|
2015-01-21 12:58:26 +01:00
|
|
|
from Config import config
|
2015-09-27 02:08:53 +02:00
|
|
|
from util import RateLimit
|
|
|
|
from util import StreamingMsgpack
|
|
|
|
from util import helper
|
2015-01-12 02:03:45 +01:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
FILE_BUFF = 1024 * 512
|
|
|
|
|
2015-01-12 02:03:45 +01:00
|
|
|
|
|
|
|
# Request from me
|
rev125, Class statistics, OpenSSL disabled on OSX by default because of possible segfault, --disable_openssl command line parameter, Save memory on Connection, Peer and FileRequest objects using slots, Dont store modification time from the far future, Able to query modified files from peer, Allow reannounce in 30secs, Use with command in SiteStorage, Always create dir before write file, PeerCmd shell command to query specific command from peer
2015-04-29 23:12:45 +02:00
|
|
|
class FileRequest(object):
|
2015-06-17 23:44:20 +02:00
|
|
|
__slots__ = ("server", "connection", "req_id", "sites", "log", "responded")
|
|
|
|
|
|
|
|
def __init__(self, server, connection):
|
|
|
|
self.server = server
|
|
|
|
self.connection = connection
|
|
|
|
|
|
|
|
self.req_id = None
|
|
|
|
self.sites = self.server.sites
|
|
|
|
self.log = server.log
|
|
|
|
self.responded = False # Responded to the request
|
|
|
|
|
|
|
|
def send(self, msg, streaming=False):
|
|
|
|
if not self.connection.closed:
|
|
|
|
self.connection.send(msg, streaming)
|
|
|
|
|
2015-07-25 13:38:58 +02:00
|
|
|
def sendRawfile(self, file, read_bytes):
|
|
|
|
if not self.connection.closed:
|
|
|
|
self.connection.sendRawfile(file, read_bytes)
|
|
|
|
|
2015-06-17 23:44:20 +02:00
|
|
|
def response(self, msg, streaming=False):
|
|
|
|
if self.responded:
|
|
|
|
self.log.debug("Req id %s already responded" % self.req_id)
|
|
|
|
return
|
|
|
|
if not isinstance(msg, dict): # If msg not a dict create a {"body": msg}
|
|
|
|
msg = {"body": msg}
|
|
|
|
msg["cmd"] = "response"
|
|
|
|
msg["to"] = self.req_id
|
|
|
|
self.responded = True
|
|
|
|
self.send(msg, streaming=streaming)
|
|
|
|
|
|
|
|
# Route file requests
|
|
|
|
def route(self, cmd, req_id, params):
|
|
|
|
self.req_id = req_id
|
|
|
|
|
|
|
|
if cmd == "getFile":
|
|
|
|
self.actionGetFile(params)
|
2015-07-25 13:38:58 +02:00
|
|
|
elif cmd == "streamFile":
|
|
|
|
self.actionStreamFile(params)
|
2015-06-17 23:44:20 +02:00
|
|
|
elif cmd == "update":
|
|
|
|
event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"])
|
2015-07-12 20:36:46 +02:00
|
|
|
if not RateLimit.isAllowed(event): # There was already an update for this file in the last 10 second
|
2015-06-17 23:44:20 +02:00
|
|
|
self.response({"ok": "File update queued"})
|
|
|
|
# If called more than once within 10 sec only keep the last update
|
|
|
|
RateLimit.callAsync(event, 10, self.actionUpdate, params)
|
|
|
|
|
|
|
|
elif cmd == "pex":
|
|
|
|
self.actionPex(params)
|
|
|
|
elif cmd == "listModified":
|
|
|
|
self.actionListModified(params)
|
|
|
|
elif cmd == "ping":
|
|
|
|
self.actionPing()
|
|
|
|
else:
|
|
|
|
self.actionUnknown(cmd, params)
|
|
|
|
|
|
|
|
# Update a site file request
|
|
|
|
def actionUpdate(self, params):
|
|
|
|
site = self.sites.get(params["site"])
|
2015-07-12 20:36:46 +02:00
|
|
|
if not site or not site.settings["serving"]: # Site unknown or not serving
|
2015-06-17 23:44:20 +02:00
|
|
|
self.response({"error": "Unknown site"})
|
|
|
|
return False
|
|
|
|
if site.settings["own"] and params["inner_path"].endswith("content.json"):
|
2015-07-12 20:36:46 +02:00
|
|
|
self.log.debug(
|
|
|
|
"Someone trying to push a file to own site %s, reload local %s first" %
|
|
|
|
(site.address, params["inner_path"])
|
|
|
|
)
|
2015-09-16 00:01:23 +02:00
|
|
|
changed, deleted = site.content_manager.loadContent(params["inner_path"], add_bad_files=False)
|
|
|
|
if changed or deleted: # Content.json changed locally
|
2015-07-12 20:36:46 +02:00
|
|
|
site.settings["size"] = site.content_manager.getTotalSize() # Update site size
|
2015-06-17 23:44:20 +02:00
|
|
|
buff = StringIO(params["body"])
|
|
|
|
valid = site.content_manager.verifyFile(params["inner_path"], buff)
|
2015-07-12 20:36:46 +02:00
|
|
|
if valid is True: # Valid and changed
|
2015-06-17 23:44:20 +02:00
|
|
|
self.log.info("Update for %s looks valid, saving..." % params["inner_path"])
|
|
|
|
buff.seek(0)
|
|
|
|
site.storage.write(params["inner_path"], buff)
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
site.onFileDone(params["inner_path"]) # Trigger filedone
|
|
|
|
|
|
|
|
if params["inner_path"].endswith("content.json"): # Download every changed file from peer
|
|
|
|
peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) # Add or get peer
|
|
|
|
# On complete publish to other peers
|
|
|
|
site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"]), "publish_%s" % params["inner_path"])
|
2015-06-17 23:44:20 +02:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
# Load new content file and download changed files in new thread
|
2015-06-17 23:44:20 +02:00
|
|
|
gevent.spawn(
|
|
|
|
lambda: site.downloadContent(params["inner_path"], peer=peer)
|
2015-07-12 20:36:46 +02:00
|
|
|
)
|
2015-06-17 23:44:20 +02:00
|
|
|
|
|
|
|
self.response({"ok": "Thanks, file %s updated!" % params["inner_path"]})
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
elif valid is None: # Not changed
|
|
|
|
peer = site.addPeer(*params["peer"], return_peer=True) # Add or get peer
|
2015-06-17 23:44:20 +02:00
|
|
|
if peer:
|
2015-07-12 20:36:46 +02:00
|
|
|
self.log.debug(
|
|
|
|
"Same version, adding new peer for locked files: %s, tasks: %s" %
|
|
|
|
(peer.key, len(site.worker_manager.tasks))
|
|
|
|
)
|
|
|
|
for task in site.worker_manager.tasks: # New peer add to every ongoing task
|
|
|
|
if task["peers"]:
|
|
|
|
# Download file from this peer too if its peer locked
|
|
|
|
site.needFile(task["inner_path"], peer=peer, update=True, blocking=False)
|
2015-06-17 23:44:20 +02:00
|
|
|
|
|
|
|
self.response({"ok": "File not changed"})
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
else: # Invalid sign or sha1 hash
|
2015-06-17 23:44:20 +02:00
|
|
|
self.log.debug("Update for %s is invalid" % params["inner_path"])
|
|
|
|
self.response({"error": "File invalid"})
|
|
|
|
|
|
|
|
# Send file content request
|
|
|
|
def actionGetFile(self, params):
|
|
|
|
site = self.sites.get(params["site"])
|
2015-07-12 20:36:46 +02:00
|
|
|
if not site or not site.settings["serving"]: # Site unknown or not serving
|
2015-06-17 23:44:20 +02:00
|
|
|
self.response({"error": "Unknown site"})
|
|
|
|
return False
|
|
|
|
try:
|
|
|
|
file_path = site.storage.getPath(params["inner_path"])
|
2015-07-12 20:36:46 +02:00
|
|
|
if config.debug_socket:
|
|
|
|
self.log.debug("Opening file: %s" % file_path)
|
2015-06-17 23:44:20 +02:00
|
|
|
with StreamingMsgpack.FilePart(file_path, "rb") as file:
|
|
|
|
file.seek(params["location"])
|
|
|
|
file.read_bytes = FILE_BUFF
|
2015-08-16 11:51:00 +02:00
|
|
|
file_size = os.fstat(file.fileno()).st_size
|
2015-07-12 20:36:46 +02:00
|
|
|
back = {
|
|
|
|
"body": file,
|
2015-08-16 11:51:00 +02:00
|
|
|
"size": file_size,
|
|
|
|
"location": min(file.tell() + FILE_BUFF, file_size)
|
2015-07-12 20:36:46 +02:00
|
|
|
}
|
2015-06-17 23:44:20 +02:00
|
|
|
if config.debug_socket:
|
2015-07-12 20:36:46 +02:00
|
|
|
self.log.debug(
|
|
|
|
"Sending file %s from position %s to %s" %
|
|
|
|
(file_path, params["location"], back["location"])
|
|
|
|
)
|
2015-06-17 23:44:20 +02:00
|
|
|
self.response(back, streaming=True)
|
2015-08-16 11:51:00 +02:00
|
|
|
|
|
|
|
bytes_sent = min(FILE_BUFF, file_size - params["location"]) # Number of bytes we going to send
|
|
|
|
site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + bytes_sent
|
2015-06-17 23:44:20 +02:00
|
|
|
if config.debug_socket:
|
2015-08-16 11:51:00 +02:00
|
|
|
self.log.debug("File %s at position %s sent %s bytes" % (file_path, params["location"], bytes_sent))
|
2015-06-17 23:44:20 +02:00
|
|
|
|
|
|
|
# Add peer to site if not added before
|
|
|
|
connected_peer = site.addPeer(self.connection.ip, self.connection.port)
|
2015-07-12 20:36:46 +02:00
|
|
|
if connected_peer: # Just added
|
2015-06-17 23:44:20 +02:00
|
|
|
connected_peer.connect(self.connection) # Assign current connection to peer
|
|
|
|
|
|
|
|
except Exception, err:
|
|
|
|
self.log.debug("GetFile read error: %s" % Debug.formatException(err))
|
|
|
|
self.response({"error": "File read error: %s" % Debug.formatException(err)})
|
|
|
|
return False
|
|
|
|
|
2015-07-25 13:38:58 +02:00
|
|
|
# New-style file streaming out of Msgpack context
|
|
|
|
def actionStreamFile(self, params):
|
|
|
|
site = self.sites.get(params["site"])
|
|
|
|
if not site or not site.settings["serving"]: # Site unknown or not serving
|
|
|
|
self.response({"error": "Unknown site"})
|
|
|
|
return False
|
|
|
|
try:
|
|
|
|
if config.debug_socket:
|
|
|
|
self.log.debug("Opening file: %s" % params["inner_path"])
|
|
|
|
with site.storage.open(params["inner_path"]) as file:
|
|
|
|
file.seek(params["location"])
|
2015-08-16 11:51:00 +02:00
|
|
|
file_size = os.fstat(file.fileno()).st_size
|
|
|
|
stream_bytes = min(FILE_BUFF, file_size - params["location"])
|
2015-07-25 13:38:58 +02:00
|
|
|
back = {
|
2015-08-16 11:51:00 +02:00
|
|
|
"size": file_size,
|
|
|
|
"location": min(file.tell() + FILE_BUFF, file_size),
|
2015-07-25 13:38:58 +02:00
|
|
|
"stream_bytes": stream_bytes
|
|
|
|
}
|
|
|
|
if config.debug_socket:
|
|
|
|
self.log.debug(
|
|
|
|
"Sending file %s from position %s to %s" %
|
|
|
|
(params["inner_path"], params["location"], back["location"])
|
|
|
|
)
|
|
|
|
self.response(back)
|
|
|
|
self.sendRawfile(file, read_bytes=FILE_BUFF)
|
2015-08-16 11:51:00 +02:00
|
|
|
|
|
|
|
site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + stream_bytes
|
2015-07-25 13:38:58 +02:00
|
|
|
if config.debug_socket:
|
2015-08-16 11:51:00 +02:00
|
|
|
self.log.debug("File %s at position %s sent %s bytes" % (params["inner_path"], params["location"], stream_bytes))
|
2015-07-25 13:38:58 +02:00
|
|
|
|
|
|
|
# Add peer to site if not added before
|
|
|
|
connected_peer = site.addPeer(self.connection.ip, self.connection.port)
|
|
|
|
if connected_peer: # Just added
|
|
|
|
connected_peer.connect(self.connection) # Assign current connection to peer
|
|
|
|
|
|
|
|
except Exception, err:
|
|
|
|
self.log.debug("GetFile read error: %s" % Debug.formatException(err))
|
|
|
|
self.response({"error": "File read error: %s" % Debug.formatException(err)})
|
|
|
|
return False
|
|
|
|
|
2015-06-17 23:44:20 +02:00
|
|
|
# Peer exchange request
|
|
|
|
def actionPex(self, params):
|
|
|
|
site = self.sites.get(params["site"])
|
2015-07-12 20:36:46 +02:00
|
|
|
if not site or not site.settings["serving"]: # Site unknown or not serving
|
2015-06-17 23:44:20 +02:00
|
|
|
self.response({"error": "Unknown site"})
|
|
|
|
return False
|
|
|
|
|
|
|
|
got_peer_keys = []
|
|
|
|
added = 0
|
|
|
|
connected_peer = site.addPeer(self.connection.ip, self.connection.port) # Add requester peer to site
|
|
|
|
if connected_peer: # Just added
|
|
|
|
added += 1
|
|
|
|
connected_peer.connect(self.connection) # Assign current connection to peer
|
|
|
|
|
2015-09-27 12:42:53 +02:00
|
|
|
for packed_address in params["peers"]: # Add sent peers to site
|
|
|
|
address = helper.unpackAddress(packed_address)
|
2015-06-17 23:44:20 +02:00
|
|
|
got_peer_keys.append("%s:%s" % address)
|
2015-07-12 20:36:46 +02:00
|
|
|
if site.addPeer(*address):
|
|
|
|
added += 1
|
2015-06-17 23:44:20 +02:00
|
|
|
# Send back peers that is not in the sent list and connectable (not port 0)
|
2015-09-27 12:42:53 +02:00
|
|
|
packed_peers = [peer.packMyAddress() for peer in site.getConnectablePeers(params["need"], got_peer_keys)]
|
2015-06-17 23:44:20 +02:00
|
|
|
if added:
|
|
|
|
site.worker_manager.onPeers()
|
|
|
|
self.log.debug("Added %s peers to %s using pex, sending back %s" % (added, site, len(packed_peers)))
|
|
|
|
self.response({"peers": packed_peers})
|
|
|
|
|
|
|
|
# Get modified content.json files since
|
|
|
|
def actionListModified(self, params):
|
|
|
|
site = self.sites.get(params["site"])
|
2015-07-12 20:36:46 +02:00
|
|
|
if not site or not site.settings["serving"]: # Site unknown or not serving
|
2015-06-17 23:44:20 +02:00
|
|
|
self.response({"error": "Unknown site"})
|
|
|
|
return False
|
2015-07-12 20:36:46 +02:00
|
|
|
modified_files = {
|
|
|
|
inner_path: content["modified"]
|
|
|
|
for inner_path, content in site.content_manager.contents.iteritems()
|
|
|
|
if content["modified"] > params["since"]
|
|
|
|
}
|
2015-06-17 23:44:20 +02:00
|
|
|
|
|
|
|
# Add peer to site if not added before
|
|
|
|
connected_peer = site.addPeer(self.connection.ip, self.connection.port)
|
|
|
|
if connected_peer: # Just added
|
|
|
|
connected_peer.connect(self.connection) # Assign current connection to peer
|
|
|
|
|
|
|
|
self.response({"modified_files": modified_files})
|
|
|
|
|
|
|
|
# Send a simple Pong! answer
|
|
|
|
def actionPing(self):
|
|
|
|
self.response("Pong!")
|
|
|
|
|
|
|
|
# Unknown command
|
|
|
|
def actionUnknown(self, cmd, params):
|
|
|
|
self.response({"error": "Unknown command: %s" % cmd})
|