From edca4b9e951be904ab786f6debd14b5400841635 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Thu, 25 Mar 2021 10:56:16 +1100 Subject: [PATCH] Add .rustfmt.toml --- .rustfmt.toml | 15 ++ src/crypto.rs | 29 ++-- src/errors.rs | 14 +- src/handlers.rs | 391 +++++++++++++++++++++++++++--------------- src/main.rs | 20 ++- src/models.rs | 5 +- src/onion_requests.rs | 50 +++--- src/routes.rs | 12 +- src/rpc.rs | 69 +++++--- src/storage.rs | 89 ++++++---- src/tests.rs | 14 +- 11 files changed, 457 insertions(+), 251 deletions(-) create mode 100644 .rustfmt.toml diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..11d86d5 --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,15 @@ +edition = "2018" +unstable_features = true + +blank_lines_upper_bound = 3 +brace_style = "PreferSameLine" +combine_control_expr = true +fn_args_layout = "Compressed" +fn_single_line = true +imports_indent = "Visual" +overflow_delimited_expr = true +group_imports = "StdExternalCrate" +trailing_comma = "Never" +use_field_init_shorthand = true +use_small_heuristics = "Max" +where_single_line = true diff --git a/src/crypto.rs b/src/crypto.rs index dfb16c9..b63c92e 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; +use aes_gcm::aead::{generic_array::GenericArray, Aead, NewAead}; use aes_gcm::Aes256Gcm; -use aes_gcm::aead::{Aead, NewAead, generic_array::GenericArray}; use hmac::{Hmac, Mac, NewMac}; use rand::{thread_rng, Rng}; use rand_core::OsRng; @@ -11,7 +11,7 @@ use super::errors::Error; type HmacSha256 = Hmac; -// By default the aes-gcm crate will use software implementations of both AES and the POLYVAL universal hash function. When +// By default the aes-gcm crate will use software implementations of both AES and the POLYVAL universal hash function. When // targeting modern x86/x86_64 CPUs, use the following RUSTFLAGS to take advantage of high performance AES-NI and CLMUL CPU // intrinsics: // @@ -32,10 +32,15 @@ lazy_static::lazy_static! { }; } -pub async fn get_x25519_symmetric_key(public_key: &[u8], private_key: &x25519_dalek::StaticSecret) -> Result, warp::reject::Rejection> { +pub async fn get_x25519_symmetric_key( + public_key: &[u8], private_key: &x25519_dalek::StaticSecret +) -> Result, warp::reject::Rejection> { if public_key.len() != 32 { - println!("Couldn't create symmetric key using public key of invalid length: {}.", hex::encode(public_key)); - return Err(warp::reject::custom(Error::DecryptionFailed)); + println!( + "Couldn't create symmetric key using public key of invalid length: {}.", + hex::encode(public_key) + ); + return Err(warp::reject::custom(Error::DecryptionFailed)); } let public_key: [u8; 32] = public_key.try_into().unwrap(); // Safe because we know it has a length of 32 at this point let dalek_public_key = x25519_dalek::PublicKey::from(public_key); @@ -45,7 +50,9 @@ pub async fn get_x25519_symmetric_key(public_key: &[u8], private_key: &x25519_da return Ok(mac.finalize().into_bytes().to_vec()); } -pub async fn encrypt_aes_gcm(plaintext: &[u8], symmetric_key: &[u8]) -> Result, warp::reject::Rejection> { +pub async fn encrypt_aes_gcm( + plaintext: &[u8], symmetric_key: &[u8] +) -> Result, warp::reject::Rejection> { let mut iv = [0u8; IV_SIZE]; thread_rng().fill(&mut iv[..]); let cipher = Aes256Gcm::new(&GenericArray::from_slice(symmetric_key)); @@ -54,7 +61,7 @@ pub async fn encrypt_aes_gcm(plaintext: &[u8], symmetric_key: &[u8]) -> Result { println!("Couldn't encrypt ciphertext due to error: {}.", e); return Err(warp::reject::custom(Error::DecryptionFailed)); @@ -62,10 +69,12 @@ pub async fn encrypt_aes_gcm(plaintext: &[u8], symmetric_key: &[u8]) -> Result Result, warp::reject::Rejection> { - if iv_and_ciphertext.len() < IV_SIZE { +pub async fn decrypt_aes_gcm( + iv_and_ciphertext: &[u8], symmetric_key: &[u8] +) -> Result, warp::reject::Rejection> { + if iv_and_ciphertext.len() < IV_SIZE { println!("Ignoring ciphertext of invalid size: {}.", iv_and_ciphertext.len()); - return Err(warp::reject::custom(Error::DecryptionFailed)); + return Err(warp::reject::custom(Error::DecryptionFailed)); } let iv: [u8; IV_SIZE] = iv_and_ciphertext[0..IV_SIZE].try_into().unwrap(); // Safe because we know iv_and_ciphertext has a length of at least IV_SIZE bytes let ciphertext: Vec = iv_and_ciphertext[IV_SIZE..].try_into().unwrap(); // Safe because we know iv_and_ciphertext has a length of at least IV_SIZE bytes diff --git a/src/errors.rs b/src/errors.rs index b4ee622..a1929ee 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,4 +1,4 @@ -use warp::{http::StatusCode, Rejection, reply::Reply, reply::Response}; +use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection}; #[derive(Debug)] pub enum Error { @@ -10,15 +10,19 @@ pub enum Error { Unauthorized, ValidationFailed } -impl warp::reject::Reject for Error { } +impl warp::reject::Reject for Error {} pub fn into_response(e: Rejection) -> Result { if let Some(error) = e.find::() { match error { - Error::DecryptionFailed | Error::InvalidOnionRequest | Error::InvalidRpcCall - | Error::ValidationFailed => return Ok(StatusCode::BAD_REQUEST.into_response()), + Error::DecryptionFailed + | Error::InvalidOnionRequest + | Error::InvalidRpcCall + | Error::ValidationFailed => return Ok(StatusCode::BAD_REQUEST.into_response()), Error::Unauthorized => return Ok(StatusCode::FORBIDDEN.into_response()), - Error::DatabaseFailedInternally => return Ok(StatusCode::INTERNAL_SERVER_ERROR.into_response()) + Error::DatabaseFailedInternally => { + return Ok(StatusCode::INTERNAL_SERVER_ERROR.into_response()) + } }; } else { return Ok(StatusCode::INTERNAL_SERVER_ERROR.into_response()); diff --git a/src/handlers.rs b/src/handlers.rs index f7fe896..6a9f7ce 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,15 +1,15 @@ +use std::collections::HashMap; use std::convert::TryInto; use std::fs; -use std::collections::HashMap; use std::io::prelude::*; use std::path::Path; use chrono; -use serde::{Deserialize, Serialize}; -use rusqlite::params; use rand::{thread_rng, Rng}; +use rusqlite::params; +use serde::{Deserialize, Serialize}; use uuid::Uuid; -use warp::{Rejection, http::StatusCode, reply::Reply, reply::Response}; +use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection}; use super::crypto; use super::errors::Error; @@ -17,12 +17,12 @@ use super::models; use super::storage; enum AuthorizationLevel { - Basic, + Basic, Moderator } #[derive(Debug, Deserialize, Serialize)] -pub struct GenericStringResponse { +pub struct GenericStringResponse { pub status_code: u16, pub result: String } @@ -36,7 +36,7 @@ pub async fn create_room(id: &str, name: &str) -> Result { let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the room let stmt = format!("REPLACE INTO {} (id, name) VALUES (?1, ?2)", storage::MAIN_TABLE); - match conn.execute(&stmt, params![ id, name ]) { + match conn.execute(&stmt, params![id, name]) { Ok(_) => (), Err(e) => { println!("Couldn't create room due to error: {}.", e); @@ -46,16 +46,21 @@ pub async fn create_room(id: &str, name: &str) -> Result { // Set up the database storage::create_database_if_needed(id); // Return - let json = models::StatusCode { status_code : StatusCode::OK.as_u16() }; + let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; return Ok(warp::reply::json(&json).into_response()); } // Files -pub async fn store_file(base64_encoded_bytes: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn store_file( + base64_encoded_bytes: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Parse bytes let bytes = match base64::decode(base64_encoded_bytes) { Ok(bytes) => bytes, @@ -74,7 +79,7 @@ pub async fn store_file(base64_encoded_bytes: &str, auth_token: &str, pool: &sto let now = chrono::Utc::now().timestamp(); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let stmt = format!("INSERT INTO {} (id, timestamp) VALUES (?1, ?2)", storage::FILES_TABLE); - let _ = match conn.execute(&stmt, params![ id, now ]) { + let _ = match conn.execute(&stmt, params![id, now]) { Ok(rows) => rows, Err(e) => { println!("Couldn't insert file record due to error: {}.", e); @@ -103,14 +108,20 @@ pub async fn store_file(base64_encoded_bytes: &str, auth_token: &str, pool: &sto pos += count; } // Return - let json = GenericStringResponse { status_code : StatusCode::OK.as_u16(), result : id }; + let json = GenericStringResponse { status_code: StatusCode::OK.as_u16(), result: id }; return Ok(warp::reply::json(&json).into_response()); } -pub async fn get_file(id: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { // Doesn't return a response directly for testing purposes +pub async fn get_file( + id: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { + // Doesn't return a response directly for testing purposes // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Check that the ID is a valid UUID match Uuid::parse_str(id) { Ok(_) => (), @@ -132,26 +143,34 @@ pub async fn get_file(id: &str, auth_token: &str, pool: &storage::DatabaseConnec // Base64 encode the result let base64_encoded_bytes = base64::encode(bytes); // Return - let json = GenericStringResponse { status_code : StatusCode::OK.as_u16(), result : base64_encoded_bytes }; + let json = GenericStringResponse { + status_code: StatusCode::OK.as_u16(), + result: base64_encoded_bytes + }; return Ok(json); } // Authentication -pub async fn get_auth_token_challenge(query_params: HashMap, pool: &storage::DatabaseConnectionPool) -> Result { // Doesn't return a response directly for testing purposes +pub async fn get_auth_token_challenge( + query_params: HashMap, pool: &storage::DatabaseConnectionPool +) -> Result { + // Doesn't return a response directly for testing purposes // Get the public key - let hex_public_key = query_params.get("public_key").ok_or(warp::reject::custom(Error::InvalidRpcCall))?; + let hex_public_key = + query_params.get("public_key").ok_or(warp::reject::custom(Error::InvalidRpcCall))?; // Validate the public key - if !is_valid_public_key(hex_public_key) { + if !is_valid_public_key(hex_public_key) { println!("Ignoring challenge request for invalid public key: {}.", hex_public_key); - return Err(warp::reject::custom(Error::ValidationFailed)); + return Err(warp::reject::custom(Error::ValidationFailed)); } // Convert the public key to bytes and cut off the version byte let public_key: [u8; 32] = hex::decode(hex_public_key).unwrap()[1..].try_into().unwrap(); // Safe because we know it has a length of 32 at this point - // Generate an ephemeral key pair + // Generate an ephemeral key pair let (ephemeral_private_key, ephemeral_public_key) = crypto::generate_x25519_key_pair().await; // Generate a symmetric key from the requesting user's public key and the ephemeral private key - let symmetric_key = crypto::get_x25519_symmetric_key(&public_key, &ephemeral_private_key).await?; + let symmetric_key = + crypto::get_x25519_symmetric_key(&public_key, &ephemeral_private_key).await?; // Generate a random token let mut token = [0u8; 48]; thread_rng().fill(&mut token[..]); @@ -159,8 +178,11 @@ pub async fn get_auth_token_challenge(query_params: HashMap, poo // Note that a given public key can have multiple pending tokens let now = chrono::Utc::now().timestamp(); let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let stmt = format!("INSERT INTO {} (public_key, timestamp, token) VALUES (?1, ?2, ?3)", storage::PENDING_TOKENS_TABLE); - let _ = match conn.execute(&stmt, params![ hex_public_key, now, token.to_vec() ]) { + let stmt = format!( + "INSERT INTO {} (public_key, timestamp, token) VALUES (?1, ?2, ?3)", + storage::PENDING_TOKENS_TABLE + ); + let _ = match conn.execute(&stmt, params![hex_public_key, now, token.to_vec()]) { Ok(rows) => rows, Err(e) => { println!("Couldn't insert pending token due to error: {}.", e); @@ -170,30 +192,38 @@ pub async fn get_auth_token_challenge(query_params: HashMap, poo // Encrypt the token with the symmetric key let ciphertext = crypto::encrypt_aes_gcm(&token, &symmetric_key).await?; // Return - return Ok(models::Challenge { ciphertext : base64::encode(ciphertext), ephemeral_public_key : base64::encode(ephemeral_public_key.to_bytes()) }); + return Ok(models::Challenge { + ciphertext: base64::encode(ciphertext), + ephemeral_public_key: base64::encode(ephemeral_public_key.to_bytes()) + }); } -pub async fn claim_auth_token(public_key: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn claim_auth_token( + public_key: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Validate the public key - if !is_valid_public_key(&public_key) { + if !is_valid_public_key(&public_key) { println!("Ignoring claim token request for invalid public key."); - return Err(warp::reject::custom(Error::ValidationFailed)); + return Err(warp::reject::custom(Error::ValidationFailed)); } // Validate the token - if hex::decode(auth_token).is_err() { + if hex::decode(auth_token).is_err() { println!("Ignoring claim token request for invalid token."); - return Err(warp::reject::custom(Error::ValidationFailed)); + return Err(warp::reject::custom(Error::ValidationFailed)); } // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Get the pending tokens for the given public key - let raw_query = format!("SELECT timestamp, token FROM {} WHERE public_key = (?1) AND timestamp > (?2)", storage::PENDING_TOKENS_TABLE); + let raw_query = format!( + "SELECT timestamp, token FROM {} WHERE public_key = (?1) AND timestamp > (?2)", + storage::PENDING_TOKENS_TABLE + ); let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; let now = chrono::Utc::now().timestamp(); let expiration = now - storage::PENDING_TOKEN_EXPIRATION; - let rows = match query.query_map(params![ public_key, expiration ], |row| { - Ok((row.get(0)?, row.get(1)?)) - }) { + let rows = match query + .query_map(params![public_key, expiration], |row| Ok((row.get(0)?, row.get(1)?))) + { Ok(rows) => rows, Err(e) => { println!("Couldn't get pending tokens due to error: {}.", e); @@ -203,11 +233,17 @@ pub async fn claim_auth_token(public_key: &str, auth_token: &str, pool: &storage let pending_tokens: Vec<(i64, Vec)> = rows.filter_map(|result| result.ok()).collect(); // Check that the token being claimed is in fact one of the pending tokens let claim = hex::decode(auth_token).unwrap(); // Safe because we validated it above - let index = pending_tokens.iter().position(|(_, pending_token)| *pending_token == claim).ok_or_else(|| Error::Unauthorized)?; + let index = pending_tokens + .iter() + .position(|(_, pending_token)| *pending_token == claim) + .ok_or_else(|| Error::Unauthorized)?; let token = &pending_tokens[index].1; // Store the claimed token - let stmt = format!("INSERT OR REPLACE INTO {} (public_key, token) VALUES (?1, ?2)", storage::TOKENS_TABLE); - match conn.execute(&stmt, params![ public_key, hex::encode(token) ]) { + let stmt = format!( + "INSERT OR REPLACE INTO {} (public_key, token) VALUES (?1, ?2)", + storage::TOKENS_TABLE + ); + match conn.execute(&stmt, params![public_key, hex::encode(token)]) { Ok(_) => (), Err(e) => { println!("Couldn't insert token due to error: {}.", e); @@ -216,24 +252,29 @@ pub async fn claim_auth_token(public_key: &str, auth_token: &str, pool: &storage } // Delete all pending tokens for the given public key let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::PENDING_TOKENS_TABLE); - match conn.execute(&stmt, params![ public_key ]) { + match conn.execute(&stmt, params![public_key]) { Ok(_) => (), Err(e) => println!("Couldn't delete pending tokens due to error: {}.", e) // It's not catastrophic if this fails }; // Return - let json = models::StatusCode { status_code : StatusCode::OK.as_u16() }; + let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; return Ok(warp::reply::json(&json).into_response()); } -pub async fn delete_auth_token(auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn delete_auth_token( + auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, requesting_public_key) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, requesting_public_key) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Delete the token let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::TOKENS_TABLE); - match conn.execute(&stmt, params![ requesting_public_key ]) { + match conn.execute(&stmt, params![requesting_public_key]) { Ok(_) => (), Err(e) => { println!("Couldn't delete token due to error: {}.", e); @@ -241,28 +282,36 @@ pub async fn delete_auth_token(auth_token: &str, pool: &storage::DatabaseConnect } }; // Return - let json = models::StatusCode { status_code : StatusCode::OK.as_u16() }; + let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; return Ok(warp::reply::json(&json).into_response()); } // Message sending & receiving /// Inserts the given `message` into the database if it's valid. -pub async fn insert_message(mut message: models::Message, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn insert_message( + mut message: models::Message, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Validate the message - if !message.is_valid() { + if !message.is_valid() { println!("Ignoring invalid message."); - return Err(warp::reject::custom(Error::ValidationFailed)); + return Err(warp::reject::custom(Error::ValidationFailed)); } // Check authorization level - let (has_authorization_level, requesting_public_key) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, requesting_public_key) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Get a connection and open a transaction let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the message - let stmt = format!("INSERT INTO {} (public_key, data, signature) VALUES (?1, ?2, ?3)", storage::MESSAGES_TABLE); - match tx.execute(&stmt, params![ &requesting_public_key, message.data, message.signature ]) { + let stmt = format!( + "INSERT INTO {} (public_key, data, signature) VALUES (?1, ?2, ?3)", + storage::MESSAGES_TABLE + ); + match tx.execute(&stmt, params![&requesting_public_key, message.data, message.signature]) { Ok(_) => (), Err(e) => { println!("Couldn't insert message due to error: {}.", e); @@ -280,15 +329,20 @@ pub async fn insert_message(mut message: models::Message, auth_token: &str, pool status_code: u16, message: models::Message } - let response = Response { status_code : StatusCode::OK.as_u16(), message : message }; + let response = Response { status_code: StatusCode::OK.as_u16(), message }; return Ok(warp::reply::json(&response).into_response()); } /// Returns either the last `limit` messages or all messages since `from_server_id, limited to `limit`. -pub async fn get_messages(query_params: HashMap, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn get_messages( + query_params: HashMap, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Unwrap query parameters @@ -309,11 +363,19 @@ pub async fn get_messages(query_params: HashMap, auth_token: &st if query_params.get("from_server_id").is_some() { raw_query = format!("SELECT id, public_key, data, signature FROM {} WHERE rowid > (?1) ORDER BY rowid ASC LIMIT (?2)", storage::MESSAGES_TABLE); } else { - raw_query = format!("SELECT id, public_key, data, signature FROM {} ORDER BY rowid DESC LIMIT (?2)", storage::MESSAGES_TABLE); + raw_query = format!( + "SELECT id, public_key, data, signature FROM {} ORDER BY rowid DESC LIMIT (?2)", + storage::MESSAGES_TABLE + ); } let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![ from_server_id, limit ], |row| { - Ok(models::Message { server_id : row.get(0)?, public_key : row.get(1)?, data : row.get(2)?, signature : row.get(3)? }) + let rows = match query.query_map(params![from_server_id, limit], |row| { + Ok(models::Message { + server_id: row.get(0)?, + public_key: row.get(1)?, + data: row.get(2)?, + signature: row.get(3)? + }) }) { Ok(rows) => rows, Err(e) => { @@ -328,25 +390,29 @@ pub async fn get_messages(query_params: HashMap, auth_token: &st status_code: u16, messages: Vec } - let response = Response { status_code : StatusCode::OK.as_u16(), messages : messages }; + let response = Response { status_code: StatusCode::OK.as_u16(), messages }; return Ok(warp::reply::json(&response).into_response()); } // Message deletion /// Deletes the message with the given `row_id` from the database, if it's present. -pub async fn delete_message(row_id: i64, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn delete_message( + row_id: i64, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, requesting_public_key) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, requesting_public_key) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Check that the requesting user is either the sender of the message or a moderator let sender_option: Option = { let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; - let raw_query = format!("SELECT public_key FROM {} WHERE rowid = (?1)", storage::MESSAGES_TABLE); + let raw_query = + format!("SELECT public_key FROM {} WHERE rowid = (?1)", storage::MESSAGES_TABLE); let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![ row_id ], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![row_id], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { println!("Couldn't delete message due to error: {}.", e); @@ -357,13 +423,15 @@ pub async fn delete_message(row_id: i64, auth_token: &str, pool: &storage::Datab public_keys.get(0).map(|s| s.to_string()) }; let sender = sender_option.ok_or(warp::reject::custom(Error::DatabaseFailedInternally))?; - if !is_moderator(&requesting_public_key, pool).await? && requesting_public_key != sender { return Err(warp::reject::custom(Error::Unauthorized)); } + if !is_moderator(&requesting_public_key, pool).await? && requesting_public_key != sender { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Get a connection and open a transaction let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?; // Delete the message if it's present let stmt = format!("DELETE FROM {} WHERE rowid = (?1)", storage::MESSAGES_TABLE); - let count = match tx.execute(&stmt, params![ row_id ]) { + let count = match tx.execute(&stmt, params![row_id]) { Ok(count) => count, Err(e) => { println!("Couldn't delete message due to error: {}.", e); @@ -373,7 +441,7 @@ pub async fn delete_message(row_id: i64, auth_token: &str, pool: &storage::Datab // Update the deletions table if needed if count > 0 { let stmt = format!("INSERT INTO {} (id) VALUES (?1)", storage::DELETED_MESSAGES_TABLE); - match tx.execute(&stmt, params![ row_id ]) { + match tx.execute(&stmt, params![row_id]) { Ok(_) => (), Err(e) => { println!("Couldn't delete message due to error: {}.", e); @@ -384,15 +452,20 @@ pub async fn delete_message(row_id: i64, auth_token: &str, pool: &storage::Datab // Commit tx.commit().map_err(|_| Error::DatabaseFailedInternally)?; // Return - let json = models::StatusCode { status_code : StatusCode::OK.as_u16() }; + let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; return Ok(warp::reply::json(&json).into_response()); } /// Returns either the last `limit` deleted messages or all deleted messages since `from_server_id, limited to `limit`. -pub async fn get_deleted_messages(query_params: HashMap, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn get_deleted_messages( + query_params: HashMap, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Unwrap query parameters @@ -411,14 +484,18 @@ pub async fn get_deleted_messages(query_params: HashMap, auth_to // Query the database let raw_query: String; if query_params.get("from_server_id").is_some() { - raw_query = format!("SELECT id FROM {} WHERE rowid > (?1) ORDER BY rowid ASC LIMIT (?2)", storage::DELETED_MESSAGES_TABLE); + raw_query = format!( + "SELECT id FROM {} WHERE rowid > (?1) ORDER BY rowid ASC LIMIT (?2)", + storage::DELETED_MESSAGES_TABLE + ); } else { - raw_query = format!("SELECT id FROM {} ORDER BY rowid DESC LIMIT (?2)", storage::DELETED_MESSAGES_TABLE); + raw_query = format!( + "SELECT id FROM {} ORDER BY rowid DESC LIMIT (?2)", + storage::DELETED_MESSAGES_TABLE + ); } let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![ from_server_id, limit ], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![from_server_id, limit], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { println!("Couldn't query database due to error: {}.", e); @@ -428,49 +505,61 @@ pub async fn get_deleted_messages(query_params: HashMap, auth_to let ids: Vec = rows.filter_map(|result| result.ok()).collect(); // Return the IDs #[derive(Debug, Deserialize, Serialize)] - struct Response { + struct Response { status_code: u16, ids: Vec } - let response = Response { status_code : StatusCode::OK.as_u16(), ids : ids }; + let response = Response { status_code: StatusCode::OK.as_u16(), ids }; return Ok(warp::reply::json(&response).into_response()); } // Moderation /// Returns the full list of moderators. -pub async fn get_moderators(auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn get_moderators( + auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Return let public_keys = get_moderators_vector(pool).await?; #[derive(Debug, Deserialize, Serialize)] - struct Response { + struct Response { status_code: u16, moderators: Vec } - let response = Response { status_code : StatusCode::OK.as_u16(), moderators : public_keys }; + let response = Response { status_code: StatusCode::OK.as_u16(), moderators: public_keys }; return Ok(warp::reply::json(&response).into_response()); } /// Bans the given `public_key` if the requesting user is a moderator. -pub async fn ban(public_key: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn ban( + public_key: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Validate the public key - if !is_valid_public_key(&public_key) { + if !is_valid_public_key(&public_key) { println!("Ignoring ban request for invalid public key."); - return Err(warp::reject::custom(Error::ValidationFailed)); + return Err(warp::reject::custom(Error::ValidationFailed)); } // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Moderator, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Moderator, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Don't double ban public keys - if is_banned(&public_key, pool).await? { return Ok(StatusCode::OK.into_response()); } + if is_banned(&public_key, pool).await? { + return Ok(StatusCode::OK.into_response()); + } // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the message let stmt = format!("INSERT INTO {} (public_key) VALUES (?1)", storage::BLOCK_LIST_TABLE); - match conn.execute(&stmt, params![ public_key ]) { + match conn.execute(&stmt, params![public_key]) { Ok(_) => (), Err(e) => { println!("Couldn't ban public key due to error: {}.", e); @@ -478,27 +567,34 @@ pub async fn ban(public_key: &str, auth_token: &str, pool: &storage::DatabaseCon } }; // Return - let json = models::StatusCode { status_code : StatusCode::OK.as_u16() }; + let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; return Ok(warp::reply::json(&json).into_response()); } /// Unbans the given `public_key` if the requesting user is a moderator. -pub async fn unban(public_key: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn unban( + public_key: &str, auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Validate the public key - if !is_valid_public_key(&public_key) { + if !is_valid_public_key(&public_key) { println!("Ignoring unban request for invalid public key."); - return Err(warp::reject::custom(Error::ValidationFailed)); + return Err(warp::reject::custom(Error::ValidationFailed)); } // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Moderator, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Moderator, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Don't double unban public keys - if !is_banned(&public_key, pool).await? { return Ok(StatusCode::OK.into_response()); } + if !is_banned(&public_key, pool).await? { + return Ok(StatusCode::OK.into_response()); + } // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Insert the message let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::BLOCK_LIST_TABLE); - match conn.execute(&stmt, params![ public_key ]) { + match conn.execute(&stmt, params![public_key]) { Ok(_) => (), Err(e) => { println!("Couldn't unban public key due to error: {}.", e); @@ -506,40 +602,48 @@ pub async fn unban(public_key: &str, auth_token: &str, pool: &storage::DatabaseC } }; // Return - let json = models::StatusCode { status_code : StatusCode::OK.as_u16() }; + let json = models::StatusCode { status_code: StatusCode::OK.as_u16() }; return Ok(warp::reply::json(&json).into_response()); } /// Returns the full list of banned public keys. -pub async fn get_banned_public_keys(auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn get_banned_public_keys( + auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Return let public_keys = get_banned_public_keys_vector(pool).await?; #[derive(Debug, Deserialize, Serialize)] - struct Response { + struct Response { status_code: u16, banned_members: Vec } - let response = Response { status_code : StatusCode::OK.as_u16(), banned_members : public_keys }; + let response = Response { status_code: StatusCode::OK.as_u16(), banned_members: public_keys }; return Ok(warp::reply::json(&response).into_response()); } // General -pub async fn get_member_count(auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result { +pub async fn get_member_count( + auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result { // Check authorization level - let (has_authorization_level, _) = has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; - if !has_authorization_level { return Err(warp::reject::custom(Error::Unauthorized)); } + let (has_authorization_level, _) = + has_authorization_level(auth_token, AuthorizationLevel::Basic, pool).await?; + if !has_authorization_level { + return Err(warp::reject::custom(Error::Unauthorized)); + } // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database let raw_query = format!("SELECT public_key FROM {}", storage::TOKENS_TABLE); let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { println!("Couldn't query database due to error: {}.", e); @@ -550,25 +654,26 @@ pub async fn get_member_count(auth_token: &str, pool: &storage::DatabaseConnecti let public_key_count = public_keys.len(); // Return #[derive(Debug, Deserialize, Serialize)] - struct Response { + struct Response { status_code: u16, member_count: usize } - let response = Response { status_code : StatusCode::OK.as_u16(), member_count : public_key_count }; + let response = + Response { status_code: StatusCode::OK.as_u16(), member_count: public_key_count }; return Ok(warp::reply::json(&response).into_response()); } // Utilities -async fn get_moderators_vector(pool: &storage::DatabaseConnectionPool) -> Result, Rejection> { +async fn get_moderators_vector( + pool: &storage::DatabaseConnectionPool +) -> Result, Rejection> { // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database let raw_query = format!("SELECT public_key FROM {}", storage::MODERATORS_TABLE); let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { println!("Couldn't query database due to error: {}.", e); @@ -579,20 +684,22 @@ async fn get_moderators_vector(pool: &storage::DatabaseConnectionPool) -> Result return Ok(rows.filter_map(|result| result.ok()).collect()); } -async fn is_moderator(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result { +async fn is_moderator( + public_key: &str, pool: &storage::DatabaseConnectionPool +) -> Result { let public_keys = get_moderators_vector(&pool).await?; return Ok(public_keys.contains(&public_key.to_owned())); } -async fn get_banned_public_keys_vector(pool: &storage::DatabaseConnectionPool) -> Result, Rejection> { +async fn get_banned_public_keys_vector( + pool: &storage::DatabaseConnectionPool +) -> Result, Rejection> { // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database let raw_query = format!("SELECT public_key FROM {}", storage::BLOCK_LIST_TABLE); let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { println!("Couldn't query database due to error: {}.", e); @@ -603,29 +710,35 @@ async fn get_banned_public_keys_vector(pool: &storage::DatabaseConnectionPool) - return Ok(rows.filter_map(|result| result.ok()).collect()); } -async fn is_banned(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result { +async fn is_banned( + public_key: &str, pool: &storage::DatabaseConnectionPool +) -> Result { let public_keys = get_banned_public_keys_vector(&pool).await?; return Ok(public_keys.contains(&public_key.to_owned())); } fn is_valid_public_key(public_key: &str) -> bool { // Check that it's a valid hex encoding - if hex::decode(public_key).is_err() { return false; } + if hex::decode(public_key).is_err() { + return false; + } // Check that it's the right length - if public_key.len() != 66 { return false } // The version byte + 32 bytes of random data - // It appears to be a valid public key - return true + if public_key.len() != 66 { + return false; + } // The version byte + 32 bytes of random data + // It appears to be a valid public key + return true; } -async fn get_public_key_for_auth_token(auth_token: &str, pool: &storage::DatabaseConnectionPool) -> Result, Rejection> { +async fn get_public_key_for_auth_token( + auth_token: &str, pool: &storage::DatabaseConnectionPool +) -> Result, Rejection> { // Get a database connection let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?; // Query the database let raw_query = format!("SELECT public_key FROM {} WHERE token = (?1)", storage::TOKENS_TABLE); let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![ auth_token ], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![auth_token], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { println!("Couldn't query database due to error: {}.", e); @@ -637,17 +750,23 @@ async fn get_public_key_for_auth_token(auth_token: &str, pool: &storage::Databas return Ok(public_keys.get(0).map(|s| s.to_string())); } -async fn has_authorization_level(auth_token: &str, level: AuthorizationLevel, pool: &storage::DatabaseConnectionPool) -> Result<(bool, String), Rejection> { +async fn has_authorization_level( + auth_token: &str, level: AuthorizationLevel, pool: &storage::DatabaseConnectionPool +) -> Result<(bool, String), Rejection> { // Check that we have a public key associated with the given auth token let public_key_option = get_public_key_for_auth_token(auth_token, pool).await?; let public_key = public_key_option.ok_or(warp::reject::custom(Error::Unauthorized))?; // Check that the given public key isn't banned - if is_banned(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); } + if is_banned(&public_key, pool).await? { + return Err(warp::reject::custom(Error::Unauthorized)); + } // If needed, check that the given public key is a moderator match level { AuthorizationLevel::Basic => return Ok((true, public_key)), AuthorizationLevel::Moderator => { - if !is_moderator(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); } + if !is_moderator(&public_key, pool).await? { + return Err(warp::reject::custom(Error::Unauthorized)); + } return Ok((true, public_key)); } }; diff --git a/src/main.rs b/src/main.rs index 69f33bf..4cf1f2d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,8 @@ use std::fs; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use futures::join; use structopt::StructOpt; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio; use warp::Filter; @@ -39,7 +39,7 @@ struct Opt { /// Set IP to bind to. #[structopt(short = "H", long = "host", default_value = "0.0.0.0")] - host: Ipv4Addr, + host: Ipv4Addr } #[tokio::main] @@ -55,7 +55,7 @@ async fn main() { // Create required folders fs::create_dir_all("./rooms").unwrap(); fs::create_dir_all("./files").unwrap(); - // Create the main room + // Create the main room let main_room_id = "main"; let main_room_name = "Main"; handlers::create_room(&main_room_id, &main_room_name).await.unwrap(); @@ -69,7 +69,12 @@ async fn main() { println!("Running in plaintext mode on {}.", addr); let serve_routes_future = warp::serve(routes).run(addr); // Keep futures alive - join!(prune_pending_tokens_future, prune_tokens_future, prune_files_future, serve_routes_future); + join!( + prune_pending_tokens_future, + prune_tokens_future, + prune_files_future, + serve_routes_future + ); } else { println!("Running on {} with TLS.", addr); let serve_routes_future = warp::serve(routes) @@ -78,6 +83,11 @@ async fn main() { .key_path(opt.tls_priv_key_file) .run(addr); // Keep futures alive - join!(prune_pending_tokens_future, prune_tokens_future, prune_files_future, serve_routes_future); + join!( + prune_pending_tokens_future, + prune_tokens_future, + prune_files_future, + serve_routes_future + ); } } diff --git a/src/models.rs b/src/models.rs index e34cfec..30bbac7 100644 --- a/src/models.rs +++ b/src/models.rs @@ -9,10 +9,7 @@ pub struct Message { } impl Message { - - pub fn is_valid(&self) -> bool { - return !self.data.is_empty() && !self.signature.is_empty(); - } + pub fn is_valid(&self) -> bool { return !self.data.is_empty() && !self.signature.is_empty(); } } #[derive(Debug, Deserialize, Serialize)] diff --git a/src/onion_requests.rs b/src/onion_requests.rs index 1c22782..498eddf 100644 --- a/src/onion_requests.rs +++ b/src/onion_requests.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; use serde::{Deserialize, Serialize}; -use warp::{Rejection, reply::Reply, reply::Response, http::StatusCode}; +use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection}; use super::crypto; use super::errors::Error; @@ -34,7 +34,9 @@ pub async fn handle_onion_request(blob: warp::hyper::body::Bytes) -> Result Result { +async fn handle_decrypted_onion_request( + plaintext: &[u8], symmetric_key: &[u8] +) -> Result { let rpc_call = match serde_json::from_slice(plaintext) { Ok(rpc_call) => rpc_call, Err(e) => { @@ -43,19 +45,22 @@ async fn handle_decrypted_onion_request(plaintext: &[u8], symmetric_key: &[u8]) } }; // Perform the RPC call - let result = rpc::handle_rpc_call(rpc_call).await + let result = rpc::handle_rpc_call(rpc_call) + .await // Turn any error that occurred into an HTTP response .or_else(super::errors::into_response)?; // Safe because at this point any error should be caught and turned into an HTTP response (i.e. an OK result) - // Encrypt the HTTP response so that it's propagated back to the client that made - // the onion request + // Encrypt the HTTP response so that it's propagated back to the client that made + // the onion request return encrypt_response(result, symmetric_key).await; } -async fn parse_onion_request_payload(blob: warp::hyper::body::Bytes) -> Result { +async fn parse_onion_request_payload( + blob: warp::hyper::body::Bytes +) -> Result { // The encoding of an onion request looks like: | 4 bytes: size N of ciphertext | N bytes: ciphertext | json as utf8 | - if blob.len() < 4 { + if blob.len() < 4 { println!("Ignoring blob of invalid size."); - return Err(warp::reject::custom(Error::InvalidOnionRequest)); + return Err(warp::reject::custom(Error::InvalidOnionRequest)); } // Extract the different components // This is safe because we know blob has a length of at least 4 bytes @@ -79,18 +84,21 @@ async fn parse_onion_request_payload(blob: warp::hyper::body::Bytes) -> Result Result<(Vec, Vec), Rejection> { +async fn decrypt_onion_request_payload( + payload: OnionRequestPayload +) -> Result<(Vec, Vec), Rejection> { let ephemeral_key = hex::decode(payload.metadata.ephemeral_key).unwrap(); // Safe because it was validated in the parsing step - let symmetric_key = crypto::get_x25519_symmetric_key(&ephemeral_key, &crypto::PRIVATE_KEY).await?; + let symmetric_key = + crypto::get_x25519_symmetric_key(&ephemeral_key, &crypto::PRIVATE_KEY).await?; let plaintext = crypto::decrypt_aes_gcm(&payload.ciphertext, &symmetric_key).await?; return Ok((plaintext, symmetric_key)); } @@ -101,23 +109,21 @@ async fn encrypt_response(response: Response, symmetric_key: &[u8]) -> Result u32 { - ((array[0] as u32) << 00) + - ((array[1] as u32) << 08) + - ((array[2] as u32) << 16) + - ((array[3] as u32) << 24) -} \ No newline at end of file + ((array[0] as u32) << 00) + + ((array[1] as u32) << 08) + + ((array[2] as u32) << 16) + + ((array[3] as u32) << 24) +} diff --git a/src/routes.rs b/src/routes.rs index a2514eb..9ec356b 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1,19 +1,19 @@ -use warp::{Filter, Rejection, reply::Reply, reply::Response}; +use warp::{reply::Reply, reply::Response, Filter, Rejection}; use super::errors; use super::onion_requests; /// GET / pub fn root() -> impl Filter + Clone { - return warp::get() - .and(warp::path::end()) - .and_then(root_html); + return warp::get().and(warp::path::end()).and_then(root_html); } /// POST /loki/v3/lsrpc pub fn lsrpc() -> impl Filter + Clone { return warp::post() - .and(warp::path("loki")).and(warp::path("v3")).and(warp::path("lsrpc")) + .and(warp::path("loki")) + .and(warp::path("v3")) + .and(warp::path("lsrpc")) .and(warp::body::content_length_limit(10 * 1024 * 1024)) // Match storage server .and(warp::body::bytes()) // Expect bytes .and_then(onion_requests::handle_onion_request) @@ -39,4 +39,4 @@ pub async fn root_html() -> Result { pub async fn into_response(e: Rejection) -> Result { return errors::into_response(e); -} \ No newline at end of file +} diff --git a/src/rpc.rs b/src/rpc.rs index 32ca30d..de6394e 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use warp::{http::StatusCode, Rejection, reply::Reply, reply::Response}; +use warp::{http::StatusCode, reply::Reply, reply::Response, Rejection}; use super::errors::Error; use super::handlers; @@ -22,7 +22,7 @@ pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result { Some(room_id) => room_id, None => { println!("Missing room ID."); - return Err(warp::reject::custom(Error::InvalidRpcCall)) + return Err(warp::reject::custom(Error::InvalidRpcCall)); } }; let pool = storage::pool_by_room_id(&room_id); @@ -60,7 +60,10 @@ pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result { } } -async fn handle_get_request(rpc_call: RpcCall, path: &str, auth_token: Option, query_params: HashMap, pool: &storage::DatabaseConnectionPool) -> Result { +async fn handle_get_request( + rpc_call: RpcCall, path: &str, auth_token: Option, + query_params: HashMap, pool: &storage::DatabaseConnectionPool +) -> Result { // Getting an auth token challenge doesn't require authorization, so we // handle it first if path == "auth_token_challenge" { @@ -70,7 +73,7 @@ async fn handle_get_request(rpc_call: RpcCall, path: &str, auth_token: Option return handlers::get_messages(query_params, &auth_token, pool).await, - "deleted_messages" => return handlers::get_deleted_messages(query_params, &auth_token, pool).await, + "deleted_messages" => { + return handlers::get_deleted_messages(query_params, &auth_token, pool).await + } "moderators" => return handlers::get_moderators(&auth_token, pool).await, "block_list" => return handlers::get_banned_public_keys(&auth_token, pool).await, "member_count" => return handlers::get_member_count(&auth_token, pool).await, @@ -98,17 +105,20 @@ async fn handle_get_request(rpc_call: RpcCall, path: &str, auth_token: Option { println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint); - return Err(warp::reject::custom(Error::InvalidRpcCall)); + return Err(warp::reject::custom(Error::InvalidRpcCall)); } } } -async fn handle_post_request(rpc_call: RpcCall, path: &str, auth_token: Option, pool: &storage::DatabaseConnectionPool) -> Result { +async fn handle_post_request( + rpc_call: RpcCall, path: &str, auth_token: Option, + pool: &storage::DatabaseConnectionPool +) -> Result { // Check that the auth token is present let auth_token = auth_token.ok_or(warp::reject::custom(Error::Unauthorized))?; // Switch on the path @@ -121,11 +131,13 @@ async fn handle_post_request(rpc_call: RpcCall, path: &str, auth_token: Option { #[derive(Debug, Deserialize)] - struct JSON { public_key: String } + struct JSON { + public_key: String + } let json: JSON = match serde_json::from_str(&rpc_call.body) { Ok(message) => message, Err(e) => { @@ -134,10 +146,12 @@ async fn handle_post_request(rpc_call: RpcCall, path: &str, auth_token: Option { #[derive(Debug, Deserialize)] - struct JSON { public_key: String } + struct JSON { + public_key: String + } let json: JSON = match serde_json::from_str(&rpc_call.body) { Ok(message) => message, Err(e) => { @@ -146,10 +160,12 @@ async fn handle_post_request(rpc_call: RpcCall, path: &str, auth_token: Option { #[derive(Debug, Deserialize)] - struct JSON { file: String } + struct JSON { + file: String + } let json: JSON = match serde_json::from_str(&rpc_call.body) { Ok(message) => message, Err(e) => { @@ -158,15 +174,18 @@ async fn handle_post_request(rpc_call: RpcCall, path: &str, auth_token: Option { println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint); - return Err(warp::reject::custom(Error::InvalidRpcCall)); + return Err(warp::reject::custom(Error::InvalidRpcCall)); } } } -async fn handle_delete_request(rpc_call: RpcCall, path: &str, auth_token: Option, pool: &storage::DatabaseConnectionPool) -> Result { +async fn handle_delete_request( + rpc_call: RpcCall, path: &str, auth_token: Option, + pool: &storage::DatabaseConnectionPool +) -> Result { // Check that the auth token is present let auth_token = auth_token.ok_or(warp::reject::custom(Error::Unauthorized))?; // DELETE /messages/:server_id @@ -207,11 +226,15 @@ async fn handle_delete_request(rpc_call: RpcCall, path: &str, auth_token: Option // Utilities fn get_auth_token(rpc_call: &RpcCall) -> Option { - if rpc_call.headers.is_empty() { return None; } + if rpc_call.headers.is_empty() { + return None; + } return rpc_call.headers.get("Authorization").map(|s| s.to_string()); } fn get_room_id(rpc_call: &RpcCall) -> Option { - if rpc_call.headers.is_empty() { return None; } + if rpc_call.headers.is_empty() { + return None; + } return rpc_call.headers.get("Room").map(|s| s.to_string()); -} \ No newline at end of file +} diff --git a/src/storage.rs b/src/storage.rs index cc9e5fd..3f092da 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -3,8 +3,8 @@ use std::fs; use std::path::Path; use std::sync::Mutex; -use rusqlite::params; use r2d2_sqlite::SqliteConnectionManager; +use rusqlite::params; use super::errors::Error; @@ -32,10 +32,12 @@ pub fn create_main_database_if_needed() { fn create_main_tables_if_needed(conn: &DatabaseConnection) { let main_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( id TEXT PRIMARY KEY, name TEXT - )", MAIN_TABLE); + )", + MAIN_TABLE + ); conn.execute(&main_table_cmd, params![]).expect("Couldn't create main table."); } @@ -83,55 +85,71 @@ fn create_room_tables_if_needed(conn: &DatabaseConnection) { // The `id` field is needed to make `rowid` stable, which is important because otherwise // the `id`s in this table won't correspond to those in the deleted messages table let messages_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( id INTEGER PRIMARY KEY, public_key TEXT, data TEXT, signature TEXT - )", MESSAGES_TABLE); + )", + MESSAGES_TABLE + ); conn.execute(&messages_table_cmd, params![]).expect("Couldn't create messages table."); // Deleted messages let deleted_messages_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( id INTEGER PRIMARY KEY - )", DELETED_MESSAGES_TABLE); - conn.execute(&deleted_messages_table_cmd, params![]).expect("Couldn't create deleted messages table."); + )", + DELETED_MESSAGES_TABLE + ); + conn.execute(&deleted_messages_table_cmd, params![]) + .expect("Couldn't create deleted messages table."); // Moderators let moderators_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( public_key TEXT - )", MODERATORS_TABLE); + )", + MODERATORS_TABLE + ); conn.execute(&moderators_table_cmd, params![]).expect("Couldn't create moderators table."); // Block list let block_list_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( public_key TEXT - )", BLOCK_LIST_TABLE); + )", + BLOCK_LIST_TABLE + ); conn.execute(&block_list_table_cmd, params![]).expect("Couldn't create block list table."); // Pending tokens // Note that a given public key can have multiple pending tokens let pending_tokens_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( public_key STRING, timestamp INTEGER, token BLOB - )", PENDING_TOKENS_TABLE); - conn.execute(&pending_tokens_table_cmd, params![]).expect("Couldn't create pending tokens table."); + )", + PENDING_TOKENS_TABLE + ); + conn.execute(&pending_tokens_table_cmd, params![]) + .expect("Couldn't create pending tokens table."); // Tokens // The token is stored as hex here (rather than as bytes) because it's more convenient for lookup let tokens_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( public_key STRING PRIMARY KEY, timestamp INTEGER, token TEXT - )", TOKENS_TABLE); + )", + TOKENS_TABLE + ); conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table."); // Files let files_table_cmd = format!( - "CREATE TABLE IF NOT EXISTS {} ( + "CREATE TABLE IF NOT EXISTS {} ( id STRING PRIMARY KEY, timestamp INTEGER - )", FILES_TABLE); + )", + FILES_TABLE + ); conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table."); } @@ -141,7 +159,9 @@ pub async fn prune_tokens_periodically() { let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap()); loop { timer.tick().await; - tokio::spawn(async { prune_tokens().await; }); + tokio::spawn(async { + prune_tokens().await; + }); } } @@ -149,7 +169,9 @@ pub async fn prune_pending_tokens_periodically() { let mut timer = tokio::time::interval(chrono::Duration::minutes(10).to_std().unwrap()); loop { timer.tick().await; - tokio::spawn(async { prune_pending_tokens().await; }); + tokio::spawn(async { + prune_pending_tokens().await; + }); } } @@ -157,7 +179,9 @@ pub async fn prune_files_periodically() { let mut timer = tokio::time::interval(chrono::Duration::days(1).to_std().unwrap()); loop { timer.tick().await; - tokio::spawn(async { prune_files(FILE_EXPIRATION).await; }); + tokio::spawn(async { + prune_files(FILE_EXPIRATION).await; + }); } } @@ -176,7 +200,7 @@ async fn prune_tokens() { let stmt = format!("DELETE FROM {} WHERE timestamp < (?1)", TOKENS_TABLE); let now = chrono::Utc::now().timestamp(); let expiration = now - TOKEN_EXPIRATION; - match conn.execute(&stmt, params![ expiration ]) { + match conn.execute(&stmt, params![expiration]) { Ok(_) => (), Err(e) => return println!("Couldn't prune tokens due to error: {}.", e) }; @@ -199,7 +223,7 @@ async fn prune_pending_tokens() { let stmt = format!("DELETE FROM {} WHERE timestamp < (?1)", PENDING_TOKENS_TABLE); let now = chrono::Utc::now().timestamp(); let expiration = now - PENDING_TOKEN_EXPIRATION; - match conn.execute(&stmt, params![ expiration ]) { + match conn.execute(&stmt, params![expiration]) { Ok(_) => (), Err(e) => return println!("Couldn't prune pending tokens due to error: {}.", e) }; @@ -207,7 +231,8 @@ async fn prune_pending_tokens() { println!("Pruned pending tokens."); } -pub async fn prune_files(file_expiration: i64) { // The expiration setting is passed in for testing purposes +pub async fn prune_files(file_expiration: i64) { + // The expiration setting is passed in for testing purposes let rooms = match get_all_room_ids().await { Ok(rooms) => rooms, Err(_) => return @@ -228,9 +253,7 @@ pub async fn prune_files(file_expiration: i64) { // The expiration setting is pa Ok(query) => query, Err(e) => return println!("Couldn't prune files due to error: {}.", e) }; - let rows = match query.query_map(params![ expiration ], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![expiration], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { return println!("Couldn't prune files due to error: {}.", e); @@ -238,8 +261,8 @@ pub async fn prune_files(file_expiration: i64) { // The expiration setting is pa }; let ids: Vec = rows.filter_map(|result| result.ok()).collect(); if !ids.is_empty() { - // Delete the files - let mut deleted_ids: Vec = vec![]; + // Delete the files + let mut deleted_ids: Vec = vec![]; for id in ids { match fs::remove_file(format!("files/{}", id)) { Ok(_) => deleted_ids.push(id), @@ -263,9 +286,7 @@ async fn get_all_room_ids() -> Result, Error> { // Query the database let raw_query = format!("SELECT id FROM {}", MAIN_TABLE); let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?; - let rows = match query.query_map(params![], |row| { - Ok(row.get(0)?) - }) { + let rows = match query.query_map(params![], |row| Ok(row.get(0)?)) { Ok(rows) => rows, Err(e) => { println!("Couldn't query database due to error: {}.", e); @@ -275,4 +296,4 @@ async fn get_all_room_ids() -> Result, Error> { let ids: Vec = rows.filter_map(|result| result.ok()).collect(); // Return return Ok(ids); -} \ No newline at end of file +} diff --git a/src/tests.rs b/src/tests.rs index 924c531..67f810e 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -14,7 +14,7 @@ macro_rules! aw { ($e:expr) => { tokio_test::block_on($e) }; - } +} fn perform_main_setup() { storage::create_main_database_if_needed(); @@ -29,7 +29,7 @@ fn set_up_test_room() { aw!(handlers::create_room(&test_room_id, &test_room_name)).unwrap(); let raw_path = format!("rooms/{}.db", test_room_id); let path = Path::new(&raw_path); - fs::read(path).unwrap(); // Fail if this doesn't exist + fs::read(path).unwrap(); // Fail if this doesn't exist } fn get_auth_token() -> (String, String) { @@ -45,7 +45,8 @@ fn get_auth_token() -> (String, String) { let challenge = aw!(handlers::get_auth_token_challenge(query_params, &pool)).unwrap(); // Generate a symmetric key let ephemeral_public_key = base64::decode(challenge.ephemeral_public_key).unwrap(); - let symmetric_key = aw!(crypto::get_x25519_symmetric_key(&ephemeral_public_key, &user_private_key)).unwrap(); + let symmetric_key = + aw!(crypto::get_x25519_symmetric_key(&ephemeral_public_key, &user_private_key)).unwrap(); // Decrypt the challenge let ciphertext = base64::decode(challenge.ciphertext).unwrap(); let plaintext = aw!(crypto::decrypt_aes_gcm(&ciphertext, &symmetric_key)).unwrap(); @@ -70,7 +71,8 @@ fn test_authorization() { Err(_) => () } // Try to claim the correct token - let response = aw!(handlers::claim_auth_token(&hex_user_public_key, &auth_token, &pool)).unwrap(); + let response = + aw!(handlers::claim_auth_token(&hex_user_public_key, &auth_token, &pool)).unwrap(); assert_eq!(response.status(), StatusCode::OK); } @@ -87,7 +89,7 @@ fn test_file_handling() { // Check that there's a file record let conn = pool.get().unwrap(); let raw_query = format!("SELECT id FROM {}", storage::FILES_TABLE); - let id: String = conn.query_row(&raw_query, params![], |row| { Ok(row.get(0)?) }).unwrap(); + let id: String = conn.query_row(&raw_query, params![], |row| Ok(row.get(0)?)).unwrap(); // Retrieve the file and check the content let base64_encoded_file = aw!(handlers::get_file(&id, &auth_token, &pool)).unwrap().result; assert_eq!(base64_encoded_file, TEST_FILE); @@ -100,7 +102,7 @@ fn test_file_handling() { // Check that the file record is also gone let conn = pool.get().unwrap(); let raw_query = format!("SELECT id FROM {}", storage::FILES_TABLE); - let result: Result = conn.query_row(&raw_query, params![], |row| { Ok(row.get(0)?) }); + let result: Result = conn.query_row(&raw_query, params![], |row| Ok(row.get(0)?)); match result { Ok(_) => assert!(false), // It should be gone now Err(_) => ()