Implement auth token verification

This commit is contained in:
Niels Andriesse 2021-03-18 09:35:51 +11:00
parent d0b0106173
commit 8ece0a549c
3 changed files with 95 additions and 40 deletions

View File

@ -96,7 +96,7 @@ pub async fn claim_token(public_key: String, token: String, pool: &storage::Data
};
// Store the claimed token
let stmt = format!("INSERT OR REPLACE INTO {} (public_key, token) VALUES (?1, ?2)", storage::TOKENS_TABLE);
match tx.execute(&stmt, params![ public_key, token ]) {
match tx.execute(&stmt, params![ public_key, hex::encode(token) ]) {
Ok(_) => (),
Err(e) => {
println!("Couldn't insert token due to error: {}.", e);
@ -110,21 +110,25 @@ pub async fn claim_token(public_key: String, token: String, pool: &storage::Data
}
/// Inserts the given `message` into the database if it's valid.
pub async fn insert_message(mut message: models::Message, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
pub async fn insert_message(mut message: models::Message, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Validate the message
if !message.is_valid() {
println!("Ignoring invalid message.");
return Err(warp::reject::custom(Error::ValidationFailed));
}
// TODO: Check that the requesting user isn't banned
// Check permissions
if auth_token == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let auth_token = auth_token.unwrap();
let public_key = get_public_key_for_token(&auth_token, pool).await?;
if public_key == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let public_key = public_key.unwrap();
if is_banned(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); }
// Get a connection and open a transaction
let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?;
// Insert the message
let stmt = format!("INSERT INTO {} (text) VALUES (?1)", storage::MESSAGES_TABLE);
match tx.execute(&stmt, params![ message.text ]) {
let stmt = format!("INSERT INTO {} (public_key, text) VALUES (?1, ?2)", storage::MESSAGES_TABLE);
match tx.execute(&stmt, params![ &public_key, message.text ]) {
Ok(_) => (),
Err(e) => {
println!("Couldn't insert message due to error: {}.", e);
@ -169,13 +173,34 @@ pub async fn get_messages(options: rpc::QueryOptions, pool: &storage::DatabaseCo
}
/// Deletes the message with the given `row_id` from the database, if it's present.
pub async fn delete_message(row_id: i64, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// TODO: Check that the requesting user has permission (either it's their own message or they're a moderator)
pub async fn delete_message(row_id: i64, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Get a connection and open a transaction
let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?;
// Check permissions
if auth_token == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let auth_token = auth_token.unwrap();
let public_key = get_public_key_for_token(&auth_token, pool).await?;
if public_key == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let public_key = public_key.unwrap();
if is_banned(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); }
let sender: Option<String> = {
let raw_query = format!("SELECT public_key FROM {} WHERE rowid = (?1)", storage::MESSAGES_TABLE);
let mut query = tx.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?;
let rows = match query.query_map(params![ row_id ], |row| {
Ok(row.get(0)?)
}) {
Ok(rows) => rows,
Err(e) => {
println!("Couldn't delete message due to error: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
let public_keys: Vec<String> = rows.filter_map(|result| result.ok()).collect();
public_keys.get(0).map(|s| s.to_string())
};
if sender == None { return Err(warp::reject::custom(Error::Unauthorized)); }
if !is_moderator(&public_key, pool).await? && public_key != sender.unwrap() { return Err(warp::reject::custom(Error::Unauthorized)); }
// Delete the message if it's present
let stmt = format!("DELETE FROM {} WHERE rowid = (?1)", storage::MESSAGES_TABLE);
let count = match tx.execute(&stmt, params![ row_id ]) {
@ -238,15 +263,20 @@ pub async fn get_moderators(pool: &storage::DatabaseConnectionPool) -> Result<Re
}
/// Bans the given `public_key`, if the requesting user is a moderator.
pub async fn ban(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
pub async fn ban(public_key: &str, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Validate the public key
if !is_valid_public_key(&public_key) {
println!("Ignoring ban request for invalid public key.");
return Err(warp::reject::custom(Error::ValidationFailed));
}
// TODO: Check that the requesting user is a moderator
// Check permissions
if auth_token == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let auth_token = auth_token.unwrap();
let public_key = get_public_key_for_token(&auth_token, pool).await?;
if public_key == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let public_key = public_key.unwrap();
if is_banned(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); }
if !is_moderator(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); }
// Don't double ban public keys
if is_banned(&public_key, pool).await? { return Ok(StatusCode::OK.into_response()); }
// Get a connection and open a transaction
@ -268,15 +298,20 @@ pub async fn ban(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Re
}
/// Unbans the given `public_key`, if the requesting user is a moderator.
pub async fn unban(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
pub async fn unban(public_key: &str, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Validate the public key
if !is_valid_public_key(&public_key) {
println!("Ignoring unban request for invalid public key.");
return Err(warp::reject::custom(Error::ValidationFailed));
}
// TODO: Check that the requesting user is a moderator
// Check permissions
if auth_token == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let auth_token = auth_token.unwrap();
let public_key = get_public_key_for_token(&auth_token, pool).await?;
if public_key == None { return Err(warp::reject::custom(Error::Unauthorized)); }
let public_key = public_key.unwrap();
if is_banned(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); }
if !is_moderator(&public_key, pool).await? { return Err(warp::reject::custom(Error::Unauthorized)); }
// Don't double unban public keys
if !is_banned(&public_key, pool).await? { return Ok(StatusCode::OK.into_response()); }
// Get a connection and open a transaction
@ -299,9 +334,6 @@ pub async fn unban(public_key: &str, pool: &storage::DatabaseConnectionPool) ->
/// Returns the full list of banned public keys.
pub async fn get_banned_public_keys(pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// TODO: Check that the requesting user is a moderator
let public_keys = get_banned_public_keys_vector(pool).await?;
return Ok(warp::reply::json(&public_keys).into_response());
}
@ -313,7 +345,7 @@ pub async fn get_member_count(pool: &storage::DatabaseConnectionPool) -> Result<
// Utilities
pub async fn get_moderators_vector(pool: &storage::DatabaseConnectionPool) -> Result<Vec<String>, Rejection> {
async fn get_moderators_vector(pool: &storage::DatabaseConnectionPool) -> Result<Vec<String>, Rejection> {
// Get a database connection
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
// Query the database
@ -332,12 +364,12 @@ pub async fn get_moderators_vector(pool: &storage::DatabaseConnectionPool) -> Re
return Ok(rows.filter_map(|result| result.ok()).collect());
}
pub async fn is_moderator(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<bool, Rejection> {
async fn is_moderator(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<bool, Rejection> {
let public_keys = get_moderators_vector(&pool).await?;
return Ok(public_keys.contains(&public_key.to_owned()));
}
pub async fn get_banned_public_keys_vector(pool: &storage::DatabaseConnectionPool) -> Result<Vec<String>, Rejection> {
async fn get_banned_public_keys_vector(pool: &storage::DatabaseConnectionPool) -> Result<Vec<String>, Rejection> {
// Get a database connection
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
// Query the database
@ -356,16 +388,36 @@ pub async fn get_banned_public_keys_vector(pool: &storage::DatabaseConnectionPoo
return Ok(rows.filter_map(|result| result.ok()).collect());
}
pub async fn is_banned(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<bool, Rejection> {
async fn is_banned(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<bool, Rejection> {
let public_keys = get_banned_public_keys_vector(&pool).await?;
return Ok(public_keys.contains(&public_key.to_owned()));
}
pub fn is_valid_public_key(public_key: &str) -> bool {
fn is_valid_public_key(public_key: &str) -> bool {
// Check that it's a valid hex encoding
if hex::decode(public_key).is_err() { return false; }
// Check that it's the right length
if public_key.len() != 66 { return false } // The version byte + 32 bytes of random data
// It appears to be a valid public key
return true
}
async fn get_public_key_for_token(token: &str, pool: &storage::DatabaseConnectionPool) -> Result<Option<String>, Rejection> {
// Get a database connection
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
// Query the database
let raw_query = format!("SELECT public_key FROM {} WHERE token = (?1)", storage::TOKENS_TABLE);
let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?;
let rows = match query.query_map(params![ token ], |row| {
Ok(row.get(0)?)
}) {
Ok(rows) => rows,
Err(e) => {
println!("Couldn't query database due to error: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
let public_keys: Vec<String> = rows.filter_map(|result| result.ok()).collect();
// Return
return Ok(public_keys.get(0).map(|s| s.to_string()));
}

View File

@ -30,11 +30,13 @@ pub async fn handle_rpc_call(rpc_call: RpcCall, pool: &storage::DatabaseConnecti
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
// Get the auth token if possible
let auth_token = get_auth_token(&rpc_call);
// Switch on the HTTP method
match rpc_call.method.as_ref() {
"GET" => return handle_get_request(rpc_call, uri, pool).await,
"POST" => return handle_post_request(rpc_call, uri, pool).await,
"DELETE" => return handle_delete_request(rpc_call, uri, pool).await,
"GET" => return handle_get_request(rpc_call, uri, auth_token, pool).await,
"POST" => return handle_post_request(rpc_call, uri, auth_token, pool).await,
"DELETE" => return handle_delete_request(rpc_call, uri, auth_token, pool).await,
_ => {
println!("Ignoring RPC call with invalid or unused HTTP method: {}.", rpc_call.method);
return Err(warp::reject::custom(Error::InvalidRpcCall));
@ -42,7 +44,7 @@ pub async fn handle_rpc_call(rpc_call: RpcCall, pool: &storage::DatabaseConnecti
}
}
async fn handle_get_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
async fn handle_get_request(rpc_call: RpcCall, uri: http::Uri, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Parse query options if needed
let mut query_options = QueryOptions { limit : None, from_server_id : None };
if let Some(query) = uri.query() {
@ -88,7 +90,7 @@ async fn handle_get_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::D
}
}
async fn handle_post_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
async fn handle_post_request(rpc_call: RpcCall, uri: http::Uri, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
match uri.path() {
"/messages" => {
let message = match serde_json::from_str(&rpc_call.body) {
@ -98,11 +100,11 @@ async fn handle_post_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
return handlers::insert_message(message, pool).await;
return handlers::insert_message(message, auth_token, pool).await;
},
"/block_list" => {
let public_key = rpc_call.body;
return handlers::ban(&public_key, pool).await;
return handlers::ban(&public_key, auth_token, pool).await;
},
_ => {
println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint);
@ -111,7 +113,7 @@ async fn handle_post_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::
}
}
async fn handle_delete_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
async fn handle_delete_request(rpc_call: RpcCall, uri: http::Uri, auth_token: Option<String>, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// DELETE /messages/:server_id
if uri.path().starts_with("/messages") {
let components: Vec<&str> = uri.path()[1..].split("/").collect(); // Drop the leading slash and split on subsequent slashes
@ -126,7 +128,7 @@ async fn handle_delete_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
};
return handlers::delete_message(server_id, pool).await;
return handlers::delete_message(server_id, auth_token, pool).await;
}
// DELETE /block_list/:public_key
if uri.path().starts_with("/block_list") {
@ -136,7 +138,7 @@ async fn handle_delete_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage
return Err(warp::reject::custom(Error::InvalidRpcCall));
}
let public_key = components[1].to_string();
return handlers::unban(&public_key, pool).await;
return handlers::unban(&public_key, auth_token, pool).await;
}
// Unrecognized endpoint
println!("Ignoring RPC call with invalid or unused endpoint: {}.", rpc_call.endpoint);
@ -144,11 +146,11 @@ async fn handle_delete_request(rpc_call: RpcCall, uri: http::Uri, pool: &storage
}
// Utilities
fn get_auth_token(rpc_call: RpcCall) -> Option<String> {
fn get_auth_token(rpc_call: &RpcCall) -> Option<String> {
if rpc_call.headers.is_empty() { return None; }
let headers: HashMap<String, String> = match serde_json::from_str(&rpc_call.headers) {
Ok(headers) => headers,
Err(e) => return None
Err(_) => return None
};
let header = headers.get("Authorization");
if header == None { return None; }

View File

@ -21,6 +21,7 @@ pub fn create_tables_if_needed(conn: &DatabaseConnection) {
let messages_table_cmd = format!(
"CREATE TABLE IF NOT EXISTS {} (
id INTEGER PRIMARY KEY,
public_key TEXT,
text TEXT
)", MESSAGES_TABLE);
conn.execute(&messages_table_cmd, params![]).expect("Couldn't create messages table.");
@ -55,7 +56,7 @@ pub fn create_tables_if_needed(conn: &DatabaseConnection) {
let tokens_table_cmd = format!(
"CREATE TABLE IF NOT EXISTS {} (
public_key STRING PRIMARY KEY,
token BLOB
token TEXT
)", TOKENS_TABLE);
conn.execute(&tokens_table_cmd, params![]).expect("Couldn't create tokens table.");
}