Implement modes

This commit is contained in:
Niels Andriesse 2021-03-29 10:06:56 +11:00
parent 5993b21eb8
commit 46f0837439
2 changed files with 70 additions and 35 deletions

View File

@ -43,7 +43,6 @@ struct Opt {
}
// TODO: Rate limiting
// TODO: OGS mode & FS mode
// TODO: Distribute as binary
#[tokio::main]

View File

@ -8,6 +8,11 @@ use super::handlers;
use super::models;
use super::storage;
enum Mode {
FileServer,
OpenGroupServer,
}
#[derive(Deserialize, Serialize, Debug)]
pub struct RpcCall {
pub endpoint: String,
@ -16,6 +21,8 @@ pub struct RpcCall {
pub headers: HashMap<String, String>,
}
const MODE: Mode = Mode::OpenGroupServer;
pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result<Response, Rejection> {
// Check that the endpoint is a valid URI and deconstruct it into a path
// and query parameters.
@ -39,17 +46,13 @@ pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result<Response, Rejection> {
};
// Get the auth token if possible
let auth_token = get_auth_token(&rpc_call);
// Get a database connection pool
let pool = get_pool_for_room(&rpc_call)?;
// Switch on the HTTP method
match rpc_call.method.as_ref() {
"GET" => return handle_get_request(rpc_call, &path, auth_token, query_params).await,
"POST" => {
let pool = get_pool_for_room(&rpc_call)?;
return handle_post_request(rpc_call, &path, auth_token, &pool).await;
}
"DELETE" => {
let pool = get_pool_for_room(&rpc_call)?;
return handle_delete_request(rpc_call, &path, auth_token, &pool).await;
}
"GET" => return handle_get_request(rpc_call, &path, auth_token, query_params, &pool).await,
"POST" => return handle_post_request(rpc_call, &path, auth_token, &pool).await,
"DELETE" => return handle_delete_request(&path, auth_token, &pool).await,
_ => {
println!("Ignoring RPC call with invalid or unused HTTP method: {}.", rpc_call.method);
return Err(warp::reject::custom(Error::InvalidRpcCall));
@ -59,12 +62,11 @@ pub async fn handle_rpc_call(rpc_call: RpcCall) -> Result<Response, Rejection> {
async fn handle_get_request(
rpc_call: RpcCall, path: &str, auth_token: Option<String>,
query_params: HashMap<String, String>,
query_params: HashMap<String, String>, pool: &storage::DatabaseConnectionPool,
) -> Result<Response, Rejection> {
// Handle routes that don't require authorization first
if path == "auth_token_challenge" {
let pool = get_pool_for_room(&rpc_call)?;
let challenge = handlers::get_auth_token_challenge(query_params, &pool).await?;
let challenge = handlers::get_auth_token_challenge(query_params, pool).await?;
#[derive(Debug, Deserialize, Serialize)]
struct Response {
status_code: u16,
@ -73,6 +75,7 @@ async fn handle_get_request(
let response = Response { status_code: StatusCode::OK.as_u16(), challenge };
return Ok(warp::reply::json(&response).into_response());
} else if path.starts_with("rooms") {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split("/").collect(); // Split on subsequent slashes
if components.len() == 1 {
return handlers::get_all_rooms().await;
@ -87,7 +90,6 @@ async fn handle_get_request(
// Check that the auth token is present
let auth_token = auth_token.ok_or(warp::reject::custom(Error::NoAuthToken))?;
// Switch on the path
let pool = get_pool_for_room(&rpc_call)?;
if path.starts_with("files") {
let components: Vec<&str> = path.split("/").collect(); // Split on subsequent slashes
if components.len() != 2 {
@ -101,20 +103,33 @@ async fn handle_get_request(
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
return handlers::get_file(file_id, &auth_token, &pool)
return handlers::get_file(file_id, &auth_token, pool)
.await
.map(|json| warp::reply::json(&json).into_response());
}
match path {
"messages" => return handlers::get_messages(query_params, &auth_token, &pool).await,
"deleted_messages" => {
return handlers::get_deleted_messages(query_params, &auth_token, &pool).await
"messages" => {
reject_if_file_server_mode(path)?;
return handlers::get_messages(query_params, &auth_token, pool).await;
}
"deleted_messages" => {
reject_if_file_server_mode(path)?;
return handlers::get_deleted_messages(query_params, &auth_token, pool).await;
}
"moderators" => {
reject_if_file_server_mode(path)?;
return handlers::get_moderators(&auth_token, pool).await;
}
"block_list" => {
reject_if_file_server_mode(path)?;
return handlers::get_banned_public_keys(&auth_token, pool).await;
}
"member_count" => {
reject_if_file_server_mode(path)?;
return handlers::get_member_count(&auth_token, pool).await;
}
"moderators" => return handlers::get_moderators(&auth_token, &pool).await,
"block_list" => return handlers::get_banned_public_keys(&auth_token, &pool).await,
"member_count" => return handlers::get_member_count(&auth_token, &pool).await,
"auth_token_challenge" => {
let challenge = handlers::get_auth_token_challenge(query_params, &pool).await?;
let challenge = handlers::get_auth_token_challenge(query_params, pool).await?;
#[derive(Debug, Deserialize, Serialize)]
struct Response {
status_code: u16,
@ -139,6 +154,7 @@ async fn handle_post_request(
// Switch on the path
match path {
"messages" => {
reject_if_file_server_mode(path)?;
let message = match serde_json::from_str(&rpc_call.body) {
Ok(message) => message,
Err(e) => {
@ -149,6 +165,7 @@ async fn handle_post_request(
return handlers::insert_message(message, &auth_token, pool).await;
}
"block_list" => {
reject_if_file_server_mode(path)?;
#[derive(Debug, Deserialize)]
struct JSON {
public_key: String,
@ -191,29 +208,29 @@ async fn handle_post_request(
return handlers::store_file(&json.file, &auth_token, pool).await;
}
_ => {
println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint);
println!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
}
}
async fn handle_delete_request(
rpc_call: RpcCall, path: &str, auth_token: Option<String>,
pool: &storage::DatabaseConnectionPool,
path: &str, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool,
) -> Result<Response, Rejection> {
// Check that the auth token is present
let auth_token = auth_token.ok_or(warp::reject::custom(Error::NoAuthToken))?;
// DELETE /messages/:server_id
if path.starts_with("messages") {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split("/").collect(); // Split on subsequent slashes
if components.len() != 2 {
println!("Invalid endpoint: {}.", rpc_call.endpoint);
println!("Invalid endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
let server_id: i64 = match components[1].parse() {
Ok(server_id) => server_id,
Err(_) => {
println!("Invalid endpoint: {}.", rpc_call.endpoint);
println!("Invalid endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
@ -221,9 +238,10 @@ async fn handle_delete_request(
}
// DELETE /block_list/:public_key
if path.starts_with("block_list") {
reject_if_file_server_mode(path)?;
let components: Vec<&str> = path.split("/").collect(); // Split on subsequent slashes
if components.len() != 2 {
println!("Invalid endpoint: {}.", rpc_call.endpoint);
println!("Invalid endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
let public_key = components[1].to_string();
@ -234,20 +252,28 @@ async fn handle_delete_request(
return handlers::delete_auth_token(&auth_token, pool).await;
}
// Unrecognized endpoint
println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint);
println!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
// Utilities
fn get_pool_for_room(rpc_call: &RpcCall) -> Result<storage::DatabaseConnectionPool, Rejection> {
let room_id = match get_room_id(&rpc_call) {
Some(room_id) => room_id,
None => {
println!("Missing room ID.");
return Err(warp::reject::custom(Error::InvalidRpcCall));
let room_id: String;
match MODE {
// In file server mode we don't have a concept of rooms, but for convenience we just
// always use the main room
Mode::FileServer => room_id = "main".to_string(),
Mode::OpenGroupServer => {
room_id = match get_room_id(&rpc_call) {
Some(room_id) => room_id,
None => {
println!("Missing room ID.");
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
}
};
}
return Ok(storage::pool_by_room_id(&room_id));
}
@ -264,3 +290,13 @@ fn get_room_id(rpc_call: &RpcCall) -> Option<String> {
}
return rpc_call.headers.get("Room").map(|s| s.to_string());
}
fn reject_if_file_server_mode(path: &str) -> Result<(), Rejection> {
match MODE {
Mode::FileServer => {
println!("Ignoring RPC call with invalid or unused endpoint: {}.", path);
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
Mode::OpenGroupServer => return Ok(()),
}
}