import time
import os
import subprocess
import shutil
import collections
import math
import warnings
import base64
import binascii
import json

import gevent
import gevent.lock

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # Ignore missing sha3 warning
    import merkletools

from util import helper
from util import Msgpack
from util.Flag import flag
import util

from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked
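

# Overview: large optional files are stored as fixed-size pieces whose sha512t
# digests go into a .piecemap.msgpack file; the merkle root of those digests is
# used as the file's "sha512" value in content.json, and per-peer piecefields
# track which pieces each peer has.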


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config


if "upload_nonces" not in locals():
    upload_nonces = {}


@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)

    @helper.encodeResponse
    def actionBigfileUpload(self):
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]

        self.sendHeader(200, "text/html", noscript=True, extra_headers={
            "Access-Control-Allow-Origin": "null",
            "Access-Control-Allow-Credentials": "true"
        })

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        result = self.handleBigfileUpload(upload_info, self.env['wsgi.input'].read)
        return json.dumps(result)

    def actionBigfileUploadWebsocket(self):
        ws = self.env.get("wsgi.websocket")

        if not ws:
            self.start_response("400 Bad Request", [])
            return [b"Not a websocket request!"]

        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]

        ws.send("poll")

        buffer = b""

        def read(size):
            # Accumulate websocket frames until the requested number of bytes
            # is available; each "poll" asks the client for the next chunk
            nonlocal buffer
            while len(buffer) < size:
                buffer += ws.receive()
                ws.send("poll")
            part, buffer = buffer[:size], buffer[size:]
            return part

        result = self.handleBigfileUpload(upload_info, read)
        ws.send(json.dumps(result))
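
    # Shared by the XHR and websocket upload paths: streams the request body
    # through hashBigfile() into site storage, writes the piecemap and
    # registers the file under "files_optional" in content.json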

    def handleBigfileUpload(self, upload_info, read):
        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                read, upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = binascii.hexlify(piecemap_info["sha512_pieces"][0])
            hash_id = site.content_manager.hashfield.getHashId(hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)

        else:  # Big file
            file_name = helper.getFilename(inner_path)
            site.storage.open(upload_info["piecemap"], "wb").write(Msgpack.pack({file_name: piecemap_info}))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return {
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        }

    def readMultipartHeaders(self, wsgi_input):
        found = False
        for i in range(100):
            line = wsgi_input.readline()
            if line == b"\r\n":
                found = True
                break
        if not found:
            raise Exception("No multipart header found")
        return i

    def actionFile(self, file_path, *args, **kwargs):
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)


@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionBigfileUploadInit(self, to, inner_path, size, protocol="xhr"):
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }

        if protocol == "xhr":
            return {
                "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
                "piece_size": piece_size,
                "inner_path": inner_path,
                "file_relative_path": file_relative_path
            }
        elif protocol == "websocket":
            server_url = self.request.getWsServerUrl()
            if server_url:
                proto, host = server_url.split("://")
                origin = proto.replace("http", "ws") + "://" + host
            else:
                origin = "{origin}"
            return {
                "url": origin + "/ZeroNet-Internal/BigfileUploadWebsocket?upload_nonce=" + nonce,
                "piece_size": piece_size,
                "inner_path": inner_path,
                "file_relative_path": file_relative_path
            }
        else:
            return {"error": "Unknown protocol"}
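
    # Upload flow (a sketch; assumes the site's frontend JS drives it): the
    # page calls actionBigfileUploadInit above to get a one-time nonce URL,
    # then POSTs the file bytes to it (xhr) or streams them over a second
    # websocket connection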

    @flag.no_multiuser
    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete the .piecemap.msgpack file if it exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
            try:
                self.site.storage.delete(piecemap_inner_path)
            except Exception as err:
                self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)


@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info
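
    # Piece addressing convention used throughout the plugin: a byte range
    # appended after a "|" selects one piece of a bigfile, e.g. (hypothetical
    # path) "data/video.mp4|1048576-2097152" addresses the second 1MB piece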

    def readFile(self, read_func, size, buff_size=1024 * 64):
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = read_func(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, read_func, size, piece_size=1024 * 1024, file_out=None):
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(read_func, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                        ))
                        part = ""
            if len(part) > 0:
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        except Exception as err:
            raise err
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        merkle_root = mt.get_merkle_root()
        if type(merkle_root) is bytes:  # Python <3.5
            merkle_root = merkle_root.decode()
        return merkle_root, piece_size, {
            "sha512_pieces": piece_hashes
        }
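
    # piecemap_info holds the raw digest of every piece; the merkle root of
    # those digests becomes the whole file's content.json hash, so individual
    # pieces can be verified without hashing the entire file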

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb").read, file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                self.site.storage.open(piecemap_inner_path, "wb").write(Msgpack.pack({file_name: piecemap_info}))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[hash].frombytes(b"\x01" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        piecemap = Msgpack.unpack(self.site.storage.open(piecemap_inner_path, "rb").read())[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        try:
            piecemap = self.getPiecemap(inner_path)
        except Exception as err:
            raise VerifyError("Unable to download piecemap: %s" % Debug.formatException(err))

        piece_i = int(pos / piecemap["piece_size"])
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = int(pos_from / file_info["piece_size"])
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = b"\x01"

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            for key in list(self.site.bad_files.keys()):
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)


@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
            self.site.settings["cache"]["piecefields"] = {}

    def createSparseFile(self, inner_path, size, sha512=None):
        file_path = self.getPath(inner_path)

        self.ensureDir(os.path.dirname(inner_path))

        f = open(file_path, 'wb')
        f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        f.close()
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File does not exist, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]
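
    # Note: f.truncate() alone is enough for a sparse file on most POSIX
    # filesystems; on Windows/NTFS the sparse flag must be set explicitly
    # with fsutil, as above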

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        self.ensureDir(os.path.dirname(inner_path))

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def checkBigfile(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or (file_info and "piecemap" not in file_info):  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                if open(file_path, "rb").read(128) == b"\0" * 128:
                    piece_data = b"\x00"
                else:
                    piece_data = b"\x01"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].frombytes(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].frombytes(b"\x00" * piece_num)
            self.log.debug("Created bigfile: %s" % inner_path)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)
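

# File-like wrapper around a partially downloaded bigfile: read() requests the
# missing pieces (plus an optional prebuffer) from peers on demand, so the
# file can be streamed before it is fully downloaded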


class BigFile(object):
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = int(pos / self.piece_size)
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = int(pos / self.piece_size)
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                    priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def seekable(self):
        return self.f.seekable()

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = int(pos_from / file_info["piece_size"])
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].frombytes(b"\x00" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task
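
    # Note: a bigfile task without a "|range" only fetches the piecemap (or a
    # pre-solved fake event), since the pieces themselves are scheduled as
    # separate range tasks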

    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)


@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into file
        if file.read(10) == b"\0" * 10:
            # Looks empty, but make sure we don't have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            if "piece_size" in file_info:
                piece_i = int(pos / file_info["piece_size"])
                if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                    return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].items():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})


@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].items():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)


@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        if "piecemap" in file_info:
            file_size_mb = file_info["size"] / 1024 / 1024
            if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
                self.log.debug(
                    "Bigfile size %s too large: %sMB > %sMB, skipping..." %
                    (inner_path, file_size_mb, config.bigfile_size_limit)
                )
                return False

            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)

            # Use default function to download non-optional file
            if "piece_size" not in file_info:
                return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)

            file_size = file_info["size"]
            piece_size = file_info["piece_size"]
            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)
            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=10, metavar="MB", type=int)
        group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()