diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index f8976410..a6b43475 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -218,6 +218,7 @@ class UiRequestPlugin(object): for module_name, module in objs: yield " - %.3fkb: %s %s
" % (self.getObjSize(module, hpy), module_name, cgi.escape(repr(module))) + gc.collect() # Implicit grabage collection yield "Done in %.1f" % (time.time()-s) @@ -243,6 +244,8 @@ class UiRequestPlugin(object): yield "- %s: %s
" % (attr, cgi.escape(str(getattr(obj, attr)))) yield "
" + gc.collect() # Implicit grabage collection + def actionListobj(self): import gc, sys @@ -286,9 +289,11 @@ class UiRequestPlugin(object): for obj, stat in sorted(ref_count.items(), key=lambda x: x[1][0], reverse=True)[0:30]: # Sorted by count yield " - %.1fkb = %s x %s
" % (stat[1], stat[0], cgi.escape(str(obj)) ) + gc.collect() # Implicit grabage collection + def actionBenchmark(self): - import sys + import sys, gc from contextlib import contextmanager output = self.sendHeader() @@ -493,5 +498,12 @@ class UiRequestPlugin(object): db.close() if os.path.isfile("data/benchmark.db"): os.unlink("data/benchmark.db") + gc.collect() # Implicit grabage collection - yield "
Done. Total: %.2fs" % (time.time()-t) \ No newline at end of file + yield "
Done. Total: %.2fs" % (time.time()-t) + + + def actionGcCollect(self): + import gc + self.sendHeader() + yield str(gc.collect()) diff --git a/src/Config.py b/src/Config.py index 7aadfc98..e7eeb85a 100644 --- a/src/Config.py +++ b/src/Config.py @@ -4,7 +4,7 @@ import ConfigParser class Config(object): def __init__(self): self.version = "0.2.9" - self.rev = 126 + self.rev = 134 self.parser = self.createArguments() argv = sys.argv[:] # Copy command line arguments argv = self.parseConfig(argv) # Add arguments from config file @@ -104,7 +104,6 @@ class Config(object): parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip') parser.add_argument('--fileserver_port',help='FileServer bind port', default=15441, type=int, metavar='port') - parser.add_argument('--disable_zeromq', help='Disable compatibility with old clients', action='store_true') parser.add_argument('--disable_udp', help='Disable UDP connections', action='store_true') parser.add_argument('--proxy', help='Socks proxy address', metavar='ip:port') parser.add_argument('--use_openssl', help='Use OpenSSL liblary for speedup', type='bool', choices=[True, False], default=use_openssl) diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 76bdd45c..bb28193e 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -3,17 +3,10 @@ from cStringIO import StringIO import gevent, msgpack from Config import config from Debug import Debug -zmq = None -if not config.disable_zeromq: - try: - import zmq.green as zmq - except: - zmq = None - - +from util import StreamingMsgpack class Connection(object): - __slots__ = ("sock", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id", "handshake", "connected", "event_connected", "closed", "zmq_sock", "zmq_queue", "zmq_working", "forward_thread", "start_time", "last_recv_time", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests") + __slots__ = ("sock", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id", "handshake", "connected", "event_connected", "closed", "start_time", "last_recv_time", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests") def __init__(self, server, ip, port, sock=None): self.sock = sock @@ -33,11 +26,6 @@ class Connection(object): self.event_connected = gevent.event.AsyncResult() # Solves on handshake received self.closed = False - self.zmq_sock = None # Zeromq sock if outgoing connection - self.zmq_queue = [] # Messages queued to send - self.zmq_working = False # Zmq currently working, just add to queue - self.forward_thread = None # Zmq forwarder thread - # Stats self.start_time = time.time() self.last_recv_time = 0 @@ -82,103 +70,46 @@ class Connection(object): # Detect protocol self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()}) gevent.spawn(self.messageLoop) - return self.event_connected.get() # Wait for first char + return self.event_connected.get() # Wait for handshake # Handle incoming connection def handleIncomingConnection(self, sock): self.type = "in" - try: - firstchar = sock.recv(1) # Find out if pure socket or zeromq - except Exception, err: - self.log("Socket firstchar error: %s" % Debug.formatException(err)) - self.close() - return False - if firstchar == "\xff": # Backward compatiblity: forward data to zmq - if config.debug_socket: self.log("Fallback incoming connection to ZeroMQ") - - self.protocol = "zeromq" - self.updateName() - self.connected = True - self.event_connected.set(self.protocol) - - if self.server.zmq_running: - zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - zmq_sock.connect(("127.0.0.1", self.server.zmq_port)) - zmq_sock.send(firstchar) - - self.forward_thread = gevent.spawn(self.server.forward, self, zmq_sock, sock) - self.server.forward(self, sock, zmq_sock) - self.close() # Forward ended close connection - else: - self.log("ZeroMQ Server not running, exiting!") - else: # Normal socket - self.messageLoop(firstchar) + self.messageLoop() # Message loop for connection - def messageLoop(self, firstchar=None): + def messageLoop(self): if not self.sock: self.log("Socket error: No socket found") return False + self.protocol = "v2" + self.updateName() + self.connected = True + + self.unpacker = msgpack.Unpacker() sock = self.sock try: - if not firstchar: firstchar = sock.recv(1) + while True: + buff = sock.recv(16*1024) + if not buff: break # Connection closed + self.last_recv_time = time.time() + self.incomplete_buff_recv += 1 + self.bytes_recv += len(buff) + self.server.bytes_recv += len(buff) + if not self.unpacker: + self.unpacker = msgpack.Unpacker() + self.unpacker.feed(buff) + for message in self.unpacker: + self.incomplete_buff_recv = 0 + self.handleMessage(message) + message = None + buf = None except Exception, err: - self.log("Socket firstchar error: %s" % Debug.formatException(err)) - self.close() - return False - if firstchar == "\xff": # Backward compatibility to zmq - self.sock.close() # Close normal socket - del firstchar - if zmq: - if config.debug_socket: self.log("Connecting as ZeroMQ") - self.protocol = "zeromq" - self.updateName() - self.connected = True - self.event_connected.set(self.protocol) # Mark handshake as done - - try: - context = zmq.Context() - zmq_sock = context.socket(zmq.REQ) - zmq_sock.hwm = 1 - zmq_sock.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive - zmq_sock.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send - zmq_sock.setsockopt(zmq.LINGER, 500) # Wait for zmq_sock close - zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port)) - self.zmq_sock = zmq_sock - except Exception, err: - if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) - else: - return False # No zeromq connection supported - else: # Normal socket - self.protocol = "v2" - self.updateName() - self.connected = True - self.event_connected.set(self.protocol) # Mark handshake as done - - self.unpacker = msgpack.Unpacker() - self.unpacker.feed(firstchar) # Feed the first char we already requested - try: - while True: - buff = sock.recv(16*1024) - if not buff: break # Connection closed - self.last_recv_time = time.time() - self.incomplete_buff_recv += 1 - self.bytes_recv += len(buff) - self.server.bytes_recv += len(buff) - if not self.unpacker: - self.unpacker = msgpack.Unpacker() - self.unpacker.feed(buff) - for message in self.unpacker: - self.incomplete_buff_recv = 0 - self.handleMessage(message) - message = None - buf = None - except Exception, err: - if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) - self.close() # MessageLoop ended, close connection + if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) + self.close() # MessageLoop ended, close connection # My handshake info @@ -209,6 +140,7 @@ class Connection(object): self.port = 0 else: self.port = message["fileserver_port"] # Set peer fileserver port + self.event_connected.set(True) # Mark handshake as done else: self.log("Unknown response: %s" % message) elif message.get("cmd"): # Handhsake request @@ -218,6 +150,7 @@ class Connection(object): self.port = 0 else: self.port = self.handshake["fileserver_port"] # Set peer fileserver port + self.event_connected.set(True) # Mark handshake as done if config.debug_socket: self.log("Handshake request: %s" % message) data = self.handshakeInfo() data["cmd"] = "response" @@ -234,26 +167,17 @@ class Connection(object): # Send data to connection - def send(self, message): - if config.debug_socket: self.log("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) + def send(self, message, streaming=False): + if config.debug_socket: self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), streaming, message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) self.last_send_time = time.time() - if self.protocol == "zeromq": - if self.zmq_sock: # Outgoing connection - self.zmq_queue.append(message) - if self.zmq_working: - self.log("ZeroMQ already working...") - return - while self.zmq_queue: - self.zmq_working = True - message = self.zmq_queue.pop(0) - self.zmq_sock.send(msgpack.packb(message)) - self.handleMessage(msgpack.unpackb(self.zmq_sock.recv())) - self.zmq_working = False - - else: # Incoming request - self.server.zmq_sock.send(msgpack.packb(message)) - else: # Normal connection + if streaming: + bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall) + message = None + self.bytes_sent += bytes_sent + self.server.bytes_sent += bytes_sent + else: data = msgpack.packb(message) + message = None self.bytes_sent += len(data) self.server.bytes_sent += len(data) self.sock.sendall(data) @@ -292,8 +216,6 @@ class Connection(object): return True else: return False - - # Close connection @@ -309,10 +231,6 @@ class Connection(object): self.waiting_requests = {} self.server.removeConnection(self) # Remove connection from server registry try: - if self.forward_thread: - self.forward_thread.kill(exception=Debug.Notify("Closing connection")) - if self.zmq_sock: - self.zmq_sock.close() if self.sock: self.sock.shutdown(gevent.socket.SHUT_WR) self.sock.close() @@ -320,7 +238,5 @@ class Connection(object): if config.debug_socket: self.log("Close error: %s" % err) # Little cleanup - del self.unpacker - del self.sock self.sock = None self.unpacker = None diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index fdec9348..8801c61d 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -26,17 +26,12 @@ class ConnectionServer: self.bytes_recv = 0 self.bytes_sent = 0 - self.zmq_running = False - self.zmq_last_connection = None # Last incoming message client - self.peer_id = "-ZN0"+config.version.replace(".", "")+"-"+''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(12)) # Bittorrent style peerid if port: # Listen server on a port - self.zmq_port = port-1 self.pool = Pool(1000) # do not accept more than 1000 connections self.stream_server = StreamServer((ip.replace("*", ""), port), self.handleIncomingConnection, spawn=self.pool, backlog=100) if request_handler: self.handleRequest = request_handler - gevent.spawn(self.zmqServer) # Start ZeroMQ server for backward compatibility @@ -63,45 +58,48 @@ class ConnectionServer: - def getConnection(self, ip=None, port=None, peer_id=None): + def getConnection(self, ip=None, port=None, peer_id=None, create=True): if peer_id and peer_id in self.peer_ids: # Find connection by peer id connection = self.peer_ids.get(peer_id) - if not connection.connected: + if not connection.connected and create: succ = connection.event_connected.get() # Wait for connection if not succ: raise Exception("Connection event return error") return connection # Find connection by ip if ip in self.ips: connection = self.ips[ip] - if not connection.connected: + if not connection.connected and create: succ = connection.event_connected.get() # Wait for connection if not succ: raise Exception("Connection event return error") return connection # Recover from connection pool for connection in self.connections: if connection.ip == ip: - if not connection.connected: + if not connection.connected and create: succ = connection.event_connected.get() # Wait for connection if not succ: raise Exception("Connection event return error") return connection # No connection found - if port == 0: - raise Exception("This peer is not connectable") - try: - connection = Connection(self, ip, port) - self.ips[ip] = connection - self.connections.append(connection) - succ = connection.connect() - if not succ: - connection.close() - raise Exception("Connection event return error") + if create: # Allow to create new connection if not found + if port == 0: + raise Exception("This peer is not connectable") + try: + connection = Connection(self, ip, port) + self.ips[ip] = connection + self.connections.append(connection) + succ = connection.connect() + if not succ: + connection.close() + raise Exception("Connection event return error") - except Exception, err: - self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err))) - connection.close() - raise err - return connection + except Exception, err: + self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err))) + connection.close() + raise err + return connection + else: + return None @@ -132,12 +130,8 @@ class ConnectionServer: connection.close() elif idle > 20*60 and connection.last_send_time < time.time()-10: # Idle more than 20 min and we not send request in last 10 sec - if connection.protocol == "zeromq": - if idle > 50*60 and not connection.ping(): # Only ping every 50 sec - connection.close() - else: - if not connection.ping(): # send ping request - connection.close() + if not connection.ping(): # send ping request + connection.close() elif idle > 10 and connection.incomplete_buff_recv > 0: # Incompelte data with more than 10 sec idle connection.log("[Cleanup] Connection buff stalled") @@ -153,52 +147,6 @@ class ConnectionServer: - def zmqServer(self): - if config.disable_zeromq: - self.log.debug("ZeroMQ disabled by config") - return False - self.log.debug("Starting ZeroMQ on: tcp://127.0.0.1:%s..." % self.zmq_port) - try: - import zmq.green as zmq - context = zmq.Context() - self.zmq_sock = context.socket(zmq.REP) - self.zmq_sock.bind("tcp://127.0.0.1:%s" % self.zmq_port) - self.zmq_sock.hwm = 1 - self.zmq_sock.setsockopt(zmq.RCVTIMEO, 5000) # Wait for data receive - self.zmq_sock.setsockopt(zmq.SNDTIMEO, 50000) # Wait for data send - self.zmq_running = True - except Exception, err: - self.log.debug("ZeroMQ start error: %s" % Debug.formatException(err)) - return False - - while True: - try: - data = self.zmq_sock.recv() - if not data: break - message = msgpack.unpackb(data) - self.zmq_last_connection.handleMessage(message) - except Exception, err: - self.log.debug("ZMQ Server error: %s" % Debug.formatException(err)) - self.zmq_sock.send(msgpack.packb({"error": "%s" % err}, use_bin_type=True)) - - - # Forward incoming data to other socket - def forward(self, connection, source, dest): - data = True - try: - while data: - data = source.recv(16*1024) - self.zmq_last_connection = connection - if data: - dest.sendall(data) - else: - source.shutdown(socket.SHUT_RD) - dest.shutdown(socket.SHUT_WR) - except Exception, err: - self.log.debug("%s ZMQ forward error: %s" % (connection.ip, Debug.formatException(err))) - connection.close() - - # -- TESTING -- def testCreateServer(): diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index 2fdfe1a1..99d112fa 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -286,7 +286,7 @@ class ContentManager: if include_info.get("files_allowed"): for file_inner_path in content["files"].keys(): if not re.match("^%s$" % include_info["files_allowed"], file_inner_path): - self.log.error("%s: File not allowed: " % file_inner_path) + self.log.error("%s: File not allowed" % file_inner_path) return False return True # All good diff --git a/src/Debug/DebugReloader.py b/src/Debug/DebugReloader.py index 78c1fd5d..c4d4ae72 100644 --- a/src/Debug/DebugReloader.py +++ b/src/Debug/DebugReloader.py @@ -8,7 +8,7 @@ if config.debug: # Only load pyfilesytem if using debug mode pyfilesystem = OSFS("src") pyfilesystem_plugins = OSFS("plugins") except Exception, err: - logging.debug("%s: For autoreload please download pyfilesystem (https://code.google.com/p/pyfilesystem/)" % err) + logging.debug("%s: For autoreload please download pyfilesystem (https://code.google.com/p/pyfilesystem/) (only useful if you modifying ZeroNet source code)" % err) pyfilesystem = False else: pyfilesystem = False @@ -35,7 +35,7 @@ class DebugReloader: def changed(self, evt): - if not evt.path or evt.path.endswith("pyc") or time.time()-self.last_chaged < 1: return False # Ignore *.pyc changes and no reload within 1 sec + if not evt.path or "data/" in evt.path or evt.path.endswith("pyc") or time.time()-self.last_chaged < 1: return False # Ignore *.pyc changes and no reload within 1 sec #logging.debug("Changed: %s" % evt) time.sleep(0.1) # Wait for lock release self.callback() diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 41234b5f..57942bcd 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -2,7 +2,7 @@ import os, msgpack, shutil, gevent, socket, struct, random from cStringIO import StringIO from Debug import Debug from Config import config -from util import RateLimit +from util import RateLimit, StreamingMsgpack FILE_BUFF = 1024*512 @@ -24,12 +24,12 @@ class FileRequest(object): return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]) - def send(self, msg): + def send(self, msg, streaming=False): if not self.connection.closed: - self.connection.send(msg) + self.connection.send(msg, streaming) - def response(self, msg): + def response(self, msg, streaming=False): if self.responded: self.log.debug("Req id %s already responded" % self.req_id) return @@ -38,7 +38,7 @@ class FileRequest(object): msg["cmd"] = "response" msg["to"] = self.req_id self.responded = True - self.send(msg) + self.send(msg, streaming=streaming) # Route file requests @@ -115,14 +115,15 @@ class FileRequest(object): try: file_path = site.storage.getPath(params["inner_path"]) if config.debug_socket: self.log.debug("Opening file: %s" % file_path) - file = open(file_path, "rb") - file.seek(params["location"]) - back = {} - back["body"] = file.read(FILE_BUFF) - back["location"] = file.tell() - back["size"] = os.fstat(file.fileno()).st_size - if config.debug_socket: self.log.debug("Sending file %s from position %s to %s" % (file_path, params["location"], back["location"])) - self.response(back) + with StreamingMsgpack.FilePart(file_path, "rb") as file: + file.seek(params["location"]) + file.read_bytes = FILE_BUFF + back = {} + back["body"] = file + back["size"] = os.fstat(file.fileno()).st_size + back["location"] = min(file.tell()+FILE_BUFF, back["size"]) + if config.debug_socket: self.log.debug("Sending file %s from position %s to %s" % (file_path, params["location"], back["location"])) + self.response(back, streaming=True) if config.debug_socket: self.log.debug("File %s sent" % file_path) # Add peer to site if not added before diff --git a/src/File/FileServer.py b/src/File/FileServer.py index b16e1421..f3b8338f 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -124,6 +124,7 @@ class FileServer(ConnectionServer): # Announce sites every 20 min def announceSites(self): + import gc while 1: time.sleep(20*60) # Announce sites every 20 min for address, site in self.sites.items(): @@ -145,6 +146,7 @@ class FileServer(ConnectionServer): time.sleep(2) # Prevent too quick request site = None + gc.collect() # Implicit grabage collection # Detects if computer back from wakeup diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index 17a82dc5..2a84423d 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -52,6 +52,16 @@ class Peer(object): self.onConnectionError() self.log("Getting connection error: %s (connection_error: %s, hash_failed: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed)) self.connection = None + + + # Check if we have connection to peer + def findConnection(self): + if self.connection and self.connection.connected: # We have connection to peer + return self.connection + else: # Try to find from other sites connections + self.connection = self.connection_server.getConnection(self.ip, self.port, create=False) # Do not create new connection if not found + return self.connection + def __str__(self): return "Peer %-12s" % self.ip @@ -174,7 +184,7 @@ class Peer(object): # List modified files since the date # Return: {inner_path: modification date,...} def listModified(self, since): - response = self.request("listModified", {"since": since}) + response = self.request("listModified", {"since": since, "site": self.site.address}) return response diff --git a/src/Site/Site.py b/src/Site/Site.py index 82ebaa57..b0f5aad9 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -159,11 +159,78 @@ class Site: return found + # Update worker, try to find client that supports listModifications command + def updater(self, peers_try, queried): + since = self.settings.get("modified", 60*60*24)-60*60*24 # Get modified since last update - 1day + while 1: + if not peers_try or len(queried) >= 3: # Stop after 3 successful query + break + peer = peers_try.pop(0) + if not peer.connection and len(queried) < 2: peer.connect() # Only open new connection if less than 2 queried already + if not peer.connection or peer.connection.handshake.get("rev",0) < 126: continue # Not compatible + res = peer.listModified(since) + if not res or not "modified_files" in res: continue # Failed query + + queried.append(peer) + for inner_path, modified in res["modified_files"].iteritems(): # Check if the peer has newer files than we + content = self.content_manager.contents.get(inner_path) + if not content or modified > content["modified"]: # We dont have this file or we have older + self.bad_files[inner_path] = self.bad_files.get(inner_path, 0)+1 # Mark as bad file + gevent.spawn(self.downloadContent, inner_path) # Download the content.json + the changed files + + + # Update content.json from peers and download changed files + # Return: None @util.Noparallel() - def update(self): + def update(self, announce=False): self.content_manager.loadContent("content.json") # Reload content.json - self.content_updated = None + self.content_updated = None # Reset content updated time + self.updateWebsocket(updating=True) + if announce: self.announce() + + peers_try = [] # Try these peers + queried = [] # Successfully queried from these peers + + peers = self.peers.values() + random.shuffle(peers) + for peer in peers: # Try to find connected good peers, but we must have at least 5 peers + if peer.findConnection() and peer.connection.handshake.get("rev",0) > 125: # Add to the beginning if rev125 + peers_try.insert(0, peer) + elif len(peers_try) < 5: # Backup peers, add to end of the try list + peers_try.append(peer) + + self.log.debug("Try to get listModifications from peers: %s" % peers_try) + + updaters = [] + for i in range(3): + updaters.append(gevent.spawn(self.updater, peers_try, queried)) + + gevent.joinall(updaters, timeout=5) # Wait 5 sec to workers + time.sleep(0.1) + self.log.debug("Queried listModifications from: %s" % queried) + + if not queried: # Not found any client that supports listModifications + self.log.debug("Fallback to old-style update") + self.redownloadContents() + + if not self.settings["own"]: self.storage.checkFiles(quick_check=True) # Quick check files based on file size + + changed = self.content_manager.loadContent("content.json") + if changed: + for changed_file in changed: + self.bad_files[changed_file] = self.bad_files.get(changed_file, 0)+1 + + if self.bad_files: + self.download() + + self.settings["size"] = self.content_manager.getTotalSize() # Update site size + self.updateWebsocket(updated=True) + + + # Update site by redownload all content.json + def redownloadContents(self): + # Download all content.json again content_threads = [] for inner_path in self.content_manager.contents.keys(): @@ -172,18 +239,6 @@ class Site: self.log.debug("Waiting %s content.json to finish..." % len(content_threads)) gevent.joinall(content_threads) - changed = self.content_manager.loadContent("content.json") - if changed: - for changed_file in changed: - self.bad_files[changed_file] = self.bad_files.get(changed_file, 0)+1 - if not self.settings["own"]: self.storage.checkFiles(quick_check=True) # Quick check files based on file size - - if self.bad_files: - self.download() - - self.settings["size"] = self.content_manager.getTotalSize() # Update site size - return changed - # Publish worker def publisher(self, inner_path, peers, published, limit, event_done=None): @@ -364,13 +419,18 @@ class Site: 'uploaded': 0, 'downloaded': 0, 'left': 0, 'compact': 1, 'numwant': 30, 'event': 'started' } + req = None try: url = "http://"+ip+"?"+urllib.urlencode(params) # Load url - opener = urllib2.build_opener() - response = opener.open(url, timeout=10).read() + req = urllib2.urlopen(url, timeout=10) + response = req.read() + req.fp._sock.recv=None # Hacky avoidance of memory leak for older python versions + req.close() + req = None # Decode peers peer_data = bencode.decode(response)["peers"] + response = None peer_count = len(peer_data) / 6 peers = [] for peer_offset in xrange(peer_count): @@ -381,6 +441,9 @@ class Site: except Exception, err: self.log.debug("Http tracker %s error: %s" % (url, err)) errors.append("%s://%s" % (protocol, ip)) + if req: + req.close() + req = None continue # Adding peers @@ -411,8 +474,8 @@ class Site: # Keep connections to get the updates (required for passive clients) - def needConnections(self): - need = min(len(self.peers), 3) # Need 3 peer, but max total peers + def needConnections(self, num=3): + need = min(len(self.peers), num) # Need 3 peer, but max total peers connected = 0 for peer in self.peers.values(): # Check current connected number diff --git a/src/Test/test.py b/src/Test/test.py index 7a93da22..8067ebd3 100644 --- a/src/Test/test.py +++ b/src/Test/test.py @@ -11,12 +11,14 @@ class TestCase(unittest.TestCase): urllib.urlopen("http://127.0.0.1:43110").read() except Exception, err: raise unittest.SkipTest(err) - self.assertIn("Not Found", urllib.urlopen("http://127.0.0.1:43110/media//sites.json").read()) self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/./sites.json").read()) self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/../config.py").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1P2rJhkQjYSHdHpWDDwxfRGYXaoWE8u1vV/../sites.json").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1P2rJhkQjYSHdHpWDDwxfRGYXaoWE8u1vV/..//sites.json").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1P2rJhkQjYSHdHpWDDwxfRGYXaoWE8u1vV/../../config.py").read()) + self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read()) + self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read()) + self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read()) + self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read()) + self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read()) + self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read()) def testBitcoinSignOld(self): @@ -65,7 +67,7 @@ class TestCase(unittest.TestCase): def testBitcoinSignCompressed(self): - raise unittest.SkipTest("Not working") + raise unittest.SkipTest("Not supported yet") s = time.time() privatekey = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKbZAgWdpFo1UETZSKdgHaNN2J" privatekey_bad = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKsZAgWdpFo1UETZSKdgHaNN2J" @@ -153,11 +155,61 @@ class TestCase(unittest.TestCase): print "ok" + def testContentManagerIncludes(self): + from Site import Site + from cStringIO import StringIO + import json + site = Site("1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ") + # Include info + include_info = site.content_manager.getIncludeInfo("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json") + self.assertEqual(include_info["signers"], ['1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB']) + self.assertEqual(include_info["user_name"], 'testuser4') + self.assertEqual(include_info["max_size"], 10000) + self.assertEqual(include_info["includes_allowed"], False) + self.assertEqual(include_info["files_allowed"], 'data.json') + # Valid signers + self.assertEqual( + site.content_manager.getValidSigners("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json"), + ['1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB', '1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ'] + ) + self.assertEqual(site.content_manager.getValidSigners("data/content.json"), ['1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ']) + self.assertEqual(site.content_manager.getValidSigners("content.json"), ['1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ']) + + # Data validation + data_dict = { + "files": { + "data.json": { + "sha512": "be589f313e7b2d8b9b41280e603e8ba72c3f74d3cfdb771a7c418a0a64598035", + "size": 216 + } + }, + "modified": 1428591454.423, + "signs": { + "1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB": "HM1sv686/aIdgqyFF2t0NmZY5pv1TALo6H0zOmOJ63VOnNg2LSCpbuubb+IcHTUIJq3StUDo6okczJDeowyjOUo=" + } + } + # Normal data + data = StringIO(json.dumps(data_dict)) + self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), True) + # Too large + data_dict["files"]["data.json"]["size"] = 200000 + data = StringIO(json.dumps(data_dict)) + self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), False) + data_dict["files"]["data.json"]["size"] = 216 # Reset + # Not allowed file + data_dict["files"]["data.html"] = data_dict["files"]["data.json"] + data = StringIO(json.dumps(data_dict)) + self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), False) + del data_dict["files"]["data.html"] # Reset + # Should work again + data = StringIO(json.dumps(data_dict)) + self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), True) - if __name__ == "__main__": - unittest.main(verbosity=2) + import logging + logging.getLogger().setLevel(level=logging.CRITICAL) + unittest.main(verbosity=2, defaultTest="TestCase.testContentManagerIncludes") diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index 83dacaf1..83a4b0ac 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -386,7 +386,7 @@ class UiWebsocket(object): if site: site.settings["serving"] = True site.saveSettings() - gevent.spawn(site.update) + gevent.spawn(site.update, announce=True) time.sleep(0.001) # Wait for update thread starting site.updateWebsocket() else: diff --git a/src/main.py b/src/main.py index a2230b10..5822c0d1 100644 --- a/src/main.py +++ b/src/main.py @@ -51,7 +51,6 @@ if config.proxy: from util import SocksProxy import urllib2 logging.info("Patching sockets to socks proxy: %s" % config.proxy) - config.disable_zeromq = True # ZeroMQ doesnt support proxy config.fileserver_ip = '127.0.0.1' # Do not accept connections anywhere but localhost SocksProxy.monkeyPath(*config.proxy.split(":")) @@ -200,8 +199,8 @@ class Actions: site.announce() # Gather peers site.publish(20, inner_path) # Push to 20 peers time.sleep(3) - logging.info("Serving files...") - gevent.joinall([file_server_thread]) + logging.info("Serving files (max 60s)...") + gevent.joinall([file_server_thread], timeout=60) logging.info("Done.") diff --git a/src/util/StreamingMsgpack.py b/src/util/StreamingMsgpack.py new file mode 100644 index 00000000..b869d814 --- /dev/null +++ b/src/util/StreamingMsgpack.py @@ -0,0 +1,36 @@ +import msgpack, os, struct + + +def msgpackHeader(size): + if size <= 2**8-1: + return b"\xc4" + struct.pack("B", size) + elif size <= 2**16-1: + return b"\xc5" + struct.pack(">H", size) + elif size <= 2**32-1: + return b"\xc6" + struct.pack(">I", size) + else: + raise Exception("huge binary string") + + +def stream(data, writer): + packer = msgpack.Packer() + writer(packer.pack_map_header(len(data))) + for key, val in data.iteritems(): + writer(packer.pack(key)) + if issubclass(type(val), file): # File obj + max_size = os.fstat(val.fileno()).st_size-val.tell() + size = min(max_size, val.read_bytes) + bytes_left = size + writer(msgpackHeader(size)) + buff = 1024*64 + while 1: + writer(val.read(min(bytes_left, buff))) + bytes_left = bytes_left-buff + if bytes_left <= 0: break + else: # Simple + writer(packer.pack(val)) + return size + + +class FilePart(file): + pass \ No newline at end of file