Initial version of bigfile plugin

shortcutme 2017-10-04 13:28:59 +02:00
parent f7ce401564
commit cf1154f2c5
No known key found for this signature in database
GPG Key ID: 5B63BAE6CB9613AE
6 changed files with 1311 additions and 0 deletions

View File

@@ -0,0 +1,158 @@
import array
def packPiecefield(data):
res = []
if not data:
return array.array("H", "")
if data[0] == "0":
res.append(0)
find = "1"
else:
find = "0"
last_pos = 0
pos = 0
while 1:
pos = data.find(find, pos)
if find == "0":
find = "1"
else:
find = "0"
if pos == -1:
res.append(len(data) - last_pos)
break
res.append(pos - last_pos)
last_pos = pos
return array.array("H", res)
def unpackPiecefield(data):
if not data:
return ""
res = []
char = "1"
for times in data:
if times > 10000:
return ""
res.append(char * times)
if char == "1":
char = "0"
else:
char = "1"
return "".join(res)
class BigfilePiecefield(object):
__slots__ = ["data"]
def __init__(self):
self.data = ""
def fromstring(self, s):
self.data = s
def tostring(self):
return self.data
def pack(self):
return packPiecefield(self.data).tostring()
def unpack(self, s):
self.data = unpackPiecefield(array.array("H", s))
def __getitem__(self, key):
try:
return int(self.data[key])
except IndexError:
return False
def __setitem__(self, key, value):
data = self.data
if len(data) < key:
data = data.ljust(key+1, "0")
data = data[:key] + str(int(value)) + data[key + 1:]
self.data = data
class BigfilePiecefieldPacked(object):
__slots__ = ["data"]
def __init__(self):
self.data = ""
def fromstring(self, data):
self.data = packPiecefield(data).tostring()
def tostring(self):
return unpackPiecefield(array.array("H", self.data))
def pack(self):
return array.array("H", self.data).tostring()
def unpack(self, data):
self.data = data
def __getitem__(self, key):
try:
return int(self.tostring()[key])
except IndexError:
return False
def __setitem__(self, key, value):
data = self.tostring()
if len(data) < key:
data = data.ljust(key+1, "0")
data = data[:key] + str(int(value)) + data[key + 1:]
self.fromstring(data)
if __name__ == "__main__":
import os
import psutil
import time
testdata = "1" * 100 + "0" * 900 + "1" * 4000 + "0" * 4999 + "1"
meminfo = psutil.Process(os.getpid()).memory_info
for storage in [BigfilePiecefieldPacked, BigfilePiecefield]:
print "-- Testing storage: %s --" % storage
m = meminfo()[0]
s = time.time()
piecefields = {}
for i in range(10000):
piecefield = storage()
piecefield.fromstring(testdata[:i] + "0" + testdata[i + 1:])
piecefields[i] = piecefield
print "Create x10000: +%sKB in %.3fs (len: %s)" % ((meminfo()[0] - m) / 1024, time.time() - s, len(piecefields[0].data))
m = meminfo()[0]
s = time.time()
for piecefield in piecefields.values():
val = piecefield[1000]
print "Query one x10000: +%sKB in %.3fs" % ((meminfo()[0] - m) / 1024, time.time() - s)
m = meminfo()[0]
s = time.time()
for piecefield in piecefields.values():
piecefield[1000] = True
print "Change one x10000: +%sKB in %.3fs" % ((meminfo()[0] - m) / 1024, time.time() - s)
m = meminfo()[0]
s = time.time()
for piecefield in piecefields.values():
packed = piecefield.pack()
print "Pack x10000: +%sKB in %.3fs (len: %s)" % ((meminfo()[0] - m) / 1024, time.time() - s, len(packed))
m = meminfo()[0]
s = time.time()
for piecefield in piecefields.values():
piecefield.unpack(packed)
print "Unpack x10000: +%sKB in %.3fs (len: %s)" % ((meminfo()[0] - m) / 1024, time.time() - s, len(piecefields[0].data))
piecefields = {}
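Editor's note (not part of the diff): a piecefield is simply a "0"/"1" string with one flag per downloaded piece, and pack()/unpack() run-length encode it into unsigned shorts so it stays small on the wire and in the site settings cache. A minimal round-trip sketch using the classes above:

    field = BigfilePiecefield()
    field.fromstring("1" * 5 + "0" * 3 + "1" * 2)    # pieces 0-4 and 8-9 downloaded
    packed = field.pack()                            # run lengths [5, 3, 2] as unsigned shorts
    print "Packed %s flags into %s bytes" % (len(field.tostring()), len(packed))
    restored = BigfilePiecefield()
    restored.unpack(packed)
    assert restored.tostring() == field.tostring()   # lossless round trip
    restored[7] = True                               # mark piece 7 as downloaded
    print restored.tostring()                        # -> 1111100111

BigfilePiecefieldPacked exposes the same interface but keeps the packed form in memory, trading slower item access for a smaller footprint, which is what the __main__ benchmark above measures.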

View File

@@ -0,0 +1,678 @@
import time
import os
import subprocess
import shutil
import collections
import gevent
import math
import msgpack
from Plugin import PluginManager
from Crypt import CryptHash
from lib import merkletools
from util import helper
import util
from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked
# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
global VerifyError, config
from Content.ContentManager import VerifyError
from Config import config
if "upload_nonces" not in locals():
upload_nonces = {}
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
def isCorsAllowed(self, path):
if path == "/ZeroNet-Internal/BigfileUpload":
return True
else:
return super(UiRequestPlugin, self).isCorsAllowed(path)
def actionBigfileUpload(self):
nonce = self.get.get("upload_nonce")
if nonce not in upload_nonces:
return self.error403("Upload nonce error.")
upload_info = upload_nonces[nonce]
del upload_nonces[nonce]
self.sendHeader(200, "text/html", noscript=True, extra_headers=[
("Access-Control-Allow-Origin", "null"),
("Access-Control-Allow-Credentials", "true")
])
self.readMultipartHeaders(self.env['wsgi.input']) # Skip http headers
site = upload_info["site"]
inner_path = upload_info["inner_path"]
with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
)
if len(piecemap_info["sha512_pieces"]) == 1: # Small file, don't split
hash = piecemap_info["sha512_pieces"][0].encode("hex")
site.content_manager.optionalDownloaded(inner_path, hash, upload_info["size"], own=True)
else: # Big file
file_name = helper.getFilename(inner_path)
msgpack.pack({file_name: piecemap_info}, site.storage.open(upload_info["piecemap"], "wb"))
# Find piecemap and file relative path to content.json
file_info = site.content_manager.getFileInfo(inner_path)
content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
file_relative_path = inner_path[len(content_inner_path_dir):]
# Add file to content.json
if site.storage.isFile(file_info["content_inner_path"]):
content = site.storage.loadJson(file_info["content_inner_path"])
else:
content = {}
if "files_optional" not in content:
content["files_optional"] = {}
content["files_optional"][file_relative_path] = {
"sha512": merkle_root,
"size": upload_info["size"],
"piecemap": piecemap_relative_path,
"piece_size": piece_size
}
site.content_manager.optionalDownloaded(inner_path, merkle_root, upload_info["size"], own=True)
site.storage.writeJson(file_info["content_inner_path"], content)
return {
"merkle_root": merkle_root,
"piece_num": len(piecemap_info["sha512_pieces"]),
"piece_size": piece_size,
"inner_path": inner_path
}
def readMultipartHeaders(self, wsgi_input):
for i in range(100):
line = wsgi_input.readline()
if line == "\r\n":
break
return i
def actionFile(self, file_path, *args, **kwargs):
if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"): # Only check files larger than 1MB
path_parts = kwargs["path_parts"]
site = self.server.site_manager.get(path_parts["address"])
kwargs["file_obj"] = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
def actionBigfileUploadInit(self, to, inner_path, size):
valid_signers = self.site.content_manager.getValidSigners(inner_path)
auth_address = self.user.getAuthAddress(self.site.address)
if not self.site.settings["own"] and auth_address not in valid_signers:
self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
return self.response(to, {"error": "Forbidden, you can only modify your own files"})
nonce = CryptHash.random()
piece_size = 1024 * 1024
inner_path = self.site.content_manager.sanitizePath(inner_path)
file_info = self.site.content_manager.getFileInfo(inner_path)
content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
file_relative_path = inner_path[len(content_inner_path_dir):]
upload_nonces[nonce] = {
"added": time.time(),
"site": self.site,
"inner_path": inner_path,
"websocket_client": self,
"size": size,
"piece_size": piece_size,
"piecemap": inner_path + ".piecemap.msgpack"
}
self.response(to, {
"url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
"pice_size": piece_size,
"inner_path": inner_path,
"file_relative_path": file_relative_path
})
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
def getFileInfo(self, inner_path):
if "|" not in inner_path:
return super(ContentManagerPlugin, self).getFileInfo(inner_path)
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path)
return file_info
def readFile(self, file_in, size, buff_size=1024 * 64):
part_num = 0
recv_left = size
while 1:
part_num += 1
read_size = min(buff_size, recv_left)
part = file_in.read(read_size)
if not part:
break
yield part
if part_num % 100 == 0: # Avoid blocking ZeroNet execution during upload
time.sleep(0.001)
recv_left -= read_size
if recv_left <= 0:
break
def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
self.site.settings["has_bigfile"] = True
recv = 0
try:
piece_hash = CryptHash.sha512t()
piece_hashes = []
piece_recv = 0
mt = merkletools.MerkleTools()
mt.hash_function = CryptHash.sha512t
part = ""
for part in self.readFile(file_in, size):
if file_out:
file_out.write(part)
recv += len(part)
piece_recv += len(part)
piece_hash.update(part)
if piece_recv >= piece_size:
piece_digest = piece_hash.digest()
piece_hashes.append(piece_digest)
mt.leaves.append(piece_digest)
piece_hash = CryptHash.sha512t()
piece_recv = 0
if len(piece_hashes) % 100 == 0 or recv == size:
self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
))
part = ""
if len(part) > 0:
piece_digest = piece_hash.digest()
piece_hashes.append(piece_digest)
mt.leaves.append(piece_digest)
except Exception as err:
raise err
finally:
if file_out:
file_out.close()
mt.make_tree()
return mt.get_merkle_root(), piece_size, {
"sha512_pieces": piece_hashes
}
def hashFile(self, dir_inner_path, file_relative_path, optional=False):
inner_path = dir_inner_path + file_relative_path
file_size = self.site.storage.getSize(inner_path)
# Only care about optional files >1MB
if not optional or file_size < 1 * 1024 * 1024:
return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)
back = {}
content = self.contents.get(dir_inner_path + "content.json")
hash = None
piecemap_relative_path = None
piece_size = None
# Don't re-hash if it's already in content.json
if content and file_relative_path in content.get("files_optional", {}):
file_node = content["files_optional"][file_relative_path]
if file_node["size"] == file_size:
self.log.info("- [SAME SIZE] %s" % file_relative_path)
hash = file_node.get("sha512")
piecemap_relative_path = file_node.get("piecemap")
piece_size = file_node.get("piece_size")
if not hash or not piecemap_relative_path: # Not in content.json yet
if file_size < 5 * 1024 * 1024: # Don't create piecemap automatically for files smaller than 5MB
return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)
self.log.info("- [HASHING] %s" % file_relative_path)
merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size)
if not hash:
hash = merkle_root
if not piecemap_relative_path:
file_name = helper.getFilename(file_relative_path)
piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
piecemap_inner_path = inner_path + ".piecemap.msgpack"
msgpack.pack({file_name: piecemap_info}, self.site.storage.open(piecemap_inner_path, "wb"))
back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, True))
piece_num = int(math.ceil(float(file_size) / piece_size))
# Add the merkle root to hashfield
self.optionalDownloaded(inner_path, hash, file_size, own=True)
self.site.storage.piecefields[hash].fromstring("1" * piece_num)
back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
return back
def getPiecemap(self, inner_path):
file_info = self.site.content_manager.getFileInfo(inner_path)
piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
self.site.needFile(piecemap_inner_path, priority=20)
piecemap = msgpack.unpack(self.site.storage.open(piecemap_inner_path))[helper.getFilename(inner_path)]
piecemap["piece_size"] = file_info["piece_size"]
return piecemap
def verifyPiece(self, inner_path, pos, piece):
piecemap = self.getPiecemap(inner_path)
piece_i = pos / piecemap["piece_size"]
if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
raise VerifyError("Invalid hash")
return True
def verifyFile(self, inner_path, file, ignore_same=True):
if "|" not in inner_path:
return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
return self.verifyPiece(inner_path, pos_from, file)
def optionalDownloaded(self, inner_path, hash, size=None, own=False):
if "|" in inner_path:
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
file_info = self.getFileInfo(inner_path)
# Mark piece downloaded
piece_i = pos_from / file_info["piece_size"]
self.site.storage.piecefields[file_info["sha512"]][piece_i] = True
# Only add to site size on first request
if hash in self.hashfield:
size = 0
return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash, size, own)
def optionalRemove(self, inner_path, hash, size=None):
if size and size > 1024 * 1024:
file_info = self.getFileInfo(inner_path)
sha512 = file_info["sha512"]
if sha512 in self.site.storage.piecefields:
del self.site.storage.piecefields[sha512]
return super(ContentManagerPlugin, self).optionalRemove(inner_path, hash, size)
@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
def __init__(self, *args, **kwargs):
super(SiteStoragePlugin, self).__init__(*args, **kwargs)
self.piecefields = collections.defaultdict(BigfilePiecefield)
if "piecefields" in self.site.settings.get("cache", {}):
for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").iteritems():
self.piecefields[sha512].unpack(piecefield_packed.decode("base64"))
self.site.settings["cache"]["piecefields"] = {}
def createSparseFile(self, inner_path, size, sha512=None):
file_path = self.getPath(inner_path)
f = open(file_path, 'wb')
f.truncate(size)
f.close()
if os.name == "nt":
subprocess.call(["fsutil", "sparse", "setflag", file_path])
if sha512 and sha512 in self.piecefields:
self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path)
del self.piecefields[sha512]
def write(self, inner_path, content):
if "|" not in inner_path:
return super(SiteStoragePlugin, self).write(inner_path, content)
# Write to a specific position by passing |{from}-{to} after the filename
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
file_path = self.getPath(inner_path)
# Create dir if not exist
file_dir = os.path.dirname(file_path)
if not os.path.isdir(file_dir):
os.makedirs(file_dir)
if not os.path.isfile(file_path):
file_info = self.site.content_manager.getFileInfo(inner_path)
self.createSparseFile(inner_path, file_info["size"])
# Write file
with open(file_path, "rb+") as file:
file.seek(pos_from)
if hasattr(content, 'read'): # File-like object
shutil.copyfileobj(content, file) # Write buff to disk
else: # Simple string
file.write(content)
del content
self.onUpdated(inner_path)
def openBigfile(self, inner_path, prebuffer=0):
file_info = self.site.content_manager.getFileInfo(inner_path)
if "piecemap" not in file_info: # It's not a big file
return False
self.site.needFile(inner_path, blocking=False) # Download piecemap
file_path = self.getPath(inner_path)
sha512 = file_info["sha512"]
piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
if os.path.isfile(file_path):
if sha512 not in self.piecefields:
if open(file_path).read(128) == "\0" * 128:
piece_data = "0"
else:
piece_data = "1"
self.log.debug("%s: File exists, but not in piecefield. Filling piecefiled with %s * %s." % (inner_path, piece_num, piece_data))
self.piecefields[sha512].fromstring(piece_data * piece_num)
else:
self.log.debug("Creating bigfile: %s" % inner_path)
self.createSparseFile(inner_path, file_info["size"], sha512)
self.piecefields[sha512].fromstring("0" * piece_num)
return BigFile(self.site, inner_path, prebuffer=prebuffer)
class BigFile(object):
def __init__(self, site, inner_path, prebuffer=0):
self.site = site
self.inner_path = inner_path
file_path = site.storage.getPath(inner_path)
file_info = self.site.content_manager.getFileInfo(inner_path)
self.piece_size = file_info["piece_size"]
self.sha512 = file_info["sha512"]
self.size = file_info["size"]
self.prebuffer = prebuffer
self.read_bytes = 0
self.piecefield = self.site.storage.piecefields[self.sha512]
self.f = open(file_path, "rb+")
def read(self, buff=64 * 1024):
pos = self.f.tell()
read_until = pos + buff
requests = []
# Request all required blocks
while 1:
piece_i = pos / self.piece_size
if piece_i * self.piece_size >= read_until:
break
pos_from = piece_i * self.piece_size
pos_to = pos_from + self.piece_size
if not self.piecefield[piece_i]:
requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
pos += self.piece_size
if not all(requests):
return None
# Request prebuffer
if self.prebuffer:
prebuffer_until = min(self.size, read_until + self.prebuffer)
priority = 3
while 1:
piece_i = pos / self.piece_size
if piece_i * self.piece_size >= prebuffer_until:
break
pos_from = piece_i * self.piece_size
pos_to = pos_from + self.piece_size
if not self.piecefield[piece_i]:
self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
priority -= 1
pos += self.piece_size
gevent.joinall(requests)
self.read_bytes += buff
# Increase buffer for long reads
if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
self.prebuffer = 5 * 1024 * 1024
return self.f.read(buff)
def seek(self, pos):
return self.f.seek(pos)
def tell(self):
return self.f.tell()
def close(self):
self.f.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
def addTask(self, inner_path, *args, **kwargs):
file_info = kwargs.get("file_info")
if file_info and "piecemap" in file_info: # Bigfile
self.site.settings["has_bigfile"] = True
piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
piecemap_task = None
if not self.site.storage.isFile(piecemap_inner_path):
# Start download piecemap
piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= config.autodownload_bigfile_size_limit:
gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all") # Download all pieces
if "|" in inner_path:
# Start download piece
task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
task["piece_i"] = pos_from / file_info["piece_size"]
task["sha512"] = file_info["sha512"]
else:
if inner_path in self.site.bad_files:
del self.site.bad_files[inner_path]
if piecemap_task:
task = piecemap_task
else:
fake_evt = gevent.event.AsyncResult() # Don't download anything if no range specified
fake_evt.set(True)
task = {"evt": fake_evt}
if not self.site.storage.isFile(inner_path):
self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
else:
task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
return task
def taskAddPeer(self, task, peer):
if "piece_i" in task:
if not peer.piecefields[task["sha512"]][task["piece_i"]]:
if task["sha512"] not in peer.piecefields:
gevent.spawn(peer.updatePiecefields, force=True)
elif not task["peers"]:
gevent.spawn(peer.updatePiecefields)
return False # Don't add the peer to the task if the piece is not in its piecefield
return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
def isReadable(self, site, inner_path, file, pos):
# Peek into file
if file.read(10) == "\0" * 10:
# Looks empty, but make sure we don't have that piece
file_info = site.content_manager.getFileInfo(inner_path)
piece_i = pos / file_info["piece_size"]
if not site.storage.piecefields[file_info["sha512"]][piece_i]:
return False
# Seek back to position we want to read
file.seek(pos)
return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)
def actionGetPiecefields(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
# Add peer to site if not added before
peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
if not peer.connection: # Just added
peer.connect(self.connection) # Assign current connection to peer
piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.iteritems()}
self.response({"piecefields_packed": piecefields_packed})
def actionSetPiecefields(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
self.connection.badAction(5)
return False
# Add or get peer
peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
if not peer.connection:
peer.connect(self.connection)
peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
for sha512, piecefield_packed in params["piecefields_packed"].iteritems():
peer.piecefields[sha512].unpack(piecefield_packed)
site.settings["has_bigfile"] = True
self.response({"ok": "Updated"})
@PluginManager.registerTo("Peer")
class PeerPlugin(object):
def __getattr__(self, key):
if key == "piecefields":
self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
return self.piecefields
elif key == "time_piecefields_updated":
self.time_piecefields_updated = None
return self.time_piecefields_updated
else:
return super(PeerPlugin, self).__getattr__(key)
@util.Noparallel(ignore_args=True)
def updatePiecefields(self, force=False):
if self.connection and self.connection.handshake.get("rev", 0) < 2190:
return False # Not supported
# Don't update piecefield again in 1 min
if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
return False
self.time_piecefields_updated = time.time()
res = self.request("getPiecefields", {"site": self.site.address})
if not res or "error" in res:
return False
self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
for sha512, piecefield_packed in res["piecefields_packed"].iteritems():
self.piecefields[sha512].unpack(piecefield_packed)
return self.piecefields
def sendMyHashfield(self, *args, **kwargs):
return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)
def updateHashfield(self, *args, **kwargs):
if self.site.settings.get("has_bigfile"):
thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
thread.join()
return back
else:
return super(PeerPlugin, self).updateHashfield(*args, **kwargs)
def getFile(self, site, inner_path, *args, **kwargs):
if "|" in inner_path:
inner_path, file_range = inner_path.split("|")
pos_from, pos_to = map(int, file_range.split("-"))
kwargs["pos_from"] = pos_from
kwargs["pos_to"] = pos_to
return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
@PluginManager.registerTo("Site")
class SitePlugin(object):
def isFileDownloadAllowed(self, inner_path, file_info):
if "piecemap" in file_info:
file_info = file_info.copy()
file_info["size"] = file_info["piece_size"]
return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)
def getSettingsCache(self):
back = super(SitePlugin, self).getSettingsCache()
if self.storage.piecefields:
back["piecefields"] = {sha512: piecefield.pack().encode("base64") for sha512, piecefield in self.storage.piecefields.iteritems()}
return back
def needFile(self, inner_path, *args, **kwargs):
if inner_path.endswith("|all"):
@util.Pooled(20)
def pooledNeedBigfile(*args, **kwargs):
return self.needFile(*args, **kwargs)
inner_path = inner_path.replace("|all", "")
file_info = self.needFileInfo(inner_path)
file_size = file_info["size"]
piece_size = file_info["piece_size"]
piece_num = int(math.ceil(float(file_size) / piece_size))
file_threads = []
piecefield = self.storage.piecefields.get(file_info["sha512"])
for piece_i in range(piece_num):
piece_from = piece_i * piece_size
piece_to = min(file_size, piece_from + piece_size)
if not piecefield or not piecefield[piece_i]:
res = pooledNeedBigfile("%s|%s-%s" % (inner_path, piece_from, piece_to), blocking=False)
if res is not True and res is not False:
file_threads.append(res)
gevent.joinall(file_threads)
else:
return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
def createArguments(self):
group = self.parser.add_argument_group("Bigfile plugin")
group.add_argument('--autodownload_bigfile_size_limit', help='Also download big files up to this size limit if the "help distribute" option is checked', default=1, metavar="MB", type=int)
return super(ConfigPlugin, self).createArguments()
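Editor's note on piece addressing (an illustrative sketch, not part of the plugin): everywhere above, one piece of a big file is addressed by appending "|{pos_from}-{pos_to}" to the inner path, and the piece index is the start offset divided by piece_size (integer division under Python 2). The helper names below are hypothetical:

    PIECE_SIZE = 1024 * 1024  # default piece size used by the plugin

    def pieceAddress(inner_path, piece_i, piece_size=PIECE_SIZE):
        # Build the "path|from-to" form used by needFile/getFile/write above,
        # e.g. pieceAddress("data/optional.any.iso", 5) -> "data/optional.any.iso|5242880-6291456"
        pos_from = piece_i * piece_size
        pos_to = pos_from + piece_size
        return "%s|%s-%s" % (inner_path, pos_from, pos_to)

    def parsePieceAddress(address, piece_size=PIECE_SIZE):
        # Inverse of the above, mirroring how getFileInfo/verifyFile/optionalDownloaded split it
        inner_path, file_range = address.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        return inner_path, pos_from / piece_size  # piece index, e.g. 5 for the address above

The last piece is clamped to the file size when the range is built from the file size (see the |all handling in Site.needFile below), so its range may be shorter than piece_size.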

View File

@@ -0,0 +1,467 @@
import time
import os
from cStringIO import StringIO
import pytest
import msgpack
import mock
from lib import merkletools
from Connection import ConnectionServer
from Content.ContentManager import VerifyError
from File import FileServer
from File import FileRequest
from Worker import WorkerManager
from Peer import Peer
from Bigfile import BigfilePiecefield, BigfilePiecefieldPacked
from Test import Spy
@pytest.mark.usefixtures("resetSettings")
@pytest.mark.usefixtures("resetTempSettings")
class TestBigfile:
privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"
def createBigfile(self, site, inner_path="data/optional.any.iso", pieces=10):
f = site.storage.open(inner_path, "w")
for i in range(pieces * 100):
f.write(("Test%s" % i).ljust(10, "-") * 1000)
f.close()
assert site.content_manager.sign("content.json", self.privatekey)
return inner_path
def testPiecemapCreate(self, site):
inner_path = self.createBigfile(site)
content = site.storage.loadJson("content.json")
assert "data/optional.any.iso" in content["files_optional"]
file_node = content["files_optional"][inner_path]
assert file_node["size"] == 10 * 1000 * 1000
assert file_node["sha512"] == "47a72cde3be80b4a829e7674f72b7c6878cf6a70b0c58c6aa6c17d7e9948daf6"
assert file_node["piecemap"] == inner_path + ".piecemap.msgpack"
piecemap = msgpack.unpack(site.storage.open(file_node["piecemap"], "rb"))["optional.any.iso"]
assert len(piecemap["sha512_pieces"]) == 10
assert piecemap["sha512_pieces"][0] != piecemap["sha512_pieces"][1]
assert piecemap["sha512_pieces"][0].encode("hex") == "a73abad9992b3d0b672d0c2a292046695d31bebdcb1e150c8410bbe7c972eff3"
def testVerifyPiece(self, site):
inner_path = self.createBigfile(site)
# Verify all 10 pieces
f = site.storage.open(inner_path, "rb")
for i in range(10):
piece = StringIO(f.read(1024 * 1024))
piece.seek(0)
site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
f.close()
# Try to verify piece 0 with piece 1 hash
with pytest.raises(VerifyError) as err:
i = 1
f = site.storage.open(inner_path, "rb")
piece = StringIO(f.read(1024 * 1024))
f.close()
site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
assert "Invalid hash" in str(err)
def testSparseFile(self, site):
inner_path = "sparsefile"
# Create a 100MB sparse file
site.storage.createSparseFile(inner_path, 100 * 1024 * 1024)
# Write to file beginning
s = time.time()
f = site.storage.write("%s|%s-%s" % (inner_path, 0, 1024 * 1024), "hellostart" * 1024)
time_write_start = time.time() - s
# Write to file end
s = time.time()
f = site.storage.write("%s|%s-%s" % (inner_path, 99 * 1024 * 1024, 99 * 1024 * 1024 + 1024 * 1024), "helloend" * 1024)
time_write_end = time.time() - s
# Verify writes
f = site.storage.open(inner_path)
assert f.read(10) == "hellostart"
f.seek(99 * 1024 * 1024)
assert f.read(8) == "helloend"
f.close()
site.storage.delete(inner_path)
# Writing to the end should not take much longer than writing to the start
assert time_write_end <= max(0.1, time_write_start * 1.1)
def testRangedFileRequest(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
file_server.sites[site.address] = site
client = FileServer("127.0.0.1", 1545)
client.sites[site_temp.address] = site_temp
site_temp.connection_server = client
connection = client.getConnection("127.0.0.1", 1544)
# Add file_server as peer to client
peer_file_server = site_temp.addPeer("127.0.0.1", 1544)
buff = peer_file_server.getFile(site_temp.address, "%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
assert len(buff.getvalue()) == 1 * 1024 * 1024 # Correct block size
assert buff.getvalue().startswith("Test524") # Correct data
buff.seek(0)
assert site.content_manager.verifyPiece(inner_path, 5 * 1024 * 1024, buff) # Correct hash
connection.close()
client.stop()
def testRangedFileDownload(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
# Init source server
site.connection_server = file_server
file_server.sites[site.address] = site
# Make sure the file and the piecemap are in the optional hashfield
file_info = site.content_manager.getFileInfo(inner_path)
assert site.content_manager.hashfield.hasHash(file_info["sha512"])
piecemap_hash = site.content_manager.getFileInfo(file_info["piecemap"])["sha512"]
assert site.content_manager.hashfield.hasHash(piecemap_hash)
# Init client server
client = ConnectionServer("127.0.0.1", 1545)
site_temp.connection_server = client
peer_client = site_temp.addPeer("127.0.0.1", 1544)
# Download site
site_temp.download(blind_includes=True).join(timeout=5)
bad_files = site_temp.storage.verifyFiles(quick_check=True)
assert not bad_files
# client_piecefield = peer_client.piecefields[file_info["sha512"]].tostring()
# assert client_piecefield == "1" * 10
# Download two blocks: 5MB-6MB and 9MB-10MB
site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))
# Verify the first block (0-1MB) was not downloaded
f = site_temp.storage.open(inner_path)
assert f.read(10) == "\0" * 10
# Verify the 5MB and 9MB blocks were downloaded
f.seek(5 * 1024 * 1024)
assert f.read(7) == "Test524"
f.seek(9 * 1024 * 1024)
assert f.read(7) == "943---T"
# Verify hashfield
assert set(site_temp.content_manager.hashfield) == set([18343, 30970]) # 18343: data/optional.any.iso, 30970: data/optional.any.iso.piecemap.msgpack
def testOpenBigfile(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
# Init source server
site.connection_server = file_server
file_server.sites[site.address] = site
# Init client server
client = ConnectionServer("127.0.0.1", 1545)
site_temp.connection_server = client
site_temp.addPeer("127.0.0.1", 1544)
# Download site
site_temp.download(blind_includes=True).join(timeout=5)
# Open virtual file
assert not site_temp.storage.isFile(inner_path)
with site_temp.storage.openBigfile(inner_path) as f:
with Spy.Spy(FileRequest, "route") as requests:
f.seek(5 * 1024 * 1024)
assert f.read(7) == "Test524"
f.seek(9 * 1024 * 1024)
assert f.read(7) == "943---T"
assert len(requests) == 4 # 1x piecemap + 1x getpiecefield + 2x for pieces
assert set(site_temp.content_manager.hashfield) == set([18343, 30970])
assert site_temp.storage.piecefields[f.sha512].tostring() == "0000010001"
assert f.sha512 in site_temp.getSettingsCache()["piecefields"]
# Test requesting already downloaded
with Spy.Spy(FileRequest, "route") as requests:
f.seek(5 * 1024 * 1024)
assert f.read(7) == "Test524"
assert len(requests) == 0
# Test requesting multi-block overflow reads
with Spy.Spy(FileRequest, "route") as requests:
f.seek(5 * 1024 * 1024) # We already have this block
data = f.read(1024 * 1024 * 3) # Our read overflows into the blocks at 6MB and 7MB
assert data.startswith("Test524")
assert data.endswith("Test838-")
assert "\0" not in data # No null bytes allowed
assert len(requests) == 2 # Two block download
@pytest.mark.parametrize("piecefield_obj", [BigfilePiecefield, BigfilePiecefieldPacked])
def testPiecefield(self, piecefield_obj, site):
testdatas = [
"1" * 100 + "0" * 900 + "1" * 4000 + "0" * 4999 + "1",
"010101" * 10 + "01" * 90 + "10" * 400 + "0" * 4999,
"1" * 10000,
"0" * 10000
]
for testdata in testdatas:
piecefield = piecefield_obj()
piecefield.fromstring(testdata)
assert piecefield.tostring() == testdata
assert piecefield[0] == int(testdata[0])
assert piecefield[100] == int(testdata[100])
assert piecefield[1000] == int(testdata[1000])
assert piecefield[len(testdata)-1] == int(testdata[len(testdata)-1])
packed = piecefield.pack()
piecefield_new = piecefield_obj()
piecefield_new.unpack(packed)
assert piecefield.tostring() == piecefield_new.tostring()
assert piecefield_new.tostring() == testdata
def testFileGet(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
# Init source server
site.connection_server = file_server
file_server.sites[site.address] = site
# Init client server
site_temp.connection_server = FileServer("127.0.0.1", 1545)
site_temp.connection_server.sites[site_temp.address] = site_temp
site_temp.addPeer("127.0.0.1", 1544)
# Download site
site_temp.download(blind_includes=True).join(timeout=5)
# Download second block
with site_temp.storage.openBigfile(inner_path) as f:
f.seek(1024 * 1024)
assert f.read(1024)[0] != "\0"
# Make sure the first block was not downloaded
with site_temp.storage.open(inner_path) as f:
assert f.read(1024)[0] == "\0"
peer2 = site.addPeer("127.0.0.1", 1545, return_peer=True)
# Should return an error for the first block request (the client does not have it)
assert not peer2.getFile(site.address, "%s|0-%s" % (inner_path, 1024 * 1024 * 1))
# Should not return an error for the second block request
assert peer2.getFile(site.address, "%s|%s-%s" % (inner_path, 1024 * 1024 * 1, 1024 * 1024 * 2))
def benchmarkPeerMemory(self, site, file_server):
# Init source server
site.connection_server = file_server
file_server.sites[site.address] = site
import psutil, os
meminfo = psutil.Process(os.getpid()).memory_info
mem_s = meminfo()[0]
s = time.time()
for i in range(25000):
site.addPeer("127.0.0.1", i)
print "%.3fs MEM: + %sKB" % (time.time() - s, (meminfo()[0] - mem_s) / 1024) # 0.082s MEM: + 6800KB
print site.peers.values()[0].piecefields
def testUpdatePiecefield(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
server1 = file_server
server1.sites[site.address] = site
server2 = FileServer("127.0.0.1", 1545)
server2.sites[site_temp.address] = site_temp
site_temp.connection_server = server2
# Add file_server as peer to client
server2_peer1 = site_temp.addPeer("127.0.0.1", 1544)
# Testing piecefield sync
assert len(server2_peer1.piecefields) == 0
assert server2_peer1.updatePiecefields() # Query piecefields from peer
assert len(server2_peer1.piecefields) > 0
def testWorkerManagerPiecefieldDeny(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
server1 = file_server
server1.sites[site.address] = site
server2 = FileServer("127.0.0.1", 1545)
server2.sites[site_temp.address] = site_temp
site_temp.connection_server = server2
# Add file_server as peer to client
server2_peer1 = site_temp.addPeer("127.0.0.1", 1544) # Working
site_temp.downloadContent("content.json", download_files=False)
site_temp.needFile("data/optional.any.iso.piecemap.msgpack")
# Add fake peers with optional files downloaded
for i in range(5):
fake_peer = site_temp.addPeer("127.0.1.%s" % i, 1544)
fake_peer.hashfield = site.content_manager.hashfield
fake_peer.has_hashfield = True
with Spy.Spy(WorkerManager, "addWorker") as requests:
site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
site_temp.needFile("%s|%s-%s" % (inner_path, 6 * 1024 * 1024, 7 * 1024 * 1024))
# It should only request parts from peer1, as the other peers do not have the requested parts in their piecefields
assert len([request[1] for request in requests if request[1] != server2_peer1]) == 0
def testWorkerManagerPiecefieldDownload(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
server1 = file_server
server1.sites[site.address] = site
server2 = FileServer("127.0.0.1", 1545)
server2.sites[site_temp.address] = site_temp
site_temp.connection_server = server2
sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]
# Create one fake peer for each of the 10 pieces
for i in range(10):
peer = Peer("127.0.0.1", 1544, site_temp, server2)
peer.piecefields[sha512][i] = "1"
peer.updateHashfield = mock.MagicMock(return_value=False)
peer.updatePiecefields = mock.MagicMock(return_value=False)
peer.findHashIds = mock.MagicMock(return_value={"nope": []})
peer.hashfield = site.content_manager.hashfield
peer.has_hashfield = True
peer.key = "Peer:%s" % i
site_temp.peers["Peer:%s" % i] = peer
site_temp.downloadContent("content.json", download_files=False)
site_temp.needFile("data/optional.any.iso.piecemap.msgpack")
with Spy.Spy(Peer, "getFile") as requests:
for i in range(10):
site_temp.needFile("%s|%s-%s" % (inner_path, i * 1024 * 1024, (i + 1) * 1024 * 1024))
assert len(requests) == 10
for i in range(10):
assert requests[i][0] == site_temp.peers["Peer:%s" % i] # Every part should be requested from piece owner peer
def testDownloadStats(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
# Init source server
site.connection_server = file_server
file_server.sites[site.address] = site
# Init client server
client = ConnectionServer("127.0.0.1", 1545)
site_temp.connection_server = client
site_temp.addPeer("127.0.0.1", 1544)
# Download site
site_temp.download(blind_includes=True).join(timeout=5)
# Open virtual file
assert not site_temp.storage.isFile(inner_path)
# Check size before downloads
assert site_temp.settings["size"] < 10 * 1024 * 1024
assert site_temp.settings["optional_downloaded"] == 0
size_piecemap = site_temp.content_manager.getFileInfo(inner_path + ".piecemap.msgpack")["size"]
size_bigfile = site_temp.content_manager.getFileInfo(inner_path)["size"]
with site_temp.storage.openBigfile(inner_path) as f:
assert not "\0" in f.read(1024)
assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile
with site_temp.storage.openBigfile(inner_path) as f:
# Don't count twice
assert not "\0" in f.read(1024)
assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile
# Add second block
assert not "\0" in f.read(1024 * 1024)
assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile
def testPrebuffer(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
# Init source server
site.connection_server = file_server
file_server.sites[site.address] = site
# Init client server
client = ConnectionServer("127.0.0.1", 1545)
site_temp.connection_server = client
site_temp.addPeer("127.0.0.1", 1544)
# Download site
site_temp.download(blind_includes=True).join(timeout=5)
# Open virtual file
assert not site_temp.storage.isFile(inner_path)
with site_temp.storage.openBigfile(inner_path, prebuffer=1024 * 1024 * 2) as f:
with Spy.Spy(FileRequest, "route") as requests:
f.seek(5 * 1024 * 1024)
assert f.read(7) == "Test524"
# assert len(requests) == 3 # 1x piecemap + 1x getpiecefield + 1x for pieces
assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 2
time.sleep(0.5) # Wait prebuffer download
sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]
assert site_temp.storage.piecefields[sha512].tostring() == "0000011100"
# No prebuffer beyond end of the file
f.seek(9 * 1024 * 1024)
assert "\0" not in f.read(7)
assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 0
def testDownloadAllPieces(self, file_server, site, site_temp):
inner_path = self.createBigfile(site)
# Init source server
site.connection_server = file_server
file_server.sites[site.address] = site
# Init client server
client = ConnectionServer("127.0.0.1", 1545)
site_temp.connection_server = client
site_temp.addPeer("127.0.0.1", 1544)
# Download site
site_temp.download(blind_includes=True).join(timeout=5)
# Open virtual file
assert not site_temp.storage.isFile(inner_path)
with Spy.Spy(FileRequest, "route") as requests:
site_temp.needFile("%s|all" % inner_path)
assert len(requests) == 12 # piecemap.msgpack, getPiecefields, 10 x piece
# Don't re-download already got pieces
with Spy.Spy(FileRequest, "route") as requests:
site_temp.needFile("%s|all" % inner_path)
assert len(requests) == 0
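Editor's note (sketch only): the tests above rely on hashBigfile splitting the stream into piece_size chunks and hashing each with CryptHash.sha512t, a sha512 digest truncated to 256 bits; the piece digests go into the piecemap and their merkle root becomes the file's "sha512" value in content.json. A minimal stand-alone approximation with hashlib (the 32-byte truncation is an assumption mirroring sha512t, not the plugin's own code):

    import hashlib

    def sha512t_digest(data):
        # Assumed equivalent of CryptHash.sha512t(data).digest(): sha512 truncated to 256 bits
        return hashlib.sha512(data).digest()[:32]

    def hashPieces(file_obj, piece_size=1024 * 1024):
        # One digest per fixed-size piece, like piecemap_info["sha512_pieces"] above
        pieces = []
        while True:
            piece = file_obj.read(piece_size)
            if not piece:
                break
            pieces.append(sha512t_digest(piece))
        return pieces

verifyPiece then recomputes the digest of a received range and compares it against the stored piecemap entry, raising VerifyError on mismatch.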

View File

@@ -0,0 +1 @@
from src.Test.conftest import *

View File

@@ -0,0 +1,5 @@
[pytest]
python_files = Test*.py
addopts = -rsxX -v --durations=6
markers =
webtest: mark a test as a webtest.

View File

@@ -0,0 +1,2 @@
import BigfilePlugin
from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked
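Closing usage note (hypothetical, the site object is a placeholder): once the plugin is loaded, other ZeroNet code can stream a large optional file through SiteStorage.openBigfile, and only the pieces that are actually read (plus the optional prebuffer) get downloaded:

    big_file = site.storage.openBigfile("data/optional.any.iso", prebuffer=2 * 1024 * 1024)
    if big_file is not False:              # openBigfile returns False if the file has no piecemap
        with big_file as f:                # BigFile supports the context manager protocol
            f.seek(5 * 1024 * 1024)        # jump into the middle of the file
            data = f.read(64 * 1024)       # missing pieces are fetched on demand
    else:
        f = site.storage.open("data/optional.any.iso", "rb")  # regular small file, read as usual

Requesting "{inner_path}|all" via Site.needFile (as the tests do) downloads every piece up front; the plugin only triggers that automatically for files under --autodownload_bigfile_size_limit.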