2021-03-25 00:56:16 +01:00
use std ::collections ::HashMap ;
2021-03-22 03:55:43 +01:00
use std ::convert ::TryInto ;
2021-03-23 04:25:52 +01:00
use std ::path ::Path ;
2021-03-19 00:09:13 +01:00
2021-04-01 00:55:47 +02:00
use log ::{ error , info , warn } ;
2021-05-31 02:21:31 +02:00
use parking_lot ::RwLock ;
2021-03-16 06:25:59 +01:00
use rand ::{ thread_rng , Rng } ;
2021-03-25 00:56:16 +01:00
use rusqlite ::params ;
use serde ::{ Deserialize , Serialize } ;
2021-04-01 01:32:25 +02:00
use tokio ::fs ::File ;
use tokio ::io ::{ AsyncReadExt , AsyncWriteExt } ;
2021-03-25 00:56:16 +01:00
use warp ::{ http ::StatusCode , reply ::Reply , reply ::Response , Rejection } ;
2021-03-10 03:08:34 +01:00
2021-03-16 06:25:59 +01:00
use super ::crypto ;
2021-03-12 06:40:24 +01:00
use super ::errors ::Error ;
2021-03-10 03:08:34 +01:00
use super ::models ;
2021-04-27 01:12:07 +02:00
use super ::rpc ;
2021-03-10 03:08:34 +01:00
use super ::storage ;
2021-03-17 23:58:45 +01:00
/// Privilege tier that a handler requests when checking an auth token.
///
/// The derives make the level cheap to pass by value, comparable and printable in logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthorizationLevel {
    /// Level requested by the file, token and message handlers in this module.
    Basic,
    /// Level requested by handlers that change room-wide state (e.g. `set_group_image`).
    Moderator,
}
2021-03-22 03:55:43 +01:00
#[ derive(Debug, Deserialize, Serialize) ]
2021-03-25 00:56:16 +01:00
pub struct GenericStringResponse {
2021-03-24 00:02:53 +01:00
pub status_code : u16 ,
2021-03-25 01:38:06 +01:00
pub result : String ,
2021-03-22 03:55:43 +01:00
}
2021-05-28 05:55:51 +02:00
/// Refresh interval for cached Session version information, evaluating to 1800.
/// NOTE(review): presumably seconds (i.e. 30 minutes) — confirm at the use site.
pub const SESSION_VERSION_UPDATE_INTERVAL: i64 = 30 * 60;

lazy_static::lazy_static! {
    // Lazily initialised, lock-guarded map from a `String` key to an `(i64, String)` pair.
    // NOTE(review): presumably platform -> (last update timestamp, version tag); confirm
    // against the code that populates this map.
    pub static ref SESSION_VERSIONS: RwLock<HashMap<String, (i64, String)>> =
        RwLock::new(HashMap::new());
}
2021-03-23 05:22:54 +01:00
// Rooms
2021-03-31 02:00:02 +02:00
// Not publicly exposed.
pub async fn create_room ( room : models ::Room ) -> Result < Response , Rejection > {
2021-03-23 05:22:54 +01:00
// Get a connection
let pool = & storage ::MAIN_POOL ;
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-23 05:27:47 +01:00
// Insert the room
let stmt = format! ( " REPLACE INTO {} (id, name) VALUES (?1, ?2) " , storage ::MAIN_TABLE ) ;
2021-03-31 02:00:02 +02:00
match conn . execute ( & stmt , params! [ & room . id , & room . name ] ) {
2021-03-23 05:22:54 +01:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't create room due to error: {}. " , e ) ;
2021-03-23 05:22:54 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
}
2021-03-23 05:27:47 +01:00
// Set up the database
2021-03-31 02:00:02 +02:00
storage ::create_database_if_needed ( & room . id ) ;
2021-03-23 05:22:54 +01:00
// Return
2021-04-01 00:55:47 +02:00
info! ( " Added room with ID: {} " , & room . id ) ;
2021-03-25 00:56:16 +01:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
2021-03-23 05:22:54 +01:00
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
}
2021-03-31 02:00:02 +02:00
// Not publicly exposed.
pub async fn delete_room ( id : String ) -> Result < Response , Rejection > {
2021-03-30 05:01:59 +02:00
// Get a connection
let pool = & storage ::MAIN_POOL ;
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Insert the room
let stmt = format! ( " DELETE FROM {} WHERE id = (?1) " , storage ::MAIN_TABLE ) ;
2021-03-31 02:00:02 +02:00
match conn . execute ( & stmt , params! [ & id ] ) {
2021-03-30 05:01:59 +02:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't delete room due to error: {}. " , e ) ;
2021-03-30 05:01:59 +02:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
}
2021-04-01 00:55:47 +02:00
// Don't auto-delete the database file (the server operator might want to keep it around)
2021-03-30 05:01:59 +02:00
// Return
2021-04-01 00:55:47 +02:00
info! ( " Deleted room with ID: {} " , & id ) ;
2021-03-30 05:01:59 +02:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
}
2021-03-31 02:23:45 +02:00
pub fn get_room ( room_id : & str ) -> Result < Response , Rejection > {
2021-03-25 04:05:46 +01:00
// Get a connection
let pool = & storage ::MAIN_POOL ;
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Get the room info if possible
2021-04-01 00:55:47 +02:00
let raw_query = format! ( " SELECT id, name FROM {} where id = (?1) " , storage ::MAIN_TABLE ) ;
2021-03-25 04:11:45 +01:00
let room = match conn . query_row ( & raw_query , params! [ room_id ] , | row | {
2021-04-01 00:55:47 +02:00
Ok ( models ::Room { id : row . get ( 0 ) ? , name : row . get ( 1 ) ? } )
2021-03-25 04:11:45 +01:00
} ) {
2021-03-25 04:05:46 +01:00
Ok ( info ) = > info ,
Err ( _ ) = > return Err ( warp ::reject ::custom ( Error ::NoSuchRoom ) ) ,
} ;
// Return
#[ derive(Debug, Deserialize, Serialize) ]
struct Response {
status_code : u16 ,
2021-03-31 02:00:02 +02:00
room : models ::Room ,
2021-03-25 04:05:46 +01:00
}
let response = Response { status_code : StatusCode ::OK . as_u16 ( ) , room } ;
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
}
2021-03-31 02:23:45 +02:00
pub fn get_all_rooms ( ) -> Result < Response , Rejection > {
2021-03-25 04:05:46 +01:00
// Get a connection
let pool = & storage ::MAIN_POOL ;
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Get the room info if possible
2021-04-01 00:55:47 +02:00
let raw_query = format! ( " SELECT id, name FROM {} " , storage ::MAIN_TABLE ) ;
2021-03-25 04:05:46 +01:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-04-01 00:55:47 +02:00
let rows = match query
. query_map ( params! [ ] , | row | Ok ( models ::Room { id : row . get ( 0 ) ? , name : row . get ( 1 ) ? } ) )
{
2021-03-25 04:05:46 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't get rooms due to error: {}. " , e ) ;
2021-03-25 04:05:46 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-03-31 02:00:02 +02:00
let rooms : Vec < models ::Room > = rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ;
2021-03-25 04:05:46 +01:00
// Return
#[ derive(Debug, Deserialize, Serialize) ]
struct Response {
status_code : u16 ,
2021-03-31 02:00:02 +02:00
rooms : Vec < models ::Room > ,
2021-03-25 04:05:46 +01:00
}
let response = Response { status_code : StatusCode ::OK . as_u16 ( ) , rooms } ;
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
}
2021-03-19 00:14:14 +01:00
// Files
2021-04-01 01:32:25 +02:00
pub async fn store_file (
2021-04-27 07:51:53 +02:00
room_id : Option < String > , base64_encoded_bytes : & str , auth_token : Option < String > ,
pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-26 05:24:02 +01:00
// It'd be nice to use the UUID crate for the file ID, but clients want an integer ID
2021-04-27 07:35:50 +02:00
const UPPER_BOUND : u64 = 2 u64 . pow ( 53 ) ; // JS has trouble if we go higher than this
let id : u64 = thread_rng ( ) . gen_range ( 0 .. UPPER_BOUND ) ;
2021-04-28 03:56:28 +02:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) ;
2021-04-27 01:12:07 +02:00
// Check authorization level if needed
match rpc ::MODE {
rpc ::Mode ::OpenGroupServer = > {
2021-05-14 06:22:33 +02:00
let auth_token = auth_token . ok_or_else ( | | warp ::reject ::custom ( Error ::NoAuthToken ) ) ? ;
2021-04-27 01:12:07 +02:00
let ( has_authorization_level , _ ) =
has_authorization_level ( & auth_token , AuthorizationLevel ::Basic , pool ) ? ;
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
}
rpc ::Mode ::FileServer = > { /* Do nothing */ }
2021-03-25 00:56:16 +01:00
}
2021-03-19 00:09:13 +01:00
// Parse bytes
let bytes = match base64 ::decode ( base64_encoded_bytes ) {
Ok ( bytes ) = > bytes ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't parse bytes from invalid base64 encoding due to error: {}. " , e ) ;
2021-03-19 00:09:13 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
}
} ;
// Update the database
// We do this * before * storing the actual file, so that in case something goes
// wrong we're not left with files that'll never be pruned.
2021-03-19 03:26:53 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-04-01 00:55:47 +02:00
// INSERT rather than REPLACE so that on the off chance there's already a file with this exact
// id (i.e. timestamp) we simply error out and get the client to retry.
2021-03-19 00:09:13 +01:00
let stmt = format! ( " INSERT INTO {} (id, timestamp) VALUES (?1, ?2) " , storage ::FILES_TABLE ) ;
2021-04-27 03:42:00 +02:00
let _ = match conn . execute ( & stmt , params! [ id . to_string ( ) , now ] ) {
2021-03-19 00:09:13 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't insert file record due to error: {}. " , e ) ;
2021-03-19 00:09:13 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
// Write to file
2021-04-28 02:30:39 +02:00
// room_id is guaranteed to be present at this point because we checked the auth
// token (the auth token will have been rejected if room_id is missing).
2021-04-27 07:51:53 +02:00
let room_id = room_id . unwrap ( ) ;
2021-05-14 06:22:33 +02:00
let _ = std ::fs ::create_dir_all ( format! ( " files/ {} _files " , & room_id ) ) ;
2021-04-27 08:21:50 +02:00
let raw_path = format! ( " files/ {} _files/ {} " , & room_id , & id ) ;
2021-03-23 04:25:52 +01:00
let path = Path ::new ( & raw_path ) ;
2021-04-01 01:32:25 +02:00
let mut file = match File ::create ( path ) . await {
Ok ( file ) = > file ,
Err ( e ) = > {
error! ( " Couldn't store file due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
match file . write_all ( & bytes ) . await {
Ok ( _ ) = > ( ) ,
2021-03-19 00:09:13 +01:00
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't store file due to error: {}. " , e ) ;
2021-03-19 00:09:13 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
// Return
2021-03-26 05:24:02 +01:00
#[ derive(Debug, Deserialize, Serialize) ]
struct Response {
status_code : u16 ,
2021-04-27 03:01:20 +02:00
result : u64 ,
2021-03-26 05:24:02 +01:00
}
2021-04-27 03:01:20 +02:00
let response = Response { status_code : StatusCode ::OK . as_u16 ( ) , result : id } ;
2021-03-26 05:24:02 +01:00
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
2021-03-19 00:09:13 +01:00
}
2021-04-01 01:32:25 +02:00
pub async fn get_file (
2021-04-27 07:51:53 +02:00
room_id : Option < String > , id : u64 , auth_token : Option < String > ,
pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < GenericStringResponse , Rejection > {
// Doesn't return a response directly for testing purposes
2021-04-27 01:12:07 +02:00
// Check authorization level if needed
match rpc ::MODE {
rpc ::Mode ::OpenGroupServer = > {
2021-05-14 06:22:33 +02:00
let auth_token = auth_token . ok_or_else ( | | warp ::reject ::custom ( Error ::NoAuthToken ) ) ? ;
2021-04-27 01:12:07 +02:00
let ( has_authorization_level , _ ) =
has_authorization_level ( & auth_token , AuthorizationLevel ::Basic , pool ) ? ;
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
}
rpc ::Mode ::FileServer = > { /* Do nothing */ }
2021-03-25 00:56:16 +01:00
}
2021-03-19 00:09:13 +01:00
// Try to read the file
2021-04-01 01:32:25 +02:00
let mut bytes = vec! [ ] ;
2021-04-28 02:30:39 +02:00
// room_id is guaranteed to be present at this point because we checked the auth
// token (the auth token will have been rejected if room_id is missing).
2021-04-27 08:21:50 +02:00
let raw_path = format! ( " files/ {} _files/ {} " , room_id . unwrap ( ) , id ) ;
2021-03-23 04:25:52 +01:00
let path = Path ::new ( & raw_path ) ;
2021-04-01 01:32:25 +02:00
let mut file = match File ::open ( path ) . await {
Ok ( file ) = > file ,
Err ( e ) = > {
error! ( " Couldn't read file due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
}
} ;
match file . read_to_end ( & mut bytes ) . await {
Ok ( _ ) = > ( ) ,
2021-03-19 00:09:13 +01:00
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't read file due to error: {}. " , e ) ;
2021-03-19 00:09:13 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
}
} ;
// Base64 encode the result
let base64_encoded_bytes = base64 ::encode ( bytes ) ;
// Return
2021-03-25 00:56:16 +01:00
let json = GenericStringResponse {
status_code : StatusCode ::OK . as_u16 ( ) ,
2021-03-25 01:38:06 +01:00
result : base64_encoded_bytes ,
2021-03-25 00:56:16 +01:00
} ;
2021-03-19 06:44:07 +01:00
return Ok ( json ) ;
2021-03-19 00:09:13 +01:00
}
2021-04-01 01:32:25 +02:00
pub async fn get_group_image ( room_id : & str ) -> Result < Response , Rejection > {
2021-03-29 07:11:56 +02:00
// Try to read the file
2021-04-01 01:32:25 +02:00
let mut bytes = vec! [ ] ;
2021-03-29 07:11:56 +02:00
let raw_path = format! ( " files/ {} " , room_id ) ;
let path = Path ::new ( & raw_path ) ;
2021-04-01 01:32:25 +02:00
let mut file = match File ::open ( path ) . await {
Ok ( file ) = > file ,
Err ( e ) = > {
error! ( " Couldn't read file due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
}
} ;
match file . read_to_end ( & mut bytes ) . await {
Ok ( _ ) = > ( ) ,
2021-03-29 07:11:56 +02:00
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't read file due to error: {}. " , e ) ;
2021-03-29 07:11:56 +02:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
}
} ;
// Base64 encode the result
let base64_encoded_bytes = base64 ::encode ( bytes ) ;
// Return
let json = GenericStringResponse {
status_code : StatusCode ::OK . as_u16 ( ) ,
result : base64_encoded_bytes ,
} ;
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
}
2021-04-23 03:16:31 +02:00
pub async fn set_group_image (
base64_encoded_bytes : & str , room_id : & str , auth_token : & str ,
pool : & storage ::DatabaseConnectionPool ,
) -> Result < Response , Rejection > {
// Check authorization level
let ( has_authorization_level , _ ) =
has_authorization_level ( auth_token , AuthorizationLevel ::Moderator , pool ) ? ;
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
// Parse bytes
let bytes = match base64 ::decode ( base64_encoded_bytes ) {
Ok ( bytes ) = > bytes ,
Err ( e ) = > {
error! ( " Couldn't parse bytes from invalid base64 encoding due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
}
} ;
// Write to file
let raw_path = format! ( " files/ {} " , room_id ) ;
let path = Path ::new ( & raw_path ) ;
let mut file = match File ::create ( path ) . await {
Ok ( file ) = > file ,
Err ( e ) = > {
error! ( " Couldn't set group image due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
match file . write_all ( & bytes ) . await {
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
error! ( " Couldn't set group image due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
// Return
#[ derive(Debug, Deserialize, Serialize) ]
struct Response {
status_code : u16 ,
room_id : String ,
}
let response = Response { status_code : StatusCode ::OK . as_u16 ( ) , room_id : room_id . to_string ( ) } ;
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
}
2021-03-19 00:09:13 +01:00
// Authentication
2021-03-31 02:23:45 +02:00
pub fn get_auth_token_challenge (
2021-03-25 01:38:06 +01:00
query_params : HashMap < String , String > , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < models ::Challenge , Rejection > {
// Doesn't return a response directly for testing purposes
2021-03-23 23:12:54 +01:00
// Get the public key
2021-05-14 06:22:33 +02:00
let hex_public_key = query_params
. get ( " public_key " )
. ok_or_else ( | | warp ::reject ::custom ( Error ::InvalidRpcCall ) ) ? ;
2021-03-16 06:25:59 +01:00
// Validate the public key
2021-03-25 00:56:16 +01:00
if ! is_valid_public_key ( hex_public_key ) {
2021-04-01 00:55:47 +02:00
warn! ( " Ignoring challenge request for invalid public key: {}. " , hex_public_key ) ;
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
2021-03-16 06:25:59 +01:00
}
// Convert the public key to bytes and cut off the version byte
2021-03-29 06:53:57 +02:00
// This is safe because we know it has a length of 32 at this point
let public_key : [ u8 ; 32 ] = hex ::decode ( hex_public_key ) . unwrap ( ) [ 1 .. ] . try_into ( ) . unwrap ( ) ;
// Generate an ephemeral key pair
2021-03-31 02:23:45 +02:00
let ( ephemeral_private_key , ephemeral_public_key ) = crypto ::generate_x25519_key_pair ( ) ;
2021-03-16 06:28:54 +01:00
// Generate a symmetric key from the requesting user's public key and the ephemeral private key
2021-03-31 02:23:45 +02:00
let symmetric_key = crypto ::get_x25519_symmetric_key ( & public_key , & ephemeral_private_key ) ? ;
2021-03-29 06:53:57 +02:00
// Generate a random token (or get the currently pending one if possible)
2021-03-31 02:23:45 +02:00
let pending_tokens = get_pending_tokens ( & hex_public_key , & pool ) ? ;
2021-03-29 06:53:57 +02:00
let token : Vec < u8 > ;
if ! pending_tokens . is_empty ( ) {
token = pending_tokens [ 0 ] . 1. clone ( ) ;
} else {
let mut buffer = [ 0 u8 ; 48 ] ;
thread_rng ( ) . fill ( & mut buffer [ .. ] ) ;
token = buffer . to_vec ( ) ;
}
2021-03-17 00:40:50 +01:00
// Store the (pending) token
2021-03-18 00:06:59 +01:00
// Note that a given public key can have multiple pending tokens
2021-03-19 03:26:53 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-29 06:53:57 +02:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) ;
2021-03-25 00:56:16 +01:00
let stmt = format! (
" INSERT INTO {} (public_key, timestamp, token) VALUES (?1, ?2, ?3) " ,
storage ::PENDING_TOKENS_TABLE
) ;
2021-03-29 06:53:57 +02:00
let _ = match conn . execute ( & stmt , params! [ hex_public_key , now , token ] ) {
2021-03-19 03:26:53 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't insert pending token due to error: {}. " , e ) ;
2021-03-19 03:26:53 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
2021-03-17 01:51:11 +01:00
} ;
2021-03-16 06:25:59 +01:00
// Encrypt the token with the symmetric key
2021-03-31 02:23:45 +02:00
let ciphertext = crypto ::encrypt_aes_gcm ( & token , & symmetric_key ) ? ;
2021-03-16 06:25:59 +01:00
// Return
2021-03-25 00:56:16 +01:00
return Ok ( models ::Challenge {
ciphertext : base64 ::encode ( ciphertext ) ,
2021-03-25 01:38:06 +01:00
ephemeral_public_key : base64 ::encode ( ephemeral_public_key . to_bytes ( ) ) ,
2021-03-25 00:56:16 +01:00
} ) ;
2021-03-16 06:25:59 +01:00
}
2021-03-31 02:23:45 +02:00
pub fn claim_auth_token (
2021-03-25 01:38:06 +01:00
public_key : & str , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-17 00:10:26 +01:00
// Validate the public key
2021-03-25 00:56:16 +01:00
if ! is_valid_public_key ( & public_key ) {
2021-04-01 00:55:47 +02:00
warn! ( " Ignoring claim token request for invalid public key. " ) ;
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
2021-03-17 00:10:26 +01:00
}
// Validate the token
2021-03-25 00:56:16 +01:00
if hex ::decode ( auth_token ) . is_err ( ) {
2021-04-01 00:55:47 +02:00
warn! ( " Ignoring claim token request for invalid token. " ) ;
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
2021-03-17 00:10:26 +01:00
}
2021-03-19 03:26:53 +01:00
// Get a database connection
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-17 00:10:26 +01:00
// Get the pending tokens for the given public key
2021-03-31 02:23:45 +02:00
let pending_tokens = get_pending_tokens ( & public_key , & pool ) ? ;
2021-03-17 00:10:26 +01:00
// Check that the token being claimed is in fact one of the pending tokens
2021-03-25 00:17:47 +01:00
let claim = hex ::decode ( auth_token ) . unwrap ( ) ; // Safe because we validated it above
2021-03-25 00:56:16 +01:00
let index = pending_tokens
. iter ( )
. position ( | ( _ , pending_token ) | * pending_token = = claim )
2021-05-14 06:22:33 +02:00
. ok_or ( Error ::Unauthorized ) ? ;
2021-03-17 00:40:50 +01:00
let token = & pending_tokens [ index ] . 1 ;
2021-03-17 00:10:26 +01:00
// Store the claimed token
2021-03-25 00:56:16 +01:00
let stmt = format! (
2021-06-11 01:47:34 +02:00
" INSERT INTO {} (public_key, timestamp, token) VALUES (?1, ?2, ?3) " ,
2021-03-25 00:56:16 +01:00
storage ::TOKENS_TABLE
) ;
2021-06-11 01:47:34 +02:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) ;
match conn . execute ( & stmt , params! [ public_key , now , hex ::encode ( token ) ] ) {
2021-03-17 00:10:26 +01:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't insert token due to error: {}. " , e ) ;
2021-03-17 00:10:26 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
}
2021-03-19 03:26:53 +01:00
// Delete all pending tokens for the given public key
let stmt = format! ( " DELETE FROM {} WHERE public_key = (?1) " , storage ::PENDING_TOKENS_TABLE ) ;
2021-03-25 00:56:16 +01:00
match conn . execute ( & stmt , params! [ public_key ] ) {
2021-03-19 03:26:53 +01:00
Ok ( _ ) = > ( ) ,
2021-04-01 00:55:47 +02:00
Err ( e ) = > error! ( " Couldn't delete pending tokens due to error: {}. " , e ) , // It's not catastrophic if this fails
2021-03-19 03:26:53 +01:00
} ;
2021-03-17 00:10:26 +01:00
// Return
2021-03-25 00:56:16 +01:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
2021-03-19 06:44:07 +01:00
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
2021-03-16 06:40:51 +01:00
}
2021-03-31 02:23:45 +02:00
pub fn delete_auth_token (
2021-03-25 01:38:06 +01:00
auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-18 00:23:43 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , requesting_public_key ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-19 03:26:53 +01:00
// Get a database connection
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-18 00:23:43 +01:00
// Delete the token
let stmt = format! ( " DELETE FROM {} WHERE public_key = (?1) " , storage ::TOKENS_TABLE ) ;
2021-03-25 00:56:16 +01:00
match conn . execute ( & stmt , params! [ requesting_public_key ] ) {
2021-03-18 01:52:39 +01:00
Ok ( _ ) = > ( ) ,
2021-03-18 00:23:43 +01:00
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't delete token due to error: {}. " , e ) ;
2021-03-18 00:23:43 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
// Return
2021-03-25 00:56:16 +01:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
2021-03-19 06:44:07 +01:00
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
2021-03-18 00:23:43 +01:00
}
2021-03-19 00:09:13 +01:00
// Message sending & receiving
2021-03-10 03:08:34 +01:00
/// Inserts the given `message` into the database if it's valid.
2021-03-31 02:23:45 +02:00
pub fn insert_message (
2021-03-25 01:38:06 +01:00
mut message : models ::Message , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-10 03:08:34 +01:00
// Validate the message
2021-03-25 00:56:16 +01:00
if ! message . is_valid ( ) {
2021-04-01 00:55:47 +02:00
warn! ( " Ignoring invalid message. " ) ;
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
2021-03-11 04:20:36 +01:00
}
2021-03-17 23:58:45 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , requesting_public_key ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-07-13 00:57:46 +02:00
// Get a timestamp
let timestamp = chrono ::Utc ::now ( ) . timestamp_millis ( ) ;
2021-03-10 05:38:32 +01:00
// Get a connection and open a transaction
2021-03-15 00:16:06 +01:00
let mut conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let tx = conn . transaction ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-07-12 08:13:24 +02:00
// Check if the requesting user needs to be rate limited
let last_5_messages = get_last_5_messages ( & requesting_public_key , pool ) ? ;
let should_rate_limit : bool ;
if last_5_messages . len ( ) = = 5 {
2021-07-13 00:57:46 +02:00
let interval = timestamp - last_5_messages [ 4 ] . timestamp ;
2021-07-13 06:00:44 +02:00
// Rate limit if the interval between the fifth last message and the current timestamp is
// less than 16 seconds; in other words, the user can send 5 messages every 16 seconds. This
// is a very crude way of rate limiting, but it should be sufficient for now.
should_rate_limit = interval < 16 * 1000 ;
2021-07-12 08:13:24 +02:00
} else {
should_rate_limit = false ;
}
if should_rate_limit {
return Err ( warp ::reject ::custom ( Error ::RateLimited ) ) ;
}
2021-03-10 03:08:34 +01:00
// Insert the message
2021-05-31 03:11:40 +02:00
message . timestamp = timestamp ;
2021-03-25 00:56:16 +01:00
let stmt = format! (
2021-04-29 03:18:05 +02:00
" INSERT INTO {} (public_key, timestamp, data, signature, is_deleted) VALUES (?1, ?2, ?3, ?4, ?5) " ,
2021-03-25 00:56:16 +01:00
storage ::MESSAGES_TABLE
) ;
2021-03-26 00:21:08 +01:00
match tx . execute (
& stmt ,
2021-04-29 03:18:05 +02:00
params! [ & requesting_public_key , message . timestamp , message . data , message . signature , 0 ] ,
2021-03-26 00:21:08 +01:00
) {
2021-03-15 00:16:06 +01:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't insert message due to error: {}. " , e ) ;
2021-03-15 00:16:06 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
}
2021-03-10 05:50:24 +01:00
let id = tx . last_insert_rowid ( ) ;
2021-03-10 05:13:58 +01:00
message . server_id = Some ( id ) ;
2021-03-24 04:11:06 +01:00
message . public_key = Some ( requesting_public_key ) ;
2021-03-10 05:13:58 +01:00
// Commit
2021-03-15 00:16:06 +01:00
tx . commit ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-10 05:13:58 +01:00
// Return
2021-03-24 00:02:53 +01:00
#[ derive(Debug, Deserialize, Serialize) ]
struct Response {
status_code : u16 ,
2021-03-25 01:38:06 +01:00
message : models ::Message ,
2021-03-24 00:02:53 +01:00
}
2021-03-25 00:56:16 +01:00
let response = Response { status_code : StatusCode ::OK . as_u16 ( ) , message } ;
2021-03-24 00:02:53 +01:00
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
2021-03-10 03:08:34 +01:00
}
2021-07-12 08:13:24 +02:00
fn get_last_5_messages (
public_key : & str , pool : & storage ::DatabaseConnectionPool ,
) -> Result < Vec < models ::Message > , Rejection > {
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let raw_query = format! (
" SELECT id, public_key, timestamp, data, signature FROM {} WHERE public_key = (?1) ORDER BY timestamp DESC LIMIT 5 " ,
storage ::MESSAGES_TABLE
) ;
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let rows = match query . query_map ( params! [ public_key ] , | row | {
Ok ( models ::Message {
server_id : row . get ( 0 ) ? ,
public_key : row . get ( 1 ) ? ,
timestamp : row . get ( 2 ) ? ,
data : row . get ( 3 ) ? ,
signature : row . get ( 4 ) ? ,
} )
} ) {
Ok ( rows ) = > rows ,
Err ( e ) = > {
error! ( " Couldn't get last 5 messages due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
return Ok ( rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ) ;
}
2021-03-10 06:01:08 +01:00
/// Returns either the last `limit` messages or all messages since `from_server_id, limited to `limit`.
2021-03-31 02:23:45 +02:00
pub fn get_messages (
2021-03-25 01:38:06 +01:00
query_params : HashMap < String , String > , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-04-16 07:02:43 +02:00
) -> Result < Vec < models ::Message > , Rejection > {
2021-03-25 00:17:47 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , _ ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-10 03:08:34 +01:00
// Get a database connection
2021-03-15 00:16:06 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-23 23:12:54 +01:00
// Unwrap query parameters
let from_server_id : i64 ;
if let Some ( str ) = query_params . get ( " from_server_id " ) {
2021-03-23 23:35:26 +01:00
from_server_id = str . parse ( ) . unwrap_or ( 0 ) ;
2021-03-23 23:12:54 +01:00
} else {
from_server_id = 0 ;
}
2021-03-24 23:33:47 +01:00
let limit : u16 ; // Never return more than 256 messages at once
2021-03-23 23:12:54 +01:00
if let Some ( str ) = query_params . get ( " limit " ) {
2021-03-24 23:33:47 +01:00
limit = std ::cmp ::min ( str . parse ( ) . unwrap_or ( 256 ) , 256 ) ;
2021-03-23 23:12:54 +01:00
} else {
2021-03-23 23:35:26 +01:00
limit = 256 ;
2021-03-23 23:12:54 +01:00
}
2021-03-10 04:06:17 +01:00
// Query the database
2021-03-10 06:29:56 +01:00
let raw_query : String ;
2021-03-23 23:12:54 +01:00
if query_params . get ( " from_server_id " ) . is_some ( ) {
2021-04-29 03:18:05 +02:00
raw_query = format! ( " SELECT id, public_key, timestamp, data, signature FROM {} WHERE id > (?1) AND is_deleted = 0 ORDER BY id ASC LIMIT (?2) " , storage ::MESSAGES_TABLE ) ;
2021-03-10 04:06:17 +01:00
} else {
2021-03-25 00:56:16 +01:00
raw_query = format! (
2021-04-29 03:18:05 +02:00
" SELECT id, public_key, timestamp, data, signature FROM {} WHERE is_deleted = 0 ORDER BY id DESC LIMIT (?2) " ,
2021-03-25 00:56:16 +01:00
storage ::MESSAGES_TABLE
) ;
2021-03-10 04:06:17 +01:00
}
2021-03-15 00:16:06 +01:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-25 00:56:16 +01:00
let rows = match query . query_map ( params! [ from_server_id , limit ] , | row | {
Ok ( models ::Message {
server_id : row . get ( 0 ) ? ,
public_key : row . get ( 1 ) ? ,
2021-03-26 00:21:08 +01:00
timestamp : row . get ( 2 ) ? ,
data : row . get ( 3 ) ? ,
signature : row . get ( 4 ) ? ,
2021-03-25 00:56:16 +01:00
} )
2021-03-10 03:08:34 +01:00
} ) {
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't get messages due to error: {}. " , e ) ;
2021-03-12 06:40:24 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
2021-03-10 03:08:34 +01:00
}
} ;
2021-03-10 23:56:32 +01:00
let messages : Vec < models ::Message > = rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ;
2021-06-04 07:38:19 +02:00
// Record activity for usage statistics
// We want to fail silently if any of this goes wrong
match update_usage_statistics ( auth_token , pool ) {
Ok ( _ ) = > ( ) ,
Err ( _ ) = > println! ( " Couldn't update usage stats. " ) ,
} ;
2021-03-10 03:08:34 +01:00
// Return the messages
2021-04-16 07:02:43 +02:00
return Ok ( messages ) ;
2021-03-10 03:29:04 +01:00
}
2021-06-04 07:38:19 +02:00
fn update_usage_statistics (
auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
) -> Result < ( ) , Rejection > {
let public_key = get_public_key_for_auth_token ( auth_token , pool ) ? ;
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let now = chrono ::Utc ::now ( ) . timestamp ( ) ;
let stmt = format! (
" INSERT OR REPLACE INTO {} (public_key, last_active) VALUES(?1, ?2) " ,
storage ::USER_ACTIVITY_TABLE
) ;
conn . execute ( & stmt , params! [ public_key , now ] ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
return Ok ( ( ) ) ;
}
2021-03-19 00:09:13 +01:00
// Message deletion
2021-04-29 02:21:27 +02:00
/// Deletes the messages with the given `ids` from the database, if present.
pub fn delete_messages (
ids : Vec < i64 > , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
) -> Result < Response , Rejection > {
2021-04-29 02:27:04 +02:00
// FIXME: Right now a situation can occur where a non-moderator user selects multiple
// messages, some of which are their own and some of which aren't, and then hits this endpoint.
// When they do, some of the messages would be deleted but an error status code would be
// returned, prompting the client to roll back the deletions they made locally. The only thing
// preventing this scenario from occurring right now is that we don't allow users to make such
// a selection in the Session UI. In the future we should take a better approach to make it
// impossible.
2021-04-29 02:21:27 +02:00
for id in ids {
delete_message ( id , auth_token , pool ) ? ;
}
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
}
2021-04-28 01:50:03 +02:00
/// Deletes the message with the given `id` from the database, if it's present.
2021-03-31 02:23:45 +02:00
pub fn delete_message (
2021-04-28 01:50:03 +02:00
id : i64 , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-17 23:58:45 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , requesting_public_key ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-18 03:21:10 +01:00
// Check that the requesting user is either the sender of the message or a moderator
2021-03-17 23:58:45 +01:00
let sender_option : Option < String > = {
2021-03-18 00:38:55 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-25 00:56:16 +01:00
let raw_query =
2021-04-28 01:50:03 +02:00
format! ( " SELECT public_key FROM {} WHERE id = (?1) " , storage ::MESSAGES_TABLE ) ;
2021-03-18 00:38:55 +01:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-05-14 06:22:33 +02:00
let rows = match query . query_map ( params! [ id ] , | row | row . get ( 0 ) ) {
2021-03-17 23:35:51 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't delete message due to error: {}. " , e ) ;
2021-03-17 23:35:51 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-05-31 02:11:13 +02:00
let public_key = rows . filter_map ( | result | result . ok ( ) ) . next ( ) ;
public_key
2021-03-17 23:35:51 +01:00
} ;
2021-05-14 06:22:33 +02:00
let sender =
sender_option . ok_or_else ( | | warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ? ;
2021-03-31 02:23:45 +02:00
if ! is_moderator ( & requesting_public_key , pool ) ? & & requesting_public_key ! = sender {
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-18 00:38:55 +01:00
// Get a connection and open a transaction
let mut conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let tx = conn . transaction ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-10 03:29:04 +01:00
// Delete the message if it's present
2021-04-29 03:18:05 +02:00
let stmt = format! ( " UPDATE {} SET public_key = 'deleted', timestamp = 0, data = 'deleted', signature = 'deleted', is_deleted = 1 WHERE id = (?1) " , storage ::MESSAGES_TABLE ) ;
2021-04-28 01:50:03 +02:00
let count = match tx . execute ( & stmt , params! [ id ] ) {
2021-03-15 00:16:06 +01:00
Ok ( count ) = > count ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't delete message due to error: {}. " , e ) ;
2021-03-15 00:16:06 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-03-10 04:55:58 +01:00
// Update the deletions table if needed
2021-03-10 05:38:32 +01:00
if count > 0 {
2021-04-28 01:50:03 +02:00
let stmt = format! (
" INSERT INTO {} (deleted_message_id) VALUES (?1) " ,
storage ::DELETED_MESSAGES_TABLE
) ;
match tx . execute ( & stmt , params! [ id ] ) {
2021-03-15 00:16:06 +01:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't delete message due to error: {}. " , e ) ;
2021-03-15 00:16:06 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-03-10 03:29:04 +01:00
}
2021-03-10 04:55:58 +01:00
// Commit
2021-03-15 00:18:51 +01:00
tx . commit ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-10 04:55:58 +01:00
// Return
2021-03-25 00:56:16 +01:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
2021-03-19 06:44:07 +01:00
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
2021-03-10 06:01:08 +01:00
}
/// Returns either the last `limit` deleted messages or all deleted messages since `from_server_id, limited to `limit`.
2021-03-31 02:23:45 +02:00
pub fn get_deleted_messages (
2021-03-25 01:38:06 +01:00
query_params : HashMap < String , String > , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-04-28 01:50:03 +02:00
) -> Result < Vec < models ::DeletedMessage > , Rejection > {
2021-03-25 00:17:47 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , _ ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-10 06:01:08 +01:00
// Get a database connection
2021-03-15 00:16:06 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-23 23:12:54 +01:00
// Unwrap query parameters
let from_server_id : i64 ;
if let Some ( str ) = query_params . get ( " from_server_id " ) {
2021-03-23 23:35:26 +01:00
from_server_id = str . parse ( ) . unwrap_or ( 0 ) ;
2021-03-23 23:12:54 +01:00
} else {
from_server_id = 0 ;
}
2021-03-24 23:33:47 +01:00
let limit : u16 ; // Never return more than 256 messages at once
2021-03-23 23:12:54 +01:00
if let Some ( str ) = query_params . get ( " limit " ) {
2021-03-24 23:33:47 +01:00
limit = std ::cmp ::min ( str . parse ( ) . unwrap_or ( 256 ) , 256 ) ;
2021-03-23 23:12:54 +01:00
} else {
2021-03-23 23:35:26 +01:00
limit = 256 ;
2021-03-23 23:12:54 +01:00
}
2021-03-10 06:01:08 +01:00
// Query the database
2021-03-10 06:29:56 +01:00
let raw_query : String ;
2021-03-23 23:12:54 +01:00
if query_params . get ( " from_server_id " ) . is_some ( ) {
2021-03-25 00:56:16 +01:00
raw_query = format! (
2021-04-28 03:08:42 +02:00
" SELECT id, deleted_message_id FROM {} WHERE id > (?1) ORDER BY id ASC LIMIT (?2) " ,
2021-03-25 00:56:16 +01:00
storage ::DELETED_MESSAGES_TABLE
) ;
2021-03-10 06:01:08 +01:00
} else {
2021-03-25 00:56:16 +01:00
raw_query = format! (
2021-04-28 03:08:42 +02:00
" SELECT id, deleted_message_id FROM {} ORDER BY id DESC LIMIT (?2) " ,
2021-03-25 00:56:16 +01:00
storage ::DELETED_MESSAGES_TABLE
) ;
2021-03-10 06:01:08 +01:00
}
2021-03-15 00:16:06 +01:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-04-28 01:50:03 +02:00
let rows = match query . query_map ( params! [ from_server_id , limit ] , | row | {
Ok ( models ::DeletedMessage { id : row . get ( 0 ) ? , deleted_message_id : row . get ( 1 ) ? } )
} ) {
2021-03-10 06:01:08 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-28 01:50:03 +02:00
error! ( " Couldn't get deleted messages due to error: {}. " , e ) ;
2021-03-12 06:40:24 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
2021-03-10 06:01:08 +01:00
}
} ;
2021-04-28 01:50:03 +02:00
let deleted_messages : Vec < models ::DeletedMessage > =
rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ;
2021-03-10 06:01:08 +01:00
// Return the IDs
2021-04-28 01:50:03 +02:00
return Ok ( deleted_messages ) ;
2021-03-11 00:06:09 +01:00
}
2021-03-19 00:09:13 +01:00
// Moderation
2021-04-27 05:48:34 +02:00
pub async fn add_moderator_public (
body : models ::ChangeModeratorRequestBody , auth_token : & str ,
) -> Result < Response , Rejection > {
let pool = storage ::pool_by_room_id ( & body . room_id ) ;
let ( has_authorization_level , _ ) =
has_authorization_level ( auth_token , AuthorizationLevel ::Moderator , & pool ) ? ;
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
return add_moderator ( body ) . await ;
}
2021-03-31 02:46:54 +02:00
// Not publicly exposed.
pub async fn add_moderator (
body : models ::ChangeModeratorRequestBody ,
2021-03-26 06:10:16 +01:00
) -> Result < Response , Rejection > {
// Get a database connection
2021-03-31 02:00:02 +02:00
let pool = storage ::pool_by_room_id ( & body . room_id ) ;
2021-03-26 06:10:16 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Insert the moderator
let stmt = format! ( " INSERT INTO {} (public_key) VALUES (?1) " , storage ::MODERATORS_TABLE ) ;
2021-03-31 02:00:02 +02:00
match conn . execute ( & stmt , params! [ & body . public_key ] ) {
2021-03-26 06:10:16 +01:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't make public key moderator due to error: {}. " , e ) ;
2021-03-26 06:10:16 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
}
// Return
2021-04-01 00:55:47 +02:00
info! ( " Added moderator: {} to room with ID: {} " , & body . public_key , & body . room_id ) ;
2021-03-26 06:10:16 +01:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
}
2021-04-27 05:48:34 +02:00
pub async fn delete_moderator_public (
body : models ::ChangeModeratorRequestBody , auth_token : & str ,
) -> Result < Response , Rejection > {
let pool = storage ::pool_by_room_id ( & body . room_id ) ;
let ( has_authorization_level , _ ) =
has_authorization_level ( auth_token , AuthorizationLevel ::Moderator , & pool ) ? ;
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
return delete_moderator ( body ) . await ;
}
2021-03-31 06:05:23 +02:00
// Not publicly exposed.
2021-03-31 02:46:54 +02:00
pub async fn delete_moderator (
body : models ::ChangeModeratorRequestBody ,
) -> Result < Response , Rejection > {
// Get a database connection
let pool = storage ::pool_by_room_id ( & body . room_id ) ;
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Insert the moderator
let stmt = format! ( " DELETE FROM {} WHERE public_key = (?1) " , storage ::MODERATORS_TABLE ) ;
match conn . execute ( & stmt , params! [ & body . public_key ] ) {
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't delete moderator due to error: {}. " , e ) ;
2021-03-31 02:46:54 +02:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
}
// Return
2021-04-01 00:55:47 +02:00
info! ( " Deleted moderator: {} from room with ID: {} " , & body . public_key , & body . room_id ) ;
2021-03-31 02:46:54 +02:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
}
2021-03-11 00:06:09 +01:00
/// Returns the full list of moderators.
2021-03-31 02:23:45 +02:00
pub fn get_moderators (
2021-03-25 01:38:06 +01:00
auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-04-16 07:02:43 +02:00
) -> Result < Vec < String > , Rejection > {
2021-03-25 00:17:47 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , _ ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-25 00:17:47 +01:00
// Return
2021-03-31 02:23:45 +02:00
let public_keys = get_moderators_vector ( pool ) ? ;
2021-04-16 07:02:43 +02:00
return Ok ( public_keys ) ;
2021-03-11 00:38:02 +01:00
}
2021-07-13 06:13:49 +02:00
/// Bans the given `public_key` if the requesting user is a moderator, and deletes
/// all messages sent by `public_key`.
pub fn ban_and_delete_all_messages (
public_key : & str , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
) -> Result < Response , Rejection > {
// Validate the public key
if ! is_valid_public_key ( & public_key ) {
warn! ( " Ignoring ban and delete all messages request for invalid public key. " ) ;
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
}
// Check authorization level
let ( has_authorization_level , _ ) =
has_authorization_level ( auth_token , AuthorizationLevel ::Moderator , pool ) ? ;
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
// Ban the user
ban ( public_key , auth_token , pool ) ? ;
// Get the IDs of the messages to delete
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-07-13 06:22:23 +02:00
let raw_query = format! (
" SELECT id FROM {} WHERE public_key = (?1) AND is_deleted = 0 " ,
storage ::MESSAGES_TABLE
) ;
2021-07-13 06:13:49 +02:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-07-13 06:22:23 +02:00
let rows = match query . query_map ( params! [ public_key ] , | row | Ok ( row . get ( 0 ) ? ) ) {
2021-07-13 06:13:49 +02:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
error! ( " Couldn't delete messages due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
let ids : Vec < i64 > = rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ;
// Delete all messages sent by the given public key
delete_messages ( ids , auth_token , pool ) ? ;
// Return
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
}
2021-03-18 03:21:10 +01:00
/// Bans the given `public_key` if the requesting user is a moderator.
2021-03-31 02:23:45 +02:00
pub fn ban (
2021-03-25 01:38:06 +01:00
public_key : & str , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-11 00:50:17 +01:00
// Validate the public key
2021-03-25 00:56:16 +01:00
if ! is_valid_public_key ( & public_key ) {
2021-04-01 00:55:47 +02:00
warn! ( " Ignoring ban request for invalid public key. " ) ;
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
2021-03-11 04:20:36 +01:00
}
2021-03-17 23:58:45 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , _ ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Moderator , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-11 01:13:37 +01:00
// Don't double ban public keys
2021-03-31 02:23:45 +02:00
if is_banned ( & public_key , pool ) ? {
2021-03-25 00:56:16 +01:00
return Ok ( StatusCode ::OK . into_response ( ) ) ;
}
2021-03-19 03:26:53 +01:00
// Get a database connection
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-11 00:50:17 +01:00
// Insert the message
let stmt = format! ( " INSERT INTO {} (public_key) VALUES (?1) " , storage ::BLOCK_LIST_TABLE ) ;
2021-03-25 00:56:16 +01:00
match conn . execute ( & stmt , params! [ public_key ] ) {
2021-03-15 00:16:06 +01:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't ban public key due to error: {}. " , e ) ;
2021-03-15 00:16:06 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-03-11 00:50:17 +01:00
// Return
2021-03-25 00:56:16 +01:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
2021-03-19 06:44:07 +01:00
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
2021-03-11 00:38:02 +01:00
}
2021-03-18 03:21:10 +01:00
/// Unbans the given `public_key` if the requesting user is a moderator.
2021-03-31 02:23:45 +02:00
pub fn unban (
2021-03-25 01:38:06 +01:00
public_key : & str , auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-11 00:50:17 +01:00
// Validate the public key
2021-03-25 00:56:16 +01:00
if ! is_valid_public_key ( & public_key ) {
2021-04-01 00:55:47 +02:00
warn! ( " Ignoring unban request for invalid public key. " ) ;
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::ValidationFailed ) ) ;
2021-03-11 04:20:36 +01:00
}
2021-03-17 23:58:45 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , _ ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Moderator , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-12 05:11:12 +01:00
// Don't double unban public keys
2021-03-31 02:23:45 +02:00
if ! is_banned ( & public_key , pool ) ? {
2021-03-25 00:56:16 +01:00
return Ok ( StatusCode ::OK . into_response ( ) ) ;
}
2021-03-19 03:26:53 +01:00
// Get a database connection
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-11 00:50:17 +01:00
// Insert the message
let stmt = format! ( " DELETE FROM {} WHERE public_key = (?1) " , storage ::BLOCK_LIST_TABLE ) ;
2021-03-25 00:56:16 +01:00
match conn . execute ( & stmt , params! [ public_key ] ) {
2021-03-15 00:16:06 +01:00
Ok ( _ ) = > ( ) ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't unban public key due to error: {}. " , e ) ;
2021-03-15 00:16:06 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-03-11 00:50:17 +01:00
// Return
2021-03-25 00:56:16 +01:00
let json = models ::StatusCode { status_code : StatusCode ::OK . as_u16 ( ) } ;
2021-03-19 06:44:07 +01:00
return Ok ( warp ::reply ::json ( & json ) . into_response ( ) ) ;
2021-03-11 00:38:02 +01:00
}
2021-03-11 01:02:28 +01:00
/// Returns the full list of banned public keys.
2021-03-31 02:23:45 +02:00
pub fn get_banned_public_keys (
2021-03-25 01:38:06 +01:00
auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-25 00:17:47 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , _ ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-25 00:17:47 +01:00
// Return
2021-03-31 02:23:45 +02:00
let public_keys = get_banned_public_keys_vector ( pool ) ? ;
2021-03-19 06:44:07 +01:00
#[ derive(Debug, Deserialize, Serialize) ]
2021-03-25 00:56:16 +01:00
struct Response {
2021-03-24 00:02:53 +01:00
status_code : u16 ,
2021-03-25 01:38:06 +01:00
banned_members : Vec < String > ,
2021-03-19 06:44:07 +01:00
}
2021-03-25 00:56:16 +01:00
let response = Response { status_code : StatusCode ::OK . as_u16 ( ) , banned_members : public_keys } ;
2021-03-19 06:44:07 +01:00
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
2021-03-11 01:02:28 +01:00
}
2021-03-19 00:09:13 +01:00
// General
2021-03-31 02:23:45 +02:00
pub fn get_member_count (
2021-03-25 01:38:06 +01:00
auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Response , Rejection > {
2021-03-25 00:17:47 +01:00
// Check authorization level
2021-03-25 00:56:16 +01:00
let ( has_authorization_level , _ ) =
2021-03-31 02:23:45 +02:00
has_authorization_level ( auth_token , AuthorizationLevel ::Basic , pool ) ? ;
2021-03-25 00:56:16 +01:00
if ! has_authorization_level {
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-18 00:23:43 +01:00
// Get a database connection
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Query the database
2021-06-11 01:47:34 +02:00
let raw_query = format! ( " SELECT COUNT(DISTINCT public_key) FROM {} " , storage ::TOKENS_TABLE ) ;
2021-03-18 00:23:43 +01:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-05-14 06:22:33 +02:00
let rows = match query . query_map ( params! [ ] , | row | row . get ( 0 ) ) {
2021-03-18 00:23:43 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't query database due to error: {}. " , e ) ;
2021-03-18 00:23:43 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-05-31 02:11:13 +02:00
let public_key_count : u32 = rows
. filter_map ( | result | result . ok ( ) )
. next ( )
. ok_or_else ( | | warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ? ;
2021-03-18 05:35:18 +01:00
// Return
2021-03-19 06:44:07 +01:00
#[ derive(Debug, Deserialize, Serialize) ]
2021-03-25 00:56:16 +01:00
struct Response {
2021-03-24 00:02:53 +01:00
status_code : u16 ,
2021-05-27 14:27:22 +02:00
member_count : u32 ,
2021-03-19 06:44:07 +01:00
}
2021-03-25 00:56:16 +01:00
let response =
Response { status_code : StatusCode ::OK . as_u16 ( ) , member_count : public_key_count } ;
2021-03-19 06:44:07 +01:00
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
2021-03-11 01:10:49 +01:00
}
2021-04-16 07:02:43 +02:00
pub fn compact_poll (
request_bodies : Vec < models ::CompactPollRequestBody > ,
) -> Result < Response , Rejection > {
let mut response_bodies : Vec < models ::CompactPollResponseBody > = vec! [ ] ;
2021-06-04 07:54:36 +02:00
let main_pool = & storage ::MAIN_POOL ;
let main_conn = main_pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-04-16 07:02:43 +02:00
for request_body in request_bodies {
// Unwrap the request body
2021-04-19 03:00:03 +02:00
let models ::CompactPollRequestBody {
room_id ,
auth_token ,
from_message_server_id ,
from_deletion_server_id ,
} = request_body ;
2021-06-04 07:54:36 +02:00
// Check that the room hasn't been deleted
2021-06-04 06:52:53 +02:00
let raw_query = format! ( " SELECT id, name FROM {} where id = (?1) " , storage ::MAIN_TABLE ) ;
2021-06-04 07:54:36 +02:00
match main_conn . query_row ( & raw_query , params! [ room_id ] , | row | {
2021-06-04 06:52:53 +02:00
Ok ( models ::Room { id : row . get ( 0 ) ? , name : row . get ( 1 ) ? } )
} ) {
2021-06-04 07:54:36 +02:00
Ok ( _ ) = > ( ) ,
Err ( _ ) = > {
let status_code = StatusCode ::NOT_FOUND . as_u16 ( ) ;
2021-06-04 06:52:53 +02:00
let response_body = models ::CompactPollResponseBody {
room_id ,
status_code ,
messages : vec ! [ ] ,
deletions : vec ! [ ] ,
moderators : vec ! [ ] ,
} ;
response_bodies . push ( response_body ) ;
continue ;
}
} ;
2021-04-16 07:02:43 +02:00
// Get the database connection pool
let pool = storage ::pool_by_room_id ( & room_id ) ;
// Get the new messages
let mut get_messages_query_params : HashMap < String , String > = HashMap ::new ( ) ;
2021-04-19 03:00:03 +02:00
if let Some ( from_message_server_id ) = from_message_server_id {
get_messages_query_params
. insert ( " from_server_id " . to_string ( ) , from_message_server_id . to_string ( ) ) ;
}
2021-04-22 07:41:29 +02:00
let messages = match get_messages ( get_messages_query_params , & auth_token , & pool ) {
Ok ( messages ) = > messages ,
Err ( e ) = > {
let status_code = super ::errors ::status_code ( e ) ;
let response_body = models ::CompactPollResponseBody {
room_id ,
status_code : status_code . as_u16 ( ) ,
messages : vec ! [ ] ,
deletions : vec ! [ ] ,
moderators : vec ! [ ] ,
} ;
response_bodies . push ( response_body ) ;
continue ;
}
} ;
2021-04-16 07:02:43 +02:00
// Get the new deletions
let mut get_deletions_query_params : HashMap < String , String > = HashMap ::new ( ) ;
2021-04-19 03:00:03 +02:00
if let Some ( from_deletion_server_id ) = from_deletion_server_id {
get_deletions_query_params
. insert ( " from_server_id " . to_string ( ) , from_deletion_server_id . to_string ( ) ) ;
}
2021-04-22 07:41:29 +02:00
let deletions = match get_deleted_messages ( get_deletions_query_params , & auth_token , & pool ) {
Ok ( deletions ) = > deletions ,
Err ( e ) = > {
let status_code = super ::errors ::status_code ( e ) ;
let response_body = models ::CompactPollResponseBody {
room_id ,
status_code : status_code . as_u16 ( ) ,
messages : vec ! [ ] ,
deletions : vec ! [ ] ,
moderators : vec ! [ ] ,
} ;
response_bodies . push ( response_body ) ;
continue ;
}
} ;
2021-04-16 07:02:43 +02:00
// Get the moderators
2021-04-22 07:41:29 +02:00
let moderators = match get_moderators ( & auth_token , & pool ) {
Ok ( moderators ) = > moderators ,
Err ( e ) = > {
let status_code = super ::errors ::status_code ( e ) ;
let response_body = models ::CompactPollResponseBody {
room_id ,
status_code : status_code . as_u16 ( ) ,
messages : vec ! [ ] ,
deletions : vec ! [ ] ,
moderators : vec ! [ ] ,
} ;
response_bodies . push ( response_body ) ;
continue ;
}
} ;
2021-04-16 07:02:43 +02:00
// Add to the response
2021-04-22 07:41:29 +02:00
let response_body = models ::CompactPollResponseBody {
room_id ,
status_code : StatusCode ::OK . as_u16 ( ) ,
deletions ,
messages ,
moderators ,
} ;
2021-04-16 07:02:43 +02:00
response_bodies . push ( response_body ) ;
}
// Return
#[ derive(Debug, Deserialize, Serialize) ]
struct Response {
status_code : u16 ,
results : Vec < models ::CompactPollResponseBody > ,
}
let response = Response { status_code : StatusCode ::OK . as_u16 ( ) , results : response_bodies } ;
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
2021-04-16 06:34:32 +02:00
}
2021-04-23 01:40:03 +02:00
// Not publicly exposed.
/// Returns the value of `super::get_url()` serialized as a JSON response.
pub async fn get_url() -> Result<Response, Rejection> {
    let server_url = super::get_url();
    Ok(warp::reply::json(&server_url).into_response())
}
2021-05-28 05:55:51 +02:00
pub async fn get_session_version ( platform : & str ) -> Result < String , Rejection > {
2021-05-31 02:21:31 +02:00
let mut session_versions = SESSION_VERSIONS . read ( ) . clone ( ) ;
2021-05-28 05:55:51 +02:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) ;
if let Some ( version_info ) = session_versions . get ( platform ) {
let last_updated = version_info . 0 ;
if now - last_updated < SESSION_VERSION_UPDATE_INTERVAL {
let tag = version_info . 1. to_string ( ) ;
println! ( " Returning cached value: {} " , tag ) ;
return Ok ( tag ) ;
}
}
let octocrab = octocrab ::instance ( ) ;
let repo = format! ( " session- {} " , platform ) ;
let handler = octocrab . repos ( " oxen-io " , repo ) ;
let release = handler . releases ( ) . get_latest ( ) . await . unwrap ( ) ;
let tag = release . tag_name ;
let tuple = ( now , tag . clone ( ) ) ;
session_versions . insert ( platform . to_string ( ) , tuple ) ;
2021-05-31 02:21:31 +02:00
* SESSION_VERSIONS . write ( ) = session_versions . clone ( ) ;
return Ok ( tag ) ;
2021-05-28 05:55:51 +02:00
}
2021-05-28 14:49:02 +02:00
// not publicly exposed.
pub async fn get_stats_for_room (
room : String , query_map : HashMap < String , i64 > ,
) -> Result < Response , Rejection > {
let now = chrono ::Utc ::now ( ) . timestamp ( ) ;
let window = match query_map . get ( " window " ) {
Some ( val ) = > val ,
None = > & 3600 i64 ,
} ;
let upperbound = match query_map . get ( " start " ) {
Some ( val ) = > val ,
None = > & now ,
} ;
let lowerbound = upperbound - window ;
let pool = storage ::pool_by_room_id ( & room ) ;
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let raw_query_users = format! (
" SELECT COUNT(public_key) FROM {} WHERE last_active > ?1 AND last_active <= ?2 " ,
storage ::USER_ACTIVITY_TABLE
) ;
let mut query_users =
conn . prepare ( & raw_query_users ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let active = match query_users
. query_row ( params! [ lowerbound , upperbound ] , | row | Ok ( row . get ::< _ , u32 > ( 0 ) ? ) )
{
Ok ( row ) = > row ,
Err ( _e ) = > return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ,
} ;
let raw_query_posts = format! (
2021-06-04 07:42:10 +02:00
" SELECT COUNT(id) FROM {} WHERE timestamp >= ?1 AND timestamp <= ?2 " ,
2021-05-28 14:49:02 +02:00
storage ::MESSAGES_TABLE
) ;
let mut query_posts =
conn . prepare ( & raw_query_posts ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let posts = match query_posts
. query_row ( params! [ lowerbound * 1000 , upperbound * 1000 ] , | row | Ok ( row . get ::< _ , u32 > ( 0 ) ? ) )
{
Ok ( row ) = > row ,
Err ( _e ) = > return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ,
} ;
// Return value
#[ derive(Debug, Deserialize, Serialize) ]
struct Response {
posts : u32 ,
active_users : u32 ,
}
let response = Response { active_users : active , posts } ;
return Ok ( warp ::reply ::json ( & response ) . into_response ( ) ) ;
}
2021-03-11 00:38:02 +01:00
// Utilities
2021-03-31 02:23:45 +02:00
fn get_pending_tokens (
2021-03-29 06:53:57 +02:00
public_key : & str , pool : & storage ::DatabaseConnectionPool ,
) -> Result < Vec < ( i64 , Vec < u8 > ) > , Rejection > {
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let raw_query = format! (
" SELECT timestamp, token FROM {} WHERE public_key = (?1) AND timestamp > (?2) " ,
storage ::PENDING_TOKENS_TABLE
) ;
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
let now = chrono ::Utc ::now ( ) . timestamp ( ) ;
let expiration = now - storage ::PENDING_TOKEN_EXPIRATION ;
let rows = match query
. query_map ( params! [ public_key , expiration ] , | row | Ok ( ( row . get ( 0 ) ? , row . get ( 1 ) ? ) ) )
{
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't get pending tokens due to error: {}. " , e ) ;
2021-03-29 06:53:57 +02:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
let pending_tokens : Vec < ( i64 , Vec < u8 > ) > = rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ;
return Ok ( pending_tokens ) ;
}
2021-03-31 02:23:45 +02:00
fn get_moderators_vector ( pool : & storage ::DatabaseConnectionPool ) -> Result < Vec < String > , Rejection > {
2021-03-11 00:06:09 +01:00
// Get a database connection
2021-03-15 00:16:06 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-11 00:06:09 +01:00
// Query the database
let raw_query = format! ( " SELECT public_key FROM {} " , storage ::MODERATORS_TABLE ) ;
2021-03-15 00:16:06 +01:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-05-14 06:22:33 +02:00
let rows = match query . query_map ( params! [ ] , | row | row . get ( 0 ) ) {
2021-03-11 00:06:09 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't query database due to error: {}. " , e ) ;
2021-03-12 06:40:24 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
2021-03-11 00:06:09 +01:00
}
} ;
2021-03-11 00:38:02 +01:00
// Return
return Ok ( rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ) ;
}
2021-03-31 02:23:45 +02:00
fn is_moderator (
2021-03-25 01:38:06 +01:00
public_key : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < bool , Rejection > {
2021-03-31 02:23:45 +02:00
let public_keys = get_moderators_vector ( & pool ) ? ;
2021-03-11 00:38:02 +01:00
return Ok ( public_keys . contains ( & public_key . to_owned ( ) ) ) ;
2021-03-11 00:50:17 +01:00
}
2021-03-31 02:23:45 +02:00
fn get_banned_public_keys_vector (
2021-03-25 01:38:06 +01:00
pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Vec < String > , Rejection > {
2021-03-11 01:02:28 +01:00
// Get a database connection
2021-03-15 00:16:06 +01:00
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-03-11 01:02:28 +01:00
// Query the database
let raw_query = format! ( " SELECT public_key FROM {} " , storage ::BLOCK_LIST_TABLE ) ;
2021-03-15 00:16:06 +01:00
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-05-14 06:22:33 +02:00
let rows = match query . query_map ( params! [ ] , | row | row . get ( 0 ) ) {
2021-03-11 01:02:28 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't query database due to error: {}. " , e ) ;
2021-03-12 06:40:24 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
2021-03-11 01:02:28 +01:00
}
} ;
// Return
return Ok ( rows . filter_map ( | result | result . ok ( ) ) . collect ( ) ) ;
}
2021-03-31 02:23:45 +02:00
fn is_banned ( public_key : & str , pool : & storage ::DatabaseConnectionPool ) -> Result < bool , Rejection > {
2021-05-27 14:27:22 +02:00
// Get a database connection
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Query the database
let raw_query = format! (
" SELECT COUNT(public_key) FROM {} WHERE public_key = (?1) " ,
storage ::BLOCK_LIST_TABLE
) ;
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-05-31 02:11:13 +02:00
let rows = match query . query_map ( params! [ public_key ] , | row | row . get ( 0 ) ) {
2021-05-27 14:27:22 +02:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
error! ( " Couldn't query database due to error: {}. " , e ) ;
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-05-31 02:11:13 +02:00
let public_key_count : u32 = rows
. filter_map ( | result | result . ok ( ) )
. next ( )
. ok_or_else ( | | warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ? ;
2021-05-27 14:27:22 +02:00
return Ok ( public_key_count ! = 0 ) ;
2021-03-11 01:02:28 +01:00
}
2021-03-17 23:35:51 +01:00
/// Checks whether `public_key` has the shape of a hex-encoded public key.
///
/// A key is accepted when it is exactly 66 bytes long and consists solely of
/// ASCII hexadecimal digits (upper or lower case). Returns `true` for such a
/// key and `false` for anything else, including the empty string.
fn is_valid_public_key(public_key: &str) -> bool {
    // The version byte + 32 bytes of random data, two hex digits per byte
    const PUBLIC_KEY_HEX_LENGTH: usize = 66;
    // Check that it's the right length first: this is cheap and lets oversized
    // input be rejected without scanning it.
    if public_key.len() != PUBLIC_KEY_HEX_LENGTH {
        return false;
    }
    // Check that it's a valid hex encoding. The length is even at this point, so
    // inspecting the digits is sufficient and no decoded buffer has to be
    // allocated. Any multi-byte UTF-8 character fails the ASCII digit test.
    public_key.bytes().all(|byte| byte.is_ascii_hexdigit())
}
2021-03-31 02:23:45 +02:00
fn get_public_key_for_auth_token (
2021-03-25 01:38:06 +01:00
auth_token : & str , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < Option < String > , Rejection > {
2021-03-17 23:35:51 +01:00
// Get a database connection
let conn = pool . get ( ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
// Query the database
let raw_query = format! ( " SELECT public_key FROM {} WHERE token = (?1) " , storage ::TOKENS_TABLE ) ;
let mut query = conn . prepare ( & raw_query ) . map_err ( | _ | Error ::DatabaseFailedInternally ) ? ;
2021-05-14 06:22:33 +02:00
let rows = match query . query_map ( params! [ auth_token ] , | row | row . get ( 0 ) ) {
2021-03-17 23:35:51 +01:00
Ok ( rows ) = > rows ,
Err ( e ) = > {
2021-04-01 00:55:47 +02:00
error! ( " Couldn't query database due to error: {}. " , e ) ;
2021-03-17 23:35:51 +01:00
return Err ( warp ::reject ::custom ( Error ::DatabaseFailedInternally ) ) ;
}
} ;
2021-05-31 02:11:13 +02:00
let public_key : Option < String > = rows . filter_map ( | result | result . ok ( ) ) . next ( ) ;
2021-03-17 23:35:51 +01:00
// Return
2021-05-31 02:11:13 +02:00
return Ok ( public_key ) ;
2021-03-17 23:58:45 +01:00
}
2021-03-31 02:23:45 +02:00
fn has_authorization_level (
2021-03-25 01:38:06 +01:00
auth_token : & str , level : AuthorizationLevel , pool : & storage ::DatabaseConnectionPool ,
2021-03-25 00:56:16 +01:00
) -> Result < ( bool , String ) , Rejection > {
2021-03-17 23:58:45 +01:00
// Check that we have a public key associated with the given auth token
2021-03-31 02:23:45 +02:00
let public_key_option = get_public_key_for_auth_token ( auth_token , pool ) ? ;
2021-05-14 06:22:33 +02:00
let public_key = public_key_option . ok_or_else ( | | warp ::reject ::custom ( Error ::NoAuthToken ) ) ? ;
2021-03-17 23:58:45 +01:00
// Check that the given public key isn't banned
2021-03-31 02:23:45 +02:00
if is_banned ( & public_key , pool ) ? {
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-17 23:58:45 +01:00
// If needed, check that the given public key is a moderator
match level {
2021-03-18 01:52:39 +01:00
AuthorizationLevel ::Basic = > return Ok ( ( true , public_key ) ) ,
AuthorizationLevel ::Moderator = > {
2021-03-31 02:23:45 +02:00
if ! is_moderator ( & public_key , pool ) ? {
2021-03-25 00:56:16 +01:00
return Err ( warp ::reject ::custom ( Error ::Unauthorized ) ) ;
}
2021-03-17 23:58:45 +01:00
return Ok ( ( true , public_key ) ) ;
}
} ;
}