mirror of
https://github.com/oxen-io/session-open-group-server.git
synced 2023-12-13 20:30:35 +01:00
Compare commits
4 commits
e56e32d11a
...
26851becfd
Author | SHA1 | Date | |
---|---|---|---|
|
26851becfd | ||
|
734ee32897 | ||
|
ecc6212f5d | ||
|
56fda143fb |
4
Cargo.lock
generated
4
Cargo.lock
generated
|
@ -1,7 +1,5 @@
|
||||||
# This file is automatically @generated by Cargo.
|
# This file is automatically @generated by Cargo.
|
||||||
# It is not intended for manual editing.
|
# It is not intended for manual editing.
|
||||||
version = 3
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "addr2line"
|
name = "addr2line"
|
||||||
version = "0.14.1"
|
version = "0.14.1"
|
||||||
|
@ -1747,7 +1745,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "session-open-group-server"
|
name = "session-open-group-server"
|
||||||
version = "0.1.8"
|
version = "0.1.10"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aes-gcm",
|
"aes-gcm",
|
||||||
"base64",
|
"base64",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "session-open-group-server"
|
name = "session-open-group-server"
|
||||||
version = "0.1.9"
|
version = "0.1.10"
|
||||||
authors = ["Niels Andriesse <niels@oxen.io>"]
|
authors = ["The Oxen Project <team@oxen.io>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
description = "The Session open group server. Use this to run a custom open group."
|
description = "The Session open group server. Use this to run a custom open group."
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|
|
@ -746,7 +746,7 @@ pub async fn add_moderator_public(
|
||||||
body: models::ChangeModeratorRequestBody, auth_token: &str,
|
body: models::ChangeModeratorRequestBody, auth_token: &str,
|
||||||
) -> Result<Response, Rejection> {
|
) -> Result<Response, Rejection> {
|
||||||
let room_id = storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?;
|
let room_id = storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?;
|
||||||
let pool = storage::pool_by_room_id(&room_id);
|
let pool = storage::pool_by_room_id(&room_id)?;
|
||||||
let (has_authorization_level, _) =
|
let (has_authorization_level, _) =
|
||||||
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?;
|
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?;
|
||||||
if !has_authorization_level {
|
if !has_authorization_level {
|
||||||
|
@ -762,7 +762,7 @@ pub async fn add_moderator(
|
||||||
// Get a database connection
|
// Get a database connection
|
||||||
let pool = storage::pool_by_room_id(
|
let pool = storage::pool_by_room_id(
|
||||||
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
||||||
);
|
)?;
|
||||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||||
// Insert the moderator
|
// Insert the moderator
|
||||||
let stmt = "INSERT INTO moderators (public_key) VALUES (?1)";
|
let stmt = "INSERT INTO moderators (public_key) VALUES (?1)";
|
||||||
|
@ -784,7 +784,7 @@ pub async fn delete_moderator_public(
|
||||||
) -> Result<Response, Rejection> {
|
) -> Result<Response, Rejection> {
|
||||||
let pool = storage::pool_by_room_id(
|
let pool = storage::pool_by_room_id(
|
||||||
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
||||||
);
|
)?;
|
||||||
let (has_authorization_level, _) =
|
let (has_authorization_level, _) =
|
||||||
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?;
|
has_authorization_level(auth_token, AuthorizationLevel::Moderator, &pool)?;
|
||||||
if !has_authorization_level {
|
if !has_authorization_level {
|
||||||
|
@ -800,7 +800,7 @@ pub async fn delete_moderator(
|
||||||
// Get a database connection
|
// Get a database connection
|
||||||
let pool = storage::pool_by_room_id(
|
let pool = storage::pool_by_room_id(
|
||||||
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
&storage::RoomId::new(&body.room_id).ok_or(Error::ValidationFailed)?,
|
||||||
);
|
)?;
|
||||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||||
// Insert the moderator
|
// Insert the moderator
|
||||||
let stmt = "DELETE FROM moderators WHERE public_key = (?1)";
|
let stmt = "DELETE FROM moderators WHERE public_key = (?1)";
|
||||||
|
@ -1034,7 +1034,7 @@ pub fn compact_poll(
|
||||||
// Get the database connection pool
|
// Get the database connection pool
|
||||||
let pool = storage::pool_by_room_id(
|
let pool = storage::pool_by_room_id(
|
||||||
&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?,
|
&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?,
|
||||||
);
|
)?;
|
||||||
// Get the new messages
|
// Get the new messages
|
||||||
let mut get_messages_query_params: HashMap<String, String> = HashMap::new();
|
let mut get_messages_query_params: HashMap<String, String> = HashMap::new();
|
||||||
if let Some(from_message_server_id) = from_message_server_id {
|
if let Some(from_message_server_id) = from_message_server_id {
|
||||||
|
@ -1158,7 +1158,7 @@ pub async fn get_stats_for_room(
|
||||||
|
|
||||||
let lowerbound = upperbound - window;
|
let lowerbound = upperbound - window;
|
||||||
let pool =
|
let pool =
|
||||||
storage::pool_by_room_id(&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?);
|
storage::pool_by_room_id(&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?)?;
|
||||||
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
|
||||||
|
|
||||||
let raw_query_users =
|
let raw_query_users =
|
||||||
|
|
12
src/main.rs
12
src/main.rs
|
@ -68,8 +68,6 @@ async fn main() {
|
||||||
// Create required folders
|
// Create required folders
|
||||||
fs::create_dir_all("./rooms").unwrap();
|
fs::create_dir_all("./rooms").unwrap();
|
||||||
fs::create_dir_all("./files").unwrap();
|
fs::create_dir_all("./files").unwrap();
|
||||||
// Create default rooms
|
|
||||||
create_default_rooms().await;
|
|
||||||
// Perform migration
|
// Perform migration
|
||||||
storage::perform_migration();
|
storage::perform_migration();
|
||||||
// Set up pruning jobs
|
// Set up pruning jobs
|
||||||
|
@ -170,13 +168,3 @@ fn get_url() -> String {
|
||||||
hex_public_key
|
hex_public_key
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_default_rooms() {
|
|
||||||
let info: Vec<(&str, &str)> = vec![("main", "Main")];
|
|
||||||
for info in info {
|
|
||||||
let id = info.0.to_string();
|
|
||||||
let name = info.1.to_string();
|
|
||||||
let room = models::Room { id, name };
|
|
||||||
handlers::create_room(room).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -409,9 +409,9 @@ async fn handle_delete_request(
|
||||||
|
|
||||||
fn get_pool_for_room(rpc_call: &RpcCall) -> Result<storage::DatabaseConnectionPool, Rejection> {
|
fn get_pool_for_room(rpc_call: &RpcCall) -> Result<storage::DatabaseConnectionPool, Rejection> {
|
||||||
let room_id = get_room_id(&rpc_call).ok_or(Error::ValidationFailed)?;
|
let room_id = get_room_id(&rpc_call).ok_or(Error::ValidationFailed)?;
|
||||||
return Ok(storage::pool_by_room_id(
|
return storage::pool_by_room_id(
|
||||||
&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?,
|
&storage::RoomId::new(&room_id).ok_or(Error::ValidationFailed)?,
|
||||||
));
|
).map_err(|e| e.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_auth_token(rpc_call: &RpcCall) -> Option<String> {
|
fn get_auth_token(rpc_call: &RpcCall) -> Option<String> {
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::collections::HashMap;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use log::{error, info};
|
use log::{error, warn, info};
|
||||||
use r2d2_sqlite::SqliteConnectionManager;
|
use r2d2_sqlite::SqliteConnectionManager;
|
||||||
use rusqlite::params;
|
use rusqlite::params;
|
||||||
use rusqlite_migration::{Migrations, M};
|
use rusqlite_migration::{Migrations, M};
|
||||||
|
@ -74,23 +74,41 @@ lazy_static::lazy_static! {
|
||||||
static ref POOLS: Mutex<HashMap<String, DatabaseConnectionPool>> = Mutex::new(HashMap::new());
|
static ref POOLS: Mutex<HashMap<String, DatabaseConnectionPool>> = Mutex::new(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn pool_by_room_id(room_id: &RoomId) -> DatabaseConnectionPool {
|
pub fn pool_by_room_id(room_id: &RoomId) -> Result<DatabaseConnectionPool, Error> {
|
||||||
let mut pools = POOLS.lock().unwrap();
|
let mut pools = POOLS.lock().unwrap();
|
||||||
if let Some(pool) = pools.get(room_id.get_id()) {
|
if let Some(pool) = pools.get(room_id.get_id()) {
|
||||||
return pool.clone();
|
return Ok(pool.clone());
|
||||||
} else {
|
} else {
|
||||||
|
let pool = &MAIN_POOL;
|
||||||
|
if let Ok(conn) = pool.get() {
|
||||||
|
if let Ok(count) = conn.query_row("SELECT COUNT(*) FROM main WHERE id = ?", params![room_id.get_id()],
|
||||||
|
|row| row.get::<_, i64>(0)) {
|
||||||
|
if count == 0 {
|
||||||
|
warn!("Cannot access room database: room {} does not exist", room_id.get_id());
|
||||||
|
return Err(Error::NoSuchRoom);
|
||||||
|
}
|
||||||
let raw_path = format!("rooms/{}.db", room_id.get_id());
|
let raw_path = format!("rooms/{}.db", room_id.get_id());
|
||||||
let path = Path::new(&raw_path);
|
let path = Path::new(&raw_path);
|
||||||
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(path);
|
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(path);
|
||||||
let pool = r2d2::Pool::new(db_manager).unwrap();
|
let pool = match r2d2::Pool::new(db_manager) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Unable to access {} database: {}", room_id.get_id(), e);
|
||||||
|
return Err(Error::DatabaseFailedInternally);
|
||||||
|
}
|
||||||
|
};
|
||||||
pools.insert(room_id.get_id().to_string(), pool);
|
pools.insert(room_id.get_id().to_string(), pool);
|
||||||
return pools[room_id.get_id()].clone();
|
return Ok(pools[room_id.get_id()].clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
error!("Failed to query main database for room {} existence", room_id.get_id());
|
||||||
|
return Err(Error::DatabaseFailedInternally);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_database_if_needed(room_id: &RoomId) {
|
pub fn create_database_if_needed(room_id: &RoomId) {
|
||||||
let pool = pool_by_room_id(room_id);
|
let pool = pool_by_room_id(room_id);
|
||||||
let conn = pool.get().unwrap();
|
let conn = pool.unwrap().get().unwrap();
|
||||||
create_room_tables_if_needed(&conn);
|
create_room_tables_if_needed(&conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,7 +212,10 @@ async fn prune_tokens() {
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
for room in rooms {
|
for room in rooms {
|
||||||
let pool = pool_by_room_id(&room);
|
let pool = match pool_by_room_id(&room) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => return
|
||||||
|
};
|
||||||
// It's not catastrophic if we fail to prune the database for a given room
|
// It's not catastrophic if we fail to prune the database for a given room
|
||||||
let conn = match pool.get() {
|
let conn = match pool.get() {
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
|
@ -217,7 +238,10 @@ async fn prune_pending_tokens() {
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
for room in rooms {
|
for room in rooms {
|
||||||
let pool = pool_by_room_id(&room);
|
let pool = match pool_by_room_id(&room) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => return
|
||||||
|
};
|
||||||
// It's not catastrophic if we fail to prune the database for a given room
|
// It's not catastrophic if we fail to prune the database for a given room
|
||||||
let conn = match pool.get() {
|
let conn = match pool.get() {
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
|
@ -330,8 +354,9 @@ pub async fn prune_files(file_expiration: i64) {
|
||||||
};
|
};
|
||||||
|
|
||||||
let futs = rooms.into_iter().map(|room| async move {
|
let futs = rooms.into_iter().map(|room| async move {
|
||||||
let pool = pool_by_room_id(&room);
|
if let Ok(pool) = pool_by_room_id(&room) {
|
||||||
prune_files_for_room(&pool, &room, file_expiration).await;
|
prune_files_for_room(&pool, &room, file_expiration).await;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
futures::future::join_all(futs).await;
|
futures::future::join_all(futs).await;
|
||||||
|
@ -356,7 +381,7 @@ pub fn perform_migration() {
|
||||||
for room in rooms {
|
for room in rooms {
|
||||||
create_database_if_needed(&room);
|
create_database_if_needed(&room);
|
||||||
let pool = pool_by_room_id(&room);
|
let pool = pool_by_room_id(&room);
|
||||||
let mut conn = pool.get().unwrap();
|
let mut conn = pool.unwrap().get().unwrap();
|
||||||
migrations.to_latest(&mut conn).unwrap();
|
migrations.to_latest(&mut conn).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue