Merge pull request #24 from jagerman/db-refactor

Db refactor
Jason Rhinelander 2021-10-14 10:14:03 -03:00 committed by GitHub
commit e229a69424
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 3590 additions and 1891 deletions

134
.drone.jsonnet Normal file
View File

@ -0,0 +1,134 @@
local cargo = 'cargo --color=always --verbose ';
// Regular build on a rust docker image:
local rust_pipeline(
name,
image='rust:1-bullseye',
cargo_extra='--release',
tests=true,
deb=null, // set to distro name to make a deb
deb_revision_suffix='',
jobs=6,
arch='amd64'
) = {
kind: 'pipeline',
type: 'docker',
name: name,
platform: { arch: arch },
steps: [{
name: 'check',
image: image,
commands: [
'echo "Running on ${DRONE_STAGE_MACHINE}"',
cargo + 'check -j' + jobs + ' ' + cargo_extra,
],
}, {
name: 'build',
image: image,
commands: [cargo + 'build -j' + jobs + ' ' + cargo_extra],
}] + (if tests then [{
name: 'tests',
image: image,
commands: [cargo + 'test -j' + jobs + ' ' + cargo_extra],
}] else [])
+ (if deb != null then [{
name: 'deb',
image: image,
commands: [
cargo + 'install -j' + jobs + ' cargo-deb',
'sed -i -Ee \'s/^revision = "([^~]*)(~.*)?"$/revision = "\\\\\\\\1' + deb_revision_suffix + '"/\' Cargo.toml',
cargo + 'deb',
],
}] else []),
};
local apt_get_quiet = 'apt-get -o=Dpkg::Use-Pty=0 -q';
local default_apt_deps = 'pkg-config libssl-dev';
// Build on a stock debian/ubuntu distro
local debian_pipeline(
name,
image,
cargo_extra='--release',
apt_deps=default_apt_deps,
tests=true,
deb=null, // set to distro name to make a deb
deb_revision_suffix='',
jobs=6,
arch='amd64'
) = {
kind: 'pipeline',
type: 'docker',
name: name,
platform: { arch: arch },
steps: [{
name: 'build',
image: image,
environment: { SSH_KEY: { from_secret: 'SSH_KEY' } },
commands: [
'echo "Building on ${DRONE_STAGE_MACHINE}"',
'echo "man-db man-db/auto-update boolean false" | debconf-set-selections',
apt_get_quiet + ' update',
apt_get_quiet + ' install -y eatmydata',
'eatmydata ' + apt_get_quiet + ' dist-upgrade -y',
'eatmydata ' + apt_get_quiet + ' install -y cargo ' + apt_deps + (if deb != null then ' openssh-client' else ''),
cargo + 'build -j' + jobs + ' ' + cargo_extra,
]
+ (if tests then [cargo + 'test -j' + jobs + ' ' + cargo_extra] else [])
+ (if deb != null then [
cargo + 'install -j' + jobs + ' cargo-deb',
'sed -i -Ee \'s/^revision = "([^~]*)(~.*)?"$/revision = "\\\\\\\\1' + deb_revision_suffix + '"/\' Cargo.toml',
cargo + 'deb',
'./contrib/ci/drone-debs-upload.sh ' + deb,
] else []),
}],
};
[
{
name: 'lint check',
kind: 'pipeline',
type: 'docker',
platform: { arch: 'amd64' },
steps: [{
name: 'format',
image: 'rust:1-bullseye',
commands: [
'echo "Running on ${DRONE_STAGE_MACHINE}"',
'rustup component add rustfmt',
'cargo fmt -- --check --color=always',
],
}],
},
rust_pipeline('Rust latest/Release (amd64)'),
rust_pipeline('Rust latest/Debug (amd64)', cargo_extra=''),
rust_pipeline('Rust latest/Release (ARM64)', arch='arm64'),
// Various debian builds
debian_pipeline('Debian sid (amd64)', 'debian:sid', deb='sid', deb_revision_suffix=''),
debian_pipeline('Debian 11 (amd64)', 'debian:bullseye', deb='bullseye', deb_revision_suffix='~deb11'),
debian_pipeline('Debian 11 (ARM64)', 'debian:bullseye', arch='arm64', deb='bullseye', deb_revision_suffix='~deb11'),
debian_pipeline('Ubuntu 21.04 (amd64)', 'ubuntu:hirsute', deb='hirsute', deb_revision_suffix='~ubuntu2104'),
debian_pipeline('Ubuntu 20.04 (amd64)', 'ubuntu:focal', deb='focal', deb_revision_suffix='~ubuntu2004'),
debian_pipeline('Ubuntu 18.04 (amd64)', 'ubuntu:bionic', deb='bionic', deb_revision_suffix='~ubuntu1804'),
rust_pipeline('Debian 10 (amd64)', 'rust:1-buster', deb='buster', deb_revision_suffix='~deb10'),
// Macos build:
{
kind: 'pipeline',
type: 'exec',
name: 'MacOS/Release',
platform: { os: 'darwin', arch: 'amd64' },
steps: [
{
name: 'build',
commands: [
'echo "Building on ${DRONE_STAGE_MACHINE}"',
'cargo build -j6 --release',
'cargo test -j6 --release',
],
},
],
},
]

View File

@ -1,31 +0,0 @@
name: Check
on: [push, pull_request]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
- name: Prepare
run: |
openssl genpkey -algorithm x25519 -out x25519_private_key.pem
openssl pkey -in x25519_private_key.pem -pubout -out x25519_public_key.pem
- name: Check build
run: cargo check --verbose --release
- name: Run tests
run: cargo test --verbose
- name: Check formatting
run: cargo fmt -- --check

View File

@ -1,30 +0,0 @@
name: Build DEB (Ubuntu 20.04)
on: release
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
- name: Build DEB
run: |
cargo install cargo-deb
cargo deb
- name: Upload DEB
uses: actions/upload-artifact@v2
with:
name: "ubuntu-20.04"
path: target/debian/*.deb

5
.gitignore vendored
View File

@ -1,6 +1,9 @@
/target
*.db
*.db-shm
*.db-wal
.DS_Store
*.pem
/files
/rooms
/rooms
/uploads

View File

@ -4,12 +4,16 @@ unstable_features = true
blank_lines_upper_bound = 3
brace_style = "PreferSameLine"
combine_control_expr = true
fn_args_layout = "Compressed"
comment_width = 100
fn_args_layout = "Tall"
fn_single_line = true
imports_indent = "Visual"
overflow_delimited_expr = true
group_imports = "StdExternalCrate"
imports_indent = "Visual"
imports_layout = "HorizontalVertical"
newline_style = "Native"
overflow_delimited_expr = true
trailing_comma = "Never"
use_field_init_shorthand = true
use_field_init_shorthand = false
use_small_heuristics = "Max"
where_single_line = true
wrap_comments = true

92
Cargo.lock generated
View File

@ -71,9 +71,14 @@ dependencies = [
[[package]]
name = "ahash"
version = "0.4.7"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e"
checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98"
dependencies = [
"getrandom 0.2.2",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
@ -192,6 +197,17 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "blake2"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174"
dependencies = [
"crypto-mac 0.8.0",
"digest",
"opaque-debug",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
@ -307,6 +323,16 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba"
[[package]]
name = "crypto-mac"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab"
dependencies = [
"generic-array",
"subtle",
]
[[package]]
name = "crypto-mac"
version = "0.10.0"
@ -411,6 +437,29 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0"
[[package]]
name = "ed25519"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4620d40f6d2601794401d6dd95a5cf69b6c157852539470eeda433a99b3c0efc"
dependencies = [
"signature",
]
[[package]]
name = "ed25519-dalek"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d"
dependencies = [
"curve25519-dalek",
"ed25519",
"rand 0.7.3",
"serde",
"sha2",
"zeroize",
]
[[package]]
name = "encoding_rs"
version = "0.8.28"
@ -627,17 +676,23 @@ name = "hashbrown"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]]
name = "hashlink"
version = "0.6.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
dependencies = [
"hashbrown",
"hashbrown 0.11.2",
]
[[package]]
@ -695,7 +750,7 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15"
dependencies = [
"crypto-mac",
"crypto-mac 0.10.0",
"digest",
]
@ -811,7 +866,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3"
dependencies = [
"autocfg",
"hashbrown",
"hashbrown 0.9.1",
]
[[package]]
@ -886,9 +941,9 @@ checksum = "56d855069fafbb9b344c0f962150cd2c1187975cb1c22c1522c240d8c4986714"
[[package]]
name = "libsqlite3-sys"
version = "0.20.1"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64d31059f22935e6c31830db5249ba2b7ecd54fd73a9909286f0a67aa55c2fbd"
checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d"
dependencies = [
"cc",
"pkg-config",
@ -1360,9 +1415,9 @@ dependencies = [
[[package]]
name = "r2d2_sqlite"
version = "0.17.0"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "227ab35ff4cbb01fa76da8f062590fe677b93c8d9e8415eb5fa981f2c1dba9d8"
checksum = "9d24607049214c5e42d3df53ac1d8a23c34cc6a5eefe3122acb2c72174719959"
dependencies = [
"r2d2",
"rusqlite",
@ -1542,9 +1597,9 @@ dependencies = [
[[package]]
name = "rusqlite"
version = "0.24.2"
version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38ee71cbab2c827ec0ac24e76f82eca723cee92c509a65f67dee393c25112"
checksum = "57adcf67c8faaf96f3248c2a7b419a0dbc52ebe36ba83dd57fe83827c1ea4eb3"
dependencies = [
"bitflags",
"fallible-iterator",
@ -1749,8 +1804,12 @@ version = "0.1.10"
dependencies = [
"aes-gcm",
"base64",
"blake2",
"chrono",
"curve25519-dalek",
"curve25519-parser",
"ed25519-dalek",
"form_urlencoded",
"futures",
"hex",
"hmac",
@ -1774,7 +1833,6 @@ dependencies = [
"structopt",
"tokio",
"tokio-test",
"url",
"warp",
"x25519-dalek",
]
@ -1814,6 +1872,12 @@ dependencies = [
"libc",
]
[[package]]
name = "signature"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19772be3c4dd2ceaacf03cb41d5885f2a02c4d8804884918e3a258480803335"
[[package]]
name = "slab"
version = "0.4.2"

View File

@ -11,12 +11,17 @@ systemd-units = { enable = true, start = true }
maintainer-scripts = "debian/"
depends = "libssl1.1, openssl"
section = "net"
revision = "1"
[dependencies]
aes-gcm = "0.8"
base64 = "0.13"
blake2 = "0.9"
chrono = "0.4"
curve25519-dalek = "3"
curve25519-parser = "0.2"
ed25519-dalek = "^1.0.1"
form_urlencoded = "1"
futures = "0.3"
hex = "0.4"
hmac = "0.10"
@ -25,23 +30,22 @@ lazy_static = "1.4"
log = "0.4"
log4rs = "1.0"
octocrab = "0.9"
parking_lot = "0.11.1"
rand = "0.8"
rand_core = "0.5"
regex = "1"
reqwest = { version = "0.11", features = ["json"] }
rusqlite = { version = "0.24", features = ["bundled"] }
rusqlite = { version = "^0.25", features = ["bundled"] }
rusqlite_migration = "0.4"
r2d2_sqlite = "0.17"
r2d2 = "0.8"
r2d2_sqlite = "^0.18"
r2d2 = "^0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.9"
structopt = "0.3"
tokio = { version = "1.3", features = ["full"] }
url = "2.2.1"
tokio = { version = "^1.3", features = ["full"] }
warp = { version = "0.3", features = ["tls"] }
x25519-dalek = "1.1"
parking_lot = "0.11.1"
x25519-dalek = "^1.1"
[dev-dependencies]
tokio-test = "*"

View File

@ -21,7 +21,6 @@ sudo curl -so /etc/apt/trusted.gpg.d/oxen.gpg https://deb.oxen.io/pub.gpg
echo "deb https://deb.oxen.io $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/oxen.list
sudo apt update
sudo apt install session-open-group-server
sudo chown _loki /var/lib/session-open-group-server -R
```
### Step 2: Add a room

50
contrib/ci/drone-debs-upload.sh Executable file
View File

@ -0,0 +1,50 @@
#!/bin/bash
# Script used with Drone CI to upload debs from the deb building pipelines (because specifying all
# this in .drone.jsonnet is too painful). This is expected to run from the base project dir after
# having built with debuild (which will leave the debs in ..).
set -o errexit
distro="$1"
if [ -z "$distro" ]; then
echo "Bad usage: need distro name as first argument"
exit 1
fi
if [ -z "$SSH_KEY" ]; then
echo -e "\n\n\n\e[31;1mUnable to upload debs: SSH_KEY not set\e[0m"
# Just warn but don't fail, so that this doesn't trigger a build failure for untrusted builds
exit 0
fi
echo "$SSH_KEY" >~/ssh_key
set -o xtrace # Don't start tracing until *after* we write the ssh key
chmod 600 ~/ssh_key
upload_to="oxen.rocks/debs/${DRONE_REPO// /_}@${DRONE_BRANCH// /_}/$(date --date=@$DRONE_BUILD_CREATED +%Y%m%dT%H%M%SZ)-${DRONE_COMMIT:0:9}/$distro/$DRONE_STAGE_ARCH"
# sftp doesn't have any equivalent to mkdir -p, so we have to split the above up into a chain of
# -mkdir a/, -mkdir a/b/, -mkdir a/b/c/, ... commands. The leading `-` allows the command to fail
# without error.
upload_dirs=(${upload_to//\// })
mkdirs=
dir_tmp=""
for p in "${upload_dirs[@]}"; do
dir_tmp="$dir_tmp$p/"
mkdirs="$mkdirs
-mkdir $dir_tmp"
done
sftp -i ~/ssh_key -b - -o StrictHostKeyChecking=off drone@oxen.rocks <<SFTP
$mkdirs
put target/debian/*.*deb $upload_to
SFTP
set +o xtrace
echo -e "\n\n\n\n\e[32;1mUploaded debs to https://${upload_to}/\e[0m\n\n\n"

View File

@ -21,12 +21,14 @@ if [ "$1" = configure ]; then
# Generate the key pair if needed
if ! [ -f $SOGS/x25519_private_key.pem ]; then
openssl genpkey -algorithm x25519 -out $SOGS/x25519_private_key.pem
chown _loki $SOGS/x25519_private_key.pem
rm -f $SOGS/x25519_public_key.pem
fi
if ! [ -f $SOGS/x25519_public_key.pem ]; then
openssl pkey -in $SOGS/x25519_private_key.pem -pubout -out $SOGS/x25519_public_key.pem
chown _loki $SOGS/x25519_public_key.pem
fi
# Set permissions
su -s /bin/sh _loki -c "test -O $SOGS && test -G $SOGS" || \
chown _loki:_loki $SOGS
fi
fi

View File

@ -4,6 +4,8 @@ use std::sync::Mutex;
use aes_gcm::aead::{generic_array::GenericArray, Aead, NewAead};
use aes_gcm::Aes256Gcm;
use blake2::{Blake2b, Digest};
use curve25519_dalek;
use hmac::{Hmac, Mac, NewMac};
use log::{error, warn};
use rand::{thread_rng, Rng};
@ -14,11 +16,13 @@ use super::errors::Error;
type HmacSha256 = Hmac<Sha256>;
// By default the aes-gcm crate will use software implementations of both AES and the POLYVAL universal hash function. When
// targeting modern x86/x86_64 CPUs, use the following RUSTFLAGS to take advantage of high performance AES-NI and CLMUL CPU
// intrinsics:
// By default the aes-gcm crate will use software implementations of both AES
// and the POLYVAL universal hash function. When targeting modern x86/x86_64
// CPUs, use the following RUSTFLAGS to take advantage of high performance
// AES-NI and CLMUL CPU intrinsics:
//
// RUSTFLAGS="-Ctarget-cpu=sandybridge -Ctarget-feature=+aes,+sse2,+sse4.1,+ssse3"
// RUSTFLAGS="-Ctarget-cpu=sandybridge
// -Ctarget-feature=+aes,+sse2,+sse4.1,+ssse3"
const IV_SIZE: usize = 12;
@ -39,10 +43,56 @@ lazy_static::lazy_static! {
let raw_public_key = fs::read_to_string(path).unwrap();
return curve25519_parser::parse_openssl_25519_pubkey(raw_public_key.as_bytes()).unwrap();
};
// For backwards compatibility with token-using Session client versions we include a signature
// in the "token" value we send back, signed using this key. When we drop token support we can
// also drop this.
pub static ref TOKEN_SIGNING_KEYS: ed25519_dalek::Keypair = {
let mut hasher = Blake2b::new();
hasher.update(b"SOGS TOKEN SIGNING KEY");
hasher.update(PRIVATE_KEY.to_bytes());
hasher.update(PUBLIC_KEY.as_bytes());
let res = hasher.finalize();
let secret = ed25519_dalek::SecretKey::from_bytes(&res[0..32]).unwrap();
let public = ed25519_dalek::PublicKey::from(&secret);
ed25519_dalek::Keypair{ secret, public }
};
}
/// Takes hex string representation of an ed25519 pubkey, returns the ed25519
/// pubkey, derived x25519 pubkey, and the Session id in hex.
pub fn get_pubkeys(
edpk_hex: &str,
) -> Result<(ed25519_dalek::PublicKey, x25519_dalek::PublicKey, String), warp::reject::Rejection> {
if edpk_hex.len() != 64 {
return Err(warp::reject::custom(Error::DecryptionFailed));
}
let edpk_bytes = match hex::decode(edpk_hex) {
Ok(bytes) => bytes,
Err(_) => {
warn!("Invalid ed25519 pubkey: '{}' is not hex", edpk_hex);
return Err(warp::reject::custom(Error::DecryptionFailed));
}
};
let edpk =
ed25519_dalek::PublicKey::from_bytes(&edpk_bytes).map_err(|_| Error::DecryptionFailed)?;
let compressed = curve25519_dalek::edwards::CompressedEdwardsY::from_slice(&edpk_bytes);
let edpoint = compressed.decompress().ok_or(warp::reject::custom(Error::DecryptionFailed))?;
if !edpoint.is_torsion_free() {
return Err(Error::DecryptionFailed.into());
}
let xpk = x25519_dalek::PublicKey::from(*edpoint.to_montgomery().as_bytes());
let mut session_id = String::with_capacity(66);
session_id.push_str("05");
session_id.push_str(&hex::encode(xpk.as_bytes()));
return Ok((edpk, xpk, session_id));
}
pub fn get_x25519_symmetric_key(
public_key: &[u8], private_key: &x25519_dalek::StaticSecret,
public_key: &[u8],
private_key: &x25519_dalek::StaticSecret,
) -> Result<Vec<u8>, warp::reject::Rejection> {
if public_key.len() != 32 {
error!(
@ -60,7 +110,8 @@ pub fn get_x25519_symmetric_key(
}
pub fn encrypt_aes_gcm(
plaintext: &[u8], symmetric_key: &[u8],
plaintext: &[u8],
symmetric_key: &[u8],
) -> Result<Vec<u8>, warp::reject::Rejection> {
let mut iv = [0u8; IV_SIZE];
thread_rng().fill(&mut iv[..]);
@ -72,14 +123,15 @@ pub fn encrypt_aes_gcm(
return Ok(iv_and_ciphertext);
}
Err(e) => {
error!("Couldn't encrypt ciphertext due to error: {}.", e);
error!("Couldn't encrypt ciphertext: {}.", e);
return Err(warp::reject::custom(Error::DecryptionFailed));
}
};
}
pub fn decrypt_aes_gcm(
iv_and_ciphertext: &[u8], symmetric_key: &[u8],
iv_and_ciphertext: &[u8],
symmetric_key: &[u8],
) -> Result<Vec<u8>, warp::reject::Rejection> {
if iv_and_ciphertext.len() < IV_SIZE {
warn!("Ignoring ciphertext of invalid size: {}.", iv_and_ciphertext.len());
@ -91,7 +143,7 @@ pub fn decrypt_aes_gcm(
match cipher.decrypt(GenericArray::from_slice(&iv), &*ciphertext) {
Ok(plaintext) => return Ok(plaintext),
Err(e) => {
error!("Couldn't decrypt ciphertext due to error: {}.", e);
error!("Couldn't decrypt ciphertext: {}.", e);
return Err(warp::reject::custom(Error::DecryptionFailed));
}
};
@ -102,3 +154,28 @@ pub fn generate_x25519_key_pair() -> (x25519_dalek::StaticSecret, x25519_dalek::
let public_key = x25519_dalek::PublicKey::from(&private_key);
return (private_key, public_key);
}
// Verifies a signature over the given byte parts, concatenated together.
pub fn verify_signature(
edpk: &ed25519_dalek::PublicKey,
sig: &ed25519_dalek::Signature,
parts: &[&[u8]],
) -> Result<(), Error> {
let mut verify_buf: Vec<u8> = Vec::new();
let verify: &[u8];
if parts.len() == 1 {
verify = &parts[0];
} else {
verify_buf.reserve_exact(parts.iter().map(|&x| x.len()).sum());
for &x in parts {
verify_buf.extend_from_slice(x);
}
verify = &verify_buf;
}
if let Err(sigerr) = edpk.verify_strict(verify, &sig) {
warn!("Request signature verification failed: {}", sigerr);
return Err(Error::ValidationFailed);
}
Ok(())
}
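
A hypothetical usage sketch (not part of this diff): combining `get_pubkeys` and `verify_signature` above to check a signed request, assuming the `(method || endpoint || body || nonce)` part ordering documented on `RpcCall` in `rpc.rs` below, and assuming this lives inside `crypto.rs` so the helpers and `Error` are in scope. The function name and error mapping are illustrative only.
```
// Illustrative only: verify a signed RPC call using the helpers above.
fn check_signed_request(
    edpk_hex: &str,
    sig: &ed25519_dalek::Signature,
    method: &str,
    endpoint: &str,
    body: &str,
    nonce: &str,
) -> Result<String, Error> {
    // Derive the ed25519/x25519 pubkeys and Session id from the hex pubkey.
    let (edpk, _xpk, session_id) =
        get_pubkeys(edpk_hex).map_err(|_| Error::ValidationFailed)?;
    // Verify the signature over the concatenated request parts.
    verify_signature(
        &edpk,
        sig,
        &[method.as_bytes(), endpoint.as_bytes(), body.as_bytes(), nonce.as_bytes()],
    )?;
    Ok(session_id)
}
```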

View File

@ -1,21 +1,24 @@
use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection};
use warp::{http::StatusCode, reject::Reject, reply::Reply, reply::Response, Rejection};
#[derive(Debug)]
pub enum Error {
DecryptionFailed,
DatabaseFailedInternally,
InvalidOnionRequest,
/// Usually this means the endpoint or HTTP method specified in the RPC call was malformed.
/// Usually this means the endpoint or HTTP method specified in the RPC call
/// was malformed.
InvalidRpcCall,
/// The requesting user didn't provide an auth token for a route that requires one.
/// The requesting user didn't provide an auth token for a route that
/// requires one.
NoAuthToken,
NoSuchRoom,
RateLimited,
/// The requesting user provided a valid auth token, but they don't have a high enough permission level.
/// The requesting user provided a valid auth token, but they don't have a
/// high enough permission level.
Unauthorized,
ValidationFailed,
}
impl warp::reject::Reject for Error {}
impl Reject for Error {}
#[rustfmt::skip]
pub fn status_code(e: Rejection) -> StatusCode {

File diff suppressed because it is too large.

View File

@ -8,19 +8,23 @@ use log4rs::{
encode::pattern::PatternEncoder,
filter::threshold::ThresholdFilter,
};
use std::str::FromStr;
pub fn init(log_file: Option<String>) {
let console_level = LevelFilter::Debug;
let file_level = LevelFilter::Info;
pub fn init(log_file: Option<String>, log_level: Option<String>) {
let level = if log_level.is_some() {
LevelFilter::from_str(&log_level.unwrap()).unwrap()
} else {
LevelFilter::Info
};
let stdout_appender = {
let encoder = Box::new(PatternEncoder::new("{h({l})} {d} - {m}{n}"));
let encoder =
Box::new(PatternEncoder::new("{h({l})} {d(%Y-%m-%d %H:%M:%S%.3f)} [{f}:{L}] {m}{n}"));
let stdout = ConsoleAppender::builder().encoder(encoder).build();
let filter = Box::new(ThresholdFilter::new(console_level));
let filter = Box::new(ThresholdFilter::new(level));
Appender::builder().filter(filter).build("stdout", Box::new(stdout))
};
let mut root = Root::builder().appender("stdout");
// Increase SOGS logging level to debug
let sogs = Logger::builder().build("session_open_group_server", LevelFilter::Debug);
let sogs = Logger::builder().build("session_open_group_server", level);
let mut config_builder = log4rs::Config::builder().logger(sogs).appender(stdout_appender);
if let Some(log_file) = log_file {
// Rotate log files every ~50MB keeping 1 archived
@ -29,16 +33,16 @@ pub fn init(log_file: Option<String>) {
.build(&format!("{}-archive.{{}}", &log_file), 1)
.unwrap();
let roll_policy = compound::CompoundPolicy::new(Box::new(size_trigger), Box::new(roller));
// Print to the file at info level
// Print to the file at the given level
let file_appender =
RollingFileAppender::builder().build(&log_file, Box::new(roll_policy)).unwrap();
let filter = Box::new(ThresholdFilter::new(file_level));
let filter = Box::new(ThresholdFilter::new(level));
let file_appender =
Appender::builder().filter(filter).build("file", Box::new(file_appender));
config_builder = config_builder.appender(file_appender);
root = root.appender("file");
}
let root = root.build(file_level);
let root = root.build(level);
let config = config_builder.build(root).unwrap();
let _ = log4rs::init_config(config).expect("Couldn't initialize log configuration.");
}

View File

@ -17,6 +17,7 @@ mod crypto;
mod errors;
mod handlers;
mod logging;
mod migration;
mod models;
mod onion_requests;
mod options;
@ -53,7 +54,7 @@ async fn main() {
PORT.store(opt.port, Ordering::SeqCst);
USES_TLS.store(opt.tls, Ordering::SeqCst);
// Run in server mode
logging::init(opt.log_file);
logging::init(opt.log_file, opt.log_level);
let addr = SocketAddr::new(IpAddr::V4(opt.host), opt.port);
let localhost = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), LOCALHOST_PORT);
*crypto::PRIVATE_KEY_PATH.lock().unwrap() = opt.x25519_private_key;
@ -64,16 +65,11 @@ async fn main() {
info!("Users can join rooms on this open group server using the following URL format:");
info!("{}", get_url());
// Create the main database
storage::create_main_database_if_needed();
storage::setup_database();
// Create required folders
fs::create_dir_all("./rooms").unwrap();
fs::create_dir_all("./files").unwrap();
// Perform migration
storage::perform_migration();
// Set up pruning jobs
let prune_pending_tokens_future = storage::prune_pending_tokens_periodically();
let prune_tokens_future = storage::prune_tokens_periodically();
let prune_files_future = storage::prune_files_periodically();
let db_maintenance_future = storage::db_maintenance_job();
// Serve routes
let public_routes = routes::root().or(routes::fallback()).or(routes::lsrpc());
let private_routes = routes::create_room()
@ -91,25 +87,13 @@ async fn main() {
.run(addr);
let serve_private_routes_future = warp::serve(private_routes).run(localhost);
// Keep futures alive
join!(
prune_pending_tokens_future,
prune_tokens_future,
prune_files_future,
serve_public_routes_future,
serve_private_routes_future
);
join!(db_maintenance_future, serve_public_routes_future, serve_private_routes_future);
} else {
info!("Running on {}.", addr);
let serve_public_routes_future = warp::serve(public_routes).run(addr);
let serve_private_routes_future = warp::serve(private_routes).run(localhost);
// Keep futures alive
join!(
prune_pending_tokens_future,
prune_tokens_future,
prune_files_future,
serve_public_routes_future,
serve_private_routes_future
);
join!(db_maintenance_future, serve_public_routes_future, serve_private_routes_future);
}
}
}
@ -120,7 +104,7 @@ async fn execute_commands(opt: options::Opt) {
// Add a room
if let Some(args) = opt.add_room {
let mut params = HashMap::new();
params.insert("id", &args[0]);
params.insert("token", &args[0]);
params.insert("name", &args[1]);
client.post(format!("{}/rooms", localhost)).json(&params).send().await.unwrap();
println!("Added room with ID: {}", &args[0]);
@ -132,6 +116,8 @@ async fn execute_commands(opt: options::Opt) {
}
// Add a moderator
if let Some(args) = opt.add_moderator {
// FIXME: need to add an ability to add an admin instead of moderator (by setting the
// "admin" param to true)
let mut params = HashMap::new();
params.insert("public_key", &args[0]);
params.insert("room_id", &args[1]);

473
src/migration.rs Normal file
View File

@ -0,0 +1,473 @@
use std::fs;
use std::os::unix::fs::MetadataExt;
use std::path::Path;
use std::time::SystemTime;
use super::handlers;
use super::storage;
use log::{info, warn};
use rusqlite::{params, types::Null, Connection, OpenFlags};
// Performs database migration from v0.1.8 to v0.2.0
pub fn migrate_0_2_0(conn: &mut Connection) -> Result<(), rusqlite::Error> {
// The old database, database.db, is a single-table database containing just the list of rooms:
/*
CREATE TABLE IF NOT EXISTS main (
id TEXT PRIMARY KEY, -- AKA token
name TEXT,
image_id TEXT -- entirely unused.
)
*/
// Do the entire import in one transaction so that if anything fails we leave the db empty (so
// that starting again will try to import again).
let tx = conn.transaction()?;
struct Rm {
token: String,
name: Option<String>,
}
let rooms = Connection::open_with_flags("database.db", OpenFlags::SQLITE_OPEN_READ_ONLY)?
.prepare("SELECT id, name FROM main")?
.query_map(params![], |row| Ok(Rm { token: row.get(0)?, name: row.get(1)? }))?
.collect::<Result<Vec<Rm>, _>>()?;
warn!("{} rooms to import", rooms.len());
{
tx.execute(
"\
CREATE TABLE room_import_hacks (
room INTEGER PRIMARY KEY NOT NULL REFERENCES rooms(id),
old_message_id_max INTEGER NOT NULL,
message_id_offset INTEGER NOT NULL
)",
[],
)?;
let mut used_room_hacks: bool = false;
let mut ins_room_hack = tx.prepare(
"INSERT INTO room_import_hacks (room, old_message_id_max, message_id_offset) VALUES (?, ?, ?)")?;
tx.execute(
"\
CREATE TABLE file_id_hacks (
room INTEGER NOT NULL REFERENCES rooms(id),
old_file_id INTEGER NOT NULL,
file INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
PRIMARY KEY(room, old_file_id)
)",
[],
)?;
let mut used_file_hacks: bool = false;
let mut ins_file_hack =
tx.prepare("INSERT INTO file_id_hacks (room, old_file_id, file) VALUES (?, ?, ?)")?;
let mut ins_room =
tx.prepare("INSERT INTO rooms (token, name) VALUES (?, ?) RETURNING id")?;
let mut ins_user = tx.prepare(
"INSERT INTO users (session_id, last_active) VALUES (?, 0.0) ON CONFLICT DO NOTHING",
)?;
let mut ins_msg = tx.prepare(
"INSERT INTO messages (id, room, user, posted, data, data_size, signature) \
VALUES (?, ?, (SELECT id FROM users WHERE session_id = ?), ?, ?, ?, ?)",
)?;
let mut upd_msg_updated = tx.prepare("UPDATE messages SET updated = ? WHERE id = ?")?;
let mut upd_room_updates = tx.prepare("UPDATE rooms SET updates = ? WHERE id = ?")?;
let mut ins_file = tx.prepare(
"INSERT INTO files (room, size, uploaded, expiry, path) VALUES (?, ?, ?, ?, ?) RETURNING id")?;
let mut upd_file_path = tx.prepare("UPDATE files SET path = ? WHERE id = ?")?;
let mut upd_room_image = tx.prepare("UPDATE rooms SET image = ? WHERE id = ?")?;
let mut ins_room_mod = tx.prepare(
"INSERT INTO user_permission_overrides (room, user, moderator, admin) VALUES (?, (SELECT id FROM users WHERE session_id = ?), TRUE, TRUE) \
ON CONFLICT DO UPDATE SET banned = FALSE, read = TRUE, write = TRUE, moderator = TRUE, admin = TRUE")?;
let mut ins_room_ban = tx.prepare(
"INSERT INTO user_permission_overrides (room, user, banned) VALUES (?, (SELECT id FROM users WHERE session_id = ?), TRUE) \
ON CONFLICT DO UPDATE SET banned = TRUE")?;
let mut ins_room_activity = tx.prepare(
"INSERT INTO room_users (room, user, last_active) VALUES (?, (SELECT id FROM users WHERE session_id = ?), ?) \
ON CONFLICT DO UPDATE SET last_active = excluded.last_active WHERE excluded.last_active > last_active")?;
let mut upd_user_activity = tx.prepare(
"UPDATE users SET last_active = ?1 WHERE session_id = ?2 AND last_active < ?1",
)?;
for room in rooms {
let room_db_filename = format!("rooms/{}.db", room.token);
let room_db = Path::new(&room_db_filename);
if !room_db.exists() {
warn!("Skipping room {}: {} does not exist", room.token, room_db.display());
continue;
}
info!("Importing room {}...", room.token);
let room_id =
ins_room.query_row(params![room.token, room.name], |row| row.get::<_, i64>(0))?;
let rconn = Connection::open_with_flags(room_db, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
/*
Messages were stored in this:
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY,
public_key TEXT,
timestamp INTEGER,
data TEXT,
signature TEXT,
is_deleted INTEGER
);
where public_key is the session_id (in hex), timestamp is in milliseconds since unix epoch,
data and signature are in base64 (wtf), data is typically padded from the client (i.e. to
the next multiple, with lots of 0s on the end). If the message was deleted then it remains
here but `is_deleted` is set to 1 (data and signature should be NULL as well, but older
versions apparently didn't do that), plus we have a row in here:
CREATE TABLE IF NOT EXISTS deleted_messages (
id INTEGER PRIMARY KEY,
deleted_message_id INTEGER
);
where the `id` of this table is returned to the Session client so that they can query for
"deletions since [id]".
This introduces some major complications, though: Session message polling works by
requesting messages (and deletions) since a given id, but we can't preserve IDs because
there are guaranteed to be duplicates across rooms. So we use this room_import_hacks
defined above to figure this out:
- if requesting messages in a room since some id <= old_message_id_max then we
actually query messages in the room since id + message_id_offset.
Deletions don't have the same complication because in the new database they use a
monotonic updates field that we can make conform (for imported rows) to the imported
deletion ids.
*/
let mut id_offset: i64 =
tx.query_row("SELECT COALESCE(MAX(id), 0) + 1 FROM messages", [], |row| {
row.get(0)
})?;
let mut top_old_id: i64 = -1;
let mut updated: i64 = 0;
let mut imported_msgs: i64 = 0;
struct Msg {
id: i64,
session_id: String,
ts_ms: i64,
data: Option<String>,
signature: Option<String>,
deleted: Option<i64>,
}
let n_msgs: i64 =
rconn.query_row("SELECT COUNT(*) FROM messages", [], |row| row.get(0))?;
let mut msg_st = rconn.prepare("\
SELECT messages.id, public_key, timestamp, data, signature, is_deleted, deleted_messages.id \
FROM messages LEFT JOIN deleted_messages ON messages.id = deleted_messages.deleted_message_id
ORDER BY messages.id")?;
let mut msg_rows = msg_st.query([])?;
let mut last_id: i64 = -1;
let mut dupe_dels: i64 = 0;
while let Some(row) = msg_rows.next()? {
let msg = Msg {
id: row.get(0)?,
session_id: row.get(1)?,
ts_ms: row.get(2)?,
data: row.get(3)?,
signature: row.get(4)?,
deleted: if row.get::<_, Option<bool>>(5)?.unwrap_or(false) {
Some(row.get(6)?)
} else {
None
},
};
if top_old_id == -1 {
id_offset -= msg.id;
}
if msg.id > top_old_id {
top_old_id = msg.id;
}
if msg.id == last_id {
// There are duplicates in the deleted_messages table (WTF) that can give us
// multiple rows through the join, so skip duplicates if they occur.
dupe_dels += 1;
continue;
} else {
last_id = msg.id;
}
ins_user.execute(params![msg.session_id])?;
if msg.data.is_some() && msg.signature.is_some() && msg.deleted.is_none() {
// Regular message
// Data was pointlessly stored with padding, so unpad it:
let padded_data = match base64::decode(msg.data.unwrap()) {
Ok(d) => d,
Err(e) => panic!(
"Unexpected data: {} message id={} has non-base64 data ({})",
room_db.display(),
msg.id,
e
),
};
let data_size = padded_data.len();
let data = match padded_data.iter().rposition(|&c| c != 0u8) {
Some(last) => &padded_data[0..=last],
None => &padded_data,
};
let sig = match base64::decode(msg.signature.unwrap()) {
Ok(d) if d.len() == 64 => d,
Ok(_) => panic!(
"Unexpected data: {} message id={} has invalid signature",
room_db.display(),
msg.id
),
Err(e) => panic!(
"Unexpected data: {} message id={} has non-base64 signature ({})",
room_db.display(),
msg.id,
e
),
};
ins_msg.execute(params![
msg.id + id_offset,
room_id,
msg.session_id,
(msg.ts_ms as f64) / 1000.,
data,
data_size,
sig
])?;
} else if msg.deleted.is_some() &&
// Deleted messages are usually set to the fixed string "deleted" (why not
// NULL?) for data and signature, so accept either null or that string if the
// other columns indicate a deleted message.
(msg.data.is_none() || msg.data.as_ref().unwrap() == "deleted") &&
(msg.signature.is_none() || msg.signature.as_ref().unwrap() == "deleted")
{
updated += 1;
// Deleted message; we still need to insert a tombstone for it, and copy the
// deletion id as the "updated" field. (We do this with a second query because the
// first query is going to trigger an automatic update of the field).
ins_msg.execute(params![
msg.id + id_offset,
room_id,
msg.session_id,
(msg.ts_ms as f64) / 1000.,
Null,
Null,
Null
])?;
} else {
panic!("Inconsistent message in {} database: message id={} has inconsistent deletion state (data: {}, signature: {}, del row: {})",
room_db.display(), msg.id, msg.data.is_some(), msg.signature.is_some(), msg.deleted.is_some());
}
upd_msg_updated.execute(params![updated, msg.id + id_offset])?;
imported_msgs += 1;
if imported_msgs % 1000 == 0 {
info!("- ... imported {}/{} messages", imported_msgs, n_msgs);
}
}
info!(
"- migrated {} messages, {} duplicate deletions ignored",
imported_msgs, dupe_dels
);
upd_room_updates.execute(params![updated, room_id])?;
// If we have to offset rowids then make sure the hack table exists and insert our hack.
if id_offset != 0 {
used_room_hacks = true;
ins_room_hack.execute(params![room_id, top_old_id, id_offset])?;
}
let mut imported_files: i64 = 0;
let n_files: i64 =
rconn.query_row("SELECT COUNT(*) FROM files", [], |row| row.get(0))?;
// WTF is this id stored as a TEXT?
struct File {
id: String,
ts: i64,
}
let mut rows_st = rconn.prepare("SELECT id, timestamp FROM files")?;
let mut file_rows = rows_st.query([])?;
while let Some(row) = file_rows.next()? {
let file = File { id: row.get(0)?, ts: row.get(1)? };
let old_id = match file.id.parse::<i64>() {
Ok(id) => id,
Err(e) => {
panic!("Invalid fileid '{}' found in {}: {}", file.id, room_db.display(), e)
}
};
let old_path = format!("files/{}_files/{}", room.token, old_id);
let size = match fs::metadata(&old_path) {
Ok(md) => md.len(),
Err(e) => {
warn!(
"Error accessing file {} ({}); skipping import of this upload",
old_path, e
);
continue;
}
};
let ts = if file.ts > 10000000000 {
warn!(
"- file {} has nonsensical timestamp {}; importing it with current time",
old_path, file.ts
);
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64()
} else {
file.ts as f64
};
let new_id = ins_file.query_row(
params![
room_id,
size,
ts,
ts + handlers::UPLOAD_DEFAULT_EXPIRY.as_secs_f64(),
old_path
],
|row| row.get::<_, i64>(0),
)?;
ins_file_hack.execute(params![room_id, old_id, new_id])?;
imported_files += 1;
if imported_files % 1000 == 0 {
info!("- ... imported {}/{} files", imported_files, n_files);
}
}
if imported_files > 0 {
used_file_hacks = true;
}
info!("- migrated {} files", imported_files);
// There's also a potential room image, which is just stored on disk and not referenced in
// the database at all because why not.
//
// Unlike the regular files (which will expire in 15 days) this one doesn't expire, so
// link it into the new uploads directory so that (after 15 days) the old dirs can be
// cleared out.
let room_image_path = format!("files/{}", room.token);
if let Ok(md) = fs::metadata(&room_image_path) {
let files_dir = format!("uploads/{}", room.token);
if let Err(e) = std::fs::create_dir_all(&files_dir) {
panic!("Unable to mkdir {} for room file storage: {}", files_dir, e);
}
let file_id = ins_file.query_row(
params![
room_id,
md.len(),
md.mtime() as f64 + md.mtime_nsec() as f64 * 1e-9,
Null,
"tmp"
],
|row| row.get::<_, i64>(0),
)?;
let new_image_path = format!("uploads/{}/{}_(unnamed)", room.token, file_id);
if let Err(e) = fs::hard_link(&room_image_path, &new_image_path) {
panic!(
"Unable to hard link room image file {} => {}: {}",
room_image_path, new_image_path, e
);
}
upd_file_path.execute(params![new_image_path, file_id])?;
upd_room_image.execute(params![file_id, room_id])?;
// Don't need a file hack row because the room image isn't referenced by id from
// existing clients.
info!("- migrated room image");
} else {
info!("- no room image");
}
// Banned users.
let mut imported_bans: i64 = 0;
let mut ban_st = rconn.prepare("SELECT public_key FROM block_list")?;
let mut ban_rows = ban_st.query([])?;
while let Some(row) = ban_rows.next()? {
let banned_id: String = row.get(0)?;
ins_user.execute(params![banned_id])?;
ins_room_ban.execute(params![room_id, banned_id])?;
imported_bans += 1;
}
// Moderators. Since the older version didn't distinguish between moderators and admins,
// old moderators had all the permissions that new admins have, so import them all as
// admins.
let mut imported_mods: i64 = 0;
let mut mods_st = rconn.prepare("SELECT public_key from moderators")?;
let mut mod_rows = mods_st.query([])?;
while let Some(row) = mod_rows.next()? {
let mod_id: String = row.get(0)?;
ins_user.execute(params![mod_id])?;
ins_room_mod.execute(params![room_id, mod_id])?;
imported_mods += 1;
}
// User activity
let mut imported_activity: i64 = 0;
let mut imported_active: i64 = 0;
// Don't import rows we're going to immediately prune:
let import_cutoff = (SystemTime::now() - storage::ROOM_ACTIVE_PRUNE_THRESHOLD)
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
let n_activity: i64 = rconn.query_row(
"SELECT COUNT(*) FROM user_activity WHERE last_active > ?",
params![import_cutoff],
|row| row.get(0),
)?;
let mut activity_st = rconn.prepare("SELECT public_key, last_active FROM user_activity WHERE last_active > ? AND public_key IS NOT NULL")?;
let mut act_rows = activity_st.query(params![import_cutoff])?;
let cutoff = (SystemTime::now() - handlers::ROOM_DEFAULT_ACTIVE_THRESHOLD)
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
while let Some(row) = act_rows.next()? {
let session_id: String = row.get(0)?;
let ts: f64 = row.get::<_, i64>(1)? as f64;
ins_user.execute(params![session_id])?;
ins_room_activity.execute(params![room_id, session_id, ts])?;
upd_user_activity.execute(params![ts, session_id])?;
if ts >= cutoff {
imported_active += 1;
}
imported_activity += 1;
if imported_activity % 1000 == 0 {
info!(
"- ... imported {}/{} user activity records ({} active)",
imported_activity, n_activity, imported_active
);
}
}
warn!("Imported room {}: {} messages, {} files, {} moderators, {} bans, {} users ({} active)",
room.token, imported_msgs, imported_files, imported_mods, imported_bans, imported_activity, imported_active);
}
if !used_room_hacks {
tx.execute("DROP TABLE room_import_hacks", [])?;
}
if !used_file_hacks {
tx.execute("DROP TABLE file_id_hacks", [])?;
}
}
tx.commit()?;
warn!("Import finished!");
Ok(())
}
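
To illustrate the `room_import_hacks` mechanism described in the comment above, here is a hypothetical sketch (not part of this diff) of how a poll handler could shift a client-supplied message id into the new id space. The function name and query shape are illustrative only, not the actual handlers.rs code (whose diff is suppressed above).
```
use rusqlite::{params, Connection, OptionalExtension};

// Illustrative only: if the client's since_id falls within the imported (old)
// id range for this room, add the room's message_id_offset before querying
// the new messages table; otherwise use it as-is.
fn effective_since_id(
    conn: &Connection,
    room_id: i64,
    since_id: i64,
) -> Result<i64, rusqlite::Error> {
    // Look up the import hack row for this room, if any.
    let hack: Option<(i64, i64)> = conn
        .query_row(
            "SELECT old_message_id_max, message_id_offset FROM room_import_hacks WHERE room = ?",
            params![room_id],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .optional()?;
    Ok(match hack {
        Some((old_max, offset)) if since_id <= old_max => since_id + offset,
        _ => since_id,
    })
}
```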

View File

@ -1,52 +1,304 @@
use serde::{Deserialize, Serialize};
use base64;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
#[derive(Debug, Deserialize, Serialize)]
pub struct Message {
pub server_id: Option<i64>,
pub public_key: Option<String>,
pub timestamp: i64,
pub data: String,
pub signature: String,
pub struct User {
pub id: i64,
pub session_id: String,
pub created: f64,
pub last_active: f64,
pub banned: bool,
pub moderator: bool,
pub admin: bool,
}
impl Message {
pub fn is_valid(&self) -> bool {
return self.timestamp > 0 && !self.data.is_empty() && !self.signature.is_empty();
impl User {
pub fn from_row(row: &rusqlite::Row) -> Result<User, rusqlite::Error> {
return Ok(User {
id: row.get(row.column_index("id")?)?,
session_id: row.get(row.column_index("session_id")?)?,
created: row.get(row.column_index("created")?)?,
last_active: row.get(row.column_index("last_active")?)?,
banned: row.get(row.column_index("banned")?)?,
moderator: row.get(row.column_index("moderator")?)?,
admin: row.get(row.column_index("admin")?)?,
});
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct DeletedMessage {
fn as_opt_base64<S>(val: &Option<Vec<u8>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
s.serialize_str(&base64::encode(val.as_ref().unwrap()))
}
/// Old message structure returned by the deprecated compact_poll endpoint.
#[derive(Debug, Serialize)]
pub struct OldMessage {
/// Server-side message id. Migration: this becomes `id` in the new Message format.
pub server_id: i64,
/// Session id of the poster. Omitted when the information isn't available/useful (such as
/// submitting new messages). Migration: this becomes `session_id` in the new Message format.
#[serde(skip_serializing_if = "Option::is_none")]
pub public_key: Option<String>,
/// Timestamp, in unix epoch milliseconds. Migration: in the new Message format this value is
/// a floating point value (rather than integer) *and* is returned as actual unix time (i.e.
/// seconds) rather than milliseconds.
pub timestamp: i64,
/// Message data, encoded in base64
#[serde(serialize_with = "as_opt_base64")]
pub data: Option<Vec<u8>>,
/// XEd25519 message signature of the `data` bytes (not the base64 representation), encoded in
/// base64
#[serde(serialize_with = "as_opt_base64")]
pub signature: Option<Vec<u8>>,
}
impl OldMessage {
pub fn from_row(row: &rusqlite::Row) -> Result<OldMessage, rusqlite::Error> {
let mut data: Option<Vec<u8>> = row.get(row.column_index("data")?)?;
repad(&mut data, row.get::<_, Option<usize>>(row.column_index("data_size")?)?);
let session_id = match row.column_index("session_id") {
Ok(index) => Some(row.get(index)?),
Err(_) => None,
};
return Ok(OldMessage {
server_id: row.get(row.column_index("id")?)?,
public_key: session_id,
timestamp: (row.get::<_, f64>(row.column_index("posted")?)? * 1000.0) as i64,
data,
signature: row.get(row.column_index("signature")?)?,
});
}
}
#[derive(Debug, Serialize)]
pub struct Message {
/// The message id.
pub id: i64,
/// The session ID of the user who posted this message, in hex. Omitted in contexts where the
/// information isn't available or isn't useful, such as when inserting a message.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// unix timestamp of when the message was received on the server.
pub timestamp: f64,
/// unix timestamp of when the message was last edited (null if never edited).
pub edited: Option<f64>,
/// set to the room's current `updates` value at the time this message was created, last
/// edited, or deleted.
pub updated: i64,
/// The message data, encoded in base64. This field is omitted if the message is deleted.
#[serde(skip_serializing_if = "Option::is_none", serialize_with = "as_opt_base64")]
pub data: Option<Vec<u8>>,
/// The message signature, encoded in base64. This field is omitted if the message is deleted.
#[serde(skip_serializing_if = "Option::is_none", serialize_with = "as_opt_base64")]
pub signature: Option<Vec<u8>>,
/// Flag set to true if the message is deleted, and omitted otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted: Option<bool>,
}
fn repad(data: &mut Option<Vec<u8>>, size: Option<usize>) {
if let Some(size) = size {
if data.is_some() && data.as_ref().unwrap().len() < size {
data.as_mut().unwrap().resize(size, 0);
}
}
}
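
For illustration, a hypothetical counterpart to `repad` (not part of this diff), mirroring the unpadding done during migration: strip the trailing zeros that old clients added and keep the original padded length, which is what ends up in the `data_size` column.
```
// Illustrative only: remove trailing zero padding, remembering the padded size.
fn unpad(padded: Vec<u8>) -> (Vec<u8>, usize) {
    let size = padded.len();
    let data = match padded.iter().rposition(|&c| c != 0u8) {
        Some(last) => padded[0..=last].to_vec(),
        None => padded,
    };
    (data, size)
}
```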
impl Message {
pub fn from_row(row: &rusqlite::Row) -> Result<Message, rusqlite::Error> {
let mut data: Option<Vec<u8>> = row.get(row.column_index("data")?)?;
repad(&mut data, row.get::<_, Option<usize>>(row.column_index("data_size")?)?);
let deleted = if data.is_none() { Some(true) } else { None };
let session_id = match row.column_index("session_id") {
Ok(index) => Some(row.get(index)?),
Err(_) => None,
};
return Ok(Message {
id: row.get(row.column_index("id")?)?,
session_id,
timestamp: row.get(row.column_index("posted")?)?,
edited: row.get(row.column_index("edited")?)?,
updated: row.get(row.column_index("updated")?)?,
data,
signature: row.get(row.column_index("signature")?)?,
deleted,
});
}
}
fn bytes_from_base64<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;
String::deserialize(deserializer)
.and_then(|str| base64::decode(&str).map_err(|err| Error::custom(err.to_string())))
}
#[derive(Debug, Deserialize)]
pub struct PostMessage {
#[serde(deserialize_with = "bytes_from_base64")]
pub data: Vec<u8>,
#[serde(deserialize_with = "bytes_from_base64")]
pub signature: Vec<u8>,
}
#[derive(Debug, Serialize)]
pub struct DeletedMessage {
#[serde(rename = "id")]
pub updated: i64,
pub deleted_message_id: i64,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Serialize)]
pub struct Room {
pub id: String,
#[serde(skip)]
pub id: i64,
#[serde(rename = "id")]
pub token: String,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_file_id: Option<i64>,
pub created: f64,
pub updates: i64,
pub default_read: bool,
pub default_write: bool,
pub default_upload: bool,
}
#[derive(Debug, Deserialize, Serialize)]
impl Room {
pub fn from_row(row: &rusqlite::Row) -> Result<Room, rusqlite::Error> {
return Ok(Room {
id: row.get(row.column_index("id")?)?,
token: row.get(row.column_index("token")?)?,
name: row.get(row.column_index("name")?)?,
description: row.get(row.column_index("description")?)?,
image_file_id: row.get(row.column_index("image")?)?,
created: row.get(row.column_index("created")?)?,
updates: row.get(row.column_index("updates")?)?,
default_read: row.get(row.column_index("read")?)?,
default_write: row.get(row.column_index("write")?)?,
default_upload: row.get(row.column_index("upload")?)?,
});
}
}
// FIXME: this appears to be used for both add/remove. But what if we want to promote to admin, or
// demote to moderator?
#[derive(Debug, Deserialize)]
pub struct ChangeModeratorRequestBody {
pub public_key: String,
pub room_id: String,
#[serde(rename = "room_id")]
pub room_token: String,
#[serde(rename = "public_key")]
pub session_id: String,
pub admin: Option<bool>,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize)]
pub struct PollRoomMetadata {
/// Token of the room to poll
pub room: String,
/// The last `info_update` value the client has; results are only returned if the room has been
/// modified since the value provided by the client.
pub since_update: i64,
}
#[derive(Debug, Serialize)]
pub struct RoomDetails {
/// The token of this room
pub token: String,
/// Number of recently active users in the room
pub active_users: i64,
/// Metadata of the room; this is omitted from the response when polling if the room metadata
/// (other than active user count) has not changed since the update counter given in the request.
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<RoomMetadata>,
}
#[derive(Debug, Serialize)]
pub struct RoomMetadata {
/// A counter that is updated whenever this room's metadata changes; clients are expected to
/// poll for updates using this id.
pub info_update: i64,
/// Unix timestamp (seconds since epoch) when this room was created
pub created: f64,
/// The human-readable room name
pub name: String,
/// Text description of the room
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// ID of an uploaded file that contains the image for this room
#[serde(skip_serializing_if = "Option::is_none")]
pub image_id: Option<i64>,
/// IDs of any pinned message in this room.
pub pinned_messages: Vec<i64>,
/// List of non-admin public moderator Session IDs. This list includes both room-specific and
/// global moderators (but not admins), and only those configured as visible.
pub moderators: Vec<String>,
/// List of public admin session IDs for this room. In addition to everything moderators can
/// do, admins can also add/remove/ban other moderators and admins. As with `moderators` only
/// visible admins are included.
pub admins: Vec<String>,
/// List of hidden moderator Session IDs. This field is omitted if the requesting user is not
/// a moderator or admin.
#[serde(skip_serializing_if = "Option::is_none")]
pub hidden_mods: Option<Vec<String>>,
/// List of hidden admin Session IDs. This field is omitted if the requesting user is not a
/// moderator or admin.
#[serde(skip_serializing_if = "Option::is_none")]
pub hidden_admins: Option<Vec<String>>,
/// Will be present and true if the requesting user has moderator powers, omitted otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
pub moderator: Option<bool>,
/// Will be present and true if the requesting user has admin powers, omitted otherwise.
pub admin: Option<bool>,
}
#[derive(Debug, Deserialize)]
pub struct PollRoomMessages {
/// Token of the room to poll for messages.
pub room: String,
/// Return new messages, edits, and deletions posted since this `updates` value. Clients should
/// poll with the most recent updates value they have received.
pub since_update: i64,
}
#[derive(Debug, Serialize)]
pub struct RoomMessages {
/// The token of this room
pub room: String,
/// Vector of new/edited/deleted messages posted to the room since the requested update.
pub messages: Vec<Message>,
}
#[derive(Debug, Deserialize)]
pub struct CompactPollRequestBody {
pub room_id: String,
pub auth_token: String,
pub from_deletion_server_id: Option<i64>,
#[serde(rename = "room_id")]
pub room_token: String,
// Deprecated: older Session clients pass the authorization token through this. Newer clients
// should use signed requests instead.
pub auth_token: Option<String>,
// Input parameters to query. If these are omitted (or null) then this returns the latest 256
// messages/deletions, in reverse order from what you get with regular polling. New clients
// should update to the new polling endpoints ASAP.
pub from_message_server_id: Option<i64>,
pub from_deletion_server_id: Option<i64>,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Serialize)]
pub struct CompactPollResponseBody {
pub room_id: String,
#[serde(rename = "room_id")]
pub room_token: String,
pub status_code: u16,
pub deletions: Vec<DeletedMessage>,
pub messages: Vec<Message>,
pub messages: Vec<OldMessage>,
pub moderators: Vec<String>,
}

View File

@ -23,25 +23,25 @@ struct OnionRequestPayloadMetadata {
pub async fn handle_onion_request(blob: warp::hyper::body::Bytes) -> Result<Response, Rejection> {
let payload = parse_onion_request_payload(blob)?;
let (plaintext, symmetric_key) = decrypt_onion_request_payload(payload)?;
// From this point on we can wrap any error that occurs in a HTTP response that's
// encrypted with the given symmetric key, so that the error that occurred is
// propagated back to the client that made the onion request.
// From this point on we can wrap any error that occurs in a HTTP response that's encrypted
// with the given symmetric key, so that the error that occurred is propagated back to the
// client that made the onion request.
//
// If an error occurred before this point we'll have responded to the Service Node
// with an unsuccessful status code, which it'll have propagated back to the client
// as a "Loki server error" (i.e. the actual error is hidden from the client that
// made the onion request). This is unfortunate but cannot be solved without
// fundamentally changing how onion requests work.
// If an error occurred before this point we'll have responded to the Service Node with an
// unsuccessful status code, which it'll have propagated back to the client as a "Loki server
// error" (i.e. the actual error is hidden from the client that made the onion request). This
// is unfortunate but cannot be solved without fundamentally changing how onion requests work.
return handle_decrypted_onion_request(&plaintext, &symmetric_key).await;
}
async fn handle_decrypted_onion_request(
plaintext: &[u8], symmetric_key: &[u8],
plaintext: &[u8],
symmetric_key: &[u8],
) -> Result<Response, Rejection> {
let rpc_call = match serde_json::from_slice(plaintext) {
Ok(rpc_call) => rpc_call,
Err(e) => {
warn!("Couldn't parse RPC call from JSON due to error: {}.", e);
warn!("Couldn't parse RPC call from JSON: {}.", e);
return Err(warp::reject::custom(Error::InvalidOnionRequest));
}
};
@ -49,16 +49,20 @@ async fn handle_decrypted_onion_request(
let result = rpc::handle_rpc_call(rpc_call)
.await
// Turn any error that occurred into an HTTP response
// Unwrapping is safe because at this point any error should be caught and turned into an HTTP response (i.e. an OK result)
// Unwrapping is safe because at this point any error should be caught and turned into an
// HTTP response (i.e. an OK result)
.or_else(super::errors::into_response)?;
// Encrypt the HTTP response so that it's propagated back to the client that made the onion request
// Encrypt the HTTP response so that it's propagated back to the client that made the onion
// request
return encrypt_response(result, symmetric_key).await;
}
fn parse_onion_request_payload(
blob: warp::hyper::body::Bytes,
) -> Result<OnionRequestPayload, Rejection> {
// The encoding of an onion request looks like: | 4 bytes: size N of ciphertext | N bytes: ciphertext | json as utf8 |
// The encoding of an onion request looks like:
//
// | 4 bytes: size N of ciphertext | N bytes: ciphertext | json as utf8 |
if blob.len() < 4 {
warn!("Ignoring blob of invalid size.");
return Err(warp::reject::custom(Error::InvalidOnionRequest));
@ -78,7 +82,7 @@ fn parse_onion_request_payload(
let json = match String::from_utf8(utf8_json) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse onion request payload metadata due to error: {}.", e);
warn!("Couldn't parse onion request payload metadata: {}.", e);
return Err(warp::reject::custom(Error::InvalidOnionRequest));
}
};
@ -86,7 +90,7 @@ fn parse_onion_request_payload(
let metadata: OnionRequestPayloadMetadata = match serde_json::from_str(&json) {
Ok(metadata) => metadata,
Err(e) => {
warn!("Couldn't parse onion request payload metadata due to error: {}.", e);
warn!("Couldn't parse onion request payload metadata: {}.", e);
return Err(warp::reject::custom(Error::InvalidOnionRequest));
}
};
@ -99,7 +103,8 @@ fn parse_onion_request_payload(
return Ok(OnionRequestPayload { ciphertext, metadata });
}
/// Returns the decrypted `payload.ciphertext` plus the `symmetric_key` that was used for decryption if successful.
/// Returns the decrypted `payload.ciphertext` plus the `symmetric_key` that was used for
/// decryption if successful.
fn decrypt_onion_request_payload(
payload: OnionRequestPayload,
) -> Result<(Vec<u8>, Vec<u8>), Rejection> {

View File

@ -2,8 +2,8 @@ use std::net::Ipv4Addr;
use structopt::StructOpt;
// The default is *not* to run in TLS mode. This is because normally the server communicates through
// onion requests, eliminating the need for TLS.
// The default is *not* to run in TLS mode. This is because normally the server communicates
// through onion requests, eliminating the need for TLS.
#[derive(StructOpt)]
#[structopt(name = "Session Open Group Server")]
@ -28,6 +28,10 @@ pub struct Opt {
#[structopt(long = "log-file")]
pub log_file: Option<String>,
/// Log level, one of: trace, debug, info, warn, error. If omitted the default is info.
#[structopt(long = "log-level")]
pub log_level: Option<String>,
/// Run in TLS mode.
#[structopt(long = "tls")]
pub tls: bool,
@ -40,19 +44,21 @@ pub struct Opt {
#[structopt(long = "tls-private-key", default_value = "tls_private_key.pem")]
pub tls_private_key: String,
/// Add a room with the given ID and name.
/// Add a room: call with the token string followed by a descriptive room name.
#[structopt(long = "add-room")]
pub add_room: Option<Vec<String>>,
/// Deletes the room with the given ID.
/// Deletes the room with the given token.
#[structopt(long = "delete-room")]
pub delete_room: Option<String>,
/// Makes the given public key a moderator for the room with the given ID.
/// Makes the given public key a moderator for the given room. Call with the moderator public
/// key followed by the room token.
#[structopt(long = "add-moderator")]
pub add_moderator: Option<Vec<String>>,
/// Removes moderator permission for the given public key in the room with the given ID.
/// Removes moderator permission for the given public key in the given room. Call with the
/// moderator public key followed by the room token.
#[structopt(long = "delete-moderator")]
pub delete_moderator: Option<Vec<String>>,


@ -104,7 +104,8 @@ pub async fn root_html() -> Result<Response, Rejection> {
}
pub async fn fallback_html(
room: String, query_map: HashMap<String, String>,
room: String,
query_map: HashMap<String, String>,
) -> Result<Response, Rejection> {
if !query_map.contains_key("public_key") || room == "" {
return fallback_nopubkey_html().await;


@ -1,12 +1,15 @@
use std::collections::HashMap;
use ed25519_dalek;
use log::warn;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde_json::json;
use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection};
use super::crypto;
use super::errors::Error;
use super::handlers;
use super::models;
use super::models::{self, Room, User};
use super::storage;
#[allow(dead_code)]
@ -15,94 +18,197 @@ pub enum Mode {
OpenGroupServer,
}
#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Debug)]
pub struct RpcCall {
pub endpoint: String,
pub body: String,
pub method: String,
// TODO: deprecate headers; currently it only ever contains Authorization (for a deprecated
// token) and Room, which we should replace by encoding the room token in the endpoint.
pub headers: HashMap<String, String>,
// For new, token-less requests; requests without these should be considered deprecated.
/// Ed25519 pubkey, in hex (will be used to derive the Session key by converting and prepending
/// 05).
pub ed25519_pubkey: Option<String>,
/// Arbitrary string; must be different on each request
pub nonce: Option<String>,
/// Ed25519 signature (in base64 or hex) of (method || endpoint || body || nonce)
pub signature: Option<String>,
}
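// Illustrative sketch (not part of this change): how a client could produce the `signature`
// field, signing the request fields in the same order that crypto::verify_signature() is given
// them below (endpoint, method, body, nonce). The helper and key handling are hypothetical;
// per the field comment above, real clients then hex- or base64-encode the result.
#[allow(dead_code)]
fn sign_rpc_call_example(
    keypair: &ed25519_dalek::Keypair,
    endpoint: &str,
    method: &str,
    body: &str,
    nonce: &str,
) -> [u8; 64] {
    use ed25519_dalek::Signer;
    let msg = [endpoint, method, body, nonce].concat();
    keypair.sign(msg.as_bytes()).to_bytes()
}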
pub const MODE: Mode = Mode::OpenGroupServer;
pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result<Response, Rejection> {
// Gets a user from the auth token in the request's Authorization header. Returns None if there
// is no auth token, Some(User) if there is a parseable auth token, and an error for anything else.
fn get_user_from_auth_header(
conn: &rusqlite::Connection,
rpc: &RpcCall,
) -> Result<Option<User>, Error> {
if let Some(auth_token_str) = rpc.headers.get("Authorization") {
return Ok(Some(handlers::get_user_from_token(conn, auth_token_str)?));
}
return Ok(None);
}
// FIXME TODO - these calls using warp::Rejection as an error type are Doing It Wrong:
// warp::Rejection means "decline this handler, try another one" but everywhere we are using it we
// mean "return an error".
//
// c.f. https://github.com/seanmonstar/warp/issues/388
pub async fn handle_rpc_call(mut rpc_call: RpcCall) -> Result<Response, Rejection> {
let have_sig = rpc_call.ed25519_pubkey.is_some();
if rpc_call.nonce.is_some() != have_sig || rpc_call.signature.is_some() != have_sig {
warn!(
"Invalid request: all or none of {{ed25519_pubkey, nonce, signature}} must be provided"
);
return Err(Error::InvalidRpcCall.into());
}
let mut user: Option<User> = None;
if have_sig {
let (edpk, _xpk, sessionid) =
crypto::get_pubkeys(rpc_call.ed25519_pubkey.as_ref().unwrap())?;
let nonce = rpc_call.nonce.as_ref().unwrap();
// TODO FIXME: reject recent pubkey/nonce combinations.
let mut sig_bytes: [u8; 64] = [0; 64];
sig_bytes.copy_from_slice(
&handlers::decode_hex_or_b64(rpc_call.signature.as_ref().unwrap(), 64)?[0..64],
);
let sig = ed25519_dalek::Signature::new(sig_bytes);
if let Err(sigerr) = crypto::verify_signature(
&edpk,
&sig,
&vec![
rpc_call.endpoint.as_bytes(),
rpc_call.method.as_bytes(),
rpc_call.body.as_bytes(),
nonce.as_bytes(),
],
) {
warn!("Signature verification failed for request from {}", sessionid);
return Err(sigerr.into());
}
let usr = handlers::insert_or_update_user(&*storage::get_conn()?, &sessionid)?;
// Check for a global ban, and if so, terminate the request right away.
if usr.banned {
return Err(Error::Unauthorized.into());
}
user = Some(usr);
}
// Check that the endpoint is a valid URI and deconstruct it into a path
// and query parameters.
// Adding "http://placeholder.io" in front of the endpoint is a workaround
// for the fact that the URL crate doesn't accept relative URLs. There are
// other (cleaner) ways to fix this but they tend to be much more complex.
let raw_uri = format!("http://placeholder.io/{}", rpc_call.endpoint.trim_start_matches('/'));
let path: String = match raw_uri.parse::<http::Uri>() {
Ok(uri) => uri.path().trim_start_matches('/').to_string(),
if !rpc_call.endpoint.starts_with('/') {
rpc_call.endpoint = format!("/{}", rpc_call.endpoint);
}
let path: String;
let query_params: HashMap<String, String>;
match rpc_call.endpoint.parse::<http::Uri>() {
Ok(uri) => {
path = uri.path().trim_start_matches('/').to_string();
query_params = match uri.query() {
Some(qs) => form_urlencoded::parse(qs.as_bytes()).into_owned().collect(),
None => HashMap::new(),
};
}
Err(e) => {
warn!("Couldn't parse URI from: {} due to error: {}.", &raw_uri, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
warn!("Couldn't parse URI from '{}': {}.", &rpc_call.endpoint, e);
return Err(Error::InvalidRpcCall.into());
}
};
let query_params: HashMap<String, String> = match url::Url::parse(&raw_uri) {
Ok(url) => url.query_pairs().into_owned().collect(),
Err(e) => {
warn!("Couldn't parse URL from: {} due to error: {}.", &raw_uri, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
// Get the auth token if possible
let auth_token = get_auth_token(&rpc_call);
// Get the room ID
let room_id_str = get_room_id(&rpc_call);
// TODO FIXME: rather than get the room from a header, here, we should consistently rewrite
// urls to include the room identifier, e.g. POST /r/room123/message (or similar), and handle
// that in the handle_xxx_request.
// Get the room and check that it exists, if a room is provided
let room = get_room(&rpc_call)?;
// Get the user from an auth token (unless we are using signed requests, in which case we
// already know the user).
if room.is_some() && user.is_none() {
user = get_user_from_auth_header(&*storage::get_conn()?, &rpc_call)?;
}
// Switch on the HTTP method
match rpc_call.method.as_ref() {
"GET" => {
return handle_get_request(room_id_str, rpc_call, &path, auth_token, query_params).await
}
"POST" => return handle_post_request(room_id_str, rpc_call, &path, auth_token).await,
"GET" => return handle_get_request(room, rpc_call, &path, user, query_params).await,
"POST" => return handle_post_request(room, rpc_call, &path, user).await,
"DELETE" => {
let pool = get_pool_for_room(&rpc_call)?;
return handle_delete_request(rpc_call, &path, auth_token, &pool).await;
return handle_delete_request(room.ok_or(Error::NoSuchRoom)?, rpc_call, &path, user)
.await
}
_ => {
warn!("Ignoring RPC call with invalid or unused HTTP method: {}.", rpc_call.method);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
}
}
async fn handle_get_request(
room_id: Option<String>, rpc_call: RpcCall, path: &str, auth_token: Option<String>,
room: Option<Room>,
rpc_call: RpcCall,
path: &str,
user: Option<User>,
query_params: HashMap<String, String>,
) -> Result<Response, Rejection> {
let mut components: Vec<&str> = path.split('/').collect();
if components.len() == 0 {
components.push("");
}
// Handle routes that don't require authorization first
if path == "auth_token_challenge" {
if components[0] == "auth_token_challenge" && components.len() == 1 {
reject_if_file_server_mode(path)?;
let pool = get_pool_for_room(&rpc_call)?;
let challenge = handlers::get_auth_token_challenge(query_params, &pool)?;
#[derive(Debug, Deserialize, Serialize)]
struct Response {
status_code: u16,
challenge: models::Challenge,
}
let response = Response { status_code: StatusCode::OK.as_u16(), challenge };
let challenge = handlers::get_auth_token_challenge(
query_params.get("public_key").ok_or(Error::InvalidRpcCall)?,
)?;
let response = json!({ "status_code": StatusCode::OK.as_u16(), "challenge": challenge });
return Ok(warp::reply::json(&response).into_response());
} else if path.starts_with("rooms") {
}
// /rooms/* endpoint: Deprecated.
//
// Use `GET /rooms` or `GET /r/ROOMID` or `GET /r/ROOMID/file/ID` instead.
//
// FIXME TODO
if components[0] == "rooms" {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes
if components.len() == 1 {
return handlers::get_all_rooms();
} else if components.len() == 2 {
let room_id = components[1];
return handlers::get_room(&room_id);
} else if components.len() == 3 && components[2] == "image" {
let room_id = components[1];
return handlers::get_group_image(&room_id).await;
} else {
warn!("Invalid endpoint: {}.", rpc_call.endpoint);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return handlers::get_all_rooms_v01x();
}
} else if path.starts_with("session_version") {
let room_token = components[1];
let room = match room {
None => storage::get_room_from_token(&*storage::get_conn()?, room_token)?,
Some(room) => {
if room.token != room_token {
warn!("Attempt to access /rooms/ROOM with mismatched path/header room tokens");
return Err(Error::InvalidRpcCall.into());
}
room
}
};
if components.len() == 2 {
return handlers::get_room_v01x(&room);
} else if components[2] == "image" && components.len() == 3 {
return handlers::get_room_image(room).await;
}
warn!("Invalid endpoint: {}.", rpc_call.endpoint);
return Err(Error::InvalidRpcCall.into());
}
if path.starts_with("session_version") {
match MODE {
Mode::OpenGroupServer => {
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
Mode::FileServer => (),
}
@ -116,86 +222,98 @@ async fn handle_get_request(
};
return Ok(warp::reply::json(&response).into_response());
}
// This route requires auth in open group server mode, but not in file server mode
let pool = get_pool_for_room(&rpc_call)?;
if path.starts_with("files") {
let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes
if components.len() != 2 {
warn!("Invalid endpoint: {}.", rpc_call.endpoint);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
let file_id: u64 = match components[1].parse() {
Ok(file_id) => file_id,
Err(_) => {
warn!("Invalid endpoint: {}.", rpc_call.endpoint);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
return handlers::get_file(room_id, file_id, auth_token, &pool)
.await
.map(|json| warp::reply::json(&json).into_response());
if matches!(MODE, Mode::FileServer) && path.starts_with("files") {
panic!("FIXME -- file server retrieval");
}
// Handle routes that require authorization
let auth_token = auth_token.ok_or_else(|| warp::reject::custom(Error::NoAuthToken))?;
match path {
let user = user.ok_or(Error::NoAuthToken)?;
// TODO FIXME: new endpoints (a hypothetical dispatch sketch follows this function):
// - /r/ROOMID - retrieves room metadata
//
// - /r/ROOMID/recent - retrieves recent messages
//
// - /r/ROOMID/message/ID - retrieve a message by ID
//
// - /r/ROOMID/file/FILEID/filename - retrieve a file by id (the "filename" part is optional and
// only suggestive)
//
// - /r/ROOMID/moderators - retrieves publicly visible room moderators and admins
//
// - /r/ROOMID/moderators/all - retrieves visible + hidden room moderators/admins (requires
// moderator permission)
//
// - /r/ROOMID/bans - retrieves banned public keys. The full list is only visible to
// moderators; for regular users this will be either empty or include just their own session
// ID (if banned).
// Everything below this point requires a room:
let room = room.ok_or(Error::NoSuchRoom)?;
// All of these are deprecated; should be using /r/ROOMID/whatever instead.
match components[0] {
"messages" => {
reject_if_file_server_mode(path)?;
let messages = handlers::get_messages(query_params, &auth_token, &pool)?;
#[derive(Debug, Deserialize, Serialize)]
struct Response {
status_code: u16,
messages: Vec<models::Message>,
}
let response = Response { status_code: StatusCode::OK.as_u16(), messages };
return Ok(warp::reply::json(&response).into_response());
return Ok(warp::reply::json(&json!({
"status_code": StatusCode::OK.as_u16(),
"messages": handlers::get_messages(query_params, user, room)?
}))
.into_response());
// FIXME: can drop `.into_response()` I think?
}
"deleted_messages" => {
reject_if_file_server_mode(path)?;
let deletions = handlers::get_deleted_messages(query_params, &auth_token, &pool)?;
#[derive(Debug, Deserialize, Serialize)]
struct Response {
status_code: u16,
ids: Vec<models::DeletedMessage>,
}
let response = Response { status_code: StatusCode::OK.as_u16(), ids: deletions };
let deletions = handlers::get_deleted_messages(query_params, user, room)?;
let response = json!({ "status_code": StatusCode::OK.as_u16(), "ids": deletions });
return Ok(warp::reply::json(&response).into_response());
}
"files" if components.len() == 2 => {
if let Ok(file_id) = components[1].parse::<i64>() {
return handlers::get_file(room, file_id, user);
}
}
"moderators" => {
reject_if_file_server_mode(path)?;
let public_keys = handlers::get_moderators(&auth_token, &pool)?;
#[derive(Debug, Deserialize, Serialize)]
struct Response {
status_code: u16,
moderators: Vec<String>,
}
let public_keys = handlers::get_moderators(&*storage::get_conn()?, &user, &room)?;
let response =
Response { status_code: StatusCode::OK.as_u16(), moderators: public_keys };
json!({ "status_code": StatusCode::OK.as_u16(), "moderators": public_keys });
return Ok(warp::reply::json(&response).into_response());
}
"block_list" => {
reject_if_file_server_mode(path)?;
return handlers::get_banned_public_keys(&auth_token, &pool);
return handlers::get_banned_public_keys(&user, &room);
}
"member_count" => {
reject_if_file_server_mode(path)?;
return handlers::get_member_count(&auth_token, &pool);
return handlers::get_member_count(user, room);
}
_ => {
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
}
_ => {}
};
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint);
return Err(Error::InvalidRpcCall.into());
}
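// Illustrative sketch (not part of this change): the planned /r/ROOMID/... endpoints listed at
// the top of handle_get_request could be dispatched with the same component-splitting approach
// used above; the helper and its return values are purely hypothetical.
#[allow(dead_code)]
fn describe_planned_route(components: &[&str]) -> &'static str {
    match components {
        ["r", _room] => "room metadata",
        ["r", _room, "recent"] => "recent messages",
        ["r", _room, "message", _id] => "message by id",
        ["r", _room, "file", _id] | ["r", _room, "file", _id, _] => "file by id (filename suggestive only)",
        ["r", _room, "moderators"] => "publicly visible moderators/admins",
        ["r", _room, "moderators", "all"] => "visible + hidden moderators (moderators only)",
        ["r", _room, "bans"] => "banned public keys (full list for moderators)",
        _ => "unknown",
    }
}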
async fn handle_post_request(
room_id: Option<String>, rpc_call: RpcCall, path: &str, auth_token: Option<String>,
room: Option<Room>,
rpc_call: RpcCall,
path: &str,
user: Option<User>,
) -> Result<Response, Rejection> {
// Handle routes that don't require authorization first
// The compact poll endpoint expects the auth token to be in the request body; not
// in the headers.
// The compact poll endpoint expects the auth token to be in the request body; not in the
// headers.
//
// TODO FIXME: Deprecated; replace this with a /multi endpoint that takes a list of requests to
// submit (but rather than be specific to that endpoint, it would allow *any* other endpoints
// to be invoked).
if path == "compact_poll" {
reject_if_file_server_mode(path)?;
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize)]
struct CompactPollRequestBodyWrapper {
requests: Vec<models::CompactPollRequestBody>,
}
@ -203,33 +321,25 @@ async fn handle_post_request(
Ok(bodies) => bodies,
Err(e) => {
warn!(
"Couldn't parse compact poll request body wrapper from: {} due to error: {}.",
"Couldn't parse compact poll request body wrapper from '{}': {}.",
rpc_call.body, e
);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::compact_poll(wrapper.requests);
return handlers::compact_poll(user, wrapper.requests);
}
// This route requires auth in open group server mode, but not in file server mode
let pool = get_pool_for_room(&rpc_call)?;
if path == "files" {
#[derive(Debug, Deserialize)]
struct JSON {
file: String,
}
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
return handlers::store_file(room_id, &json.file, auth_token, &pool).await;
if path == "files" && matches!(MODE, Mode::FileServer) {
// This route doesn't require auth in file server mode
// TODO FIXME
panic!("No file server mode");
}
// Handle routes that require authorization
let auth_token = auth_token.ok_or_else(|| warp::reject::custom(Error::NoAuthToken))?;
if path.starts_with("rooms") {
let user = user.ok_or(Error::NoAuthToken)?;
if path == "rooms" || path.starts_with("rooms/") {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes
if components.len() == 3 && components[2] == "image" {
@ -240,29 +350,81 @@ async fn handle_post_request(
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
warn!("Couldn't parse JSON from '{}': {}.", rpc_call.body, e);
return Err(Error::InvalidRpcCall.into());
}
};
let room_id = components[1];
return handlers::set_group_image(&json.file, &room_id, &auth_token, &pool).await;
// Why does this method pass the room differently than most of the other functions?!
let room_token = components[1];
let room = match room {
None => storage::get_room_from_token(&*storage::get_conn()?, room_token)?,
Some(room) => {
if room.token != room_token {
warn!("Attempt to access POST /rooms/ROOM with mismatched path/header room tokens");
return Err(Error::InvalidRpcCall.into());
}
room
}
};
// FIXME TODO: add an input field so that the uploader can pass the filename
let filename: Option<&str> = None;
return handlers::set_room_image(room, user, &json.file, filename).await;
} else {
warn!("Invalid endpoint: {}.", rpc_call.endpoint);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
}
// Everything below this point requires a room:
//
// FIXME -- "moderators" (which adds a moderator) might be broken by this for older clients
// because it used to take room_id *both* via the header *and* as a field in request body (and
// then only used the one in the request body). If Session is passing both room values then
// everything should be fine.
//
let room = room.ok_or(Error::NoSuchRoom)?;
match path {
"messages" => {
// FIXME TODO - Deprecated, returns old message format. Rewrite this as
// `POST /r/ROOMID/message`.
// FIXME 2: Add a `POST /r/ROOMID/message/ID` for editing a message.
reject_if_file_server_mode(path)?;
let message = match serde_json::from_str(&rpc_call.body) {
let message: models::PostMessage = match serde_json::from_str(&rpc_call.body) {
Ok(message) => message,
Err(e) => {
warn!("Couldn't parse message from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
warn!("Couldn't parse message from '{}': {}.", rpc_call.body, e);
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::insert_message(message, &auth_token, &pool);
return handlers::insert_message(room, user, &message.data, &message.signature);
}
"files" => {
// FIXME TODO - Deprecated; rewrite as `POST /r/ROOMID/file`, make it require a
// filename
#[derive(Debug, Deserialize)]
struct JSON {
file: String,
}
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from '{}': {}.", rpc_call.body, e);
return Err(Error::InvalidRpcCall.into());
}
};
// FIXME TODO: add an input field so that the uploader can pass the filename
let filename: Option<&str> = None;
return handlers::store_file(&room, &user, &json.file, filename);
}
// FIXME: deprecate these next two separate endpoints and replace with a single
// "/r/ROOMID/ban" endpoint that has a "delete all?" flag, and has options for different
// types of bans and ban expiries.
"block_list" => {
reject_if_file_server_mode(path)?;
#[derive(Debug, Deserialize)]
@ -272,11 +434,11 @@ async fn handle_post_request(
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
warn!("Couldn't parse JSON from '{}': {}.", rpc_call.body, e);
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::ban(&json.public_key, &auth_token, &pool);
return handlers::ban(&json.public_key, false, &user, &room).await;
}
"ban_and_delete_all" => {
reject_if_file_server_mode(path)?;
@ -287,40 +449,43 @@ async fn handle_post_request(
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
warn!("Couldn't parse JSON from '{}': {}.", rpc_call.body, e);
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::ban_and_delete_all_messages(&json.public_key, &auth_token, &pool);
return handlers::ban(&json.public_key, true, &user, &room).await;
}
"claim_auth_token" => {
// Deprecated; has no purpose anymore (but kept so that older clients don't get an error)
// because we've already verified the token (and there are no ephemeral tokens
// anymore).
reject_if_file_server_mode(path)?;
#[derive(Debug, Deserialize)]
struct JSON {
public_key: String,
}
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
return handlers::claim_auth_token(&json.public_key, &auth_token, &pool);
let json = models::StatusCode { status_code: StatusCode::OK.as_u16() };
return Ok(warp::reply::json(&json).into_response());
}
"moderators" => {
// FIXME TODO - Deprecated; Rewrite as /r/ROOMID/moderator and allow it to support new
// moderator options such as being a hidden mod
reject_if_file_server_mode(path)?;
let body: models::ChangeModeratorRequestBody =
match serde_json::from_str(&rpc_call.body) {
Ok(body) => body,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
warn!("Couldn't parse JSON from '{}': {}.", rpc_call.body, e);
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::add_moderator_public(body, &auth_token).await;
return handlers::add_moderator_public(
room,
user,
&body.session_id,
body.admin.unwrap_or(false),
);
}
"delete_messages" => {
// FIXME TODO - Deprecated; this should be a DELETE /r/ROOMID/ID request, and if we
// need multiple deletes in one request then should use the POST /multi endpoint to
// submit them.
reject_if_file_server_mode(path)?;
#[derive(Debug, Deserialize)]
struct JSON {
@ -329,117 +494,116 @@ async fn handle_post_request(
let json: JSON = match serde_json::from_str(&rpc_call.body) {
Ok(json) => json,
Err(e) => {
warn!("Couldn't parse JSON from: {} due to error: {}.", rpc_call.body, e);
return Err(warp::reject::custom(Error::InvalidRpcCall));
warn!("Couldn't parse JSON from '{}': {}.", rpc_call.body, e);
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::delete_messages(json.ids, &auth_token, &pool);
return handlers::delete_messages(json.ids, &user, &room);
}
_ => {
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
}
}
async fn handle_delete_request(
rpc_call: RpcCall, path: &str, auth_token: Option<String>,
pool: &storage::DatabaseConnectionPool,
room: Room,
rpc_call: RpcCall,
path: &str,
user: Option<User>,
) -> Result<Response, Rejection> {
// Check that the auth token is present
let auth_token = auth_token.ok_or_else(|| warp::reject::custom(Error::NoAuthToken))?;
let user = user.ok_or(Error::NoAuthToken)?;
// DELETE /messages/:server_id
// FIXME TODO: Deprecated; use DELETE /r/ROOMID/message/ID instead.
if path.starts_with("messages") {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes
if components.len() != 2 {
warn!("Invalid endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
let server_id: i64 = match components[1].parse() {
Ok(server_id) => server_id,
Err(_) => {
warn!("Invalid endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
};
return handlers::delete_message(server_id, &auth_token, pool);
return handlers::delete_message(&*storage::get_conn()?, server_id, &user, &room);
}
// DELETE /block_list/:public_key
// FIXME TODO: Deprecated; use DELETE /r/ROOMID/unban/ID instead.
if path.starts_with("block_list") {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes
if components.len() != 2 {
warn!("Invalid endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
let public_key = components[1].to_string();
return handlers::unban(&public_key, &auth_token, pool);
return handlers::unban(&public_key, &user, &room);
}
// DELETE /auth_token
// DELETE /auth_token. Deprecated and does nothing.
if path == "auth_token" {
reject_if_file_server_mode(path)?;
return handlers::delete_auth_token(&auth_token, pool);
// No-op; this is here for backwards compat with Session clients that try to use auth
// tokens.
let json = models::StatusCode { status_code: StatusCode::OK.as_u16() };
return Ok(warp::reply::json(&json).into_response());
}
// DELETE /moderators/:public_key
// FIXME TODO: Deprecated; use DELETE /r/ROOMID/moderator/SESSIONID
if path.starts_with("moderators") {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes
if components.len() != 2 {
warn!("Invalid endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
let public_key = components[1].to_string();
let room_id = match get_room_id(&rpc_call) {
Some(room_id) => room_id,
let session_id = components[1].to_string();
let room = match get_room(&rpc_call)? {
Some(room) => room,
None => {
warn!("Missing room ID.");
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
};
let body = models::ChangeModeratorRequestBody { public_key, room_id };
return handlers::delete_moderator_public(body, &auth_token).await;
return handlers::delete_moderator_public(&session_id, user, room);
}
// Unrecognized endpoint
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
// Utilities
fn get_pool_for_room(rpc_call: &RpcCall) -> Result<storage::DatabaseConnectionPool, Rejection> {
let room_id = get_room_id(&rpc_call).ok_or(Error::ValidationFailed)?;
return storage::pool_by_room_id(
&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?,
).map_err(|e| e.into());
}
fn get_room(rpc_call: &RpcCall) -> Result<Option<Room>, Error> {
if matches!(MODE, Mode::FileServer) {
// WTF giant FIXME:
// In file server mode we don't have a concept of rooms, but for convenience (i.e. so we
// can use the same database structure) we just always use the main room
panic!("FIXME");
}
assert!(matches!(MODE, Mode::OpenGroupServer));
fn get_auth_token(rpc_call: &RpcCall) -> Option<String> {
if rpc_call.headers.is_empty() {
return None;
return Ok(None);
}
return rpc_call.headers.get("Authorization").map(|s| s.to_string());
}
fn get_room_id(rpc_call: &RpcCall) -> Option<String> {
match MODE {
// In file server mode we don't have a concept of rooms, but for convenience (i.e. so
// we can use the same database structure) we just always use the main room
Mode::FileServer => return Some("main".to_string()),
Mode::OpenGroupServer => {
if rpc_call.headers.is_empty() {
return None;
}
return rpc_call.headers.get("Room").map(|s| s.to_string());
}
}
let room_token = match rpc_call.headers.get("Room") {
Some(s) => s,
None => return Ok(None),
};
return Ok(Some(storage::get_room_from_token(&*storage::get_conn()?, room_token)?));
}
fn reject_if_file_server_mode(path: &str) -> Result<(), Rejection> {
match MODE {
Mode::FileServer => {
warn!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
return Err(Error::InvalidRpcCall.into());
}
Mode::OpenGroupServer => return Ok(()),
}

319
src/schema.sql Normal file

@ -0,0 +1,319 @@
PRAGMA journal_mode=WAL;
BEGIN;
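-- Note on timestamps: ((julianday('now') - 2440587.5)*86400.0), used as a default throughout this
-- schema, converts SQLite's Julian-day clock into a unix-epoch value in seconds: 2440587.5 is the
-- Julian day of 1970-01-01T00:00:00Z and 86400 is the number of seconds per day.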
CREATE TABLE rooms (
id INTEGER NOT NULL PRIMARY KEY, /* internal database id of the room */
token TEXT NOT NULL UNIQUE COLLATE NOCASE, /* case-insensitive room identifier used in URLs, etc. */
name TEXT NOT NULL, /* Publicly visible room name */
description TEXT, /* Publicly visible room description */
image INTEGER REFERENCES files(id) ON DELETE SET NULL,
created FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
updates INTEGER NOT NULL DEFAULT 0, /* +1 for each new message, edit or deletion */
info_updates INTEGER NOT NULL DEFAULT 0, /* +1 for any room metadata update (name/desc/image/pinned/mods) */
read BOOLEAN NOT NULL DEFAULT TRUE, /* Whether users can read by default */
write BOOLEAN NOT NULL DEFAULT TRUE, /* Whether users can post by default */
upload BOOLEAN NOT NULL DEFAULT TRUE, /* Whether file uploads are allowed */
CHECK(token NOT GLOB '*[^a-zA-Z0-9_-]*')
);
CREATE INDEX rooms_token ON rooms(token);
-- Trigger to expire an old room image attachment when the room image is changed
CREATE TRIGGER room_image_expiry AFTER UPDATE ON rooms
FOR EACH ROW WHEN NEW.image IS NOT OLD.image AND OLD.image IS NOT NULL
BEGIN
UPDATE files SET expiry = 0.0 WHERE id = OLD.image;
END;
CREATE TABLE messages (
id INTEGER NOT NULL PRIMARY KEY,
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
user INTEGER NOT NULL REFERENCES users(id),
posted FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
edited FLOAT,
updated INTEGER NOT NULL DEFAULT 0, /* set to the room's `updates` counter when posted/edited/deleted */
data BLOB, /* Actual message content, not including trailing padding; set to null to delete a message */
data_size INTEGER, /* The message size, including trailing padding (needed because the signature is over the padded data) */
signature BLOB /* Signature of `data` by `public_key`; set to null when deleting a message */
);
CREATE INDEX messages_room ON messages(room, posted);
CREATE INDEX messages_updated ON messages(room, updated);
CREATE INDEX messages_id ON messages(room, id);
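-- Usage sketch (not part of the schema): a client that remembers the last room `updates` value it
-- has seen can fetch just the messages added/edited/deleted since then using the `updated` column
-- and the messages_updated index above, e.g.:
--
--     SELECT id, data, data_size, signature, updated FROM messages
--         WHERE room = :room AND updated > :since ORDER BY updated;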
CREATE TABLE message_history (
message INTEGER NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
replaced FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch when this historic value was replaced by an edit or deletion */
data TEXT NOT NULL, /* the content prior to the update/delete */
signature BLOB NOT NULL /* signature prior to the update/delete */
);
CREATE INDEX message_history_message ON message_history(message);
CREATE INDEX message_history_replaced ON message_history(replaced);
-- Trigger to increment a room's `updates` counter and assign it to the messages `updated` field for
-- new messages.
CREATE TRIGGER messages_insert_counter AFTER INSERT ON messages
FOR EACH ROW
BEGIN
UPDATE rooms SET updates = updates + 1 WHERE id = NEW.room;
UPDATE messages SET updated = (SELECT updates FROM rooms WHERE id = NEW.room) WHERE id = NEW.id;
END;
-- Trigger to do various tasks needed when a message is edited/deleted:
-- * record the old value into message_history
-- * update the room's `updates` counter (so that clients can learn about the update)
-- * update the message's `updated` value to that new counter
-- * update the message's `edited` timestamp
CREATE TRIGGER messages_insert_history AFTER UPDATE OF data ON messages
FOR EACH ROW WHEN NEW.data IS NOT OLD.data
BEGIN
INSERT INTO message_history (message, data, signature) VALUES (NEW.id, OLD.data, OLD.signature);
UPDATE rooms SET updates = updates + 1 WHERE id = NEW.room;
UPDATE messages SET
updated = (SELECT updates FROM rooms WHERE id = NEW.room),
edited = (julianday('now') - 2440587.5)*86400.0
WHERE id = NEW.id;
END;
CREATE TABLE pinned_messages (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
message INTEGER NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
updated INTEGER NOT NULL DEFAULT 0, /* set to the room's `info_updates` counter when pinned (used for ordering). */
PRIMARY KEY(room, message)
);
-- Trigger to handle moving a message from one room to another; we reset the posted time to now, and
-- reset the updated value to the new room's value so that the moved message is treated as a brand new message in
-- the new room. We also clear the message as the pinned message from the moved-from room.
-- FIXME TODO: this isn't right because the old room won't have any record of it being moved, and so
-- clients won't know that they should remove it. Perhaps instead we should implement moving as a
-- delete + reinsert, via a INSTEAD OF trigger.
/*
CREATE TRIGGER message_mover AFTER UPDATE OF room ON messages
FOR EACH ROW WHEN NEW.room != OLD.room
BEGIN
UPDATE messages SET posted = ((julianday('now') - 2440587.5)*86400.0), updated = FALSE
WHERE messages.id = NEW.id;
UPDATE rooms SET pinned = NULL WHERE id = OLD.room AND pinned = OLD.id;
END;
*/
CREATE TABLE files (
id INTEGER NOT NULL PRIMARY KEY,
room INTEGER REFERENCES rooms(id) ON DELETE SET NULL,
uploader INTEGER REFERENCES users(id),
size INTEGER NOT NULL,
uploaded FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
expiry FLOAT DEFAULT ((julianday('now') - 2440587.5 + 15.0)*86400.0), /* unix epoch */
filename TEXT, /* user-provided filename */
path TEXT NOT NULL /* path on disk */
);
CREATE INDEX files_room ON files(room);
CREATE INDEX files_expiry ON files(expiry);
-- When we delete a room all its files will have room set to NULL, but we *also* need to mark them
-- for immediate expiry so that the file pruner finds and cleans them up at the next cleanup
-- check.
CREATE TRIGGER room_expire_roomless AFTER UPDATE OF room ON files
FOR EACH ROW WHEN NEW.room IS NULL
BEGIN
UPDATE files SET expiry = 0.0 WHERE id = NEW.id;
END;
CREATE TABLE users (
id INTEGER NOT NULL PRIMARY KEY,
session_id TEXT NOT NULL UNIQUE,
created FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
last_active FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
banned BOOLEAN NOT NULL DEFAULT FALSE, /* true = globally banned from all rooms */
moderator BOOLEAN NOT NULL DEFAULT FALSE, /* true = moderator of all rooms, and can add global bans */
admin BOOLEAN NOT NULL DEFAULT FALSE, /* true = admin of all rooms, and can appoint global bans/mod/admins */
visible_mod BOOLEAN NOT NULL DEFAULT FALSE, /* if true this user's moderator status is viewable by regular room users of all rooms */
CHECK(NOT (banned AND (moderator OR admin))) /* someone cannot be banned *and* a moderator at the same time */
);
CREATE INDEX users_last_active ON users(last_active);
-- Create a trigger to maintain the implication "admin implies moderator"
CREATE TRIGGER user_update_admins_are_mods AFTER UPDATE OF moderator, admin ON users
FOR EACH ROW WHEN NEW.admin AND NOT NEW.moderator
BEGIN
UPDATE users SET moderator = TRUE WHERE id = NEW.id;
END;
CREATE TRIGGER user_insert_admins_are_mods AFTER INSERT ON users
FOR EACH ROW WHEN NEW.admin AND NOT NEW.moderator
BEGIN
UPDATE users SET moderator = TRUE WHERE id = NEW.id;
END;
-- Effectively the same as `messages` except that it also includes the `session_id` from the users
-- table of the user who posted it, which we often need when returning a message list to clients.
CREATE VIEW message_details AS
SELECT messages.*, users.session_id FROM messages JOIN users ON messages.user = users.id;
-- View of `messages` that is useful for manually inspecting table contents by only returning the
-- length (rather than raw bytes) for data/signature.
CREATE VIEW message_metadata AS
SELECT id, room, user, session_id, posted, edited, updated, length(data) AS data_unpadded, data_size, length(signature) as signature_length
FROM message_details;
CREATE TABLE room_users (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
last_active FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
PRIMARY KEY(room, user)
) WITHOUT ROWID;
CREATE INDEX room_users_room_activity ON room_users(room, last_active);
CREATE INDEX room_users_activity ON room_users(last_active);
-- Stores permissions or restrictions on a user. Null values (for read/write) mean "use the room's
-- default".
CREATE TABLE user_permission_overrides (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
banned BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user is banned */
read BOOLEAN, /* If false the user may not fetch messages; null uses room default; true allows reading */
write BOOLEAN, /* If false the user may not post; null uses room default; true allows posting */
upload BOOLEAN, /* If false the user may not upload files; null uses room default; true allows uploading */
moderator BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate non-moderators */
admin BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate anyone (including other moderators and admins) */
visible_mod BOOLEAN NOT NULL DEFAULT TRUE, /* If true then this user (if a moderator) is included in the list of a room's public moderators */
PRIMARY KEY(room, user),
CHECK(NOT (banned AND (moderator OR admin))) /* Mods/admins cannot be banned */
) WITHOUT ROWID;
CREATE INDEX user_permission_overrides_public_mods ON
user_permission_overrides(room) WHERE moderator OR admin;
-- Create a trigger to maintain the implication "admin implies moderator"
CREATE TRIGGER user_perms_update_admins_are_mods AFTER UPDATE OF moderator, admin ON user_permission_overrides
FOR EACH ROW WHEN NEW.admin AND NOT NEW.moderator
BEGIN
UPDATE user_permission_overrides SET moderator = TRUE WHERE room = NEW.room AND user = NEW.user;
END;
CREATE TRIGGER user_perms_insert_admins_are_mods AFTER INSERT ON user_permission_overrides
FOR EACH ROW WHEN NEW.admin AND NOT NEW.moderator
BEGIN
UPDATE user_permission_overrides SET moderator = TRUE WHERE room = NEW.room AND user = NEW.user;
END;
-- Trigger that removes useless empty permission override rows (e.g. after a ban gets removed, and
-- no other permission fields are set).
CREATE TRIGGER user_perms_empty_cleanup AFTER UPDATE ON user_permission_overrides
FOR EACH ROW WHEN NOT (NEW.banned OR NEW.moderator OR NEW.admin) AND COALESCE(NEW.read, NEW.write, NEW.upload) IS NULL
BEGIN
DELETE from user_permission_overrides WHERE room = NEW.room AND user = NEW.user;
END;
-- Trigger that removes a user from `room_users` when they are banned from the room
CREATE TRIGGER room_users_remove_banned AFTER UPDATE OF banned ON user_permission_overrides
FOR EACH ROW WHEN NEW.banned
BEGIN
DELETE FROM room_users WHERE room = NEW.room AND user = NEW.user;
END;
-- Triggers to update `rooms.info_updates` on metadata column changes
CREATE TRIGGER room_metadata_update AFTER UPDATE ON rooms
FOR EACH ROW WHEN
NEW.name IS NOT OLD.name OR
NEW.description IS NOT OLD.description OR
NEW.image IS NOT OLD.image
BEGIN
UPDATE rooms SET updates = updates + 1, info_updates = info_updates + 1 WHERE id = NEW.id;
END;
-- Triggers to update `info_updates` when the mod list changes:
CREATE TRIGGER room_metadata_mods_insert AFTER INSERT ON user_permission_overrides
FOR EACH ROW WHEN NEW.moderator OR NEW.admin
BEGIN
UPDATE rooms SET info_updates = info_updates + 1 WHERE id = NEW.room;
END;
CREATE TRIGGER room_metadata_mods_update AFTER UPDATE ON user_permission_overrides
FOR EACH ROW WHEN NEW.moderator != OLD.moderator OR NEW.admin != OLD.admin
BEGIN
UPDATE rooms SET info_updates = info_updates + 1 WHERE id = NEW.room;
END;
CREATE TRIGGER room_metadata_mods_delete AFTER DELETE ON user_permission_overrides
FOR EACH ROW WHEN OLD.moderator OR OLD.admin
BEGIN
UPDATE rooms SET info_updates = info_updates + 1 WHERE id = OLD.room;
END;
-- Trigger to update `info_updates` of all rooms whenever we add/remove a global moderator/admin
-- because global mod settings affect the permissions of all rooms (and polling clients need to pick
-- up on this).
CREATE TRIGGER room_metadata_global_mods_insert AFTER INSERT ON users
FOR EACH ROW WHEN (NEW.admin OR NEW.moderator) AND NEW.visible_mod
BEGIN
UPDATE rooms SET info_updates = info_updates + 1; -- WHERE everything!
END;
CREATE TRIGGER room_metadata_global_mods_update AFTER UPDATE ON users
FOR EACH ROW WHEN (NEW.moderator != OLD.moderator OR NEW.admin != OLD.admin) AND (NEW.visible_mod OR OLD.visible_mod)
BEGIN
UPDATE rooms SET info_updates = info_updates + 1; -- WHERE everything!
END;
CREATE TRIGGER room_metadata_global_mods_delete AFTER DELETE ON users
FOR EACH ROW WHEN (OLD.moderator OR OLD.admin) AND OLD.visible_mod
BEGIN
UPDATE rooms SET info_updates = info_updates + 1; -- WHERE everything!
END;
-- Triggers for change to pinned messages
CREATE TRIGGER room_metadata_pinned_add AFTER INSERT ON pinned_messages
FOR EACH ROW
BEGIN
UPDATE rooms SET info_updates = info_updates + 1 WHERE id = NEW.room;
UPDATE pinned_messages SET updated = (SELECT info_updates FROM rooms WHERE id = NEW.room) WHERE room = NEW.room AND message = NEW.message;
END;
CREATE TRIGGER room_metadata_pinned_remove AFTER DELETE ON pinned_messages
FOR EACH ROW
BEGIN
UPDATE rooms SET info_updates = info_updates + 1 WHERE id = OLD.room;
END;
-- View of permissions; for users with an entry in user_permission_overrides we use those values;
-- for null values or no user_permission_overrides entry we return the room's default read/write
-- values (and false for the other fields).
CREATE VIEW user_permissions AS
SELECT
rooms.id AS room,
users.id AS user,
users.session_id,
CASE WHEN users.banned THEN TRUE ELSE COALESCE(user_permission_overrides.banned, FALSE) END AS banned,
COALESCE(user_permission_overrides.read, rooms.read) AS read,
COALESCE(user_permission_overrides.write, rooms.write) AS write,
COALESCE(user_permission_overrides.upload, rooms.upload) AS upload,
CASE WHEN users.moderator THEN TRUE ELSE COALESCE(user_permission_overrides.moderator, FALSE) END AS moderator,
CASE WHEN users.admin THEN TRUE ELSE COALESCE(user_permission_overrides.admin, FALSE) END AS admin,
-- room_moderator will be TRUE if the user is specifically listed as a moderator of the room
COALESCE(user_permission_overrides.moderator OR user_permission_overrides.admin, FALSE) AS room_moderator,
-- global_moderator will be TRUE if the user is a global moderator/admin (note that this is
-- *not* exclusive of room_moderator: a moderator/admin could be listed in both).
COALESCE(users.moderator OR users.admin, FALSE) as global_moderator,
-- visible_mod will be TRUE if this mod is a publicly viewable moderator of the room
CASE
WHEN user_permission_overrides.moderator OR user_permission_overrides.admin THEN user_permission_overrides.visible_mod
WHEN users.moderator OR users.admin THEN users.visible_mod
ELSE FALSE
END AS visible_mod
FROM
users JOIN rooms LEFT OUTER JOIN user_permission_overrides ON
users.id = user_permission_overrides.user AND rooms.id = user_permission_overrides.room;
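-- Usage sketch (not part of the schema): a handler can resolve a user's effective permissions for
-- a room with a single lookup against this view, e.g. to decide whether posting is allowed:
--
--     SELECT write AND NOT banned FROM user_permissions WHERE room = :room AND user = :user;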
-- Scheduled changes to user permissions. For example, to implement a 2-day timeout you would set
-- their user_permissions.write to false, then set a `write = true` entry with a +2d timestamp here.
-- Or to implement a join delay you could set room defaults to false then insert a value here to be
-- applied after a delay.
CREATE TABLE user_permission_futures (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
at FLOAT NOT NULL, /* when the change should take effect (unix epoch) */
read BOOLEAN, /* Set this value @ at, if non-null */
write BOOLEAN, /* Set this value @ at, if non-null */
upload BOOLEAN, /* Set this value @ at, if non-null */
PRIMARY KEY(room, user)
) WITHOUT ROWID;
CREATE INDEX user_permissions_future_at ON user_permission_futures(at);
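-- Usage sketch (not part of the schema): the 2-day write timeout described above would set the
-- user's current override and schedule its reversal here (172800 = 2 days in seconds), e.g.:
--
--     INSERT INTO user_permission_overrides (room, user, write) VALUES (:room, :user, FALSE)
--         ON CONFLICT (room, user) DO UPDATE SET write = FALSE;
--     INSERT INTO user_permission_futures (room, user, at, write)
--         VALUES (:room, :user, :now + 172800.0, TRUE);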
COMMIT;


@ -1,17 +1,21 @@
use regex::Regex;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use log::{error, warn, info};
use log::{error, info, warn};
use r2d2::PooledConnection;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::params;
use rusqlite_migration::{Migrations, M};
use regex::Regex;
use rusqlite::{config::DbConfig, params};
use super::errors::Error;
use super::migration;
use super::models::Room;
pub type DatabaseConnection = r2d2::PooledConnection<SqliteConnectionManager>;
pub type DatabaseConnectionPool = r2d2::Pool<SqliteConnectionManager>;
pub type DatabaseTransaction<'a> = rusqlite::Transaction<'a>;
#[derive(PartialEq, Eq, Hash)]
pub struct RoomId {
@ -24,12 +28,17 @@ lazy_static::lazy_static! {
}
impl RoomId {
pub fn new(room_id: &str) -> Option<RoomId> {
if REGULAR_CHARACTERS_ONLY.is_match(room_id) {
return Some(RoomId { id: room_id.to_string() });
pub fn validate(room_id: &str) -> Result<(), Error> {
return if REGULAR_CHARACTERS_ONLY.is_match(room_id) {
Ok(())
} else {
return None;
}
Err(Error::ValidationFailed)
};
}
pub fn new(room_id: &str) -> Result<RoomId, Error> {
RoomId::validate(room_id)?;
Ok(RoomId { id: room_id.to_string() })
}
pub fn get_id(&self) -> &str {
@ -37,375 +46,329 @@ impl RoomId {
}
}
// Main
// How long without activity before we drop user-room activity info.
pub const ROOM_ACTIVE_PRUNE_THRESHOLD: Duration = Duration::from_secs(60 * 86400);
// How long we keep message edit/deletion history.
pub const MESSAGE_HISTORY_PRUNE_THRESHOLD: Duration = Duration::from_secs(30 * 86400);
// Migration support: when migrating to 0.2.x old room ids cannot be preserved, so we map the old
// id range [1, max] to the new range [offset+1, offset+max].
pub struct RoomMigrationMap {
pub max: i64,
pub offset: i64,
}
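// Illustrative sketch (not part of this change): translating a pre-0.2 message id through the
// mapping described above; ids outside [1, max] have no counterpart in the new range. The helper
// name is hypothetical.
#[allow(dead_code)]
impl RoomMigrationMap {
    fn map_old_id(&self, old_id: i64) -> Option<i64> {
        if (1..=self.max).contains(&old_id) {
            Some(old_id + self.offset)
        } else {
            None
        }
    }
}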
lazy_static::lazy_static! {
pub static ref MAIN_POOL: DatabaseConnectionPool = {
let file_name = "database.db";
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(file_name);
static ref DB_POOL: DatabaseConnectionPool = {
let file_name = "sogs.db";
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(file_name)
.with_init(|c| {
c.set_prepared_statement_cache_capacity(100);
c.execute_batch("
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
")?;
if !c.set_db_config(DbConfig::SQLITE_DBCONFIG_ENABLE_FKEY, true)? {
panic!("Unable to enable foreign key support; perhaps sqlite3 is compiled without it‽");
}
if !c.set_db_config(DbConfig::SQLITE_DBCONFIG_ENABLE_TRIGGER, true)? {
panic!("Unable to enable trigger support; perhaps sqlite3 is built without it‽");
}
Ok(())
});
return r2d2::Pool::new(db_manager).unwrap();
};
}
pub fn create_main_database_if_needed() {
let pool = &MAIN_POOL;
let conn = pool.get().unwrap();
create_main_tables_if_needed(&conn);
}
fn create_main_tables_if_needed(conn: &DatabaseConnection) {
let main_table_cmd = "CREATE TABLE IF NOT EXISTS main (
id TEXT PRIMARY KEY,
name TEXT,
image_id TEXT
)";
conn.execute(&main_table_cmd, params![]).expect("Couldn't create main table.");
}
// Rooms
pub const PENDING_TOKEN_EXPIRATION: i64 = 10 * 60;
pub const TOKEN_EXPIRATION: i64 = 7 * 24 * 60 * 60;
pub const FILE_EXPIRATION: i64 = 15 * 24 * 60 * 60;
lazy_static::lazy_static! {
static ref POOLS: Mutex<HashMap<String, DatabaseConnectionPool>> = Mutex::new(HashMap::new());
}
pub fn pool_by_room_id(room_id: &RoomId) -> Result<DatabaseConnectionPool, Error> {
let mut pools = POOLS.lock().unwrap();
if let Some(pool) = pools.get(room_id.get_id()) {
return Ok(pool.clone());
} else {
let pool = &MAIN_POOL;
if let Ok(conn) = pool.get() {
if let Ok(count) = conn.query_row("SELECT COUNT(*) FROM main WHERE id = ?", params![room_id.get_id()],
|row| row.get::<_, i64>(0)) {
if count == 0 {
warn!("Cannot access room database: room {} does not exist", room_id.get_id());
return Err(Error::NoSuchRoom);
// True if we have a room import hacks table that we might need to use for room polling.
pub static ref ROOM_IMPORT_HACKS: Option<HashMap<i64, RoomMigrationMap>> = {
if let Ok(conn) = DB_POOL.get() {
if let Ok(hacks) = get_room_hacks(&conn) {
if !hacks.is_empty() {
return Some(hacks);
}
let raw_path = format!("rooms/{}.db", room_id.get_id());
let path = Path::new(&raw_path);
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(path);
let pool = match r2d2::Pool::new(db_manager) {
Ok(p) => p,
Err(e) => {
error!("Unable to access {} database: {}", room_id.get_id(), e);
return Err(Error::DatabaseFailedInternally);
}
};
pools.insert(room_id.get_id().to_string(), pool);
return Ok(pools[room_id.get_id()].clone());
}
}
error!("Failed to query main database for room {} existence", room_id.get_id());
return Err(Error::DatabaseFailedInternally);
}
}
pub fn create_database_if_needed(room_id: &RoomId) {
let pool = pool_by_room_id(room_id);
let conn = pool.unwrap().get().unwrap();
create_room_tables_if_needed(&conn);
}
pub fn create_room_tables_if_needed(conn: &DatabaseConnection) {
// Messages
// The `id` field is needed to make `rowid` stable, which is important because otherwise
// the `id`s in this table won't correspond to those in the deleted messages table
let messages_table_cmd = "CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY,
public_key TEXT,
timestamp INTEGER,
data TEXT,
signature TEXT,
is_deleted INTEGER
)";
conn.execute(&messages_table_cmd, params![]).expect("Couldn't create messages table.");
// Deleted messages
let deleted_messages_table_cmd = "CREATE TABLE IF NOT EXISTS deleted_messages (
id INTEGER PRIMARY KEY,
deleted_message_id INTEGER
)";
conn.execute(&deleted_messages_table_cmd, params![])
.expect("Couldn't create deleted messages table.");
// Moderators
let moderators_table_cmd = "CREATE TABLE IF NOT EXISTS moderators (
public_key TEXT
)";
conn.execute(&moderators_table_cmd, params![]).expect("Couldn't create moderators table.");
// Block list
let block_list_table_cmd = "CREATE TABLE IF NOT EXISTS block_list (
public_key TEXT
)";
conn.execute(&block_list_table_cmd, params![]).expect("Couldn't create block list table.");
// Pending tokens
// Note that a given public key can have multiple pending tokens
let pending_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS pending_tokens (
public_key TEXT,
timestamp INTEGER,
token BLOB
)";
conn.execute(&pending_tokens_table_cmd, params![])
.expect("Couldn't create pending tokens table.");
// Tokens
// The token is stored as hex here (rather than as bytes) because it's more convenient for lookup
let tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens (
public_key TEXT,
timestamp INTEGER,
token TEXT PRIMARY KEY
)";
conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table.");
// Files
let files_table_cmd = "CREATE TABLE IF NOT EXISTS files (
id TEXT PRIMARY KEY,
timestamp INTEGER
)";
conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table.");
// User activity table
let user_activity_table_cmd = "CREATE TABLE IF NOT EXISTS user_activity (
public_key TEXT PRIMARY KEY,
last_active INTEGER NOT NULL
)";
conn.execute(&user_activity_table_cmd, params![])
.expect("Couldn't create user activity table.");
}
// Pruning
pub async fn prune_tokens_periodically() {
let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap());
loop {
timer.tick().await;
tokio::spawn(async {
prune_tokens().await;
});
}
}
pub async fn prune_pending_tokens_periodically() {
let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap());
loop {
timer.tick().await;
tokio::spawn(async {
prune_pending_tokens().await;
});
}
}
pub async fn prune_files_periodically() {
let mut timer = tokio::time::interval(chrono::Duration::days(1).to_std().unwrap());
loop {
timer.tick().await;
tokio::spawn(async {
prune_files(FILE_EXPIRATION).await;
});
}
}
async fn prune_tokens() {
let rooms = match get_all_room_ids() {
Ok(rooms) => rooms,
Err(_) => return,
None
};
for room in rooms {
let pool = match pool_by_room_id(&room) {
Ok(p) => p,
Err(_) => return
};
// It's not catastrophic if we fail to prune the database for a given room
let conn = match pool.get() {
Ok(conn) => conn,
Err(e) => return error!("Couldn't prune tokens due to error: {}.", e),
};
let stmt = "DELETE FROM tokens WHERE timestamp < (?1)";
let now = chrono::Utc::now().timestamp();
let expiration = now - TOKEN_EXPIRATION;
match conn.execute(&stmt, params![expiration]) {
Ok(_) => (),
Err(e) => return error!("Couldn't prune tokens due to error: {}.", e),
};
}
info!("Pruned tokens.");
}
async fn prune_pending_tokens() {
let rooms = match get_all_room_ids() {
Ok(rooms) => rooms,
Err(_) => return,
};
for room in rooms {
let pool = match pool_by_room_id(&room) {
Ok(p) => p,
Err(_) => return
};
// It's not catastrophic if we fail to prune the database for a given room
let conn = match pool.get() {
Ok(conn) => conn,
Err(e) => return error!("Couldn't prune pending tokens due to error: {}.", e),
};
let stmt = "DELETE FROM pending_tokens WHERE timestamp < (?1)";
let now = chrono::Utc::now().timestamp();
let expiration = now - PENDING_TOKEN_EXPIRATION;
match conn.execute(&stmt, params![expiration]) {
Ok(_) => (),
Err(e) => return error!("Couldn't prune pending tokens due to error: {}.", e),
};
}
info!("Pruned pending tokens.");
}
fn get_expired_file_ids(
pool: &DatabaseConnectionPool, file_expiration: i64,
) -> Result<Vec<String>, ()> {
let now = chrono::Utc::now().timestamp();
let expiration = now - file_expiration;
// Get a database connection and open a transaction
let conn = pool.get().map_err(|e| {
error!("Couldn't get database connection to prune files due to error: {}.", e);
})?;
// Get the IDs of the files to delete
let raw_query = "SELECT id FROM files WHERE timestamp < (?1)";
let mut query = conn.prepare(&raw_query).map_err(|e| {
error!("Couldn't prepare query to prune files due to error: {}.", e);
})?;
let rows = query.query_map(params![expiration], |row| row.get(0)).map_err(|e| {
error!("Couldn't prune files due to error: {} (expiration = {}).", e, expiration);
})?;
Ok(rows.filter_map(|result| result.ok()).collect())
}
pub async fn prune_files_for_room(
pool: &DatabaseConnectionPool, room: &RoomId, file_expiration: i64,
) {
let ids = get_expired_file_ids(&pool, file_expiration);
match ids {
Ok(ids) if !ids.is_empty() => {
// Delete the files
let futs = ids.iter().map(|id| async move {
(
tokio::fs::remove_file(format!("files/{}_files/{}", room.get_id(), id)).await,
id.to_owned(),
)
});
let results = futures::future::join_all(futs).await;
for (res, id) in results {
if let Err(err) = res {
error!(
"Couldn't delete file: {} from room: {} due to error: {}.",
id,
room.get_id(),
err
);
pub static ref HAVE_FILE_ID_HACKS: bool = {
if let Ok(conn) = DB_POOL.get() {
if match conn.query_row(
"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'file_id_hacks')",
[], |row| row.get::<_, bool>(0)) {
Ok(exists) => exists,
Err(_) => false
}
{
// If the table exists, but is empty, then drop it (it will be empty if all the
// files we had temporary id mappings for are now expired and deleted).
if let Ok(count) = conn.query_row("SELECT COUNT(*) FROM file_id_hacks", [], |row| row.get::<_, i64>(0)) {
if count == 0 {
let _ = conn.execute("DROP TABLE file_id_hacks", []);
return false;
}
}
return true;
}
}
return false;
};
}
let conn = match pool.get() {
Ok(conn) => conn,
pub fn get_conn() -> Result<DatabaseConnection, Error> {
match DB_POOL.get() {
Ok(conn) => Ok(conn),
Err(e) => {
error!("Unable to get database connection: {}", e);
return Err(Error::DatabaseFailedInternally);
}
}
}
fn get_room_hacks(
conn: &rusqlite::Connection,
) -> Result<HashMap<i64, RoomMigrationMap>, rusqlite::Error> {
let mut hacks = HashMap::new();
let mut st =
conn.prepare("SELECT room, old_message_id_max, message_id_offset FROM room_import_hacks")?;
let mut query = st.query([])?;
while let Some(row) = query.next()? {
hacks.insert(row.get(0)?, RoomMigrationMap { max: row.get(1)?, offset: row.get(2)? });
}
Ok(hacks)
}
pub fn get_transaction<'a>(
conn: &'a mut DatabaseConnection,
) -> Result<DatabaseTransaction<'a>, Error> {
conn.transaction().map_err(db_error)
}
pub fn db_error(e: rusqlite::Error) -> Error {
error!("Database query failed: {}", e);
return Error::DatabaseFailedInternally;
}
/// Initialize the database, creating and migrating its structure if necessary.
pub fn setup_database() {
let mut conn = get_conn().unwrap();
setup_database_with_conn(&mut conn);
}
pub fn setup_database_with_conn(conn: &mut PooledConnection<SqliteConnectionManager>) {
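    // Presumably the 3.35.0 floor below exists because the maintenance queries rely on
    // SQLite's RETURNING clause (e.g. `DELETE FROM files ... RETURNING path`), which was
    // first added in SQLite 3.35.0.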
if rusqlite::version_number() < 3035000 {
panic!("SQLite 3.35.0+ is required!");
}
let have_messages = match conn.query_row(
"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'messages')",
params![],
|row| row.get::<_, bool>(0),
) {
Ok(exists) => exists,
Err(e) => {
panic!("Error querying database: {}", e);
}
};
if !have_messages {
warn!("No database detected; creating new database schema");
conn.execute_batch(include_str!("schema.sql")).expect("Couldn't create database schema.");
}
let n_rooms =
match conn.query_row("SELECT COUNT(*) FROM rooms", params![], |row| row.get::<_, i64>(0)) {
Ok(r) => r,
Err(e) => {
panic!("Error querying database for # of rooms: {}", e);
}
};
if n_rooms == 0 && Path::new("database.db").exists() {
// If we have no rooms then check to see if there is an old (pre-v0.2) set of databases to
// import from.
warn!("No rooms found, but database.db exists; attempting migration");
if let Err(e) = migration::migrate_0_2_0(conn) {
panic!("\n\ndatabase.db exists but migration failed:\n\n {}.\n\n\
Please report this bug!\n\n\
If no migration from 0.1.x is needed then rename or delete database.db to start up with a fresh (new) database.\n\n", e);
}
}
// Future migrations here
}
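// Illustrative only (an assumption, not part of this change): a typical startup sequence
// would run the schema setup once and then spawn the periodic maintenance task defined
// below, e.g.:
//
//     storage::setup_database();
//     tokio::spawn(storage::db_maintenance_job());
//
// The actual call sites live elsewhere in the crate.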
// Performs periodic DB maintenance: file pruning, applying delayed permission changes,
// etc.
pub async fn db_maintenance_job() {
let mut timer = tokio::time::interval(chrono::Duration::seconds(10).to_std().unwrap());
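        // Note: tokio's `interval` yields its first tick immediately, so the maintenance
        // body below also runs once right at startup and then every 10 seconds thereafter.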
loop {
timer.tick().await;
tokio::spawn(async {
let now = SystemTime::now();
if let Ok(mut conn) = get_conn() {
prune_files(&mut conn, &now);
prune_message_history(&mut conn, &now);
prune_room_activity(&mut conn, &now);
apply_permission_updates(&mut conn, &now);
} else {
warn!("Couldn't get a free db connection to perform database maintenance; will retry soon");
}
});
}
}
/// Removes all files with expiries <= the given time (which should generally be
/// `SystemTime::now()`, except in the test suite).
pub fn prune_files(conn: &mut DatabaseConnection, now: &SystemTime) {
let mut st = match conn.prepare_cached("DELETE FROM files WHERE expiry <= ? RETURNING path") {
Ok(st) => st,
Err(e) => {
error!("Unable to prepare statement: {}", e);
return;
}
};
let now_secs = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64();
let mut rows = match st.query(params![now_secs]) {
Ok(rows) => rows,
Err(e) => {
error!("Unable to delete expired file rows: {}", e);
return;
}
};
let mut count = 0;
while let Ok(Some(row)) = rows.next() {
if let Ok(path) = row.get_ref_unwrap(0).as_str() {
if let Err(e) = fs::remove_file(path) {
error!("Couldn't delete expired file '{}': {}", path, e);
} else {
count += 1;
}
}
}
if count > 0 {
info!("Pruned {} expired/deleted files", count);
}
}
/// Prune old message edit/deletion history
fn prune_message_history(conn: &mut DatabaseConnection, now: &SystemTime) {
let mut st = match conn.prepare_cached("DELETE FROM message_history WHERE replaced <= ?") {
Ok(st) => st,
Err(e) => {
error!("Unable to prepare message history prune statement: {}", e);
return;
}
};
let now_secs = (*now - MESSAGE_HISTORY_PRUNE_THRESHOLD)
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
let count = match st.execute(params![now_secs]) {
Ok(count) => count,
Err(e) => {
error!("Unable to prune message history: {}", e);
return;
}
};
if count > 0 {
info!("Pruned {} message edits/deletions", count);
}
}
fn prune_room_activity(conn: &mut DatabaseConnection, now: &SystemTime) {
let mut st = match conn.prepare_cached("DELETE FROM room_users WHERE last_active <= ?") {
Ok(st) => st,
Err(e) => {
error!("Unable to prepare room activity prune statement: {}", e);
return;
}
};
let now_secs = (*now - ROOM_ACTIVE_PRUNE_THRESHOLD)
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
let count = match st.execute(params![now_secs]) {
Ok(count) => count,
Err(e) => {
error!("Unable to prune room activity: {}", e);
return;
}
};
if count > 0 {
info!("Pruned {} old room activity records", count);
}
}
fn apply_permission_updates(conn: &mut DatabaseConnection, now: &SystemTime) {
let now_secs = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64();
let tx = match conn.transaction() {
Ok(tx) => tx,
Err(e) => {
error!("Unable to begin transaction: {}", e);
return;
}
};
{
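        // The prepared statements below borrow `tx`, so they live inside this inner scope
        // and get dropped before `tx.commit()` consumes the transaction.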
let mut ins_st = match tx.prepare_cached(
"
INSERT INTO user_permission_overrides (room, user, read, write, upload)
SELECT room, user, read, write, upload FROM user_permission_futures WHERE at <= ?
ON CONFLICT DO UPDATE SET
read = COALESCE(excluded.read, read),
write = COALESCE(excluded.write, write),
upload = COALESCE(excluded.upload, upload)",
) {
Ok(st) => st,
Err(e) => {
error!("Unable to prepare statement: {}", e);
return;
}
};
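        // The COALESCE(excluded.X, X) terms mean a scheduled future only overrides the
        // columns it actually sets; NULL columns in user_permission_futures leave the
        // existing override values untouched.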
let mut del_st =
match tx.prepare_cached("DELETE FROM user_permission_futures WHERE at <= ?") {
Ok(st) => st,
Err(e) => {
return error!(
"Couldn't get database connection to prune files due to error: {}.",
e
)
error!("Unable to prepare statement: {}", e);
return;
}
};
// Measure the time it takes to delete all files sequentially
// (this might become a problem since we're not using an async interface)
let now = std::time::Instant::now();
// Remove the file records from the database
// FIXME: It'd be great to do this in a single statement, but apparently this is not supported very well
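    // (Illustrative sketch only, assuming a recent rusqlite with `params_from_iter`: the
    // placeholder list could be built dynamically, e.g.
    //     let placeholders = vec!["?"; ids.len()].join(",");
    //     let stmt = format!("DELETE FROM files WHERE id IN ({})", placeholders);
    //     conn.execute(&stmt, rusqlite::params_from_iter(ids.iter()));
    // )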
for id in ids {
let stmt = "DELETE FROM files WHERE id = (?1)";
match conn.execute(&stmt, params![id]) {
Ok(_) => (),
Err(e) => {
return error!("Couldn't prune file with ID: {} due to error: {}.", id, e)
}
};
let num_applied = match ins_st.execute(params![now_secs]) {
Ok(num) => num,
Err(e) => {
error!("Unable to apply scheduled future permissions: {}", e);
return;
}
};
if num_applied > 0 {
info!("Applied {} user permission updates", num_applied);
if let Err(e) = del_st.execute(params![now_secs]) {
error!("Unable to delete applied future permissions: {}", e);
return;
}
// Log the result
info!("Pruned files for room: {}. Took: {:?}", room.get_id(), now.elapsed());
}
Ok(_) => {
// empty
}
Err(_) => {
// It's not catastrophic if we fail to prune the database for a given room
}
}
}
pub async fn prune_files(file_expiration: i64) {
// The expiration setting is passed in for testing purposes
let rooms = match get_all_room_ids() {
Ok(rooms) => rooms,
Err(_) => return,
};
let futs = rooms.into_iter().map(|room| async move {
if let Ok(pool) = pool_by_room_id(&room) {
prune_files_for_room(&pool, &room, file_expiration).await;
}
});
futures::future::join_all(futs).await;
}
// Migration
pub fn perform_migration() {
let rooms = match get_all_room_ids() {
Ok(ids) => ids,
Err(_e) => {
return error!("Couldn't get all room IDs.");
}
};
let create_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens (
public_key TEXT,
timestamp INTEGER,
token TEXT PRIMARY KEY
)";
let migrations =
Migrations::new(vec![M::up("DROP TABLE tokens"), M::up(&create_tokens_table_cmd)]);
for room in rooms {
create_database_if_needed(&room);
let pool = pool_by_room_id(&room);
let mut conn = pool.unwrap().get().unwrap();
migrations.to_latest(&mut conn).unwrap();
if let Err(e) = tx.commit() {
error!("Failed to commit scheduled user permission updates: {}", e);
return;
}
}
// Utilities
fn get_all_room_ids() -> Result<Vec<RoomId>, Error> {
// Get a database connection
let conn = MAIN_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
// Query the database
let raw_query = "SELECT id FROM main";
let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?;
let rows = match query.query_map(params![], |row| row.get(0)) {
Ok(rows) => rows,
Err(e) => {
error!("Couldn't query database due to error: {}.", e);
return Err(Error::DatabaseFailedInternally);
}
};
let room_ids: Vec<_> = rows
.filter_map(|result: Result<String, _>| result.ok())
.map(|opt| RoomId::new(&opt))
.flatten()
.collect();
// Return
return Ok(room_ids);
pub fn get_room_from_token(conn: &rusqlite::Connection, token: &str) -> Result<Room, Error> {
match conn
.prepare_cached("SELECT * FROM rooms WHERE token = ?")
.map_err(db_error)?
.query_row(params![&token], Room::from_row)
{
Ok(room) => return Ok(room),
Err(rusqlite::Error::QueryReturnedNoRows) => return Err(Error::NoSuchRoom.into()),
Err(_) => return Err(Error::DatabaseFailedInternally.into()),
}
}

View File

@@ -1,18 +1,25 @@
use std::collections::HashMap;
//use std::collections::HashMap;
use r2d2::PooledConnection;
use r2d2_sqlite::SqliteConnectionManager;
use std::fs;
use std::time::{Duration, SystemTime};
use rand::{thread_rng, Rng};
//use rand::{thread_rng, Rng};
use rusqlite::params;
use rusqlite::OpenFlags;
use warp::http::StatusCode;
use warp::{hyper, Reply};
use crate::storage::DatabaseConnectionPool;
use super::crypto;
use super::handlers;
use super::handlers::CreateRoom;
use super::models::User;
use super::storage;
use crate::handlers::GenericStringResponse;
async fn set_up_test_room() -> DatabaseConnectionPool {
async fn set_up_test_room() -> (PooledConnection<SqliteConnectionManager>, DatabaseConnectionPool) {
let manager = r2d2_sqlite::SqliteConnectionManager::file("file::memory:?cache=shared");
let mut flags = OpenFlags::default();
flags.set(OpenFlags::SQLITE_OPEN_URI, true);
@@ -21,94 +28,71 @@ async fn set_up_test_room() -> DatabaseConnectionPool {
let pool = r2d2::Pool::<r2d2_sqlite::SqliteConnectionManager>::new(manager).unwrap();
let conn = pool.get().unwrap();
let mut conn = pool.get().unwrap();
storage::create_room_tables_if_needed(&conn);
storage::setup_database_with_conn(&mut conn);
let success = handlers::create_room_with_conn(
&conn,
&CreateRoom { token: "test_room".to_string(), name: "Test".to_string() },
);
assert!(success.is_ok());
pool
return (conn, pool);
}
fn get_auth_token(pool: &DatabaseConnectionPool) -> (String, String) {
fn get_user(conn: &rusqlite::Connection) -> User {
// Generate a fake user key pair
let (user_private_key, user_public_key) = crypto::generate_x25519_key_pair();
let (_, user_public_key) = crypto::generate_x25519_key_pair();
let hex_user_public_key = format!("05{}", hex::encode(user_public_key.to_bytes()));
// Get a challenge
let mut query_params: HashMap<String, String> = HashMap::new();
query_params.insert("public_key".to_string(), hex_user_public_key.clone());
let challenge = handlers::get_auth_token_challenge(query_params, &pool).unwrap();
// Generate a symmetric key
let ephemeral_public_key = base64::decode(challenge.ephemeral_public_key).unwrap();
let symmetric_key =
crypto::get_x25519_symmetric_key(&ephemeral_public_key, &user_private_key).unwrap();
// Decrypt the challenge
let ciphertext = base64::decode(challenge.ciphertext).unwrap();
let plaintext = crypto::decrypt_aes_gcm(&ciphertext, &symmetric_key).unwrap();
let auth_token = hex::encode(plaintext);
// Try to claim the token
let response = handlers::claim_auth_token(&hex_user_public_key, &auth_token, &pool).unwrap();
assert_eq!(response.status(), StatusCode::OK);
// return
return (auth_token, hex_user_public_key);
}
#[tokio::test]
async fn test_authorization() {
// Ensure the test room is set up and get a database connection pool
let pool = set_up_test_room().await;
// Get an auth token
// This tests claiming a token internally
let (_, hex_user_public_key) = get_auth_token(&pool);
// Try to claim an incorrect token
let mut incorrect_token = [0u8; 48];
thread_rng().fill(&mut incorrect_token[..]);
let hex_incorrect_token = hex::encode(incorrect_token);
match handlers::claim_auth_token(&hex_user_public_key, &hex_incorrect_token, &pool) {
Ok(_) => assert!(false),
Err(_) => (),
}
let result = handlers::insert_or_update_user(conn, &hex_user_public_key);
assert!(result.is_ok());
return result.unwrap();
}
#[tokio::test]
async fn test_file_handling() {
// Ensure the test room is set up and get a database connection pool
let pool = set_up_test_room().await;
let (mut conn, pool) = set_up_test_room().await;
let test_room_id = storage::RoomId::new("test_room").unwrap();
let room = storage::get_room_from_token(&conn, "test_room").unwrap();
// Get an auth token
let (auth_token, _) = get_auth_token(&pool);
let user = get_user(&conn);
// Store the test file
handlers::store_file(
Some(test_room_id.get_id().to_string()),
TEST_FILE,
Some(auth_token.clone()),
&pool,
)
.await
.unwrap();
// Check that there's a file record
let conn = pool.get().unwrap();
let raw_query = "SELECT id FROM files";
let id_as_string: String =
conn.query_row(&raw_query, params![], |row| Ok(row.get(0)?)).unwrap();
let id = id_as_string.parse::<u64>().unwrap();
let filename: Option<&str> = None;
let auth = handlers::AuthorizationRequired { upload: true, write: true, ..Default::default() };
let id =
match handlers::store_file_impl(&mut conn, &room, &user, auth, TEST_FILE, filename, true)
.ok()
{
Some(mut upload) => {
let result = upload.commit();
assert!(result.is_ok());
Some(upload.id)
}
_ => None,
}
.unwrap();
// Retrieve the file and check the content
let base64_encoded_file = handlers::get_file(
Some(test_room_id.get_id().to_string()),
id,
Some(auth_token.clone()),
&pool,
)
.await
.unwrap()
.result;
assert_eq!(base64_encoded_file, TEST_FILE);
let room_id = room.id;
let response = handlers::get_file_conn(&mut conn, &room, id, user).unwrap();
let response_bytes = hyper::body::to_bytes(response).await.ok();
// The expected json response
let json = GenericStringResponse {
status_code: StatusCode::OK.as_u16(),
result: TEST_FILE.to_string(),
};
let expected_result = warp::reply::json(&json).into_response();
let expected_result_bytes = hyper::body::to_bytes(expected_result).await.ok();
assert_eq!(response_bytes, expected_result_bytes);
// Prune the file and check that it's gone
// Will evaluate to now + 60
storage::prune_files_for_room(&pool, &test_room_id, -60).await;
let sixty_seconds_ago = SystemTime::now() - Duration::new(60, 0);
storage::prune_files(&mut conn, &sixty_seconds_ago);
// It should be gone now
fs::read(format!("files/{}_files/{}", test_room_id.get_id(), id)).unwrap_err();
fs::read(format!("files/{}_files/{}", room_id, id)).unwrap_err();
// Check that the file record is also gone
let conn = pool.get().unwrap();
let raw_query = "SELECT id FROM files";