From 8df05a912991bfcaeab9cdcf971ec00f5b4e9213 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Fri, 19 Mar 2021 10:09:13 +1100 Subject: [PATCH] Implement file storage & retrieval --- Cargo.lock | 10 ++++++ Cargo.toml | 1 + src/handlers.rs | 90 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 9 +++++ src/rpc.rs | 17 ++++++++-- src/storage.rs | 76 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 201 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4b8997..79cf907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1253,6 +1253,7 @@ dependencies = [ "sha2", "tokio", "tokio-test", + "uuid", "warp", "x25519-dalek", ] @@ -1632,6 +1633,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom 0.2.2", +] + [[package]] name = "vcpkg" version = "0.2.11" diff --git a/Cargo.toml b/Cargo.toml index 8f5be24..a87f21e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.9" tokio = { version = "1.3", features = ["full"] } +uuid = { version = "0.8", features = ["v4"] } warp = { version = "0.3", features = ["tls"] } x25519-dalek = "1.1" diff --git a/src/handlers.rs b/src/handlers.rs index 5f300f6..a46e200 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,7 +1,11 @@ +use std::fs; +use std::io::prelude::*; + use chrono; use rusqlite::params; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; use warp::{Rejection, http::StatusCode, reply::Reply, reply::Response}; use super::crypto; @@ -15,6 +19,84 @@ enum AuthorizationLevel { Moderator } +pub async fn store_file(base64_encoded_bytes: &str, pool: &storage:: DatabaseConnectionPool) -> Result { + // Parse bytes + let bytes = match base64::decode(base64_encoded_bytes) { + Ok(bytes) => bytes, + Err(e) => { + println!("Couldn't parse bytes from invalid base64 encoding due to error: {}.", e); + return Err(warp::reject::custom(Error::ValidationFailed)); + } + }; + // Generate UUID + let id = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let id = id.to_simple().encode_lower(&mut buffer); + // Update the database + // We do this * before * storing the actual file, so that in case something goes + // wrong we're not left with files that'll never be pruned. + let now = chrono::Utc::now().timestamp(); + let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?; + let stmt = format!("INSERT INTO {} (id, timestamp) VALUES (?1, ?2)", storage::FILES_TABLE); + let _ = match tx.execute(&stmt, params![ &id, now ]) { + Ok(rows) => rows, + Err(e) => { + println!("Couldn't insert file record due to error: {}.", e); + return Err(warp::reject::custom(Error::DatabaseFailedInternally)); + } + }; + tx.commit().map_err(|_| Error::DatabaseFailedInternally)?; + // Write to file + let mut pos = 0; + let mut buffer = match fs::File::create(&id) { + Ok(buffer) => buffer, + Err(e) => { + println!("Couldn't store file due to error: {}.", e); + return Err(warp::reject::custom(Error::DatabaseFailedInternally)); + } + }; + while pos < bytes.len() { + let count = match buffer.write(&bytes[pos..]) { + Ok(count) => count, + Err(e) => { + println!("Couldn't store file due to error: {}.", e); + return Err(warp::reject::custom(Error::DatabaseFailedInternally)); + } + }; + pos += count; + } + // Return + return Ok(warp::reply::json(&id).into_response()); +} + +pub async fn get_file(id: &str, pool: &storage:: DatabaseConnectionPool) -> Result { + // Check that the ID is a valid UUID + match Uuid::parse_str(id) { + Ok(_) => (), + Err(e) => { + println!("Couldn't parse UUID from: {} due to error: {}.", id, e); + return Err(warp::reject::custom(Error::ValidationFailed)); + } + }; + // Get a database connection + let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; + // Try to read the file + let bytes = match fs::read(format!("files/{}", id)) { + Ok(bytes) => bytes, + Err(e) => { + println!("Couldn't read file due to error: {}.", e); + return Err(warp::reject::custom(Error::ValidationFailed)); + } + }; + // Base64 encode the result + let base64_encoded_bytes = base64::encode(bytes); + // Return + return Ok(warp::reply::json(&base64_encoded_bytes).into_response()); +} + +// Authentication + pub async fn get_auth_token_challenge(hex_public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result { // Validate the public key if !is_valid_public_key(hex_public_key) { @@ -137,6 +219,8 @@ pub async fn delete_auth_token(auth_token: Option, pool: &storage::Datab return Ok(StatusCode::OK.into_response()); } +// Message sending & receiving + /// Inserts the given `message` into the database if it's valid. pub async fn insert_message(mut message: models::Message, auth_token: Option, pool: &storage::DatabaseConnectionPool) -> Result { // Validate the message @@ -196,6 +280,8 @@ pub async fn get_messages(options: rpc::QueryOptions, pool: &storage::DatabaseCo return Ok(warp::reply::json(&messages).into_response()); } +// Message deletion + /// Deletes the message with the given `row_id` from the database, if it's present. pub async fn delete_message(row_id: i64, auth_token: Option, pool: &storage::DatabaseConnectionPool) -> Result { // Check authorization level @@ -278,6 +364,8 @@ pub async fn get_deleted_messages(options: rpc::QueryOptions, pool: &storage::Da return Ok(warp::reply::json(&ids).into_response()); } +// Moderation + /// Returns the full list of moderators. pub async fn get_moderators(pool: &storage::DatabaseConnectionPool) -> Result { let public_keys = get_moderators_vector(pool).await?; @@ -350,6 +438,8 @@ pub async fn get_banned_public_keys(pool: &storage::DatabaseConnectionPool) -> R return Ok(warp::reply::json(&public_keys).into_response()); } +// General + pub async fn get_member_count(pool: &storage::DatabaseConnectionPool) -> Result { // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; diff --git a/src/main.rs b/src/main.rs index e456768..503b483 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,19 +17,28 @@ mod tests; #[tokio::main] async fn main() { + // Print the server public key let public_key = hex::encode(crypto::PUBLIC_KEY.as_bytes()); println!("The public key of this server is: {}", public_key); + // Create the main database storage::create_main_database_if_needed(); + // Create required folders fs::create_dir_all("./rooms").unwrap(); + fs::create_dir_all("./files").unwrap(); + // Create the main room let main_room = "main"; storage::create_database_if_needed(main_room); + // Set up pruning jobs let prune_pending_tokens_future = storage::prune_pending_tokens_periodically(); let prune_tokens_future = storage::prune_tokens_periodically(); + let prune_files_future = storage::prune_files_periodically(); + // Serve routes let routes = routes::root().or(routes::lsrpc()); let serve_routes_future = warp::serve(routes) .tls() .cert_path("tls_certificate.pem") .key_path("tls_private_key.pem") .run(([0, 0, 0, 0], 443)); + // Keep futures alive join!(prune_pending_tokens_future, prune_tokens_future, serve_routes_future); } diff --git a/src/rpc.rs b/src/rpc.rs index 12e37ae..9ad79f2 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -63,6 +63,15 @@ async fn handle_get_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::D }; } // Switch on the path + if uri.path().starts_with("/files") { + let components: Vec<&str> = uri.path()[1..].split("/").collect(); // Drop the leading slash and split on subsequent slashes + if components.len() != 2 { + println!("Invalid endpoint: {}.", rpc_call.endpoint); + return Err(warp::reject::custom(Error::InvalidRpcCall)); + } + let file_id = components[1]; + return handlers::get_file(file_id, pool).await; + } match uri.path() { "/messages" => return handlers::get_messages(query_options, pool).await, "/deleted_messages" => return handlers::get_deleted_messages(query_options, pool).await, @@ -115,7 +124,11 @@ async fn handle_post_request(rpc_call: RpcCall, uri: http::Uri, auth_token: Opti "/claim_auth_token" => { let public_key = rpc_call.body; return handlers::claim_auth_token(&public_key, auth_token, pool).await; - } + }, + "/files" => { + let base64_encoded_bytes = rpc_call.body; + return handlers::store_file(&base64_encoded_bytes, pool).await; + }, _ => { println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint); return Err(warp::reject::custom(Error::InvalidRpcCall)); @@ -151,7 +164,7 @@ async fn handle_delete_request(rpc_call: RpcCall, uri: http::Uri, auth_token: Op return handlers::unban(&public_key, auth_token, pool).await; } // DELETE /auth_token - if uri.path().starts_with("/auth_token") { + if uri.path() == "/auth_token" { return handlers::delete_auth_token(auth_token, pool).await; } // Unrecognized endpoint diff --git a/src/storage.rs b/src/storage.rs index b61bfc7..552e6d5 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fs; use std::sync::Mutex; use rusqlite::params; @@ -41,6 +42,7 @@ fn create_main_tables_if_needed(conn: &DatabaseConnection) { pub const PENDING_TOKEN_EXPIRATION: i64 = 10 * 60; pub const TOKEN_EXPIRATION: i64 = 7 * 24 * 60 * 60; +pub const FILE_EXPIRATION: i64 = 60 * 24 * 60 * 60; pub const MESSAGES_TABLE: &str = "messages"; pub const DELETED_MESSAGES_TABLE: &str = "deleted_messages"; @@ -48,6 +50,7 @@ pub const MODERATORS_TABLE: &str = "moderators"; pub const BLOCK_LIST_TABLE: &str = "block_list"; pub const PENDING_TOKENS_TABLE: &str = "pending_tokens"; pub const TOKENS_TABLE: &str = "tokens"; +pub const FILES_TABLE: &str = "files"; lazy_static::lazy_static! { @@ -143,6 +146,13 @@ fn create_room_tables_if_needed(conn: &DatabaseConnection) { token TEXT )", TOKENS_TABLE); conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table."); + // Files + let files_table_cmd = format!( + "CREATE TABLE IF NOT EXISTS {} ( + id STRING PRIMARY KEY, + timestamp INTEGER + )", FILES_TABLE); + conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table."); } // Pruning @@ -163,6 +173,14 @@ pub async fn prune_pending_tokens_periodically() { } } +pub async fn prune_files_periodically() { + let mut timer = tokio::time::interval(chrono::Duration::days(1).to_std().unwrap()); + loop { + timer.tick().await; + tokio::spawn(async { prune_files().await; }); + } +} + async fn prune_tokens() { let rooms = match get_all_rooms().await { Ok(rooms) => rooms, @@ -225,6 +243,64 @@ async fn prune_pending_tokens() { println!("Pruned pending tokens."); } +async fn prune_files() { + let rooms = match get_all_rooms().await { + Ok(rooms) => rooms, + Err(_) => return + }; + for room in rooms { + // It's not catastrophic if we fail to prune the database for a given room + let pool = pool_by_room_name(&room); + let now = chrono::Utc::now().timestamp(); + let expiration = now - FILE_EXPIRATION; + // Get a database connection and open a transaction + let mut conn = match pool.get() { + Ok(conn) => conn, + Err(e) => return println!("Couldn't prune files due to error: {}.", e) + }; + let tx = match conn.transaction() { + Ok(tx) => tx, + Err(e) => return println!("Couldn't prune files due to error: {}.", e) + }; + // Get the IDs of the files to delete + let ids: Vec = { + let raw_query = format!("SELECT id FROM {} WHERE timestamp < (?1)", FILES_TABLE); + let mut query = match tx.prepare(&raw_query) { + Ok(query) => query, + Err(e) => return println!("Couldn't prune files due to error: {}.", e) + }; + let rows = match query.query_map(params![ expiration ], |row| { + Ok(row.get(0)?) + }) { + Ok(rows) => rows, + Err(e) => { + return println!("Couldn't prune files due to error: {}.", e); + } + }; + rows.filter_map(|result| result.ok()).collect() + }; + // Delete the files + let mut deleted_ids: Vec = vec![]; + for id in ids { + match fs::remove_file(format!("files/{}", id)) { + Ok(_) => deleted_ids.push(id), + Err(e) => println!("Couldn't delete file due to error: {}.", e) + } + } + // Remove the file records from the database (only for the files that were actually deleted) + let stmt = format!("DELETE FROM {} WHERE id IN (?1)", FILES_TABLE); + match tx.execute(&stmt, params![ deleted_ids ]) { + Ok(_) => (), + Err(e) => return println!("Couldn't prune files due to error: {}.", e) + }; + match tx.commit() { + Ok(_) => (), + Err(e) => return println!("Couldn't prune files due to error: {}.", e) + }; + } + println!("Pruned files."); +} + async fn get_all_rooms() -> Result, Error> { // Get a database connection let conn = MAIN_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;