# 2017-10-04 13:28:59 +02:00  (stray VCS timestamp, commented out)
import time
import os
import subprocess
import shutil
import collections
import gevent
import math
import msgpack
from Plugin import PluginManager
# 2017-10-10 14:58:13 +02:00  (stray VCS timestamp, commented out)
from Debug import Debug
# 2017-10-04 13:28:59 +02:00  (stray VCS timestamp, commented out)
from Crypt import CryptHash
from lib import merkletools
from util import helper
import util
from BigfilePiecefield import BigfilePiecefield , BigfilePiecefieldPacked
# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config

# Keep the in-flight upload nonces across plugin reloads
if "upload_nonces" not in locals():
    upload_nonces = {}
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        # The upload endpoint is posted to cross-origin from site iframes
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)

    def actionBigfileUpload(self):
        """
        Receive a big file upload (multipart POST body), stream-hash it to disk
        and register it in the site's content.json.
        Requires a one-shot nonce created by UiWebsocket.actionBigfileUploadInit.
        """
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]  # One-shot: a nonce can't be replayed

        self.sendHeader(200, "text/html", noscript=True, extra_headers=[
            ("Access-Control-Allow-Origin", "null"),
            ("Access-Control-Allow-Credentials", "true")
        ])

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        # Stream the request body through the hasher directly into the target file
        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = piecemap_info["sha512_pieces"][0].encode("hex")
            site.content_manager.optionalDownloaded(inner_path, hash, upload_info["size"], own=True)
        else:  # Big file
            file_name = helper.getFilename(inner_path)
            # Fix: close the piecemap file handle instead of leaking it
            piecemap_file = site.storage.open(upload_info["piecemap"], "wb")
            try:
                msgpack.pack({file_name: piecemap_info}, piecemap_file)
            finally:
                piecemap_file.close()

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            site.content_manager.optionalDownloaded(inner_path, merkle_root, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return {
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        }

    def readMultipartHeaders(self, wsgi_input):
        # Consume multipart part headers up to the first blank line
        # (capped at 100 lines to avoid looping forever on malformed input)
        for i in range(100):
            line = wsgi_input.readline()
            if line == "\r\n":
                break
        return i

    def actionFile(self, file_path, *args, **kwargs):
        # Serve big files through a piecefield-aware file object
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            kwargs["file_obj"] = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionBigfileUploadInit(self, to, inner_path, size):
        """
        Prepare a big file upload: verify write permission, register a one-shot
        nonce in upload_nonces and return the upload URL for the client.
        """
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        # NOTE(review): entries are never expired; "added" is stored but unused — confirm cleanup happens elsewhere
        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }
        return {
            "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
            # NOTE(review): "pice_size" looks like a typo for "piece_size" — confirm client usage before renaming
            "pice_size": piece_size,
            "inner_path": inner_path,
            "file_relative_path": file_relative_path
        }
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        # "path|from-to" addresses a single piece of a big file; the file info
        # itself belongs to the whole file, so strip the range before lookup
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info

    def readFile(self, file_in, size, buff_size=1024 * 64):
        """Yield up to `size` bytes from file_in in buff_size chunks."""
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        """
        Hash `size` bytes from file_in piece by piece, optionally copying the
        stream to file_out. Returns (merkle_root, piece_size, piecemap_info)
        where piecemap_info is {"sha512_pieces": [digest, ...]}.
        """
        self.site.settings["has_bigfile"] = True

        recv = 0
        # Fix: original had `except Exception as err: raise err`, which resets
        # the traceback in Python 2 — try/finally alone preserves it
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    # Piece complete: store its digest and start a new hasher
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                if len(piece_hashes) % 100 == 0 or recv == size:
                    self.log.info("- [HASHING: %.0f%%] Pieces: %s, %.1f MB/%.1f MB" % (
                        float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                    ))
                part = ""
            if len(part) > 0:  # Trailing partial piece
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        return mt.get_merkle_root(), piece_size, {
            "sha512_pieces": piece_hashes
        }

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        """Hash a single file for content.json; big optional files get a piecemap."""
        inner_path = dir_inner_path + file_relative_path
        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            # Fix: close the input handle (hashBigfile only closes file_out)
            file_in = self.site.storage.open(inner_path, "rb")
            try:
                merkle_root, piece_size, piecemap_info = self.hashBigfile(file_in, file_size)
            finally:
                file_in.close()

            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                # Fix: close the piecemap file handle instead of leaking it
                piecemap_file = self.site.storage.open(piecemap_inner_path, "wb")
                try:
                    msgpack.pack({file_name: piecemap_info}, piecemap_file)
                finally:
                    piecemap_file.close()

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        self.optionalDownloaded(inner_path, hash, file_size, own=True)
        self.site.storage.piecefields[hash].fromstring("1" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        """Download (if needed) and parse the piecemap belonging to inner_path."""
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        # Fix: close the piecemap handle after unpacking
        piecemap_file = self.site.storage.open(piecemap_inner_path)
        try:
            piecemap = msgpack.unpack(piecemap_file)[helper.getFilename(inner_path)]
        finally:
            piecemap_file.close()
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        """Check a single piece's sha512t digest against the piecemap. Raises VerifyError on mismatch."""
        piecemap = self.getPiecemap(inner_path)
        piece_i = pos / piecemap["piece_size"]
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        # Range-addressed paths are verified piece-wise
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = pos_from / file_info["piece_size"]
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = True

            # Only add to site size on first request
            if hash in self.hashfield:
                size = 0
        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash, size, own)

    def optionalRemove(self, inner_path, hash, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            # (.keys() returns a copy in Py2, so deleting while iterating is safe)
            for key in self.site.bad_files.keys():
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
        return super(ContentManagerPlugin, self).optionalRemove(inner_path, hash, size)
@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        # sha512 -> BigfilePiecefield of locally available pieces
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        # Restore piecefields persisted in the site settings cache
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").iteritems():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(piecefield_packed.decode("base64"))
            self.site.settings["cache"]["piecefields"] = {}

    def createSparseFile(self, inner_path, size, sha512=None):
        """Pre-allocate a sparse file of `size` bytes for piece-wise download."""
        file_path = self.getPath(inner_path)

        f = open(file_path, 'wb')
        f.truncate(size)
        f.close()
        if os.name == "nt":
            # NTFS needs the sparse flag set explicitly
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def openBigfile(self, inner_path, prebuffer=0):
        """Return a BigFile wrapper for inner_path, or False if it's not a big file."""
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if file_info and "piecemap" not in file_info:  # It's not a big file
            return False

        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                # Fix: close the peek handle instead of leaking it
                with open(file_path) as f:
                    first_bytes = f.read(128)
                if first_bytes == "\0" * 128:
                    piece_data = "0"
                else:
                    piece_data = "1"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefiled with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].fromstring(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            # Fix: a freshly created sparse file has no pieces yet.
            # Original was `piece_data * "0"` — piece_data is unbound in this
            # branch and str * str is a TypeError; correct form is "0" * piece_num
            self.piecefields[sha512].fromstring("0" * piece_num)
        return BigFile(self.site, inner_path, prebuffer=prebuffer)
class BigFile(object):
    """
    File-like wrapper over a partially downloaded big file: read() requests the
    missing pieces from the network before returning data from disk.
    """
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer  # Extra bytes to request ahead of the read position
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")

    def read(self, buff=64 * 1024):
        """Read up to `buff` bytes, downloading any missing pieces first. Returns None if a piece request failed."""
        pos = self.f.tell()
        read_until = pos + buff
        requests = []
        # Request all required blocks
        while 1:
            piece_i = pos / self.piece_size
            if piece_i * self.piece_size >= read_until:
                break
            pos_from = piece_i * self.piece_size
            pos_to = pos_from + self.piece_size
            if not self.piecefield[piece_i]:
                requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
            pos += self.piece_size

        if not all(requests):
            return None

        # Request prebuffer (continues from `pos`, i.e. the first piece past read_until)
        if self.prebuffer:
            prebuffer_until = min(self.size, read_until + self.prebuffer)
            priority = 3
            while 1:
                piece_i = pos / self.piece_size
                if piece_i * self.piece_size >= prebuffer_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                priority -= 1  # Pieces further ahead are less urgent
                pos += self.piece_size

        gevent.joinall(requests)
        self.read_bytes += buff

        # Increase buffer for long reads
        if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
            self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
            self.prebuffer = 5 * 1024 * 1024

        return self.f.read(buff)

    def seek(self, pos):
        return self.f.seek(pos)

    def tell(self):
        # Fix: original was missing the return, so tell() always gave None
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        """Queue a download task; big files get piecemap + per-piece handling."""
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= config.autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = pos_from / file_info["piece_size"]
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        # For piece tasks, only accept peers that advertise this piece
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)
                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into file
        if file.read(10) == "\0" * 10:
            # Looks empty, but make sure we really have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            piece_i = pos / file_info["piece_size"]
            if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        """Send our packed piecefields for the requested site to the peer."""
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.iteritems()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        """Store the piecefields a peer pushed to us."""
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].iteritems():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})
@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        # Lazy-create bigfile-related attributes on first access
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        """Query the peer's piecefields; returns the piecefields dict or False."""
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].iteritems():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        # Update piecefields in parallel with the regular hashfield update
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        # Translate "path|from-to" into pos_from/pos_to request arguments
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        # Judge big files by their piece size, not the whole file size
        if "piecemap" in file_info:
            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        # Persist packed piecefields so they survive a restart
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: piecefield.pack().encode("base64") for sha512, piecefield in self.storage.piecefields.iteritems()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        # "path|all" expands to one pooled request per missing piece
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(*args, **kwargs):
                return self.needFile(*args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)
            file_size = file_info["size"]
            piece_size = file_info["piece_size"]
            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    res = pooledNeedBigfile("%s|%s-%s" % (inner_path, piece_from, piece_to), blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)

            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        # Register bigfile-specific command line options
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles until this limit if help distribute option is checked', default=1, metavar="MB", type=int)
        return super(ConfigPlugin, self).createArguments()