mirror of
https://github.com/oxen-io/session-open-group-server.git
synced 2023-12-13 20:30:35 +01:00
Initial DB redesign
Completely overhauls the underlying database design: - referential integrity - data normalization - using triggers for various updates to reduce the amount of data logic in code. - indexes - one single database rather than 1 + 1/room - various structural improvements for useful features such as: - individually and default-assignable read/write/upload - time-based permission expiries (e.g. restrict this permission for x amount of time) - distinguishing between admins (who can add/remove moderators) and moderators (who can only moderate but not control the moderator list) - global server moderators/admins - global server bans - single VIEW for simple querying of a user's effective permissions (i.e. with database-side coalescing of global/local permissions). - pinned message via foreign key - room icon as a regular uploaded file - per-file file expiries (so that admins/mods can upload non-expiry files, such as the room icon, or pinned downloads). - message history so that when messages get edited or deleted a log is kept of the old value (the idea being that mods would be able to look this up). - movable messages (e.g. so there could be a "bad posts" room that only moderators have access to).
This commit is contained in:
parent
177ae7fd2e
commit
ace493f643
35
Cargo.lock
generated
35
Cargo.lock
generated
|
@ -71,9 +71,14 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.4.7"
|
||||
version = "0.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e"
|
||||
checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98"
|
||||
dependencies = [
|
||||
"getrandom 0.2.2",
|
||||
"once_cell",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
|
@ -627,17 +632,23 @@ name = "hashbrown"
|
|||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashlink"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8"
|
||||
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
|
||||
dependencies = [
|
||||
"hashbrown",
|
||||
"hashbrown 0.11.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -811,7 +822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown",
|
||||
"hashbrown 0.9.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -886,9 +897,9 @@ checksum = "56d855069fafbb9b344c0f962150cd2c1187975cb1c22c1522c240d8c4986714"
|
|||
|
||||
[[package]]
|
||||
name = "libsqlite3-sys"
|
||||
version = "0.20.1"
|
||||
version = "0.22.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64d31059f22935e6c31830db5249ba2b7ecd54fd73a9909286f0a67aa55c2fbd"
|
||||
checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"pkg-config",
|
||||
|
@ -1360,9 +1371,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "r2d2_sqlite"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "227ab35ff4cbb01fa76da8f062590fe677b93c8d9e8415eb5fa981f2c1dba9d8"
|
||||
checksum = "9d24607049214c5e42d3df53ac1d8a23c34cc6a5eefe3122acb2c72174719959"
|
||||
dependencies = [
|
||||
"r2d2",
|
||||
"rusqlite",
|
||||
|
@ -1542,9 +1553,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rusqlite"
|
||||
version = "0.24.2"
|
||||
version = "0.25.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5f38ee71cbab2c827ec0ac24e76f82eca723cee92c509a65f67dee393c25112"
|
||||
checksum = "57adcf67c8faaf96f3248c2a7b419a0dbc52ebe36ba83dd57fe83827c1ea4eb3"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"fallible-iterator",
|
||||
|
|
|
@ -29,15 +29,15 @@ rand = "0.8"
|
|||
rand_core = "0.5"
|
||||
regex = "1"
|
||||
reqwest = { version = "0.11", features = ["json"] }
|
||||
rusqlite = { version = "0.24", features = ["bundled"] }
|
||||
rusqlite = { version = "^0.25", features = ["bundled"] }
|
||||
rusqlite_migration = "0.4"
|
||||
r2d2_sqlite = "0.17"
|
||||
r2d2 = "0.8"
|
||||
r2d2_sqlite = "^0.18"
|
||||
r2d2 = "^0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
sha2 = "0.9"
|
||||
structopt = "0.3"
|
||||
tokio = { version = "1.3", features = ["full"] }
|
||||
tokio = { version = "^1.3", features = ["full"] }
|
||||
url = "2.2.1"
|
||||
warp = { version = "0.3", features = ["tls"] }
|
||||
x25519-dalek = "1.1"
|
||||
|
|
|
@ -40,10 +40,12 @@ lazy_static::lazy_static! {
|
|||
// Not publicly exposed.
|
||||
pub async fn create_room(room: models::Room) -> Result<Response, Rejection> {
|
||||
// Get a connection
|
||||
let pool = &storage::MAIN_POOL;
|
||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
|
||||
storage::RoomId::validate(&room.id)?;
|
||||
|
||||
// Insert the room
|
||||
let stmt = "REPLACE INTO main (id, name) VALUES (?1, ?2)";
|
||||
let stmt = "INSERT OR REPLACE INTO rooms (identifier, name) VALUES (?, ?)";
|
||||
match conn.execute(&stmt, params![&room.id, &room.name]) {
|
||||
Ok(_) => (),
|
||||
Err(e) => {
|
||||
|
@ -51,10 +53,7 @@ pub async fn create_room(room: models::Room) -> Result<Response, Rejection> {
|
|||
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
|
||||
}
|
||||
}
|
||||
// Set up the database
|
||||
storage::create_database_if_needed(
|
||||
&storage::RoomId::new(&room.id).ok_or(Error::ValidationFailed)?,
|
||||
);
|
||||
|
||||
// Return
|
||||
info!("Added room with ID: {}", &room.id);
|
||||
let json = models::StatusCode { status_code: StatusCode::OK.as_u16() };
|
||||
|
@ -64,18 +63,13 @@ pub async fn create_room(room: models::Room) -> Result<Response, Rejection> {
|
|||
// Not publicly exposed.
|
||||
pub async fn delete_room(id: String) -> Result<Response, Rejection> {
|
||||
// Get a connection
|
||||
let pool = &storage::MAIN_POOL;
|
||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
// Insert the room
|
||||
let stmt = "DELETE FROM main WHERE id = (?1)";
|
||||
match conn.execute(&stmt, params![&id]) {
|
||||
Ok(_) => (),
|
||||
Err(e) => {
|
||||
error!("Couldn't delete room due to error: {}.", e);
|
||||
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
|
||||
}
|
||||
let stmt = "DELETE FROM rooms WHERE identifier = ?";
|
||||
if Err(e) = conn.execute(&stmt, params![&id]) {
|
||||
error!("Couldn't delete room due to error: {}.", e);
|
||||
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
|
||||
}
|
||||
// Don't auto-delete the database file (the server operator might want to keep it around)
|
||||
// Return
|
||||
info!("Deleted room with ID: {}", &id);
|
||||
let json = models::StatusCode { status_code: StatusCode::OK.as_u16() };
|
||||
|
@ -84,8 +78,7 @@ pub async fn delete_room(id: String) -> Result<Response, Rejection> {
|
|||
|
||||
pub fn get_room(room_id: &str) -> Result<Response, Rejection> {
|
||||
// Get a connection
|
||||
let pool = &storage::MAIN_POOL;
|
||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
// Get the room info if possible
|
||||
let raw_query = "SELECT id, name FROM main where id = (?1)";
|
||||
let room = match conn.query_row(&raw_query, params![room_id], |row| {
|
||||
|
@ -106,8 +99,7 @@ pub fn get_room(room_id: &str) -> Result<Response, Rejection> {
|
|||
|
||||
pub fn get_all_rooms() -> Result<Response, Rejection> {
|
||||
// Get a connection
|
||||
let pool = &storage::MAIN_POOL;
|
||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
// Get the room info if possible
|
||||
let raw_query = "SELECT id, name FROM main";
|
||||
let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
|
@ -745,10 +737,8 @@ pub fn get_deleted_messages(
|
|||
pub async fn add_moderator_public(
|
||||
body: models::ChangeModeratorRequestBody, auth_token: &str,
|
||||
) -> Result<Response, Rejection> {
|
||||
let room_id = storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?;
|
||||
let pool = storage::pool_by_room_id(&room_id)?;
|
||||
let (has_authorization_level, _) =
|
||||
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?;
|
||||
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &storage::DB_POOL)?;
|
||||
if !has_authorization_level {
|
||||
return Err(warp::reject::custom(Error::Unauthorized));
|
||||
}
|
||||
|
@ -759,11 +749,9 @@ pub async fn add_moderator_public(
|
|||
pub async fn add_moderator(
|
||||
body: models::ChangeModeratorRequestBody,
|
||||
) -> Result<Response, Rejection> {
|
||||
storage::RoomId::validate(&body.room_id)?;
|
||||
// Get a database connection
|
||||
let pool = storage::pool_by_room_id(
|
||||
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
||||
)?;
|
||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
// Insert the moderator
|
||||
let stmt = "INSERT INTO moderators (public_key) VALUES (?1)";
|
||||
match conn.execute(&stmt, params![&body.public_key]) {
|
||||
|
@ -782,11 +770,8 @@ pub async fn add_moderator(
|
|||
pub async fn delete_moderator_public(
|
||||
body: models::ChangeModeratorRequestBody, auth_token: &str,
|
||||
) -> Result<Response, Rejection> {
|
||||
let pool = storage::pool_by_room_id(
|
||||
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
||||
)?;
|
||||
let (has_authorization_level, _) =
|
||||
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?;
|
||||
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &storage::DB_POOL)?;
|
||||
if !has_authorization_level {
|
||||
return Err(warp::reject::custom(Error::Unauthorized));
|
||||
}
|
||||
|
@ -798,10 +783,7 @@ pub async fn delete_moderator(
|
|||
body: models::ChangeModeratorRequestBody,
|
||||
) -> Result<Response, Rejection> {
|
||||
// Get a database connection
|
||||
let pool = storage::pool_by_room_id(
|
||||
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
||||
)?;
|
||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
// Insert the moderator
|
||||
let stmt = "DELETE FROM moderators WHERE public_key = (?1)";
|
||||
match conn.execute(&stmt, params![&body.public_key]) {
|
||||
|
@ -1002,8 +984,8 @@ pub fn compact_poll(
|
|||
request_bodies: Vec<models::CompactPollRequestBody>,
|
||||
) -> Result<Response, Rejection> {
|
||||
let mut response_bodies: Vec<models::CompactPollResponseBody> = vec![];
|
||||
let main_pool = &storage::MAIN_POOL;
|
||||
let main_conn = main_pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let main_conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let pool = &storage::DB_POOL;
|
||||
for request_body in request_bodies {
|
||||
// Unwrap the request body
|
||||
let models::CompactPollRequestBody {
|
||||
|
@ -1031,10 +1013,6 @@ pub fn compact_poll(
|
|||
continue;
|
||||
}
|
||||
};
|
||||
// Get the database connection pool
|
||||
let pool = storage::pool_by_room_id(
|
||||
&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?,
|
||||
)?;
|
||||
// Get the new messages
|
||||
let mut get_messages_query_params: HashMap<String, String> = HashMap::new();
|
||||
if let Some(from_message_server_id) = from_message_server_id {
|
||||
|
@ -1157,9 +1135,7 @@ pub async fn get_stats_for_room(
|
|||
};
|
||||
|
||||
let lowerbound = upperbound - window;
|
||||
let pool =
|
||||
storage::pool_by_room_id(&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?)?;
|
||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
|
||||
let raw_query_users =
|
||||
"SELECT COUNT(public_key) FROM user_activity WHERE last_active > ?1 AND last_active <= ?2";
|
||||
|
@ -1206,7 +1182,7 @@ fn get_pending_tokens(
|
|||
"SELECT timestamp, token FROM pending_tokens WHERE public_key = (?1) AND timestamp > (?2)";
|
||||
let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let expiration = now - storage::PENDING_TOKEN_EXPIRATION;
|
||||
let expiration = now; // FIXME - storage::PENDING_TOKEN_EXPIRATION;
|
||||
let rows = match query
|
||||
.query_map(params![public_key, expiration], |row| Ok((row.get(0)?, row.get(1)?)))
|
||||
{
|
||||
|
|
11
src/main.rs
11
src/main.rs
|
@ -64,15 +64,10 @@ async fn main() {
|
|||
info!("Users can join rooms on this open group server using the following URL format:");
|
||||
info!("{}", get_url());
|
||||
// Create the main database
|
||||
storage::create_main_database_if_needed();
|
||||
storage::create_database_if_needed();
|
||||
// Create required folders
|
||||
fs::create_dir_all("./rooms").unwrap();
|
||||
fs::create_dir_all("./files").unwrap();
|
||||
// Perform migration
|
||||
storage::perform_migration();
|
||||
// Set up pruning jobs
|
||||
let prune_pending_tokens_future = storage::prune_pending_tokens_periodically();
|
||||
let prune_tokens_future = storage::prune_tokens_periodically();
|
||||
let prune_files_future = storage::prune_files_periodically();
|
||||
// Serve routes
|
||||
let public_routes = routes::root().or(routes::fallback()).or(routes::lsrpc());
|
||||
|
@ -92,8 +87,6 @@ async fn main() {
|
|||
let serve_private_routes_future = warp::serve(private_routes).run(localhost);
|
||||
// Keep futures alive
|
||||
join!(
|
||||
prune_pending_tokens_future,
|
||||
prune_tokens_future,
|
||||
prune_files_future,
|
||||
serve_public_routes_future,
|
||||
serve_private_routes_future
|
||||
|
@ -104,8 +97,6 @@ async fn main() {
|
|||
let serve_private_routes_future = warp::serve(private_routes).run(localhost);
|
||||
// Keep futures alive
|
||||
join!(
|
||||
prune_pending_tokens_future,
|
||||
prune_tokens_future,
|
||||
prune_files_future,
|
||||
serve_public_routes_future,
|
||||
serve_private_routes_future
|
||||
|
|
21
src/rpc.rs
21
src/rpc.rs
|
@ -56,10 +56,7 @@ pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result<Response, Rejection> {
|
|||
return handle_get_request(room_id_str, rpc_call, &path, auth_token, query_params).await
|
||||
}
|
||||
"POST" => return handle_post_request(room_id_str, rpc_call, &path, auth_token).await,
|
||||
"DELETE" => {
|
||||
let pool = get_pool_for_room(&rpc_call)?;
|
||||
return handle_delete_request(rpc_call, &path, auth_token, &pool).await;
|
||||
}
|
||||
"DELETE" => return handle_delete_request(rpc_call, &path, auth_token).await,
|
||||
_ => {
|
||||
warn!("Ignoring RPC call with invalid or unused HTTP method: {}.", rpc_call.method);
|
||||
return Err(warp::reject::custom(Error::InvalidRpcCall));
|
||||
|
@ -71,10 +68,10 @@ async fn handle_get_request(
|
|||
room_id: Option<String>, rpc_call: RpcCall, path: &str, auth_token: Option<String>,
|
||||
query_params: HashMap<String, String>,
|
||||
) -> Result<Response, Rejection> {
|
||||
let pool = &storage::DB_POOL;
|
||||
// Handle routes that don't require authorization first
|
||||
if path == "auth_token_challenge" {
|
||||
reject_if_file_server_mode(path)?;
|
||||
let pool = get_pool_for_room(&rpc_call)?;
|
||||
let challenge = handlers::get_auth_token_challenge(query_params, &pool)?;
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct Response {
|
||||
|
@ -117,7 +114,6 @@ async fn handle_get_request(
|
|||
return Ok(warp::reply::json(&response).into_response());
|
||||
}
|
||||
// This route requires auth in open group server mode, but not in file server mode
|
||||
let pool = get_pool_for_room(&rpc_call)?;
|
||||
if path.starts_with("files") {
|
||||
let components: Vec<&str> = path.split('/').collect(); // Split on subsequent slashes
|
||||
if components.len() != 2 {
|
||||
|
@ -212,7 +208,7 @@ async fn handle_post_request(
|
|||
return handlers::compact_poll(wrapper.requests);
|
||||
}
|
||||
// This route requires auth in open group server mode, but not in file server mode
|
||||
let pool = get_pool_for_room(&rpc_call)?;
|
||||
let pool = &storage::DB_POOL;
|
||||
if path == "files" {
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct JSON {
|
||||
|
@ -343,9 +339,9 @@ async fn handle_post_request(
|
|||
}
|
||||
|
||||
async fn handle_delete_request(
|
||||
rpc_call: RpcCall, path: &str, auth_token: Option<String>,
|
||||
pool: &storage::DatabaseConnectionPool,
|
||||
rpc_call: RpcCall, path: &str, auth_token: Option<String>
|
||||
) -> Result<Response, Rejection> {
|
||||
let pool = &storage::DB_POOL;
|
||||
// Check that the auth token is present
|
||||
let auth_token = auth_token.ok_or_else(|| warp::reject::custom(Error::NoAuthToken))?;
|
||||
// DELETE /messages/:server_id
|
||||
|
@ -407,13 +403,6 @@ async fn handle_delete_request(
|
|||
|
||||
// Utilities
|
||||
|
||||
fn get_pool_for_room(rpc_call: &RpcCall) -> Result<storage::DatabaseConnectionPool, Rejection> {
|
||||
let room_id = get_room_id(&rpc_call).ok_or(Error::ValidationFailed)?;
|
||||
return storage::pool_by_room_id(
|
||||
&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?,
|
||||
).map_err(|e| e.into());
|
||||
}
|
||||
|
||||
fn get_auth_token(rpc_call: &RpcCall) -> Option<String> {
|
||||
if rpc_call.headers.is_empty() {
|
||||
return None;
|
||||
|
|
146
src/schema.sql
Normal file
146
src/schema.sql
Normal file
|
@ -0,0 +1,146 @@
|
|||
|
||||
CREATE TABLE IF NOT EXISTS rooms (
|
||||
id INTEGER NOT NULL PRIMARY KEY, /* internal database id of the room */
|
||||
identifier TEXT NOT NULL UNIQUE, /* room identifier used in URLs, etc. */
|
||||
name TEXT NOT NULL, /* Publicly visible room name */
|
||||
image INTEGER REFERENCES files(id) ON DELETE SET NULL,
|
||||
created FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
|
||||
pinned INTEGER REFERENCES messages(id) ON DELETE SET NULL,
|
||||
update_counter INTEGER NOT NULL DEFAULT 0, /* +1 for each edit or deletion */
|
||||
read BOOLEAN NOT NULL DEFAULT TRUE, /* Whether users can read by default */
|
||||
write BOOLEAN NOT NULL DEFAULT TRUE, /* Whether users can post by default */
|
||||
upload BOOLEAN NOT NULL DEFAULT TRUE /* Whether file uploads are allowed */
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS rooms_identifier ON rooms(identifier);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id INTEGER NOT NULL PRIMARY KEY,
|
||||
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
|
||||
user INTEGER REFERENCES users(id),
|
||||
posted FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
|
||||
updated INTEGER, /* set to the room's incremented update counter when edit/deletion occurs */
|
||||
data TEXT, /* Actual message content; set to null to delete a message */
|
||||
signature BLOB /* Signature of `data` by `public_key`; set to null when deleting a message */
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS messages_room ON messages(room, posted);
|
||||
CREATE INDEX IF NOT EXISTS messages_updated ON messages(room, updated);
|
||||
|
||||
CREATE TABLE message_history (
|
||||
message INTEGER NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
|
||||
replaced FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch when this historic value was replaced by an edit or deletion */
|
||||
data TEXT NOT NULL, /* the content prior to the update/delete */
|
||||
signature BLOB NOT NULL /* signature prior to the update/delete */
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS message_history_message ON message_history(message);
|
||||
|
||||
-- Trigger to record the old value into message_history whenever data is updated, and update the
|
||||
-- room's update_counter so that clients can query to learn about the update.
|
||||
CREATE TRIGGER messages_insert_history AFTER UPDATE OF data ON messages
|
||||
FOR EACH ROW WHEN NEW.data IS NOT OLD.data
|
||||
BEGIN
|
||||
INSERT INTO message_history (message, data, signature) VALUES (NEW.id, OLD.data, OLD.signature);
|
||||
UPDATE rooms SET update_counter = update_counter + 1 WHERE id = NEW.room;
|
||||
UPDATE messages SET updated = (SELECT update_counter FROM rooms WHERE id = NEW.room);
|
||||
END;
|
||||
|
||||
-- Trigger to remove the room's pinned message when that message is deleted
|
||||
CREATE TRIGGER messages_unpin_on_delete AFTER UPDATE OF data ON messages
|
||||
FOR EACH ROW WHEN NEW.data IS NULL
|
||||
BEGIN
|
||||
UPDATE rooms SET pinned = NULL WHERE id = OLD.room AND pinned = OLD.id;
|
||||
END;
|
||||
|
||||
-- Trigger to handle moving a message from one room to another; we reset the posted time to now, and
|
||||
-- reset the updated value to NULL, so that the moved message is treated as a brand new message in
|
||||
-- the new room. We also clear the message as the pinned message from the moved-from room.
|
||||
CREATE TRIGGER message_mover AFTER UPDATE OF room ON messages
|
||||
FOR EACH ROW WHEN NEW.room != OLD.room
|
||||
BEGIN
|
||||
UPDATE messages SET posted = ((julianday('now') - 2440587.5)*86400.0), updated = FALSE
|
||||
WHERE messages.id = NEW.id;
|
||||
UPDATE rooms SET pinned = NULL WHERE id = OLD.room AND pinned = OLD.id;
|
||||
END;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS files (
|
||||
id INTEGER NOT NULL PRIMARY KEY,
|
||||
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
|
||||
uploader INTEGER REFERENCES users(id),
|
||||
size INTEGER NOT NULL,
|
||||
filename TEXT, /* user-provided filename */
|
||||
path TEXT NOT NULL /* path on disk */
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS files_room ON files(room);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER NOT NULL PRIMARY KEY,
|
||||
public_key TEXT NOT NULL UNIQUE,
|
||||
created FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
|
||||
last_active FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
|
||||
banned BOOLEAN NOT NULL DEFAULT FALSE, /* true = globally banned from all rooms */
|
||||
moderator BOOLEAN NOT NULL DEFAULT FALSE, /* true = moderator of all rooms, and can add global bans */
|
||||
admin BOOLEAN NOT NULL DEFAULT FALSE /* true = admin of all rooms, and can appoint global bans/mod/admins */
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS users_last_active ON users(last_active);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS room_users (
|
||||
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
|
||||
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
last_active FLOAT NOT NULL DEFAULT ((julianday('now') - 2440587.5)*86400.0), /* unix epoch */
|
||||
PRIMARY KEY(room, user)
|
||||
) WITHOUT ROWID;
|
||||
CREATE INDEX IF NOT EXISTS room_users_activity ON room_users(room, last_active);
|
||||
|
||||
-- Stores permissions or restrictions on a user. Null values (for read/write) mean "user the room's
|
||||
-- default".
|
||||
CREATE TABLE IF NOT EXISTS user_permission_overrides (
|
||||
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
|
||||
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
banned BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user is banned */
|
||||
read BOOLEAN, /* If false the user may not fetch messages */
|
||||
write BOOLEAN, /* If false the user may not post */
|
||||
upload BOOLEAN, /* If false the user may not upload files */
|
||||
moderator BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate non-moderators */
|
||||
admin BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate anyone (including other moderators and admins) */
|
||||
PRIMARY KEY(room, user),
|
||||
CHECK(NOT (banned AND (moderator OR admin))) /* Mods/admins cannot be banned */
|
||||
) WITHOUT ROWID;
|
||||
|
||||
-- Triggers than remove a user from `room_users` when they are banned from the room
|
||||
CREATE TRIGGER room_users_remove_banned AFTER UPDATE OF banned ON user_permission_overrides
|
||||
FOR EACH ROW WHEN NEW.banned
|
||||
BEGIN
|
||||
DELETE FROM room_users WHERE room = NEW.room AND user = NEW.user;
|
||||
END;
|
||||
|
||||
-- View of permissions; for users with an entry in user_permissions we use those values; for null
|
||||
-- values or no user_permissions entry we return the room's default read/write values (and false for
|
||||
-- the other fields). We also apply some other properties: admin implies moderator, and moderator
|
||||
-- implies read&write.
|
||||
CREATE VIEW IF NOT EXISTS user_permissions AS
|
||||
SELECT
|
||||
rooms.id AS room,
|
||||
users.id AS user,
|
||||
users.public_key,
|
||||
CASE WHEN users.banned THEN TRUE ELSE COALESCE(user_permission_overrides.banned, FALSE) END AS banned,
|
||||
COALESCE(user_permission_overrides.read, rooms.read) AS read,
|
||||
COALESCE(user_permission_overrides.write, rooms.write) AS write,
|
||||
COALESCE(user_permission_overrides.upload, rooms.upload) AS upload,
|
||||
CASE WHEN users.moderator THEN TRUE ELSE COALESCE(user_permission_overrides.moderator, FALSE) END AS moderator,
|
||||
CASE WHEN users.admin THEN TRUE ELSE COALESCE(user_permission_overrides.admin, FALSE) END AS admin
|
||||
FROM
|
||||
users JOIN rooms LEFT OUTER JOIN user_permission_overrides ON
|
||||
users.id = user_permission_overrides.user AND rooms.id = user_permission_overrides.room;
|
||||
|
||||
-- Scheduled changes to user permissions. For example, to implement a 2-day timeout you would set
|
||||
-- their user_permissions.write to false, then set a `write = true` entry with a +2d timestamp here.
|
||||
-- Or to implement a join delay you could set room defaults to false then insert a value here to be
|
||||
-- applied after a delay.
|
||||
CREATE TABLE IF NOT EXISTS user_permissions_future (
|
||||
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
|
||||
at FLOAT NOT NULL, /* when the change should take effect (unix epoch) */
|
||||
read BOOLEAN, /* Set this value @ at */
|
||||
write BOOLEAN, /* Set this value @ at */
|
||||
PRIMARY KEY(user, room)
|
||||
) WITHOUT ROWID;
|
||||
CREATE INDEX user_permissions_future_at ON user_permissions_future(at);
|
417
src/storage.rs
417
src/storage.rs
|
@ -1,16 +1,13 @@
|
|||
use regex::Regex;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
use std::fs;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use log::{error, warn, info};
|
||||
use r2d2_sqlite::SqliteConnectionManager;
|
||||
use regex::Regex;
|
||||
use rusqlite::params;
|
||||
use rusqlite_migration::{Migrations, M};
|
||||
//use rusqlite_migration::{Migrations, M};
|
||||
|
||||
use super::errors::Error;
|
||||
|
||||
pub type DatabaseConnection = r2d2::PooledConnection<SqliteConnectionManager>;
|
||||
pub type DatabaseConnectionPool = r2d2::Pool<SqliteConnectionManager>;
|
||||
|
||||
#[derive(PartialEq, Eq, Hash)]
|
||||
|
@ -24,14 +21,19 @@ lazy_static::lazy_static! {
|
|||
}
|
||||
|
||||
impl RoomId {
|
||||
pub fn new(room_id: &str) -> Option<RoomId> {
|
||||
if REGULAR_CHARACTERS_ONLY.is_match(room_id) {
|
||||
return Some(RoomId { id: room_id.to_string() });
|
||||
pub fn validate(room_id: &str) -> Result<(), Error> {
|
||||
return if REGULAR_CHARACTERS_ONLY.is_match(room_id) {
|
||||
Ok(())
|
||||
} else {
|
||||
return None;
|
||||
Err(Error::ValidationFailed)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(room_id: &str) -> Result<RoomId, Error> {
|
||||
RoomId::validate(room_id)?;
|
||||
Ok(RoomId { id: room_id.to_string() })
|
||||
}
|
||||
|
||||
pub fn get_id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
@ -41,371 +43,80 @@ impl RoomId {
|
|||
|
||||
lazy_static::lazy_static! {
|
||||
|
||||
pub static ref MAIN_POOL: DatabaseConnectionPool = {
|
||||
let file_name = "database.db";
|
||||
pub static ref DB_POOL: DatabaseConnectionPool = {
|
||||
let file_name = "sogs.db";
|
||||
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(file_name);
|
||||
// FIXME: enable wal, normal journal mode
|
||||
return r2d2::Pool::new(db_manager).unwrap();
|
||||
};
|
||||
}
|
||||
|
||||
pub fn create_main_database_if_needed() {
|
||||
let pool = &MAIN_POOL;
|
||||
let conn = pool.get().unwrap();
|
||||
create_main_tables_if_needed(&conn);
|
||||
}
|
||||
/// Initialize the database, creating and migrating its structure if necessary.
|
||||
pub fn create_database_if_needed() {
|
||||
|
||||
fn create_main_tables_if_needed(conn: &DatabaseConnection) {
|
||||
let main_table_cmd = "CREATE TABLE IF NOT EXISTS main (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
image_id TEXT
|
||||
)";
|
||||
conn.execute(&main_table_cmd, params![]).expect("Couldn't create main table.");
|
||||
}
|
||||
|
||||
// Rooms
|
||||
|
||||
pub const PENDING_TOKEN_EXPIRATION: i64 = 10 * 60;
|
||||
pub const TOKEN_EXPIRATION: i64 = 7 * 24 * 60 * 60;
|
||||
pub const FILE_EXPIRATION: i64 = 15 * 24 * 60 * 60;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
|
||||
static ref POOLS: Mutex<HashMap<String, DatabaseConnectionPool>> = Mutex::new(HashMap::new());
|
||||
}
|
||||
|
||||
pub fn pool_by_room_id(room_id: &RoomId) -> Result<DatabaseConnectionPool, Error> {
|
||||
let mut pools = POOLS.lock().unwrap();
|
||||
if let Some(pool) = pools.get(room_id.get_id()) {
|
||||
return Ok(pool.clone());
|
||||
} else {
|
||||
let pool = &MAIN_POOL;
|
||||
if let Ok(conn) = pool.get() {
|
||||
if let Ok(count) = conn.query_row("SELECT COUNT(*) FROM main WHERE id = ?", params![room_id.get_id()],
|
||||
|row| row.get::<_, i64>(0)) {
|
||||
if count == 0 {
|
||||
warn!("Cannot access room database: room {} does not exist", room_id.get_id());
|
||||
return Err(Error::NoSuchRoom);
|
||||
}
|
||||
let raw_path = format!("rooms/{}.db", room_id.get_id());
|
||||
let path = Path::new(&raw_path);
|
||||
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(path);
|
||||
let pool = match r2d2::Pool::new(db_manager) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
error!("Unable to access {} database: {}", room_id.get_id(), e);
|
||||
return Err(Error::DatabaseFailedInternally);
|
||||
}
|
||||
};
|
||||
pools.insert(room_id.get_id().to_string(), pool);
|
||||
return Ok(pools[room_id.get_id()].clone());
|
||||
}
|
||||
}
|
||||
error!("Failed to query main database for room {} existence", room_id.get_id());
|
||||
return Err(Error::DatabaseFailedInternally);
|
||||
if rusqlite::version_number() < 3035000 {
|
||||
panic!("SQLite 3.35.0+ is required!");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_database_if_needed(room_id: &RoomId) {
|
||||
let pool = pool_by_room_id(room_id);
|
||||
let conn = pool.unwrap().get().unwrap();
|
||||
create_room_tables_if_needed(&conn);
|
||||
}
|
||||
let conn = DB_POOL.get().unwrap();
|
||||
|
||||
pub fn create_room_tables_if_needed(conn: &DatabaseConnection) {
|
||||
// Messages
|
||||
// The `id` field is needed to make `rowid` stable, which is important because otherwise
|
||||
// the `id`s in this table won't correspond to those in the deleted messages table
|
||||
let messages_table_cmd = "CREATE TABLE IF NOT EXISTS messages (
|
||||
id INTEGER PRIMARY KEY,
|
||||
public_key TEXT,
|
||||
timestamp INTEGER,
|
||||
data TEXT,
|
||||
signature TEXT,
|
||||
is_deleted INTEGER
|
||||
)";
|
||||
conn.execute(&messages_table_cmd, params![]).expect("Couldn't create messages table.");
|
||||
// Deleted messages
|
||||
let deleted_messages_table_cmd = "CREATE TABLE IF NOT EXISTS deleted_messages (
|
||||
id INTEGER PRIMARY KEY,
|
||||
deleted_message_id INTEGER
|
||||
)";
|
||||
conn.execute(&deleted_messages_table_cmd, params![])
|
||||
.expect("Couldn't create deleted messages table.");
|
||||
// Moderators
|
||||
let moderators_table_cmd = "CREATE TABLE IF NOT EXISTS moderators (
|
||||
public_key TEXT
|
||||
)";
|
||||
conn.execute(&moderators_table_cmd, params![]).expect("Couldn't create moderators table.");
|
||||
// Block list
|
||||
let block_list_table_cmd = "CREATE TABLE IF NOT EXISTS block_list (
|
||||
public_key TEXT
|
||||
)";
|
||||
conn.execute(&block_list_table_cmd, params![]).expect("Couldn't create block list table.");
|
||||
// Pending tokens
|
||||
// Note that a given public key can have multiple pending tokens
|
||||
let pending_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS pending_tokens (
|
||||
public_key TEXT,
|
||||
timestamp INTEGER,
|
||||
token BLOB
|
||||
)";
|
||||
conn.execute(&pending_tokens_table_cmd, params![])
|
||||
.expect("Couldn't create pending tokens table.");
|
||||
// Tokens
|
||||
// The token is stored as hex here (rather than as bytes) because it's more convenient for lookup
|
||||
let tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens (
|
||||
public_key TEXT,
|
||||
timestamp INTEGER,
|
||||
token TEXT PRIMARY KEY
|
||||
)";
|
||||
conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table.");
|
||||
// Files
|
||||
let files_table_cmd = "CREATE TABLE IF NOT EXISTS files (
|
||||
id TEXT PRIMARY KEY,
|
||||
timestamp INTEGER
|
||||
)";
|
||||
conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table.");
|
||||
// User activity table
|
||||
let user_activity_table_cmd = "CREATE TABLE IF NOT EXISTS user_activity (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
last_active INTEGER NOT NULL
|
||||
)";
|
||||
conn.execute(&user_activity_table_cmd, params![])
|
||||
.expect("Couldn't create user activity table.");
|
||||
let have_messages = match conn.query_row(
|
||||
"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'messages')",
|
||||
params![],
|
||||
|row| row.get::<_, bool>(0)) {
|
||||
Ok(exists) => exists,
|
||||
Err(e) => { panic!("Error querying database: {}", e); }
|
||||
};
|
||||
|
||||
if !have_messages {
|
||||
conn.execute(include_str!("create-schema.sql"), params![]).expect("Couldn't create database schema.");
|
||||
|
||||
// TODO: migration code from old multi-DB structure goes here
|
||||
}
|
||||
|
||||
// Future DB migration code goes here
|
||||
}
|
||||
|
||||
// Pruning
|
||||
|
||||
pub async fn prune_tokens_periodically() {
|
||||
let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap());
|
||||
loop {
|
||||
timer.tick().await;
|
||||
tokio::spawn(async {
|
||||
prune_tokens().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn prune_pending_tokens_periodically() {
|
||||
let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap());
|
||||
loop {
|
||||
timer.tick().await;
|
||||
tokio::spawn(async {
|
||||
prune_pending_tokens().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn prune_files_periodically() {
|
||||
let mut timer = tokio::time::interval(chrono::Duration::days(1).to_std().unwrap());
|
||||
let mut timer = tokio::time::interval(chrono::Duration::seconds(15).to_std().unwrap());
|
||||
loop {
|
||||
timer.tick().await;
|
||||
tokio::spawn(async {
|
||||
prune_files(FILE_EXPIRATION).await;
|
||||
prune_files(SystemTime::now()).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn prune_tokens() {
|
||||
let rooms = match get_all_room_ids() {
|
||||
Ok(rooms) => rooms,
|
||||
Err(_) => return,
|
||||
/// Removes all files with expiries <= the given time (which should generally by
|
||||
/// `SystemTime::now()`, except in the test suite).
|
||||
pub async fn prune_files(now: SystemTime) {
|
||||
|
||||
let conn = match DB_POOL.get() {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => return error!("Couldn't prune files: {}.", e)
|
||||
};
|
||||
for room in rooms {
|
||||
let pool = match pool_by_room_id(&room) {
|
||||
Ok(p) => p,
|
||||
Err(_) => return
|
||||
};
|
||||
// It's not catastrophic if we fail to prune the database for a given room
|
||||
let conn = match pool.get() {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => return error!("Couldn't prune tokens due to error: {}.", e),
|
||||
};
|
||||
let stmt = "DELETE FROM tokens WHERE timestamp < (?1)";
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let expiration = now - TOKEN_EXPIRATION;
|
||||
match conn.execute(&stmt, params![expiration]) {
|
||||
Ok(_) => (),
|
||||
Err(e) => return error!("Couldn't prune tokens due to error: {}.", e),
|
||||
};
|
||||
}
|
||||
info!("Pruned tokens.");
|
||||
}
|
||||
|
||||
async fn prune_pending_tokens() {
|
||||
let rooms = match get_all_room_ids() {
|
||||
Ok(rooms) => rooms,
|
||||
Err(_) => return,
|
||||
let mut st = match conn.prepare_cached("DELETE FROM files WHERE expiry <= ? RETURNING path") {
|
||||
Ok(st) => st,
|
||||
Err(e) => return error!("Unable to prepare statement: {}", e)
|
||||
};
|
||||
for room in rooms {
|
||||
let pool = match pool_by_room_id(&room) {
|
||||
Ok(p) => p,
|
||||
Err(_) => return
|
||||
};
|
||||
// It's not catastrophic if we fail to prune the database for a given room
|
||||
let conn = match pool.get() {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => return error!("Couldn't prune pending tokens due to error: {}.", e),
|
||||
};
|
||||
let stmt = "DELETE FROM pending_tokens WHERE timestamp < (?1)";
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let expiration = now - PENDING_TOKEN_EXPIRATION;
|
||||
match conn.execute(&stmt, params![expiration]) {
|
||||
Ok(_) => (),
|
||||
Err(e) => return error!("Couldn't prune pending tokens due to error: {}.", e),
|
||||
};
|
||||
}
|
||||
info!("Pruned pending tokens.");
|
||||
}
|
||||
|
||||
fn get_expired_file_ids(
|
||||
pool: &DatabaseConnectionPool, file_expiration: i64,
|
||||
) -> Result<Vec<String>, ()> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let expiration = now - file_expiration;
|
||||
// Get a database connection and open a transaction
|
||||
let conn = pool.get().map_err(|e| {
|
||||
error!("Couldn't get database connection to prune files due to error: {}.", e);
|
||||
})?;
|
||||
// Get the IDs of the files to delete
|
||||
let raw_query = "SELECT id FROM files WHERE timestamp < (?1)";
|
||||
|
||||
let mut query = conn.prepare(&raw_query).map_err(|e| {
|
||||
error!("Couldn't prepare query to prune files due to error: {}.", e);
|
||||
})?;
|
||||
|
||||
let rows = query.query_map(params![expiration], |row| row.get(0)).map_err(|e| {
|
||||
error!("Couldn't prune files due to error: {} (expiration = {}).", e, expiration);
|
||||
})?;
|
||||
|
||||
Ok(rows.filter_map(|result| result.ok()).collect())
|
||||
}
|
||||
|
||||
pub async fn prune_files_for_room(
|
||||
pool: &DatabaseConnectionPool, room: &RoomId, file_expiration: i64,
|
||||
) {
|
||||
let ids = get_expired_file_ids(&pool, file_expiration);
|
||||
|
||||
match ids {
|
||||
Ok(ids) if !ids.is_empty() => {
|
||||
// Delete the files
|
||||
let futs = ids.iter().map(|id| async move {
|
||||
(
|
||||
tokio::fs::remove_file(format!("files/{}_files/{}", room.get_id(), id)).await,
|
||||
id.to_owned(),
|
||||
)
|
||||
});
|
||||
|
||||
let results = futures::future::join_all(futs).await;
|
||||
|
||||
for (res, id) in results {
|
||||
if let Err(err) = res {
|
||||
error!(
|
||||
"Couldn't delete file: {} from room: {} due to error: {}.",
|
||||
id,
|
||||
room.get_id(),
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let conn = match pool.get() {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
return error!(
|
||||
"Couldn't get database connection to prune files due to error: {}.",
|
||||
e
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
// Measure the time it takes to delete all files sequentially
|
||||
// (this might become a problem since we're not using an async interface)
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
// Remove the file records from the database
|
||||
// FIXME: It'd be great to do this in a single statement, but apparently this is not supported very well
|
||||
for id in ids {
|
||||
let stmt = "DELETE FROM files WHERE id = (?1)";
|
||||
match conn.execute(&stmt, params![id]) {
|
||||
Ok(_) => (),
|
||||
Err(e) => {
|
||||
return error!("Couldn't prune file with ID: {} due to error: {}.", id, e)
|
||||
}
|
||||
};
|
||||
}
|
||||
// Log the result
|
||||
info!("Pruned files for room: {}. Took: {:?}", room.get_id(), now.elapsed());
|
||||
}
|
||||
Ok(_) => {
|
||||
// empty
|
||||
}
|
||||
Err(_) => {
|
||||
// It's not catastrophic if we fail to prune the database for a given room
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn prune_files(file_expiration: i64) {
|
||||
// The expiration setting is passed in for testing purposes
|
||||
let rooms = match get_all_room_ids() {
|
||||
Ok(rooms) => rooms,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
let futs = rooms.into_iter().map(|room| async move {
|
||||
if let Ok(pool) = pool_by_room_id(&room) {
|
||||
prune_files_for_room(&pool, &room, file_expiration).await;
|
||||
}
|
||||
});
|
||||
|
||||
futures::future::join_all(futs).await;
|
||||
}
|
||||
|
||||
// Migration
|
||||
|
||||
pub fn perform_migration() {
|
||||
let rooms = match get_all_room_ids() {
|
||||
Ok(ids) => ids,
|
||||
Err(_e) => {
|
||||
return error!("Couldn't get all room IDs.");
|
||||
}
|
||||
};
|
||||
let create_tokens_table_cmd = "CREATE TABLE IF NOT EXISTS tokens (
|
||||
public_key TEXT,
|
||||
timestamp INTEGER,
|
||||
token TEXT PRIMARY KEY
|
||||
)";
|
||||
let migrations =
|
||||
Migrations::new(vec![M::up("DROP TABLE tokens"), M::up(&create_tokens_table_cmd)]);
|
||||
for room in rooms {
|
||||
create_database_if_needed(&room);
|
||||
let pool = pool_by_room_id(&room);
|
||||
let mut conn = pool.unwrap().get().unwrap();
|
||||
migrations.to_latest(&mut conn).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Utilities
|
||||
|
||||
fn get_all_room_ids() -> Result<Vec<RoomId>, Error> {
|
||||
// Get a database connection
|
||||
let conn = MAIN_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
// Query the database
|
||||
let raw_query = "SELECT id FROM main";
|
||||
let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?;
|
||||
let rows = match query.query_map(params![], |row| row.get(0)) {
|
||||
let now_secs = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64();
|
||||
let mut rows = match st.query(params![now_secs]) {
|
||||
Ok(rows) => rows,
|
||||
Err(e) => {
|
||||
error!("Couldn't query database due to error: {}.", e);
|
||||
return Err(Error::DatabaseFailedInternally);
|
||||
}
|
||||
Err(e) => return error!("Unable to query expired files: {}", e)
|
||||
};
|
||||
let room_ids: Vec<_> = rows
|
||||
.filter_map(|result: Result<String, _>| result.ok())
|
||||
.map(|opt| RoomId::new(&opt))
|
||||
.flatten()
|
||||
.collect();
|
||||
// Return
|
||||
return Ok(room_ids);
|
||||
|
||||
let mut count = 0;
|
||||
while let Ok(Some(row)) = rows.next() {
|
||||
if let Ok(path) = row.get_ref_unwrap(1).as_str() {
|
||||
let p = format!("files/{}", path);
|
||||
if let Err(e) = fs::remove_file(p) {
|
||||
error!("Couldn't delete expired file 'files/{}': {}", path, e);
|
||||
} else {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Pruned {} files", count);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue