diff --git a/src/Config.py b/src/Config.py index 69ccaaf7..4cbbac6c 100644 --- a/src/Config.py +++ b/src/Config.py @@ -201,6 +201,7 @@ class Config(object): self.parser.add_argument('--size_limit', help='Default site size limit in MB', default=10, type=int, metavar='limit') self.parser.add_argument('--file_size_limit', help='Maximum per file size limit in MB', default=10, type=int, metavar='limit') self.parser.add_argument('--connected_limit', help='Max connected peer per site', default=8, type=int, metavar='connected_limit') + self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit') self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers') self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip') diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 6af519ac..4e59a682 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -8,6 +8,7 @@ import msgpack from gevent.server import StreamServer from gevent.pool import Pool +import util from Debug import Debug from Connection import Connection from Config import config @@ -158,6 +159,10 @@ class ConnectionServer: except Exception, err: connection.close("%s Connect error: %s" % (ip, Debug.formatException(err))) raise err + + if len(self.connections) > config.global_connected_limit: + gevent.spawn(self.checkMaxConnections) + return connection else: return None @@ -256,6 +261,28 @@ class ConnectionServer: if time.time() - s > 0.01: self.log.debug("Connection cleanup in %.3fs" % (time.time() - s)) + @util.Noparallel(blocking=False) + def checkMaxConnections(self): + if len(self.connections) < config.global_connected_limit: + return 0 + + s = time.time() + num_connected_before = len(self.connections) + self.connections.sort(key=lambda connection: connection.sites) + num_closed = 0 + for connection in self.connections: + idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time) + if idle > 60: + connection.close("Connection limit reached") + num_closed += 1 + if num_closed > config.global_connected_limit * 0.1: + break + + self.log.debug("Closed %s connections of %s after reached limit %s in %.3fs" % ( + num_closed, num_connected_before, config.global_connected_limit, time.time() - s + )) + return num_closed + def onInternetOnline(self): self.log.info("Internet online")