Implement rough RPC layer

This commit is contained in:
Niels Andriesse 2021-03-12 15:11:12 +11:00
parent b81e9e927a
commit 5ee4907669
7 changed files with 172 additions and 176 deletions

5
Cargo.lock generated
View File

@ -1120,6 +1120,7 @@ dependencies = [
"curve25519-parser",
"hex",
"hmac",
"http",
"r2d2",
"r2d2_sqlite",
"regex",
@ -1275,9 +1276,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.2.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8190d04c665ea9e6b6a0dc45523ade572c088d2e6566244c1122671dbf4ae3a"
checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda"
dependencies = [
"autocfg",
"bytes",

View File

@ -5,17 +5,18 @@ authors = ["Niels Andriesse <niels@oxen.io>"]
edition = "2018"
[dependencies]
aes-gcm = "0.8.0"
curve25519-parser = "0.2.0"
hex = "0.4.3"
hmac = "0.10.1"
aes-gcm = "0.8"
curve25519-parser = "0.2"
hex = "0.4"
hmac = "0.10"
http = "0.2"
regex = "1.4"
rusqlite = "0.24"
r2d2_sqlite = "0.17.0"
r2d2 = "0.8.9"
r2d2_sqlite = "0.17"
r2d2 = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.9.3"
tokio = { version = "1", features = ["full"] }
sha2 = "0.9"
tokio = { version = "1.3", features = ["full"] }
warp = "0.3"
x25519-dalek = "1.1.0"
x25519-dalek = "1.1"

View File

@ -3,7 +3,7 @@ use rusqlite::params;
use warp::{Rejection, http::StatusCode};
use super::models;
use super::routes;
use super::rpc;
use super::storage;
#[derive(Debug)]
@ -11,7 +11,7 @@ pub struct UnauthorizedError;
impl warp::reject::Reject for UnauthorizedError { }
/// Inserts the given `message` into the database if it's valid.
pub async fn insert_message(mut message: models::Message, pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn insert_message(mut message: models::Message, pool: &storage::DatabaseConnectionPool) -> Result<warp::reply::Json, Rejection> {
// Validate the message
if !message.is_valid() {
println!("Ignoring invalid message.");
@ -21,7 +21,7 @@ pub async fn insert_message(mut message: models::Message, pool: storage::Databas
// TODO: Check that the requesting user isn't banned
// Get a connection and open a transaction
let mut conn = storage::conn(&pool)?;
let mut conn = storage::conn(pool)?;
let tx = storage::tx(&mut conn)?;
// Insert the message
let stmt = format!("INSERT INTO {} (text) VALUES (?1)", storage::MESSAGES_TABLE);
@ -35,9 +35,9 @@ pub async fn insert_message(mut message: models::Message, pool: storage::Databas
}
/// Returns either the last `limit` messages or all messages since `from_server_id, limited to `limit`.
pub async fn get_messages(options: routes::QueryOptions, pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn get_messages(options: rpc::QueryOptions, pool: &storage::DatabaseConnectionPool) -> Result<warp::reply::Json, Rejection> {
// Get a database connection
let conn = storage::conn(&pool)?;
let conn = storage::conn(pool)?;
// Unwrap parameters
let from_server_id = options.from_server_id.unwrap_or(0);
let limit = options.limit.unwrap_or(256); // Never return more than 256 messages at once
@ -64,12 +64,12 @@ pub async fn get_messages(options: routes::QueryOptions, pool: storage::Database
}
/// Deletes the message with the given `row_id` from the database, if it's present.
pub async fn delete_message(row_id: i64, pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn delete_message(row_id: i64, pool: &storage::DatabaseConnectionPool) -> Result<StatusCode, Rejection> {
// TODO: Check that the requesting user has permission (either it's their own message or they're a moderator)
// Get a connection and open a transaction
let mut conn = storage::conn(&pool)?;
let mut conn = storage::conn(pool)?;
let tx = storage::tx(&mut conn)?;
// Delete the message if it's present
let stmt = format!("DELETE FROM {} WHERE rowid = (?1)", storage::MESSAGES_TABLE);
@ -86,9 +86,9 @@ pub async fn delete_message(row_id: i64, pool: storage::DatabaseConnectionPool)
}
/// Returns either the last `limit` deleted messages or all deleted messages since `from_server_id, limited to `limit`.
pub async fn get_deleted_messages(options: routes::QueryOptions, pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn get_deleted_messages(options: rpc::QueryOptions, pool: &storage::DatabaseConnectionPool) -> Result<warp::reply::Json, Rejection> {
// Get a database connection
let conn = storage::conn(&pool)?;
let conn = storage::conn(pool)?;
// Unwrap parameters
let from_server_id = options.from_server_id.unwrap_or(0);
let limit = options.limit.unwrap_or(256); // Never return more than 256 deleted messages at once
@ -115,13 +115,13 @@ pub async fn get_deleted_messages(options: routes::QueryOptions, pool: storage::
}
/// Returns the full list of moderators.
pub async fn get_moderators(pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
let public_keys = get_moderators_vector(&pool)?;
pub async fn get_moderators(pool: &storage::DatabaseConnectionPool) -> Result<warp::reply::Json, Rejection> {
let public_keys = get_moderators_vector(pool)?;
return Ok(warp::reply::json(&public_keys));
}
/// Bans the given `public_key`, if the requesting user is a moderator.
pub async fn ban(public_key: String, pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn ban(public_key: String, pool: &storage::DatabaseConnectionPool) -> Result<StatusCode, Rejection> {
// Validate the public key
if !is_valid_public_key(&public_key) {
println!("Ignoring ban request for invalid public key.");
@ -131,9 +131,9 @@ pub async fn ban(public_key: String, pool: storage::DatabaseConnectionPool) -> R
// TODO: Check that the requesting user is a moderator
// Don't double ban public keys
if is_banned(&public_key, &pool)? { return Ok(warp::reply::reply()); }
if is_banned(&public_key, pool)? { return Ok(StatusCode::OK); }
// Get a connection and open a transaction
let mut conn = storage::conn(&pool)?;
let mut conn = storage::conn(pool)?;
let tx = storage::tx(&mut conn)?;
// Insert the message
let stmt = format!("INSERT INTO {} (public_key) VALUES (?1)", storage::BLOCK_LIST_TABLE);
@ -141,11 +141,11 @@ pub async fn ban(public_key: String, pool: storage::DatabaseConnectionPool) -> R
// Commit
tx.commit(); // TODO: Unwrap
// Return
return Ok(warp::reply::reply());
return Ok(StatusCode::OK);
}
/// Unbans the given `public_key`, if the requesting user is a moderator.
pub async fn unban(public_key: String, pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn unban(public_key: String, pool: &storage::DatabaseConnectionPool) -> Result<StatusCode, Rejection> {
// Validate the public key
if !is_valid_public_key(&public_key) {
println!("Ignoring unban request for invalid public key.");
@ -154,8 +154,10 @@ pub async fn unban(public_key: String, pool: storage::DatabaseConnectionPool) ->
// TODO: Check that the requesting user is a moderator
// Don't double unban public keys
if !is_banned(&public_key, pool)? { return Ok(StatusCode::OK); }
// Get a connection and open a transaction
let mut conn = storage::conn(&pool)?;
let mut conn = storage::conn(pool)?;
let tx = storage::tx(&mut conn)?;
// Insert the message
let stmt = format!("DELETE FROM {} WHERE public_key = (?1)", storage::BLOCK_LIST_TABLE);
@ -163,19 +165,19 @@ pub async fn unban(public_key: String, pool: storage::DatabaseConnectionPool) ->
// Commit
tx.commit(); // TODO: Unwrap
// Return
return Ok(warp::reply::reply());
return Ok(StatusCode::OK);
}
/// Returns the full list of banned public keys.
pub async fn get_banned_public_keys(pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn get_banned_public_keys(pool: &storage::DatabaseConnectionPool) -> Result<warp::reply::Json, Rejection> {
// TODO: Check that the requesting user is a moderator
let public_keys = get_banned_public_keys_vector(&pool)?;
let public_keys = get_banned_public_keys_vector(pool)?;
return Ok(warp::reply::json(&public_keys));
}
pub async fn get_member_count(pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
pub async fn get_member_count(pool: &storage::DatabaseConnectionPool) -> Result<warp::reply::Json, Rejection> {
let member_count = 5; // TODO: Implement
return Ok(warp::reply::json(&member_count));
}

View File

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use warp::Rejection;
use super::crypto;
use super::rpc;
use super::storage;
#[derive(Deserialize, Serialize, Debug)]
@ -19,17 +20,24 @@ struct LsrpcPayloadMetadata {
}
#[derive(Deserialize, Serialize, Debug)]
struct RpcCall {
pub struct RpcCall {
pub endpoint: String,
pub body: String,
pub method: String
}
#[derive(Debug)]
pub struct RequestSizeExceededError;
impl warp::reject::Reject for RequestSizeExceededError { }
#[derive(Debug)]
pub struct ParsingError;
impl warp::reject::Reject for ParsingError { }
pub async fn handle_lsrpc_request(blob: warp::hyper::body::Bytes, pool: storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
if blob.len() > 10 * 1024 * 1024 { // Match storage server
return Err(warp::reject::custom(RequestSizeExceededError));
}
let payload = parse_lsrpc_payload(blob)?;
let plaintext = decrypt_lsrpc_payload(payload)?;
let json = match String::from_utf8(plaintext) {
@ -46,7 +54,7 @@ pub async fn handle_lsrpc_request(blob: warp::hyper::body::Bytes, pool: storage:
return Err(warp::reject::custom(ParsingError));
}
};
return Ok(warp::reply::reply());
return rpc::handle_rpc_call(rpc_call, &pool).await;
}
fn parse_lsrpc_payload(blob: warp::hyper::body::Bytes) -> Result<LsrpcPayload, Rejection> {

View File

@ -3,15 +3,13 @@ mod handlers;
mod lsrpc;
mod models;
mod routes;
mod rpc;
mod storage;
#[tokio::main]
async fn main() {
// Database
let pool = storage::pool();
let conn = storage::conn(&pool).unwrap(); // Force
let conn = storage::conn(&pool).unwrap();
storage::create_tables_if_needed(&conn);
// Routes
let routes = routes::get_all(&pool);
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
warp::serve(routes::lsrpc(pool.clone())).run(([127, 0, 0, 1], 3030)).await;
}

View File

@ -1,18 +1,12 @@
use serde::Deserialize;
use warp::{Filter, http::StatusCode, Rejection};
use super::crypto;
use super::handlers;
use super::lsrpc;
use super::models;
use super::rpc;
use super::storage;
#[derive(Debug, Deserialize)]
pub struct QueryOptions {
pub limit: Option<u16>,
pub from_server_id: Option<i64>
}
/// POST /loki/v3/lsrpc
pub fn lsrpc(
db_pool: storage::DatabaseConnectionPool
@ -26,136 +20,6 @@ pub fn lsrpc(
.recover(handle_error);
}
/// POST /messages
pub fn send_message(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::post()
.and(warp::path("messages"))
.and(warp::body::content_length_limit(10 * 1024 * 1024)) // Match storage server
.and(warp::body::json()) // Expect JSON
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::insert_message)
.recover(handle_error);
}
/// GET /messages
///
/// Returns either the last `limit` messages or all messages since `from_server_id, limited to `limit`.
pub fn get_messages(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::get()
.and(warp::path("messages"))
.and(warp::query::<QueryOptions>())
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::get_messages)
.recover(handle_error);
}
/// DELETE /messages/:id
pub fn delete_message(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::delete()
.and(warp::path!("messages" / i64))
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::delete_message)
.recover(handle_error);
}
/// GET /deleted_messages
///
/// Returns either the last `limit` deleted message IDs or all deleted message IDs since `from_server_id, limited to `limit`.
pub fn get_deleted_messages(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::get()
.and(warp::path("deleted_messages"))
.and(warp::query::<QueryOptions>())
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::get_deleted_messages)
.recover(handle_error);
}
/// GET /moderators
///
/// Returns the full list of moderators.
pub fn get_moderators(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::get()
.and(warp::path("moderators"))
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::get_moderators)
.recover(handle_error);
}
/// POST /block_list
pub fn ban(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::post()
.and(warp::path("block_list"))
.and(warp::body::content_length_limit(256 * 1024)) // Limit body to an arbitrary low-ish size
.and(warp::body::json()) // Expect JSON
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::ban)
.recover(handle_error);
}
/// DELETE /block_list/:public_key
pub fn unban(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::delete()
.and(warp::path!("block_list" / String))
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::unban)
.recover(handle_error);
}
/// GET /block_list
///
/// Returns the full list of banned public keys.
pub fn get_block_list(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::get()
.and(warp::path("block_list"))
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::get_banned_public_keys)
.recover(handle_error);
}
/// GET /member_count
pub fn get_member_count(
db_pool: storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::get()
.and(warp::path("member_count"))
.and(warp::any().map(move || db_pool.clone()))
.and_then(handlers::get_member_count)
.recover(handle_error);
}
// Utilities
pub fn get_all(
db_pool: &storage::DatabaseConnectionPool
) -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return lsrpc(db_pool.clone())
.or(send_message(db_pool.clone()))
.or(get_messages(db_pool.clone()))
.or(delete_message(db_pool.clone()))
.or(get_deleted_messages(db_pool.clone()))
.or(get_moderators(db_pool.clone()))
.or(ban(db_pool.clone()))
.or(unban(db_pool.clone()))
.or(get_block_list(db_pool.clone()))
.or(get_member_count(db_pool.clone()));
}
async fn handle_error(e: Rejection) -> Result<impl warp::Reply, Rejection> {
let reply = warp::reply::reply();
if let Some(models::ValidationError) = e.find() {
@ -167,6 +31,9 @@ async fn handle_error(e: Rejection) -> Result<impl warp::Reply, Rejection> {
if let Some(lsrpc::ParsingError) = e.find() {
return Ok(warp::reply::with_status(reply, StatusCode::BAD_REQUEST)); // 400
}
if let Some(rpc::InvalidRequestError) = e.find() {
return Ok(warp::reply::with_status(reply, StatusCode::BAD_REQUEST)); // 400
}
if let Some(handlers::UnauthorizedError) = e.find() {
return Ok(warp::reply::with_status(reply, StatusCode::FORBIDDEN)); // 403
}

119
src/rpc.rs Normal file
View File

@ -0,0 +1,119 @@
use std::convert::TryFrom;
use serde::Deserialize;
use warp::{Filter, http::StatusCode, Rejection};
use super::crypto;
use super::handlers;
use super::lsrpc;
use super::models;
use super::storage;
#[derive(Debug, Deserialize)]
pub struct QueryOptions {
pub limit: Option<u16>,
pub from_server_id: Option<i64>
}
#[derive(Debug)]
pub struct InvalidRequestError;
impl warp::reject::Reject for InvalidRequestError { }
pub async fn handle_rpc_call(rpc_call: lsrpc::RpcCall, pool: &storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
// Check that the endpoint is a valid URI
let uri = match rpc_call.endpoint.parse::<http::Uri>() {
Ok(uri) => uri,
Err(e) => {
println!("Couldn't parse URI from: {:?} due to error: {:?}.", rpc_call.endpoint, e);
return Err(warp::reject::custom(InvalidRequestError));
}
};
// Switch on the HTTP method
match rpc_call.method.as_ref() {
"GET" => return handle_get_rpc_call(rpc_call, uri, pool).await,
"POST" => return handle_post_rpc_call(rpc_call, uri, pool).await,
"DELETE" => return handle_delete_rpc_call(rpc_call, uri, pool).await,
_ => {
println!("Ignoring RPC call with invalid or unused HTTP method: {:?}.", rpc_call.method);
return Err(warp::reject::custom(InvalidRequestError));
}
}
}
pub async fn handle_get_rpc_call(rpc_call: lsrpc::RpcCall, uri: http::Uri, pool: &storage::DatabaseConnectionPool) -> Result<warp::reply::Json, Rejection> {
// Parse query options if needed
let mut query_options = QueryOptions { limit : None, from_server_id : None };
if let Some(query) = uri.query() {
query_options = match serde_json::from_str(&query) {
Ok(query_options) => query_options,
Err(e) => {
println!("Couldn't parse query options from: {:?} due to error: {:?}.", query, e);
return Err(warp::reject::custom(InvalidRequestError));
}
};
}
// Switch on the path
match uri.path() {
"/messages" => return handlers::get_messages(query_options, pool).await,
"/deleted_messages" => return handlers::get_deleted_messages(query_options, pool).await,
"/moderators" => return handlers::get_moderators(pool).await,
"/block_list" => return handlers::get_banned_public_keys(pool).await,
"/member_count" => return handlers::get_member_count(pool).await,
_ => {
println!("Ignoring RPC call with invalid or unused endpoint: {:?}.", rpc_call.endpoint);
return Err(warp::reject::custom(InvalidRequestError));
}
}
}
pub async fn handle_post_rpc_call(rpc_call: lsrpc::RpcCall, uri: http::Uri, pool: &storage::DatabaseConnectionPool) -> Result<impl warp::Reply, Rejection> {
match uri.path() {
"/messages" => {
let message = match serde_json::from_str(&rpc_call.body) {
Ok(query_options) => query_options,
Err(e) => {
println!("Couldn't parse message from: {:?} due to error: {:?}.", rpc_call.body, e);
return Err(warp::reject::custom(InvalidRequestError));
}
};
return handlers::insert_message(message, pool).await;
},
"/block_list" => return handlers::ban(rpc_call.body, pool).await,
_ => {
println!("Ignoring RPC call with invalid or unused endpoint: {:?}.", rpc_call.endpoint);
return Err(warp::reject::custom(InvalidRequestError));
}
}
}
pub async fn handle_delete_rpc_call(rpc_call: lsrpc::RpcCall, uri: http::Uri, pool: &storage::DatabaseConnectionPool) -> Result<StatusCode, Rejection> {
// DELETE /messages/:server_id
if uri.path().starts_with("/messages") {
let components: Vec<&str> = uri.path()[1..].split("/").collect(); // Drop the leading slash and split on subsequent slashes
if components.len() != 2 {
println!("Invalid endpoint: {:?}.", rpc_call.endpoint);
return Err(warp::reject::custom(InvalidRequestError));
}
let server_id: i64 = match components[1].parse() {
Ok(server_id) => server_id,
Err(e) => {
println!("Invalid endpoint: {:?}.", rpc_call.endpoint);
return Err(warp::reject::custom(InvalidRequestError));
}
};
return handlers::delete_message(server_id, pool).await;
}
// DELETE /block_list/:public_key
if uri.path().starts_with("/block_list") {
let components: Vec<&str> = uri.path()[1..].split("/").collect(); // Drop the leading slash and split on subsequent slashes
if components.len() != 2 {
println!("Invalid endpoint: {:?}.", rpc_call.endpoint);
return Err(warp::reject::custom(InvalidRequestError));
}
let public_key = components[1].to_string();
return handlers::unban(public_key, pool).await;
}
// Unrecognized endpoint
println!("Invalid endpoint: {:?}.", rpc_call.endpoint);
return Err(warp::reject::custom(InvalidRequestError));
}