from Worker import Worker
import gevent, time, logging, random
import gevent.event  # gevent.event.AsyncResult is used in addTask

MAX_WORKERS = 10

# Worker manager for site
class WorkerManager:
    def __init__(self, site):
        self.site = site
        self.workers = {}  # Key: ip:port, Value: Worker.Worker
        self.tasks = []  # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": []}
        self.started_task_num = 0  # Last added task num
        self.running = True
        self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
        self.process_taskchecker = gevent.spawn(self.checkTasks)

    # Check expired tasks
    def checkTasks(self):
        while self.running:
            time.sleep(15)  # Check every 15 sec

            # Clean up workers
            for worker in self.workers.values():
                if worker.task and worker.task["done"]: worker.stop()  # Stop workers whose task is done

            if not self.tasks: continue

            tasks = self.tasks[:]  # Copy it so removing elements won't cause any problem
            for task in tasks:
                if (task["time_started"] and time.time() >= task["time_started"] + 60) or (time.time() >= task["time_added"] + 60 and not self.workers):  # Task running for more than 60 sec, or 60 sec passed with no workers at all: kill it
                    self.log.debug("Timeout, cleaning up task: %s" % task)
                    # Clean up workers
                    workers = self.findWorkers(task)
                    for worker in workers:
                        worker.stop()
                    # Remove task
                    self.failTask(task)
                elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:  # Task started more than 15 sec ago, or there are no workers
                    self.log.debug("Task taking more than 15 secs, find more peers: %s" % task["inner_path"])
                    task["site"].announce()  # Find more peers
                    if task["peers"]:  # Release the peer lock
                        self.log.debug("Task peer lock release: %s" % task["inner_path"])
                        task["peers"] = []
                        self.startWorkers()
                    break  # One reannounce per loop

        self.log.debug("checkTasks stopped running")

    # Tasks are sorted by this key function
    def taskSorter(self, task):
        if task["inner_path"] == "content.json": return 9999  # content.json always has top priority
        if task["inner_path"] == "index.html": return 9998  # index.html is also important
        priority = task["priority"]
        if task["inner_path"].endswith(".js") or task["inner_path"].endswith(".css"): priority += 1  # Download js and css files first
        return priority - task["workers_num"]  # Prefer higher priority and fewer workers
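
    # Illustrative ordering under this key (hypothetical task values): "content.json" -> 9999,
    # "index.html" -> 9998, "js/all.js" with priority 0 and 1 worker -> (0 + 1) - 1 = 0,
    # "data.json" with priority 5 and 0 workers -> 5, so data.json is picked before the js file.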

    # Returns the next free or least-worked task
    def getTask(self, peer):
        self.tasks.sort(key=self.taskSorter, reverse=True)  # Sort tasks by priority and worker count
        for task in self.tasks:  # Find a task
            if task["peers"] and peer not in task["peers"]: continue  # This peer is not allowed to pick this task
            if peer.key in task["failed"]: continue  # This peer already tried to solve the task, but failed
            return task

    # New peers added to site
    def onPeers(self):
        self.startWorkers()

    # Add new worker
    def addWorker(self, peer):
        key = peer.key
        if key not in self.workers and len(self.workers) < MAX_WORKERS:  # We don't have a worker for this peer yet and the worker count is below the limit
            worker = Worker(self, peer)
            self.workers[key] = worker
            worker.key = key
            worker.start()
            return worker
        else:  # We already have a worker for this peer or it's over the limit
            return False

    # Start workers to process tasks
    def startWorkers(self, peers=None):
        if len(self.workers) >= MAX_WORKERS and not peers: return False  # Worker count already maxed and no starting peers defined
        if not self.tasks: return False  # No tasks for workers
        if not peers: peers = self.site.peers.values()  # No peers defined, use any peer of the site
        random.shuffle(peers)
        for peer in peers:  # One worker for every peer
            if peers and peer not in peers: continue  # If peers defined and the peer is not valid
            worker = self.addWorker(peer)
            if worker: self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), MAX_WORKERS))
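
    # Note (restating the flow above, not additional behavior): when startWorkers() is called
    # with an explicit peer list, as addTask() does for a peer-locked task, only those peers
    # get workers; otherwise any of the site's peers may be used, capped at MAX_WORKERS by
    # addWorker().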

    # Stop all workers
    def stopWorkers(self):
        for worker in self.workers.values():
            worker.stop()
        tasks = self.tasks[:]  # Copy the list
        for task in tasks:  # Mark all current tasks as failed
            self.failTask(task)

    # Find workers by task
    def findWorkers(self, task):
        workers = []
        for worker in self.workers.values():
            if worker.task == task: workers.append(worker)
        return workers

    # End and remove a worker
    def removeWorker(self, worker):
        worker.running = False
        if worker.key in self.workers:
            del self.workers[worker.key]
            self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), MAX_WORKERS))

    # Create a new task and return its AsyncResult
    def addTask(self, inner_path, peer=None, priority=0):
        self.site.onFileStart(inner_path)  # First task, trigger site download started
        task = self.findTask(inner_path)
        if task:  # Already has a task for that file
            if peer and task["peers"]:  # This peer also has the new version, add it to the task's possible peers
                task["peers"].append(peer)
                self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
                self.startWorkers([peer])
            if priority:
                task["priority"] += priority  # Boost the priority
            return task["evt"]
        else:  # No task for that file yet
            evt = gevent.event.AsyncResult()
            if peer:
                peers = [peer]  # Only download from this peer
            else:
                peers = None
            task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority, "failed": []}
            self.tasks.append(task)
            self.started_task_num = len(self.tasks)
            self.log.debug("New task: %s, peer lock: %s, priority: %s, tasks: %s" % (task["inner_path"], peers, priority, self.started_task_num))
            self.startWorkers(peers)
            return evt
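
    # Typical call site (illustrative sketch; assumes the owning Site exposes this manager
    # as site.worker_manager):
    #   evt = site.worker_manager.addTask("css/all.css", priority=1)
    #   ok = evt.get()  # Blocks the calling greenlet until doneTask() or failTask() sets the result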

    # Find a task using inner_path
    def findTask(self, inner_path):
        for task in self.tasks:
            if task["inner_path"] == inner_path:
                return task
        return None  # Not found

    # Mark a task as failed
    def failTask(self, task):
        task["done"] = True
        self.tasks.remove(task)  # Remove from queue
        self.site.onFileFail(task["inner_path"])
        task["evt"].set(False)

    # Mark a task as done
    def doneTask(self, task):
        task["done"] = True
        self.tasks.remove(task)  # Remove from queue
        self.site.onFileDone(task["inner_path"])
        task["evt"].set(True)
        if not self.tasks: self.site.onComplete()  # No more tasks, trigger site complete
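

# Worker side of the contract (illustrative summary of how this manager is expected to be
# used; see Worker.py for the actual implementation):
#  - each Worker greenlet repeatedly calls manager.getTask(peer) to pick a task and sets
#    task["time_started"] before downloading,
#  - on success it calls manager.doneTask(task); on a failed attempt it records peer.key
#    in task["failed"] so getTask() skips that peer for the task,
#  - manager.removeWorker(worker) detaches the worker when its greenlet stops.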