# ZeroNet/plugins/Bigfile/BigfilePlugin.py
import time
import os
import subprocess
import shutil
import collections
import math
import warnings
import base64
import binascii

import gevent
import gevent.lock
import gevent.event  # Used for AsyncResult in WorkerManagerPlugin.addTask; imported explicitly rather than relying on "import gevent" pulling it in

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # Ignore missing sha3 warning
    import merkletools

from util import helper
from util import Msgpack
import util

from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config


if "upload_nonces" not in locals():
    upload_nonces = {}
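

# Throughout this plugin a byte range of a big file is addressed by appending
# "|{pos_from}-{pos_to}" to the inner path (e.g. "data/video.mp4|0-1048576" for
# the first 1MB piece). The plugins below intercept such paths in getFileInfo,
# verifyFile, write, getFile and the worker task queue.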
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)
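
    # Second half of the upload handshake: the client POSTs the file body to
    # /ZeroNet-Internal/BigfileUpload?upload_nonce=... (nonce issued by
    # actionBigfileUploadInit below) and the body is streamed into site storage
    # while being hashed piece by piece.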
    def actionBigfileUpload(self):
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]

        self.sendHeader(200, "text/html", noscript=True, extra_headers={
            "Access-Control-Allow-Origin": "null",
            "Access-Control-Allow-Credentials": "true"
        })

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = binascii.hexlify(piecemap_info["sha512_pieces"][0])  # Piece digests are already bytes; no .encode() needed
            hash_id = site.content_manager.hashfield.getHashId(hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)
        else:  # Big file
            file_name = helper.getFilename(inner_path)
            site.storage.open(upload_info["piecemap"], "wb").write(Msgpack.pack({file_name: piecemap_info}))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return {
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        }

    def readMultipartHeaders(self, wsgi_input):
        for i in range(100):
            line = wsgi_input.readline()
            if line == b"\r\n":  # wsgi.input yields bytes under Python 3
                break
        return i
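
    # Serve big files through the BigFile wrapper so missing pieces are
    # downloaded on demand while the HTTP response is being streamed.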
    def actionFile(self, file_path, *args, **kwargs):
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
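

# UiWebsocketPlugin implements the first half of the upload handshake:
# actionBigfileUploadInit reserves an upload nonce and returns the URL the
# browser should POST the file to.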
@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionBigfileUploadInit(self, to, inner_path, size):
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }
        return {
            "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
            "piece_size": piece_size,
            "inner_path": inner_path,
            "file_relative_path": file_relative_path
        }

    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete the .piecemap.msgpack file if it exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
            try:
                self.site.storage.delete(piecemap_inner_path)
            except Exception as err:
                self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)
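

# ContentManagerPlugin adds per-piece hashing and verification: content.json
# stores the merkle root of the file's sha512t piece hashes as its "sha512"
# value, plus a reference to a .piecemap.msgpack file holding the individual
# piece digests.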
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info

    def readFile(self, file_in, size, buff_size=1024 * 64):
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break
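
    # Hashes the incoming stream in piece_size chunks: every full piece gets a
    # truncated sha512 (sha512t) digest that becomes a merkle tree leaf, and the
    # merkle root is what content.json records as the file's "sha512".
    # Returns (merkle_root, piece_size, {"sha512_pieces": [piece digests]}).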
    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                        ))
                part = ""
            if len(part) > 0:
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        except Exception as err:
            raise err
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        return mt.get_merkle_root(), piece_size, {
            "sha512_pieces": piece_hashes
        }
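
    # Piece count arithmetic used below (and in SiteStorage / WorkerManager):
    # piece_num = int(math.ceil(float(file_size) / piece_size)). For example a
    # 12.5MB file with the default 1MB piece_size has 13 pieces, the last one
    # partial and hashed by the len(part) > 0 branch above.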
    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                self.site.storage.open(piecemap_inner_path, "wb").write(Msgpack.pack({file_name: piecemap_info}))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[hash].fromstring("1" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        piecemap = Msgpack.unpack(self.site.storage.open(piecemap_inner_path).read())[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap
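
    # A piece downloaded as "inner_path|from-to" is verified by comparing its
    # sha512t digest to the piecemap entry at index pos // piece_size instead
    # of signature-checking the whole file.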
    def verifyPiece(self, inner_path, pos, piece):
        piecemap = self.getPiecemap(inner_path)
        piece_i = int(pos / piecemap["piece_size"])
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = int(pos_from / file_info["piece_size"])
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = True

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            for key in list(self.site.bad_files.keys()):
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
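

# SiteStoragePlugin keeps one piecefield per big file (keyed by the file's
# merkle root), restores them from the base64-packed copy in the site settings
# cache, and lays files out sparsely so pieces can be written at any offset.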
@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
            self.site.settings["cache"]["piecefields"] = {}

    def createSparseFile(self, inner_path, size, sha512=None):
        file_path = self.getPath(inner_path)

        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        f = open(file_path, 'wb')
        f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        f.close()
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File does not exist, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def checkBigfile(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or "piecemap" not in file_info:  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True

        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                if open(file_path, "rb").read(128) == b"\0" * 128:  # Read as binary to avoid decode errors
                    piece_data = "0"
                else:
                    piece_data = "1"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].fromstring(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].fromstring("0" * piece_num)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)
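

# File-like wrapper used when serving a partially downloaded big file: read()
# blocks until the pieces covering the requested range are fetched and
# schedules the next pieces (the prebuffer) at decreasing priority.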
class BigFile(object):
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = int(pos / self.piece_size)
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = int(pos / self.piece_size)
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                        priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
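

# WorkerManagerPlugin routes bigfile downloads: a task for "path|from-to"
# downloads a single piece, while a task for the bare path only fetches the
# piecemap (plus all pieces via "|all" if the file is within the autodownload
# size limit).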
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = int(pos_from / file_info["piece_size"])
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            # Check membership before indexing: peer.piecefields is a
            # defaultdict, so the lookup below inserts an empty entry and would
            # make an "in" check afterwards always succeed
            peer_has_piecefield = task["sha512"] in peer.piecefields
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if not peer_has_piecefield:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
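

# FileRequestPlugin extends the peer protocol with getPiecefields /
# setPiecefields, letting peers exchange which pieces of which big files they
# hold (keyed by the file's merkle root).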
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into file
        if file.read(10) == b"\0" * 10:
            # Looks empty, but make sure we don't have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            piece_i = int(pos / file_info["piece_size"])
            if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].items():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})


@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].items():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to

        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
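

# SitePlugin enforces the bigfile size limit (only the piecemap itself counts
# toward the download decision), persists packed piecefields in the settings
# cache and expands "|all" requests into one pooled needFile call per missing
# piece.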
@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        if "piecemap" in file_info:
            file_size_mb = file_info["size"] / 1024 / 1024
            if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
                self.log.debug(
                    "Bigfile %s too large: %sMB > %sMB, skipping..." %
                    (inner_path, file_size_mb, config.bigfile_size_limit)
                )
                return False

            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)
            file_size = file_info["size"]
            piece_size = file_info["piece_size"]
            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)

            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=1, metavar="MB", type=int)
        group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()
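
# Example invocation (assumed; only the flag names above are defined here):
#   zeronet.py --bigfile_size_limit 500 --autodownload_bigfile_size_limit 10
# limits downloaded big files to 500MB and fully prefetches optional big files
# up to 10MB when the site's "help distribute" option is enabled.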