ZeroNet/plugins/Bigfile/BigfilePlugin.py

767 lines
33 KiB
Python

import time
import os
import subprocess
import shutil
import collections
import math
import gevent
import gevent.lock
from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
from lib import merkletools
from util import helper
import util
from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked
# We can only import plugin host clases after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
global VerifyError, config
from Content.ContentManager import VerifyError
from Config import config
if "upload_nonces" not in locals():
upload_nonces = {}
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
def isCorsAllowed(self, path):
if path == "/ZeroNet-Internal/BigfileUpload":
return True
else:
return super(UiRequestPlugin, self).isCorsAllowed(path)
def actionBigfileUpload(self):
nonce = self.get.get("upload_nonce")
if nonce not in upload_nonces:
return self.error403("Upload nonce error.")
upload_info = upload_nonces[nonce]
del upload_nonces[nonce]
self.sendHeader(200, "text/html", noscript=True, extra_headers={
"Access-Control-Allow-Origin": "null",
"Access-Control-Allow-Credentials": "true"
})
self.readMultipartHeaders(self.env['wsgi.input']) # Skip http headers
site = upload_info["site"]
inner_path = upload_info["inner_path"]
with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
)
if len(piecemap_info["sha512_pieces"]) == 1: # Small file, don't split
hash = piecemap_info["sha512_pieces"][0].hex()
hash_id = site.content_manager.hashfield.getHashId(hash)
site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)
else: # Big file
file_name = helper.getFilename(inner_path)
msgpack.pack({file_name: piecemap_info}, site.storage.open(upload_info["piecemap"], "wb"))
# Find piecemap and file relative path to content.json
file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
file_relative_path = inner_path[len(content_inner_path_dir):]
# Add file to content.json
if site.storage.isFile(file_info["content_inner_path"]):
content = site.storage.loadJson(file_info["content_inner_path"])
else:
content = {}
if "files_optional" not in content:
content["files_optional"] = {}
content["files_optional"][file_relative_path] = {
"sha512": merkle_root,
"size": upload_info["size"],
"piecemap": piecemap_relative_path,
"piece_size": piece_size
}
merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
site.storage.writeJson(file_info["content_inner_path"], content)
site.content_manager.contents.loadItem(file_info["content_inner_path"]) # reload cache
return {
"merkle_root": merkle_root,
"piece_num": len(piecemap_info["sha512_pieces"]),
"piece_size": piece_size,
"inner_path": inner_path
}
def readMultipartHeaders(self, wsgi_input):
for i in range(100):
line = wsgi_input.readline()
if line == "\r\n":
break
return i
def actionFile(self, file_path, *args, **kwargs):
if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"): # Only check files larger than 1MB
path_parts = kwargs["path_parts"]
site = self.server.site_manager.get(path_parts["address"])
big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
if big_file:
kwargs["file_obj"] = big_file
kwargs["file_size"] = big_file.size
return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
def actionBigfileUploadInit(self, to, inner_path, size):
valid_signers = self.site.content_manager.getValidSigners(inner_path)
auth_address = self.user.getAuthAddress(self.site.address)
if not self.site.settings["own"] and auth_address not in valid_signers:
self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
return self.response(to, {"error": "Forbidden, you can only modify your own files"})
nonce = CryptHash.random()
piece_size = 1024 * 1024
inner_path = self.site.content_manager.sanitizePath(inner_path)
file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)
content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
file_relative_path = inner_path[len(content_inner_path_dir):]
upload_nonces[nonce] = {
"added": time.time(),
"site": self.site,
"inner_path": inner_path,
"websocket_client": self,
"size": size,
"piece_size": piece_size,
"piecemap": inner_path + ".piecemap.msgpack"
}
return {
"url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
"piece_size": piece_size,
"inner_path": inner_path,
"file_relative_path": file_relative_path
}
def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
permissions = self.getPermissions(to)
if "ADMIN" not in permissions:
return self.response(to, "You don't have permission to run this command")
self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
self.response(to, "ok")
def actionFileDelete(self, to, inner_path):
piecemap_inner_path = inner_path + ".piecemap.msgpack"
if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
# Also delete .piecemap.msgpack file if exists
self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
if file_info:
content_json = self.site.storage.loadJson(file_info["content_inner_path"])
relative_path = file_info["relative_path"]
if relative_path in content_json.get("files_optional", {}):
del content_json["files_optional"][relative_path]
self.site.storage.writeJson(file_info["content_inner_path"], content_json)
self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
try:
self.site.storage.delete(piecemap_inner_path)
except Exception as err:
self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))
return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
def getFileInfo(self, inner_path, *args, **kwargs):
if "|" not in inner_path:
return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
return file_info
def readFile(self, file_in, size, buff_size=1024 * 64):
part_num = 0
recv_left = size
while 1:
part_num += 1
read_size = min(buff_size, recv_left)
part = file_in.read(read_size)
if not part:
break
yield part
if part_num % 100 == 0: # Avoid blocking ZeroNet execution during upload
time.sleep(0.001)
recv_left -= read_size
if recv_left <= 0:
break
def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
self.site.settings["has_bigfile"] = True
recv = 0
try:
piece_hash = CryptHash.sha512t()
piece_hashes = []
piece_recv = 0
mt = merkletools.MerkleTools()
mt.hash_function = CryptHash.sha512t
part = ""
for part in self.readFile(file_in, size):
if file_out:
file_out.write(part)
recv += len(part)
piece_recv += len(part)
piece_hash.update(part)
if piece_recv >= piece_size:
piece_digest = piece_hash.digest()
piece_hashes.append(piece_digest)
mt.leaves.append(piece_digest)
piece_hash = CryptHash.sha512t()
piece_recv = 0
if len(piece_hashes) % 100 == 0 or recv == size:
self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
))
part = ""
if len(part) > 0:
piece_digest = piece_hash.digest()
piece_hashes.append(piece_digest)
mt.leaves.append(piece_digest)
except Exception as err:
raise err
finally:
if file_out:
file_out.close()
mt.make_tree()
return mt.get_merkle_root(), piece_size, {
"sha512_pieces": piece_hashes
}
def hashFile(self, dir_inner_path, file_relative_path, optional=False):
inner_path = dir_inner_path + file_relative_path
file_size = self.site.storage.getSize(inner_path)
# Only care about optional files >1MB
if not optional or file_size < 1 * 1024 * 1024:
return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)
back = {}
content = self.contents.get(dir_inner_path + "content.json")
hash = None
piecemap_relative_path = None
piece_size = None
# Don't re-hash if it's already in content.json
if content and file_relative_path in content.get("files_optional", {}):
file_node = content["files_optional"][file_relative_path]
if file_node["size"] == file_size:
self.log.info("- [SAME SIZE] %s" % file_relative_path)
hash = file_node.get("sha512")
piecemap_relative_path = file_node.get("piecemap")
piece_size = file_node.get("piece_size")
if not hash or not piecemap_relative_path: # Not in content.json yet
if file_size < 5 * 1024 * 1024: # Don't create piecemap automatically for files smaller than 5MB
return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)
self.log.info("- [HASHING] %s" % file_relative_path)
merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size)
if not hash:
hash = merkle_root
if not piecemap_relative_path:
file_name = helper.getFilename(file_relative_path)
piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
piecemap_inner_path = inner_path + ".piecemap.msgpack"
msgpack.pack({file_name: piecemap_info}, self.site.storage.open(piecemap_inner_path, "wb"))
back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))
piece_num = int(math.ceil(float(file_size) / piece_size))
# Add the merkle root to hashfield
hash_id = self.site.content_manager.hashfield.getHashId(hash)
self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
self.site.storage.piecefields[hash].fromstring("1" * piece_num)
back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
return back
def getPiecemap(self, inner_path):
file_info = self.site.content_manager.getFileInfo(inner_path)
piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
self.site.needFile(piecemap_inner_path, priority=20)
piecemap = msgpack.unpack(self.site.storage.open(piecemap_inner_path))[helper.getFilename(inner_path)]
piecemap["piece_size"] = file_info["piece_size"]
return piecemap
def verifyPiece(self, inner_path, pos, piece):
piecemap = self.getPiecemap(inner_path)
piece_i = int(pos / piecemap["piece_size"])
if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
raise VerifyError("Invalid hash")
return True
def verifyFile(self, inner_path, file, ignore_same=True):
if "|" not in inner_path:
return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
return self.verifyPiece(inner_path, pos_from, file)
def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
if "|" in inner_path:
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
file_info = self.getFileInfo(inner_path)
# Mark piece downloaded
piece_i = int(pos_from / file_info["piece_size"])
self.site.storage.piecefields[file_info["sha512"]][piece_i] = True
# Only add to site size on first request
if hash_id in self.hashfield:
size = 0
elif size > 1024 * 1024:
file_info = self.getFileInfo(inner_path)
if file_info and "sha512" in file_info: # We already have the file, but not in piecefield
sha512 = file_info["sha512"]
if sha512 not in self.site.storage.piecefields:
self.site.storage.checkBigfile(inner_path)
return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)
def optionalRemoved(self, inner_path, hash_id, size=None):
if size and size > 1024 * 1024:
file_info = self.getFileInfo(inner_path)
sha512 = file_info["sha512"]
if sha512 in self.site.storage.piecefields:
del self.site.storage.piecefields[sha512]
# Also remove other pieces of the file from download queue
for key in list(self.site.bad_files.keys()):
if key.startswith(inner_path + "|"):
del self.site.bad_files[key]
self.site.worker_manager.removeSolvedFileTasks()
return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
def __init__(self, *args, **kwargs):
super(SiteStoragePlugin, self).__init__(*args, **kwargs)
self.piecefields = collections.defaultdict(BigfilePiecefield)
if "piecefields" in self.site.settings.get("cache", {}):
for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
if piecefield_packed:
self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
self.site.settings["cache"]["piecefields"] = {}
def createSparseFile(self, inner_path, size, sha512=None):
file_path = self.getPath(inner_path)
file_dir = os.path.dirname(file_path)
if not os.path.isdir(file_dir):
os.makedirs(file_dir)
f = open(file_path, 'wb')
f.truncate(min(1024 * 1024 * 5, size)) # Only pre-allocate up to 5MB
f.close()
if os.name == "nt":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)
if sha512 and sha512 in self.piecefields:
self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path)
del self.piecefields[sha512]
def write(self, inner_path, content):
if "|" not in inner_path:
return super(SiteStoragePlugin, self).write(inner_path, content)
# Write to specific position by passing |{pos} after the filename
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
file_path = self.getPath(inner_path)
# Create dir if not exist
file_dir = os.path.dirname(file_path)
if not os.path.isdir(file_dir):
os.makedirs(file_dir)
if not os.path.isfile(file_path):
file_info = self.site.content_manager.getFileInfo(inner_path)
self.createSparseFile(inner_path, file_info["size"])
# Write file
with open(file_path, "rb+") as file:
file.seek(pos_from)
if hasattr(content, 'read'): # File-like object
shutil.copyfileobj(content, file) # Write buff to disk
else: # Simple string
file.write(content)
del content
self.onUpdated(inner_path)
def checkBigfile(self, inner_path):
file_info = self.site.content_manager.getFileInfo(inner_path)
if not file_info or (file_info and "piecemap" not in file_info): # It's not a big file
return False
self.site.settings["has_bigfile"] = True
file_path = self.getPath(inner_path)
sha512 = file_info["sha512"]
piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
if os.path.isfile(file_path):
if sha512 not in self.piecefields:
if open(file_path).read(128) == "\0" * 128:
piece_data = "0"
else:
piece_data = "1"
self.log.debug("%s: File exists, but not in piecefield. Filling piecefiled with %s * %s." % (inner_path, piece_num, piece_data))
self.piecefields[sha512].fromstring(piece_data * piece_num)
else:
self.log.debug("Creating bigfile: %s" % inner_path)
self.createSparseFile(inner_path, file_info["size"], sha512)
self.piecefields[sha512].fromstring("0" * piece_num)
return True
def openBigfile(self, inner_path, prebuffer=0):
if not self.checkBigfile(inner_path):
return False
self.site.needFile(inner_path, blocking=False) # Download piecemap
return BigFile(self.site, inner_path, prebuffer=prebuffer)
class BigFile(object):
def __init__(self, site, inner_path, prebuffer=0):
self.site = site
self.inner_path = inner_path
file_path = site.storage.getPath(inner_path)
file_info = self.site.content_manager.getFileInfo(inner_path)
self.piece_size = file_info["piece_size"]
self.sha512 = file_info["sha512"]
self.size = file_info["size"]
self.prebuffer = prebuffer
self.read_bytes = 0
self.piecefield = self.site.storage.piecefields[self.sha512]
self.f = open(file_path, "rb+")
self.read_lock = gevent.lock.Semaphore()
def read(self, buff=64 * 1024):
with self.read_lock:
pos = self.f.tell()
read_until = min(self.size, pos + buff)
requests = []
# Request all required blocks
while 1:
piece_i = int(pos / self.piece_size)
if piece_i * self.piece_size >= read_until:
break
pos_from = piece_i * self.piece_size
pos_to = pos_from + self.piece_size
if not self.piecefield[piece_i]:
requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
pos += self.piece_size
if not all(requests):
return None
# Request prebuffer
if self.prebuffer:
prebuffer_until = min(self.size, read_until + self.prebuffer)
priority = 3
while 1:
piece_i = int(pos / self.piece_size)
if piece_i * self.piece_size >= prebuffer_until:
break
pos_from = piece_i * self.piece_size
pos_to = pos_from + self.piece_size
if not self.piecefield[piece_i]:
self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
priority -= 1
pos += self.piece_size
gevent.joinall(requests)
self.read_bytes += buff
# Increase buffer for long reads
if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
self.prebuffer = 5 * 1024 * 1024
return self.f.read(buff)
def seek(self, pos, whence=0):
with self.read_lock:
if whence == 2: # Relative from file end
pos = self.size + pos # Use the real size instead of size on the disk
whence = 0
return self.f.seek(pos, whence)
def tell(self):
return self.f.tell()
def close(self):
self.f.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
def addTask(self, inner_path, *args, **kwargs):
file_info = kwargs.get("file_info")
if file_info and "piecemap" in file_info: # Bigfile
self.site.settings["has_bigfile"] = True
piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
piecemap_task = None
if not self.site.storage.isFile(piecemap_inner_path):
# Start download piecemap
piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all") # Download all pieces
if "|" in inner_path:
# Start download piece
task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
task["piece_i"] = int(pos_from / file_info["piece_size"])
task["sha512"] = file_info["sha512"]
else:
if inner_path in self.site.bad_files:
del self.site.bad_files[inner_path]
if piecemap_task:
task = piecemap_task
else:
fake_evt = gevent.event.AsyncResult() # Don't download anything if no range specified
fake_evt.set(True)
task = {"evt": fake_evt}
if not self.site.storage.isFile(inner_path):
self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
else:
task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
return task
def taskAddPeer(self, task, peer):
if "piece_i" in task:
if not peer.piecefields[task["sha512"]][task["piece_i"]]:
if task["sha512"] not in peer.piecefields:
gevent.spawn(peer.updatePiecefields, force=True)
elif not task["peers"]:
gevent.spawn(peer.updatePiecefields)
return False # Deny to add peers to task if file not in piecefield
return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
def isReadable(self, site, inner_path, file, pos):
# Peek into file
if file.read(10) == b"\0" * 10:
# Looks empty, but makes sures we don't have that piece
file_info = site.content_manager.getFileInfo(inner_path)
piece_i = int(pos / file_info["piece_size"])
if not site.storage.piecefields[file_info["sha512"]][piece_i]:
return False
# Seek back to position we want to read
file.seek(pos)
return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)
def actionGetPiecefields(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
# Add peer to site if not added before
peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
if not peer.connection: # Just added
peer.connect(self.connection) # Assign current connection to peer
piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
self.response({"piecefields_packed": piecefields_packed})
def actionSetPiecefields(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
self.connection.badAction(5)
return False
# Add or get peer
peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
if not peer.connection:
peer.connect(self.connection)
peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
for sha512, piecefield_packed in params["piecefields_packed"].items():
peer.piecefields[sha512].unpack(piecefield_packed)
site.settings["has_bigfile"] = True
self.response({"ok": "Updated"})
@PluginManager.registerTo("Peer")
class PeerPlugin(object):
def __getattr__(self, key):
if key == "piecefields":
self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
return self.piecefields
elif key == "time_piecefields_updated":
self.time_piecefields_updated = None
return self.time_piecefields_updated
else:
return super(PeerPlugin, self).__getattr__(key)
@util.Noparallel(ignore_args=True)
def updatePiecefields(self, force=False):
if self.connection and self.connection.handshake.get("rev", 0) < 2190:
return False # Not supported
# Don't update piecefield again in 1 min
if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
return False
self.time_piecefields_updated = time.time()
res = self.request("getPiecefields", {"site": self.site.address})
if not res or "error" in res:
return False
self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
try:
for sha512, piecefield_packed in res["piecefields_packed"].items():
self.piecefields[sha512].unpack(piecefield_packed)
except Exception as err:
self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))
return self.piecefields
def sendMyHashfield(self, *args, **kwargs):
return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)
def updateHashfield(self, *args, **kwargs):
if self.site.settings.get("has_bigfile"):
thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
thread.join()
return back
else:
return super(PeerPlugin, self).updateHashfield(*args, **kwargs)
def getFile(self, site, inner_path, *args, **kwargs):
if "|" in inner_path:
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
kwargs["pos_from"] = pos_from
kwargs["pos_to"] = pos_to
return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
@PluginManager.registerTo("Site")
class SitePlugin(object):
def isFileDownloadAllowed(self, inner_path, file_info):
if "piecemap" in file_info:
file_size_mb = file_info["size"] / 1024 / 1024
if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
self.log.debug(
"Bigfile size %s too large: %sMB > %sMB, skipping..." %
(inner_path, file_size_mb, config.bigfile_size_limit)
)
return False
file_info = file_info.copy()
file_info["size"] = file_info["piece_size"]
return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)
def getSettingsCache(self):
back = super(SitePlugin, self).getSettingsCache()
if self.storage.piecefields:
back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
return back
def needFile(self, inner_path, *args, **kwargs):
if inner_path.endswith("|all"):
@util.Pooled(20)
def pooledNeedBigfile(inner_path, *args, **kwargs):
if inner_path not in self.bad_files:
self.log.debug("Cancelled piece, skipping %s" % inner_path)
return False
return self.needFile(inner_path, *args, **kwargs)
inner_path = inner_path.replace("|all", "")
file_info = self.needFileInfo(inner_path)
file_size = file_info["size"]
piece_size = file_info["piece_size"]
piece_num = int(math.ceil(float(file_size) / piece_size))
file_threads = []
piecefield = self.storage.piecefields.get(file_info["sha512"])
for piece_i in range(piece_num):
piece_from = piece_i * piece_size
piece_to = min(file_size, piece_from + piece_size)
if not piecefield or not piecefield[piece_i]:
inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
res = pooledNeedBigfile(inner_path_piece, blocking=False)
if res is not True and res is not False:
file_threads.append(res)
gevent.joinall(file_threads)
else:
return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
def createArguments(self):
group = self.parser.add_argument_group("Bigfile plugin")
group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=1, metavar="MB", type=int)
group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)
return super(ConfigPlugin, self).createArguments()