diff --git a/src/Config.py b/src/Config.py index 2b0b0538..af277082 100644 --- a/src/Config.py +++ b/src/Config.py @@ -3,7 +3,7 @@ import ConfigParser class Config(object): def __init__(self): - self.version = "0.2.3" + self.version = "0.2.4" self.parser = self.createArguments() argv = sys.argv[:] # Copy command line arguments argv = self.parseConfig(argv) # Add arguments from config file @@ -52,6 +52,19 @@ class Config(object): action = subparsers.add_parser("siteVerify", help='Verify site files using sha512: address') action.add_argument('address', help='Site to verify') + # PeerPing + action = subparsers.add_parser("peerPing", help='Send Ping command to peer') + action.add_argument('peer_ip', help='Peer ip') + action.add_argument('peer_port', help='Peer port') + + # PeerGetFile + action = subparsers.add_parser("peerGetFile", help='Request and print a file content from peer') + action.add_argument('peer_ip', help='Peer ip') + action.add_argument('peer_port', help='Peer port') + action.add_argument('site', help='Site address') + action.add_argument('filename', help='File name to request') + + # Config parameters parser.add_argument('--debug', help='Debug mode', action='store_true') diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py new file mode 100644 index 00000000..33dd1566 --- /dev/null +++ b/src/Connection/Connection.py @@ -0,0 +1,234 @@ +import logging, socket, time +from cStringIO import StringIO +import gevent, msgpack +from Config import config +from Debug import Debug +try: + import zmq.green as zmq +except: + zmq = None + +class Connection: + def __init__(self, server, ip, port, sock=None): + self.sock = sock + self.ip = ip + self.port = port + self.peer_id = None # Bittorrent style peer id (not used yet) + self.id = server.last_connection_id + self.protocol = "?" + server.last_connection_id += 1 + + self.server = server + self.log = logging.getLogger(str(self)) + self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here + self.req_id = 0 # Last request id + self.handshake = None # Handshake info got from peer + self.event_handshake = gevent.event.AsyncResult() # Solves on handshake received + self.closed = False + + self.zmq_sock = None # Zeromq sock if outgoing connection + self.zmq_queue = [] # Messages queued to send + self.zmq_working = False # Zmq currently working, just add to queue + self.forward_thread = None # Zmq forwarder thread + + self.waiting_requests = {} # Waiting sent requests + if not sock: self.connect() # Not an incoming connection, connect to peer + + + def __str__(self): + return "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol) + + def __repr__(self): + return "<%s>" % self.__str__() + + + # Open connection to peer and wait for handshake + def connect(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.ip, self.port)) + # Detect protocol + self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()}) + gevent.spawn(self.messageLoop) + return self.event_handshake.get() # Wait for handshake + + + + # Handle incoming connection + def handleIncomingConnection(self, sock): + firstchar = sock.recv(1) # Find out if pure socket or zeromq + if firstchar == "\xff": # Backward compatiblity: forward data to zmq + if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ") + + self.protocol = "zeromq" + self.log.name = str(self) + self.event_handshake.set(self.protocol) + + if self.server.zmq_running: + zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + zmq_sock.connect(("127.0.0.1", self.server.zmq_port)) + zmq_sock.send(firstchar) + + self.forward_thread = gevent.spawn(self.server.forward, self, zmq_sock, sock) + self.server.forward(self, sock, zmq_sock) + self.close() # Forward ended close connection + else: + self.config.debug("ZeroMQ Server not running, exiting!") + else: # Normal socket + self.messageLoop(firstchar) + + + # Message loop for connection + def messageLoop(self, firstchar=None): + sock = self.sock + if not firstchar: firstchar = sock.recv(1) + if firstchar == "\xff": # Backward compatibility to zmq + self.sock.close() # Close normal socket + if zmq: + if config.debug_socket: self.log.debug("Connecting as ZeroMQ") + self.protocol = "zeromq" + self.log.name = str(self) + self.event_handshake.set(self.protocol) # Mark handshake as done + + try: + context = zmq.Context() + zmq_sock = context.socket(zmq.REQ) + zmq_sock.hwm = 1 + zmq_sock.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive + zmq_sock.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send + zmq_sock.setsockopt(zmq.LINGER, 500) # Wait for zmq_sock close + zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port)) + self.zmq_sock = zmq_sock + except Exception, err: + self.log.debug("Socket error: %s" % Debug.formatException(err)) + else: + return False # No zeromq connection supported + else: # Normal socket + self.protocol = "v2" + self.log.name = str(self) + self.event_handshake.set(self.protocol) # Mark handshake as done + + unpacker = self.unpacker + unpacker.feed(firstchar) # Feed the first char we already requested + try: + while True: + buff = sock.recv(16*1024) + if not buff: break # Connection closed + unpacker.feed(buff) + for message in unpacker: + self.handleMessage(message) + except Exception, err: + self.log.debug("Socket error: %s" % Debug.formatException(err)) + self.close() # MessageLoop ended, close connection + + + # Read one line (not used) + def recvLine(self): + sock = self.sock + data = sock.recv(16*1024) + if not data: return + if not data.endswith("\n"): # Multipart, read until \n + buff = StringIO() + buff.write(data) + while not data.endswith("\n"): + data = sock.recv(16*1024) + if not data: break + buff.write(data) + return buff.getvalue().strip("\n") + + return data.strip("\n") + + + # My handshake info + def handshakeInfo(self): + return { + "version": config.version, + "protocol": "v2", + "peer_id": self.server.peer_id, + "fileserver_port": config.fileserver_port + } + + + # Handle incoming message + def handleMessage(self, message): + if message.get("cmd") == "response": # New style response + if message["to"] in self.waiting_requests: + self.waiting_requests[message["to"]].set(message) # Set the response to event + del self.waiting_requests[message["to"]] + elif message["to"] == 0: # Other peers handshake + if config.debug_socket: self.log.debug("Got handshake response: %s" % message) + self.handshake = message + self.port = message["fileserver_port"] # Set peer fileserver port + else: + self.log.debug("Unknown response: %s" % message) + elif message.get("cmd"): # Handhsake request + if message["cmd"] == "handshake": + self.handshake = message["params"] + self.port = self.handshake["fileserver_port"] # Set peer fileserver port + if config.debug_socket: self.log.debug("Handshake request: %s" % message) + data = self.handshakeInfo() + data["cmd"] = "response" + data["to"] = message["req_id"] + self.send(data) + else: + self.server.handleRequest(self, message) + else: # Old style response, no req_id definied + if config.debug_socket: self.log.debug("Old style response, waiting: %s" % self.waiting_requests.keys()) + last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true + self.waiting_requests[last_req_id].set(message) + del self.waiting_requests[last_req_id] # Remove from waiting request + + + + # Send data to connection + def send(self, data): + if config.debug_socket: self.log.debug("Send: %s" % data.get("cmd")) + if self.protocol == "zeromq": + if self.zmq_sock: # Outgoing connection + self.zmq_queue.append(data) + if self.zmq_working: + self.log.debug("ZeroMQ already working...") + return + while self.zmq_queue: + self.zmq_working = True + data = self.zmq_queue.pop(0) + self.zmq_sock.send(msgpack.packb(data)) + self.handleMessage(msgpack.unpackb(self.zmq_sock.recv())) + self.zmq_working = False + + else: # Incoming request + self.server.zmq_sock.send(msgpack.packb(data)) + else: # Normal connection + self.sock.sendall(msgpack.packb(data)) + + + # Create and send a request to peer + def request(self, cmd, params={}): + self.req_id += 1 + data = {"cmd": cmd, "req_id": self.req_id, "params": params} + event = gevent.event.AsyncResult() # Create new event for response + self.waiting_requests[self.req_id] = event + self.send(data) # Send request + res = event.get() # Wait until event solves + + return res + + + # Close connection + def close(self): + if self.closed: return False # Already closed + self.closed = True + if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s..." % len(self.waiting_requests)) + for request in self.waiting_requests.values(): # Mark pending requests failed + request.set(False) + self.waiting_requests = {} + self.server.removeConnection(self) # Remove connection from server registry + try: + if self.forward_thread: + self.forward_thread.kill(exception=Debug.Notify("Closing connection")) + if self.zmq_sock: + self.zmq_sock.close() + if self.sock: + self.sock.shutdown(gevent.socket.SHUT_WR) + self.sock.close() + except Exception, err: + if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err)) diff --git a/src/Connection/ConnectionBenchmark.py b/src/Connection/ConnectionBenchmark.py new file mode 100644 index 00000000..5605398d --- /dev/null +++ b/src/Connection/ConnectionBenchmark.py @@ -0,0 +1,136 @@ +import time, socket, msgpack +from cStringIO import StringIO + +print "Connecting..." +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect(("localhost", 1234)) + + +print "1 Threaded: Send, receive 10000 ping request...", +s = time.time() +for i in range(10000): + sock.sendall(msgpack.packb({"cmd": "Ping"})) + req = sock.recv(16*1024) +print time.time()-s, repr(req), time.time()-s + + +print "1 Threaded: Send, receive, decode 10000 ping request...", +s = time.time() +unpacker = msgpack.Unpacker() +reqs = 0 +for i in range(10000): + sock.sendall(msgpack.packb({"cmd": "Ping"})) + unpacker.feed(sock.recv(16*1024)) + for req in unpacker: + reqs += 1 +print "Found:", req, "x", reqs, time.time()-s + + +print "1 Threaded: Send, receive, decode, reconnect 1000 ping request...", +s = time.time() +unpacker = msgpack.Unpacker() +reqs = 0 +for i in range(1000): + sock.sendall(msgpack.packb({"cmd": "Ping"})) + unpacker.feed(sock.recv(16*1024)) + for req in unpacker: + reqs += 1 + sock.close() + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", 1234)) +print "Found:", req, "x", reqs, time.time()-s + + +print "1 Threaded: Request, receive, decode 10000 x 10k data request...", +s = time.time() +unpacker = msgpack.Unpacker() +reqs = 0 +for i in range(10000): + sock.sendall(msgpack.packb({"cmd": "Bigdata"})) + + """buff = StringIO() + data = sock.recv(16*1024) + buff.write(data) + if not data: + break + while not data.endswith("\n"): + data = sock.recv(16*1024) + if not data: break + buff.write(data) + req = msgpack.unpackb(buff.getvalue().strip("\n")) + reqs += 1""" + + req_found = False + while not req_found: + buff = sock.recv(16*1024) + unpacker.feed(buff) + for req in unpacker: + reqs += 1 + req_found = True + break # Only process one request +print "Found:", len(req["res"]), "x", reqs, time.time()-s + + +print "10 Threaded: Request, receive, decode 10000 x 10k data request...", +import gevent +s = time.time() +reqs = 0 +req = None +def requester(): + global reqs, req + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", 1234)) + unpacker = msgpack.Unpacker() + for i in range(1000): + sock.sendall(msgpack.packb({"cmd": "Bigdata"})) + + req_found = False + while not req_found: + buff = sock.recv(16*1024) + unpacker.feed(buff) + for req in unpacker: + reqs += 1 + req_found = True + break # Only process one request + +threads = [] +for i in range(10): + threads.append(gevent.spawn(requester)) +gevent.joinall(threads) +print "Found:", len(req["res"]), "x", reqs, time.time()-s + + +print "1 Threaded: ZeroMQ Send, receive 1000 ping request...", +s = time.time() +import zmq.green as zmq +c = zmq.Context() +zmq_sock = c.socket(zmq.REQ) +zmq_sock.connect('tcp://127.0.0.1:1234') +for i in range(1000): + zmq_sock.send(msgpack.packb({"cmd": "Ping"})) + req = zmq_sock.recv(16*1024) +print "Found:", req, time.time()-s + + +print "1 Threaded: ZeroMQ Send, receive 1000 x 10k data request...", +s = time.time() +import zmq.green as zmq +c = zmq.Context() +zmq_sock = c.socket(zmq.REQ) +zmq_sock.connect('tcp://127.0.0.1:1234') +for i in range(1000): + zmq_sock.send(msgpack.packb({"cmd": "Bigdata"})) + req = msgpack.unpackb(zmq_sock.recv(1024*1024)) +print "Found:", len(req["res"]), time.time()-s + + +print "1 Threaded: direct ZeroMQ Send, receive 1000 x 10k data request...", +s = time.time() +import zmq.green as zmq +c = zmq.Context() +zmq_sock = c.socket(zmq.REQ) +zmq_sock.connect('tcp://127.0.0.1:1233') +for i in range(1000): + zmq_sock.send(msgpack.packb({"cmd": "Bigdata"})) + req = msgpack.unpackb(zmq_sock.recv(1024*1024)) +print "Found:", len(req["res"]), time.time()-s \ No newline at end of file diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py new file mode 100644 index 00000000..e8fcb22f --- /dev/null +++ b/src/Connection/ConnectionServer.py @@ -0,0 +1,231 @@ +from gevent.server import StreamServer +from gevent.pool import Pool +import socket, os, logging, random, string +import gevent, msgpack +import cStringIO as StringIO +from Debug import Debug +from Connection import Connection +from Config import config + + +class ConnectionServer: + def __init__(self, ip=None, port=None, request_handler=None): + self.ip = ip + self.port = port + self.last_connection_id = 1 # Connection id incrementer + self.log = logging.getLogger(__name__) + + self.connections = [] # Connections + self.ips = {} # Connection by ip + self.peer_ids = {} # Connections by peer_ids + + self.running = True + self.zmq_running = False + self.zmq_last_connection = None # Last incoming message client + + self.peer_id = "-ZN0"+config.version.replace(".", "")+"-"+''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(12)) # Bittorrent style peerid + + if port: # Listen server on a port + self.zmq_port = port-1 + self.pool = Pool(1000) # do not accept more than 1000 connections + self.stream_server = StreamServer((ip.replace("*", ""), port), self.handleIncomingConnection, spawn=self.pool, backlog=100) + if request_handler: self.handleRequest = request_handler + gevent.spawn(self.zmqServer) # Start ZeroMQ server for backward compatibility + + + + def start(self): + self.running = True + try: + self.log.debug("Binding to: %s:%s" % (self.ip, self.port)) + self.stream_server.serve_forever() # Start normal connection server + except Exception, err: + self.log.info("StreamServer bind error, must be running already: %s" % err) + + + def stop(self): + self.running = False + self.stream_server.stop() + + + def handleIncomingConnection(self, sock, addr): + ip, port = addr + connection = Connection(self, ip, port, sock) + self.connections.append(connection) + self.ips[ip] = connection + connection.handleIncomingConnection(sock) + + + + def connect(self, ip=None, port=None, peer_id=None): + if peer_id and peer_id in self.peer_ids: # Find connection by peer id + return self.peer_ids.get(peer_id) + if ip in self.ips: # Find connection by ip + return self.ips[ip] + # No connection found yet + try: + connection = Connection(self, ip, port) + self.ips[ip] = connection + self.connections.append(connection) + except Exception, err: + self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err))) + raise err + return connection + + + + def removeConnection(self, connection): + if self.ips.get(connection.ip) == connection: # Delete if same as in registry + del self.ips[connection.ip] + if connection in self.connections: + self.connections.remove(connection) + if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry + del self.peer_ids[connection.peer_id] + + + def zmqServer(self): + self.log.debug("Starting ZeroMQ on: tcp://127.0.0.1:%s..." % self.zmq_port) + try: + import zmq.green as zmq + context = zmq.Context() + self.zmq_sock = context.socket(zmq.REP) + self.zmq_sock.bind("tcp://127.0.0.1:%s" % self.zmq_port) + self.zmq_sock.hwm = 1 + self.zmq_sock.setsockopt(zmq.RCVTIMEO, 5000) # Wait for data receive + self.zmq_sock.setsockopt(zmq.SNDTIMEO, 50000) # Wait for data send + self.zmq_running = True + except Exception, err: + self.log.debug("ZeroMQ start error: %s" % Debug.formatException(err)) + return False + + while True: + try: + data = self.zmq_sock.recv() + if not data: break + message = msgpack.unpackb(data) + self.zmq_last_connection.handleMessage(message) + except Exception, err: + self.log.debug("ZMQ Server error: %s" % Debug.formatException(err)) + self.zmq_sock.send(msgpack.packb({"error": "%s" % err}, use_bin_type=True)) + + + # Forward incoming data to other socket + def forward(self, connection, source, dest): + data = True + try: + while data: + data = source.recv(16*1024) + self.zmq_last_connection = connection + if data: + dest.sendall(data) + else: + source.shutdown(socket.SHUT_RD) + dest.shutdown(socket.SHUT_WR) + except Exception, err: + self.log.debug("%s ZMQ forward error: %s" % (connection.ip, Debug.formatException(err))) + connection.close() + + +# -- TESTING -- + +def testCreateServer(): + global server + server = ConnectionServer("127.0.0.1", 1234, testRequestHandler) + server.start() + + +def testRequestHandler(connection, req): + print req + if req["cmd"] == "Bigdata": + connection.send({"res": "HelloWorld"*1024}) + else: + connection.send({"res": "pong"}) + + +def testClient(num): + time.sleep(1) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(("localhost", 1234)) + for i in range(10): + print "[C%s] send..." % num + s.sendall(msgpack.packb({"cmd": "[C] Ping"})) + print "[C%s] recv..." % num + print "[C%s] %s" % (num, repr(s.recv(1024))) + time.sleep(1) + + +def testSlowClient(num): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(("localhost", 1234)) + for i in range(1): + print "[C%s] send..." % num + s.sendall(msgpack.packb({"cmd": "Bigdata"})) + print "[C%s] recv..." % num + gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s) + while 1: + data = s.recv(1000) + if not data: break + print "[C%s] %s" % (num, data) + time.sleep(1) + #s.sendall(msgpack.packb({"cmd": "[C] Ping"})) + + +def testZmqClient(num): + import zmq.green as zmq + c = zmq.Context(1) + for i in range(10): + s = c.socket(zmq.REQ) + s.connect('tcp://127.0.0.1:1234') + print "[Z%s] send..." % num + s.send(msgpack.packb({"cmd": "[Z] Ping %s" % i})) + print "[Z%s] recv..." % num + print "[Z%s] %s" % (num, s.recv(1024)) + s.close() + time.sleep(1) + + +def testZmqSlowClient(num): + import zmq.green as zmq + c = zmq.Context(1) + s = c.socket(zmq.REQ) + for i in range(1): + s.connect('tcp://127.0.0.1:1234') + print "[Z%s] send..." % num + s.send(msgpack.packb({"cmd": "Bigdata"})) + print "[Z%s] recv..." % num + #gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s) + while 1: + data = s.recv(1024*1024) + if not data: break + print "[Z%s] %s" % (num, data) + time.sleep(1) + s.send(msgpack.packb({"cmd": "[Z] Ping"})) + + +def testConnection(): + global server + connection = server.connect("127.0.0.1", 1234) + connection.send({"res": "Sending: Hello!"}) + print connection + + +def greenletsNum(): + from greenlet import greenlet + import gc + while 1: + print len([ob for ob in gc.get_objects() if isinstance(ob, greenlet)]) + time.sleep(1) + +if __name__ == "__main__": + from gevent import monkey; monkey.patch_all(thread=False) + import sys, time + logging.getLogger().setLevel(logging.DEBUG) + + gevent.spawn(testZmqClient, 1) + gevent.spawn(greenletsNum) + #gevent.spawn(testClient, 1) + #gevent.spawn_later(1, testConnection) + print "Running server..." + server = None + testCreateServer() + diff --git a/src/Connection/__init__.py b/src/Connection/__init__.py new file mode 100644 index 00000000..8f47108e --- /dev/null +++ b/src/Connection/__init__.py @@ -0,0 +1,2 @@ +from ConnectionServer import ConnectionServer +from Connection import Connection \ No newline at end of file diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index 2e69252d..7286eacb 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -1,4 +1,4 @@ -import json, time, re, os +import json, time, re, os, gevent from Debug import Debug from Crypt import CryptHash from Config import config @@ -42,7 +42,7 @@ class ContentManager: new_hash = info[hash_type] if old_content and old_content["files"].get(relative_path): # We have the file in the old content - old_hash = old_content["files"][relative_path][hash_type] + old_hash = old_content["files"][relative_path].get(hash_type) else: # The file is not in the old content old_hash = None if old_hash != new_hash: changed.append(content_dir+relative_path) @@ -293,6 +293,7 @@ class ContentManager: return None elif old_content["modified"] > new_content["modified"]: # We have newer self.log.debug("We have newer %s (Our: %s, Sent: %s)" % (inner_path, old_content["modified"], new_content["modified"])) + gevent.spawn(self.site.publish, inner_path=inner_path) # Try to fix the broken peers return False if new_content["modified"] > time.time()+60*60*24: # Content modified in the far future (allow 1 day window) self.log.error("%s modify is in the future!" % inner_path) diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 02b134e1..682b9911 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -1,5 +1,4 @@ import os, msgpack, shutil, gevent -from Site import SiteManager from cStringIO import StringIO from Debug import Debug from Config import config @@ -8,21 +7,30 @@ FILE_BUFF = 1024*512 # Request from me class FileRequest: - def __init__(self, server = None): - if server: - self.server = server - self.log = server.log - self.sites = SiteManager.list() + def __init__(self, server, connection): + self.server = server + self.connection = connection + + self.req_id = None + self.sites = self.server.sites + self.log = server.log def send(self, msg): + self.connection.send(msg) + + + def response(self, msg): if not isinstance(msg, dict): # If msg not a dict create a {"body": msg} msg = {"body": msg} - self.server.socket.send(msgpack.packb(msg, use_bin_type=True)) + msg["cmd"] = "response" + msg["to"] = self.req_id + self.send(msg) # Route file requests - def route(self, cmd, params): + def route(self, cmd, req_id, params): + self.req_id = req_id if cmd == "getFile": self.actionGetFile(params) elif cmd == "update": @@ -37,7 +45,7 @@ class FileRequest: def actionUpdate(self, params): site = self.sites.get(params["site"]) if not site or not site.settings["serving"]: # Site unknown or not serving - self.send({"error": "Unknown site"}) + self.response({"error": "Unknown site"}) return False if site.settings["own"] and params["inner_path"].endswith("content.json"): self.log.debug("Someone trying to push a file to own site %s, reload local %s first" % (site.address, params["inner_path"])) @@ -61,7 +69,7 @@ class FileRequest: lambda: site.downloadContent(params["inner_path"], peer=peer) ) # Load new content file and download changed files in new thread - self.send({"ok": "Thanks, file %s updated!" % params["inner_path"]}) + self.response({"ok": "Thanks, file %s updated!" % params["inner_path"]}) elif valid == None: # Not changed peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer @@ -70,18 +78,18 @@ class FileRequest: for task in site.worker_manager.tasks: # New peer add to every ongoing task if task["peers"]: site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) # Download file from this peer too if its peer locked - self.send({"ok": "File not changed"}) + self.response({"ok": "File not changed"}) else: # Invalid sign or sha1 hash self.log.debug("Update for %s is invalid" % params["inner_path"]) - self.send({"error": "File invalid"}) + self.response({"error": "File invalid"}) # Send file content request def actionGetFile(self, params): site = self.sites.get(params["site"]) if not site or not site.settings["serving"]: # Site unknown or not serving - self.send({"error": "Unknown site"}) + self.response({"error": "Unknown site"}) return False try: file_path = site.getPath(params["inner_path"]) @@ -93,18 +101,18 @@ class FileRequest: back["location"] = file.tell() back["size"] = os.fstat(file.fileno()).st_size if config.debug_socket: self.log.debug("Sending file %s from position %s to %s" % (file_path, params["location"], back["location"])) - self.send(back) + self.response(back) if config.debug_socket: self.log.debug("File %s sent" % file_path) except Exception, err: - self.send({"error": "File read error: %s" % Debug.formatException(err)}) + self.response({"error": "File read error: %s" % Debug.formatException(err)}) return False # Send a simple Pong! answer def actionPing(self): - self.send("Pong!") + self.response("Pong!") # Unknown command def actionUnknown(self, cmd, params): - self.send({"error": "Unknown command: %s" % cmd}) + self.response({"error": "Unknown command: %s" % cmd}) diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 5ae5b732..d5f2d763 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -5,30 +5,28 @@ from Config import config from FileRequest import FileRequest from Site import SiteManager from Debug import Debug +from Connection import ConnectionServer -class FileServer: +class FileServer(ConnectionServer): def __init__(self): - self.ip = config.fileserver_ip - self.port = config.fileserver_port - self.log = logging.getLogger(__name__) + ConnectionServer.__init__(self, config.fileserver_ip, config.fileserver_port, self.handleRequest) if config.ip_external: # Ip external definied in arguments self.port_opened = True SiteManager.peer_blacklist.append((config.ip_external, self.port)) # Add myself to peer blacklist else: self.port_opened = None # Is file server opened on router self.sites = SiteManager.list() - self.running = True # Handle request to fileserver - def handleRequest(self, msg): - if "params" in msg: - self.log.debug("FileRequest: %s %s %s" % (msg["cmd"], msg["params"].get("site"), msg["params"].get("inner_path"))) + def handleRequest(self, connection, message): + if "params" in message: + self.log.debug("FileRequest: %s %s %s" % (message["cmd"], message["params"].get("site"), message["params"].get("inner_path"))) else: - self.log.debug("FileRequest: %s" % msg["cmd"]) - req = FileRequest(self) - req.route(msg["cmd"], msg.get("params")) + self.log.debug("FileRequest: %s" % req["cmd"]) + req = FileRequest(self, connection) + req.route(message["cmd"], message.get("req_id"), message.get("params")) # Reload the FileRequest class to prevent restarts in debug mode @@ -124,13 +122,15 @@ class FileServer: time.sleep(2) # Prevent too quick request - # Announce sites every 10 min + # Announce sites every 20 min def announceSites(self): while 1: time.sleep(20*60) # Announce sites every 20 min for address, site in self.sites.items(): if site.settings["serving"]: site.announce() # Announce site to tracker + for inner_path in site.bad_files: # Reset bad file retry counter + site.bad_files[inner_path] = 0 time.sleep(2) # Prevent too quick request @@ -155,40 +155,14 @@ class FileServer: from Debug import DebugReloader DebugReloader(self.reload) - self.context = zmq.Context() - socket = self.context.socket(zmq.REP) - self.socket = socket - self.socket.setsockopt(zmq.RCVTIMEO, 5000) # Wait for data receive - self.socket.setsockopt(zmq.SNDTIMEO, 50000) # Wait for data send - self.log.info("Binding to tcp://%s:%s" % (self.ip, self.port)) - try: - self.socket.bind('tcp://%s:%s' % (self.ip, self.port)) - except Exception, err: - self.log.error("Can't bind, FileServer must be running already") - return if check_sites: # Open port, Update sites, Check files integrity gevent.spawn(self.checkSites) thread_announce_sites = gevent.spawn(self.announceSites) thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher) - while self.running: - try: - ret = {} - req = msgpack.unpackb(socket.recv()) - self.handleRequest(req) - except Exception, err: - self.log.error(err) - if self.running: self.socket.send(msgpack.packb({"error": "%s" % Debug.formatException(err)}, use_bin_type=True)) - if config.debug: # Raise exception - import sys - sys.modules["src.main"].DebugHook.handleError() - thread_wakeup_watcher.kill(exception=Debug.Notify("Stopping FileServer")) - thread_announce_sites.kill(exception=Debug.Notify("Stopping FileServer")) + ConnectionServer.start(self) + + # thread_wakeup_watcher.kill(exception=Debug.Notify("Stopping FileServer")) + # thread_announce_sites.kill(exception=Debug.Notify("Stopping FileServer")) self.log.debug("Stopped.") - - - def stop(self): - self.running = False - self.socket.close() - diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index b13f7b5c..0c4cc8a5 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -1,21 +1,20 @@ -import os, logging, gevent, time, msgpack +import os, logging, gevent, time, msgpack, sys import zmq.green as zmq from cStringIO import StringIO from Config import config from Debug import Debug -context = zmq.Context() - # Communicate remote peers class Peer: - def __init__(self, ip, port, site): + def __init__(self, ip, port, site=None): self.ip = ip self.port = port self.site = site self.key = "%s:%s" % (ip, port) self.log = None + self.connection_server = sys.modules["src.main"].file_server - self.socket = None + self.connection = None self.last_found = None # Time of last found in the torrent tracker self.last_response = None # Time of last successfull response from peer self.last_ping = None # Last response time for ping @@ -29,19 +28,22 @@ class Peer: # Connect to host def connect(self): + if self.connection: self.connection.close() + self.connection = None if not self.log: self.log = logging.getLogger("Peer:%s:%s" % (self.ip, self.port)) - if self.socket: self.socket.close() - self.socket = context.socket(zmq.REQ) - self.socket.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive - self.socket.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send - self.socket.setsockopt(zmq.LINGER, 500) # Wait for socket close - # self.socket.setsockopt(zmq.TCP_KEEPALIVE, 1) # Enable keepalive - # self.socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 4*60) # Send after 4 minute idle - # self.socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 15) # Wait 15 sec to response - # self.socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, 4) # 4 Probes - self.socket.connect('tcp://%s:%s' % (self.ip, self.port)) + self.log.debug("Connecting...") + try: + self.connection = self.connection_server.connect(self.ip, self.port) + except Exception, err: + self.log.debug("Connecting error: %s" % Debug.formatException(err)) + self.onConnectionError() + + def __str__(self): + return "Peer %-12s" % self.ip + def __repr__(self): + return "<%s>" % self.__str__() # Found a peer on tracker def found(self): @@ -49,18 +51,20 @@ class Peer: # Send a command to peer - def sendCmd(self, cmd, params = {}): - if not self.socket: self.connect() + def request(self, cmd, params = {}): + if not self.connection or self.connection.closed: + self.connect() + if not self.connection: return None # Connection failed + if cmd != "ping" and self.last_response and time.time() - self.last_response > 20*60: # If last response if older than 20 minute, ping first to see if still alive if not self.ping(): return None for retry in range(1,3): # Retry 3 times - if config.debug_socket: self.log.debug("sendCmd: %s %s" % (cmd, params.get("inner_path"))) + #if config.debug_socket: self.log.debug("sendCmd: %s %s" % (cmd, params.get("inner_path"))) try: - self.socket.send(msgpack.packb({"cmd": cmd, "params": params}, use_bin_type=True)) - if config.debug_socket: self.log.debug("Sent command: %s" % cmd) - response = msgpack.unpackb(self.socket.recv()) - if config.debug_socket: self.log.debug("Got response to: %s" % cmd) + response = self.connection.request(cmd, params) + if not response: raise Exception("Send error") + #if config.debug_socket: self.log.debug("Got response to: %s" % cmd) if "error" in response: self.log.debug("%s error: %s" % (cmd, response["error"])) self.onConnectionError() @@ -69,13 +73,14 @@ class Peer: self.last_response = time.time() return response except Exception, err: - self.onConnectionError() - self.log.debug("%s (connection_error: %s, hash_failed: %s, retry: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed, retry)) - time.sleep(1*retry) - self.connect() - if type(err).__name__ == "Notify" and err.message == "Worker stopped": # Greenlet kill by worker - self.log.debug("Peer worker got killed, aborting cmd: %s" % cmd) + if type(err).__name__ == "Notify": # Greenlet kill by worker + self.log.debug("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd)) break + else: + self.onConnectionError() + self.log.debug("%s (connection_error: %s, hash_failed: %s, retry: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed, retry)) + time.sleep(1*retry) + self.connect() return None # Failed after 4 retry @@ -85,7 +90,7 @@ class Peer: buff = StringIO() s = time.time() while 1: # Read in 512k parts - back = self.sendCmd("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location + back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location if not back or "body" not in back: # Error return False @@ -106,7 +111,8 @@ class Peer: for retry in range(1,3): # Retry 3 times s = time.time() with gevent.Timeout(10.0, False): # 10 sec timeout, dont raise exception - response = self.sendCmd("ping") + response = self.request("ping") + if response and "body" in response and response["body"] == "Pong!": response_time = time.time()-s break # All fine, exit from for loop @@ -126,7 +132,8 @@ class Peer: def remove(self): self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) if self.key in self.site.peers: del(self.site.peers[self.key]) - self.socket.close() + if self.connection: + self.connection.close() # - EVENTS - diff --git a/src/Site/Site.py b/src/Site/Site.py index 590ed856..df71ff6e 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -27,7 +27,7 @@ class Site: self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself) self.last_announce = 0 # Last announce time to tracker self.worker_manager = WorkerManager(self) # Handle site download from other peers - self.bad_files = {} # SHA512 check failed files, need to redownload + self.bad_files = {} # SHA512 check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept) self.content_updated = None # Content.js update time self.last_downloads = [] # Files downloaded in run of self.download() self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout] @@ -115,28 +115,39 @@ class Site: changed = self.content_manager.loadContent(inner_path, load_includes=False) # Start download files - evts = [] + file_threads = [] if download_files: for file_relative_path in self.content_manager.contents[inner_path].get("files", {}).keys(): file_inner_path = content_inner_dir+file_relative_path res = self.needFile(file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer) # No waiting for finish, return the event if res != True: # Need downloading self.last_downloads.append(file_inner_path) - evts.append(res) # Append evt + file_threads.append(res) # Append evt # Wait for includes download + include_threads = [] for file_relative_path in self.content_manager.contents[inner_path].get("includes", {}).keys(): file_inner_path = content_inner_dir+file_relative_path - self.downloadContent(file_inner_path, download_files=download_files, peer=peer) + include_thread = gevent.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer) + include_threads.append(include_thread) + self.log.debug("%s: Downloading %s includes..." % (inner_path, len(include_threads))) + gevent.joinall(include_threads) self.log.debug("%s: Includes downloaded" % inner_path) - self.log.debug("%s: Downloading %s files..." % (inner_path, len(evts))) - gevent.joinall(evts) + + self.log.debug("%s: Downloading %s files..." % (inner_path, len(file_threads))) + gevent.joinall(file_threads) self.log.debug("%s: All file downloaded in %.2fs" % (inner_path, time.time()-s)) return True + # Return bad files with less than 3 retry + def getReachableBadFiles(self): + if not self.bad_files: return False + return [bad_file for bad_file, retry in self.bad_files.iteritems() if retry < 3] + + # Download all files of the site @util.Noparallel(blocking=False) def download(self, check_size=False): @@ -163,7 +174,7 @@ class Site: changed = self.content_manager.loadContent("content.json") if changed: for changed_file in changed: - self.bad_files[changed_file] = True + self.bad_files[changed_file] = self.bad_files.get(changed_file, 0)+1 if not self.settings["own"]: self.checkFiles(quick_check=True) # Quick check files based on file size if self.bad_files: self.download() @@ -178,16 +189,19 @@ class Site: if not peers or len(published) >= limit: break # All peers done, or published engouht peer = peers.pop(0) result = {"exception": "Timeout"} - try: - with gevent.Timeout(timeout, False): - result = peer.sendCmd("update", { - "site": self.address, - "inner_path": inner_path, - "body": open(self.getPath(inner_path), "rb").read(), - "peer": (config.ip_external, config.fileserver_port) - }) - except Exception, err: - result = {"exception": Debug.formatException(err)} + + for retry in range(2): + try: + with gevent.Timeout(timeout, False): + result = peer.request("update", { + "site": self.address, + "inner_path": inner_path, + "body": open(self.getPath(inner_path), "rb").read(), + "peer": (config.ip_external, config.fileserver_port) + }) + if result: break + except Exception, err: + result = {"exception": Debug.formatException(err)} if result and "ok" in result: published.append(peer) @@ -202,6 +216,8 @@ class Site: published = [] # Successfuly published (Peer) publishers = [] # Publisher threads peers = self.peers.values() + + random.shuffle(peers) for i in range(limit): publisher = gevent.spawn(self.publisher, inner_path, peers, published, limit) publishers.append(publisher) @@ -303,7 +319,7 @@ class Site: bad_files = self.verifyFiles(quick_check) if bad_files: for bad_file in bad_files: - self.bad_files[bad_file] = True + self.bad_files[bad_file] = self.bad_files.get("bad_file", 0)+1 def deleteFiles(self): @@ -387,6 +403,8 @@ class Site: if inner_path == "content.json": self.content_updated = False self.log.error("Can't update content.json") + if inner_path in self.bad_files: + self.bad_files[inner_path] = self.bad_files.get(inner_path, 0)+1 self.updateWebsocket(file_failed=inner_path) diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index ce648ebf..91f4cfdd 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -46,6 +46,8 @@ class UiRequest: return self.actionDebug() elif path == "/Console" and config.debug: return self.actionConsole() + elif path == "/Stats": + return self.actionStats() # Test elif path == "/Test/Websocket": return self.actionFile("Data/temp/ws_test.html") @@ -114,7 +116,7 @@ class UiRequest: if not inner_path: inner_path = "index.html" # If inner path defaults to index.html site = self.server.sites.get(match.group("site")) - if site and site.content_manager.contents.get("content.json") and (not site.bad_files or site.settings["own"]): # Its downloaded or own + if site and site.content_manager.contents.get("content.json") and (not site.getReachableBadFiles() or site.settings["own"]): # Its downloaded or own title = site.content_manager.contents["content.json"]["title"] else: title = "Loading %s..." % match.group("site") @@ -268,10 +270,30 @@ class UiRequest: # Just raise an error to get console def actionConsole(self): + import sys sites = self.server.sites + main = sys.modules["src.main"] raise Exception("Here is your console") + def actionStats(self): + import gc, sys + from greenlet import greenlet + greenlets = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] + self.sendHeader() + main = sys.modules["src.main"] + + yield "
"
+		yield "Connections (%s):
" % len(main.file_server.connections) + for connection in main.file_server.connections: + yield "%s: %s %s
" % (connection.protocol, connection.ip, connection.zmq_sock) + + yield "Greenlets (%s):
" % len(greenlets) + for thread in greenlets: + yield " - %s
" % cgi.escape(repr(thread)) + yield "
" + + # - Tests - def actionTestStream(self): diff --git a/src/Ui/UiServer.py b/src/Ui/UiServer.py index 0592a019..5fa00690 100644 --- a/src/Ui/UiServer.py +++ b/src/Ui/UiServer.py @@ -94,11 +94,12 @@ class UiServer: browser = webbrowser.get(config.open_browser) browser.open("http://%s:%s" % (config.ui_ip, config.ui_port), new=2) - self.server = WSGIServer((self.ip, self.port), handler, handler_class=UiWSGIHandler, log=self.log) + self.server = WSGIServer((self.ip.replace("*", ""), self.port), handler, handler_class=UiWSGIHandler, log=self.log) self.server.sockets = {} self.server.serve_forever() self.log.debug("Stopped.") + def stop(self): # Close WS sockets for client in self.server.clients.values(): diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py index df651e55..325ebb48 100644 --- a/src/Worker/Worker.py +++ b/src/Worker/Worker.py @@ -53,7 +53,8 @@ class Worker: self.manager.doneTask(task) self.task = None else: # Hash failed - self.manager.log.debug("%s: Hash failed: %s" % (self.key, task["inner_path"])) + self.manager.log.debug("%s: Hash failed: %s, failed peers: %s" % (self.key, task["inner_path"], len(task["failed"]))) + task["failed"].append(self.key) self.task = None self.peer.hash_failed += 1 if self.peer.hash_failed >= 3: # Broken peer diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index e5bf3915..02e1c1d4 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -20,8 +20,8 @@ class WorkerManager: time.sleep(15) # Check every 15 sec # Clean up workers - if not self.tasks and self.workers: # No task but workers still running - for worker in self.workers.values(): worker.stop() + for worker in self.workers.values(): + if worker.task and worker.task["done"]: worker.stop() # Stop workers with task done if not self.tasks: continue tasks = self.tasks[:] # Copy it so removing elements wont cause any problem @@ -38,7 +38,7 @@ class WorkerManager: elif (task["time_started"] and time.time() >= task["time_started"]+15) or not self.workers: # Task started more than 15 sec ago or no workers self.log.debug("Task taking more than 15 secs, find more peers: %s" % task["inner_path"]) task["site"].announce() # Find more peers - if task["peers"]: # Release the peer olck + if task["peers"]: # Release the peer lock self.log.debug("Task peer lock release: %s" % task["inner_path"]) task["peers"] = [] self.startWorkers() @@ -62,6 +62,7 @@ class WorkerManager: self.tasks.sort(key=self.taskSorter, reverse=True) # Sort tasks by priority and worker numbers for task in self.tasks: # Find a task if task["peers"] and peer not in task["peers"]: continue # This peer not allowed to pick this task + if peer.key in task["failed"]: continue # Peer already tried to solve this, but failed return task @@ -85,9 +86,9 @@ class WorkerManager: # Start workers to process tasks def startWorkers(self, peers=None): - if len(self.workers) >= MAX_WORKERS and not peers: return False # Workers number already maxed + if len(self.workers) >= MAX_WORKERS and not peers: return False # Workers number already maxed and no starting peers definied if not self.tasks: return False # No task for workers - peers = self.site.peers.values() + if not peers: peers = self.site.peers.values() # No peers definied, use any from site random.shuffle(peers) for peer in peers: # One worker for every peer if peers and peer not in peers: continue # If peers definied and peer not valid @@ -139,7 +140,7 @@ class WorkerManager: peers = [peer] # Only download from this peer else: peers = None - task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority} + task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority, "failed": []} self.tasks.append(task) self.log.debug("New task: %s, peer lock: %s, priority: %s" % (task["inner_path"], peers, priority)) self.startWorkers(peers) @@ -168,5 +169,5 @@ class WorkerManager: self.tasks.remove(task) # Remove from queue self.site.onFileDone(task["inner_path"]) task["evt"].set(True) - if not self.tasks: self.site.onComplete() # No more task trigger site compelte + if not self.tasks: self.site.onComplete() # No more task trigger site complete diff --git a/src/main.py b/src/main.py index f1385eb2..d5bc2f8e 100644 --- a/src/main.py +++ b/src/main.py @@ -150,8 +150,10 @@ def siteNeedFile(address, inner_path): def sitePublish(address, peer_ip=None, peer_port=15441, inner_path="content.json"): + global file_server from Site import Site from File import FileServer # We need fileserver to handle incoming file requests + logging.info("Creating FileServer....") file_server = FileServer() file_server_thread = gevent.spawn(file_server.start, check_sites=False) # Dont check every site integrity @@ -184,10 +186,15 @@ def cryptoPrivatekeyToAddress(privatekey=None): # Peer -def peerPing(ip, port): +def peerPing(peer_ip, peer_port): + logging.info("Opening a simple connection server") + global file_server + from Connection import ConnectionServer + file_server = ConnectionServer("127.0.0.1", 1234) + from Peer import Peer - logging.info("Pinging 5 times peer: %s:%s..." % (ip, port)) - peer = Peer(ip, port) + logging.info("Pinging 5 times peer: %s:%s..." % (peer_ip, peer_port)) + peer = Peer(peer_ip, peer_port) for i in range(5): s = time.time() print peer.ping(), @@ -195,12 +202,15 @@ def peerPing(ip, port): time.sleep(1) -def peerGetFile(ip, port, site, filename=None): +def peerGetFile(peer_ip, peer_port, site, filename): + logging.info("Opening a simple connection server") + global file_server + from Connection import ConnectionServer + file_server = ConnectionServer() + from Peer import Peer - if not site: site = config.homepage - if not filename: filename = "content.json" - logging.info("Getting %s/%s from peer: %s:%s..." % (site, filename, ip, port)) - peer = Peer(ip, port) + logging.info("Getting %s/%s from peer: %s:%s..." % (site, filename, peer_ip, peer_port)) + peer = Peer(peer_ip, peer_port) s = time.time() print peer.getFile(site, filename).read() print "Response time: %.3fs" % (time.time()-s) diff --git a/src/util/Event.py b/src/util/Event.py index 664daae9..4b4e7c96 100644 --- a/src/util/Event.py +++ b/src/util/Event.py @@ -3,7 +3,7 @@ class Event(list): def __call__(self, *args, **kwargs): for f in self[:]: - if "once" in dir(f): + if "once" in dir(f) and f in self: self.remove(f) f(*args, **kwargs) diff --git a/tools/upnpc/upnpc-static.exe b/tools/upnpc/upnpc-static.exe deleted file mode 100644 index 7b315990..00000000 Binary files a/tools/upnpc/upnpc-static.exe and /dev/null differ diff --git a/zeronet.py b/zeronet.py index e8156209..4dcceb73 100644 --- a/zeronet.py +++ b/zeronet.py @@ -35,3 +35,4 @@ def main(): if __name__ == '__main__': main() +