import json
import time
import sys
import os
import shutil
import re
import copy
import logging
import gevent
from Config import config
from Site import SiteManager
from Crypt import CryptBitcoin
from Debug import Debug
from util import QueryJson, RateLimit
from Plugin import PluginManager
from Translate import translate as _
from util import helper
from util import SafeRe
from util.Flag import flag
from Content.ContentManager import VerifyError, SignError
@PluginManager.acceptPlugins
class UiWebsocket(object):
def __init__(self, ws, site, server, user, request):
self.ws = ws
self.site = site
self.user = user
self.log = site.log
self.request = request
self.permissions = []
self.server = server
self.next_message_id = 1
self.waiting_cb = {} # Waiting for callback. Key: message_id, Value: function pointer
self.channels = [] # Channels joined to
self.state = {"sending": False} # Shared state of websocket connection
self.send_queue = [] # Messages to send to client
# Start listener loop
def start(self):
ws = self.ws
if self.site.address == config.homepage and not self.site.page_requested:
# Add open fileserver port message or closed port error to homepage at first request after start
self.site.page_requested = True # Dont add connection notification anymore
import main
file_server = main.file_server
if not file_server.port_opened or file_server.tor_manager.start_onions is None:
self.site.page_requested = False # Not ready yet, check next time
else:
try:
self.addHomepageNotifications()
except Exception as err:
self.log.error("Uncaught Exception: " + Debug.formatException(err))
for notification in self.site.notifications: # Send pending notification messages
# send via WebSocket
self.cmd("notification", notification)
# just in case, log them to terminal
if notification[0] == "error":
self.log.error("\n*** %s\n" % self.dedent(notification[1]))
self.site.notifications = []
while True:
try:
if ws.closed:
break
else:
message = ws.receive()
except Exception as err:
self.log.error("WebSocket receive error: %s" % Debug.formatException(err))
break
if message:
try:
req = json.loads(message)
self.handleRequest(req)
except Exception as err:
if config.debug: # Allow websocket errors to appear on /Debug
import main
main.DebugHook.handleError()
self.log.error("WebSocket handleRequest error: %s \n %s" % (Debug.formatException(err), message))
if not self.hasPlugin("Multiuser"):
self.cmd("error", "Internal error: %s" % Debug.formatException(err, "html"))
self.onClosed()
def onClosed(self):
pass
def dedent(self, text):
return re.sub("[\\r\\n\\x20\\t]+", " ", text.strip().replace("
", " "))
def addHomepageNotifications(self):
if not(self.hasPlugin("Multiuser")) and not(self.hasPlugin("UiPassword")):
bind_ip = getattr(config, "ui_ip", "")
whitelist = getattr(config, "ui_restrict", [])
# binds to the Internet, no IP whitelist, no UiPassword, no Multiuser
if ("0.0.0.0" == bind_ip or "*" == bind_ip) and (not whitelist):
self.site.notifications.append([
"error",
_("You are not going to set up a public gateway. However, your Web UI is
" +
"open to the whole Internet. " +
"Please check your configuration.")
])
def hasPlugin(self, name):
return name in PluginManager.plugin_manager.plugin_names
# Has permission to run the command
def hasCmdPermission(self, cmd):
flags = flag.db.get(self.getCmdFuncName(cmd), ())
if "admin" in flags and "ADMIN" not in self.permissions:
return False
else:
return True
# Has permission to access a site
def hasSitePermission(self, address, cmd=None):
if address != self.site.address and "ADMIN" not in self.site.settings["permissions"]:
return False
else:
return True
def hasFilePermission(self, inner_path):
valid_signers = self.site.content_manager.getValidSigners(inner_path)
return self.site.settings["own"] or self.user.getAuthAddress(self.site.address) in valid_signers
# Event in a channel
def event(self, channel, *params):
if channel in self.channels: # We are joined to channel
if channel == "siteChanged":
site = params[0]
site_info = self.formatSiteInfo(site, create_user=False)
if len(params) > 1 and params[1]: # Extra data
site_info.update(params[1])
self.cmd("setSiteInfo", site_info)
elif channel == "serverChanged":
server_info = self.formatServerInfo()
if len(params) > 0 and params[0]: # Extra data
server_info.update(params[0])
self.cmd("setServerInfo", server_info)
elif channel == "announcerChanged":
site = params[0]
announcer_info = self.formatAnnouncerInfo(site)
if len(params) > 1 and params[1]: # Extra data
announcer_info.update(params[1])
self.cmd("setAnnouncerInfo", announcer_info)
# Send response to client (to = message.id)
def response(self, to, result):
self.send({"cmd": "response", "to": to, "result": result})
# Send a command
def cmd(self, cmd, params={}, cb=None):
self.send({"cmd": cmd, "params": params}, cb)
# Encode to json and send message
def send(self, message, cb=None):
message["id"] = self.next_message_id # Add message id to allow response
self.next_message_id += 1
if cb: # Callback after client responded
self.waiting_cb[message["id"]] = cb
self.send_queue.append(message)
if self.state["sending"]:
return # Already sending
try:
while self.send_queue:
self.state["sending"] = True
message = self.send_queue.pop(0)
self.ws.send(json.dumps(message))
self.state["sending"] = False
except Exception as err:
self.log.debug("Websocket send error: %s" % Debug.formatException(err))
self.state["sending"] = False
def getPermissions(self, req_id):
permissions = self.site.settings["permissions"]
if req_id >= 1000000: # Its a wrapper command, allow admin commands
permissions = permissions[:]
permissions.append("ADMIN")
return permissions
def asyncWrapper(self, func):
def asyncErrorWatcher(func, *args, **kwargs):
try:
result = func(*args, **kwargs)
if result is not None:
self.response(args[0], result)
except Exception as err:
if config.debug: # Allow websocket errors to appear on /Debug
import main
main.DebugHook.handleError()
self.log.error("WebSocket handleRequest error: %s" % Debug.formatException(err))
self.cmd("error", "Internal error: %s" % Debug.formatException(err, "html"))
def wrapper(*args, **kwargs):
gevent.spawn(asyncErrorWatcher, func, *args, **kwargs)
return wrapper
def getCmdFuncName(self, cmd):
func_name = "action" + cmd[0].upper() + cmd[1:]
return func_name
# Handle incoming messages
def handleRequest(self, req):
cmd = req.get("cmd")
params = req.get("params")
self.permissions = self.getPermissions(req["id"])
if cmd == "response": # It's a response to a command
return self.actionResponse(req["to"], req["result"])
else: # Normal command
func_name = self.getCmdFuncName(cmd)
func = getattr(self, func_name, None)
if not func: # Unknown command
return self.response(req["id"], {"error": "Unknown command: %s" % cmd})
if not self.hasCmdPermission(cmd): # Admin commands
return self.response(req["id"], {"error": "You don't have permission to run %s" % cmd})
# Execute in parallel
func_flags = flag.db.get(self.getCmdFuncName(cmd), ())
if func_flags and "async_run" in func_flags:
func = self.asyncWrapper(func)
# Support calling as named, unnamed parameters and raw first argument too
if type(params) is dict:
result = func(req["id"], **params)
elif type(params) is list:
result = func(req["id"], *params)
elif params:
result = func(req["id"], params)
else:
result = func(req["id"])
if result is not None:
self.response(req["id"], result)
# Format site info
def formatSiteInfo(self, site, create_user=True):
content = site.content_manager.contents.get("content.json", {})
if content: # Remove unnecessary data transfer
content = content.copy()
content["files"] = len(content.get("files", {}))
content["files_optional"] = len(content.get("files_optional", {}))
content["includes"] = len(content.get("includes", {}))
if "sign" in content:
del(content["sign"])
if "signs" in content:
del(content["signs"])
if "signers_sign" in content:
del(content["signers_sign"])
settings = site.settings.copy()
del settings["wrapper_key"] # Dont expose wrapper key
del settings["auth_key"] # Dont send auth key twice
ret = {
"auth_key": self.site.settings["auth_key"], # Obsolete, will be removed
"auth_address": self.user.getAuthAddress(site.address, create=create_user),
"cert_user_id": self.user.getCertUserId(site.address),
"address": site.address,
"address_short": site.address_short,
"settings": settings,
"content_updated": site.content_updated,
"bad_files": len(site.bad_files),
"size_limit": site.getSizeLimit(),
"next_size_limit": site.getNextSizeLimit(),
"peers": max(site.settings.get("peers", 0), len(site.peers)),
"started_task_num": site.worker_manager.started_task_num,
"tasks": len(site.worker_manager.tasks),
"workers": len(site.worker_manager.workers),
"content": content
}
if site.settings["own"]:
ret["privatekey"] = bool(self.user.getSiteData(site.address, create=create_user).get("privatekey"))
if site.isServing() and content:
ret["peers"] += 1 # Add myself if serving
return ret
def formatServerInfo(self):
import main
file_server = main.file_server
if file_server.port_opened == {}:
ip_external = None
else:
ip_external = any(file_server.port_opened.values())
back = {
"ip_external": ip_external,
"port_opened": file_server.port_opened,
"platform": sys.platform,
"fileserver_ip": config.fileserver_ip,
"fileserver_port": config.fileserver_port,
"tor_enabled": file_server.tor_manager.enabled,
"tor_status": file_server.tor_manager.status,
"tor_has_meek_bridges": file_server.tor_manager.has_meek_bridges,
"tor_use_bridges": config.tor_use_bridges,
"ui_ip": config.ui_ip,
"ui_port": config.ui_port,
"version": config.version,
"rev": config.rev,
"timecorrection": file_server.timecorrection,
"language": config.language,
"debug": config.debug,
"offline": config.offline,
"plugins": PluginManager.plugin_manager.plugin_names,
"plugins_rev": PluginManager.plugin_manager.plugins_rev,
"user_settings": self.user.settings
}
if "ADMIN" in self.site.settings["permissions"]:
back["updatesite"] = config.updatesite
back["dist_type"] = config.dist_type
back["lib_verify_best"] = CryptBitcoin.lib_verify_best
return back
def formatAnnouncerInfo(self, site):
return {"address": site.address, "stats": site.announcer.stats}
# - Actions -
def actionAs(self, to, address, cmd, params=[]):
if not self.hasSitePermission(address, cmd=cmd):
return self.response(to, "No permission for site %s" % address)
req_self = copy.copy(self)
req_self.site = self.server.sites.get(address)
req_self.hasCmdPermission = self.hasCmdPermission # Use the same permissions as current site
req_obj = super(UiWebsocket, req_self)
req = {"id": to, "cmd": cmd, "params": params}
req_obj.handleRequest(req)
# Do callback on response {"cmd": "response", "to": message_id, "result": result}
def actionResponse(self, to, result):
if to in self.waiting_cb:
self.waiting_cb[to](result) # Call callback function
else:
self.log.error("Websocket callback not found: %s, %s" % (to, result))
# Send a simple pong answer
def actionPing(self, to):
self.response(to, "pong")
# Send site details
def actionSiteInfo(self, to, file_status=None):
ret = self.formatSiteInfo(self.site)
if file_status: # Client queries file status
if self.site.storage.isFile(file_status): # File exist, add event done
ret["event"] = ("file_done", file_status)
self.response(to, ret)
def actionSiteBadFiles(self, to):
return list(self.site.bad_files.keys())
# Join to an event channel
def actionChannelJoin(self, to, channels):
if type(channels) != list:
channels = [channels]
for channel in channels:
if channel not in self.channels:
self.channels.append(channel)
self.response(to, "ok")
# Server variables
def actionServerInfo(self, to):
back = self.formatServerInfo()
self.response(to, back)
# Create a new wrapper nonce that allows to load html file
@flag.admin
def actionServerGetWrapperNonce(self, to):
wrapper_nonce = self.request.getWrapperNonce()
self.response(to, wrapper_nonce)
def actionAnnouncerInfo(self, to):
back = self.formatAnnouncerInfo(self.site)
self.response(to, back)
@flag.admin
def actionAnnouncerStats(self, to):
back = {}
trackers = self.site.announcer.getTrackers()
for site in list(self.server.sites.values()):
for tracker, stats in site.announcer.stats.items():
if tracker not in trackers:
continue
if tracker not in back:
back[tracker] = {}
is_latest_data = bool(stats["time_request"] > back[tracker].get("time_request", 0) and stats["status"])
for key, val in stats.items():
if key.startswith("num_"):
back[tracker][key] = back[tracker].get(key, 0) + val
elif is_latest_data:
back[tracker][key] = val
return back
# Sign content.json
def actionSiteSign(self, to, privatekey=None, inner_path="content.json", remove_missing_optional=False, update_changed_files=False, response_ok=True):
self.log.debug("Signing: %s" % inner_path)
site = self.site
extend = {} # Extended info for signing
# Change to the file's content.json
file_info = site.content_manager.getFileInfo(inner_path)
if not inner_path.endswith("content.json"):
if not file_info:
raise Exception("Invalid content.json file: %s" % inner_path)
inner_path = file_info["content_inner_path"]
# Add certificate to user files
is_user_content = file_info and ("cert_signers" in file_info or "cert_signers_pattern" in file_info)
if is_user_content and privatekey is None:
cert = self.user.getCert(self.site.address)
extend["cert_auth_type"] = cert["auth_type"]
extend["cert_user_id"] = self.user.getCertUserId(site.address)
extend["cert_sign"] = cert["cert_sign"]
self.log.debug("Extending content.json with cert %s" % extend["cert_user_id"])
if not self.hasFilePermission(inner_path):
self.log.error("SiteSign error: you don't own this site & site owner doesn't allow you to do so.")
return self.response(to, {"error": "Forbidden, you can only modify your own sites"})
if privatekey == "stored": # Get privatekey from sites.json
privatekey = self.user.getSiteData(self.site.address).get("privatekey")
if not privatekey:
self.cmd("notification", ["error", _["Content signing failed"] + "
Private key not found in sites.json "])
self.response(to, {"error": "Site sign failed: Private key not stored."})
self.log.error("Site sign failed: %s: Private key not stored in sites.json" % inner_path)
return
if not privatekey: # Get privatekey from users.json auth_address
privatekey = self.user.getAuthPrivatekey(self.site.address)
# Signing
# Reload content.json, ignore errors to make it up-to-date
site.content_manager.loadContent(inner_path, add_bad_files=False, force=True)
# Sign using private key sent by user
try:
site.content_manager.sign(inner_path, privatekey, extend=extend, update_changed_files=update_changed_files, remove_missing_optional=remove_missing_optional)
except (VerifyError, SignError) as err:
self.cmd("notification", ["error", _["Content signing failed"] + "
%s" % err])
self.response(to, {"error": "Site sign failed: %s" % err})
self.log.error("Site sign failed: %s: %s" % (inner_path, Debug.formatException(err)))
return
except Exception as err:
self.cmd("notification", ["error", _["Content signing error"] + "
%s" % Debug.formatException(err)])
self.response(to, {"error": "Site sign error: %s" % Debug.formatException(err)})
self.log.error("Site sign error: %s: %s" % (inner_path, Debug.formatException(err)))
return
site.content_manager.loadContent(inner_path, add_bad_files=False) # Load new content.json, ignore errors
if update_changed_files:
self.site.updateWebsocket(file_done=inner_path)
if response_ok:
self.response(to, "ok")
else:
return inner_path
# Sign and publish content.json
def actionSitePublish(self, to, privatekey=None, inner_path="content.json", sign=True, remove_missing_optional=False, update_changed_files=False):
if sign:
inner_path = self.actionSiteSign(
to, privatekey, inner_path, response_ok=False,
remove_missing_optional=remove_missing_optional, update_changed_files=update_changed_files
)
if not inner_path:
return
# Publishing
if not self.site.settings["serving"]: # Enable site if paused
self.site.settings["serving"] = True
self.site.saveSettings()
self.site.announce()
if inner_path not in self.site.content_manager.contents:
return self.response(to, {"error": "File %s not found" % inner_path})
event_name = "publish %s %s" % (self.site.address, inner_path)
called_instantly = RateLimit.isAllowed(event_name, 30)
thread = RateLimit.callAsync(event_name, 30, self.doSitePublish, self.site, inner_path) # Only publish once in 30 seconds
notification = "linked" not in dir(thread) # Only display notification on first callback
thread.linked = True
if called_instantly: # Allowed to call instantly
# At the end callback with request id and thread
self.cmd("progress", ["publish", _["Content published to {0}/{1} peers."].format(0, 5), 0])
thread.link(lambda thread: self.cbSitePublish(to, self.site, thread, notification, callback=notification))
else:
self.cmd(
"notification",
["info", _["Content publish queued for {0:.0f} seconds."].format(RateLimit.delayLeft(event_name, 30)), 5000]
)
self.response(to, "ok")
# At the end display notification
thread.link(lambda thread: self.cbSitePublish(to, self.site, thread, notification, callback=False))
def doSitePublish(self, site, inner_path):
def cbProgress(published, limit):
progress = int(float(published) / limit * 100)
self.cmd("progress", [
"publish",
_["Content published to {0}/{1} peers."].format(published, limit),
progress
])
diffs = site.content_manager.getDiffs(inner_path)
back = site.publish(limit=5, inner_path=inner_path, diffs=diffs, cb_progress=cbProgress)
if back == 0: # Failed to publish to anyone
self.cmd("progress", ["publish", _["Content publish failed."], -100])
else:
cbProgress(back, back)
return back
# Callback of site publish
def cbSitePublish(self, to, site, thread, notification=True, callback=True):
published = thread.value
if published > 0: # Successfully published
if notification:
# self.cmd("notification", ["done", _["Content published to {0} peers."].format(published), 5000])
site.updateWebsocket() # Send updated site data to local websocket clients
if callback:
self.response(to, "ok")
else:
if len(site.peers) == 0:
import main
if any(main.file_server.port_opened.values()) or main.file_server.tor_manager.start_onions:
if notification:
self.cmd("notification", ["info", _["No peers found, but your content is ready to access."]])
if callback:
self.response(to, "ok")
else:
if notification:
self.cmd("notification", [
"info",
_("""{_[Your network connection is restricted. Please, open {0} port]}
{_[on your router to make your site accessible for everyone.]}""").format(config.fileserver_port)
])
if callback:
self.response(to, {"error": "Port not opened."})
else:
if notification:
self.response(to, {"error": "Content publish failed."})
def actionSiteReload(self, to, inner_path):
self.site.content_manager.loadContent(inner_path, add_bad_files=False)
self.site.storage.verifyFiles(quick_check=True)
self.site.updateWebsocket()
return "ok"
# Write a file to disk
def actionFileWrite(self, to, inner_path, content_base64, ignore_bad_files=False):
valid_signers = self.site.content_manager.getValidSigners(inner_path)
auth_address = self.user.getAuthAddress(self.site.address)
if not self.hasFilePermission(inner_path):
self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
return self.response(to, {"error": "Forbidden, you can only modify your own files"})
# Try not to overwrite files currently in sync
content_inner_path = re.sub("^(.*)/.*?$", "\\1/content.json", inner_path) # Also check the content.json from same directory
if (self.site.bad_files.get(inner_path) or self.site.bad_files.get(content_inner_path)) and not ignore_bad_files:
found = self.site.needFile(inner_path, update=True, priority=10)
if not found:
self.cmd(
"confirm",
[_["This file still in sync, if you write it now, then the previous content may be lost."], _["Write content anyway"]],
lambda res: self.actionFileWrite(to, inner_path, content_base64, ignore_bad_files=True)
)
return False
try:
import base64
content = base64.b64decode(content_base64)
# Save old file to generate patch later
if (
inner_path.endswith(".json") and not inner_path.endswith("content.json") and
self.site.storage.isFile(inner_path) and not self.site.storage.isFile(inner_path + "-old")
):
try:
self.site.storage.rename(inner_path, inner_path + "-old")
except Exception:
# Rename failed, fall back to standard file write
f_old = self.site.storage.open(inner_path, "rb")
f_new = self.site.storage.open(inner_path + "-old", "wb")
shutil.copyfileobj(f_old, f_new)
self.site.storage.write(inner_path, content)
except Exception as err:
self.log.error("File write error: %s" % Debug.formatException(err))
return self.response(to, {"error": "Write error: %s" % Debug.formatException(err)})
if inner_path.endswith("content.json"):
self.site.content_manager.loadContent(inner_path, add_bad_files=False, force=True)
self.response(to, "ok")
# Send sitechanged to other local users
for ws in self.site.websockets:
if ws != self:
ws.event("siteChanged", self.site, {"event": ["file_done", inner_path]})
def actionFileDelete(self, to, inner_path):
if not self.hasFilePermission(inner_path):
self.log.error("File delete error: you don't own this site & you are not approved by the owner.")
return self.response(to, {"error": "Forbidden, you can only modify your own files"})
need_delete = True
file_info = self.site.content_manager.getFileInfo(inner_path)
if file_info and file_info.get("optional"):
# Non-existing optional files won't be removed from content.json, so we have to do it manually
self.log.debug("Deleting optional file: %s" % inner_path)
relative_path = file_info["relative_path"]
content_json = self.site.storage.loadJson(file_info["content_inner_path"])
if relative_path in content_json.get("files_optional", {}):
del content_json["files_optional"][relative_path]
self.site.storage.writeJson(file_info["content_inner_path"], content_json)
self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
need_delete = self.site.storage.isFile(inner_path) # File sill exists after removing from content.json (owned site)
if need_delete:
try:
self.site.storage.delete(inner_path)
except Exception as err:
self.log.error("File delete error: %s" % err)
return self.response(to, {"error": "Delete error: %s" % err})
self.response(to, "ok")
# Send sitechanged to other local users
for ws in self.site.websockets:
if ws != self:
ws.event("siteChanged", self.site, {"event": ["file_deleted", inner_path]})
# Find data in json files
def actionFileQuery(self, to, dir_inner_path, query=None):
# s = time.time()
dir_path = self.site.storage.getPath(dir_inner_path)
rows = list(QueryJson.query(dir_path, query or ""))
# self.log.debug("FileQuery %s %s done in %s" % (dir_inner_path, query, time.time()-s))
return self.response(to, rows)
# List files in directory
@flag.async_run
def actionFileList(self, to, inner_path):
try:
return list(self.site.storage.walk(inner_path))
except Exception as err:
self.log.error("fileList %s error: %s" % (inner_path, Debug.formatException(err)))
return {"error": Debug.formatExceptionMessage(err)}
# List directories in a directory
@flag.async_run
def actionDirList(self, to, inner_path):
try:
return list(self.site.storage.list(inner_path))
except Exception as err:
self.log.error("dirList %s error: %s" % (inner_path, Debug.formatException(err)))
return {"error": Debug.formatExceptionMessage(err)}
# Sql query
def actionDbQuery(self, to, query, params=None, wait_for=None):
if config.debug or config.verbose:
s = time.time()
rows = []
try:
res = self.site.storage.query(query, params)
except Exception as err: # Response the error to client
self.log.error("DbQuery error: %s" % err)
return self.response(to, {"error": Debug.formatExceptionMessage(err)})
# Convert result to dict
for row in res:
rows.append(dict(row))
if config.verbose and time.time() - s > 0.1: # Log slow query
self.log.debug("Slow query: %s (%.3fs)" % (query, time.time() - s))
return self.response(to, rows)
# Return file content
@flag.async_run
def actionFileGet(self, to, inner_path, required=True, format="text", timeout=300, priority=6):
try:
if required or inner_path in self.site.bad_files:
with gevent.Timeout(timeout):
self.site.needFile(inner_path, priority=priority)
body = self.site.storage.read(inner_path, "rb")
except (Exception, gevent.Timeout) as err:
self.log.error("%s fileGet error: %s" % (inner_path, Debug.formatException(err)))
body = None
if not body:
body = None
elif format == "base64":
import base64
body = base64.b64encode(body).decode()
else:
body = body.decode()
self.response(to, body)
@flag.async_run
def actionFileNeed(self, to, inner_path, timeout=300, priority=6):
try:
with gevent.Timeout(timeout):
self.site.needFile(inner_path, priority=priority)
except (Exception, gevent.Timeout) as err:
return self.response(to, {"error": Debug.formatExceptionMessage(err)})
return self.response(to, "ok")
def actionFileRules(self, to, inner_path, use_my_cert=False, content=None):
if not content: # No content defined by function call
content = self.site.content_manager.contents.get(inner_path)
if not content: # File not created yet
cert = self.user.getCert(self.site.address)
if cert and cert["auth_address"] in self.site.content_manager.getValidSigners(inner_path):
# Current selected cert if valid for this site, add it to query rules
content = {}
content["cert_auth_type"] = cert["auth_type"]
content["cert_user_id"] = self.user.getCertUserId(self.site.address)
content["cert_sign"] = cert["cert_sign"]
rules = self.site.content_manager.getRules(inner_path, content)
if inner_path.endswith("content.json") and rules:
if content:
rules["current_size"] = len(json.dumps(content)) + sum([file["size"] for file in list(content.get("files", {}).values())])
else:
rules["current_size"] = 0
return self.response(to, rules)
# Add certificate to user
def actionCertAdd(self, to, domain, auth_type, auth_user_name, cert):
try:
res = self.user.addCert(self.user.getAuthAddress(self.site.address), domain, auth_type, auth_user_name, cert)
if res is True:
self.cmd(
"notification",
["done", _("{_[New certificate added]:} {auth_type}/{auth_user_name}@{domain}.")]
)
self.user.setCert(self.site.address, domain)
self.site.updateWebsocket(cert_changed=domain)
self.response(to, "ok")
elif res is False:
# Display confirmation of change
cert_current = self.user.certs[domain]
body = _("{_[Your current certificate]:} {cert_current[auth_type]}/{cert_current[auth_user_name]}@{domain}")
self.cmd(
"confirm",
[body, _("Change it to {auth_type}/{auth_user_name}@{domain}")],
lambda res: self.cbCertAddConfirm(to, domain, auth_type, auth_user_name, cert)
)
else:
self.response(to, "Not changed")
except Exception as err:
self.log.error("CertAdd error: Exception - %s (%s)" % (err.message, Debug.formatException(err)))
self.response(to, {"error": err.message})
def cbCertAddConfirm(self, to, domain, auth_type, auth_user_name, cert):
self.user.deleteCert(domain)
self.user.addCert(self.user.getAuthAddress(self.site.address), domain, auth_type, auth_user_name, cert)
self.cmd(
"notification",
["done", _("Certificate changed to: {auth_type}/{auth_user_name}@{domain}.")]
)
self.user.setCert(self.site.address, domain)
self.site.updateWebsocket(cert_changed=domain)
self.response(to, "ok")
# Select certificate for site
def actionCertSelect(self, to, accepted_domains=[], accept_any=False, accepted_pattern=None):
accounts = []
accounts.append(["", _["No certificate"], ""]) # Default option
active = "" # Make it active if no other option found
# Add my certs
auth_address = self.user.getAuthAddress(self.site.address) # Current auth address
site_data = self.user.getSiteData(self.site.address) # Current auth address
if not accepted_domains and not accepted_pattern: # Accept any if no filter defined
accept_any = True
for domain, cert in list(self.user.certs.items()):
if auth_address == cert["auth_address"] and domain == site_data.get("cert"):
active = domain
title = cert["auth_user_name"] + "@" + domain
accepted_pattern_match = accepted_pattern and SafeRe.match(accepted_pattern, domain)
if domain in accepted_domains or accept_any or accepted_pattern_match:
accounts.append([domain, title, ""])
else:
accounts.append([domain, title, "disabled"])
# Render the html
body = "" + _["Select account you want to use in this site:"] + ""
# Accounts
for domain, account, css_class in accounts:
if domain == active:
css_class += " active" # Currently selected option
title = _("%s ({_[currently selected]})") % account
else:
title = "%s" % account
body += "%s" % (css_class, domain, title)
# More available providers
more_domains = [domain for domain in accepted_domains if domain not in self.user.certs] # Domains we not displayed yet
if more_domains:
# body+= "Accepted authorization providers by the site:"
body += "