diff --git a/Cargo.lock b/Cargo.lock index 9bee29e..9049e91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,9 +71,14 @@ dependencies = [ [[package]] name = "ahash" -version = "0.4.7" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e" +checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" +dependencies = [ + "getrandom 0.2.2", + "once_cell", + "version_check", +] [[package]] name = "aho-corasick" @@ -627,17 +632,23 @@ name = "hashbrown" version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" + +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" dependencies = [ "ahash", ] [[package]] name = "hashlink" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" dependencies = [ - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -811,7 +822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.9.1", ] [[package]] @@ -886,9 +897,9 @@ checksum = "56d855069fafbb9b344c0f962150cd2c1187975cb1c22c1522c240d8c4986714" [[package]] name = "libsqlite3-sys" -version = "0.20.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64d31059f22935e6c31830db5249ba2b7ecd54fd73a9909286f0a67aa55c2fbd" +checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d" dependencies = [ "cc", "pkg-config", @@ -1360,9 +1371,9 @@ dependencies = [ [[package]] name = "r2d2_sqlite" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "227ab35ff4cbb01fa76da8f062590fe677b93c8d9e8415eb5fa981f2c1dba9d8" +checksum = "9d24607049214c5e42d3df53ac1d8a23c34cc6a5eefe3122acb2c72174719959" dependencies = [ "r2d2", "rusqlite", @@ -1542,9 +1553,9 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.24.2" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38ee71cbab2c827ec0ac24e76f82eca723cee92c509a65f67dee393c25112" +checksum = "57adcf67c8faaf96f3248c2a7b419a0dbc52ebe36ba83dd57fe83827c1ea4eb3" dependencies = [ "bitflags", "fallible-iterator", diff --git a/Cargo.toml b/Cargo.toml index 19a946a..ba46dd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,15 +29,15 @@ rand = "0.8" rand_core = "0.5" regex = "1" reqwest = { version = "0.11", features = ["json"] } -rusqlite = { version = "0.24", features = ["bundled"] } +rusqlite = { version = "^0.25", features = ["bundled"] } rusqlite_migration = "0.4" -r2d2_sqlite = "0.17" -r2d2 = "0.8" +r2d2_sqlite = "^0.18" +r2d2 = "^0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.9" structopt = "0.3" -tokio = { version = "1.3", features = ["full"] } +tokio = { version = "^1.3", features = ["full"] } url = "2.2.1" warp = { version = "0.3", features = ["tls"] } x25519-dalek = "1.1" diff --git a/src/handlers.rs b/src/handlers.rs index 63df40f..a02b5d7 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -40,10 +40,12 @@ lazy_static::lazy_static! { // Not publicly exposed. pub async fn create_room(room: models::Room) -> Result { // Get a connection - let pool = &storage::MAIN_POOL; - let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; + + storage::RoomId::validate(&room.id)?; + // Insert the room - let stmt = "REPLACE INTO main (id, name) VALUES (?1, ?2)"; + let stmt = "INSERT OR REPLACE INTO rooms (identifier, name) VALUES (?, ?)"; match conn.execute(&stmt, params![&room.id, &room.name]) { Ok(_) => (), Err(e) => { @@ -51,10 +53,7 @@ pub async fn create_room(room: models::Room) -> Result { return Err(warp::reject::custom(Error::DatabaseFailedInternally)); } } - // Set up the database - storage::create_database_if_needed( - &storage::RoomId::new(&room.id).ok_or(Error::ValidationFailed)?, - ); + // Return info!("Added room with ID: {}", &room.id); let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; @@ -64,18 +63,13 @@ pub async fn create_room(room: models::Room) -> Result { // Not publicly exposed. pub async fn delete_room(id: String) -> Result { // Get a connection - let pool = &storage::MAIN_POOL; - let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the room - let stmt = "DELETE FROM main WHERE id = (?1)"; - match conn.execute(&stmt, params![&id]) { - Ok(_) => (), - Err(e) => { - error!("Couldn't delete room due to error: {}.", e); - return Err(warp::reject::custom(Error::DatabaseFailedInternally)); - } + let stmt = "DELETE FROM rooms WHERE identifier = ?"; + if Err(e) = conn.execute(&stmt, params![&id]) { + error!("Couldn't delete room due to error: {}.", e); + return Err(warp::reject::custom(Error::DatabaseFailedInternally)); } - // Don't auto-delete the database file (the server operator might want to keep it around) // Return info!("Deleted room with ID: {}", &id); let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; @@ -84,8 +78,7 @@ pub async fn delete_room(id: String) -> Result { pub fn get_room(room_id: &str) -> Result { // Get a connection - let pool = &storage::MAIN_POOL; - let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; // Get the room info if possible let raw_query = "SELECT id, name FROM main where id = (?1)"; let room = match conn.query_row(&raw_query, params![room_id], |row| { @@ -106,8 +99,7 @@ pub fn get_room(room_id: &str) -> Result { pub fn get_all_rooms() -> Result { // Get a connection - let pool = &storage::MAIN_POOL; - let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; // Get the room info if possible let raw_query = "SELECT id, name FROM main"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; @@ -745,10 +737,8 @@ pub fn get_deleted_messages( pub async fn add_moderator_public( body: models::ChangeModeratorRequestBody, auth_token: &str, ) -> Result { - let room_id = storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?; - let pool = storage::pool_by_room_id(&room_id)?; let (has_authorization_level, _) = - has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?; + has_authorization_level(auth_token, AuthorizationLevel::Moderator, &storage::DB_POOL)?; if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } @@ -759,11 +749,9 @@ pub async fn add_moderator_public( pub async fn add_moderator( body: models::ChangeModeratorRequestBody, ) -> Result { + storage::RoomId::validate(&body.room_id)?; // Get a database connection - let pool = storage::pool_by_room_id( - &storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?, - )?; - let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the moderator let stmt = "INSERT INTO moderators (public_key) VALUES (?1)"; match conn.execute(&stmt, params![&body.public_key]) { @@ -782,11 +770,8 @@ pub async fn add_moderator( pub async fn delete_moderator_public( body: models::ChangeModeratorRequestBody, auth_token: &str, ) -> Result { - let pool = storage::pool_by_room_id( - &storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?, - )?; let (has_authorization_level, _) = - has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?; + has_authorization_level(auth_token, AuthorizationLevel::Moderator, &storage::DB_POOL)?; if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } @@ -798,10 +783,7 @@ pub async fn delete_moderator( body: models::ChangeModeratorRequestBody, ) -> Result { // Get a database connection - let pool = storage::pool_by_room_id( - &storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?, - )?; - let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the moderator let stmt = "DELETE FROM moderators WHERE public_key = (?1)"; match conn.execute(&stmt, params![&body.public_key]) { @@ -1002,8 +984,8 @@ pub fn compact_poll( request_bodies: Vec, ) -> Result { let mut response_bodies: Vec = vec![]; - let main_pool = &storage::MAIN_POOL; - let main_conn = main_pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let main_conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; + let pool = &storage::DB_POOL; for request_body in request_bodies { // Unwrap the request body let models::CompactPollRequestBody { @@ -1031,10 +1013,6 @@ pub fn compact_poll( continue; } }; - // Get the database connection pool - let pool = storage::pool_by_room_id( - &storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?, - )?; // Get the new messages let mut get_messages_query_params: HashMap = HashMap::new(); if let Some(from_message_server_id) = from_message_server_id { @@ -1157,9 +1135,7 @@ pub async fn get_stats_for_room( }; let lowerbound = upperbound - window; - let pool = - storage::pool_by_room_id(&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?)?; - let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; let raw_query_users = "SELECT COUNT(public_key) FROM user_activity WHERE last_active > ?1 AND last_active <= ?2"; @@ -1206,7 +1182,7 @@ fn get_pending_tokens( "SELECT timestamp, token FROM pending_tokens WHERE public_key = (?1) AND timestamp > (?2)"; let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let now = chrono::Utc::now().timestamp(); - let expiration = now - storage::PENDING_TOKEN_EXPIRATION; + let expiration = now; // FIXME - storage::PENDING_TOKEN_EXPIRATION; let rows = match query .query_map(params![public_key, expiration], |row| Ok((row.get(0)?, row.get(1)?))) { diff --git a/src/main.rs b/src/main.rs index b2cda9b..7eb98f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,15 +64,10 @@ async fn main() { info!("Users can join rooms on this open group server using the following URL format:"); info!("{}", get_url()); // Create the main database - storage::create_main_database_if_needed(); + storage::create_database_if_needed(); // Create required folders - fs::create_dir_all("./rooms").unwrap(); fs::create_dir_all("./files").unwrap(); - // Perform migration - storage::perform_migration(); // Set up pruning jobs - let prune_pending_tokens_future = storage::prune_pending_tokens_periodically(); - let prune_tokens_future = storage::prune_tokens_periodically(); let prune_files_future = storage::prune_files_periodically(); // Serve routes let public_routes = routes::root().or(routes::fallback()).or(routes::lsrpc()); @@ -92,8 +87,6 @@ async fn main() { let serve_private_routes_future = warp::serve(private_routes).run(localhost); // Keep futures alive join!( - prune_pending_tokens_future, - prune_tokens_future, prune_files_future, serve_public_routes_future, serve_private_routes_future @@ -104,8 +97,6 @@ async fn main() { let serve_private_routes_future = warp::serve(private_routes).run(localhost); // Keep futures alive join!( - prune_pending_tokens_future, - prune_tokens_future, prune_files_future, serve_public_routes_future, serve_private_routes_future diff --git a/src/rpc.rs b/src/rpc.rs index 7a87456..81d43e7 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -56,10 +56,7 @@ pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result { return handle_get_request(room_id_str, rpc_call, &path, auth_token, query_params).await } "POST" => return handle_post_request(room_id_str, rpc_call, &path, auth_token).await, - "DELETE" => { - let pool = get_pool_for_room(&rpc_call)?; - return handle_delete_request(rpc_call, &path, auth_token, &pool).await; - } + "DELETE" => return handle_delete_request(rpc_call, &path, auth_token).await, _ => { warn!("Ignoring RPC call with invalid or unused HTTP method: {}.", rpc_call.method); return Err(warp::reject::custom(Error::InvalidRpcCall)); @@ -71,10 +68,10 @@ async fn handle_get_request( room_id: Option, rpc_call: RpcCall, path: &str, auth_token: Option, query_params: HashMap, ) -> Result { + let pool = &storage::DB_POOL; // Handle routes that don't require authorization first if path == "auth_token_challenge" { reject_if_file_server_mode(path)?; - let pool = get_pool_for_room(&rpc_call)?; let challenge = handlers::get_auth_token_challenge(query_params, &pool)?; #[derive(Debug, Deserialize, Serialize)] struct Response { @@ -117,7 +114,6 @@ async fn handle_get_request( return Ok(warp::reply::json(&response).into_response()); } // This route requires auth in open group server mode, but not in file server mode - let pool = get_pool_for_room(&rpc_call)?; if path.starts_with("files") { let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes if components.len() != 2 { @@ -212,7 +208,7 @@ async fn handle_post_request( return handlers::compact_poll(wrapper.requests); } // This route requires auth in open group server mode, but not in file server mode - let pool = get_pool_for_room(&rpc_call)?; + let pool = &storage::DB_POOL; if path == "files" { #[derive(Debug, Deserialize)] struct JSON { @@ -343,9 +339,9 @@ async fn handle_post_request( } async fn handle_delete_request( - rpc_call: RpcCall, path: &str, auth_token: Option, - pool: &storage::DatabaseConnectionPool, + rpc_call: RpcCall, path: &str, auth_token: Option ) -> Result { + let pool = &storage::DB_POOL; // Check that the auth token is present let auth_token = auth_token.ok_or_else(|| warp::reject::custom(Error::NoAuthToken))?; // DELETE /messages/:server_id @@ -407,13 +403,6 @@ async fn handle_delete_request( // Utilities -fn get_pool_for_room(rpc_call: &RpcCall) -> Result { - let room_id = get_room_id(&rpc_call).ok_or(Error::ValidationFailed)?; - return storage::pool_by_room_id( - &storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?, - ).map_err(|e| e.into()); -} - fn get_auth_token(rpc_call: &RpcCall) -> Option { if rpc_call.headers.is_empty() { return None; diff --git a/src/schema.sql b/src/schema.sql new file mode 100644 index 0000000..461d1c7 --- /dev/null +++ b/src/schema.sql @@ -0,0 +1,146 @@ + +CREATE TABLE IF NOT EXISTS rooms ( + id INTEGER NOT NULL PRIMARY KEY, /* internal database id of the room */ + identifier TEXT NOT NULL UNIQUE, /* room identifier used in URLs, etc. */ + name TEXT NOT NULL, /* Publicly visible room name */ + image INTEGER REFERENCES files(id) ON DELETE SET NULL, + created FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */ + pinned INTEGER REFERENCES messages(id) ON DELETE SET NULL, + update_counter INTEGER NOT NULL DEFAULT 0, /* +1 for each edit or deletion */ + read BOOLEAN NOT NULL DEFAULT TRUE, /* Whether users can read by default */ + write BOOLEAN NOT NULL DEFAULT TRUE, /* Whether users can post by default */ + upload BOOLEAN NOT NULL DEFAULT TRUE /* Whether file uploads are allowed */ +); +CREATE INDEX IF NOT EXISTS rooms_identifier ON rooms(identifier); + +CREATE TABLE IF NOT EXISTS messages ( + id INTEGER NOT NULL PRIMARY KEY, + room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE, + user INTEGER REFERENCES users(id), + posted FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */ + updated INTEGER, /* set to the room's incremented update counter when edit/deletion occurs */ + data TEXT, /* Actual message content; set to null to delete a message */ + signature BLOB /* Signature of `data` by `public_key`; set to null when deleting a message */ +); +CREATE INDEX IF NOT EXISTS messages_room ON messages(room, posted); +CREATE INDEX IF NOT EXISTS messages_updated ON messages(room, updated); + +CREATE TABLE message_history ( + message INTEGER NOT NULL REFERENCES messages(id) ON DELETE CASCADE, + replaced FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch when this historic value was replaced by an edit or deletion */ + data TEXT NOT NULL, /* the content prior to the update/delete */ + signature BLOB NOT NULL /* signature prior to the update/delete */ +); +CREATE INDEX IF NOT EXISTS message_history_message ON message_history(message); + +-- Trigger to record the old value into message_history whenever data is updated, and update the +-- room's update_counter so that clients can query to learn about the update. +CREATE TRIGGER messages_insert_history AFTER UPDATE OF data ON messages +FOR EACH ROW WHEN NEW.data IS NOT OLD.data +BEGIN + INSERT INTO message_history (message, data, signature) VALUES (NEW.id, OLD.data, OLD.signature); + UPDATE rooms SET update_counter = update_counter + 1 WHERE id = NEW.room; + UPDATE messages SET updated = (SELECT update_counter FROM rooms WHERE id = NEW.room); +END; + +-- Trigger to remove the room's pinned message when that message is deleted +CREATE TRIGGER messages_unpin_on_delete AFTER UPDATE OF data ON messages +FOR EACH ROW WHEN NEW.data IS NULL +BEGIN + UPDATE rooms SET pinned = NULL WHERE id = OLD.room AND pinned = OLD.id; +END; + +-- Trigger to handle moving a message from one room to another; we reset the posted time to now, and +-- reset the updated value to NULL, so that the moved message is treated as a brand new message in +-- the new room. We also clear the message as the pinned message from the moved-from room. +CREATE TRIGGER message_mover AFTER UPDATE OF room ON messages +FOR EACH ROW WHEN NEW.room != OLD.room +BEGIN + UPDATE messages SET posted = ((julianday('now') - 2440587.5)*86400.0), updated = FALSE + WHERE messages.id = NEW.id; + UPDATE rooms SET pinned = NULL WHERE id = OLD.room AND pinned = OLD.id; +END; + +CREATE TABLE IF NOT EXISTS files ( + id INTEGER NOT NULL PRIMARY KEY, + room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE, + uploader INTEGER REFERENCES users(id), + size INTEGER NOT NULL, + filename TEXT, /* user-provided filename */ + path TEXT NOT NULL /* path on disk */ +); +CREATE INDEX IF NOT EXISTS files_room ON files(room); + +CREATE TABLE IF NOT EXISTS users ( + id INTEGER NOT NULL PRIMARY KEY, + public_key TEXT NOT NULL UNIQUE, + created FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */ + last_active FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */ + banned BOOLEAN NOT NULL DEFAULT FALSE, /* true = globally banned from all rooms */ + moderator BOOLEAN NOT NULL DEFAULT FALSE, /* true = moderator of all rooms, and can add global bans */ + admin BOOLEAN NOT NULL DEFAULT FALSE /* true = admin of all rooms, and can appoint global bans/mod/admins */ +); +CREATE INDEX IF NOT EXISTS users_last_active ON users(last_active); + +CREATE TABLE IF NOT EXISTS room_users ( + room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE, + user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + last_active FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */ + PRIMARY KEY(room, user) +) WITHOUT ROWID; +CREATE INDEX IF NOT EXISTS room_users_activity ON room_users(room, last_active); + +-- Stores permissions or restrictions on a user. Null values (for read/write) mean "user the room's +-- default". +CREATE TABLE IF NOT EXISTS user_permission_overrides ( + room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE, + user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + banned BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user is banned */ + read BOOLEAN, /* If false the user may not fetch messages */ + write BOOLEAN, /* If false the user may not post */ + upload BOOLEAN, /* If false the user may not upload files */ + moderator BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate non-moderators */ + admin BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate anyone (including other moderators and admins) */ + PRIMARY KEY(room, user), + CHECK(NOT (banned AND (moderator OR admin))) /* Mods/admins cannot be banned */ +) WITHOUT ROWID; + +-- Triggers than remove a user from `room_users` when they are banned from the room +CREATE TRIGGER room_users_remove_banned AFTER UPDATE OF banned ON user_permission_overrides +FOR EACH ROW WHEN NEW.banned +BEGIN + DELETE FROM room_users WHERE room = NEW.room AND user = NEW.user; +END; + +-- View of permissions; for users with an entry in user_permissions we use those values; for null +-- values or no user_permissions entry we return the room's default read/write values (and false for +-- the other fields). We also apply some other properties: admin implies moderator, and moderator +-- implies read&write. +CREATE VIEW IF NOT EXISTS user_permissions AS +SELECT + rooms.id AS room, + users.id AS user, + users.public_key, + CASE WHEN users.banned THEN TRUE ELSE COALESCE(user_permission_overrides.banned, FALSE) END AS banned, + COALESCE(user_permission_overrides.read, rooms.read) AS read, + COALESCE(user_permission_overrides.write, rooms.write) AS write, + COALESCE(user_permission_overrides.upload, rooms.upload) AS upload, + CASE WHEN users.moderator THEN TRUE ELSE COALESCE(user_permission_overrides.moderator, FALSE) END AS moderator, + CASE WHEN users.admin THEN TRUE ELSE COALESCE(user_permission_overrides.admin, FALSE) END AS admin +FROM + users JOIN rooms LEFT OUTER JOIN user_permission_overrides ON + users.id = user_permission_overrides.user AND rooms.id = user_permission_overrides.room; + +-- Scheduled changes to user permissions. For example, to implement a 2-day timeout you would set +-- their user_permissions.write to false, then set a `write = true` entry with a +2d timestamp here. +-- Or to implement a join delay you could set room defaults to false then insert a value here to be +-- applied after a delay. +CREATE TABLE IF NOT EXISTS user_permissions_future ( + user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE, + at FLOAT NOT NULL, /* when the change should take effect (unix epoch) */ + read BOOLEAN, /* Set this value @ at */ + write BOOLEAN, /* Set this value @ at */ + PRIMARY KEY(user, room) +) WITHOUT ROWID; +CREATE INDEX user_permissions_future_at ON user_permissions_future(at); diff --git a/src/storage.rs b/src/storage.rs index 3ae52a0..0004488 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,16 +1,13 @@ -use regex::Regex; -use std::collections::HashMap; -use std::path::Path; -use std::sync::Mutex; +use std::fs; +use std::time::SystemTime; use log::{error, warn, info}; use r2d2_sqlite::SqliteConnectionManager; +use regex::Regex; use rusqlite::params; -use rusqlite_migration::{Migrations, M}; +//use rusqlite_migration::{Migrations, M}; use super::errors::Error; - -pub type DatabaseConnection = r2d2::PooledConnection; pub type DatabaseConnectionPool = r2d2::Pool; #[derive(PartialEq, Eq, Hash)] @@ -24,14 +21,19 @@ lazy_static::lazy_static! { } impl RoomId { - pub fn new(room_id: &str) -> Option { - if REGULAR_CHARACTERS_ONLY.is_match(room_id) { - return Some(RoomId { id: room_id.to_string() }); + pub fn validate(room_id: &str) -> Result<(), Error> { + return if REGULAR_CHARACTERS_ONLY.is_match(room_id) { + Ok(()) } else { - return None; + Err(Error::ValidationFailed) } } + pub fn new(room_id: &str) -> Result { + RoomId::validate(room_id)?; + Ok(RoomId { id: room_id.to_string() }) + } + pub fn get_id(&self) -> &str { &self.id } @@ -41,371 +43,80 @@ impl RoomId { lazy_static::lazy_static! { - pub static ref MAIN_POOL: DatabaseConnectionPool = { - let file_name = "database.db"; + pub static ref DB_POOL: DatabaseConnectionPool = { + let file_name = "sogs.db"; let db_manager = r2d2_sqlite::SqliteConnectionManager::file(file_name); + // FIXME: enable wal, normal journal mode return r2d2::Pool::new(db_manager).unwrap(); }; } -pub fn create_main_database_if_needed() { - let pool = &MAIN_POOL; - let conn = pool.get().unwrap(); - create_main_tables_if_needed(&conn); -} +/// Initialize the database, creating and migrating its structure if necessary. +pub fn create_database_if_needed() { -fn create_main_tables_if_needed(conn: &DatabaseConnection) { - let main_table_cmd = "CREATE TABLE IF NOT EXISTS main ( - id TEXT PRIMARY KEY, - name TEXT, - image_id TEXT - )"; - conn.execute(&main_table_cmd, params![]).expect("Couldn't create main table."); -} - -// Rooms - -pub const PENDING_TOKEN_EXPIRATION: i64 = 10 * 60; -pub const TOKEN_EXPIRATION: i64 = 7 * 24 * 60 * 60; -pub const FILE_EXPIRATION: i64 = 15 * 24 * 60 * 60; - -lazy_static::lazy_static! { - - static ref POOLS: Mutex> = Mutex::new(HashMap::new()); -} - -pub fn pool_by_room_id(room_id: &RoomId) -> Result { - let mut pools = POOLS.lock().unwrap(); - if let Some(pool) = pools.get(room_id.get_id()) { - return Ok(pool.clone()); - } else { - let pool = &MAIN_POOL; - if let Ok(conn) = pool.get() { - if let Ok(count) = conn.query_row("SELECT COUNT(*) FROM main WHERE id = ?", params![room_id.get_id()], - |row| row.get::<_, i64>(0)) { - if count == 0 { - warn!("Cannot access room database: room {} does not exist", room_id.get_id()); - return Err(Error::NoSuchRoom); - } - let raw_path = format!("rooms/{}.db", room_id.get_id()); - let path = Path::new(&raw_path); - let db_manager = r2d2_sqlite::SqliteConnectionManager::file(path); - let pool = match r2d2::Pool::new(db_manager) { - Ok(p) => p, - Err(e) => { - error!("Unable to access {} database: {}", room_id.get_id(), e); - return Err(Error::DatabaseFailedInternally); - } - }; - pools.insert(room_id.get_id().to_string(), pool); - return Ok(pools[room_id.get_id()].clone()); - } - } - error!("Failed to query main database for room {} existence", room_id.get_id()); - return Err(Error::DatabaseFailedInternally); + if rusqlite::version_number() < 3035000 { + panic!("SQLite 3.35.0+ is required!"); } -} -pub fn create_database_if_needed(room_id: &RoomId) { - let pool = pool_by_room_id(room_id); - let conn = pool.unwrap().get().unwrap(); - create_room_tables_if_needed(&conn); -} + let conn = DB_POOL.get().unwrap(); -pub fn create_room_tables_if_needed(conn: &DatabaseConnection) { - // Messages - // The `id` field is needed to make `rowid` stable, which is important because otherwise - // the `id`s in this table won't correspond to those in the deleted messages table - let messages_table_cmd = "CREATE TABLE IF NOT EXISTS messages ( - id INTEGER PRIMARY KEY, - public_key TEXT, - timestamp INTEGER, - data TEXT, - signature TEXT, - is_deleted INTEGER - )"; - conn.execute(&messages_table_cmd, params![]).expect("Couldn't create messages table."); - // Deleted messages - let deleted_messages_table_cmd = "CREATE TABLE IF NOT EXISTS deleted_messages ( - id INTEGER PRIMARY KEY, - deleted_message_id INTEGER - )"; - conn.execute(&deleted_messages_table_cmd, params![]) - .expect("Couldn't create deleted messages table."); - // Moderators - let moderators_table_cmd = "CREATE TABLE IF NOT EXISTS moderators ( - public_key TEXT - )"; - conn.execute(&moderators_table_cmd, params![]).expect("Couldn't create moderators table."); - // Block list - let block_list_table_cmd = "CREATE TABLE IF NOT EXISTS block_list ( - public_key TEXT - )"; - conn.execute(&block_list_table_cmd, params![]).expect("Couldn't create block list table."); - // Pending tokens - // Note that a given public key can have multiple pending tokens - let pending_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS pending_tokens ( - public_key TEXT, - timestamp INTEGER, - token BLOB - )"; - conn.execute(&pending_tokens_table_cmd, params![]) - .expect("Couldn't create pending tokens table."); - // Tokens - // The token is stored as hex here (rather than as bytes) because it's more convenient for lookup - let tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens ( - public_key TEXT, - timestamp INTEGER, - token TEXT PRIMARY KEY - )"; - conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table."); - // Files - let files_table_cmd = "CREATE TABLE IF NOT EXISTS files ( - id TEXT PRIMARY KEY, - timestamp INTEGER - )"; - conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table."); - // User activity table - let user_activity_table_cmd = "CREATE TABLE IF NOT EXISTS user_activity ( - public_key TEXT PRIMARY KEY, - last_active INTEGER NOT NULL - )"; - conn.execute(&user_activity_table_cmd, params![]) - .expect("Couldn't create user activity table."); + let have_messages = match conn.query_row( + "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'messages')", + params![], + |row| row.get::<_, bool>(0)) { + Ok(exists) => exists, + Err(e) => { panic!("Error querying database: {}", e); } + }; + + if !have_messages { + conn.execute(include_str!("create-schema.sql"), params![]).expect("Couldn't create database schema."); + + // TODO: migration code from old multi-DB structure goes here + } + + // Future DB migration code goes here } // Pruning -pub async fn prune_tokens_periodically() { - let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap()); - loop { - timer.tick().await; - tokio::spawn(async { - prune_tokens().await; - }); - } -} - -pub async fn prune_pending_tokens_periodically() { - let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap()); - loop { - timer.tick().await; - tokio::spawn(async { - prune_pending_tokens().await; - }); - } -} - pub async fn prune_files_periodically() { - let mut timer = tokio::time::interval(chrono::Duration::days(1).to_std().unwrap()); + let mut timer = tokio::time::interval(chrono::Duration::seconds(15).to_std().unwrap()); loop { timer.tick().await; tokio::spawn(async { - prune_files(FILE_EXPIRATION).await; + prune_files(SystemTime::now()).await; }); } } -async fn prune_tokens() { - let rooms = match get_all_room_ids() { - Ok(rooms) => rooms, - Err(_) => return, +/// Removes all files with expiries <= the given time (which should generally by +/// `SystemTime::now()`, except in the test suite). +pub async fn prune_files(now: SystemTime) { + + let conn = match DB_POOL.get() { + Ok(conn) => conn, + Err(e) => return error!("Couldn't prune files: {}.", e) }; - for room in rooms { - let pool = match pool_by_room_id(&room) { - Ok(p) => p, - Err(_) => return - }; - // It's not catastrophic if we fail to prune the database for a given room - let conn = match pool.get() { - Ok(conn) => conn, - Err(e) => return error!("Couldn't prune tokens due to error: {}.", e), - }; - let stmt = "DELETE FROM tokens WHERE timestamp < (?1)"; - let now = chrono::Utc::now().timestamp(); - let expiration = now - TOKEN_EXPIRATION; - match conn.execute(&stmt, params![expiration]) { - Ok(_) => (), - Err(e) => return error!("Couldn't prune tokens due to error: {}.", e), - }; - } - info!("Pruned tokens."); -} - -async fn prune_pending_tokens() { - let rooms = match get_all_room_ids() { - Ok(rooms) => rooms, - Err(_) => return, + let mut st = match conn.prepare_cached("DELETE FROM files WHERE expiry <= ? RETURNING path") { + Ok(st) => st, + Err(e) => return error!("Unable to prepare statement: {}", e) }; - for room in rooms { - let pool = match pool_by_room_id(&room) { - Ok(p) => p, - Err(_) => return - }; - // It's not catastrophic if we fail to prune the database for a given room - let conn = match pool.get() { - Ok(conn) => conn, - Err(e) => return error!("Couldn't prune pending tokens due to error: {}.", e), - }; - let stmt = "DELETE FROM pending_tokens WHERE timestamp < (?1)"; - let now = chrono::Utc::now().timestamp(); - let expiration = now - PENDING_TOKEN_EXPIRATION; - match conn.execute(&stmt, params![expiration]) { - Ok(_) => (), - Err(e) => return error!("Couldn't prune pending tokens due to error: {}.", e), - }; - } - info!("Pruned pending tokens."); -} - -fn get_expired_file_ids( - pool: &DatabaseConnectionPool, file_expiration: i64, -) -> Result, ()> { - let now = chrono::Utc::now().timestamp(); - let expiration = now - file_expiration; - // Get a database connection and open a transaction - let conn = pool.get().map_err(|e| { - error!("Couldn't get database connection to prune files due to error: {}.", e); - })?; - // Get the IDs of the files to delete - let raw_query = "SELECT id FROM files WHERE timestamp < (?1)"; - - let mut query = conn.prepare(&raw_query).map_err(|e| { - error!("Couldn't prepare query to prune files due to error: {}.", e); - })?; - - let rows = query.query_map(params![expiration], |row| row.get(0)).map_err(|e| { - error!("Couldn't prune files due to error: {} (expiration = {}).", e, expiration); - })?; - - Ok(rows.filter_map(|result| result.ok()).collect()) -} - -pub async fn prune_files_for_room( - pool: &DatabaseConnectionPool, room: &RoomId, file_expiration: i64, -) { - let ids = get_expired_file_ids(&pool, file_expiration); - - match ids { - Ok(ids) if !ids.is_empty() => { - // Delete the files - let futs = ids.iter().map(|id| async move { - ( - tokio::fs::remove_file(format!("files/{}_files/{}", room.get_id(), id)).await, - id.to_owned(), - ) - }); - - let results = futures::future::join_all(futs).await; - - for (res, id) in results { - if let Err(err) = res { - error!( - "Couldn't delete file: {} from room: {} due to error: {}.", - id, - room.get_id(), - err - ); - } - } - - let conn = match pool.get() { - Ok(conn) => conn, - Err(e) => { - return error!( - "Couldn't get database connection to prune files due to error: {}.", - e - ) - } - }; - - // Measure the time it takes to delete all files sequentially - // (this might become a problem since we're not using an async interface) - let now = std::time::Instant::now(); - - // Remove the file records from the database - // FIXME: It'd be great to do this in a single statement, but apparently this is not supported very well - for id in ids { - let stmt = "DELETE FROM files WHERE id = (?1)"; - match conn.execute(&stmt, params![id]) { - Ok(_) => (), - Err(e) => { - return error!("Couldn't prune file with ID: {} due to error: {}.", id, e) - } - }; - } - // Log the result - info!("Pruned files for room: {}. Took: {:?}", room.get_id(), now.elapsed()); - } - Ok(_) => { - // empty - } - Err(_) => { - // It's not catastrophic if we fail to prune the database for a given room - } - } -} - -pub async fn prune_files(file_expiration: i64) { - // The expiration setting is passed in for testing purposes - let rooms = match get_all_room_ids() { - Ok(rooms) => rooms, - Err(_) => return, - }; - - let futs = rooms.into_iter().map(|room| async move { - if let Ok(pool) = pool_by_room_id(&room) { - prune_files_for_room(&pool, &room, file_expiration).await; - } - }); - - futures::future::join_all(futs).await; -} - -// Migration - -pub fn perform_migration() { - let rooms = match get_all_room_ids() { - Ok(ids) => ids, - Err(_e) => { - return error!("Couldn't get all room IDs."); - } - }; - let create_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens ( - public_key TEXT, - timestamp INTEGER, - token TEXT PRIMARY KEY - )"; - let migrations = - Migrations::new(vec![M::up("DROP TABLE tokens"), M::up(&create_tokens_table_cmd)]); - for room in rooms { - create_database_if_needed(&room); - let pool = pool_by_room_id(&room); - let mut conn = pool.unwrap().get().unwrap(); - migrations.to_latest(&mut conn).unwrap(); - } -} - -// Utilities - -fn get_all_room_ids() -> Result, Error> { - // Get a database connection - let conn = MAIN_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?; - // Query the database - let raw_query = "SELECT id FROM main"; - let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![], |row| row.get(0)) { + let now_secs = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64(); + let mut rows = match st.query(params![now_secs]) { Ok(rows) => rows, - Err(e) => { - error!("Couldn't query database due to error: {}.", e); - return Err(Error::DatabaseFailedInternally); - } + Err(e) => return error!("Unable to query expired files: {}", e) }; - let room_ids: Vec<_> = rows - .filter_map(|result: Result| result.ok()) - .map(|opt| RoomId::new(&opt)) - .flatten() - .collect(); - // Return - return Ok(room_ids); + + let mut count = 0; + while let Ok(Some(row)) = rows.next() { + if let Ok(path) = row.get_ref_unwrap(1).as_str() { + let p = format!("files/{}", path); + if let Err(e) = fs::remove_file(p) { + error!("Couldn't delete expired file 'files/{}': {}", path, e); + } else { + count += 1; + } + } + } + info!("Pruned {} files", count); }