Merge pull request #5 from nielsandriesse/files

File Storage & Retrieval
This commit is contained in:
Niels Andriesse 2021-03-19 10:10:06 +11:00 committed by GitHub
commit 32c6c24eac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 201 additions and 2 deletions

10
Cargo.lock generated
View File

@ -1253,6 +1253,7 @@ dependencies = [
"sha2",
"tokio",
"tokio-test",
"uuid",
"warp",
"x25519-dalek",
]
@ -1632,6 +1633,15 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom 0.2.2",
]
[[package]]
name = "vcpkg"
version = "0.2.11"

View File

@ -23,6 +23,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.9"
tokio = { version = "1.3", features = ["full"] }
uuid = { version = "0.8", features = ["v4"] }
warp = { version = "0.3", features = ["tls"] }
x25519-dalek = "1.1"

View File

@ -1,7 +1,11 @@
use std::fs;
use std::io::prelude::*;
use chrono;
use rusqlite::params;
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use warp::{Rejection, http::StatusCode, reply::Reply, reply::Response};
use super::crypto;
@ -15,6 +19,84 @@ enum AuthorizationLevel {
Moderator
}
pub async fn store_file(base64_encoded_bytes: &str, pool: &storage:: DatabaseConnectionPool) -> Result<Response, Rejection> {
// Parse bytes
let bytes = match base64::decode(base64_encoded_bytes) {
Ok(bytes) => bytes,
Err(e) => {
println!("Couldn't parse bytes from invalid base64 encoding due to error: {}.", e);
return Err(warp::reject::custom(Error::ValidationFailed));
}
};
// Generate UUID
let id = Uuid::new_v4();
let mut buffer = Uuid::encode_buffer();
let id = id.to_simple().encode_lower(&mut buffer);
// Update the database
// We do this * before * storing the actual file, so that in case something goes
// wrong we're not left with files that'll never be pruned.
let now = chrono::Utc::now().timestamp();
let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?;
let stmt = format!("INSERT INTO {} (id, timestamp) VALUES (?1, ?2)", storage::FILES_TABLE);
let _ = match tx.execute(&stmt, params![ &id, now ]) {
Ok(rows) => rows,
Err(e) => {
println!("Couldn't insert file record due to error: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
tx.commit().map_err(|_| Error::DatabaseFailedInternally)?;
// Write to file
let mut pos = 0;
let mut buffer = match fs::File::create(&id) {
Ok(buffer) => buffer,
Err(e) => {
println!("Couldn't store file due to error: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
while pos < bytes.len() {
let count = match buffer.write(&bytes[pos..]) {
Ok(count) => count,
Err(e) => {
println!("Couldn't store file due to error: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
pos += count;
}
// Return
return Ok(warp::reply::json(&id).into_response());
}
pub async fn get_file(id: &str, pool: &storage:: DatabaseConnectionPool) -> Result<Response, Rejection> {
// Check that the ID is a valid UUID
match Uuid::parse_str(id) {
Ok(_) => (),
Err(e) => {
println!("Couldn't parse UUID from: {} due to error: {}.", id, e);
return Err(warp::reject::custom(Error::ValidationFailed));
}
};
// Get a database connection
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
// Try to read the file
let bytes = match fs::read(format!("files/{}", id)) {
Ok(bytes) => bytes,
Err(e) => {
println!("Couldn't read file due to error: {}.", e);
return Err(warp::reject::custom(Error::ValidationFailed));
}
};
// Base64 encode the result
let base64_encoded_bytes = base64::encode(bytes);
// Return
return Ok(warp::reply::json(&base64_encoded_bytes).into_response());
}
// Authentication
pub async fn get_auth_token_challenge(hex_public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Validate the public key
if !is_valid_public_key(hex_public_key) {
@ -137,6 +219,8 @@ pub async fn delete_auth_token(auth_token: Option<String>, pool: &storage::Datab
return Ok(StatusCode::OK.into_response());
}
// Message sending & receiving
/// Inserts the given `message` into the database if it's valid.
pub async fn insert_message(mut message: models::Message, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Validate the message
@ -196,6 +280,8 @@ pub async fn get_messages(options: rpc::QueryOptions, pool: &storage::DatabaseCo
return Ok(warp::reply::json(&messages).into_response());
}
// Message deletion
/// Deletes the message with the given `row_id` from the database, if it's present.
pub async fn delete_message(row_id: i64, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Check authorization level
@ -278,6 +364,8 @@ pub async fn get_deleted_messages(options: rpc::QueryOptions, pool: &storage::Da
return Ok(warp::reply::json(&ids).into_response());
}
// Moderation
/// Returns the full list of moderators.
pub async fn get_moderators(pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
let public_keys = get_moderators_vector(pool).await?;
@ -350,6 +438,8 @@ pub async fn get_banned_public_keys(pool: &storage::DatabaseConnectionPool) -> R
return Ok(warp::reply::json(&public_keys).into_response());
}
// General
pub async fn get_member_count(pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Get a database connection
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;

View File

@ -17,19 +17,28 @@ mod tests;
#[tokio::main]
async fn main() {
    // Print the server public key
    let public_key = hex::encode(crypto::PUBLIC_KEY.as_bytes());
    println!("The public key of this server is: {}", public_key);
    // Create the main database
    storage::create_main_database_if_needed();
    // Create required folders
    fs::create_dir_all("./rooms").unwrap();
    fs::create_dir_all("./files").unwrap();
    // Create the main room
    let main_room = "main";
    storage::create_database_if_needed(main_room);
    // Set up pruning jobs
    let prune_pending_tokens_future = storage::prune_pending_tokens_periodically();
    let prune_tokens_future = storage::prune_tokens_periodically();
    let prune_files_future = storage::prune_files_periodically();
    // Serve routes
    let routes = routes::root().or(routes::lsrpc());
    let serve_routes_future = warp::serve(routes)
        .tls()
        .cert_path("tls_certificate.pem")
        .key_path("tls_private_key.pem")
        .run(([0, 0, 0, 0], 443));
    // Keep futures alive. `prune_files_future` must be awaited here too —
    // futures are lazy, so leaving it out of `join!` (the previous behavior)
    // meant the file pruning job was created but never ran.
    join!(prune_pending_tokens_future, prune_tokens_future, prune_files_future, serve_routes_future);
}

View File

@ -63,6 +63,15 @@ async fn handle_get_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::D
};
}
// Switch on the path
if uri.path().starts_with("/files") {
let components: Vec<&str> = uri.path()[1..].split("/").collect(); // Drop the leading slash and split on subsequent slashes
if components.len() != 2 {
println!("Invalid endpoint: {}.", rpc_call.endpoint);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
let file_id = components[1];
return handlers::get_file(file_id, pool).await;
}
match uri.path() {
"/messages" => return handlers::get_messages(query_options, pool).await,
"/deleted_messages" => return handlers::get_deleted_messages(query_options, pool).await,
@ -115,7 +124,11 @@ async fn handle_post_request(rpc_call: RpcCall, uri: http::Uri, auth_token: Opti
"/claim_auth_token" => {
let public_key = rpc_call.body;
return handlers::claim_auth_token(&public_key, auth_token, pool).await;
}
},
"/files" => {
let base64_encoded_bytes = rpc_call.body;
return handlers::store_file(&base64_encoded_bytes, pool).await;
},
_ => {
println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint);
return Err(warp::reject::custom(Error::InvalidRpcCall));
@ -151,7 +164,7 @@ async fn handle_delete_request(rpc_call: RpcCall, uri: http::Uri, auth_token: Op
return handlers::unban(&public_key, auth_token, pool).await;
}
// DELETE /auth_token
if uri.path().starts_with("/auth_token") {
if uri.path() == "/auth_token" {
return handlers::delete_auth_token(auth_token, pool).await;
}
// Unrecognized endpoint

View File

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::fs;
use std::sync::Mutex;
use rusqlite::params;
@ -41,6 +42,7 @@ fn create_main_tables_if_needed(conn: &DatabaseConnection) {
pub const PENDING_TOKEN_EXPIRATION: i64 = 10 * 60;
pub const TOKEN_EXPIRATION: i64 = 7 * 24 * 60 * 60;
pub const FILE_EXPIRATION: i64 = 60 * 24 * 60 * 60;
pub const MESSAGES_TABLE: &str = "messages";
pub const DELETED_MESSAGES_TABLE: &str = "deleted_messages";
@ -48,6 +50,7 @@ pub const MODERATORS_TABLE: &str = "moderators";
pub const BLOCK_LIST_TABLE: &str = "block_list";
pub const PENDING_TOKENS_TABLE: &str = "pending_tokens";
pub const TOKENS_TABLE: &str = "tokens";
pub const FILES_TABLE: &str = "files";
lazy_static::lazy_static! {
@ -143,6 +146,13 @@ fn create_room_tables_if_needed(conn: &DatabaseConnection) {
token TEXT
)", TOKENS_TABLE);
conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table.");
// Files
let files_table_cmd = format!(
"CREATE TABLE IF NOT EXISTS {} (
id STRING PRIMARY KEY,
timestamp INTEGER
)", FILES_TABLE);
conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table.");
}
// Pruning
@ -163,6 +173,14 @@ pub async fn prune_pending_tokens_periodically() {
}
}
/// Runs the file pruning job once per day, forever.
pub async fn prune_files_periodically() {
    // Spawn each run as its own task so a slow prune can't delay the timer.
    let period = chrono::Duration::days(1).to_std().unwrap();
    let mut timer = tokio::time::interval(period);
    loop {
        timer.tick().await;
        tokio::spawn(async { prune_files().await; });
    }
}
async fn prune_tokens() {
let rooms = match get_all_rooms().await {
Ok(rooms) => rooms,
@ -225,6 +243,64 @@ async fn prune_pending_tokens() {
println!("Pruned pending tokens.");
}
/// Deletes files older than `FILE_EXPIRATION` from the `files` directory and
/// removes their records from each room's `files` table. Errors abort the whole
/// job with a log line (it'll run again on the next timer tick).
async fn prune_files() {
    let rooms = match get_all_rooms().await {
        Ok(rooms) => rooms,
        Err(_) => return
    };
    for room in rooms {
        // It's not catastrophic if we fail to prune the database for a given room
        let pool = pool_by_room_name(&room);
        let now = chrono::Utc::now().timestamp();
        let expiration = now - FILE_EXPIRATION;
        // Get a database connection and open a transaction
        let mut conn = match pool.get() {
            Ok(conn) => conn,
            Err(e) => return println!("Couldn't prune files due to error: {}.", e)
        };
        let tx = match conn.transaction() {
            Ok(tx) => tx,
            Err(e) => return println!("Couldn't prune files due to error: {}.", e)
        };
        // Get the IDs of the files to delete
        let ids: Vec<String> = {
            let raw_query = format!("SELECT id FROM {} WHERE timestamp < (?1)", FILES_TABLE);
            let mut query = match tx.prepare(&raw_query) {
                Ok(query) => query,
                Err(e) => return println!("Couldn't prune files due to error: {}.", e)
            };
            let rows = match query.query_map(params![ expiration ], |row| {
                Ok(row.get(0)?)
            }) {
                Ok(rows) => rows,
                Err(e) => {
                    return println!("Couldn't prune files due to error: {}.", e);
                }
            };
            rows.filter_map(|result| result.ok()).collect()
        };
        // Delete the files
        let mut deleted_ids: Vec<String> = vec![];
        for id in ids {
            match fs::remove_file(format!("files/{}", id)) {
                Ok(_) => deleted_ids.push(id),
                Err(e) => println!("Couldn't delete file due to error: {}.", e)
            }
        }
        // Remove the file records from the database (only for the files that were
        // actually deleted). Each placeholder binds exactly one value, so
        // `IN (?1)` with the whole vector (the previous behavior) can't delete
        // more than one record — delete them one at a time inside the transaction
        // instead; the commit below still applies them atomically.
        let stmt = format!("DELETE FROM {} WHERE id = (?1)", FILES_TABLE);
        for deleted_id in &deleted_ids {
            match tx.execute(&stmt, params![ deleted_id ]) {
                Ok(_) => (),
                Err(e) => return println!("Couldn't prune files due to error: {}.", e)
            };
        }
        match tx.commit() {
            Ok(_) => (),
            Err(e) => return println!("Couldn't prune files due to error: {}.", e)
        };
    }
    println!("Pruned files.");
}
async fn get_all_rooms() -> Result<Vec<String>, Error> {
// Get a database connection
let conn = MAIN_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?;