Split up code similarly to what pysogs does

Jason Rhinelander 2021-11-22 09:46:21 -04:00
parent 91f2dc3911
commit 6633a7918b
21 changed files with 547 additions and 409 deletions

.drone.jsonnet Normal file

@@ -0,0 +1,31 @@
local docker_base = 'registry.oxen.rocks/lokinet-ci-';
local apt_get_quiet = 'apt-get -o=Dpkg::Use-Pty=0 -q';
[
{
name: 'Lint checks',
kind: 'pipeline',
type: 'docker',
platform: { arch: 'amd64' },
steps: [
{
name: 'Formatting',
image: docker_base + 'debian-stable',
commands: [
'echo "Running on ${DRONE_STAGE_MACHINE}"',
apt_get_quiet + ' install -y black',
'black --check --diff --color .',
],
},
{
name: 'Flake8',
image: docker_base + 'debian-stable',
commands: [
'echo "Running on ${DRONE_STAGE_MACHINE}"',
apt_get_quiet + ' install -y flake8',
'flake8 .',
],
},
],
},
]
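
The two CI steps above can be reproduced locally before pushing (assuming black and flake8 are installed; the commands are the same ones the pipeline runs):

pip3 install black flake8
black --check --diff --color .
flake8 .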

.flake8 Normal file

@@ -0,0 +1,3 @@
[flake8]
max-line-length = 100
exclude=libonionrequests,fileserver/config.py

.gitignore vendored

@@ -1 +1,3 @@
-/config.py
+/fileserver/config.py
+__pycache__
+/key_x25519

README.md

@@ -47,6 +47,9 @@ Everything is stored in PostgreSQL; no local file storage is used at all.
pip3 install psycopg psycopg_pool # Or as above, once these enter Debian/Ubuntu
```
You may optionally also install `psycopg_c` for some performance improvements; this likely
requires first installing `libpq-dev` for the PostgreSQL headers.
2. Build the required oxen Python modules:
make
@@ -74,8 +77,8 @@ Everything is stored in PostgreSQL; no local file storage is used at all.
   # The 'sessionfiles' database is now ready to go.
   ```
-4. Copy `config.py.sample` to `config.py` and edit as needed. In particular you'll need to edit the
-   `pgsql_connect_opts` variable to specify database connection parameters.
+4. Copy `fileserver/config.py.sample` to `fileserver/config.py` and edit as needed. In particular
+   you'll need to edit the `pgsql_connect_opts` variable to specify database connection parameters.
5. Set up the application to run via wsgi. The setup I use is:
@@ -94,7 +97,7 @@ Everything is stored in PostgreSQL; no local file storage is used at all.
   plugins = python3,logfile
   processes = 4
   manage-script-name = true
-  mount = /=fileserver:app
+  mount = /=fileserver.web:app
   logger = file:logfile=/home/YOURUSER/session-file-server/sfs.log
   ```
@@ -108,6 +111,8 @@ Everything is stored in PostgreSQL; no local file storage is used at all.
sudo chown YOURUSER:www-data /etc/uwsgi-emperor/vassals/sfs.ini
```
(For nginx, you may need to change `www-data` to `nginx` in the above command).
Because of the configuration you added in step 5, the ownership of the `sfs.ini` determines the
user and group the program runs as. Also note that uwsgi sensibly refuses to run as root, but if
you are contemplating running this program in the first place then hopefully you knew not to do
@@ -140,6 +145,6 @@ Everything is stored in PostgreSQL; no local file storage is used at all.
9. Restart the web server and UWSGI emperor: `systemctl restart nginx uwsgi-emperor`
10. In the future, if you update the file server code and want to restart it, you can just `touch
-    /etc/uwsgi-emperor/vassals/` — uwsgi-emperor watches the files for modifications and restarts
-    gracefully upon modifications (or in this case simply touching, which updates the file's
-    modification time without changing its content).
+    /etc/uwsgi-emperor/vassals/sfs.ini` — uwsgi-emperor watches the files for modifications and
+    restarts gracefully upon modifications (or in this case simply touching, which updates the
+    file's modification time without changing its content).
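
For step 4, a minimal `fileserver/config.py` could look like the following sketch; `dbname` matches the database created above, and the commented-out keys are illustrative psycopg connection parameters for non-default setups:

# fileserver/config.py — a minimal sketch; start from config.py.sample for the full option set
pgsql_connect_opts = {
    "dbname": "sessionfiles",
    # e.g., if not connecting as the local user over the default socket:
    # "user": "YOURUSER",
    # "host": "/var/run/postgresql",
}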

fileserver.py (deleted)

@@ -1,368 +0,0 @@
#!/usr/bin/env python3
import flask
from flask import request, g
from datetime import datetime
import base64
import coloredlogs
from hashlib import blake2b
import json
import logging
import psycopg
from psycopg_pool import ConnectionPool
import requests
import secrets
from werkzeug.local import LocalProxy
import io
import os
import nacl.public
import pyonionreq.junk
import config
from timer import timer
from postfork import postfork
from stats import log_stats
# error status codes:
HTTP_ERROR_PAYLOAD_TOO_LARGE = 413
HTTP_ERROR_INSUFFICIENT_STORAGE = 507
HTTP_ERROR_INTERNAL_SERVER_ERROR = 500
HTTP_BAD_GATEWAY = 502
HTTP_BAD_REQUEST = 400
HTTP_NOT_FOUND = 404
coloredlogs.install(level=config.log_level, milliseconds=True, isatty=True)
if config.BACKWARDS_COMPAT_IDS:
assert all(x in (0, 1) for x in config.BACKWARDS_COMPAT_IDS_FIXED_BITS)
BACKWARDS_COMPAT_MSB = sum(
y << x for x, y in enumerate(reversed(config.BACKWARDS_COMPAT_IDS_FIXED_BITS)))
BACKWARDS_COMPAT_RANDOM_BITS = 53 - len(config.BACKWARDS_COMPAT_IDS_FIXED_BITS)
if os.path.exists('key_x25519'):
with open('key_x25519', 'rb') as f:
key = f.read()
if len(key) != 32:
raise RuntimeError("Invalid key_x25519: expected 32 bytes, not {} bytes".format(len(key)))
privkey = nacl.public.PrivateKey(key)
else:
privkey = nacl.public.PrivateKey.generate()
with open('key_x25519', 'wb') as f:
f.write(privkey.encode())
logging.info("File server pubkey: {}".format(
privkey.public_key.encode(encoder=nacl.encoding.HexEncoder).decode()))
onionparser = pyonionreq.junk.Parser(pubkey=privkey.public_key.encode(), privkey=privkey.encode())
app = flask.Flask(__name__)
@postfork
def pg_connect():
global psql_pool
psql_pool = ConnectionPool(min_size=2, max_size=32, kwargs={**config.pgsql_connect_opts, "autocommit": True})
psql_pool.wait()
def get_psql_conn():
if 'psql' not in g:
g.psql = psql_pool.getconn()
return g.psql
@app.teardown_appcontext
def release_psql_conn(exception):
psql = g.pop('psql', None)
if psql is not None:
psql_pool.putconn(psql)
psql = LocalProxy(get_psql_conn)
last_stats_printed = None
@timer(15)
def periodic(signum):
with app.app_context(), psql.cursor() as cur:
logging.debug("Cleaning up expired files")
cur.execute("DELETE FROM files WHERE expiry <= NOW()")
# NB: we do this infrequently (once every 30 minutes, per project) because Github rate
# limits if you make more than 60 requests in an hour.
# Limit to 1 because, if there are more than 1 outdated, it doesn't hurt anything to delay
# the next one by 30 seconds (and avoids triggering github rate limiting).
cur.execute("""
SELECT project, version FROM release_versions
WHERE updated < NOW() + '30 minutes ago' LIMIT 1""")
row = cur.fetchone()
if row:
project, old_v = row
v = requests.get(
'https://api.github.com/repos/{}/releases/latest'.format(project),
timeout=5
).json()['tag_name']
if v != old_v:
logging.info("{} latest release version changed from {} to {}".format(
project, old_v, v))
cur.execute("""
UPDATE release_versions SET updated = NOW(), version = %s
WHERE project = %s""", (v, project))
now = datetime.now()
global last_stats_printed
if last_stats_printed is None or (now - last_stats_printed).total_seconds() >= 3600:
print("wtf now={}, lsp={}".format(now, last_stats_printed))
log_stats(cur)
last_stats_printed = now
def json_resp(data, status=200):
"""Takes data and optionally an HTTP status, returns it as a json response."""
return flask.Response(
json.dumps(data),
status=status,
mimetype='application/json')
def error_resp(code):
"""
Simple JSON error response to send back, embedded as `status_code` and also as the HTTP response
code.
"""
return json_resp({'status_code': code}, code)
def generate_file_id(data):
"""
Generate a file ID by blake2b hashing the file body, then using a 33-byte digest encoded into 44
base64 chars. (Ideally would be 32, but that would result in base64 padding, so increased to 33
to fit perfectly).
"""
return base64.urlsafe_b64encode(
blake2b(data, digest_size=33, salt=b'SessionFileSvr\0\0').digest()).decode()
@app.post('/file')
def submit_file(*, body=None, deprecated=False):
if body is None:
body = request.data
if not 0 < len(body) <= config.MAX_FILE_SIZE:
logging.warn("Rejecting upload of size {} ∉ (0, {}]".format(
len(body), config.MAX_FILE_SIZE))
return error_resp(HTTP_ERROR_PAYLOAD_TOO_LARGE)
id = None
try:
if config.BACKWARDS_COMPAT_IDS:
done = False
for attempt in range(25):
id = (BACKWARDS_COMPAT_MSB << BACKWARDS_COMPAT_RANDOM_BITS
| secrets.randbits(BACKWARDS_COMPAT_RANDOM_BITS))
if not deprecated:
id = str(id) # New ids are always strings; legacy requests require an integer
try:
with psql.cursor() as cur:
cur.execute(
"INSERT INTO files (id, data, expiry) VALUES (%s, %s, NOW() + %s)",
(id, body, config.FILE_EXPIRY))
except psycopg.errors.UniqueViolation:
continue
done = True
break
if not done:
logging.error(
"Tried 25 random IDs and got all constraint failures, something getting wrong!")
return error_resp(HTTP_ERROR_INSUFFICIENT_STORAGE)
else:
with psql.transaction(), psql.cursor() as cur:
id = generate_file_id(body)
try:
# Don't pass the data yet because we might be de-duplicating
with psql.transaction():
cur.execute(
"INSERT INTO files (id, data, expiry) VALUES (%s, '', NOW() + %s)",
(id, config.FILE_EXPIRY))
except psycopg.errors.UniqueViolation:
# Found a duplicate id, so de-duplicate by just refreshing the expiry
cur.execute(
"UPDATE files SET uploaded = NOW(), expiry = NOW() + %s WHERE id = %s",
(config.FILE_EXPIRY, id))
else:
cur.execute("UPDATE files SET data = %s WHERE id = %s", (body, id))
except Exception as e:
logging.error("Failed to insert file: {}".format(e))
return error_resp(HTTP_ERROR_INTERNAL_SERVER_ERROR)
response = {"id": id}
if deprecated:
response['status_code'] = 200
return json_resp(response)
@app.post('/files')
def submit_file_old():
input = request.json
if input is None or 'file' not in input:
logging.warn("Invalid request: did not find json with a 'file' property")
return error_resp(HTTP_BAD_REQUEST)
body = input['file']
if not 0 < len(body) <= config.MAX_FILE_SIZE_B64:
logging.warn("Rejecting upload of b64-encoded size {} ∉ (0, {}]".format(
len(body), config.MAX_FILE_SIZE_B64))
return error_resp(HTTP_ERROR_PAYLOAD_TOO_LARGE)
# base64.b64decode is picky about padding (but not, by default, about random non-alphabet
# characters in the middle of the data, wtf!)
while len(body) % 4 != 0:
body += '='
body = base64.b64decode(body, validate=True)
return submit_file(body=body)
@app.route('/file/<id>')
def get_file(id):
with psql.cursor() as cur:
cur.execute("SELECT data FROM files WHERE id = %s", (id,), binary=True)
row = cur.fetchone()
if row:
response = flask.make_response(row[0].tobytes())
response.headers.set('Content-Type', 'application/octet-stream')
return response
else:
logging.warn("File '{}' does not exist".format(id))
return error_resp(HTTP_NOT_FOUND)
@app.route('/files/<id>')
def get_file_old(id):
with psql.cursor() as cur:
cur.execute("SELECT data FROM files WHERE id = %s", (id,), binary=True)
row = cur.fetchone()
if row:
return json_resp({
"status_code": 200,
"result": base64.b64encode(row[0])
})
else:
logging.warn("File '{}' does not exist".format(id))
return error_resp(HTTP_NOT_FOUND)
@app.route('/file/<id>/info')
def get_file_info(id):
with psql.cursor() as cur:
cur.execute("SELECT length(data), uploaded, expiry FROM files WHERE id = %s", (id,))
row = cur.fetchone()
if row:
return json_resp({
"size": row[0],
"uploaded": row[1].timestamp(),
"expires": row[2].timestamp()
})
else:
logging.warn("File '{}' does not exist".format(id))
return error_resp(HTTP_NOT_FOUND)
@app.route('/session_version')
def get_session_version():
platform = request.args['platform']
if platform not in ('desktop', 'android', 'ios'):
logging.warn("Invalid session platform '{}'".format(platform))
return error_resp(HTTP_NOT_FOUND)
project = 'oxen-io/session-' + platform
with psql.cursor() as cur:
cur.execute("""
SELECT version, updated FROM release_versions
WHERE project = %s AND updated >= NOW() + '24 hours ago'
""", (project,))
row = cur.fetchone()
if row is None:
logging.warn("{} version is more than 24 hours stale!".format(project))
return error_resp(HTTP_BAD_GATEWAY)
return json_resp({
"status_code": 200,
"updated": row[1].timestamp(),
"result": row[0]
})
# FIXME TODO: this has some other allowed aliases, I think, /oxen and... dunno? Check SS.
@app.post('/loki/v3/lsrpc')
def onion_request():
body = request.data
logging.warn("onion request received: {}".format(body))
try:
junk = onionparser.parse_junk(body)
except RuntimeError as e:
logging.warn("Failed to decrypt onion request: {}".format(e))
return flask.Response(status=HTTP_ERROR_INTERNAL_SERVER_ERROR)
body = junk.payload
logging.warn("onion request decrypted to: {}".format(body))
try:
if body.startswith(b'{'):
# JSON input
req = json.loads(body)
meth, target = req['method'], req['endpoint']
if '?' in target:
target, query_string = target.split('?', 1)
else:
query_string = ''
subreq_body = body.get('body', '').encode()
if meth in ('POST', 'PUT'):
ct = body.get('contentType', 'application/json')
cl = len(subreq_body)
else:
if 'body' in req and len(req['body']):
raise RuntimeError("Invalid {} {} request: request must not contain a body", meth, target)
ct, cl = '', ''
elif body.startswith(b'd'):
# bt-encoded input
raise RuntimeError("Not implemented yet")
else:
raise RuntimeError("Invalid onion request body: expected JSON object or a bt-encoded dict")
# Set up the wsgi environ variables for the subrequest (see PEP 0333)
subreq_env = {
**request.environ,
"REQUEST_METHOD": method,
"PATH_INFO": target,
"QUERY_STRING": query_string,
"CONTENT_TYPE": ct,
"CONTENT_LENGTH": cl,
**{'HTTP_{}'.format(h.upper().replace('-', '_')): v for h, v in req.get('headers', {}).items()},
'wsgi.input': input
}
try:
with app.request_context(subreq_env) as subreq_ctx:
response = app.full_dispatch_request()
return junk.transformReply(response.get_data())
except Exception as e:
logging.warn("Onion sub-request failed: {}".format(e))
return flask.Response(status=HTTP_BAD_GATEWAY)
except Exception as e:
logging.warn("Invalid onion request: {}".format(e))
return error_resp(HTTP_ERROR_INTERNAL_SERVER_ERROR)

fileserver/__init__.py Normal file (empty)

fileserver/cleanup.py Normal file

@@ -0,0 +1,50 @@
from .web import app
from .db import psql
from .timer import timer
from .stats import log_stats
from datetime import datetime
import requests
last_stats_printed = None
@timer(15)
def periodic(signum):
with app.app_context(), psql.cursor() as cur:
app.logger.debug("Cleaning up expired files")
cur.execute("DELETE FROM files WHERE expiry <= NOW()")
# NB: we do this infrequently (once every 30 minutes, per project) because Github rate
# limits if you make more than 60 requests in an hour.
# Limit to 1 because, if there are more than 1 outdated, it doesn't hurt anything to delay
# the next one by 30 seconds (and avoids triggering github rate limiting).
cur.execute(
"""
SELECT project, version FROM release_versions
WHERE updated < NOW() + '30 minutes ago' LIMIT 1
"""
)
row = cur.fetchone()
if row:
project, old_v = row
v = requests.get(
"https://api.github.com/repos/{}/releases/latest".format(project), timeout=5
).json()["tag_name"]
if v != old_v:
app.logger.info(
"{} latest release version changed from {} to {}".format(project, old_v, v)
)
cur.execute(
"""
UPDATE release_versions SET updated = NOW(), version = %s
WHERE project = %s""",
(v, project),
)
now = datetime.now()
global last_stats_printed
if last_stats_printed is None or (now - last_stats_printed).total_seconds() >= 3600:
print("wtf now={}, lsp={}".format(now, last_stats_printed))
log_stats(cur)
last_stats_printed = now
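
One note on the `updated < NOW() + '30 minutes ago'` condition above: PostgreSQL's interval parser accepts a trailing `ago`, which negates the interval, so adding `'30 minutes ago'` actually subtracts 30 minutes. A quick sanity check (a sketch, runnable on any psycopg cursor):

cur.execute("SELECT NOW() + INTERVAL '30 minutes ago' = NOW() - INTERVAL '30 minutes'")
assert cur.fetchone()[0]  # True: 'ago' makes the interval negative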

fileserver/config.py.sample

@@ -34,9 +34,7 @@ FILE_EXPIRY = '3 weeks'

# postgresql connect options
-pgsql_connect_opts = {
-    "dbname": "sessionfiles",
-}
+pgsql_connect_opts = {"dbname": "sessionfiles"}

# The default log level
log_level = logging.INFO

fileserver/crypto.py Normal file

@@ -0,0 +1,23 @@
import nacl.public
import os
from .web import app
if os.path.exists("key_x25519"):
with open("key_x25519", "rb") as f:
key = f.read()
if len(key) != 32:
raise RuntimeError(
"Invalid key_x25519: expected 32 bytes, not {} bytes".format(len(key))
)
privkey = nacl.public.PrivateKey(key)
else:
privkey = nacl.public.PrivateKey.generate()
with open("key_x25519", "wb") as f:
f.write(privkey.encode())
app.logger.info(
"File server pubkey: {}".format(
privkey.public_key.encode(encoder=nacl.encoding.HexEncoder).decode()
)
)
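
Since the key file is just the raw 32-byte x25519 seed, the public key can also be recovered outside the app, e.g. to hand to clients; a standalone sketch:

# print the file server's pubkey from an existing key_x25519 file
import nacl.public
import nacl.encoding

with open("key_x25519", "rb") as f:
    privkey = nacl.public.PrivateKey(f.read())
print(privkey.public_key.encode(encoder=nacl.encoding.HexEncoder).decode())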

fileserver/http.py Normal file

@@ -0,0 +1,9 @@
# error status codes:
BAD_REQUEST = 400
NOT_FOUND = 404
PAYLOAD_TOO_LARGE = 413
INSUFFICIENT_STORAGE = 507
INTERNAL_SERVER_ERROR = 500
BAD_GATEWAY = 502
BODY_METHODS = ("POST", "PUT")

fileserver/logging.py Normal file

@@ -0,0 +1,4 @@
from . import config
import coloredlogs
coloredlogs.install(level=config.log_level, milliseconds=True, isatty=True)

fileserver/onion_req.py Normal file

@@ -0,0 +1,131 @@
from flask import request, Response
import base64
import json
from io import BytesIO
import pyonionreq.junk
from .web import app
from . import http
from . import crypto
import traceback
onionparser = pyonionreq.junk.Parser(
pubkey=crypto.privkey.public_key.encode(), privkey=crypto.privkey.encode()
)
def handle_onionreq_plaintext(body):
"""
Handles a decrypted onion request; this injects a subrequest to process it then returns the
result of that subrequest (as bytes).
Note that this does not throw: if errors occur we map them into "success" responses with a body
of {"status_code":xxx} as onion requests have no ability at all to signal a request failure.
"""
try:
if body.startswith(b"{"):
# JSON input
req = json.loads(body)
endpoint, method = req["endpoint"], req["method"]
subreq_headers = {k.lower(): v for k, v in req.get("headers", {}).items()}
if method in http.BODY_METHODS:
if "body_binary" in req:
subreq_body = base64.b64decode(req["body_binary"], validate=True)
else:
subreq_body = req.get("body", "").encode()
ct = subreq_headers.pop(
"content-type",
"application/octet-stream" if "body_binary" in req else "application/json",
)
cl = len(subreq_body)
else:
subreq_body = b""
# Android bug workaround: Android Session (at least up to v1.11.12) sends a body on
# GET requests with a 4-character string "null" when it should send no body.
if "body" in req and len(req["body"]) == 4 and req["body"] == "null":
del req["body"]
if "body" in req and len(req["body"]) or "body_binary" in req:
raise RuntimeError(
"Invalid {} {} request: request must not contain a body".format(
method, endpoint
)
)
ct, cl = "", ""
for h in ("content-type", "content-length"):
if h in subreq_headers:
del subreq_headers[h]
elif body.startswith(b"d"):
raise RuntimeError("Bencoded onion requests not implemented yet")
else:
raise RuntimeError(
"Invalid onion request body: expected JSON object or a bt-encoded dict"
)
if "?" in endpoint:
endpoint, query_string = endpoint.split("?", 1)
else:
query_string = ""
# Set up the wsgi environ variables for the subrequest (see PEP 0333)
subreq_env = {
**request.environ,
"REQUEST_METHOD": method,
"PATH_INFO": endpoint,
"QUERY_STRING": query_string,
"CONTENT_TYPE": ct,
"CONTENT_LENGTH": cl,
**{"HTTP_{}".format(h.upper().replace("-", "_")): v for h, v in subreq_headers.items()},
"wsgi.input": BytesIO(subreq_body),
}
try:
with app.request_context(subreq_env):
response = app.full_dispatch_request()
if response.status_code == 200:
data = response.get_data()
app.logger.debug(
"Onion sub-request for {} returned success, {} bytes".format(
endpoint, len(data)
)
)
return data
app.logger.warn(
"Onion sub-request for {} {} returned status {}".format(
method, endpoint, response.status_code
)
)
return json.dumps({"status_code": response.status_code}).encode()
except Exception:
app.logger.warn(
"Onion sub-request for {} {} failed: {}".format(
method, endpoint, traceback.format_exc()
)
)
return json.dumps({"status_code": http.BAD_GATEWAY}).encode()
except Exception as e:
app.logger.warn("Invalid onion request: {}".format(e))
return json.dumps({"status_code": http.BAD_REQUEST}).encode()
@app.post("/oxen/v3/lsrpc")
@app.post("/loki/v3/lsrpc")
def handle_onion_request():
"""
Parse an onion request, handle it as a subrequest, then encrypt the subrequest result and send
it back to the requestor.
"""
try:
junk = onionparser.parse_junk(request.data)
except RuntimeError as e:
app.logger.warn("Failed to decrypt onion request: {}".format(e))
return Response(status=http.INTERNAL_SERVER_ERROR)
response = handle_onionreq_plaintext(junk.payload)
return base64.b64encode(junk.transformReply(response)).decode()
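
For illustration, the decrypted plaintext that `handle_onionreq_plaintext` parses is a JSON object like the following (the endpoint and values here are made up; only the key names come from the code above):

# a GET-style onion subrequest:
payload = {"method": "GET", "endpoint": "/session_version?platform=ios", "headers": {}}

# POST/PUT subrequests also carry a body, as text ("body") or base64 ("body_binary"):
upload = {"method": "POST", "endpoint": "/file", "body_binary": "aGVsbG8="}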

postfork.py → fileserver/postfork.py

@@ -1,14 +1,18 @@
-import logging
-
try:
-    import uwsgi
+    import uwsgi  # noqa: F401
except ModuleNotFoundError:

    class postfork:
        """Simple non-uwsgi stub that just calls the postfork function"""

        def __init__(self, f):
            f()

        def __call__(self, f):
            pass

else:
-    from uwsgidecorators import postfork
+    import uwsgidecorators
+
+    postfork = uwsgidecorators.postfork
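
In both branches `postfork` ends up usable as a decorator: under uwsgi the wrapped function runs in every worker after the fork, while the stub simply invokes it once at import time. A sketch of the usage pattern (mirroring `pg_connect` from the old fileserver code above; the new `fileserver/db.py` is not among the lines shown in this diff):

from .postfork import postfork

@postfork
def pg_connect():
    # (re)create per-process resources, e.g. the psycopg connection pool
    ...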

fileserver/routes.py Normal file

@@ -0,0 +1,200 @@
from . import config
from .web import app
from .db import psql
from . import http
import flask
from flask import request
import secrets
import base64
from hashlib import blake2b
import json
import psycopg
if config.BACKWARDS_COMPAT_IDS:
assert all(x in (0, 1) for x in config.BACKWARDS_COMPAT_IDS_FIXED_BITS)
BACKWARDS_COMPAT_MSB = sum(
y << x for x, y in enumerate(reversed(config.BACKWARDS_COMPAT_IDS_FIXED_BITS))
)
BACKWARDS_COMPAT_RANDOM_BITS = 53 - len(config.BACKWARDS_COMPAT_IDS_FIXED_BITS)
def json_resp(data, status=200):
"""Takes data and optionally an HTTP status, returns it as a json response."""
return flask.Response(json.dumps(data), status=status, mimetype="application/json")
def error_resp(code):
"""
Simple JSON error response to send back, embedded as `status_code` and also as the HTTP response
code.
"""
return json_resp({"status_code": code}, code)
def generate_file_id(data):
"""
Generate a file ID by blake2b hashing the file body, then using a 33-byte digest encoded into 44
base64 chars. (Ideally would be 32, but that would result in base64 padding, so increased to 33
to fit perfectly).
"""
return base64.urlsafe_b64encode(
blake2b(data, digest_size=33, salt=b"SessionFileSvr\0\0").digest()
).decode()
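# (33 bytes = 264 bits = exactly 44 base64 characters, since 44 * 6 = 264;
# a 32-byte digest would encode to 43 characters plus one '=' of padding.)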
@app.post("/file")
def submit_file(*, body=None, deprecated=False):
if body is None:
body = request.data
if not 0 < len(body) <= config.MAX_FILE_SIZE:
app.logger.warn(
"Rejecting upload of size {} ∉ (0, {}]".format(len(body), config.MAX_FILE_SIZE)
)
return error_resp(http.PAYLOAD_TOO_LARGE)
id = None
try:
if config.BACKWARDS_COMPAT_IDS:
done = False
for attempt in range(25):
id = BACKWARDS_COMPAT_MSB << BACKWARDS_COMPAT_RANDOM_BITS | secrets.randbits(
BACKWARDS_COMPAT_RANDOM_BITS
)
if not deprecated:
id = str(id) # New ids are always strings; legacy requests require an integer
try:
with psql.cursor() as cur:
cur.execute(
"INSERT INTO files (id, data, expiry) VALUES (%s, %s, NOW() + %s)",
(id, body, config.FILE_EXPIRY),
)
except psycopg.errors.UniqueViolation:
continue
done = True
break
if not done:
app.logger.error(
"Tried 25 random IDs and got all constraint failures, something getting wrong!"
)
return error_resp(http.INSUFFICIENT_STORAGE)
else:
with psql.transaction(), psql.cursor() as cur:
id = generate_file_id(body)
try:
# Don't pass the data yet because we might be de-duplicating
with psql.transaction():
cur.execute(
"INSERT INTO files (id, data, expiry) VALUES (%s, '', NOW() + %s)",
(id, config.FILE_EXPIRY),
)
except psycopg.errors.UniqueViolation:
# Found a duplicate id, so de-duplicate by just refreshing the expiry
cur.execute(
"UPDATE files SET uploaded = NOW(), expiry = NOW() + %s WHERE id = %s",
(config.FILE_EXPIRY, id),
)
else:
cur.execute("UPDATE files SET data = %s WHERE id = %s", (body, id))
except Exception as e:
app.logger.error("Failed to insert file: {}".format(e))
return error_resp(http.INTERNAL_SERVER_ERROR)
response = {"id": id}
if deprecated:
response["status_code"] = 200
return json_resp(response)
@app.post("/files")
def submit_file_old():
input = request.json
if input is None or "file" not in input:
app.logger.warn("Invalid request: did not find json with a 'file' property")
return error_resp(http.BAD_REQUEST)
body = input["file"]
if not 0 < len(body) <= config.MAX_FILE_SIZE_B64:
app.logger.warn(
"Rejecting upload of b64-encoded size {} ∉ (0, {}]".format(
len(body), config.MAX_FILE_SIZE_B64
)
)
return error_resp(http.PAYLOAD_TOO_LARGE)
# base64.b64decode is picky about padding (but not, by default, about random non-alphabet
# characters in the middle of the data, wtf!)
while len(body) % 4 != 0:
body += "="
body = base64.b64decode(body, validate=True)
return submit_file(body=body)
@app.route("/file/<id>")
def get_file(id):
with psql.cursor() as cur:
cur.execute("SELECT data FROM files WHERE id = %s", (id,), binary=True)
row = cur.fetchone()
if row:
response = flask.make_response(row[0].tobytes())
response.headers.set("Content-Type", "application/octet-stream")
return response
else:
app.logger.warn("File '{}' does not exist".format(id))
return error_resp(http.NOT_FOUND)
@app.route("/files/<id>")
def get_file_old(id):
with psql.cursor() as cur:
cur.execute("SELECT data FROM files WHERE id = %s", (id,), binary=True)
row = cur.fetchone()
if row:
return json_resp({"status_code": 200, "result": base64.b64encode(row[0])})
else:
app.logger.warn("File '{}' does not exist".format(id))
return error_resp(http.NOT_FOUND)
@app.route("/file/<id>/info")
def get_file_info(id):
with psql.cursor() as cur:
cur.execute("SELECT length(data), uploaded, expiry FROM files WHERE id = %s", (id,))
row = cur.fetchone()
if row:
return json_resp(
{"size": row[0], "uploaded": row[1].timestamp(), "expires": row[2].timestamp()}
)
else:
app.logger.warn("File '{}' does not exist".format(id))
return error_resp(http.NOT_FOUND)
@app.route("/session_version")
def get_session_version():
platform = request.args["platform"]
if platform not in ("desktop", "android", "ios"):
app.logger.warn("Invalid session platform '{}'".format(platform))
return error_resp(http.NOT_FOUND)
project = "oxen-io/session-" + platform
with psql.cursor() as cur:
cur.execute(
"""
SELECT version, updated FROM release_versions
WHERE project = %s AND updated >= NOW() + '24 hours ago'
""",
(project,),
)
row = cur.fetchone()
if row is None:
app.logger.warn("{} version is more than 24 hours stale!".format(project))
return error_resp(http.BAD_GATEWAY)
return json_resp({"status_code": 200, "updated": row[1].timestamp(), "result": row[0]})

stats.py → fileserver/stats.py

@@ -1,10 +1,11 @@
-import logging
+from .web import app

-si_prefixes = ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
+si_prefixes = ["", "k", "M", "G", "T", "P", "E", "Z", "Y"]

def pretty_bytes(nbytes):
    i = 0
-    while nbytes >= 1000 and i + 1 < len(si_prefix):
+    while nbytes >= 1000 and i + 1 < len(si_prefixes):
        nbytes /= 1000
        i += 1
    return ("{} B" if i == 0 else "{:.1f} {}B").format(nbytes, si_prefixes[i])
@@ -16,5 +17,4 @@ def log_stats(cur):
    if num == 0 and size is None:
        size = 0
-    logging.info("Current stats: {} files stored totalling {}".format(
-        num, pretty_bytes(size)))
+    app.logger.info("Current stats: {} files stored totalling {}".format(num, pretty_bytes(size)))

timer.py → fileserver/timer.py

@@ -1,23 +1,29 @@
import logging

try:
-    import uwsgi
+    import uwsgi  # noqa: F401
except ModuleNotFoundError:
-    import sys
-
-    logging.error("""
+    logging.error(
+        """
WARNING:

Failed to load uwsgidecorators; we probably aren't running under uwsgi.
File cleanup and session version updating will not be enabled!
-""")
+"""
+    )

    class timer:
        """Do-nothing stub"""

        def __init__(self, secs, **kwargs):
            pass

        def __call__(self, f):
            pass

else:
-    from uwsgidecorators import timer
+    import uwsgidecorators
+
+    timer = uwsgidecorators.timer

fileserver/web.py Normal file

@@ -0,0 +1,10 @@
#!/usr/bin/env python3
import flask
app = flask.Flask(__name__)
from . import logging # noqa: F401, E402
from . import routes # noqa: F401, E402
from . import cleanup # noqa: F401, E402
from . import db # noqa: F401, E402
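
The ordering in `web.py` matters: `app` must exist before the trailing imports run, because those modules do `from .web import app` at import time (hence the `noqa: E402` markers); note that `fileserver.onion_req` also registers its route this way, so it likewise needs to be imported somewhere for the onion endpoint to be live. For a quick local run without uwsgi (where the timer and postfork decorators fall back to their do-nothing stubs), something like this sketch works:

# dev-only sketch; production runs under uwsgi as described in the README
from fileserver.web import app

app.run(port=8000)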


@@ -31,9 +31,12 @@ window = [(0, started)]
total_files = sum(1 for _ in os.scandir(filesdir))
for dentry in os.scandir(filesdir):
    if not dentry.name.isdigit() or not dentry.is_file():
-        print("\nWARNING: {} doesn't look like an old file server upload, skipping.".format(
-            dentry.name),
-            file=sys.stderr)
+        print(
+            "\nWARNING: {} doesn't look like an old file server upload, skipping.".format(
+                dentry.name
+            ),
+            file=sys.stderr,
+        )
        continue

    stat = dentry.stat()
@@ -41,8 +44,12 @@
    row = cur.execute("SELECT length(data) FROM files WHERE id = %s", (dentry.name,)).fetchone()
    if row:
        if size != row[0]:
-            print(("\nWARNING: Skipping duplicate id {} with mismatched size "
-                   "(expected {} ≠ actual {})").format(dentry.name, size, row[0]))
+            print(
+                (
+                    "\nWARNING: Skipping duplicate id {} with mismatched size "
+                    "(expected {} ≠ actual {})"
+                ).format(dentry.name, size, row[0])
+            )
            skipped += 1
            skipped_size += size
@@ -57,7 +64,8 @@
            INSERT INTO files (id, data, uploaded, expiry)
            VALUES (%s, %b, %s, %s + %s)
            """,
-            (dentry.name, data, uploaded, uploaded, config.FILE_EXPIRY))
+            (dentry.name, data, uploaded, uploaded, config.FILE_EXPIRY),
+        )

    count += 1
    committed_size += size
@@ -67,22 +75,37 @@
        window.pop(0)
    mb = committed_size / 1_000_000
    window.append((mb, now))
-    speed = ((window[-1][0] - window[0][0]) / (window[-1][1] - window[0][1]).total_seconds()
-             if len(window) > 1 else 0)
-    print(("\rImported {:,} (new: {:,}, skipped: {:,}) / {:,} files containing "
-           "{:,.1f}MB new ({:,.2f}MB/s), {:,.1f}MB skipped data").format(
-        count + skipped, count, skipped, total_files,
-        mb, speed, skipped_size / 1_000_000
-    ), end='', flush=True)
+    speed = (
+        (window[-1][0] - window[0][0]) / (window[-1][1] - window[0][1]).total_seconds()
+        if len(window) > 1
+        else 0
+    )
+    print(
+        (
+            "\rImported {:,} (new: {:,}, skipped: {:,}) / {:,} files containing "
+            "{:,.1f}MB new ({:,.2f}MB/s), {:,.1f}MB skipped data"
+        ).format(
+            count + skipped, count, skipped, total_files, mb, speed, skipped_size / 1_000_000
+        ),
+        end='',
+        flush=True,
+    )

duration = (datetime.now() - started).total_seconds()
-print("""
+print(
+    """
Import finished: imported {:,} files containing {:,d} bytes of data in {:,.2f} seconds ({:,.2f}MB/s)
Skipped {:,} already-existing files containing {:,} bytes
""".format(
-    count, committed_size, duration, committed_size / 1_000_000 / duration,
-    skipped, skipped_size))
+        count,
+        committed_size,
+        duration,
+        committed_size / 1_000_000 / duration,
+        skipped,
+        skipped_size,
+    )
+)

@@ -1 +1 @@
-Subproject commit 12445fccc741f5ce4aa146a11cca217e91d11066
+Subproject commit bc373fcb0915e0378a898762e239266409d62f0c

pyproject.toml Normal file

@@ -0,0 +1,7 @@
[tool.black]
line-length = 100
skip-string-normalization = true
skip-magic-trailing-comma = true
target-version = ['py38']
include = '\.py$'
extend-exclude = 'libonionrequests'