diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 33dd1566..8944e1d7 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -15,15 +15,15 @@ class Connection: self.port = port self.peer_id = None # Bittorrent style peer id (not used yet) self.id = server.last_connection_id - self.protocol = "?" server.last_connection_id += 1 + self.protocol = "?" self.server = server self.log = logging.getLogger(str(self)) self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here self.req_id = 0 # Last request id self.handshake = None # Handshake info got from peer - self.event_handshake = gevent.event.AsyncResult() # Solves on handshake received + self.event_connected = gevent.event.AsyncResult() # Solves on handshake received self.closed = False self.zmq_sock = None # Zeromq sock if outgoing connection @@ -31,8 +31,19 @@ class Connection: self.zmq_working = False # Zmq currently working, just add to queue self.forward_thread = None # Zmq forwarder thread + # Stats + self.start_time = time.time() + self.last_recv_time = 0 + self.last_message_time = 0 + self.last_send_time = 0 + self.last_sent_time = 0 + self.incomplete_buff_recv = 0 + self.bytes_recv = 0 + self.bytes_sent = 0 + self.last_ping_delay = None + self.last_req_time = 0 + self.waiting_requests = {} # Waiting sent requests - if not sock: self.connect() # Not an incoming connection, connect to peer def __str__(self): @@ -44,12 +55,13 @@ class Connection: # Open connection to peer and wait for handshake def connect(self): + self.log.debug("Connecting...") self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.ip, self.port)) # Detect protocol self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()}) gevent.spawn(self.messageLoop) - return self.event_handshake.get() # Wait for handshake + return self.event_connected.get() # Wait for first char @@ -61,7 +73,7 @@ class Connection: self.protocol = "zeromq" self.log.name = str(self) - self.event_handshake.set(self.protocol) + self.event_connected.set(self.protocol) if self.server.zmq_running: zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -80,14 +92,19 @@ class Connection: # Message loop for connection def messageLoop(self, firstchar=None): sock = self.sock - if not firstchar: firstchar = sock.recv(1) + try: + if not firstchar: firstchar = sock.recv(1) + except Exception, err: + self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) + self.close() + return False if firstchar == "\xff": # Backward compatibility to zmq self.sock.close() # Close normal socket if zmq: if config.debug_socket: self.log.debug("Connecting as ZeroMQ") self.protocol = "zeromq" self.log.name = str(self) - self.event_handshake.set(self.protocol) # Mark handshake as done + self.event_connected.set(self.protocol) # Mark handshake as done try: context = zmq.Context() @@ -105,7 +122,7 @@ class Connection: else: # Normal socket self.protocol = "v2" self.log.name = str(self) - self.event_handshake.set(self.protocol) # Mark handshake as done + self.event_connected.set(self.protocol) # Mark handshake as done unpacker = self.unpacker unpacker.feed(firstchar) # Feed the first char we already requested @@ -113,31 +130,18 @@ class Connection: while True: buff = sock.recv(16*1024) if not buff: break # Connection closed + self.last_recv_time = time.time() + self.incomplete_buff_recv += 1 + self.bytes_recv += len(buff) unpacker.feed(buff) for message in unpacker: + self.incomplete_buff_recv = 0 self.handleMessage(message) except Exception, err: self.log.debug("Socket error: %s" % Debug.formatException(err)) self.close() # MessageLoop ended, close connection - # Read one line (not used) - def recvLine(self): - sock = self.sock - data = sock.recv(16*1024) - if not data: return - if not data.endswith("\n"): # Multipart, read until \n - buff = StringIO() - buff.write(data) - while not data.endswith("\n"): - data = sock.recv(16*1024) - if not data: break - buff.write(data) - return buff.getvalue().strip("\n") - - return data.strip("\n") - - # My handshake info def handshakeInfo(self): return { @@ -150,12 +154,15 @@ class Connection: # Handle incoming message def handleMessage(self, message): + self.last_message_time = time.time() if message.get("cmd") == "response": # New style response if message["to"] in self.waiting_requests: self.waiting_requests[message["to"]].set(message) # Set the response to event del self.waiting_requests[message["to"]] elif message["to"] == 0: # Other peers handshake - if config.debug_socket: self.log.debug("Got handshake response: %s" % message) + ping = time.time()-self.start_time + if config.debug_socket: self.log.debug("Got handshake response: %s, ping: %s" % (message, ping)) + self.last_ping_delay = ping self.handshake = message self.port = message["fileserver_port"] # Set peer fileserver port else: @@ -180,29 +187,35 @@ class Connection: # Send data to connection - def send(self, data): - if config.debug_socket: self.log.debug("Send: %s" % data.get("cmd")) + def send(self, message): + if config.debug_socket: self.log.debug("Send: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id"))) + self.last_send_time = time.time() if self.protocol == "zeromq": if self.zmq_sock: # Outgoing connection - self.zmq_queue.append(data) + self.zmq_queue.append(message) if self.zmq_working: self.log.debug("ZeroMQ already working...") return while self.zmq_queue: self.zmq_working = True - data = self.zmq_queue.pop(0) - self.zmq_sock.send(msgpack.packb(data)) + message = self.zmq_queue.pop(0) + self.zmq_sock.send(msgpack.packb(message)) self.handleMessage(msgpack.unpackb(self.zmq_sock.recv())) self.zmq_working = False else: # Incoming request - self.server.zmq_sock.send(msgpack.packb(data)) + self.server.zmq_sock.send(msgpack.packb(message)) else: # Normal connection - self.sock.sendall(msgpack.packb(data)) + data = msgpack.packb(message) + self.bytes_sent += len(data) + self.sock.sendall(data) + self.last_sent_time = time.time() + if config.debug_socket: self.log.debug("Sent: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id"))) # Create and send a request to peer def request(self, cmd, params={}): + self.last_req_time = time.time() self.req_id += 1 data = {"cmd": cmd, "req_id": self.req_id, "params": params} event = gevent.event.AsyncResult() # Create new event for response diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index e8fcb22f..345ad927 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -13,7 +13,7 @@ class ConnectionServer: self.ip = ip self.port = port self.last_connection_id = 1 # Connection id incrementer - self.log = logging.getLogger(__name__) + self.log = logging.getLogger("ConnServer") self.connections = [] # Connections self.ips = {} # Connection by ip @@ -57,18 +57,25 @@ class ConnectionServer: - def connect(self, ip=None, port=None, peer_id=None): + def getConnection(self, ip=None, port=None, peer_id=None): if peer_id and peer_id in self.peer_ids: # Find connection by peer id - return self.peer_ids.get(peer_id) + connection = self.peer_ids.get(peer_id) + connection.event_connected.get() # Wait for connection + return connection if ip in self.ips: # Find connection by ip - return self.ips[ip] + connection = self.ips[ip] + connection.event_connected.get() # Wait for connection + return connection + # No connection found yet try: connection = Connection(self, ip, port) self.ips[ip] = connection self.connections.append(connection) + connection.connect() except Exception, err: self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err))) + connection.close() raise err return connection @@ -77,10 +84,10 @@ class ConnectionServer: def removeConnection(self, connection): if self.ips.get(connection.ip) == connection: # Delete if same as in registry del self.ips[connection.ip] - if connection in self.connections: - self.connections.remove(connection) if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry del self.peer_ids[connection.peer_id] + if connection in self.connections: + self.connections.remove(connection) def zmqServer(self): @@ -204,7 +211,7 @@ def testZmqSlowClient(num): def testConnection(): global server - connection = server.connect("127.0.0.1", 1234) + connection = server.getConnection("127.0.0.1", 1234) connection.send({"res": "Sending: Hello!"}) print connection diff --git a/src/File/FileServer.py b/src/File/FileServer.py index d5f2d763..98f0377e 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -22,9 +22,9 @@ class FileServer(ConnectionServer): # Handle request to fileserver def handleRequest(self, connection, message): if "params" in message: - self.log.debug("FileRequest: %s %s %s" % (message["cmd"], message["params"].get("site"), message["params"].get("inner_path"))) + self.log.debug("FileRequest: %s %s %s %s" % (str(connection), message["cmd"], message["params"].get("site"), message["params"].get("inner_path"))) else: - self.log.debug("FileRequest: %s" % req["cmd"]) + self.log.debug("FileRequest: %s %s" % (str(connection), req["cmd"])) req = FileRequest(self, connection) req.route(message["cmd"], message.get("req_id"), message.get("params")) diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index 0c4cc8a5..deed0dc1 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -28,15 +28,18 @@ class Peer: # Connect to host def connect(self): - if self.connection: self.connection.close() + if not self.log: self.log = logging.getLogger("Peer:%s:%s %s" % (self.ip, self.port, self.site.address_short)) + if self.connection: + self.log.debug("Getting connection (Closing %s)..." % self.connection) + self.connection.close() + else: + self.log.debug("Getting connection...") self.connection = None - if not self.log: self.log = logging.getLogger("Peer:%s:%s" % (self.ip, self.port)) - self.log.debug("Connecting...") try: - self.connection = self.connection_server.connect(self.ip, self.port) + self.connection = self.connection_server.getConnection(self.ip, self.port) except Exception, err: - self.log.debug("Connecting error: %s" % Debug.formatException(err)) + self.log.debug("Getting connection error: %s" % Debug.formatException(err)) self.onConnectionError() def __str__(self): @@ -118,6 +121,7 @@ class Peer: break # All fine, exit from for loop # Timeout reached or bad response self.onConnectionError() + self.connect() time.sleep(1) if response_time: diff --git a/src/Site/Site.py b/src/Site/Site.py index df71ff6e..201955c1 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -416,6 +416,9 @@ class Site: self.needFile("content.json", update=True) # Force update to fix corrupt file self.content_manager.loadContent() # Reload content.json for content_inner_path, content in self.content_manager.contents.items(): + if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file + self.log.error("[MISSING] %s" % content_inner_path) + bad_files.append(content_inner_path) for file_relative_path in content["files"].keys(): file_inner_path = self.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json file_inner_path = file_inner_path.strip("/") # Strip leading / diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index 91f4cfdd..70631c17 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -276,22 +276,54 @@ class UiRequest: raise Exception("Here is your console") + def formatTableRow(self, row): + back = [] + for format, val in row: + if val == None: + formatted = "n/a" + elif format == "since": + if val: + formatted = "%.0f" % (time.time()-val) + else: + formatted = "n/a" + else: + formatted = format % val + back.append("%s" % formatted) + return "%s" % "".join(back) + def actionStats(self): import gc, sys from greenlet import greenlet greenlets = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] self.sendHeader() main = sys.modules["src.main"] + yield """ + + """ - yield "
"
 		yield "Connections (%s):
" % len(main.file_server.connections) + yield "" for connection in main.file_server.connections: - yield "%s: %s %s
" % (connection.protocol, connection.ip, connection.zmq_sock) + yield self.formatTableRow([ + ("%3d", connection.id), + ("%s", connection.protocol), + ("%s", connection.ip), + ("%s", bool(connection.zmq_sock)), + ("%6.3f", connection.last_ping_delay), + ("%s", connection.incomplete_buff_recv), + ("since", max(connection.last_send_time, connection.last_recv_time)), + ("%.3f", connection.last_sent_time-connection.last_send_time), + ("%.0fkB", connection.bytes_sent/1024), + ("%.0fkB", connection.bytes_recv/1024) + ]) + yield "
id protocol ip zmqs ping buff idle delay sent received
" yield "Greenlets (%s):
" % len(greenlets) for thread in greenlets: yield " - %s
" % cgi.escape(repr(thread)) - yield "
" # - Tests - diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index dec12fb5..629a41e8 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -15,6 +15,8 @@ class UiWebsocket: self.next_message_id = 1 self.waiting_cb = {} # Waiting for callback. Key: message_id, Value: function pointer self.channels = [] # Channels joined to + self.sending = False # Currently sending to client + self.send_queue = [] # Messages to send to client # Start listener loop @@ -69,10 +71,16 @@ class UiWebsocket: def send(self, message, cb = None): message["id"] = self.next_message_id # Add message id to allow response self.next_message_id += 1 + if cb: # Callback after client responsed + self.waiting_cb[message["id"]] = cb + if self.sending: return # Already sending + self.send_queue.append(message) try: - self.ws.send(json.dumps(message)) - if cb: # Callback after client responsed - self.waiting_cb[message["id"]] = cb + while self.send_queue: + self.sending = True + message = self.send_queue.pop(0) + self.ws.send(json.dumps(message)) + self.sending = False except Exception, err: self.log.debug("Websocket send error: %s" % Debug.formatException(err)) @@ -177,7 +185,7 @@ class UiWebsocket: "next_size_limit": site.getNextSizeLimit(), "last_downloads": len(site.last_downloads), "peers": site.settings.get("peers", len(site.peers)), - "tasks": len([task["inner_path"] for task in site.worker_manager.tasks]), + "tasks": len(site.worker_manager.tasks), "content": content } if site.settings["serving"] and content: ret["peers"] += 1 # Add myself if serving diff --git a/src/Ui/media/Wrapper.coffee b/src/Ui/media/Wrapper.coffee index 2b87f6c1..909b7539 100644 --- a/src/Ui/media/Wrapper.coffee +++ b/src/Ui/media/Wrapper.coffee @@ -249,6 +249,10 @@ class Wrapper @ws.cmd "siteSetLimit", [site_info.next_size_limit], (res) => @notifications.add("size_limit", "done", res, 5000) return false + + if @loading.screen_visible and @inner_loaded and site_info.settings.size < site_info.size_limit*1024*1024 # Loading screen still visible, but inner loaded + @loading.hideScreen() + @site_info = site_info diff --git a/src/Ui/media/all.js b/src/Ui/media/all.js index 401459dd..11807a5c 100644 --- a/src/Ui/media/all.js +++ b/src/Ui/media/all.js @@ -1057,6 +1057,9 @@ jQuery.extend( jQuery.easing, })(this)); } } + if (this.loading.screen_visible && this.inner_loaded && site_info.settings.size < site_info.size_limit * 1024 * 1024) { + this.loading.hideScreen(); + } return this.site_info = site_info; }; diff --git a/src/User/UserManager.py b/src/User/UserManager.py index 2926027c..805b50ea 100644 --- a/src/User/UserManager.py +++ b/src/User/UserManager.py @@ -58,8 +58,9 @@ def getCurrent(): # Debug: Reload User.py def reload(): - import imp + return False # Disabled + """import imp global users, User User = imp.load_source("User", "src/User/User.py").User # Reload source users.clear() # Remove all items - load() + load()"""