rev259, Protection against connection flood, Fix site size limit error dialog, Convert ConnectionServer and ContentManager to PEP8 format

This commit is contained in:
HelloZeroNet 2015-06-25 20:09:41 +02:00
parent f63b711972
commit dc791a31ab
6 changed files with 301 additions and 339 deletions

View File

@ -4,7 +4,7 @@ import ConfigParser
class Config(object):
def __init__(self):
self.version = "0.3.1"
self.rev = 247
self.rev = 259
self.parser = self.createArguments()
argv = sys.argv[:] # Copy command line arguments
argv = self.parseConfig(argv) # Add arguments from config file

View File

@ -1,8 +1,14 @@
import logging
import random
import string
import time
import sys
import gevent
import msgpack
from gevent.server import StreamServer
from gevent.pool import Pool
import socket, os, logging, random, string, time, sys
import gevent, msgpack
import cStringIO as StringIO
from Debug import Debug
from Connection import Connection
from Config import config
@ -18,6 +24,7 @@ class ConnectionServer:
self.port_opened = None
self.connections = [] # Connections
self.ip_incoming = {} # Incoming connections from ip in the last minute to avoid connection flood
self.ips = {} # Connection by ip
self.peer_ids = {} # Connections by peer_ids
@ -27,65 +34,85 @@ class ConnectionServer:
self.bytes_recv = 0
self.bytes_sent = 0
self.peer_id = "-ZN0"+config.version.replace(".", "")+"-"+''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(12)) # Bittorrent style peerid
# Bittorrent style peerid
self.peer_id = "-ZN0%s-%s" % (
config.version.replace(".", ""),
''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(12))
# Check msgpack version
if msgpack.version[0] == 0 and msgpack.version[1] < 4:
self.log.error("Error: Too old msgpack version: %s (>0.4.0 required), please update using `sudo pip install msgpack-python --upgrade`" % str(msgpack.version))
import sys
"Error: Too old msgpack version: %s (>0.4.0 required), please update using `sudo pip install msgpack-python --upgrade`" %
if port: # Listen server on a port
self.pool = Pool(1000) # do not accept more than 1000 connections
self.stream_server = StreamServer((ip.replace("*", ""), port), self.handleIncomingConnection, spawn=self.pool, backlog=100)
if request_handler: self.handleRequest = request_handler
if request_handler:
self.handleRequest = request_handler
def start(self):
self.running = True
self.log.debug("Binding to: %s:%s, (msgpack: %s), supported crypt: %s" % (self.ip, self.port, ".".join(map(str, msgpack.version)), CryptConnection.manager.crypt_supported ) )
self.log.debug("Binding to: %s:%s, (msgpack: %s), supported crypt: %s" % (
self.ip, self.port,
".".join(map(str, msgpack.version)), CryptConnection.manager.crypt_supported)
self.stream_server.serve_forever() # Start normal connection server
except Exception, err:"StreamServer bind error, must be running already: %s" % err)
def stop(self):
self.running = False
def handleIncomingConnection(self, sock, addr):
ip, port = addr
# Connection flood protection
if ip in self.ip_incoming:
self.ip_incoming[ip] += 1
if self.ip_incoming[ip] > 3: # Allow 3 in 1 minute from same ip
self.log.debug("Connection flood detected from %s" % ip)
return False
self.ip_incoming[ip] = 0
connection = Connection(self, ip, port, sock)
self.ips[ip] = connection
def getConnection(self, ip=None, port=None, peer_id=None, create=True):
if peer_id and peer_id in self.peer_ids: # Find connection by peer id
connection = self.peer_ids.get(peer_id)
if not connection.connected and create:
succ = connection.event_connected.get() # Wait for connection
if not succ: raise Exception("Connection event return error")
if not succ:
raise Exception("Connection event return error")
return connection
# Find connection by ip
if ip in self.ips:
connection = self.ips[ip]
if not connection.connected and create:
succ = connection.event_connected.get() # Wait for connection
if not succ: raise Exception("Connection event return error")
if not succ:
raise Exception("Connection event return error")
return connection
# Recover from connection pool
for connection in self.connections:
if connection.ip == ip:
if not connection.connected and create:
succ = connection.event_connected.get() # Wait for connection
if not succ: raise Exception("Connection event return error")
if not succ:
raise Exception("Connection event return error")
return connection
# No connection found
@ -109,8 +136,6 @@ class ConnectionServer:
return None
def removeConnection(self, connection):
self.log.debug("Removing %s..." % connection)
if self.ips.get(connection.ip) == connection: # Delete if same as in registry
@ -120,11 +145,10 @@ class ConnectionServer:
if connection in self.connections:
def checkConnections(self):
while self.running:
time.sleep(60) # Sleep 1 min
self.ip_incoming = {}
for connection in self.connections[:]: # Make a copy
idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time)
@ -152,108 +176,3 @@ class ConnectionServer:
elif idle > 60 and connection.protocol == "?": # No connection after 1 min
connection.log("[Cleanup] Connect timeout: %s" % idle)
# -- TESTING --
def testCreateServer():
global server
server = ConnectionServer("", 1234, testRequestHandler)
def testRequestHandler(connection, req):
print req
if req["cmd"] == "Bigdata":
connection.send({"res": "HelloWorld"*1024})
connection.send({"res": "pong"})
def testClient(num):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("localhost", 1234))
for i in range(10):
print "[C%s] send..." % num
s.sendall(msgpack.packb({"cmd": "[C] Ping"}))
print "[C%s] recv..." % num
print "[C%s] %s" % (num, repr(s.recv(1024)))
def testSlowClient(num):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("localhost", 1234))
for i in range(1):
print "[C%s] send..." % num
s.sendall(msgpack.packb({"cmd": "Bigdata"}))
print "[C%s] recv..." % num
gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s)
while 1:
data = s.recv(1000)
if not data: break
print "[C%s] %s" % (num, data)
#s.sendall(msgpack.packb({"cmd": "[C] Ping"}))
def testZmqClient(num):
import as zmq
c = zmq.Context(1)
for i in range(10):
s = c.socket(zmq.REQ)
print "[Z%s] send..." % num
s.send(msgpack.packb({"cmd": "[Z] Ping %s" % i}))
print "[Z%s] recv..." % num
print "[Z%s] %s" % (num, s.recv(1024))
def testZmqSlowClient(num):
import as zmq
c = zmq.Context(1)
s = c.socket(zmq.REQ)
for i in range(1):
print "[Z%s] send..." % num
s.send(msgpack.packb({"cmd": "Bigdata"}))
print "[Z%s] recv..." % num
#gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s)
while 1:
data = s.recv(1024*1024)
if not data: break
print "[Z%s] %s" % (num, data)
s.send(msgpack.packb({"cmd": "[Z] Ping"}))
def testConnection():
global server
connection = server.getConnection("", 1234)
connection.send({"res": "Sending: Hello!"})
print connection
def greenletsNum():
from greenlet import greenlet
import gc
while 1:
print len([ob for ob in gc.get_objects() if isinstance(ob, greenlet)])
if __name__ == "__main__":
from gevent import monkey; monkey.patch_all(thread=False)
import sys, time
gevent.spawn(testZmqClient, 1)
#gevent.spawn(testClient, 1)
#gevent.spawn_later(1, testConnection)
print "Running server..."
server = None

View File

@ -1,9 +1,18 @@
import json, time, re, os, gevent, copy
import json
import time
import re
import os
import copy
import gevent
from Debug import Debug
from Crypt import CryptHash
from Config import config
class ContentManager(object):
def __init__(self, site): = site
self.log =
@ -44,7 +53,8 @@ class ContentManager(object):
old_hash = old_content["files"][relative_path].get(hash_type)
else: # The file is not in the old content
old_hash = None
if old_hash != new_hash: changed.append(content_dir+relative_path)
if old_hash != new_hash:
changed.append(content_dir + relative_path)
# Load includes
if load_includes and "includes" in new_content:
@ -52,7 +62,8 @@ class ContentManager(object):
include_inner_path = content_dir + relative_path
if # Content.json exists, load it
success = self.loadContent(include_inner_path, add_bad_files=add_bad_files)
if success: changed += success # Add changed files
if success:
changed += success # Add changed files
else: # Content.json not exist, add to changed files
self.log.debug("Missing include: %s" % include_inner_path)
changed += [include_inner_path]
@ -61,9 +72,11 @@ class ContentManager(object):
if load_includes and "user_contents" in new_content:
for relative_dir in os.listdir(content_path_dir):
include_inner_path = content_dir + relative_dir + "/content.json"
if not continue # Content.json not exist
if not
continue # Content.json not exist
success = self.loadContent(include_inner_path, add_bad_files=add_bad_files, load_includes=False)
if success: changed += success # Add changed files
if success:
changed += success # Add changed files
# Update the content
self.contents[content_inner_path] = new_content
@ -77,7 +90,8 @@ class ContentManager(object):[inner_path] = True
if new_content["modified"] >"modified", 0):["modified"] = min(time.time()+60*10, new_content["modified"]) # Dont store modifications in the far future (more than 10 minute)
# Dont store modifications in the far future (more than 10 minute)["modified"] = min(time.time() + 60 * 10, new_content["modified"])
return changed
@ -86,7 +100,8 @@ class ContentManager(object):
def getTotalSize(self, ignore=None):
total_size = 0
for inner_path, content in self.contents.iteritems():
if inner_path == ignore: continue
if inner_path == ignore:
total_size += # Size of content.json
for file, info in content.get("files", {}).iteritems():
total_size += info["size"]
@ -126,7 +141,8 @@ class ContentManager(object):
def getRules(self, inner_path, content=None):
if not inner_path.endswith("content.json"): # Find the files content.json first
file_info = self.getFileInfo(inner_path)
if not file_info: return False # File not found
if not file_info:
return False # File not found
inner_path = file_info["content_inner_path"]
dirs = inner_path.split("/") # Parent dirs of content.json
inner_path_parts = [dirs.pop()] # Filename relative to content.json
@ -146,7 +162,6 @@ class ContentManager(object):
return False
# Get rules for a user file
# Return: The rules of the file or False if not allowed
def getUserContentRules(self, parent_content, inner_path, content):
@ -154,7 +169,8 @@ class ContentManager(object):
user_address = re.match(".*/([A-Za-z0-9]*?)/.*?$", inner_path).group(1) # Delivered for directory
if not content: content = # Read the file if no content specified
if not content:
content = # Read the file if no content specified
except (Exception, ): # Content.json not exist
return {"signers": [user_address], "user_address": user_address} # Return information that we know for sure
@ -165,12 +181,13 @@ class ContentManager(object):
user_urn = "%s/%s" % (content["cert_auth_type"], content["cert_user_id"]) # web/nofish@zeroid.bit
rules = copy.copy(user_contents["permissions"].get(content["cert_user_id"], {})) # Default rules by username
if rules == False:
if rules is False:
return False # User banned
if "signers" in rules:
rules["signers"] = rules["signers"][:] # Make copy of the signers
for permission_pattern, permission_rules in user_contents["permission_rules"].items(): # Regexp rules
if not re.match(permission_pattern, user_urn): continue # Rule is not valid for user
if not re.match(permission_pattern, user_urn):
continue # Rule is not valid for user
# Update rules if its better than current recorded ones
for key, val in permission_rules.iteritems():
if key not in rules:
@ -182,12 +199,14 @@ class ContentManager(object):
if val > rules[key]:
rules[key] = val
elif hasattr(val, "startswith"): # String, update if longer
if len(val) > len(rules[key]): rules[key] = val
if len(val) > len(rules[key]):
rules[key] = val
elif type(val) is list: # List, append
rules[key] += val
rules["cert_signers"] = user_contents["cert_signers"] # Add valid cert signers
if "signers" not in rules: rules["signers"] = []
if "signers" not in rules:
rules["signers"] = []
rules["signers"].append(user_address) # Add user as valid signer
rules["user_address"] = user_address
@ -205,7 +224,8 @@ class ContentManager(object):
content["description"] = ""
content["signs_required"] = 1
content["ignore"] = ""
if extend: content.update(extend) # Add custom fields
if extend:
content.update(extend) # Add custom fields
directory = self.toDir("Opening site data directory: %s..." % directory)
@ -217,10 +237,14 @@ class ContentManager(object):
file_path ="%s/%s" % (root.strip("/"), file_name))
file_inner_path = re.sub(re.escape(directory), "", file_path)
if file_name == "content.json": ignored = True
elif content.get("ignore") and re.match(content["ignore"], file_inner_path): ignored = True
elif file_name.startswith("."): ignored = True
else: ignored = False
if file_name == "content.json":
ignored = True
elif content.get("ignore") and re.match(content["ignore"], file_inner_path):
ignored = True
elif file_name.startswith("."):
ignored = True
ignored = False
if ignored: # Ignore content.json, definied regexp and files starting with ."- [SKIPPED] %s" % file_inner_path)
@ -228,7 +252,10 @@ class ContentManager(object):
sha512sum = CryptHash.sha512sum(file_path) # Calculate sha512 sum of file"- %s (SHA512: %s)" % (file_inner_path, sha512sum))
hashed_files[file_inner_path] = {"sha512": sha512sum, "size": os.path.getsize(file_path)}
if file_inner_path in content["files"].keys() and hashed_files[file_inner_path]["sha512"] != content["files"][file_inner_path].get("sha512"):
if (
file_inner_path in content["files"].keys()
and hashed_files[file_inner_path]["sha512"] != content["files"][file_inner_path].get("sha512")
self.log.debug("Changed files: %s" % changed_files)
@ -257,12 +284,15 @@ class ContentManager(object):
if inner_path == "content.json" and privatekey_address == # If signing using the root key sign the valid signers
new_content["signers_sign"] = CryptBitcoin.sign("%s:%s" % (new_content["signs_required"], ",".join(valid_signers)), privatekey)
if not new_content["signers_sign"]:"Old style address, signers_sign is none")
if not new_content["signers_sign"]:"Old style address, signers_sign is none")"Signing %s..." % inner_path)
if "signs" in new_content: del(new_content["signs"]) # Delete old signs
if "sign" in new_content: del(new_content["sign"]) # Delete old sign (backward compatibility)
if "signs" in new_content:
del(new_content["signs"]) # Delete old signs
if "sign" in new_content:
del(new_content["sign"]) # Delete old sign (backward compatibility)
sign_content = json.dumps(new_content, sort_keys=True)
sign = CryptBitcoin.sign(sign_content, privatekey)
@ -314,7 +344,8 @@ class ContentManager(object):
from Crypt import CryptBitcoin
rules = self.getRules(inner_path, content)
if not rules.get("cert_signers"): return True # Does not need cert
if not rules.get("cert_signers"):
return True # Does not need cert
name, domain = content["cert_user_id"].split("@")
cert_address = rules["cert_signers"].get(domain)
@ -328,7 +359,8 @@ class ContentManager(object):
def validContent(self, inner_path, content):
content_size = len(json.dumps(content)) + sum([file["size"] for file in content["files"].values()]) # Size of new content
site_size = self.getTotalSize(ignore=inner_path) + content_size # Site size without old content
if site_size >"size", 0):["size"] = site_size # Save to settings if larger
if site_size >"size", 0):["size"] = site_size # Save to settings if larger
site_size_limit = * 1024 * 1024
@ -340,7 +372,8 @@ class ContentManager(object):
return False
if inner_path == "content.json": return True # Root content.json is passed
if inner_path == "content.json":
return True # Root content.json is passed
# Load include details
rules = self.getRules(inner_path, content)
@ -355,7 +388,7 @@ class ContentManager(object):
return False
# Check if content includes allowed
if rules.get("includes_allowed") == False and content.get("includes"):
if rules.get("includes_allowed") is False and content.get("includes"):
self.log.error("%s: Includes not allowed" % inner_path)
return False # Includes not allowed
@ -390,11 +423,14 @@ class ContentManager(object):
# Check sign
sign = new_content.get("sign")
signs = new_content.get("signs", {})
if "sign" in new_content: del(new_content["sign"]) # The file signed without the sign
if "signs" in new_content: del(new_content["signs"]) # The file signed without the signs
if "sign" in new_content:
del(new_content["sign"]) # The file signed without the sign
if "signs" in new_content:
del(new_content["signs"]) # The file signed without the signs
sign_content = json.dumps(new_content, sort_keys=True) # Dump the json to string to remove whitepsace
if not self.validContent(inner_path, new_content): return False # Content not valid (files too large, invalid files)
if not self.validContent(inner_path, new_content):
return False # Content not valid (files too large, invalid files)
if signs: # New style signing
valid_signers = self.getValidSigners(inner_path, new_content)
@ -411,10 +447,10 @@ class ContentManager(object):
valid_signs = 0
for address in valid_signers:
if address in signs: valid_signs += CryptBitcoin.verify(sign_content, address, signs[address])
if valid_signs >= signs_required: break # Break if we has enough signs
if address in signs:
valid_signs += CryptBitcoin.verify(sign_content, address, signs[address])
if valid_signs >= signs_required:
break # Break if we has enough signs
return valid_signs >= signs_required
else: # Old style signing
@ -443,18 +479,17 @@ class ContentManager(object):
self.log.error("File not in content.json: %s" % inner_path)
return False
# Get dir from file
# Return: data/site/content.json -> data/site
def toDir(self, inner_path):
file_dir = re.sub("[^/]*?$", "", inner_path).strip("/")
if file_dir: file_dir += "/" # Add / at end if its not the root
if file_dir:
file_dir += "/" # Add / at end if its not the root
return file_dir
def testSign():
global config
from Config import config
from Site import Site
site = Site("12Hw8rTgzrNo4DSh2AkqwPRqDyTticwJyH")
content_manager = ContentManager(site)
@ -462,9 +497,7 @@ def testSign():
def testVerify():
from Config import config
from Site import Site
#site = Site("1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr")
site = Site("12Hw8rTgzrNo4DSh2AkqwPRqDyTticwJyH")
content_manager = ContentManager(site)
@ -478,7 +511,6 @@ def testVerify():
def testInfo():
from Config import config
from Site import Site
site = Site("12Hw8rTgzrNo4DSh2AkqwPRqDyTticwJyH")
@ -493,13 +525,12 @@ def testInfo():
if __name__ == "__main__":
import os, sys, logging
import sys
import logging
sys.path.insert(0, os.path.abspath("."))
sys.path.insert(0, os.path.abspath("src"))
from Debug import Debug
from Crypt import CryptHash
# testSign()

View File

@ -185,7 +185,7 @@ class Peer(object):
# Stop and remove from site
def remove(self):
self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
if self.key in del([self.key])
if and self.key in del([self.key])
if self.connection:

View File

@ -242,7 +242,12 @@ class Wrapper
@setSiteInfo site_info
if site_info.settings.size > site_info.size_limit*1024*1024 # Site size too large and not displaying it yet
if @loading.screen_visible
@displayConfirm "Site is larger than allowed: #{(site_info.settings.size/1024/1024).toFixed(1)}MB/#{site_info.size_limit}MB", "Set limit to #{site_info.next_size_limit}MB", =>
@ws.cmd "siteSetLimit", [site_info.next_size_limit], (res) =>
@notifications.add("size_limit", "done", res, 5000)
if site_info.content
window.document.title = site_info.content.title+" - ZeroNet"
@ -286,8 +291,8 @@ class Wrapper
@loading.printLine "No peers found"
if not @site_info and not @loading.screen_visible and $("#inner-iframe").attr("src").indexOf("?") == -1 # First site info and mainpage
if site_info.size_limit < site_info.next_size_limit # Need upgrade soon
@wrapperConfirm "Running out of size limit (#{(site_info.settings.size/1024/1024).toFixed(1)}MB/#{site_info.size_limit}MB)", "Set limit to #{site_info.next_size_limit}MB", =>
if site_info.size_limit*1.1 < site_info.next_size_limit # Need upgrade soon
@actionConfirm "Running out of size limit (#{(site_info.settings.size/1024/1024).toFixed(1)}MB/#{site_info.size_limit}MB)", "Set limit to #{site_info.next_size_limit}MB", =>
@ws.cmd "siteSetLimit", [site_info.next_size_limit], (res) =>
@notifications.add("size_limit", "done", res, 5000)
return false

View File

@ -681,7 +681,6 @@ jQuery.extend( jQuery.easing,
/* ---- src/Ui/media/ ---- */
@ -1069,7 +1068,15 @@ jQuery.extend( jQuery.easing,
_this.address = site_info.address;
if (site_info.settings.size > site_info.size_limit * 1024 * 1024) {
if (_this.loading.screen_visible) {
} else {
_this.displayConfirm("Site is larger than allowed: " + ((site_info.settings.size / 1024 / 1024).toFixed(1)) + "MB/" + site_info.size_limit + "MB", "Set limit to " + site_info.next_size_limit + "MB", function() {
return"siteSetLimit", [site_info.next_size_limit], function(res) {
return _this.notifications.add("size_limit", "done", res, 5000);
if (site_info.content) {
window.document.title = site_info.content.title + " - ZeroNet";
@ -1118,8 +1125,8 @@ jQuery.extend( jQuery.easing,
if (!this.site_info && !this.loading.screen_visible && $("#inner-iframe").attr("src").indexOf("?") === -1) {
if (site_info.size_limit < site_info.next_size_limit) {
this.wrapperConfirm("Running out of size limit (" + ((site_info.settings.size / 1024 / 1024).toFixed(1)) + "MB/" + site_info.size_limit + "MB)", "Set limit to " + site_info.next_size_limit + "MB", (function(_this) {
if (site_info.size_limit * 1.1 < site_info.next_size_limit) {
this.actionConfirm("Running out of size limit (" + ((site_info.settings.size / 1024 / 1024).toFixed(1)) + "MB/" + site_info.size_limit + "MB)", "Set limit to " + site_info.next_size_limit + "MB", (function(_this) {
return function() {"siteSetLimit", [site_info.next_size_limit], function(res) {
return _this.notifications.add("size_limit", "done", res, 5000);