Apply arrived-at future user permissions

This commit is contained in:
Jason Rhinelander 2021-09-14 13:26:05 -03:00
parent ace493f643
commit b5fcac5e48
4 changed files with 76 additions and 25 deletions

View File

@ -66,7 +66,7 @@ pub async fn delete_room(id: String) -> Result<Response, Rejection> {
let conn = storage::DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;
// Insert the room
let stmt = "DELETE FROM rooms WHERE identifier = ?";
if Err(e) = conn.execute(&stmt, params![&id]) {
if let Err(e) = conn.execute(&stmt, params![&id]) {
error!("Couldn't delete room due to error: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}

View File

@ -68,7 +68,7 @@ async fn main() {
// Create required folders
fs::create_dir_all("./files").unwrap();
// Set up pruning jobs
let prune_files_future = storage::prune_files_periodically();
let db_maintenance_future = storage::db_maintenance_job();
// Serve routes
let public_routes = routes::root().or(routes::fallback()).or(routes::lsrpc());
let private_routes = routes::create_room()
@ -87,7 +87,7 @@ async fn main() {
let serve_private_routes_future = warp::serve(private_routes).run(localhost);
// Keep futures alive
join!(
prune_files_future,
db_maintenance_future,
serve_public_routes_future,
serve_private_routes_future
);
@ -97,7 +97,7 @@ async fn main() {
let serve_private_routes_future = warp::serve(private_routes).run(localhost);
// Keep futures alive
join!(
prune_files_future,
db_maintenance_future,
serve_public_routes_future,
serve_private_routes_future
);

View File

@ -1,4 +1,8 @@
PRAGMA journal_mode=WAL;
BEGIN;
CREATE TABLE IF NOT EXISTS rooms (
id INTEGER NOT NULL PRIMARY KEY, /* internal database id of the room */
identifier TEXT NOT NULL UNIQUE, /* room identifier used in URLs, etc. */
@ -96,9 +100,9 @@ CREATE TABLE IF NOT EXISTS user_permission_overrides (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
banned BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user is banned */
read BOOLEAN, /* If false the user may not fetch messages */
write BOOLEAN, /* If false the user may not post */
upload BOOLEAN, /* If false the user may not upload files */
read BOOLEAN, /* If false the user may not fetch messages; null uses room default; true allows reading */
write BOOLEAN, /* If false the user may not post; null uses room default; true allows posting */
upload BOOLEAN, /* If false the user may not upload files; null uses room default; true allows uploading */
moderator BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate non-moderators */
admin BOOLEAN NOT NULL DEFAULT FALSE, /* If true the user may moderate anyone (including other moderators and admins) */
PRIMARY KEY(room, user),
@ -135,12 +139,15 @@ FROM
-- their user_permissions.write to false, then set a `write = true` entry with a +2d timestamp here.
-- Or to implement a join delay you could set room defaults to false then insert a value here to be
-- applied after a delay.
CREATE TABLE IF NOT EXISTS user_permissions_future (
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
CREATE TABLE IF NOT EXISTS user_permission_futures (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
user INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
at FLOAT NOT NULL, /* when the change should take effect (unix epoch) */
read BOOLEAN, /* Set this value @ at */
write BOOLEAN, /* Set this value @ at */
PRIMARY KEY(user, room)
read BOOLEAN, /* Set this value @ at, if non-null */
write BOOLEAN, /* Set this value @ at, if non-null */
upload BOOLEAN, /* Set this value @ at, if non-null */
PRIMARY KEY(room, user)
) WITHOUT ROWID;
CREATE INDEX user_permissions_future_at ON user_permissions_future(at);
COMMIT;

View File

@ -8,6 +8,8 @@ use rusqlite::params;
//use rusqlite_migration::{Migrations, M};
use super::errors::Error;
pub type DatabaseConnection = r2d2::PooledConnection<SqliteConnectionManager>;
pub type DatabaseConnectionPool = r2d2::Pool<SqliteConnectionManager>;
#[derive(PartialEq, Eq, Hash)]
@ -69,7 +71,7 @@ pub fn create_database_if_needed() {
};
if !have_messages {
conn.execute(include_str!("create-schema.sql"), params![]).expect("Couldn't create database schema.");
conn.execute_batch(include_str!("create-schema.sql")).expect("Couldn't create database schema.");
// TODO: migration code from old multi-DB structure goes here
}
@ -77,34 +79,35 @@ pub fn create_database_if_needed() {
// Future DB migration code goes here
}
// Pruning
pub async fn prune_files_periodically() {
let mut timer = tokio::time::interval(chrono::Duration::seconds(15).to_std().unwrap());
// Performs periodic DB maintenance: file pruning, delayed permission applying, etc.
pub async fn db_maintenance_job() {
let mut timer = tokio::time::interval(chrono::Duration::seconds(10).to_std().unwrap());
loop {
timer.tick().await;
tokio::spawn(async {
prune_files(SystemTime::now()).await;
let now = SystemTime::now();
if let Ok(mut conn) = DB_POOL.get() {
prune_files(&mut conn, now);
apply_permission_updates(&mut conn, now);
} else {
warn!("Couldn't get a free db connection to perform database maintenance; will retry soon");
}
});
}
}
/// Removes all files with expiries <= the given time (which should generally by
/// `SystemTime::now()`, except in the test suite).
pub async fn prune_files(now: SystemTime) {
pub fn prune_files(conn: &mut DatabaseConnection, now: SystemTime) {
let conn = match DB_POOL.get() {
Ok(conn) => conn,
Err(e) => return error!("Couldn't prune files: {}.", e)
};
let mut st = match conn.prepare_cached("DELETE FROM files WHERE expiry <= ? RETURNING path") {
Ok(st) => st,
Err(e) => return error!("Unable to prepare statement: {}", e)
Err(e) => { error!("Unable to prepare statement: {}", e); return; }
};
let now_secs = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64();
let mut rows = match st.query(params![now_secs]) {
Ok(rows) => rows,
Err(e) => return error!("Unable to query expired files: {}", e)
Err(e) => { error!("Unable to query expired files: {}", e); return; }
};
let mut count = 0;
@ -120,3 +123,44 @@ pub async fn prune_files(now: SystemTime) {
}
info!("Pruned {} files", count);
}
pub fn apply_permission_updates(conn: &mut DatabaseConnection, now: SystemTime) {
let now_secs = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64();
let tx = match conn.transaction() {
Ok(tx) => tx,
Err(e) => { error!("Unable to begin transaction: {}", e); return; }
};
{
let mut ins_st = match tx.prepare_cached("
INSERT INTO user_permission_overrides (room, user, read, write, upload)
SELECT room, user, read, write, upload FROM user_permissions_future WHERE at <= ?
ON CONFLICT DO UPDATE SET
read = COALESCE(excluded.read, read),
write = COALESCE(excluded.write, write),
upload = COALESCE(excluded.upload, upload)") {
Ok(st) => st,
Err(e) => { error!("Unable to prepare statement: {}", e); return; }
};
let mut del_st = match tx.prepare_cached("DELETE FROM user_permissions_future WHERE at <= ?") {
Ok(st) => st,
Err(e) => { error!("Unable to prepare statement: {}", e); return; }
};
let num_applied = match ins_st.execute(params![now_secs]) {
Ok(num) => num,
Err(e) => { error!("Unable to apply scheduled future permissions: {}", e); return; }
};
if num_applied > 0 {
info!("Applied {} user permission updates", num_applied);
if let Err(e) = del_st.execute(params![now_secs]) {
error!("Unable to delete applied future permissions: {}", e);
return;
}
}
}
if let Err(e) = tx.commit() {
error!("Failed to commit scheduled user permission updates: {}", e);
return;
}
}