mirror of
https://github.com/oxen-io/session-open-group-server.git
synced 2023-12-13 20:30:35 +01:00
Make file deletion async; use in-memory database in tests
This commit is contained in:
parent
427b8241c3
commit
fa7c375595
104
src/storage.rs
104
src/storage.rs
|
@ -1,5 +1,4 @@
|
|||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
|
||||
|
@ -84,7 +83,7 @@ pub fn create_database_if_needed(room_id: &str) {
|
|||
create_room_tables_if_needed(&conn);
|
||||
}
|
||||
|
||||
fn create_room_tables_if_needed(conn: &DatabaseConnection) {
|
||||
pub fn create_room_tables_if_needed(conn: &DatabaseConnection) {
|
||||
// Messages
|
||||
// The `id` field is needed to make `rowid` stable, which is important because otherwise
|
||||
// the `id`s in this table won't correspond to those in the deleted messages table
|
||||
|
@ -248,48 +247,39 @@ async fn prune_pending_tokens() {
|
|||
info!("Pruned pending tokens.");
|
||||
}
|
||||
|
||||
pub async fn prune_files(file_expiration: i64) {
|
||||
// The expiration setting is passed in for testing purposes
|
||||
let rooms = match get_all_room_ids() {
|
||||
Ok(rooms) => rooms,
|
||||
Err(_) => return,
|
||||
};
|
||||
for room in rooms {
|
||||
// It's not catastrophic if we fail to prune the database for a given room
|
||||
let pool = pool_by_room_id(&room);
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let expiration = now - file_expiration;
|
||||
// Get a database connection and open a transaction
|
||||
let conn = match pool.get() {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
return error!(
|
||||
"Couldn't get database connection to prune files due to error: {}.",
|
||||
e
|
||||
)
|
||||
}
|
||||
};
|
||||
// Get the IDs of the files to delete
|
||||
let raw_query = format!("SELECT id FROM {} WHERE timestamp < (?1)", FILES_TABLE);
|
||||
let mut query = match conn.prepare(&raw_query) {
|
||||
Ok(query) => query,
|
||||
Err(e) => return error!("Couldn't prepare query to prune files due to error: {}.", e),
|
||||
};
|
||||
let rows = match query.query_map(params![expiration], |row| row.get(0)) {
|
||||
Ok(rows) => rows,
|
||||
Err(e) => {
|
||||
return error!(
|
||||
"Couldn't prune files due to error: {} (expiration = {}).",
|
||||
e, expiration
|
||||
);
|
||||
}
|
||||
};
|
||||
let ids: Vec<String> = rows.filter_map(|result| result.ok()).collect();
|
||||
if !ids.is_empty() {
|
||||
fn get_expired_file_ids(
|
||||
pool: &DatabaseConnectionPool, file_expiration: i64,
|
||||
) -> Result<Vec<String>, ()> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let expiration = now - file_expiration;
|
||||
// Get a database connection and open a transaction
|
||||
let conn = pool.get().map_err(|e| {
|
||||
error!("Couldn't get database connection to prune files due to error: {}.", e);
|
||||
})?;
|
||||
// Get the IDs of the files to delete
|
||||
let raw_query = format!("SELECT id FROM {} WHERE timestamp < (?1)", FILES_TABLE);
|
||||
|
||||
let mut query = conn.prepare(&raw_query).map_err(|e| {
|
||||
error!("Couldn't prepare query to prune files due to error: {}.", e);
|
||||
})?;
|
||||
|
||||
let rows = query.query_map(params![expiration], |row| row.get(0)).map_err(|e| {
|
||||
error!("Couldn't prune files due to error: {} (expiration = {}).", e, expiration);
|
||||
})?;
|
||||
|
||||
Ok(rows.filter_map(|result| result.ok()).collect())
|
||||
}
|
||||
|
||||
pub async fn prune_files_for_room(pool: &DatabaseConnectionPool, room: &str, file_expiration: i64) {
|
||||
let ids = get_expired_file_ids(&pool, file_expiration);
|
||||
|
||||
match ids {
|
||||
Ok(ids) if !ids.is_empty() => {
|
||||
// Delete the files
|
||||
let mut deleted_ids: Vec<String> = vec![];
|
||||
|
||||
for id in ids {
|
||||
match fs::remove_file(format!("files/{}_files/{}", room, id)) {
|
||||
match tokio::fs::remove_file(format!("files/{}_files/{}", room, id)).await {
|
||||
Ok(_) => deleted_ids.push(id),
|
||||
Err(e) => {
|
||||
error!(
|
||||
|
@ -300,6 +290,17 @@ pub async fn prune_files(file_expiration: i64) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
let conn = match pool.get() {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
return error!(
|
||||
"Couldn't get database connection to prune files due to error: {}.",
|
||||
e
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
// Remove the file records from the database
|
||||
// FIXME: It'd be great to do this in a single statement, but apparently this is not supported very well
|
||||
for id in deleted_ids {
|
||||
|
@ -314,9 +315,30 @@ pub async fn prune_files(file_expiration: i64) {
|
|||
// Log the result
|
||||
info!("Pruned files for room: {}.", room);
|
||||
}
|
||||
Ok(_) => {
|
||||
// empty
|
||||
}
|
||||
Err(_) => {
|
||||
// It's not catastrophic if we fail to prune the database for a given room
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn prune_files(file_expiration: i64) {
|
||||
// The expiration setting is passed in for testing purposes
|
||||
let rooms = match get_all_room_ids() {
|
||||
Ok(rooms) => rooms,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
let futs = rooms.into_iter().map(|room| async move {
|
||||
let pool = pool_by_room_id(&room);
|
||||
prune_files_for_room(&pool, &room, file_expiration).await;
|
||||
});
|
||||
|
||||
futures::future::join_all(futs).await;
|
||||
}
|
||||
|
||||
// Migration
|
||||
|
||||
pub fn perform_migration() {
|
||||
|
|
55
src/tests.rs
55
src/tests.rs
|
@ -1,37 +1,34 @@
|
|||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
use rusqlite::params;
|
||||
use rusqlite::OpenFlags;
|
||||
use warp::http::StatusCode;
|
||||
|
||||
use crate::storage::DatabaseConnectionPool;
|
||||
|
||||
use super::crypto;
|
||||
use super::handlers;
|
||||
use super::models;
|
||||
use super::storage;
|
||||
|
||||
fn perform_main_setup() {
|
||||
storage::create_main_database_if_needed();
|
||||
fs::create_dir_all("rooms").unwrap();
|
||||
fs::create_dir_all("files").unwrap();
|
||||
async fn set_up_test_room() -> DatabaseConnectionPool {
|
||||
let manager = r2d2_sqlite::SqliteConnectionManager::file("file::memory:?cache=shared");
|
||||
let mut flags = OpenFlags::default();
|
||||
flags.set(OpenFlags::SQLITE_OPEN_URI, true);
|
||||
|
||||
let manager = manager.with_flags(flags);
|
||||
|
||||
let pool = r2d2::Pool::<r2d2_sqlite::SqliteConnectionManager>::new(manager).unwrap();
|
||||
|
||||
let conn = pool.get().unwrap();
|
||||
|
||||
storage::create_room_tables_if_needed(&conn);
|
||||
|
||||
pool
|
||||
}
|
||||
|
||||
async fn set_up_test_room() {
|
||||
perform_main_setup();
|
||||
let test_room_id = "test_room";
|
||||
let test_room_name = "Test Room";
|
||||
let test_room = models::Room { id: test_room_id.to_string(), name: test_room_name.to_string() };
|
||||
handlers::create_room(test_room).await.unwrap();
|
||||
let raw_path = format!("rooms/{}.db", test_room_id);
|
||||
let path = Path::new(&raw_path);
|
||||
fs::read(path).unwrap(); // Fail if this doesn't exist
|
||||
}
|
||||
|
||||
fn get_auth_token() -> (String, String) {
|
||||
// Get a database connection pool
|
||||
let test_room_id = "test_room";
|
||||
let pool = storage::pool_by_room_id(&test_room_id);
|
||||
fn get_auth_token(pool: &DatabaseConnectionPool) -> (String, String) {
|
||||
// Generate a fake user key pair
|
||||
let (user_private_key, user_public_key) = crypto::generate_x25519_key_pair();
|
||||
let hex_user_public_key = format!("05{}", hex::encode(user_public_key.to_bytes()));
|
||||
|
@ -57,12 +54,12 @@ fn get_auth_token() -> (String, String) {
|
|||
#[tokio::test]
|
||||
async fn test_authorization() {
|
||||
// Ensure the test room is set up and get a database connection pool
|
||||
set_up_test_room().await;
|
||||
let test_room_id = "test_room";
|
||||
let pool = storage::pool_by_room_id(&test_room_id);
|
||||
|
||||
let pool = set_up_test_room().await;
|
||||
|
||||
// Get an auth token
|
||||
// This tests claiming a token internally
|
||||
let (_, hex_user_public_key) = get_auth_token();
|
||||
let (_, hex_user_public_key) = get_auth_token(&pool);
|
||||
// Try to claim an incorrect token
|
||||
let mut incorrect_token = [0u8; 48];
|
||||
thread_rng().fill(&mut incorrect_token[..]);
|
||||
|
@ -76,11 +73,11 @@ async fn test_authorization() {
|
|||
#[tokio::test]
|
||||
async fn test_file_handling() {
|
||||
// Ensure the test room is set up and get a database connection pool
|
||||
set_up_test_room().await;
|
||||
let pool = set_up_test_room().await;
|
||||
|
||||
let test_room_id = "test_room";
|
||||
let pool = storage::pool_by_room_id(&test_room_id);
|
||||
// Get an auth token
|
||||
let (auth_token, _) = get_auth_token();
|
||||
let (auth_token, _) = get_auth_token(&pool);
|
||||
// Store the test file
|
||||
handlers::store_file(
|
||||
Some(test_room_id.to_string()),
|
||||
|
@ -105,7 +102,7 @@ async fn test_file_handling() {
|
|||
assert_eq!(base64_encoded_file, TEST_FILE);
|
||||
// Prune the file and check that it's gone
|
||||
// Will evaluate to now + 60
|
||||
storage::prune_files(-60).await;
|
||||
storage::prune_files_for_room(&pool, test_room_id, -60).await;
|
||||
// It should be gone now
|
||||
fs::read(format!("files/{}_files/{}", test_room_id, id)).unwrap_err();
|
||||
// Check that the file record is also gone
|
||||
|
|
Loading…
Reference in a new issue