2015-01-12 02:03:45 +01:00
|
|
|
import gevent, time, logging, shutil, os
|
|
|
|
from Peer import Peer
|
2015-01-17 18:50:56 +01:00
|
|
|
from Debug import Debug
|
2015-01-12 02:03:45 +01:00
|
|
|
|
|
|
|
class Worker:
    """Downloads files for a manager's tasks from a single peer.

    The worker repeatedly asks ``manager.getTask(peer)`` for a task, fetches
    the task's file with ``peer.getFile()``, verifies it through the site's
    content manager and, if accepted, writes it to the path returned by
    ``site.getPath()``.

    A task is a dict; the keys read or written here are ``"site"``,
    ``"inner_path"``, ``"done"``, ``"workers_num"`` and ``"time_started"``.
    """

    def __init__(self, manager, peer):
        """Bind the worker to its manager and the peer it downloads from."""
        self.manager = manager
        self.peer = peer
        self.task = None  # Task currently being downloaded, None when idle
        # Only used as a log prefix here; never assigned in this class
        # (presumably set by the manager -- verify against caller).
        self.key = None
        self.running = False  # Loop flag, cleared by stop() or on exit
        self.thread = None  # Greenlet created by start()

    # Write a verified download to disk
    def _saveFile(self, task, buff):
        """Copy the file-like ``buff`` to the task's location on disk.

        The target directory is created when it does not exist yet.
        """
        buff.seek(0)
        file_path = task["site"].getPath(task["inner_path"])
        file_dir = os.path.dirname(file_path)
        # file_dir is empty when file_path has no directory component
        if file_dir and not os.path.isdir(file_dir):
            os.makedirs(file_dir)  # Make directory for files
        # "with" guarantees the handle is closed even if the copy fails
        with open(file_path, "wb") as out_file:
            shutil.copyfileobj(buff, out_file)  # Write buff to disk

    # Downloader thread
    def downloader(self):
        """Process tasks until none are left, the peer fails, or stop().

        ``task["workers_num"]`` is incremented while this worker downloads a
        task and is decremented again on every exit from that section, so the
        counter does not stay elevated after an early ``break``/``return``.

        Returns None. On a normal exit (no more tasks, or 3 failed downloads
        from this peer) it calls ``peer.onWorkerDone()`` and removes itself
        from the manager. When it notices it was stopped during a download it
        returns immediately without doing either.
        """
        self.peer.hash_failed = 0  # Reset hash error counter
        while self.running:
            # Try to pickup free file download task
            task = self.manager.getTask(self.peer)
            if not task:  # Die, no more task
                self.manager.log.debug("%s: No task found, stopping" % self.key)
                break
            if not task["time_started"]:
                task["time_started"] = time.time()  # Task started now

            if task["workers_num"] > 0:  # Wait a bit if someone already working on it
                self.manager.log.debug("%s: Someone already working on %s, sleeping 1 sec..." % (self.key, task["inner_path"]))
                time.sleep(1)
                self.manager.log.debug("%s: %s, task done after sleep: %s" % (self.key, task["inner_path"], task["done"]))

            if task["done"] == False:
                self.task = task
                task["workers_num"] += 1
                buff = self.peer.getFile(task["site"].address, task["inner_path"])
                if not self.running:  # Worker no longer needed or got killed
                    self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
                    task["workers_num"] -= 1  # We are not working on it anymore
                    return None

                if buff:  # Download ok
                    correct = task["site"].content_manager.verifyFile(task["inner_path"], buff)
                else:  # Download error
                    correct = False

                if correct == True or correct is None:  # Hash ok or same file
                    self.manager.log.debug("%s: Hash correct: %s" % (self.key, task["inner_path"]))
                    # Another worker may have completed the task while we
                    # were downloading; only the first one saves and reports.
                    finish_task = task["done"] == False
                    if finish_task:  # Task not done yet
                        self._saveFile(task, buff)
                    task["workers_num"] -= 1
                    if finish_task:
                        self.manager.doneTask(task)
                    self.task = None
                else:  # Hash failed
                    self.manager.log.debug("%s: Hash failed: %s" % (self.key, task["inner_path"]))
                    self.task = None
                    self.peer.hash_failed += 1
                    # Release the task before a possible break below
                    task["workers_num"] -= 1
                    if self.peer.hash_failed >= 3:  # Broken peer
                        break
                    time.sleep(1)
        self.peer.onWorkerDone()
        self.running = False
        self.manager.removeWorker(self)

    # Start the worker
    def start(self):
        """Mark the worker as running and spawn downloader() as a greenlet."""
        self.running = True
        self.thread = gevent.spawn(self.downloader)

    # Force stop the worker
    def stop(self):
        """Clear the running flag, kill the greenlet and leave the manager."""
        self.manager.log.debug("%s: Force stopping, thread: %s" % (self.key, self.thread))
        self.running = False
        if self.thread:
            # Raises Debug.Notify inside the downloader greenlet
            self.thread.kill(exception=Debug.Notify("Worker stopped"))
        self.manager.removeWorker(self)
|