session-open-group-server/src/handlers.rs

240 lines
9.9 KiB
Rust
Raw Normal View History

2021-03-11 00:50:17 +01:00
use regex::Regex;
2021-03-10 03:08:34 +01:00
use rusqlite::params;
2021-03-12 05:46:06 +01:00
use warp::{Rejection, http::StatusCode, reply::Reply, reply::Response};
2021-03-10 03:08:34 +01:00
2021-03-12 06:40:24 +01:00
use super::errors::Error;
2021-03-10 03:08:34 +01:00
use super::models;
2021-03-12 05:11:12 +01:00
use super::rpc;
2021-03-10 03:08:34 +01:00
use super::storage;
/// Inserts the given `message` into the database if it's valid.
2021-03-12 05:46:06 +01:00
pub async fn insert_message(mut message: models::Message, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
2021-03-10 03:08:34 +01:00
// Validate the message
2021-03-11 04:20:36 +01:00
if !message.is_valid() {
println!("Ignoring invalid message.");
2021-03-12 06:40:24 +01:00
return Err(warp::reject::custom(Error::ValidationFailed));
2021-03-11 04:20:36 +01:00
}
2021-03-11 00:50:17 +01:00
// TODO: Check that the requesting user isn't banned
2021-03-10 05:38:32 +01:00
// Get a connection and open a transaction
2021-03-12 05:11:12 +01:00
let mut conn = storage::conn(pool)?;
2021-03-10 05:38:32 +01:00
let tx = storage::tx(&mut conn)?;
2021-03-10 03:08:34 +01:00
// Insert the message
2021-03-10 06:29:56 +01:00
let stmt = format!("INSERT INTO {} (text) VALUES (?1)", storage::MESSAGES_TABLE);
storage::exec(&stmt, params![ message.text ], &tx)?;
2021-03-10 05:50:24 +01:00
let id = tx.last_insert_rowid();
2021-03-10 05:13:58 +01:00
message.server_id = Some(id);
// Commit
2021-03-10 05:50:24 +01:00
tx.commit(); // TODO: Unwrap
2021-03-10 05:13:58 +01:00
// Return
2021-03-12 05:46:06 +01:00
return Ok(warp::reply::json(&message).into_response());
2021-03-10 03:08:34 +01:00
}
/// Returns either the last `limit` messages or all messages since `from_server_id, limited to `limit`.
2021-03-12 05:46:06 +01:00
pub async fn get_messages(options: rpc::QueryOptions, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
2021-03-10 03:08:34 +01:00
// Get a database connection
2021-03-12 05:11:12 +01:00
let conn = storage::conn(pool)?;
2021-03-10 04:06:17 +01:00
// Unwrap parameters
let from_server_id = options.from_server_id.unwrap_or(0);
2021-03-10 03:08:34 +01:00
let limit = options.limit.unwrap_or(256); // Never return more than 256 messages at once
2021-03-10 04:06:17 +01:00
// Query the database
2021-03-10 06:29:56 +01:00
let raw_query: String;
2021-03-10 04:06:17 +01:00
if options.from_server_id.is_some() {
2021-03-10 06:29:56 +01:00
raw_query = format!("SELECT id, text FROM {} WHERE rowid > (?1) LIMIT (?2)", storage::MESSAGES_TABLE);
2021-03-10 04:06:17 +01:00
} else {
2021-03-10 06:29:56 +01:00
raw_query = format!("SELECT id, text FROM {} ORDER BY rowid DESC LIMIT (?2)", storage::MESSAGES_TABLE);
2021-03-10 04:06:17 +01:00
}
2021-03-10 06:17:03 +01:00
let mut query = storage::query(&raw_query, &conn)?;
2021-03-10 06:29:56 +01:00
let rows = match query.query_map(params![ from_server_id, limit ], |row| {
2021-03-10 04:21:44 +01:00
Ok(models::Message { server_id : row.get(0)?, text : row.get(1)? })
2021-03-10 03:08:34 +01:00
}) {
Ok(rows) => rows,
Err(e) => {
println!("Couldn't query database due to error: {:?}.", e);
2021-03-12 06:40:24 +01:00
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
2021-03-10 03:08:34 +01:00
}
};
2021-03-10 23:56:32 +01:00
let messages: Vec<models::Message> = rows.filter_map(|result| result.ok()).collect();
2021-03-10 03:08:34 +01:00
// Return the messages
2021-03-12 05:46:06 +01:00
return Ok(warp::reply::json(&messages).into_response());
2021-03-10 03:29:04 +01:00
}
/// Deletes the message with the given `row_id` from the database, if it's present.
2021-03-12 05:46:06 +01:00
pub async fn delete_message(row_id: i64, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
2021-03-11 01:02:28 +01:00
// TODO: Check that the requesting user has permission (either it's their own message or they're a moderator)
2021-03-10 05:38:32 +01:00
// Get a connection and open a transaction
2021-03-12 05:11:12 +01:00
let mut conn = storage::conn(pool)?;
2021-03-10 05:38:32 +01:00
let tx = storage::tx(&mut conn)?;
2021-03-10 03:29:04 +01:00
// Delete the message if it's present
2021-03-10 06:29:56 +01:00
let stmt = format!("DELETE FROM {} WHERE rowid = (?1)", storage::MESSAGES_TABLE);
let count = storage::exec(&stmt, params![ row_id ], &tx)?;
2021-03-10 04:55:58 +01:00
// Update the deletions table if needed
2021-03-10 05:38:32 +01:00
if count > 0 {
2021-03-10 06:29:56 +01:00
let stmt = format!("INSERT INTO {} (id) VALUES (?1)", storage::DELETED_MESSAGES_TABLE);
storage::exec(&stmt, params![ row_id ], &tx)?;
2021-03-10 03:29:04 +01:00
}
2021-03-10 04:55:58 +01:00
// Commit
2021-03-10 05:50:24 +01:00
tx.commit(); // TODO: Unwrap
2021-03-10 04:55:58 +01:00
// Return
2021-03-12 05:46:06 +01:00
return Ok(StatusCode::OK.into_response());
}
/// Returns either the last `limit` deleted messages or all deleted messages since `from_server_id, limited to `limit`.
2021-03-12 05:46:06 +01:00
pub async fn get_deleted_messages(options: rpc::QueryOptions, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
// Get a database connection
2021-03-12 05:11:12 +01:00
let conn = storage::conn(pool)?;
// Unwrap parameters
let from_server_id = options.from_server_id.unwrap_or(0);
let limit = options.limit.unwrap_or(256); // Never return more than 256 deleted messages at once
// Query the database
2021-03-10 06:29:56 +01:00
let raw_query: String;
if options.from_server_id.is_some() {
2021-03-10 06:29:56 +01:00
raw_query = format!("SELECT id FROM {} WHERE rowid > (?1) LIMIT (?2)", storage::DELETED_MESSAGES_TABLE);
} else {
2021-03-10 06:29:56 +01:00
raw_query = format!("SELECT id FROM {} ORDER BY rowid DESC LIMIT (?2)", storage::DELETED_MESSAGES_TABLE);
}
2021-03-10 06:17:03 +01:00
let mut query = storage::query(&raw_query, &conn)?;
2021-03-10 06:29:56 +01:00
let rows = match query.query_map(params![ from_server_id, limit ], |row| {
Ok(row.get(0)?)
}) {
Ok(rows) => rows,
Err(e) => {
println!("Couldn't query database due to error: {:?}.", e);
2021-03-12 06:40:24 +01:00
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
2021-03-10 23:56:32 +01:00
let ids: Vec<i64> = rows.filter_map(|result| result.ok()).collect();
// Return the IDs
2021-03-12 05:46:06 +01:00
return Ok(warp::reply::json(&ids).into_response());
2021-03-11 00:06:09 +01:00
}
/// Returns the full list of moderators.
2021-03-12 05:46:06 +01:00
pub async fn get_moderators(pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
let public_keys = get_moderators_vector(pool).await?;
return Ok(warp::reply::json(&public_keys).into_response());
2021-03-11 00:38:02 +01:00
}
/// Bans the given `public_key`, if the requesting user is a moderator.
2021-03-12 05:46:06 +01:00
pub async fn ban(public_key: String, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
2021-03-11 00:50:17 +01:00
// Validate the public key
2021-03-11 04:20:36 +01:00
if !is_valid_public_key(&public_key) {
println!("Ignoring ban request for invalid public key.");
2021-03-12 06:40:24 +01:00
return Err(warp::reject::custom(Error::ValidationFailed));
2021-03-11 04:20:36 +01:00
}
2021-03-11 00:50:17 +01:00
2021-03-11 01:02:28 +01:00
// TODO: Check that the requesting user is a moderator
2021-03-11 00:50:17 +01:00
2021-03-11 01:13:37 +01:00
// Don't double ban public keys
2021-03-12 05:46:06 +01:00
if is_banned(&public_key, pool).await? { return Ok(StatusCode::OK.into_response()); }
2021-03-11 00:50:17 +01:00
// Get a connection and open a transaction
2021-03-12 05:11:12 +01:00
let mut conn = storage::conn(pool)?;
2021-03-11 00:50:17 +01:00
let tx = storage::tx(&mut conn)?;
// Insert the message
let stmt = format!("INSERT INTO {} (public_key) VALUES (?1)", storage::BLOCK_LIST_TABLE);
storage::exec(&stmt, params![ public_key ], &tx)?;
// Commit
tx.commit(); // TODO: Unwrap
// Return
2021-03-12 05:46:06 +01:00
return Ok(StatusCode::OK.into_response());
2021-03-11 00:38:02 +01:00
}
/// Unbans the given `public_key`, if the requesting user is a moderator.
2021-03-12 05:46:06 +01:00
pub async fn unban(public_key: String, pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
2021-03-11 00:50:17 +01:00
// Validate the public key
2021-03-11 04:20:36 +01:00
if !is_valid_public_key(&public_key) {
println!("Ignoring unban request for invalid public key.");
2021-03-12 06:40:24 +01:00
return Err(warp::reject::custom(Error::ValidationFailed));
2021-03-11 04:20:36 +01:00
}
2021-03-11 00:50:17 +01:00
2021-03-11 01:02:28 +01:00
// TODO: Check that the requesting user is a moderator
2021-03-11 00:50:17 +01:00
2021-03-12 05:11:12 +01:00
// Don't double unban public keys
2021-03-12 05:46:06 +01:00
if !is_banned(&public_key, pool).await? { return Ok(StatusCode::OK.into_response()); }
2021-03-11 00:50:17 +01:00
// Get a connection and open a transaction
2021-03-12 05:11:12 +01:00
let mut conn = storage::conn(pool)?;
2021-03-11 00:50:17 +01:00
let tx = storage::tx(&mut conn)?;
// Insert the message
let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::BLOCK_LIST_TABLE);
storage::exec(&stmt, params![ public_key ], &tx)?;
// Commit
tx.commit(); // TODO: Unwrap
// Return
2021-03-12 05:46:06 +01:00
return Ok(StatusCode::OK.into_response());
2021-03-11 00:38:02 +01:00
}
2021-03-11 01:02:28 +01:00
/// Returns the full list of banned public keys.
2021-03-12 05:46:06 +01:00
pub async fn get_banned_public_keys(pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
2021-03-11 01:02:28 +01:00
// TODO: Check that the requesting user is a moderator
2021-03-12 05:46:06 +01:00
let public_keys = get_banned_public_keys_vector(pool).await?;
return Ok(warp::reply::json(&public_keys).into_response());
2021-03-11 01:02:28 +01:00
}
2021-03-12 05:46:06 +01:00
pub async fn get_member_count(pool: &storage::DatabaseConnectionPool) -> Result<Response, Rejection> {
2021-03-11 01:11:49 +01:00
let member_count = 5; // TODO: Implement
2021-03-12 05:46:06 +01:00
return Ok(warp::reply::json(&member_count).into_response());
2021-03-11 01:10:49 +01:00
}
2021-03-11 00:38:02 +01:00
// Utilities
2021-03-12 05:46:06 +01:00
pub async fn get_moderators_vector(pool: &storage::DatabaseConnectionPool) -> Result<Vec<String>, Rejection> {
2021-03-11 00:06:09 +01:00
// Get a database connection
let conn = storage::conn(&pool)?;
// Query the database
let raw_query = format!("SELECT public_key FROM {}", storage::MODERATORS_TABLE);
let mut query = storage::query(&raw_query, &conn)?;
let rows = match query.query_map(params![], |row| {
Ok(row.get(0)?)
}) {
Ok(rows) => rows,
Err(e) => {
println!("Couldn't query database due to error: {:?}.", e);
2021-03-12 06:40:24 +01:00
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
2021-03-11 00:06:09 +01:00
}
};
2021-03-11 00:38:02 +01:00
// Return
return Ok(rows.filter_map(|result| result.ok()).collect());
}
2021-03-12 05:46:06 +01:00
/// Reports whether `public_key` is one of the keys returned by `get_moderators_vector`.
pub async fn is_moderator(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<bool, Rejection> {
    let moderators = get_moderators_vector(pool).await?;
    let found = moderators.iter().any(|candidate| candidate.as_str() == public_key);
    Ok(found)
}
2021-03-12 05:46:06 +01:00
pub async fn get_banned_public_keys_vector(pool: &storage::DatabaseConnectionPool) -> Result<Vec<String>, Rejection> {
2021-03-11 01:02:28 +01:00
// Get a database connection
let conn = storage::conn(&pool)?;
// Query the database
let raw_query = format!("SELECT public_key FROM {}", storage::BLOCK_LIST_TABLE);
let mut query = storage::query(&raw_query, &conn)?;
let rows = match query.query_map(params![], |row| {
Ok(row.get(0)?)
}) {
Ok(rows) => rows,
Err(e) => {
println!("Couldn't query database due to error: {:?}.", e);
2021-03-12 06:40:24 +01:00
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
2021-03-11 01:02:28 +01:00
}
};
// Return
return Ok(rows.filter_map(|result| result.ok()).collect());
}
2021-03-12 05:46:06 +01:00
/// Reports whether `public_key` is one of the keys returned by `get_banned_public_keys_vector`.
pub async fn is_banned(public_key: &str, pool: &storage::DatabaseConnectionPool) -> Result<bool, Rejection> {
    let banned = get_banned_public_keys_vector(pool).await?;
    let found = banned.iter().any(|candidate| candidate.as_str() == public_key);
    Ok(found)
}
2021-03-11 00:50:17 +01:00
/// Returns `true` if `public_key` has the shape of a valid public key: exactly 66 hexadecimal
/// characters (the version byte followed by 32 bytes of random data, hex encoded).
///
/// Both upper- and lowercase hex digits are accepted. The check is purely syntactic.
pub fn is_valid_public_key(public_key: &str) -> bool {
    // (1 version byte + 32 bytes of random data) * 2 hex characters per byte
    const PUBLIC_KEY_LENGTH: usize = 66;
    // Hex digits are single-byte ASCII, so once every byte is a hex digit the byte length
    // is also the character count. No regex needs to be compiled on each call.
    public_key.len() == PUBLIC_KEY_LENGTH && public_key.bytes().all(|byte| byte.is_ascii_hexdigit())
}